In [3]:
# MacV Object Tracker Task Implementation

import cv2
import numpy as np
import os
from collections import defaultdict

INPUT_VIDEO = "C:\\Users\\ANSHU\\Downloads\\macv-obj-tracking-video.mp4"
OUTPUT_VIDEO = "C:\\Users\\ANSHU\\Downloads\\op_tracked_video.mp4"

#Centroid Tracker
class CentroidTracker:
    def __init__(self, maxDisappeared=50):
        self.nextObjectID = 0
        self.objects = dict()
        self.disappeared = dict()
        self.maxDisappeared = maxDisappeared
        self.trails = defaultdict(list)
        self.entry_frame = dict()

    def register(self, centroid, frame_idx):
        self.objects[self.nextObjectID] = centroid
        self.disappeared[self.nextObjectID] = 0
        self.trails[self.nextObjectID].append(centroid)
        self.entry_frame[self.nextObjectID] = frame_idx
        self.nextObjectID += 1

    def deregister(self, objectID):
        del self.objects[objectID]
        del self.disappeared[objectID]

    def update(self, rects, frame_idx):
        inputCentroids = []
        for (startX, startY, endX, endY) in rects:
            cX = int((startX + endX) / 2.0)
            cY = int((startY + endY) / 2.0)
            inputCentroids.append((cX, cY))

        if len(self.objects) == 0:
            for i in range(len(inputCentroids)):
                self.register(inputCentroids[i], frame_idx)
        else:
            objectIDs = list(self.objects.keys())
            objectCentroids = list(self.objects.values())

            D = np.linalg.norm(np.array(objectCentroids)[:, np.newaxis] - np.array(inputCentroids), axis=2)
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]

            usedRows, usedCols = set(), set()
            for (row, col) in zip(rows, cols):
                if row in usedRows or col in usedCols:
                    continue
                objectID = objectIDs[row]
                self.objects[objectID] = inputCentroids[col]
                self.disappeared[objectID] = 0
                self.trails[objectID].append(inputCentroids[col])
                usedRows.add(row)
                usedCols.add(col)

            unusedRows = set(range(D.shape[0])) - usedRows
            for row in unusedRows:
                objectID = objectIDs[row]
                self.disappeared[objectID] += 1
                if self.disappeared[objectID] > self.maxDisappeared:
                    self.deregister(objectID)

            unusedCols = set(range(len(inputCentroids))) - usedCols
            for col in unusedCols:
                self.register(inputCentroids[col], frame_idx)

        return self.objects

def detect_objects(frame):
    # Use simple color-based detection (e.g. white/bright objects)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    _, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    rects = []
    for c in contours:
        if cv2.contourArea(c) < 500:
            continue
        x, y, w, h = cv2.boundingRect(c)
        rects.append((x, y, x + w, y + h))
    return rects

def main():
    cap = cv2.VideoCapture(INPUT_VIDEO)
    fourcc = cv2.VideoWriter_fourcc(*'avc1')
    out = None

    tracker = CentroidTracker()
    frame_idx = 0
    frame_rate = int(cap.get(cv2.CAP_PROP_FPS))

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        if out is None:
            h, w = frame.shape[:2]
            out = cv2.VideoWriter(OUTPUT_VIDEO, fourcc, frame_rate, (w, h))

        rects = detect_objects(frame)
        objects = tracker.update(rects, frame_idx)

        for objectID, centroid in objects.items():
            text = f"ID {objectID}"
            cv2.putText(frame, text, (centroid[0] - 10, centroid[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
            cv2.circle(frame, centroid, 4, (0, 255, 0), -1)

            for i in range(1, len(tracker.trails[objectID])):
                cv2.line(frame, tracker.trails[objectID][i - 1], tracker.trails[objectID][i], (255, 0, 0), 2)

        out.write(frame)
        frame_idx += 1

    cap.release()
    if out is not None:
        out.release()

    print("Total Unique Objects:", len(tracker.entry_frame))
    for objectID, start_frame in tracker.entry_frame.items():
        duration_sec = (frame_idx - start_frame) / frame_rate
        print(f"Object ID {objectID} duration: {duration_sec:.2f}s")

if __name__ == '__main__':
    main()


Total Unique Objects: 50
Object ID 0 duration: 17.50s
Object ID 1 duration: 17.50s
Object ID 2 duration: 17.50s
Object ID 3 duration: 17.50s
Object ID 4 duration: 17.50s
Object ID 5 duration: 17.50s
Object ID 6 duration: 17.50s
Object ID 7 duration: 17.50s
Object ID 8 duration: 17.50s
Object ID 9 duration: 17.50s
Object ID 10 duration: 16.87s
Object ID 11 duration: 16.70s
Object ID 12 duration: 14.57s
Object ID 13 duration: 13.43s
Object ID 14 duration: 13.37s
Object ID 15 duration: 12.93s
Object ID 16 duration: 12.87s
Object ID 17 duration: 12.20s
Object ID 18 duration: 12.00s
Object ID 19 duration: 11.80s
Object ID 20 duration: 11.67s
Object ID 21 duration: 11.53s
Object ID 22 duration: 11.37s
Object ID 23 duration: 11.33s
Object ID 24 duration: 11.33s
Object ID 25 duration: 10.83s
Object ID 26 duration: 10.70s
Object ID 27 duration: 10.50s
Object ID 28 duration: 10.37s
Object ID 29 duration: 10.33s
Object ID 30 duration: 10.07s
Object ID 31 duration: 9.90s
Object ID 32 duration: 9.8