# **CityLife Dataset - MOT validation**
1. Tracking with ByteTrack, a state-of-the-art (SoTA) multi-object tracker (some of the scripts here were taken from the ByteTrack Colab)
2. Validation of the tracking results with the `motmetrics` package (py-motmetrics)

Hyper-parameters setup:

In [None]:
import os


# ignore small boxes (to skip box filtering in inference time set to zero)
# min_box_side: boxes whose shorter side is below this value are dropped when the ground truth is serialized.
min_box_side = 0
# NOTE(review): because of max(100, ...), min_box_area can never be below 100, so setting
# min_box_side to zero alone does not disable area filtering - confirm this is intended.
min_box_area = max(100, min_box_side ** 2)
# colors covering fewer pixels than this in a segmentation frame are discarded
min_segmentation_pixels = 10

# root folder on the mounted Google Drive; the '...' part of the path is a placeholder to fill in
dataset_root_on_gdrive = '/content/drive/MyDrive/.../Citylife/'
dump_path = os.path.join(dataset_root_on_gdrive, 'CityLife_randomwalk_128_v6_10fps')
print(f'Dataset: {dump_path}')

# download the dataset into 'dump_path'
# NOTE(review): no output location is passed to gdown here, so the download presumably lands in the
# current working directory rather than in dump_path - confirm. The drive is also only mounted at
# the end of this cell, i.e. after this download runs.
!gdown --id "<put download key here>"

# NOTE(review): gt_json_path is not referenced anywhere else in the visible part of this notebook
gt_json_path = os.path.join(dump_path, 'peds_bbox.json')
# every artifact of this notebook (copied frames, predictions, ground-truth CSVs) is written under output_dir
output_dir = os.path.join(dataset_root_on_gdrive, 'ByteTrackPred')
# frames per second of the dataset; forwarded to the tracker through its --fps argument
frame_rate = 10

# mount gdrive
from google.colab import drive
drive.mount('/content/drive')

# **Init data and model**
Download ByteTrack, install dependencies etc.

In [None]:
# mount drive
import sys
import json
from shutil import rmtree, copytree, copyfile
from google.colab import drive

# Mount Google Drive at drive_path. The hyper-parameters cell already mounts the same
# '/content/drive' location, so this call repeats that mount.
drive_path = '/content/drive' 
drive.mount(drive_path)

In [None]:
# == Download the ByteTrack repo content ==
# (the dependencies themselves are installed by the pip commands that follow this cell)
!git clone https://github.com/ifzhang/ByteTrack.git
%cd /content/ByteTrack/
# folder that will hold the pretrained checkpoint
%mkdir pretrained
%cd pretrained

# == Download pretrained X model weights ==
# NOTE(review): presumably this is the file later passed to the tracker as
# pretrained/bytetrack_x_mot17.pth.tar - confirm the downloaded file name matches.
!gdown --id "1P4mY0Yyd3PPTybgZkjMYhFri88nTmJX5"

In [None]:
# == Install dependencies ==
# cython is installed first, before the COCO API and cython_bbox packages
!pip3 install cython
!pip3 install 'git+https://github.com/cocodataset/cocoapi.git#subdirectory=PythonAPI'
!pip3 install cython_bbox

# go back to the repository root so that requirements.txt is resolved from there
%cd /content/ByteTrack/
!pip3 install -r requirements.txt

In [None]:
# == Install ByteTrack ==
# setup.py is given as a relative path, so this relies on the working directory being the
# repository root (/content/ByteTrack/), which the preceding %cd sets.
!python3 setup.py develop

# **Looping CityLife!**
Track the videos/images

In [None]:
# recreate the output directory
def recreate_dir(parent, dir_name):
    """Return the path parent/dir_name after making sure it is an empty, existing directory.

    An existing directory at that path is removed together with its content
    and a fresh one (including any missing parent folders) is created.
    """
    target = os.path.join(parent, dir_name)
    already_there = os.path.isdir(target)
    if already_there:
        # wipe the previous content so the caller starts from a clean folder
        rmtree(target)
    os.makedirs(target, exist_ok=True)
    return target

# output_dir is split into (parent, folder name) to fit recreate_dir's signature; any existing
# output folder is removed and recreated empty by this call.
print(f'recreated output dir at: {recreate_dir(*os.path.split(output_dir))}')

In [None]:
# Copy images to split folders
def create_dir_if_not_exist(parent, dir_name):
    """Return the path parent/dir_name, creating the directory when it is missing.

    An already existing directory is left untouched, content included.
    """
    target = os.path.join(parent, dir_name)
    # exist_ok=True makes this a no-op for an existing directory
    os.makedirs(target, exist_ok=True)
    return target
  
# Split the rgb frames into one folder per camera under <output_dir>/cams_images.
png_dirs_dir_name = 'cams_images'
png_dirs_dir_path = os.path.join(output_dir, png_dirs_dir_name)
print(f'Start copying pngs to {png_dirs_dir_path}')
rgb_frames_dir = os.path.join(dump_path, 'rgb')
pngs = [
    os.path.join(rgb_frames_dir, entry)
    for entry in os.listdir(rgb_frames_dir)
    if entry.endswith('png')
]
for png in pngs:
    # file names are '_'-separated: field 1 is the camera id, field 2 the frame id
    frame_fields = os.path.basename(png).split('_')
    cam_id = int(frame_fields[1])
    frame_id = int(frame_fields[2])
    cam_dir = create_dir_if_not_exist(png_dirs_dir_path, f'cam_{cam_id}')

    # zero-pad both ids so that sorting the names lexicographically keeps the numeric frame order
    file_name = f'{frame_fields[0]}_{cam_id:05d}_{frame_id:05d}_' + '_'.join(frame_fields[3:])

    # frames copied by a previous run are left as they are
    target_path = os.path.join(cam_dir, file_name)
    if not os.path.isfile(target_path):
        copyfile(png, target_path)
print('Done!')

In [None]:
# looping the dataset
from shutil import copyfile, rmtree, copytree
import subprocess

for cam_dir_name in os.listdir(png_dirs_dir_path):
  cam_dir_path = os.path.join(png_dirs_dir_path, cam_dir_name)
  print('#'*100)
  print(f'\nStart tracking: {cam_dir_path}')

  cam_id = int(os.path.basename(cam_dir_path).split('_')[1])
  video_name = os.path.basename(cam_dir_path)

  # run byte track
  os.chdir('/content/ByteTrack')
  if min_box_area > 0:
    !python3 tools/demo_track.py image -f exps/example/mot/yolox_x_mix_det.py -c pretrained/bytetrack_x_mot17.pth.tar --path $cam_dir_path --min_box_area $min_box_area --fp16 --fuse --save_result &> log.txt --camid $cam_id --fps $frame_rate
  else:
    !python3 tools/demo_track.py image -f exps/example/mot/yolox_x_mix_det.py -c pretrained/bytetrack_x_mot17.pth.tar --path $cam_dir_path --fp16 --fuse --save_result &> log.txt --camid $cam_id --fps $frame_rate

  import re
  %cd /content/ByteTrack
  with open('log.txt', 'r') as file_reader:
    text = file_reader.read().replace('\n', '')

  m = re.search('save results to ./(.+?).txt', text)
  if m:
    png_repo_found = '/content/ByteTrack/' + m.group(1)
    print(f'found vis: {png_repo_found}')
  if not png_repo_found:
    print(f'ERROR: failed compiling into a video on: {png_repo_found}')
  destination_dir = os.path.join(output_dir, f'boxed_pngs_vis_{video_name}')
  if os.path.isdir(destination_dir):
    rmtree(destination_dir)
  copytree(png_repo_found, destination_dir)

  # == Get result prediction file path ==
  pred_found = png_repo_found + ".txt"
  print(f'found pred: {pred_found}')

  if not pred_found:
    print(f'ERROR: failed retrieving the prediction file on: {pred_found}')
  copyfile(pred_found, os.path.join(output_dir, f'pred_{video_name}.txt'))
print('DONE!')

# **Estimate Ground Truth from Segmentation Maps into MOT15**

In [None]:
from PIL import Image
import numpy as np


class box_cmap:
    """Bounding box of a single segmentation color together with its pixel count.

    Attributes:
        x, y: top-left corner of the box, in pixel coordinates.
        w, h: extent of the box along x and y.
        freq: number of pixels of the color that were seen.
    """

    def __init__(self, x: int, y: int, w: int, h: int, segment_freq: int = 1) -> None:
        """Create a box.

        :param x: left coordinate
        :param y: top coordinate
        :param w: width
        :param h: height
        :param segment_freq: number of pixels already counted for this box; defaults to 1,
            i.e. a box that was just created from its first pixel
        """
        self.x = x
        self.y = y
        self.w = w
        self.h = h
        self.freq = segment_freq

    def __repr__(self) -> str:
        return f'Box(x: {self.x}, y: {self.y}, w: {self.w}, h: {self.h})'

    def area(self) -> int:
        """Return the box area (w * h)."""
        return self.w * self.h


def image_to_boxes_by_color_loop(image_path: str):
    """Infer one bounding box per color of a colormap segmentation image.

    Every pixel is visited; pixels whose first three channels equal the green
    background (55, 181, 57) are skipped and all other pixels extend the box
    of their color.

    :param image_path: path of the segmentation image
    :return: dict mapping a pixel value (as returned by the PIL pixel access)
        to its box_cmap; colors with fewer than min_segmentation_pixels
        pixels are left out
    :raises Exception: when image_path is not an existing file
    """
    if not os.path.isfile(image_path):
        raise Exception(f'File not found: {image_path}')
    seg_image = Image.open(image_path)
    pixel_access = seg_image.load()
    n_cols, n_rows = seg_image.size
    boxes_by_color = {}

    # TODO: Vectorize!
    for col in range(n_cols):
        for row in range(n_rows):
            color = pixel_access[col, row]

            # ignore BG color (green)
            is_background = color[0] == 55 and color[1] == 181 and color[2] == 57
            if is_background:
                continue

            box = boxes_by_color.get(color)
            if box is None:
                # first pixel of this color: a zero-sized box located at the pixel
                boxes_by_color[color] = box_cmap(col, row, 0, 0, 1)
                continue

            # remember the far edges before the origin moves, then grow the box
            # NOTE(review): w/h end up as (max - min) of the pixel coordinates, i.e. one less than
            # the number of covered pixels per axis - confirm this is the intended convention
            right = max(box.x + box.w, col)
            bottom = max(box.y + box.h, row)
            box.x = min(box.x, col)
            box.y = min(box.y, row)
            box.w = right - box.x
            box.h = bottom - box.y
            box.freq += 1

    # filter boxes with small segmentation maps
    kept = {}
    for color, box in boxes_by_color.items():
        if box.freq >= min_segmentation_pixels:
            kept[color] = box
    return kept


def image_to_boxes_by_color_vectorized(image_path: str):
    """Infer one bounding box per color of a colormap segmentation image using NumPy.

    Follows the same contract as image_to_boxes_by_color_loop: pixels whose first
    three channels equal the green background (55, 181, 57) are ignored, each
    remaining color gets the box spanned by its pixels (w/h are max - min of the
    pixel coordinates), freq holds the pixel count, and colors with fewer than
    min_segmentation_pixels pixels are dropped.

    :param image_path: path of the segmentation image
    :return: dict mapping a color (tuple of Python ints, one entry per image channel)
        to its box_cmap. Keys are ordered by color value, not by first occurrence.
    :raises Exception: when image_path is not an existing file
    :raises ValueError: when the decoded image does not have at least 3 channels
    """
    if not os.path.isfile(image_path):
        raise Exception(f'File not found: {image_path}')
    with Image.open(image_path) as im:
        pixels = np.array(im)  # indexed as [row (y), column (x), channel]

    if pixels.ndim != 3 or pixels.shape[2] < 3:
        raise ValueError(f'Expected an image with at least 3 channels, got array shape {pixels.shape}: {image_path}')

    # ignore the green background color: a pixel is foreground when ANY of its
    # first three channels differs from the background value
    bgc = np.array([55, 181, 57])
    rows, cols = np.nonzero(np.any(pixels[:, :, :3] != bgc, axis=-1))
    if rows.size == 0:
        return dict()

    # one row per distinct color; 'inverse' assigns every foreground pixel the index of its color
    colors, inverse, counts = np.unique(pixels[rows, cols], axis=0, return_inverse=True, return_counts=True)
    inverse = inverse.reshape(-1)  # some NumPy versions return a non-flat inverse when axis is given

    # per-color extreme coordinates, accumulated without a Python loop over pixels
    n_colors = len(colors)
    upper = max(pixels.shape[0], pixels.shape[1])
    min_x = np.full(n_colors, upper, dtype=np.int64)
    min_y = np.full(n_colors, upper, dtype=np.int64)
    max_x = np.full(n_colors, -1, dtype=np.int64)
    max_y = np.full(n_colors, -1, dtype=np.int64)
    np.minimum.at(min_x, inverse, cols)
    np.minimum.at(min_y, inverse, rows)
    np.maximum.at(max_x, inverse, cols)
    np.maximum.at(max_y, inverse, rows)

    rgb_to_stat = dict()
    for index in range(n_colors):
        pixel_count = int(counts[index])
        # filter boxes with small segmentation maps
        if pixel_count < min_segmentation_pixels:
            continue
        x, y = int(min_x[index]), int(min_y[index])
        box_color = tuple(colors[index].tolist())
        rgb_to_stat[box_color] = box_cmap(x, y, int(max_x[index]) - x, int(max_y[index]) - y, pixel_count)
    return rgb_to_stat


def hash_values(input_list):
    """
    maps/codes a list into integers

    Each distinct element gets the next free integer (0, 1, 2, ...) in order of first
    appearance, and repeated elements reuse the code of their first appearance.
    The input is traversed only once, so one-shot iterables (e.g. generators) work too.

    :param input_list: enumerable of hashable elements
    :return: list with the integer code of every element, in input order
    """
    codes = dict()
    # setdefault stores len(codes) only for unseen values and always returns the stored code
    return [codes.setdefault(val, len(codes)) for val in input_list]


def frames_to_mot15(frames: list) -> list:
    """Turn the segmentation frames of one video into MOT15-style text lines.

    Every color found in the frames is treated as one identity. Boxes whose shorter
    side is below min_box_side or whose area is below min_box_area are skipped.

    :param frames: paths of the segmentation images, in playback order
    :return: list of lines "frame,id,x,y,w,h,1,-1,-1,-1\\n", one per kept box
    """
    video_boxes = [image_to_boxes_by_color_loop(frame_path) for frame_path in frames]

    colors = set()
    for frame_detections in video_boxes:
        colors |= set(frame_detections.keys())

    # one integer id per distinct color, consistent across all frames of the video
    color_to_id = {color: index for index, color in enumerate(list(colors))}

    lines = []
    # NOTE(review): frame numbers are 0-based positions in `frames`; confirm this matches the
    # frame numbering used by the prediction files this ground truth is compared against.
    for frame_number, frame_boxes in enumerate(video_boxes):
        for rgb_color, box in frame_boxes.items():
            # ignore small boxes
            if min(box.h, box.w) < min_box_side or box.area() < min_box_area:
                continue

            pid = color_to_id[rgb_color]
            lines.append(f'{frame_number},{pid},{box.x},{box.y},{box.w},{box.h},1,-1,-1,-1\n')
    return lines

# group segmentations by cams
# Collect the segmentation frames of every camera; the camera id is field 1 of the '_'-separated name.
video_repository = os.path.join(dump_path, 'seg')
seg_images = os.listdir(video_repository)
cam_to_segs = dict()
for seg_name in seg_images:
    cam_id = int(seg_name.split('_')[1])
    cam_to_segs.setdefault(cam_id, []).append(seg_name)


# Write one ground-truth file per camera: <output_dir>/cam_id_<cam id>.csv
for cam_id, unordered_frame_names_single_cam in cam_to_segs.items():
    # order by frame id (field 2 of the file name), numerically
    ordered_names = sorted(unordered_frame_names_single_cam, key=lambda name: int(name.split('_')[2]))
    ordered_frame_paths = [os.path.join(video_repository, name) for name in ordered_names]
    print(f'Start analyzing cam id: {cam_id} with {len(ordered_frame_paths)} frames: {ordered_frame_paths[:10]}...')

    lines = frames_to_mot15(ordered_frame_paths)

    # serialize:
    mot_challenge_csv_path = os.path.join(output_dir, f'cam_id_{cam_id}.csv')
    with open(mot_challenge_csv_path, 'w') as csv_writer:
        csv_writer.writelines(lines)

print('DONE!')

# **Evaluate with ByteTrack**

In [None]:
from loguru import logger
from typing import List
import torch
import torch.backends.cudnn as cudnn
from torch.nn.parallel import DistributedDataParallel as DDP

from yolox.core import launch
from yolox.exp import get_exp
from yolox.utils import configure_nccl, fuse_model, get_local_rank, get_model_info, setup_logger
from yolox.evaluators import MOTEvaluator

import argparse
import os
import random
import warnings
import glob
import motmetrics as mm
from collections import OrderedDict
from pathlib import Path


def compare_dataframes(gts, ts):
    """Build one accumulator per tracker result that has a matching ground truth.

    :param gts: mapping of sequence key -> ground-truth data
    :param ts: mapping of sequence key -> tracker data
    :return: (accs, names) - the results of mm.utils.compare_to_groundtruth
        (called with 'iou' and distth=0.5) and the matching sequence keys,
        both in the iteration order of ts; keys missing from gts are
        skipped with a warning
    """
    accs, names = [], []
    for seq_name, tracker_data in ts.items():
        if seq_name not in gts:
            logger.warning('No ground truth for {}, skipping.'.format(seq_name))
            continue

        logger.info('Comparing {}...'.format(seq_name))
        accs.append(mm.utils.compare_to_groundtruth(gts[seq_name], tracker_data, 'iou', distth=0.5))
        names.append(seq_name)

    return accs, names


def evaluate_mota():
    """Evaluate the predictions in output_dir against the ground truth and print the results.

    Ground truth is read from 'cam_id_*.csv' and predictions from 'pred_cam_*.txt'
    (both under output_dir, both parsed as 'mot15-2D'). Files are paired by the
    text after the last '_' of their name. Two summaries are printed: one where
    the count metrics are divided by num_objects / num_unique_objects, and one
    with the MOTChallenge metric set.
    """
    # evaluate MOTA
    mm.lap.default_solver = 'lap'

    gt_paths = glob.glob(os.path.join(output_dir, 'cam_id_*.csv'))
    print('gt_files', gt_paths)
    pred_paths = glob.glob(os.path.join(output_dir, 'pred_cam_*.txt'))

    logger.info('Found {} groundtruths and {} test files.'.format(len(gt_paths), len(pred_paths)))
    logger.info('Available LAP solvers {}'.format(mm.lap.available_solvers))
    logger.info('Default LAP solver \'{}\''.format(mm.lap.default_solver))
    logger.info('Loading files.')

    # key both mappings by the camera id that ends the file name
    gt = OrderedDict()
    for gt_path in gt_paths:
        cam_key = gt_path.split('.')[-2].split('_')[-1]
        gt[cam_key] = mm.io.loadtxt(gt_path, fmt='mot15-2D', min_confidence=1)

    ts = OrderedDict()
    for pred_path in pred_paths:
        cam_key = os.path.splitext(Path(pred_path).parts[-1])[0].split('_')[-1]
        ts[cam_key] = mm.io.loadtxt(pred_path, fmt='mot15-2D', min_confidence=-1.0)

    mh = mm.metrics.create()
    accs, names = compare_dataframes(gt, ts)

    logger.info('Running metrics')
    metrics = ['recall', 'precision', 'num_unique_objects', 'mostly_tracked',
               'partially_tracked', 'mostly_lost', 'num_false_positives', 'num_misses',
               'num_switches', 'num_fragmentations', 'mota', 'motp', 'num_objects']
    summary = mh.compute_many(accs, names=names, metrics=metrics, generate_overall=True)

    # first summary: express the count metrics as a fraction of their divisor column
    div_dict = {
        'num_objects': ['num_false_positives', 'num_misses', 'num_switches', 'num_fragmentations'],
        'num_unique_objects': ['mostly_tracked', 'partially_tracked', 'mostly_lost']}
    for divisor, divided_columns in div_dict.items():
        for divided in divided_columns:
            summary[divided] = (summary[divided] / summary[divisor])

    # the divided columns are rendered with the same formatter as 'mota'
    fmt = mh.formatters
    for divided_columns in div_dict.values():
        for column in divided_columns:
            fmt[column] = fmt['mota']
    print(mm.io.render_summary(summary, formatters=fmt, namemap=mm.io.motchallenge_metric_names))

    # second summary: the MOTChallenge metric set plus num_objects
    metrics = mm.metrics.motchallenge_metrics + ['num_objects']
    summary = mh.compute_many(accs, names=names, metrics=metrics, generate_overall=True)
    print(mm.io.render_summary(summary, formatters=mh.formatters, namemap=mm.io.motchallenge_metric_names))
    logger.info('Completed')

def filter_mot_csv_by_area(file_paths: List[str], min_area: int):
    """Drop, in place, every box whose area is below min_area from the given files.

    Each file is read as comma-separated lines whose fields 4 and 5 hold the box
    width and height; the file is then overwritten with the lines that satisfy
    width * height >= min_area. Nothing is touched when min_area <= 1.

    :param file_paths: paths of the files to rewrite
    :param min_area: minimal box area to keep
    """
    if min_area <= 1:
        return

    def _is_large_enough(row: str) -> bool:
        fields = row.split(',')
        width = float(fields[4])
        height = float(fields[5])
        return width * height >= min_area

    for path in file_paths:
        with open(path, 'r') as source:
            rows = source.readlines()

        kept_rows = [row for row in rows if _is_large_enough(row)]

        with open(path, 'w') as target:
            target.writelines(kept_rows)


# Filter by few box sizes and evaluate
# The same minimal area is applied to both the ground truth and the predictions before scoring.
minimal_bbox_area = min_box_area
print('#'*20+f' Running evaluation with min area {minimal_bbox_area} '+'#'*20)
print(f'Dataset: {dump_path}')

# filter gt
# NOTE: filter_mot_csv_by_area overwrites the files in place, so boxes removed here are not
# available to a later run with a smaller minimal area unless the files are regenerated.
gt_files = glob.glob(os.path.join(output_dir, 'cam_id_*.csv'))
filter_mot_csv_by_area(gt_files, minimal_bbox_area)

# filter prediction
prediction_files = glob.glob(os.path.join(output_dir, 'pred_cam_*.txt'))
filter_mot_csv_by_area(prediction_files, minimal_bbox_area)

# evaluate
# evaluate_mota() globs the same two file patterns again, so it reads the filtered files
evaluate_mota()
