<a href="https://colab.research.google.com/github/AidaAriafar/YOLOv5-Hybrid-Object-Tracking/blob/main/SignalsSystemsProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

ADDING **DIRECTORIES**

In [None]:
# Recommended: switch the Colab runtime type to GPU before running the notebook.

!mkdir -p data # input
!mkdir -p results/fusion # output

print(" Project directories created.")

✅ Project directories created.


**the main part**

In [None]:
%%writefile main.py
import argparse
import cv2
import imutils
import numpy as np
import time
from ultralytics import YOLO
from scipy.spatial import distance as dist
from utils import initialize_tracker, compute_iou, MyCustomTracker # Import MyCustomTracker and compute_iou

# Default directory for output videos. It is used to build the output path when
# --output is not given; the trailing slash matters because the file name is
# appended by plain string formatting.
fusion_output_base = "results/fusion/"

# Box colours in OpenCV's BGR channel order.
_GREEN = (0, 255, 0)
_RED = (0, 0, 255)


def _detect_boxes_xywh(yolo_model, frame):
    """Run the detector on one frame and return its boxes as (x, y, w, h) tuples."""
    results = yolo_model(frame, verbose=False, conf=0.5)[0]
    # Each row of boxes.data starts with x1, y1, x2, y2; the remaining columns are not needed here.
    return [(x1, y1, x2 - x1, y2 - y1) for x1, y1, x2, y2, *_ in results.boxes.data.tolist()]


def _draw_box(frame, bbox_xywh, color):
    """Draw an (x, y, w, h) box on the frame in place and return the box as an int tuple."""
    x, y, w, h = [int(v) for v in bbox_xywh]
    cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
    return (x, y, w, h)


def tracker_with_detector(video_path, yolo_model, tracker_type, output_path, detection_interval=5):
    """Track a single object through a video and write an annotated copy to output_path.

    The first detector box starts the tracker. With 'my_custom_tracker' the
    Kalman prediction is corrected by the best-overlapping detection every
    `detection_interval` frames. With the OpenCV trackers the detector is used
    to restart a lost tracker and, every 30 frames, to re-initialise a tracker
    whose box has drifted away from the detector's first box (IoU < 0.2).

    Args:
        video_path: Path handed to cv2.VideoCapture.
        yolo_model: Callable detector, invoked as
            yolo_model(frame, verbose=False, conf=0.5)[0]; the result must
            expose boxes.data.
        tracker_type: Tracker name understood by initialize_tracker.
        output_path: Path of the annotated video to write (mp4v codec).
        detection_interval: Number of frames between scheduled detections for
            'my_custom_tracker'.

    Returns:
        None. Progress and timing statistics are printed.
    """
    vs = cv2.VideoCapture(video_path)
    time.sleep(1.0) # Give some time for video capture to initialize
    initBB = None # Current box (x, y, w, h); None while no object is being tracked
    frame_count = 0

    # Write the output at the source frame rate so playback speed is preserved;
    # fall back to 30 when the container does not report a usable value.
    source_fps = vs.get(cv2.CAP_PROP_FPS)
    video_fps = source_fps if source_fps and source_fps > 0 else 30

    # Dynamically get video dimensions for output writer
    frame_width = int(vs.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(vs.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for MP4 video
    out = cv2.VideoWriter(output_path, fourcc, video_fps, (frame_width, frame_height))

    tracker = initialize_tracker(tracker_type)
    # The tracker is only ever re-created with the same tracker_type, so this never changes.
    is_custom = isinstance(tracker, MyCustomTracker)

    start_time = time.perf_counter() # Start time for FPS calculation
    try:
        while True:
            ret, frame = vs.read()
            if not ret or frame is None:
                break

            scheduled = frame_count == 0 or frame_count % detection_interval == 0
            # Periodic drift check for OpenCV trackers only.
            drift_check_due = (not is_custom) and initBB is not None and frame_count % 30 == 0

            # Run the detector only when its result will be used, and always
            # BEFORE anything is drawn so it sees the unannotated frame.
            # None means "detector has not run on this frame".
            detections = None
            if (is_custom and scheduled) or initBB is None or drift_check_due:
                detections = _detect_boxes_xywh(yolo_model, frame)

            if initBB is not None: # Tracker is already initialized
                # For MyCustomTracker this is the Kalman prediction;
                # for OpenCV trackers it is their direct update.
                success, box = tracker.update(frame)

                if is_custom:
                    if not success or box is None:
                        print(f"[WARNING] MyCustomTracker {tracker.obj_id} prediction failed (invalid bbox) at frame {frame_count}. Setting initBB to None.")
                        initBB = None # Tracker is lost; the next frame re-initialises from the detector
                        frame_count += 1
                        out.write(frame)
                        continue

                    # Simple association: pick the detection with the highest IoU
                    # against the predicted box (no detections -> max_iou stays 0).
                    max_iou = 0
                    best_det = None
                    for det in detections or []:
                        current_iou = compute_iou(box, det)
                        if current_iou > max_iou:
                            max_iou, best_det = current_iou, det

                    if max_iou > 0.1: # Good match: correct the filter with the detection
                        tracker.correct_with_measurement(best_det)
                        initBB = _draw_box(frame, best_det, _GREEN)
                    else: # No usable detection: rely on the Kalman prediction alone
                        initBB = _draw_box(frame, box, _RED)

                # OpenCV trackers (CSRT, KCF, MOSSE)
                elif success:
                    initBB = _draw_box(frame, box, _GREEN)
                else:
                    print(f"[INFO] Tracker lost object at frame {frame_count}. Attempting re-detection...")
                    if detections is None:
                        # Nothing has been drawn yet on this frame, so detection is still clean.
                        detections = _detect_boxes_xywh(yolo_model, frame)

                    if detections:
                        initBB = tuple(int(v) for v in detections[0]) # First detected box
                        tracker = initialize_tracker(tracker_type) # Re-initialize the OpenCV tracker
                        tracker.init(frame, initBB)
                        print("[INFO] Tracker was restarted by Detector.")
                        _draw_box(frame, initBB, _GREEN)
                    else:
                        print(f"[WARNING] No object detected at frame {frame_count}. Tracker remains lost.")
                        _draw_box(frame, initBB, _RED) # Last known box, drawn in red
                        initBB = None # Indicate tracker is truly lost

                # Periodic re-detection for OpenCV trackers to correct drift
                if drift_check_due and initBB is not None and detections:
                    iou = compute_iou(initBB, detections[0])
                    if iou < 0.2: # Low overlap with the detector means significant drift
                        print(f"[INFO] Tracker re-initialized by Detector at frame {frame_count} due to low IoU ({iou:.2f}).")
                        initBB = tuple(int(v) for v in detections[0])
                        tracker = initialize_tracker(tracker_type)
                        tracker.init(frame, initBB)
                        _draw_box(frame, initBB, _GREEN)

            else:
                # Reached on the first frame and again whenever the tracker was marked lost.
                print(f"[INFO] Initializing tracker at frame {frame_count} using detector...")
                if detections:
                    initBB = tuple(int(v) for v in detections[0]) # First detected box
                    if frame_count > 0:
                        tracker = initialize_tracker(tracker_type) # Start clean after a loss
                    tracker.init(frame, initBB)
                    print("[INFO] Tracker was started by Detector.")
                    _draw_box(frame, initBB, _GREEN)
                elif frame_count == 0:
                    print("[WARNING] No object detected in the first frame. Cannot initialize tracker. Exiting.")
                    break
                else:
                    # Mid-video loss: keep writing frames and retry on the next
                    # one instead of truncating the output video.
                    print(f"[WARNING] No object detected at frame {frame_count}. Tracker remains lost.")

            frame_count += 1
            out.write(frame)
    finally:
        total_time = time.perf_counter() - start_time
        # Always release the capture and finalise the output file, even on error.
        vs.release()
        out.release()

    print("[INFO] Single object tracking finished.")

    if frame_count > 0:
        actual_fps = frame_count / total_time
        print(f"[INFO] For {tracker_type} tracker:")
        print(f"[INFO] Total frames processed: {frame_count}")
        print(f"[INFO] Total processing time: {total_time:.2f} seconds")
        print(f"[INFO] Achieved FPS: {actual_fps:.2f}")
    else:
        print(f"[INFO] No frames processed for {tracker_type} tracker.")


def multi_object_tracker(video_path, yolo_model, tracker_type, output_path, max_skipped_frames=30, detection_interval=5):
    """Track several objects through a video and write an annotated copy to output_path.

    Every `detection_interval` frames the detector runs and its boxes are
    greedily matched (highest IoU first, threshold 0.3) to the boxes predicted
    by the active trackers. Matched trackers are corrected with the detection,
    unmatched detections start new tracks, and a track is dropped once its
    `skipped_frames` counter exceeds `max_skipped_frames`.

    The trackers must provide `update`, `correct_with_measurement`,
    `skipped_frames` and `current_bbox`, which is the interface
    MyCustomTracker defines.

    Args:
        video_path: Path handed to cv2.VideoCapture.
        yolo_model: Callable detector, invoked as
            yolo_model(frame, verbose=False, conf=0.5)[0]; the result must
            expose boxes.data.
        tracker_type: Tracker name understood by initialize_tracker.
        output_path: Path of the annotated video to write (mp4v codec).
        max_skipped_frames: Frames a track may go without a matched detection
            before it is removed.
        detection_interval: Number of frames between detector runs.

    Returns:
        None. Progress and timing statistics are printed.
    """
    vs = cv2.VideoCapture(video_path)
    time.sleep(1.0)

    frame_width = int(vs.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(vs.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Write the output at the source frame rate so playback speed is preserved;
    # fall back to 30 when the container does not report a usable value.
    source_fps = vs.get(cv2.CAP_PROP_FPS)
    video_fps = source_fps if source_fps and source_fps > 0 else 30
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, video_fps, (frame_width, frame_height))

    active_trackers = {} # Stores {track_id: tracker instance}
    next_object_id = 0
    frame_count = 0
    start_time = time.perf_counter()

    try:
        while True:
            ret, frame = vs.read()
            if not ret or frame is None:
                break

            current_detections = [] # (x, y, w, h) boxes detected on this frame

            # Only run YOLO detection on certain frames or the first frame
            if frame_count == 0 or frame_count % detection_interval == 0:
                results = yolo_model(frame, verbose=False, conf=0.5)[0] # Confidence threshold
                for *xyxy, conf, cls in results.boxes.data.tolist():
                    x1, y1, x2, y2 = xyxy
                    current_detections.append((x1, y1, x2 - x1, y2 - y1))

            # Predict locations for existing trackers: {track_id: predicted_bbox_xywh}
            predicted_bboxes = {}
            for track_id, tracker in active_trackers.items():
                success, predicted_bbox_xywh = tracker.update(frame)
                if success:
                    predicted_bboxes[track_id] = predicted_bbox_xywh
                else:
                    # The track is not dropped here; removal is decided below
                    # from its skipped_frames counter.
                    print(f"[INFO] Track {track_id} prediction failed at frame {frame_count}. Marking for removal if max_skipped_frames exceeded.")

            # Data association (simple IoU-based greedy matching)
            matched_detection_indices = set()
            if current_detections and predicted_bboxes:
                pred_track_ids = list(predicted_bboxes) # Row index -> track id
                iou_matrix = np.zeros((len(pred_track_ids), len(current_detections)))
                for i, track_id in enumerate(pred_track_ids):
                    for j, d_bbox in enumerate(current_detections):
                        iou_matrix[i, j] = compute_iou(predicted_bboxes[track_id], d_bbox)

                matched_pred_indices = set()
                # Visit (prediction, detection) pairs from highest to lowest IoU.
                for flat_idx in np.argsort(iou_matrix.flatten())[::-1]:
                    pred_idx, det_idx = divmod(int(flat_idx), len(current_detections))

                    if pred_idx in matched_pred_indices or det_idx in matched_detection_indices:
                        continue # Already matched

                    if iou_matrix[pred_idx, det_idx] > 0.3: # IoU threshold for a match
                        track_id = pred_track_ids[pred_idx]
                        active_trackers[track_id].correct_with_measurement(current_detections[det_idx])
                        matched_pred_indices.add(pred_idx)
                        matched_detection_indices.add(det_idx)

            # Unmatched detections are new objects: start a tracker for each.
            for i, det_bbox_xywh in enumerate(current_detections):
                if i not in matched_detection_indices:
                    new_tracker = initialize_tracker(tracker_type, next_object_id)
                    new_tracker.init(frame, det_bbox_xywh)
                    active_trackers[next_object_id] = new_tracker
                    print(f"[INFO] New object {next_object_id} detected and initialized at frame {frame_count}.")
                    next_object_id += 1

            # Remove trackers that have gone too long without a matched detection.
            # skipped_frames is maintained by the tracker itself (MyCustomTracker
            # increments it in update() and resets it in correct_with_measurement()),
            # so unmatched tracks need no extra bookkeeping here.
            for track_id in list(active_trackers):
                if active_trackers[track_id].skipped_frames > max_skipped_frames:
                    print(f"[INFO] Track {track_id} lost (skipped too many frames) at frame {frame_count}.")
                    del active_trackers[track_id]

            # Draw bounding boxes and IDs for all active trackers
            for track_id, tracker in active_trackers.items():
                x, y, w, h = [int(v) for v in tracker.current_bbox]
                color = (0, 255, 0) # Green (BGR): corrected by a detection on this frame
                if tracker.skipped_frames > 0:
                    color = (0, 165, 255) # Orange (BGR): prediction only, no detection match this frame
                cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
                cv2.putText(frame, f"ID: {track_id}", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            frame_count += 1
            out.write(frame)
    finally:
        total_time = time.perf_counter() - start_time
        # Always release the capture and finalise the output file, even on error.
        vs.release()
        out.release()

    print("[INFO] Multi-object tracking finished.")

    if frame_count > 0:
        actual_fps = frame_count / total_time
        print(f"[INFO] For Multi-Object Tracking with {tracker_type}:")
        print(f"[INFO] Total frames processed: {frame_count}")
        print(f"[INFO] Total processing time: {total_time:.2f} seconds")
        print(f"[INFO] Achieved FPS: {actual_fps:.2f}")
    else:
        print(f"[INFO] No frames processed for Multi-Object Tracking.")


# Script entry point: parse the command line, load the detector and run the
# requested tracking mode.
if __name__ == "__main__":
    import os # Local import: only the script entry point needs it

    ap = argparse.ArgumentParser()
    ap.add_argument("-v", "--video", type=str, required=True, help="path to input video file")
    ap.add_argument("-t", "--tracker", type=str, default="my_custom_tracker", help="OpenCV object tracker type (e.g., csrt, kcf, mosse) or 'my_custom_tracker'")
    ap.add_argument("-m", "--mode", type=str, default="single", choices=["single", "multi"], help="Tracking mode: 'single' or 'multi' object tracking.")
    ap.add_argument("-o", "--output", type=str, default=None, help="path to output video file")
    ap.add_argument("--detection_interval", type=int, default=5, help="Number of frames between YOLO detections (for MyCustomTracker).")
    args = vars(ap.parse_args())

    # Derive a default output name from the mode and tracker when none is given.
    if args["output"] is None:
        if args["mode"] == "single":
            args["output"] = f"{fusion_output_base}{args['tracker']}_tracked_video.mp4"
        else: # multi-object tracking
            args["output"] = f"{fusion_output_base}multi_{args['tracker']}_tracked_video.mp4"

    # Make sure the destination directory exists before a writer is opened;
    # otherwise a run outside the notebook (where the directories were created
    # by hand) could finish without producing a video.
    output_dir = os.path.dirname(args["output"])
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    print("Loading YOLOv5s model...")
    yolo_model = YOLO("yolov5s.pt")
    print("YOLOv5s model loaded.")

    if args["mode"] == "single":
        tracker_with_detector(args["video"], yolo_model, args["tracker"], args["output"], detection_interval=args["detection_interval"])
    elif args["mode"] == "multi":
        if args["tracker"] != "my_custom_tracker":
            print("[WARNING] Multi-object tracking is currently optimized for 'my_custom_tracker'. Other OpenCV trackers might not perform optimally in multi-object mode due to lack of explicit measurement correction.")
        multi_object_tracker(args["video"], yolo_model, args["tracker"], args["output"], detection_interval=args["detection_interval"])

    print(f'\nProject finished processing for {args["mode"]} object tracking with {args["tracker"]} tracker.')
    print(f'Output video saved to: {args["output"]}')

Overwriting main.py


**utils: contains all the classes and functions used by the main script**

In [None]:
%%writefile utils.py
import numpy as np
import cv2

# --- Kalman Filter Implementation  ---
class KalmanFilter:
    """Linear Kalman filter with a constant-velocity model for a bounding box.

    With the default dimensions the state is [x, y, w, h, vx, vy, vw, vh],
    where (x, y) is the box centre, (w, h) its size and the v* entries are the
    per-step rates of change. The first four state entries are measured
    directly.
    """

    def __init__(self, dt=1, state_dim=8, meas_dim=4):
        """Build the model matrices.

        Args:
            dt: Time step used to couple each position entry to its velocity.
            state_dim: Length of the state vector.
            meas_dim: Length of the measurement vector.
        """
        self.state = np.zeros((state_dim, 1)) # Initial state

        # State transition matrix (A): x_k = A * x_{k-1}
        # Constant velocity: entry i advances by dt times entry i + state_dim // 2.
        half = state_dim // 2
        self.A = np.eye(state_dim)
        for i in range(half):
            self.A[i, i + half] = dt

        # Measurement matrix (H): z_k = H * x_k
        # [x, y, w, h] are observed directly.
        self.H = np.zeros((meas_dim, state_dim))
        for i in range(4):
            self.H[i, i] = 1

        # Covariance matrices
        self.P = np.eye(state_dim) * 100 # State covariance (large initial uncertainty)
        self.Q = np.eye(state_dim) * 0.1 # Process noise covariance
        self.R = np.eye(meas_dim) * 1 # Measurement noise covariance

    def predict(self):
        """Advance the state and covariance by one step and return the new state."""
        self.state = self.A @ self.state
        self.P = self.A @ self.P @ self.A.T + self.Q
        return self.state

    def update(self, measurement):
        """Correct the state with a measurement and return the new state.

        Args:
            measurement: Array-like holding [x, y, w, h] (any shape that
                flattens to meas_dim values).
        """
        z = np.asarray(measurement, dtype=float).reshape(-1, 1)
        innovation = z - self.H @ self.state
        S = self.H @ self.P @ self.H.T + self.R # Innovation covariance
        K = self.P @ self.H.T @ np.linalg.inv(S) # Kalman gain

        self.state = self.state + K @ innovation
        self.P = (np.eye(self.state.shape[0]) - K @ self.H) @ self.P
        return self.state

    def set_state(self, bbox):
        """Reset the state from a corner-format box [x1, y1, x2, y2].

        The centre and size fill the first four entries; every other entry
        (the velocities) starts at zero. The state keeps the dimension the
        filter was built with, so predict() stays valid for any state_dim.
        """
        x1, y1, x2, y2 = bbox[0], bbox[1], bbox[2], bbox[3]
        state = np.zeros((self.A.shape[0], 1))
        state[:4, 0] = [(x1 + x2) / 2, (y1 + y2) / 2, x2 - x1, y2 - y1]
        self.state = state


# --- MyCustomTracker Class ---
class MyCustomTracker:
    """Kalman-filter tracker for one object, using (x, y, w, h) boxes.

    update() only predicts; the caller is expected to feed associated
    detections back through correct_with_measurement().
    """

    def __init__(self, obj_id):
        """Create an uninitialised tracker labelled with obj_id."""
        self.obj_id = obj_id # Unique ID for this tracker
        self.kf = None # KalmanFilter, created by init()
        self.current_bbox = None # Latest box as (x, y, w, h)
        self.last_frame = None # Copy of the frame passed to init()
        self.initialized = False
        self.frame_count = 0 # Number of predictions made since init()
        self.skipped_frames = 0 # Predictions since the last matched detection

    def init(self, frame, bbox):
        """Start tracking from bbox (x, y, w, h) on the given frame; returns True."""
        self.kf = KalmanFilter()
        left, top, box_w, box_h = bbox
        # The filter is seeded with a corner-format box.
        self.kf.set_state([left, top, left + box_w, top + box_h])
        self.current_bbox = bbox
        self.last_frame = frame.copy()
        self.initialized = True
        self.frame_count = 0
        self.skipped_frames = 0
        return True

    def update(self, frame):
        """Predict the next box.

        Returns:
            (True, (x, y, w, h)) with integer coordinates, or (False, None)
            when the tracker is uninitialised or the predicted size is not
            positive.
        """
        if not self.initialized:
            return False, None

        self.frame_count += 1
        # Counted as skipped until correct_with_measurement() resets it.
        self.skipped_frames += 1

        center_x, center_y, box_w, box_h = self.kf.predict()[0:4].flatten()

        # Centre/size -> top-left/size, truncated to integers.
        candidate = (
            int(center_x - box_w / 2),
            int(center_y - box_h / 2),
            int(box_w),
            int(box_h),
        )

        # A box without positive width and height is treated as a lost track.
        if min(candidate[2], candidate[3]) <= 0:
            return False, None

        self.current_bbox = candidate
        return True, candidate

    def correct_with_measurement(self, detection_bbox_xywh):
        """Feed an associated detection (x, y, w, h) into the Kalman filter."""
        det_x, det_y, det_w, det_h = (detection_bbox_xywh[i] for i in range(4))
        # The filter measures the box centre and size.
        measurement = np.array([
            [det_x + det_w / 2],
            [det_y + det_h / 2],
            [det_w],
            [det_h],
        ])
        self.kf.update(measurement)
        self.current_bbox = detection_bbox_xywh # The detection replaces the predicted box
        self.skipped_frames = 0


# ---  Utility Functions  ---
def initialize_tracker(tracker_type, obj_id=None):
    """Create a tracker for the given type name.

    Args:
        tracker_type: One of 'kcf', 'mosse', 'csrt' or 'my_custom_tracker'.
        obj_id: Identifier handed to MyCustomTracker; not used for the OpenCV
            trackers.

    Raises:
        ValueError: If tracker_type is not one of the supported names.
    """
    # Factories are wrapped in lambdas so nothing is constructed (or looked up
    # on cv2) until the matching name is found.
    factories = (
        ('kcf', lambda: cv2.legacy.TrackerKCF_create()),
        ('mosse', lambda: cv2.legacy.TrackerMOSSE_create()),
        ("csrt", lambda: cv2.legacy.TrackerCSRT_create()),
        ("my_custom_tracker", lambda: MyCustomTracker(obj_id)),
    )
    for name, build in factories:
        if tracker_type == name:
            return build()
    raise ValueError(f"Unsupported tracker type: {tracker_type}")

def compute_iou(boxA, boxB):
    """Return the intersection-over-union of two (x, y, w, h) boxes.

    Returns 0.0 when the union has no area (e.g. both boxes are degenerate)
    instead of dividing by zero.
    """
    # Convert to corner coordinates (x1, y1, x2, y2).
    ax1, ay1, ax2, ay2 = boxA[0], boxA[1], boxA[0] + boxA[2], boxA[1] + boxA[3]
    bx1, by1, bx2, by2 = boxB[0], boxB[1], boxB[0] + boxB[2], boxB[1] + boxB[3]

    # Overlap rectangle; a negative extent means the boxes do not intersect.
    inter_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0, min(ay2, by2) - max(ay1, by1))
    interArea = inter_w * inter_h

    unionArea = float(boxA[2] * boxA[3] + boxB[2] * boxB[3] - interArea)
    if unionArea <= 0:
        return 0.0
    return interArea / unionArea

def compute_center_location_error(gt_bbox, pred_bbox):
    """Return the Euclidean distance between the centres of two boxes.

    Each centre is taken as the midpoint of entries 0/2 (x) and 1/3 (y), i.e.
    the boxes are read as corner-format (x1, y1, x2, y2).
    """
    def _center(box):
        return np.array([(box[0] + box[2]) / 2, (box[1] + box[3]) / 2])

    offset = _center(gt_bbox) - _center(pred_bbox)
    return np.linalg.norm(offset)

def load_annotations(filepath):
    """Load ground-truth boxes from a comma-separated annotation file.

    Each non-blank line is read as `frame_idx,x,y,w,h`. Boxes are returned in
    file order as corner-format lists [x1, y1, x2, y2]; the frame index is
    validated but not stored. Blank lines (such as a trailing newline) are
    skipped.

    Raises:
        ValueError: If a field on a non-blank line is not numeric.
        IndexError: If a non-blank line has fewer than four box values.
    """
    annotations = []
    with open(filepath, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            parts = line.split(',')
            int(parts[0]) # Frame index: checked to be an integer, otherwise unused
            x, y, w, h = [float(p) for p in parts[1:]][:4]
            annotations.append([x, y, x + w, y + h])
    return annotations

def compute_visual_similarity(feature1, feature2):
    """Return the Euclidean distance between two feature vectors.

    Despite the name, this is a distance: smaller values mean more similar
    features. Returns infinity when either feature is None.
    """
    if feature1 is None or feature2 is None:
        return float('inf')
    # Subtract in floating point: unsigned-integer features (such as the uint8
    # pixels of a grayscale patch) would otherwise wrap around on negative
    # differences and inflate the distance.
    diff = np.asarray(feature1, dtype=np.float64) - np.asarray(feature2, dtype=np.float64)
    return np.linalg.norm(diff)

def get_visual_features(frame, bbox_xywh):
    """Return a flattened 32x32 grayscale patch of the box region, or None.

    The (x, y, w, h) box is clipped to the frame; None is returned when
    nothing of the box lies inside the frame.
    """
    left, top, box_w, box_h = (int(v) for v in bbox_xywh)

    # Clip the box corners to the frame bounds.
    x1 = max(0, left)
    y1 = max(0, top)
    x2 = min(frame.shape[1], left + box_w)
    y2 = min(frame.shape[0], top + box_h)
    if x2 <= x1 or y2 <= y1:
        return None

    patch = frame[y1:y2, x1:x2]
    if patch.size == 0:
        return None

    # Fixed-size grayscale patch so feature vectors are directly comparable.
    gray_patch = cv2.cvtColor(cv2.resize(patch, (32, 32)), cv2.COLOR_BGR2GRAY)
    return gray_patch.flatten()

Overwriting utils.py


**required packages with pinned versions**

In [None]:
%%writefile requirements.txt
absl-py==2.1.0
addict==2.4.0
aliyun-python-sdk-core==2.15.1
aliyun-python-sdk-kms==2.16.3
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
colorama==0.4.6
contourpy==1.1.1
crcmod==1.7
cryptography==42.0.7
cycler==0.12.1
Cython==3.0.10
einops==0.8.0
filelock==3.13.1
fonttools==4.51.0
fsspec==2024.2.0
google-auth==2.29.0
google-auth-oauthlib==1.0.0
grpcio==1.64.0
idna==3.7
importlib_metadata==7.1.0
importlib_resources==6.4.0
Jinja2==3.1.3
jmespath==0.10.0
joblib==1.4.2
kiwisolver==1.4.5
Markdown==3.6
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mat4py==0.6.0
matplotlib==3.7.5
mdurl==0.1.2
motmetrics==1.4.0
mpmath==1.3.0
networkx==3.0
numpy==1.23.5
oauthlib==3.2.2
opencv-python==4.9.0.80
opencv-contrib-python==4.9.0.80

Overwriting requirements.txt


installing them...

In [None]:
# Install ultralytics
!pip install -q ultralytics
from IPython.display import clear_output
# NOTE(review): clear_output() wipes pip's output, and the message below is
# printed unconditionally, so a failed install is not visible here.
clear_output()
print("Ultralytics installed.")
# Install dependencies from the cleaned requirements.txt
# NOTE(review): this runs after ultralytics is installed, so the pins in
# requirements.txt (e.g. numpy) may replace versions pip picked for
# ultralytics - confirm the combination is compatible.
print("Installing other project dependencies...")
!pip install -r requirements.txt
clear_output()
print(" All project dependencies installed.")

✅ All project dependencies installed.


**uploading part**

In [None]:
from google.colab import files

# Ask the user for a video; the result is keyed by uploaded file name.
print("Please upload your video file (e.g., my_video.mp4):")
uploaded_video = files.upload()

# Get the filename of the uploaded video (only the first uploaded file is used)
video_filename = list(uploaded_video.keys())[0]
print(f" Uploaded video: {video_filename}")

# Move the uploaded video to the 'data' directory
!mv "{video_filename}" "data/"

# Store the full path for later use in running the script;
# the tracking cells below pass it to main.py via --video.
video_path_in_colab = f"data/{video_filename}"

print(f"\nVideo will be processed from: {video_path_in_colab}")

Please upload your video file (e.g., my_video.mp4):


Saving person1.mp4 to person1.mp4
📁 Uploaded video: person1.mp4

Video will be processed from: data/person1.mp4


In [None]:
# Ensure ultralytics is installed (repeats the install from the dependency
# cell, so this cell can be run on its own).
print("Ensuring ultralytics is installed...")
!pip install -q ultralytics
from IPython.display import clear_output
# NOTE(review): clear_output() hides pip's output, including any errors.
clear_output()
print(" Ultralytics verified/installed.")

✅ Ultralytics verified/installed.


**single custom tracker (for videos containing 1 object)**

In [None]:
# IMPORTANT: Ensure your video file has been successfully uploaded in the upload cell above.
# The `video_path_in_colab` variable is defined by that upload cell.

# Runs main.py in its default 'single' mode with the Kalman-based custom tracker.
print("Starting processing with MY CUSTOM TRACKER...")
!python main.py --video "{video_path_in_colab}" --tracker my_custom_tracker --output results/fusion/my_custom_tracked_video.mp4

print(f"\nProcessing complete for MY CUSTOM TRACKER.")
print(f"Check the 'results/fusion/' directory for 'my_custom_tracked_video.mp4'.")
print("You can download it from the left-hand file browser in Colab or use the download code provided earlier.")

Starting processing with MY CUSTOM TRACKER...
Loading YOLOv5s model...
PRO TIP 💡 Replace 'model=yolov5s.pt' with new 'model=yolov5su.pt'.
YOLOv5 'u' models are trained with https://github.com/ultralytics/ultralytics and feature improved performance vs standard YOLOv5 models trained with https://github.com/ultralytics/yolov5.

YOLOv5s model loaded.
[INFO] Initializing tracker at frame 0 using detector...
[INFO] Tracker was started by Detector.
[INFO] Single object tracking finished.
[INFO] For my_custom_tracker tracker:
[INFO] Total frames processed: 286
[INFO] Total processing time: 32.32 seconds
[INFO] Achieved FPS: 8.85

Project finished processing for single object tracking with my_custom_tracker tracker.
Output video saved to: results/fusion/my_custom_tracked_video.mp4

Processing complete for MY CUSTOM TRACKER.
Check the 'results/fusion/' directory for 'my_custom_tracked_video.mp4'.
You can download it from the left-hand file browser in Colab or use the download code provided earl

**multi-object tracker (works for single objects as well)**

In [None]:
# Ensure your video file has been successfully uploaded in the upload cell above.
# The `video_path_in_colab` variable is defined by that upload cell.

# Runs main.py with --mode multi, which tracks every detected object with its own custom tracker.
print("Starting processing with MULTI-OBJECT TRACKING using MY CUSTOM TRACKER...")
!python main.py --video "{video_path_in_colab}" --tracker my_custom_tracker --mode multi --output results/fusion/multi_custom_tracked_video.mp4

print(f"\nProcessing complete for Multi-Object Tracking with MY CUSTOM TRACKER.")
print(f"Check the 'results/fusion/' directory for 'multi_custom_tracked_video.mp4'.")
print("You can download it from the left-hand file browser in Colab.")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
[INFO] New object 27 detected and initialized at frame 250.
[INFO] New object 28 detected and initialized at frame 260.
[INFO] New object 29 detected and initialized at frame 265.
[INFO] New object 30 detected and initialized at frame 270.
[INFO] Track 24 lost (skipped too many frames) at frame 271.
[INFO] New object 31 detected and initialized at frame 280.
[INFO] Track 27 lost (skipped too many frames) at frame 281.
[INFO] Track 28 lost (skipped too many frames) at frame 291.
[INFO] Track 29 lost (skipped too many frames) at frame 301.
[INFO] Track 30 lost (skipped too many frames) at frame 301.
[INFO] Track 31 lost (skipped too many frames) at frame 311.
[INFO] New object 32 detected and initialized at frame 320.
[INFO] New object 33 detected and initialized at frame 345.
[INFO] New object 34 detected and initialized at frame 365.
[INFO] New object 35 detected and initialized at frame 370.
[INFO] Track 33 lost (skipped

**csrt, kcf, mosse output**

In [None]:
# Ensure your video file has been successfully uploaded in the upload cell above.
# The `video_path_in_colab` variable is defined by that upload cell.

# Each run below uses main.py's default 'single' mode with one OpenCV tracker,
# writing a separate output video per tracker for comparison.
print("Starting processing with CSRT tracker...")
!python main.py --video "{video_path_in_colab}" --tracker csrt --output results/fusion/csrt_tracked_video.mp4

print("\nStarting processing with KCF tracker...")
!python main.py --video "{video_path_in_colab}" --tracker kcf --output results/fusion/kcf_tracked_video.mp4

print("\nStarting processing with MOSSE tracker...")
!python main.py --video "{video_path_in_colab}" --tracker mosse --output results/fusion/mosse_tracked_video.mp4


print(f"\nAll processing complete for CSRT, KCF, and MOSSE trackers.")
print(f"Check the 'results/fusion/' directory for the following output videos:")
print(f"- results/fusion/csrt_tracked_video.mp4")
print(f"- results/fusion/kcf_tracked_video.mp4")
print(f"- results/fusion/mosse_tracked_video.mp4")
print("\nYou can download them from the left-hand file browser in Colab or use the download code provided earlier.")

Starting processing with CSRT tracker...
Loading YOLOv5s model...
PRO TIP 💡 Replace 'model=yolov5s.pt' with new 'model=yolov5su.pt'.
YOLOv5 'u' models are trained with https://github.com/ultralytics/ultralytics and feature improved performance vs standard YOLOv5 models trained with https://github.com/ultralytics/yolov5.

YOLOv5s model loaded.
[INFO] Initializing tracker at frame 0 using detector...
[INFO] Tracker was started by Detector.
[INFO] Single object tracking finished.
[INFO] For csrt tracker:
[INFO] Total frames processed: 286
[INFO] Total processing time: 75.99 seconds
[INFO] Achieved FPS: 3.76

Project finished processing for single object tracking with csrt tracker.
Output video saved to: results/fusion/csrt_tracked_video.mp4

Starting processing with KCF tracker...
Loading YOLOv5s model...
PRO TIP 💡 Replace 'model=yolov5s.pt' with new 'model=yolov5su.pt'.
YOLOv5 'u' models are trained with https://github.com/ultralytics/ultralytics and feature improved performance vs stand