In [None]:
# Detectron2 environment for instance segmentation
!pip install "git+https://github.com/facebookresearch/detectron2.git"

# SMP environment for tracking
!pip install segmentation-models-pytorch

# Filterpy for Kalman Filter definition (can be ignored, if Kalman Filter reference is not used)
!pip install filterpy

In [3]:
import sys
sys.path.append("/path/to/symmetry_tracker/")

from symmetry_tracker.general_functionalities.video_transformation import TransformVideoFromTIFF

from symmetry_tracker.segmentation.segmentator import SingleVideoSegmentation
from symmetry_tracker.segmentation.segmentation_io import DisplaySegmentation, WriteSegmentation

from symmetry_tracker.tracking.symmetry_tracker import SingleVideoSymmetryTracking
from symmetry_tracker.tracking.tracking_io import DisplayTracks, WriteTracks, SaveTracksVideo, SaveTracks, LoadTracks
from symmetry_tracker.tracking.post_processing import InterpolateMissingObjects, RemoveShortPaths, HeuristicalEquivalence

from symmetry_tracker.tracking.kalman_tracker import SingleVideoKalmanTracking
from symmetry_tracker.tracking.symmetry_tracker_l2dist import SingleVideoSymmetryTracking_L2Distance
from symmetry_tracker.tracking.symmetry_tracker_shapedist import SingleVideoSymmetryTracking_ShapeDistance

# Other necessary imports

import torch
import os
import shutil

# Preprocessing

In [4]:
import requests
import zipfile
import os

def download_file(url, output_path, chunk_size=1024 * 1024, timeout=30):
    """
    Download a file from a given URL and save it to the specified output path.

    The body is streamed to a temporary ``<output_path>.part`` file and only
    moved to ``output_path`` once the transfer has completed, so an interrupted
    download never leaves a truncated file under the final name.

    Args:
    url (str): The URL of the file to download.
    output_path (str): The path where the file will be saved.
    chunk_size (int): Number of bytes requested per streamed chunk.
    timeout (float or tuple): Timeout in seconds passed to ``requests.get``;
        without one a stalled server would block the notebook indefinitely.

    Returns:
    None. Success or failure (non-200 status) is reported with ``print``.

    Raises:
    requests.RequestException: If the connection fails or times out.
    OSError: If the output file cannot be written.
    """
    # The context manager releases the underlying connection even when
    # writing fails half-way through the stream.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            print(f"Failed to download {output_path}, status code: {response.status_code}")
            return

        partial_path = f"{output_path}.part"
        try:
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
            # Atomic rename: output_path only ever holds a complete download.
            os.replace(partial_path, output_path)
        except BaseException:
            # Also covers KeyboardInterrupt from an interrupted notebook cell.
            if os.path.exists(partial_path):
                os.remove(partial_path)
            raise

    print(f"Downloaded {output_path}")

def unzip_file(zip_path, extract_to='.'):
    """
    Extract every member of a zip archive into a target directory.

    Args:
    zip_path (str): Location of the zip archive to read.
    extract_to (str): Destination directory for the extracted files.
        Defaults to the current directory.
    """
    archive = zipfile.ZipFile(zip_path, 'r')
    try:
        archive.extractall(extract_to)
        # Reported while the archive is still open, right after extraction.
        print(f"Extracted {zip_path} to {extract_to}")
    finally:
        archive.close()

In [5]:
# Fetch the MOTS archive only when no valid copy is present. is_zipfile() is
# False for a missing file and for a truncated download (the zip directory sits
# at the end of the file), so both cases trigger a fresh download while a
# complete archive from an earlier run is reused.
if not zipfile.is_zipfile("MOTS.zip"):
    download_file("https://motchallenge.net/data/MOTS.zip", "MOTS.zip")
unzip_file("MOTS.zip")

Downloaded MOTS.zip
Extracted MOTS.zip to .


In [None]:
#Transforming samples into lower resolution

import cv2

MOTS_root = "./MOTS/train/"

# Only entries that contain an "img1" frame directory are treated as samples,
# so stray files in MOTS_root do not break the loop below.
sample_names = sorted(
    name for name in os.listdir(MOTS_root)
    if os.path.isdir(os.path.join(MOTS_root, name, "img1"))
)
# cv2.resize expects the target size as (width, height).
new_resolution = (480, 270)

for sample_name in sample_names:
    print(sample_name)
    sample_path = os.path.join(MOTS_root, sample_name, "img1")
    sample_images = sorted(os.listdir(sample_path))

    # "lowres" receives every resized frame, "lowres_first60" only the first 60.
    lowres_dir = os.path.join(MOTS_root, sample_name, "lowres")
    lowres_first60_dir = os.path.join(MOTS_root, sample_name, "lowres_first60")
    os.makedirs(lowres_dir, exist_ok=True)
    os.makedirs(lowres_first60_dir, exist_ok=True)

    for i, image_name in enumerate(sample_images):
        # Light-weight progress indicator: one line per 100 frames.
        if i % 100 == 0:
            print(image_name)
        image_path = os.path.join(sample_path, image_name)
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals an unreadable or non-image file by returning
            # None instead of raising; report it rather than fail in cv2.resize.
            print(f"Skipping unreadable image: {image_path}")
            continue
        resized_image = cv2.resize(image, new_resolution)
        cv2.imwrite(os.path.join(lowres_dir, image_name), resized_image)
        if i < 60:
            cv2.imwrite(os.path.join(lowres_first60_dir, image_name), resized_image)


# Single Tracking

In [None]:
!mkdir downloads

segmentator_models_dir = "/path/to/models/"
segmentator_model_name = "model_final.pth"
segmentator_config_name = "config.yaml"

tracking_models_dir = "/path/to/models/"
tracking_model_name = "MOTSynth_KernelTracker_[DLV3p,resnet50]_FBtr2_Ep4_Adv2_final.pth"

!wget --no-clobber $segmentator_models_dir$segmentator_model_name -P downloads/
!wget --no-clobber $segmentator_models_dir$segmentator_config_name -P downloads/
!wget --no-clobber $tracking_models_dir$tracking_model_name -P downloads/

In [None]:
import cv2
import os
import gc
import numpy as np

from symmetry_tracker.tracking.tracker_utilities import LoadAnnotationDF, LoadPretrainedModel
from symmetry_tracker.tracking.symmetry_tracker import LocalTracking, GlobalAssignment, ConnectedIDReduction

from symmetry_tracker.tracking.kalman_tracker import KalmanTracking
from symmetry_tracker.tracking.symmetry_tracker_l2dist import GlobalAssignment_L2Distance
from symmetry_tracker.tracking.symmetry_tracker_shapedist import GlobalAssignment_ShapeDistance


# Samples to process, mapped to the score threshold that is passed to
# SingleVideoSegmentation as ScoreThreshold for that sample.
samples_th_dict = {
    "MOTS20-02": 0.8,
    "MOTS20-09": 0.8,
    "MOTS20-11": 0.8,
}

# Matching colab environment (for now GPU vs CPU)
Device = ("cuda:0" if torch.cuda.is_available() else "cpu")
print("Colab environment: "+Device)

# Input paths
SegmentationModelPath = "./downloads/"+segmentator_model_name
SegmentationModelConfigPath = "./downloads/"+segmentator_config_name
TrackingModelPath = "./downloads/"+tracking_model_name

# Passed to LocalTracking and to the GlobalAssignment* functions.
TimeKernelSize = 2

# Output paths
# os.makedirs with exist_ok=True creates the parent "outputs" directory as well
# and does not complain when the cell is re-run.
for output_dir in ("outputs/segmentations", "outputs/trackings", "outputs/videos"):
    os.makedirs(output_dir, exist_ok=True)

# For every configured sample: segment the low-resolution frames, write the
# segmentation to disk, load it back as an annotation table, run local tracking
# once, then run each enabled tracker variant and save its tracks and video.
for sample_name in samples_th_dict.keys():
  print(sample_name)

  # Directory holding the resized frames of this sample.
  SamplesPath = os.path.join(MOTS_root,sample_name,"lowres/")

  SegmentationSavePath = "./outputs/segmentations/"+sample_name+"_Segmentation.txt"

  # Performing segmentation
  Outmasks = SingleVideoSegmentation(SamplesPath,
                                    SegmentationModelPath,
                                    SegmentationModelConfigPath,
                                    Device,
                                    Color = "RGB",
                                    ScoreThreshold = samples_th_dict[sample_name])

  # Saving segmentation
  WriteSegmentation(Outmasks, SegmentationSavePath)

  # The segmentation file written above is read back as the annotation source.
  AnnotPath = SegmentationSavePath

  # VideoShape = [number of entries in SamplesPath, then the first two
  # dimensions of the first frame as loaded by cv2.imread].
  VideoFrames = sorted(os.listdir(SamplesPath))
  Img0 = cv2.imread(os.path.join(SamplesPath,VideoFrames[0]))
  VideoShape = [len(os.listdir(SamplesPath)), np.shape(Img0)[0], np.shape(Img0)[1]]
  AnnotDF = LoadAnnotationDF(AnnotPath, VideoShape, MinObjectPixelNumber = 20, MaxOverlapRatio = 0.5)

  # Two independent copies: the untouched annotations (input of the Kalman
  # branch) and the one handed to LocalTracking (input of the Symmetry* branches).
  AnnotDF_raw = AnnotDF.copy()
  AnnotDF_LTR = AnnotDF.copy()

  # The tracking model is only needed for LocalTracking; it is dropped right
  # afterwards and garbage collection is triggered explicitly.
  Model = LoadPretrainedModel(TrackingModelPath, Device)
  AnnotDF_LTR = LocalTracking(SamplesPath, VideoShape, AnnotDF_LTR, Model, Device, TimeKernelSize, Color = "RGB", Marker = "BBOX", SegmentationConfidence = 0.2)
  del Model
  gc.collect()

  # Performing tracking
  # Trackers are enabled by un-commenting their name in the list below.
  for tracker in [#"Kalman",
                  #"Symmetry",
                  #"SymmetryL2",
                  "SymmetryShape"]:

    # NOTE(review): TrackingWritePath is assigned but never used in this cell.
    TrackingSavePath = "./outputs/trackings/"+sample_name+"_"+tracker+"_Tracks.json"
    TrackingWritePath = "./outputs/trackings/"+sample_name+"_"+tracker+"_Tracks.txt"
    TrackingVideoPath = "./outputs/videos/"+sample_name+"_"+tracker+"_Tracks.mp4"

    # Each branch starts from a fresh copy so trackers do not affect each other.
    # NOTE(review): there is no else branch; a tracker name matching none of
    # the cases would post-process whatever AnnotDF currently holds.
    if tracker == "Kalman":
      AnnotDF = AnnotDF_raw.copy()
      AnnotDF = KalmanTracking(AnnotDF, MaxCentroidDistance = 20)

    elif tracker == "Symmetry":
      AnnotDF = AnnotDF_LTR.copy()
      AnnotDF = GlobalAssignment(SamplesPath, VideoShape, AnnotDF, TimeKernelSize, MinRequiredSimilarity = 0.2, MaxTimeKernelShift = None)
      AnnotDF = ConnectedIDReduction(AnnotDF)

    elif tracker == "SymmetryL2":
      AnnotDF = AnnotDF_LTR.copy()
      AnnotDF = GlobalAssignment_L2Distance(SamplesPath, VideoShape, AnnotDF, TimeKernelSize, MaxCentroidDistance = 20, MaxTimeKernelShift = None)
      AnnotDF = ConnectedIDReduction(AnnotDF)

    elif tracker == "SymmetryShape":
      AnnotDF = AnnotDF_LTR.copy()
      AnnotDF = GlobalAssignment_ShapeDistance(SamplesPath, VideoShape, AnnotDF, TimeKernelSize, MinRequiredSimilarity = 0.2, MaxTimeKernelShift = None)
      AnnotDF = ConnectedIDReduction(AnnotDF)

    # Shared post-processing, then persist the tracks and render the video.
    AnnotDF = RemoveShortPaths(AnnotDF, MinimalPathLength=10)
    AnnotDF = InterpolateMissingObjects(AnnotDF)
    SaveTracks(AnnotDF,TrackingSavePath)
    SaveTracksVideo(SamplesPath, AnnotDF, TrackingVideoPath, Fps=30)

In [None]:
# Bundle the rendered videos and the saved tracks into one archive each,
# recursing into the output directories (-r).
!zip -r videos.zip outputs/videos
!zip -r trackings.zip outputs/trackings

In [10]:
# Colab-only: hand trackings.zip to the browser as a file download.
from google.colab import files
files.download("trackings.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [11]:
# Colab-only: hand videos.zip to the browser as a file download.
from google.colab import files
files.download("videos.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>