In [2]:
# -----------------------------
# Realtime Object Tracking & Analytics (YOLOv8 + SORT) - Colab Ready
# -----------------------------

!pip install ultralytics opencv-python-headless gradio --quiet

import torch
import cv2
import numpy as np
import time
from collections import deque, defaultdict
from ultralytics import YOLO
from scipy.optimize import linear_sum_assignment
from google.colab import files
import ultralytics
from ultralytics import YOLO
import gradio as gr


# =====================================================
#  SIMPLE IOU FUNCTION
# =====================================================
def iou_bbox(bb_test, bb_gt):
    """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes.

    Only the first four entries of each box are read. The result is ``0.0``
    whenever the union area is not strictly positive.
    """
    # Corners of the rectangle shared by both boxes.
    left = np.maximum(bb_test[0], bb_gt[0])
    top = np.maximum(bb_test[1], bb_gt[1])
    right = np.minimum(bb_test[2], bb_gt[2])
    bottom = np.minimum(bb_test[3], bb_gt[3])

    # Clamp each extent at zero so disjoint boxes contribute no overlap.
    overlap = np.maximum(0., right - left) * np.maximum(0., bottom - top)

    area_test = (bb_test[2] - bb_test[0]) * (bb_test[3] - bb_test[1])
    area_gt = (bb_gt[2] - bb_gt[0]) * (bb_gt[3] - bb_gt[1])
    total = area_test + area_gt - overlap

    if total > 0:
        return overlap / total
    return 0.0


# =====================================================
#  SORT TRACKING IMPLEMENTATION
# =====================================================
class KalmanBoxTracker:
    """Constant-velocity Kalman filter that tracks a single bounding box.

    The state ``x`` is a ``(7, 1)`` column ``[cx, cy, s, r, vcx, vcy, vs]``:
    box centre, area, width/height ratio, and the per-step velocities of the
    first three components. Measurements are ``[cx, cy, s, r]``.
    """

    count = 0  # class-wide counter; every new tracker takes the next id

    def __init__(self, bbox):
        """Start a track from ``bbox = [x1, y1, x2, y2]`` with zero velocity."""
        self.dt = 1.
        self.x = np.zeros((7, 1))
        self.x[:4, 0] = self._bbox_to_z(bbox)

        # Transition: cx, cy and s each advance by their velocity; r is constant.
        self.F = np.eye(7)
        for i in range(3):
            self.F[i, i + 4] = self.dt

        # Measurement picks the first four state components.
        self.H = np.zeros((4, 7))
        self.H[0, 0] = self.H[1, 1] = self.H[2, 2] = self.H[3, 3] = 1.

        self.P = np.eye(7) * 10.
        self.P[4:, 4:] *= 1000.  # velocities are unobserved at start
        self.R = np.eye(4)
        self.Q = np.eye(7) * 0.01

        self.time_since_update = 0
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []
        self.hits = 1
        self.hit_streak = 1
        self.age = 0
        self.last_bbox = bbox

    def predict(self):
        """Advance the filter one step and return the predicted box.

        Returns:
            The predicted ``[x1, y1, x2, y2]`` as a list of floats; it is also
            appended to ``history``.
        """
        # Never extrapolate a shrinking box into a non-positive area: that
        # would yield an inverted box from _get_state_bbox.
        if self.x[2, 0] + self.x[6, 0] <= 0:
            self.x[6, 0] = 0.

        self.x = np.dot(self.F, self.x)
        self.P = np.dot(self.F, np.dot(self.P, self.F.T)) + self.Q
        self.age += 1

        # The streak is broken only if the previous step already went
        # unmatched, so the check must precede the increment.
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1

        self.history.append(self._get_state_bbox())
        return self.history[-1]

    def update(self, bbox):
        """Correct the state with an observed ``[x1, y1, x2, y2]`` box."""
        z = self._bbox_to_z(bbox).reshape((4, 1))
        y = z - np.dot(self.H, self.x)
        S = np.dot(self.H, np.dot(self.P, self.H.T)) + self.R
        K = np.dot(np.dot(self.P, self.H.T), np.linalg.inv(S))
        self.x = self.x + np.dot(K, y)
        self.P = np.dot(np.eye(7) - np.dot(K, self.H), self.P)

        self.time_since_update = 0
        # Predictions made while waiting for this match are obsolete; dropping
        # them keeps ``history`` from growing for the lifetime of the track.
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        self.last_bbox = bbox

    def _bbox_to_z(self, bbox):
        """Convert ``[x1, y1, x2, y2]`` to the measurement ``[cx, cy, s, r]``."""
        x1, y1, x2, y2 = bbox
        w, h = x2 - x1, y2 - y1
        cx, cy = x1 + w / 2., y1 + h / 2.
        s = w * h
        r = w / float(h + 1e-6)  # epsilon guards against zero-height boxes
        return np.array([cx, cy, s, r])

    def _get_state_bbox(self):
        """Return the current state as ``[x1, y1, x2, y2]`` Python floats."""
        cx, cy, s, r = self.x[:4, 0]
        w = np.sqrt(abs(s * r))
        h = s / (w + 1e-6)
        return [
            float(cx - w / 2.),
            float(cy - h / 2.),
            float(cx + w / 2.),
            float(cy + h / 2.),
        ]


class Sort:
    """Minimal SORT-style multi-object tracker.

    Each call to :meth:`update` predicts every live track, assigns
    detections to predictions by maximising total IoU, starts new tracks for
    unassigned detections and drops tracks that stayed unmatched too long.

    Args:
        max_age: A track is removed once it has gone more than this many
            consecutive updates without a matching detection.
        min_hits: A track is reported only after it has accumulated at least
            this many detections (the creating detection counts as one).
        iou_threshold: Assignments with an IoU below this are rejected.
    """

    def __init__(self, max_age=30, min_hits=1, iou_threshold=0.3):
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.trackers = []

    def update(self, dets):
        """Feed one frame of detections and return the active tracks.

        Args:
            dets: Array-like of rows whose first four values are
                ``x1, y1, x2, y2``; extra columns are ignored. May be empty.

        Returns:
            An ``(N, 5)`` array of ``[x1, y1, x2, y2, track_id]`` rows for the
            tracks matched (or created) in this call, newest track first;
            shape ``(0, 5)`` when there are none.
        """
        # Accept lists as well as arrays and give every input a 2-D shape so
        # the row/column indexing below is always valid.
        dets = np.asarray(dets, dtype=float)
        if dets.size == 0:
            dets = np.empty((0, 5))
        elif dets.ndim == 1:
            dets = dets.reshape(1, -1)

        predicted = [trk.predict() for trk in self.trackers]
        matched, unm_det, _ = self._assoc(dets, predicted)

        for t_idx, d_idx in matched:
            self.trackers[t_idx].update(dets[d_idx, :4])

        for d_idx in unm_det:
            self.trackers.append(KalmanBoxTracker(dets[d_idx, :4]))

        ret = []
        # Walk backwards so popping a stale tracker does not shift the
        # indices that are still to be visited.
        for t in reversed(range(len(self.trackers))):
            trk = self.trackers[t]
            if trk.time_since_update < 1 and trk.hits >= self.min_hits:
                ret.append(np.array(trk._get_state_bbox() + [trk.id]).reshape(1, -1))
            if trk.time_since_update > self.max_age:
                self.trackers.pop(t)

        return np.concatenate(ret) if ret else np.empty((0, 5))

    def _assoc(self, dets, trks):
        """Assign detections to predicted track boxes by IoU.

        Returns:
            ``(matches, unmatched_detections, unmatched_trackers)`` where
            ``matches`` is a list of ``[tracker_index, detection_index]``.
        """
        n_trk, n_det = len(trks), len(dets)
        if n_trk == 0:
            return [], list(range(n_det)), []
        if n_det == 0:
            return [], [], list(range(n_trk))

        iou_matrix = np.zeros((n_trk, n_det), dtype=np.float32)
        for t, trk in enumerate(trks):
            for d, det in enumerate(dets):
                iou_matrix[t, d] = iou_bbox(trk, det[:4])

        # Negate so that the minimum-cost assignment maximises total IoU.
        rows, cols = linear_sum_assignment(-iou_matrix)
        rows, cols = rows.tolist(), cols.tolist()

        assigned_trk, assigned_det = set(rows), set(cols)
        unm_trk = [t for t in range(n_trk) if t not in assigned_trk]
        unm_det = [d for d in range(n_det) if d not in assigned_det]

        matches = []
        for t, d in zip(rows, cols):
            if iou_matrix[t, d] < self.iou_threshold:
                # Overlap too weak to trust: treat both sides as unmatched.
                unm_trk.append(t)
                unm_det.append(d)
            else:
                matches.append([t, d])

        return matches, unm_det, unm_trk


# =====================================================
#  YOLO PARSE FUNCTION
# =====================================================
def yolo_to_bboxes(results, conf_threshold=0.3, classes=None):
    """Flatten detection results into an array of ``[x1, y1, x2, y2, conf]``.

    Args:
        results: Iterable of result objects, each exposing an iterable
            ``boxes`` whose items carry ``conf``, ``cls`` and ``xyxy``.
        conf_threshold: Boxes scoring below this value are dropped.
        classes: Optional collection of class ids to keep; a falsy value
            disables class filtering.

    Returns:
        A 2-D array with one row per kept box, or an empty ``(0, 5)`` array.
    """
    rows = []
    for result in results:
        for box in result.boxes:
            score = float(box.conf[0])
            label = int(box.cls[0])
            if score < conf_threshold or (classes and label not in classes):
                continue
            row = box.xyxy[0].cpu().numpy().tolist()
            row.append(score)
            rows.append(row)

    if not rows:
        return np.empty((0, 5))
    return np.array(rows)


# =====================================================
#  ANALYTICS CLASS
# =====================================================
class Analytics:
    """Line-crossing counts and region dwell times for tracked objects.

    Args:
        line: Optional ``((x1, y1), (x2, y2))`` counting line.
        region: Optional ``(xA, yA, xB, yB)`` axis-aligned dwell rectangle.
    """

    def __init__(self, line=None, region=None):
        self.line = line
        self.region = region
        self.counts = {"in": 0, "out": 0}
        # Per-track trail of the most recent (timestamp, centre) samples.
        self.tracks_history = defaultdict(lambda: deque(maxlen=50))
        self.region_entry_time = {}  # tid -> timestamp of the current stay
        self.dwell_times = defaultdict(float)  # tid -> sum of finished stays
        self.last_side = {}  # tid -> last non-zero side of the line
        self.last_seen = {}  # tid -> timestamp of the latest observation

    @staticmethod
    def center(b):
        """Return the centre ``(cx, cy)`` of box ``[x1, y1, x2, y2]``."""
        x1, y1, x2, y2 = b
        return ((x1 + x2) / 2., (y1 + y2) / 2.)

    @staticmethod
    def side_of_line(pt, line):
        """Return the sign (-1, 0 or 1) of the cross product locating ``pt``
        relative to the directed ``line``; 0 means the point is on the line."""
        (x1, y1), (x2, y2) = line
        x, y = pt
        return np.sign((x2 - x1) * (y - y1) - (y2 - y1) * (x - x1))

    def update_tracks(self, tracks, t):
        """Record one frame of tracks observed at timestamp ``t``.

        Args:
            tracks: Iterable of ``[x1, y1, x2, y2, track_id]`` rows.
            t: Timestamp of the frame; dwell times are differences of these
                values, so they share its unit.
        """
        for tr in tracks:
            x1, y1, x2, y2, tid = tr
            tid = int(tid)
            point = self.center([x1, y1, x2, y2])

            self.tracks_history[tid].append((t, point))
            self.last_seen[tid] = t

            if self.line:
                self._update_line_crossing(tid, point)
            if self.region:
                self._update_region_dwell(tid, point, t)

    def _update_line_crossing(self, tid, point):
        """Count a crossing when the track changes side of the line."""
        side = self.side_of_line(point, self.line)
        if side == 0:
            # Exactly on the line: keep the previously known side, otherwise
            # the crossing completed on the next frame would go uncounted.
            return
        last = self.last_side.get(tid, side)
        if last != side:
            self.counts["in" if last < side else "out"] += 1
        self.last_side[tid] = side

    def _update_region_dwell(self, tid, point, t):
        """Open a stay on region entry and bank its duration on exit."""
        cx, cy = point
        xA, yA, xB, yB = self.region
        inside = xA <= cx <= xB and yA <= cy <= yB
        if inside:
            self.region_entry_time.setdefault(tid, t)
        elif tid in self.region_entry_time:
            self.dwell_times[tid] += t - self.region_entry_time.pop(tid)

    def get_dwell(self, tid):
        """Return the total time ``tid`` has spent inside the region.

        Finished stays are summed; a stay still in progress is counted up to
        the track's most recent observation, so the value is meaningful while
        the object is inside and for tracks that disappear inside the region.
        Returns 0 for tracks that never entered.
        """
        total = self.dwell_times.get(tid, 0)
        entered = self.region_entry_time.get(tid)
        if entered is not None:
            total += self.last_seen.get(tid, entered) - entered
        return total


# =====================================================
#  DRAW OVERLAY
# =====================================================
def draw_overlay(frame, tracks, analytics):
    """Draw the analytics overlay onto ``frame`` in place and return it.

    Args:
        frame: Image to annotate; it is modified in place.
        tracks: Iterable of ``[x1, y1, x2, y2, track_id]`` rows.
        analytics: Object exposing ``line``, ``region``, ``counts``,
            ``tracks_history`` and ``get_dwell(track_id)``.

    Returns:
        The same ``frame`` object, annotated.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    # Lowest baseline that keeps a track label fully inside the frame.
    min_label_y = 15

    # Counting line
    if analytics.line:
        (x1, y1), (x2, y2) = analytics.line
        cv2.line(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 255), 2)

    # Dwell region
    if analytics.region:
        xA, yA, xB, yB = analytics.region
        cv2.rectangle(frame, (int(xA), int(yA)), (int(xB), int(yB)), (255, 200, 0), 2)

    # Tracked objects
    for tr in tracks:
        x1, y1, x2, y2, tid = tr
        tid = int(tid)
        # Deterministic per-id colour so a track keeps its colour across frames.
        color = (int((tid * 47) % 255), int((tid * 97) % 255), int((tid * 67) % 255))
        cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
        dwell = analytics.get_dwell(tid)
        # Clamp the baseline so labels of boxes touching the top edge stay visible.
        label_y = max(int(y1) - 6, min_label_y)
        cv2.putText(frame, f"ID {tid} | {dwell:.1f}s", (int(x1), label_y), font, 0.55, color, 2)

    # Global counters
    c = analytics.counts
    cv2.putText(frame, f"In:{c['in']} Out:{c['out']}", (10, 30), font, 1, (0, 240, 0), 2)
    cv2.putText(frame, f"Visitors: {len(analytics.tracks_history)}", (10, 70), font, 1, (0, 180, 255), 2)

    return frame


# =====================================================
#  RUN VIDEO FUNCTION (GLOBAL ANALYTICS)
# =====================================================
def run_video(video_path, output_path="output.mp4", model_name="yolov8n.pt", detect_every=3):
    """Detect, track and annotate a video, writing the result to disk.

    The ``Analytics`` instance for the run is published in the module-level
    ``analytics`` variable so callers can read the counts afterwards.

    Args:
        video_path: Path of the input video.
        output_path: Path of the annotated video to write.
        model_name: Weights name or path handed to ``YOLO``.
        detect_every: Run the detector on every N-th frame; the most recent
            detections are reused for the frames in between.

    Returns:
        ``output_path``.

    Raises:
        IOError: If the input video cannot be opened.
    """
    global analytics   # ← very important: the dashboard reads the results from here

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    out = None
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS) or 25  # fall back when the container reports 0

        # Load YOLO safely: allow-list the classes needed to unpickle the weights.
        with torch.serialization.safe_globals([
            torch.nn.modules.container.Sequential,
            ultralytics.nn.tasks.DetectionModel
        ]):
            model = YOLO(model_name)

        tracker = Sort()
        analytics = Analytics(
            # Horizontal counting line through the middle of the frame.
            line=((0, height // 2), (width, height // 2)),
            # Central rectangle used for dwell-time measurement.
            region=(width * 0.2, height * 0.3, width * 0.8, height * 0.7),
        )

        out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))

        stride = max(1, int(detect_every))
        frame_id = 0
        dets = np.empty((0, 5))

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_id += 1

            # Run the detector only on every `stride`-th frame.
            if frame_id % stride == 0:
                res = model.predict(frame, imgsz=640, conf=0.35, verbose=False)
                dets = yolo_to_bboxes(res)

            trks = tracker.update(dets)

            # Timestamp in video seconds rather than wall-clock time, so dwell
            # times do not depend on how fast the frames are processed.
            analytics.update_tracks(trks, (frame_id - 1) / fps)

            out.write(draw_overlay(frame.copy(), trks, analytics))
    finally:
        # Release the capture and the writer even if processing fails midway.
        cap.release()
        if out is not None:
            out.release()

    return output_path


# =====================================================
#  GRADIO DASHBOARD
# =====================================================
def process_video_dashboard(video):
    """Gradio callback: process an uploaded video and summarise the results.

    Args:
        video: Path of the uploaded video, or ``None`` when nothing was
            uploaded.

    Returns:
        A ``(summary_text, output_video_path, in_count, out_count)`` tuple;
        the last three entries are ``None`` when no video was supplied.
    """
    if video is None:
        return "❌ No video uploaded", None, None, None

    output_path = f"processed_{int(time.time())}.mp4"
    out_file = run_video(video, output_path)

    # run_video publishes its results in the module-level `analytics`.
    c = analytics.counts
    total = len(analytics.tracks_history)

    result_text = (
        f"✅ Video processed!\n\n"
        f"🟢 In: {c['in']}\n"
        f"🔴 Out: {c['out']}\n"
        f"👥 Total Visitors: {total}"
    )

    return result_text, out_file, c["in"], c["out"]


# UI
# Dashboard layout: components appear in the order they are created.
with gr.Blocks(title="Realtime Visitor Analytics") as dashboard:
    gr.Markdown("<h1 style='text-align:center'> Realtime Object Tracking and Analytics</h1>")

    # Input side
    video_in = gr.Video(label="Upload Video")
    btn = gr.Button(" Process Video")

    # Output side
    result = gr.Textbox(label="Results", lines=6)
    video_out = gr.Video(label="Processed Output")
    in_count = gr.Number(label="In")
    out_count = gr.Number(label="Out")

    # The callback's four return values map onto these four outputs in order.
    btn.click(
        fn=process_video_dashboard,
        inputs=video_in,
        outputs=[result, video_out, in_count, out_count],
    )

# share=True requests a public link; allowed_paths lets Gradio serve files
# from /content.
launch_options = dict(
    share=True,
    debug=False,
    show_error=True,
    allowed_paths=["/content"],
)
dashboard.launch(**launch_options)



Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://9989a82dbe93f3d71c.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


