In [None]:
!pip install ultralytics  # YOLO
!pip install opencv-python
!pip install filterpy  # Kalman filter
!pip install scipy  # Hungarian algorithm
!pip install matplotlib
!pip install numpy
!pip install pandas

import numpy as np
import cv2
from ultralytics import YOLO
from filterpy.kalman import KalmanFilter
from scipy.optimize import linear_sum_assignment
import matplotlib.pyplot as plt
from pathlib import Path
import os
import pandas as pd
from collections import defaultdict
from IPython.display import clear_output
from google.colab import drive

Collecting ultralytics
  Downloading ultralytics-8.3.234-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.234-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m24.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.234 ultralytics-thop-2.0.18
Collecting filterpy
  Downloading filterpy-1.4.5.zip (177 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m178.0/178.0 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: filterpy
  Building wheel for filterpy (setup.py) ... [?25l[?25hdone
  Created wheel for filterpy: filename=filterpy-1.4.5-py3-n

In [None]:
import numpy as np
from filterpy.kalman import KalmanFilter
from scipy.optimize import linear_sum_assignment
import cv2
import pandas as pd
from ultralytics import YOLO
from pathlib import Path
from google.colab import drive

# Mount Google Drive at /content/drive; the data, output and model paths
# configured further down in this cell all live under /content/drive/MyDrive.
drive.mount('/content/drive')

class KalmanBoxTracker:
    """
    Kalman-filter state of a single tracked object observed as a bounding box.

    The filter state is [cx, cy, s, r, v_cx, v_cy, v_s]: box centre, area
    (s = w * h), aspect ratio (r = w / h) and the velocities of the centre and
    of the area.  The aspect ratio has no velocity term.  Measurements are
    [cx, cy, s, r].

    The class attribute ``count`` is a counter shared by all instances; each new
    tracker takes the current value as its ``id`` and then increments it.
    """
    count = 0

    # Substitute divisor used when a box has exactly zero height, so the aspect
    # ratio stays finite instead of raising / producing inf.
    _MIN_HEIGHT = 1e-6

    def __init__(self, bbox):
        """
        Initialises a tracker from an initial bounding box.

        bbox: [x1, y1, x2, y2] or [x1, y1, x2, y2, score]
        """
        # Constant velocity model: 7 state variables, 4 measured variables.
        self.kf = KalmanFilter(dim_x=7, dim_z=4)

        # State transition matrix F: cx, cy and s are advanced by their
        # velocities each step; r and the velocities themselves are carried over.
        self.kf.F = np.array([
            [1,0,0,0,1,0,0],
            [0,1,0,0,0,1,0],
            [0,0,1,0,0,0,1],
            [0,0,0,1,0,0,0],
            [0,0,0,0,1,0,0],
            [0,0,0,0,0,1,0],
            [0,0,0,0,0,0,1]
        ])

        # Measurement function H: picks [cx, cy, s, r] out of the state.
        self.kf.H = np.array([
            [1,0,0,0,0,0,0],
            [0,1,0,0,0,0,0],
            [0,0,1,0,0,0,0],
            [0,0,0,1,0,0,0]
        ])

        # Measurement noise R: scale the (s, r) block by 5 (was 10 in an earlier
        # revision) so detected size/ratio is trusted slightly more.
        self.kf.R[2:,2:] *= 5.0

        # Initial state covariance P: velocities are unobserved at start, so
        # their block gets a large extra factor before the global scaling.
        self.kf.P[4:,4:] *= 1000.
        self.kf.P *= 10.

        # Process noise Q: the area-velocity term is scaled by 0.01 first, then
        # the whole velocity block by 0.5 (raised from 0.01 so velocities can
        # change quickly for fast-moving robots).
        self.kf.Q[-1,-1] *= 0.01
        self.kf.Q[4:,4:] *= 0.5

        self.kf.x[:4] = self.convert_bbox_to_z(bbox)
        self.time_since_update = 0   # predict() calls since the last update()
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []            # predicted boxes since the last update()
        self.hits = 0                # total number of update() calls
        self.hit_streak = 0          # updates without a missed frame in between
        self.age = 0                 # total number of predict() calls

        # Confidence of the most recent detection (0.0 when bbox has no score).
        self.last_score = self._score_of(bbox)

    @staticmethod
    def _score_of(bbox):
        """Returns the fifth element of bbox as its score, or 0.0 if absent."""
        return bbox[4] if len(bbox) > 4 else 0.0

    def update(self, bbox):
        """
        Updates the state vector with an observed bbox
        ([x1, y1, x2, y2] with an optional trailing score).
        """
        self.time_since_update = 0
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        self.kf.update(self.convert_bbox_to_z(bbox))

        # Remember the confidence of the detection that refreshed this track.
        self.last_score = self._score_of(bbox)

    def predict(self):
        """
        Advances the state vector one step and returns the predicted bounding
        box as a (1, 4) array [x1, y1, x2, y2].
        """
        # If applying the area velocity would make the area non-positive,
        # zero that velocity before predicting.
        if (self.kf.x[6] + self.kf.x[2]) <= 0:
            self.kf.x[6] *= 0.0
        self.kf.predict()
        self.age += 1
        # A second consecutive predict without an update breaks the streak.
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        self.history.append(self.convert_x_to_bbox(self.kf.x))
        return self.history[-1]

    def get_state(self):
        """
        Returns the current bounding box estimate as a (1, 4) array.
        """
        return self.convert_x_to_bbox(self.kf.x)

    @staticmethod
    def convert_bbox_to_z(bbox):
        """
        Takes a bounding box in the form [x1,y1,x2,y2] (extra trailing entries
        such as a score are ignored) and returns z as a (4, 1) array
        [x,y,s,r] where x,y is the centre of the box, s is the area and r is
        the aspect ratio w / h.

        A box with exactly zero height uses a tiny substitute divisor so that r
        is a large finite number instead of a division by zero.
        """
        w = bbox[2] - bbox[0]
        h = bbox[3] - bbox[1]
        x = bbox[0] + w/2.
        y = bbox[1] + h/2.
        s = w * h
        divisor = float(h)
        if divisor == 0.0:
            divisor = KalmanBoxTracker._MIN_HEIGHT
        r = w / divisor
        return np.array([x, y, s, r]).reshape((4, 1))

    @staticmethod
    def convert_x_to_bbox(x, score=None):
        """
        Takes a state/measurement whose first four entries are [x,y,s,r]
        (flat or column vector) and returns the box as a (1, 4) array
        [x1,y1,x2,y2], or as a (1, 5) array with the score appended when
        ``score`` is given.

        A negative s * r yields NaN coordinates (from the square root); callers
        can use that to detect an invalid state.
        """
        # Flatten first so column-vector states and scalar scores can be mixed
        # in one array without producing a ragged shape.
        cx, cy, s, r = np.asarray(x, dtype=float).reshape(-1)[:4]
        w = np.sqrt(s * r)
        h = s / w
        corners = [cx - w/2., cy - h/2., cx + w/2., cy + h/2.]
        if score is None:
            return np.array(corners).reshape((1, 4))
        return np.array(corners + [float(score)]).reshape((1, 5))

def iou_batch(bb_test, bb_gt):
    """
    Computes the pairwise IOU between two sets of boxes.

    Args:
        bb_test: array of shape (N, >=4), rows start with [x1, y1, x2, y2]
        bb_gt:   array of shape (M, >=4), rows start with [x1, y1, x2, y2]

    Returns:
        (N, M) array where entry [i, j] is the IOU of bb_test[i] and bb_gt[j].
        Pairs whose union area is not positive (e.g. two zero-area boxes) get
        an IOU of 0 instead of NaN.
    """
    # Broadcast to (N, 1, k) against (1, M, k) so every pair is compared.
    bb_gt = np.expand_dims(bb_gt, 0)
    bb_test = np.expand_dims(bb_test, 1)

    # Corners of the intersection rectangle.
    xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0])
    yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
    xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
    yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])

    # Clamp at zero: non-overlapping boxes have no intersection.
    inter = np.maximum(0., xx2 - xx1) * np.maximum(0., yy2 - yy1)

    area_test = (bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1])
    area_gt = (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1])
    union = area_test + area_gt - inter

    # Divide only where the union is positive; everything else stays 0.
    iou = np.zeros(union.shape, dtype=np.result_type(inter, union))
    np.divide(inter, union, out=iou, where=union > 0)
    return iou


def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
    """
    Assigns detections to tracked objects (both given as bounding-box rows).

    Args:
        detections: (N, >=4) array, rows start with [x1, y1, x2, y2]
        trackers:   (M, >=4) array, rows start with [x1, y1, x2, y2]
        iou_threshold: assigned pairs with an IOU below this are rejected

    Returns:
        matches:              (K, 2) int array of [detection_idx, tracker_idx]
        unmatched_detections: 1-D int array of detection indices
        unmatched_trackers:   1-D int array of tracker indices

        In both unmatched arrays, indices that were never assigned come first
        (ascending), followed by indices from pairs rejected by the threshold.
    """
    num_dets = len(detections)
    num_trks = len(trackers)

    if num_trks == 0:
        return (np.empty((0, 2), dtype=int),
                np.arange(num_dets, dtype=int),
                np.empty((0,), dtype=int))

    iou_matrix = iou_batch(detections, trackers)
    # A NaN IOU can never beat the threshold, so treat it as "no overlap";
    # this also keeps linear_sum_assignment from rejecting the matrix.
    iou_matrix = np.where(np.isnan(iou_matrix), 0.0, iou_matrix)

    if min(iou_matrix.shape) > 0:
        above = (iou_matrix > iou_threshold).astype(np.int32)
        if above.sum(1).max() == 1 and above.sum(0).max() == 1:
            # Every row/column has at most one candidate: the assignment is
            # already unambiguous, no optimisation needed.
            candidate_pairs = np.stack(np.where(above), axis=1)
        else:
            # Use Hungarian algorithm for optimal assignment (maximise IOU).
            rows, cols = linear_sum_assignment(-iou_matrix)
            candidate_pairs = np.column_stack((rows, cols))
    else:
        candidate_pairs = np.empty((0, 2), dtype=int)

    assigned_dets = set(candidate_pairs[:, 0].tolist())
    assigned_trks = set(candidate_pairs[:, 1].tolist())
    unmatched_detections = [d for d in range(num_dets) if d not in assigned_dets]
    unmatched_trackers = [t for t in range(num_trks) if t not in assigned_trks]

    # Drop assigned pairs whose overlap is too small.
    matches = []
    for det_idx, trk_idx in candidate_pairs:
        det_idx, trk_idx = int(det_idx), int(trk_idx)
        if iou_matrix[det_idx, trk_idx] < iou_threshold:
            unmatched_detections.append(det_idx)
            unmatched_trackers.append(trk_idx)
        else:
            matches.append((det_idx, trk_idx))

    return (np.array(matches, dtype=int).reshape(-1, 2),
            np.array(unmatched_detections, dtype=int),
            np.array(unmatched_trackers, dtype=int))

class ByteTrack:
    """
    ByteTrack: Multi-Object Tracking by Associating Every Detection Box.

    Detections are split by confidence.  High-confidence detections are matched
    against all predicted tracks first; low-confidence detections are then
    matched only against the tracks left over from that first stage.  New
    tracks are started only from unmatched high-confidence detections.
    """
    def __init__(self, max_age=90, min_hits=1, iou_threshold=0.2,
                 high_thresh=0.7, low_thresh=0.05):
        """
        Args:
            max_age: Maximum frames to keep a track alive without a detection
            min_hits: Minimum consecutive hits before a track is reported
            iou_threshold: Minimum IOU for a detection/track match
            high_thresh: Score at or above which a detection takes part in the
                first association and may start a new track
            low_thresh: Score at or above which (and below high_thresh) a
                detection takes part in the second association; anything
                lower is ignored
        """
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.trackers = []
        self.frame_count = 0

        # ByteTrack confidence thresholds
        self.high_thresh = high_thresh
        self.low_thresh = low_thresh

    def update(self, detections):
        """
        Advances every track by one frame and associates the new detections.

        Params:
          detections - a numpy array of detections in the format
                       [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]
                       (may be empty)
        Returns:
          an (K, 6) array with one row [x1, y1, x2, y2, track_id, score] per
          track that was updated in this frame and has reached min_hits
          (or while frame_count <= min_hits).  track_id is the tracker's id + 1.
        """
        self.frame_count += 1

        # ----------------- Predict existing tracks -----------------
        predicted = np.zeros((len(self.trackers), 5))
        invalid = []
        for t, row in enumerate(predicted):
            pos = self.trackers[t].predict()[0]
            row[:] = [pos[0], pos[1], pos[2], pos[3], 0]
            # masked_invalid below drops rows containing NaN *or* inf, so the
            # same test must be used here to keep self.trackers aligned with
            # the rows that survive.
            if not np.all(np.isfinite(pos)):
                invalid.append(t)
        trks = np.ma.compress_rows(np.ma.masked_invalid(predicted))
        for t in reversed(invalid):
            self.trackers.pop(t)

        # ----------------- Split detections by confidence -----------------
        detections = np.asarray(detections)
        if len(detections) > 0:
            scores = detections[:, 4]
            high_detections = detections[scores >= self.high_thresh]
            low_detections = detections[(scores >= self.low_thresh) &
                                        (scores < self.high_thresh)]
        else:
            high_detections = np.empty((0, 5))
            low_detections = np.empty((0, 5))

        # ----------------- First Association (High Confidence Detections) -----------------
        matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(
            high_detections, trks, self.iou_threshold
        )

        for m in matched:
            self.trackers[int(m[1])].update(high_detections[int(m[0]), :])

        # Tracks that were not matched to high-confidence detections
        unmatched_trks_first_stage = unmatched_trks.astype(int)

        # ----------------- Second Association (Low Confidence Detections) -----------------
        if len(low_detections) > 0 and len(unmatched_trks_first_stage) > 0:
            leftover_boxes = trks[unmatched_trks_first_stage]

            matched_low, _, _ = associate_detections_to_trackers(
                low_detections, leftover_boxes, self.iou_threshold
            )

            for m in matched_low:
                # m[1] indexes leftover_boxes; map it back to self.trackers.
                tracker_idx = int(unmatched_trks_first_stage[int(m[1])])
                self.trackers[tracker_idx].update(low_detections[int(m[0]), :])

        # ----------------- Initialise New Tracks -----------------
        # unmatched_dets indexes high_detections, so only high-confidence
        # detections ever start a track.
        for i in unmatched_dets.astype(int):
            self.trackers.append(KalmanBoxTracker(high_detections[i, :]))

        # ----------------- Output and Delete Tracks -----------------
        ret = []
        for trk in reversed(self.trackers):
            if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
                d = trk.get_state()[0]
                # Row layout: [x1, y1, x2, y2, track_id, score]
                ret.append(np.concatenate((d, [trk.id + 1, trk.last_score])).reshape(1, -1))

        # Delete tracks that haven't been updated for more than max_age frames
        self.trackers = [trk for trk in self.trackers
                         if trk.time_since_update <= self.max_age]

        if len(ret) > 0:
            return np.concatenate(ret)
        return np.empty((0, 6))

class RobotDetector:
    """
    YOLO-based detector for FRC robots
    """
    def __init__(self, model_name='best.pt', conf_threshold=0.1, robot_class=0):
        """
        Args:
            model_name: YOLO weights to load (passed straight to ``YOLO``)
            conf_threshold: Minimum confidence for detections.  Defaults to 0.1
                (raised from 0.05 to reduce noisy detections and false tracks).
            robot_class: Class index that is kept by ``detect``; adjust as
                needed for your YOLO model.
        """
        self.model = YOLO(model_name)
        self.conf_threshold = conf_threshold
        self.robot_class = robot_class

    def detect(self, frame):
        """
        Detect robots in frame

        Args:
            frame: numpy array (H, W, 3)

        Returns:
            detections: numpy array of shape (N, 5) where each row is
            [x1, y1, x2, y2, conf]; shape (0, 5) when nothing is detected.
        """
        results = self.model(frame, conf=self.conf_threshold, verbose=False)

        # getattr keeps instances built without __init__ working with class 0.
        wanted_class = getattr(self, 'robot_class', 0)

        kept = []
        for result in results:
            boxes = result.boxes
            if boxes is None:
                continue

            # Convert each tensor once per result instead of once per box.
            xyxy = boxes.xyxy.cpu().numpy().reshape(-1, 4)
            conf = boxes.conf.cpu().numpy().reshape(-1)
            cls = boxes.cls.cpu().numpy().reshape(-1)

            is_robot = cls == wanted_class
            if np.any(is_robot):
                kept.append(np.column_stack((xyxy[is_robot], conf[is_robot])))

        if kept:
            return np.concatenate(kept, axis=0)
        return np.empty((0, 5))

# Fixed palette of 1000 colours, indexed by track id modulo its length.  It is
# drawn from a private RandomState so the global NumPy RNG is left untouched;
# seed 42 yields the same colours as seeding the global generator would.
_TRACK_COLORS = np.random.RandomState(42).randint(0, 255, size=(1000, 3), dtype=np.uint8)


def draw_tracks_on_frame(frame, tracks):
    """
    Draw bounding boxes and ID/score labels on a copy of the frame.

    Args:
        frame: image array; it is copied, the input is not modified
        tracks: iterable of rows [x1, y1, x2, y2, track_id, score]

    Returns:
        The annotated copy of ``frame``.  Rows whose box coordinates are not
        finite are skipped.
    """
    frame_copy = frame.copy()

    for track in tracks:
        x1, y1, x2, y2, track_id, score = track
        # NaN/inf coordinates cannot be converted to pixel positions.
        if not np.all(np.isfinite([x1, y1, x2, y2])):
            continue
        x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
        track_id = int(track_id)

        color = tuple(int(c) for c in _TRACK_COLORS[track_id % len(_TRACK_COLORS)])

        cv2.rectangle(frame_copy, (x1, y1), (x2, y2), color, 2)

        # Filled label background sitting directly above the box, then the text.
        label = f"Robot {track_id} ({score:.2f})"
        label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        cv2.rectangle(frame_copy, (x1, y1 - label_size[1] - 10),
                      (x1 + label_size[0], y1), color, -1)
        cv2.putText(frame_copy, label, (x1, y1 - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

    return frame_copy

def process_video_to_csv(video_path, output_csv_path, model_name='best.pt', display_every=100, save_frames=True, frames_output_folder=None):
    """
    Process a single video with ByteTrack and save results to CSV.

    Args:
        video_path: path of the video to read
        output_csv_path: where the per-frame tracking rows are written
        model_name: YOLO weights passed to RobotDetector
        display_every: print a progress line every this many frames
            (0 or None disables progress output)
        save_frames: if True, write roughly ten annotated sample frames
        frames_output_folder: base folder for sample frames; defaults to
            ``<output_csv_path parent>/sample_frames``.  Frames go into a
            sub-folder named after the video.

    Returns:
        DataFrame with columns video_name, frame, timestamp, track_id,
        x1, y1, x2, y2, score (one row per reported track per frame).  Frame
        numbers start at 1 and timestamp is frame / fps; it is NaN when the
        container reports no usable frame rate.

    Raises:
        IOError: if the video cannot be opened.
    """
    columns = ['video_name', 'frame', 'timestamp', 'track_id',
               'x1', 'y1', 'x2', 'y2', 'score']
    video_name = Path(video_path).name

    # Open the video first so an unreadable file fails before any model loads.
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    video_frames_folder = None
    tracking_data = []
    frames_saved = 0

    try:
        detector = RobotDetector(model_name=model_name)
        tracker = ByteTrack(max_age=90, min_hits=1, iou_threshold=0.2)

        # Reset tracker count for each video so ids restart per file
        KalmanBoxTracker.count = 0

        # Create folder for sample frames if needed
        if save_frames:
            if frames_output_folder is None:
                frames_output_folder = Path(output_csv_path).parent / 'sample_frames'
            frames_output_folder = Path(frames_output_folder)
            frames_output_folder.mkdir(exist_ok=True, parents=True)

            video_frames_folder = frames_output_folder / Path(video_path).stem
            video_frames_folder.mkdir(exist_ok=True, parents=True)

        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        has_fps = bool(fps and fps > 0)

        print(f"\nProcessing: {video_name}")
        print(f"Resolution: {width}x{height} @ {fps:.2f} FPS")
        print(f"Total frames: {total_frames}")

        frame_count = 0
        save_interval = max(total_frames // 10, 30)  # Save ~10 frames per video

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            # Time in seconds; unknown when the frame rate is missing.
            timestamp = frame_count / fps if has_fps else float('nan')

            detections = detector.detect(frame)
            tracks = tracker.update(detections)

            # Save sample frames with detections
            is_sample = frame_count % save_interval == 0 or frame_count == 1
            if save_frames and is_sample and len(tracks) > 0:
                frame_with_tracks = draw_tracks_on_frame(frame, tracks)
                frame_filename = video_frames_folder / f"frame_{frame_count:05d}.jpg"
                # imwrite reports failure through its return value.
                if cv2.imwrite(str(frame_filename), frame_with_tracks):
                    frames_saved += 1

            # Store tracking data
            for track in tracks:
                x1, y1, x2, y2, track_id, score = track
                tracking_data.append({
                    'video_name': video_name,
                    'frame': frame_count,
                    'timestamp': timestamp,
                    'track_id': int(track_id),
                    'x1': x1,
                    'y1': y1,
                    'x2': x2,
                    'y2': y2,
                    'score': score
                })

            # Display progress
            if display_every and frame_count % display_every == 0:
                if total_frames > 0:
                    print(f"Progress: {frame_count}/{total_frames} ({100*frame_count/total_frames:.1f}%) - Current tracks: {len(tracks)}")
                else:
                    print(f"Progress: {frame_count} frames - Current tracks: {len(tracks)}")
    finally:
        # Always free the capture, even if detection or tracking fails.
        cap.release()

    # Explicit columns keep the CSV header present even when nothing was tracked.
    df = pd.DataFrame(tracking_data, columns=columns)
    df.to_csv(output_csv_path, index=False)

    print(f" Saved tracking data to: {output_csv_path}")
    print(f"  Total detections: {len(df)}")
    if len(df) > 0:
        print(f"  Unique track IDs: {df['track_id'].nunique()}")
        print(f"  Max track ID seen: {df['track_id'].max()}")
    if save_frames:
        print(f"  Sample frames saved: {frames_saved} in {video_frames_folder}")

    return df

def process_folder_to_csv(data_folder, output_folder, model_name='best.pt', save_sample_frames=True):
    """
    Process all MP4 videos in a folder and save tracking results to CSVs.

    Every file directly inside ``data_folder`` whose extension is ``.mp4`` in
    any letter case is processed once, in sorted order.  Each video gets
    ``<stem>_tracking.csv`` in ``output_folder``; the rows of all successfully
    processed videos are also written to ``all_videos_combined.csv``.

    Args:
        data_folder: folder containing the videos
        output_folder: folder for the CSVs and sample frames (created if needed)
        model_name: YOLO weights passed on to process_video_to_csv
        save_sample_frames: passed on as ``save_frames``

    Returns:
        dict mapping video file name -> tracking DataFrame.  Videos that raised
        an exception are reported on stdout and left out; an empty dict is
        returned when no MP4 file is found.
    """
    import traceback

    data_folder = Path(data_folder)
    output_folder = Path(output_folder)
    output_folder.mkdir(exist_ok=True, parents=True)

    # Match on the lower-cased suffix: two separate '*.mp4' / '*.MP4' globs
    # return every file twice on case-insensitive filesystems and miss
    # mixed-case extensions.
    if data_folder.is_dir():
        video_files = sorted(
            entry for entry in data_folder.iterdir()
            if entry.is_file() and entry.suffix.lower() == '.mp4'
        )
    else:
        video_files = []

    print(f"Found {len(video_files)} MP4 videos in {data_folder}")

    if len(video_files) == 0:
        print(" No MP4 files found. Please check the folder path.")
        return {}

    all_results = {}
    all_dataframes = []

    for i, video_path in enumerate(video_files):
        print(f"\n{'='*60}")
        print(f"Video {i+1}/{len(video_files)}")
        print(f"{'='*60}")

        # Set output CSV path
        output_csv = output_folder / f"{video_path.stem}_tracking.csv"

        # One failing video must not abort the batch: report it and move on.
        try:
            df = process_video_to_csv(
                video_path,
                output_csv,
                model_name=model_name,
                save_frames=save_sample_frames,
                frames_output_folder=output_folder / 'sample_frames'
            )
        except Exception as e:
            print(f" Error processing {video_path.name}: {e}")
            traceback.print_exc()
            continue

        all_results[video_path.name] = df
        all_dataframes.append(df)

    # Create combined CSV with all videos
    if all_dataframes:
        combined_df = pd.concat(all_dataframes, ignore_index=True)
        combined_csv_path = output_folder / "all_videos_combined.csv"
        combined_df.to_csv(combined_csv_path, index=False)
        print(f"\n{'='*60}")
        print(f" Saved combined tracking data to: {combined_csv_path}")
        print(f"  Total rows: {len(combined_df)}")
        print(f"  Videos processed: {len(all_dataframes)}")

    # Summary
    print("\n" + "="*60)
    print("BATCH PROCESSING SUMMARY - OPTIMIZED ID PERSISTENCE")
    print("="*60)
    for video_name, df in all_results.items():
        # Videos without any tracked rows have nothing to summarise.
        if len(df) > 0:
            print(f"\n{video_name}:")
            print(f"  - Total detections: {len(df)}")
            print(f"  - Unique track IDs: {df['track_id'].nunique()}")
            print(f"  - Max track ID: {df['track_id'].max()}")
            print(f"  - Duration: {df['timestamp'].max():.2f}s")
            print(f"  - Avg detections/frame: {len(df)/df['frame'].max():.2f}")

    return all_results

# ============================================================================
# MAIN EXECUTION
# ============================================================================

# Input videos, output location and YOLO weights (all on the mounted Drive)
DATA_FOLDER = '/content/drive/MyDrive/Colab Notebooks/Test'
OUTPUT_FOLDER = '/content/drive/MyDrive/Colab Notebooks/ByteTrack_Testing'
MODEL_PATH = '/content/drive/MyDrive/Colab Notebooks/best.pt'

# Settings used by this run (they are set inside the classes/functions above):
#   ByteTrack     : max_age=90 (keeps tracks alive longer during occlusion),
#                   min_hits=1 (confirms tracks immediately),
#                   iou_threshold=0.2 (lenient matching)
#   Kalman filter : Q[4:,4:] multiplier 0.5, R[2:,2:] multiplier 5.0
#   Thresholds    : ByteTrack high_thresh 0.7, YOLO conf 0.1


# Run the tracker over every video in the data folder
results = process_folder_to_csv(
    DATA_FOLDER,
    OUTPUT_FOLDER,
    model_name=MODEL_PATH,
    save_sample_frames=True,
)

# Show the first rows of the first processed video, if there is one
if results:
    first_video, first_df = next(iter(results.items()))
    print(f"\n{'='*60}")
    print(f"Sample data from {first_video}:")
    print("="*60)
    print(first_df.head(10))

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Found 1 MP4 videos in /content/drive/MyDrive/Colab Notebooks/Test

Video 1/1

Processing: test.mp4
Resolution: 1920x1080 @ 29.99 FPS
Total frames: 4766
Progress: 100/4766 (2.1%) - Current tracks: 3
Progress: 200/4766 (4.2%) - Current tracks: 4
Progress: 300/4766 (6.3%) - Current tracks: 6
Progress: 400/4766 (8.4%) - Current tracks: 5
Progress: 500/4766 (10.5%) - Current tracks: 5
Progress: 600/4766 (12.6%) - Current tracks: 5
Progress: 700/4766 (14.7%) - Current tracks: 4
Progress: 800/4766 (16.8%) - Current tracks: 4
Progress: 900/4766 (18.9%) - Current tracks: 6
Progress: 1000/4766 (21.0%) - Current tracks: 5
Progress: 1100/4766 (23.1%) - Current tracks: 3
Progress: 1200/4766 (25.2%) - Current tracks: 4
Progress: 1300/4766 (27.3%) - Current tracks: 4
Progress: 1400/4766 (29.4%) - Current tracks: 4
Progress: 1500/4766 (31.5%) - Current tracks: 2
Progress: 16