# SETUP

In [None]:
import os
# Record the directory the notebook starts in and echo it for reference.
HOME = os.getcwd()
print(HOME)

/content


In [None]:
!pip install boxmot ultralytics supervision

In [None]:
import ultralytics
from ultralytics import YOLO
import supervision as sv
import cv2
import numpy as np
import torch
import time
import matplotlib.pyplot as plt
import pandas as pd

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


## GOOGLE DRIVE MOUNT

In [None]:
from google.colab import drive
# Unmount first (presumably to force a fresh mount on re-runs), then mount
# Google Drive at /content/drive.
drive.flush_and_unmount()
drive.mount('/content/drive')

Drive not mounted, so nothing to flush and unmount.
Mounted at /content/drive


# RESTORE DATA

In [None]:
# Work from /content and copy the inputs from Google Drive into it:
# the source video, the detector weights, and gt.txt.
%cd "/content"
!cp "/content/drive/MyDrive/Colab Notebooks/MVI_1482_VIS.avi" .

!cp "/content/drive/MyDrive/Colab Notebooks/maritime-best.pt" .

# NOTE(review): gt.txt is presumably the ground-truth annotation file; it is not
# read by any of the cells shown here.
!cp "/content/drive/MyDrive/Colab Notebooks/gt.txt" .


/content


In [None]:
# Read the local copy made in the restore-data cell (the same file the tracking
# cell uses) instead of streaming the video through the Drive mount.
SOURCE_VIDEO_PATH = "/content/MVI_1482_VIS.avi"

In [None]:
# Detector weights, copied into /content from Google Drive by the restore-data cell.
MODEL_PATH = "/content/maritime-best.pt"

# Model Import

In [None]:
# Load the detector from the weights file at MODEL_PATH.
model = YOLO(MODEL_PATH)


In [None]:
# dict mapping class_id to class_name
CLASS_NAMES_DICT = model.model.names
print(CLASS_NAMES_DICT)

# class ids to keep from the model's predictions; id 2 is 'Ferry' in the mapping printed above
SELECTED_CLASS_IDS = [2]

{0: 'Boat', 1: 'Buoy', 2: 'Ferry', 3: 'Flying bird-plane', 4: 'Kayak', 5: 'Other', 6: 'Sail boat', 7: 'Speed boat', 8: 'Vessel-ship'}


# MODEL FRAME TEST

In [None]:
# create frame generator
generator = sv.get_video_frames_generator(SOURCE_VIDEO_PATH)
# create instance of BoxAnnotator and LabelAnnotator
box_annotator = sv.BoxAnnotator(thickness=4)
label_annotator = sv.LabelAnnotator(text_thickness=2, text_scale=1.5, text_color=sv.Color.BLACK)
# acquire first video frame
iterator = iter(generator)
frame = next(iterator)
# model prediction on single frame and conversion to supervision Detections
results = model(frame, verbose=False)[0]

# convert to Detections
detections = sv.Detections.from_ultralytics(results)
# only consider class id from selected_classes define above
detections = detections[np.isin(detections.class_id, SELECTED_CLASS_IDS)]

# format custom labels
labels = [
    f"{CLASS_NAMES_DICT[class_id]} {confidence:0.2f}"
    for confidence, class_id in zip(detections.confidence, detections.class_id)
]

# annotate and display frame
annotated_frame = frame.copy()
annotated_frame = box_annotator.annotate(
    scene=annotated_frame, detections=detections)
annotated_frame = label_annotator.annotate(
    scene=annotated_frame, detections=detections, labels=labels)

%matplotlib inline
sv.plot_image(annotated_frame, (16, 16))

# Track

In [None]:
import numpy as np
import cv2
from pathlib import Path
from boxmot.trackers.ocsort.ocsort import OcSort
import supervision as sv

def process_video_with_ocsort_and_trails(
        video_path,
        model,                         # YOLOv8 pre-trained model
        output_file,                   # MOT-style 
        video_output_path,             # Annotated video out
        selected_class_ids=[2],        # Ferry
        confidence_thr=0.5,
        annotate=True,
        trail_length=40,
        ocsort_kwargs=None             # OCSORT parameters
    ):
    """Detect objects frame by frame, track them with OcSort, and write the results.

    For every frame of ``video_path`` the model's detections are filtered by class
    and confidence, passed to an ``OcSort`` tracker, and each returned track is
    written to ``output_file`` as one comma-separated line:

        frame,id,bb_left,bb_top,bb_width,bb_height,conf,-1,-1,-1

    Every frame (annotated or not) is also written to ``video_output_path``.

    Args:
        video_path: Input video, passed to ``cv2.VideoCapture``.
        model: Detector; called as ``model.predict(frame, verbose=False)`` and
            expected to expose ``model.model.names`` when ``annotate`` is true.
        output_file: Path of the text file receiving the track rows (overwritten).
        video_output_path: Path of the output video (``mp4v`` fourcc).
        selected_class_ids: Class ids to keep. The default list is never modified.
        confidence_thr: Detections with confidence <= this value are dropped
            before tracking.
        annotate: If true, draw boxes, labels and centre-point trails on the frames.
        trail_length: Maximum number of centre points kept per track for the trail;
            0 or less disables the trail.
        ocsort_kwargs: Extra keyword arguments forwarded to ``OcSort(...)``.

    Raises:
        IOError: If ``video_path`` cannot be opened.
    """
    if ocsort_kwargs is None:
        ocsort_kwargs = {}
    tracker = OcSort(**ocsort_kwargs)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        # Fail loudly instead of silently producing an empty results file and video.
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    out = None
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 changes the
        # playback speed/duration of the output video.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # NOTE(review): some sources report 0/NaN fps; 30.0 is an assumed fallback.
            fps = 30.0

        out = cv2.VideoWriter(
            video_output_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            fps, (width, height)
        )

        if annotate:
            box_annotator = sv.BoxAnnotator(thickness=4)
            label_annotator = sv.LabelAnnotator(text_thickness=2, text_scale=1.5, text_color=sv.Color.BLACK)
            class_names_dict = model.model.names

        frame_id = 1   # frame numbers in the results file are 1-based
        trails = {}    # track id -> list of recent (x, y) centre points

        with open(output_file, 'w') as f:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                results = model.predict(frame, verbose=False)[0]
                boxes = results.boxes.xyxy.cpu().numpy()
                confs = results.boxes.conf.cpu().numpy()
                classes = results.boxes.cls.cpu().numpy().astype(int)

                # Keep only the selected classes above the confidence threshold.
                keep = np.isin(classes, selected_class_ids) & (confs > confidence_thr)
                boxes = boxes[keep]
                confs = confs[keep]
                classes = classes[keep]

                # OCSORT input = [x1, y1, x2, y2, conf, class_id]; with no detections
                # this stacks to an empty (0, 6) array, so the tracker is still updated.
                dets = np.hstack([boxes, confs[:, None], classes[:, None]])   # (N, 6)
                tracks = tracker.update(dets, frame)

                if annotate:
                    sv_boxes, sv_confs, sv_class_ids, sv_track_ids = [], [], [], []

                for trk in tracks:
                    # The first seven columns are read as box corners, track id,
                    # confidence and class id; any further columns are ignored.
                    x1, y1, x2, y2, track_id, conf, class_id = trk[:7]
                    w = x2 - x1
                    h = y2 - y1
                    f.write(f"{frame_id},{int(track_id)},{x1:.1f},{y1:.1f},{w:.1f},{h:.1f},{conf:.2f},-1,-1,-1\n")

                    if annotate:
                        sv_boxes.append([x1, y1, x2, y2])
                        sv_confs.append(conf)
                        sv_class_ids.append(class_id)
                        sv_track_ids.append(track_id)
                        # For trail, save the center point
                        center = (int((x1+x2)/2), int((y1+y2)/2))
                        points = trails.setdefault(int(track_id), [])
                        points.append(center)
                        # Trim from the front. Unlike slicing with [-trail_length:],
                        # this also empties the trail when trail_length is 0.
                        excess = len(points) - max(trail_length, 0)
                        if excess > 0:
                            del points[:excess]

                if annotate:
                    dets_sv = sv.Detections(
                        xyxy=np.array(sv_boxes) if sv_boxes else np.zeros((0, 4)),
                        confidence=np.array(sv_confs) if sv_confs else np.array([]),
                        class_id=np.array(sv_class_ids, dtype=int) if sv_class_ids else np.array([], dtype=int),
                        tracker_id=np.array(sv_track_ids, dtype=int) if sv_track_ids else np.array([], dtype=int),
                    )
                    labels = [f"#{int(tid)} {class_names_dict.get(int(cid),'?')} ({conf:.2f})"
                              for tid, cid, conf in zip(sv_track_ids, sv_class_ids, sv_confs)]
                    frame = box_annotator.annotate(scene=frame, detections=dets_sv)
                    frame = label_annotator.annotate(scene=frame, detections=dets_sv, labels=labels)

                # Trail line. `trails` is only filled when annotate is true, and
                # entries are never removed, so the trail of a track that is no
                # longer reported keeps being drawn at its last positions.
                for points in trails.values():
                    for start, end in zip(points, points[1:]):
                        cv2.line(frame, start, end, (0, 255, 255), 2)

                out.write(frame)
                frame_id += 1
    finally:
        # Always release the capture and writer, even if a frame fails mid-run,
        # so the output video is finalized and handles are not leaked.
        cap.release()
        if out is not None:
            out.release()


In [None]:
# Run from /content so the relative output paths below are created there.
%cd /content
# Track class id 2 (Ferry) through the local copy of the video, writing the
# per-frame track rows to ocsort_results.txt and the annotated frames to
# ocsort_tracking_output.mp4.
process_video_with_ocsort_and_trails(
    video_path="/content/MVI_1482_VIS.avi",
    model=model,
    output_file="ocsort_results.txt",
    video_output_path="ocsort_tracking_output.mp4",
    selected_class_ids=[2],
    confidence_thr=0.5,
    annotate=True,
    trail_length=40,
    # Forwarded unchanged to OcSort(...). Detections with confidence <= confidence_thr
    # are already dropped before the tracker sees them.
    ocsort_kwargs={
        "per_class": False,
        "det_thresh": 0.3
    }
)

In [None]:
output_dir = "/content/drive/MyDrive/Colab Notebooks/ocsort"
os.makedirs(output_dir, exist_ok=True)

# Save OCSORT outputs to GDrive
!cp ocsort_tracking_output.mp4 "/content/drive/MyDrive/Colab Notebooks/ocsort/"
!cp ocsort_results.txt "/content/drive/MyDrive/Colab Notebooks/ocsort/"

# Check OCSORT results
df_sort = pd.read_csv('ocsort_results.txt', header=None,
                     names=['frame', 'id', 'bb_left', 'bb_top', 'bb_width', 'bb_height', 'conf', 'x', 'y', 'z'])

print("\OCSORT Stats:")
print(f"Total frame count: {df_sort['frame'].nunique()}")
print(f"Total track count: {df_sort['id'].nunique()}")
print("\nTrack ID counts:")
print(df_sort.groupby('id').size())

\OCSORT Stats:
Total frame count: 454
Total track count: 2

Track ID counts:
id
1    283
2    335
dtype: int64
