# Installing Libraries


In [1]:
!pip -q uninstall -y ultralytics opencv-python opencv-python-headless scipy

# Installing compatible versions
!pip -q install "ultralytics==8.3.1" \
                 "opencv-python-headless==4.10.0.84" \
                 "scipy>=1.13.1" \
                 "yt-dlp" \
                 "shapely==2.0.4" \
                 "lapx==0.5.9" \
                 "pandas==2.2.2"

#Imports


In [2]:
# Dummy rec class for legacy models.
import sys, types
if "numpy.rec" not in sys.modules:
    sys.modules["numpy.rec"] = types.ModuleType("numpy.rec")

In [3]:
#All the imports
import os, math, time, glob, subprocess
import numpy as np
import pandas as pd
import cv2
from shapely.geometry import Point, Polygon
from ultralytics import YOLO

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


# Downloading Video Form Youtube


In [4]:
YOUTUBE_URL = "https://www.youtube.com/watch?v=MNn9qKG2UFI"
VIDEO_PATH  = "input.mp4"

if not os.path.exists(VIDEO_PATH):
    !yt-dlp -f "mp4[height<=720]/best[height<=720]/best" -o "input.%(ext)s" {YOUTUBE_URL}
    if not os.path.exists(VIDEO_PATH):
        files = glob.glob("input.*")
        if files:
            os.rename(files[0], VIDEO_PATH)

assert os.path.exists(VIDEO_PATH), "Video download failed."
print("Downloaded:", VIDEO_PATH)

[youtube] Extracting URL: https://www.youtube.com/watch?v=MNn9qKG2UFI
[youtube] MNn9qKG2UFI: Downloading webpage
[youtube] MNn9qKG2UFI: Downloading tv client config
[youtube] MNn9qKG2UFI: Downloading player 5ec65609-main
[youtube] MNn9qKG2UFI: Downloading tv player API JSON
[youtube] MNn9qKG2UFI: Downloading ios player API JSON
[youtube] MNn9qKG2UFI: Downloading m3u8 information
[info] MNn9qKG2UFI: Downloading 1 format(s): 18
[download] Sleeping 6.00 seconds as required by the site...
[download] Destination: input.mp4
[K[download] 100% of   20.91MiB in [1;37m00:00:01[0m at [0;32m10.73MiB/s[0m
Downloaded: input.mp4


# Sort Algorithm

In [5]:
import numpy as np
from math import hypot
try:
    from lapx import lapjv
    HAVE_LAP = True
except Exception:
    HAVE_LAP = False

def iou(bb_test, bb_gt):
    xx1 = np.maximum(bb_test[0], bb_gt[0])
    yy1 = np.maximum(bb_test[1], bb_gt[1])
    xx2 = np.minimum(bb_test[2], bb_gt[2])
    yy2 = np.minimum(bb_test[3], bb_gt[3])
    w = np.maximum(0., xx2 - xx1)
    h = np.maximum(0., yy2 - yy1)
    inter = w * h
    area1 = (bb_test[2]-bb_test[0])*(bb_test[3]-bb_test[1])
    area2 = (bb_gt[2]-bb_gt[0])*(bb_gt[3]-bb_gt[1])
    return inter / (area1 + area2 - inter + 1e-9)

def convert_bbox_to_z(bbox):
    w = bbox[2]-bbox[0]; h = bbox[3]-bbox[1]
    x = bbox[0] + w/2.; y = bbox[1] + h/2.
    s = w*h
    r = w/(h+1e-9)
    return np.array([x,y,s,r], dtype=np.float64).reshape(4,1)

def convert_x_to_bbox(x, score=None):
    w = np.sqrt(max(x[2,0]*x[3,0], 1e-9))
    h = x[2,0]/max(w, 1e-9)
    x1 = x[0,0] - w/2.; y1 = x[1,0] - h/2.
    x2 = x[0,0] + w/2.; y2 = x[1,0] + h/2.
    if score is None:
        return np.array([x1, y1, x2, y2]).reshape(1,4)
    else:
        return np.array([x1, y1, x2, y2, score]).reshape(1,5)

class KalmanBox:
    """7D KF state: [x,y,s,r, vx,vy,vs]^T"""
    def __init__(self, bbox):
        self.F = np.eye(7)
        self.F[0,4] = 1.0
        self.F[1,5] = 1.0
        self.F[2,6] = 1.0
        self.H = np.zeros((4,7))
        self.H[0,0] = 1.0; self.H[1,1] = 1.0; self.H[2,2] = 1.0; self.H[3,3] = 1.0
        self.R = np.diag([1.0, 1.0, 10.0, 10.0])
        self.Q = np.eye(7) * 0.01
        self.P = np.eye(7) * 10.0
        self.P[4:,4:] *= 1000.0

        self.x = np.zeros((7,1))
        self.x[:4] = convert_bbox_to_z(bbox)

        self.time_since_update = 0
        self.hits = 0
        self.hit_streak = 0
        self.age = 0

    def predict(self):
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        self.age += 1
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        return convert_x_to_bbox(self.x)

    def update(self, bbox):
        z = convert_bbox_to_z(bbox)
        y = z - (self.H @ self.x)
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)
        self.x = self.x + K @ y
        I = np.eye(7)
        self.P = (I - K @ self.H) @ self.P
        self.time_since_update = 0
        self.hits += 1
        self.hit_streak += 1

class Track:
    _count = 0
    def __init__(self, bbox):
        self.kf = KalmanBox(bbox)
        self.id = Track._count
        Track._count += 1

    def predict(self):
        return self.kf.predict()

    def update(self, bbox):
        self.kf.update(bbox)

    @property
    def time_since_update(self):
        return self.kf.time_since_update
    @property
    def hit_streak(self):
        return self.kf.hit_streak

    def get_state(self):
        return convert_x_to_bbox(self.kf.x)

class SORT:
    def __init__(self, max_age=30, min_hits=3, iou_threshold=0.25):
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.tracks = []
        self.frame_count = 0

    def _associate(self, dets, trks):
        if dets.size == 0 or trks.size == 0:
            return np.empty((0,2), dtype=int), np.arange(dets.shape[0]), np.arange(trks.shape[0])
        iou_mat = np.zeros((dets.shape[0], trks.shape[0]), dtype=np.float32)
        for d in range(dets.shape[0]):
            for t in range(trks.shape[0]):
                iou_mat[d,t] = iou(dets[d], trks[t])

        cost = 1.0 - iou_mat
        if HAVE_LAP:
            r, c, _ = lapjv(cost, extend_cost=True)
            matches = []
            unmatched_d = []
            unmatched_t = list(range(trks.shape[0]))
            for di, ci in enumerate(r):
                if ci >= 0 and iou_mat[di,ci] >= self.iou_threshold:
                    matches.append([di, ci])
                    if ci in unmatched_t:
                        unmatched_t.remove(ci)
                else:
                    unmatched_d.append(di)
            return np.array(matches, dtype=int), np.array(unmatched_d, dtype=int), np.array(unmatched_t, dtype=int)
        else:

            matches = []
            used_cols = set()
            for d in range(dets.shape[0]):
                c = np.argmax(iou_mat[d])
                if iou_mat[d,c] >= self.iou_threshold and c not in used_cols:
                    matches.append([d,c])
                    used_cols.add(c)
            unmatched_d = [d for d in range(dets.shape[0]) if d not in [m[0] for m in matches]]
            unmatched_t = [t for t in range(trks.shape[0]) if t not in [m[1] for m in matches]]
            return np.array(matches, dtype=int), np.array(unmatched_d, dtype=int), np.array(unmatched_t, dtype=int)

    def update(self, dets):
        self.frame_count += 1

        trk_preds = []
        for trk in self.tracks:
            pred = trk.predict()[0]
            trk_preds.append(pred)
        trk_preds = np.array(trk_preds) if trk_preds else np.empty((0,4))

        matched, unmatched_dets, unmatched_trks = self._associate(dets, trk_preds)

        for d_idx, t_idx in matched:
            self.tracks[t_idx].update(dets[d_idx])

        for d_idx in unmatched_dets:
            self.tracks.append(Track(dets[d_idx]))

        alive = []
        for t_idx, trk in enumerate(self.tracks):
            if t_idx in unmatched_trks:
                pass
            if trk.time_since_update <= self.max_age:
                alive.append(trk)
        self.tracks = alive

        outputs = []
        for trk in self.tracks:
            d = trk.get_state()[0]
            if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
                outputs.append(np.concatenate((d, [trk.id])).reshape(1,-1))
        if outputs:
            return np.concatenate(outputs)
        return np.empty((0,5))

# Running the program and Output


In [6]:
OUTPUT_VIDEO = "output_annotated.mp4"
CSV_PATH     = "counts.csv"

MODEL_NAME   = "yolov8s.pt"
CONF_THRESH  = 0.30
IOU_NMS      = 0.5
IMG_SIZE     = 640
DEVICE       = 0

tracker = SORT(max_age=60, min_hits=2, iou_threshold=0.20)

# Reset SORT counter each run
try:
    Track._count = 1
except NameError:
    pass

VEHICLE_CLASS_IDS = {2, 3, 5, 7}

LANE_POLYGONS = [
    np.array([[  4, 309], [104, 178], [164, 186], [ 57, 356]], np.int32),  # Lane 1
    np.array([[ 66, 356], [168, 189], [246, 192], [217, 355]], np.int32),  # Lane 2
    np.array([[258, 355], [277, 196], [347, 193], [368, 355]], np.int32),  # Lane 3
    np.array([[382, 352], [505, 197], [593, 226], [542, 356]], np.int32)   # Lane 4
]
lane_polys_cv2 = {i+1: pts for i, pts in enumerate(LANE_POLYGONS)}

from shapely.geometry import Polygon, Point, LineString, box as shapely_box
from shapely.ops import unary_union

lane_polys = {i+1: Polygon([tuple(p) for p in pts]) for i, pts in enumerate(LANE_POLYGONS)}
lanes_union = unary_union(list(lane_polys.values()))

# Counting lines per lane
def lane_line_from_polygon(poly: Polygon, frac_from_top=0.60):
    minx, miny, maxx, maxy = poly.bounds
    y_line = miny*(1-frac_from_top) + maxy*frac_from_top
    seg = poly.intersection(LineString([(0, y_line), (99999, y_line)]))
    if seg.is_empty:
        return int(minx), int(maxx), int(y_line)
    if seg.geom_type == "MultiLineString":
        longest = max(list(seg.geoms), key=lambda g: g.length)
        (x1,y1), (x2,y2) = longest.coords[0], longest.coords[-1]
    else:
        (x1,y1), (x2,y2) = seg.coords[0], seg.coords[-1]
    return int(x1), int(x2), int(round(y1))

lane_lines = {lid: lane_line_from_polygon(poly, 0.60) for lid, poly in lane_polys.items()}
lane_line_geoms = {lid: LineString([(x1,y), (x2,y)]) for lid,(x1,x2,y) in lane_lines.items()}

# Overlap thresholds
LANE_OVERLAP_THRESH = 0.20
ROI_MIN_OVERLAP     = 0.05
INSIDE_STREAK       = 8
CROSS_TOL           = 3

# Loading YOLO
from ultralytics import YOLO
model = YOLO(MODEL_NAME)
model.fuse()
model.to("cuda")

# Video I/O
cap = cv2.VideoCapture(VIDEO_PATH); assert cap.isOpened()
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
W   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
H   = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v'); out = cv2.VideoWriter(OUTPUT_VIDEO, fourcc, fps, (W,H))

lane_counts = {1:0,2:0,3:0,4:0}
rows = []

id_alias, next_alias = {}, 1

id_state = {}

def bbox_polygon(x1,y1,x2,y2):
    return shapely_box(float(x1), float(y1), float(x2), float(y2))

def assign_lane_by_overlap(bpoly: Polygon):
    """Return lane id with max area overlap ratio, if >= threshold; else 0."""
    b_area = bpoly.area + 1e-9
    best_lid, best_ratio = 0, 0.0
    for lid, lpoly in lane_polys.items():
        inter = bpoly.intersection(lpoly)
        if inter.is_empty:
            continue
        ratio = inter.area / b_area
        if ratio > best_ratio:
            best_ratio, best_lid = ratio, lid
    return best_lid if best_ratio >= LANE_OVERLAP_THRESH else 0

frame_idx = 0
while True:
    ok, frame = cap.read()
    if not ok: break
    frame_idx += 1
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Detecting
    results = model.predict(source=rgb, verbose=False, device=DEVICE,
                            conf=CONF_THRESH, iou=IOU_NMS, imgsz=IMG_SIZE, half=True)

    dets_all = []
    for r in results:
        if r.boxes is None: continue
        for b in r.boxes:
            if int(b.cls[0].item()) in VEHICLE_CLASS_IDS:
                x1,y1,x2,y2 = b.xyxy[0].tolist()
                dets_all.append([x1,y1,x2,y2])

    dets = []
    for x1,y1,x2,y2 in dets_all:
        bpoly = bbox_polygon(x1,y1,x2,y2)
        inter = bpoly.intersection(lanes_union)
        if not inter.is_empty and (inter.area / (bpoly.area + 1e-9)) >= ROI_MIN_OVERLAP:
            dets.append([x1,y1,x2,y2])
    dets = np.array(dets, dtype=np.float32) if dets else np.empty((0,4), dtype=np.float32)

    tracks = tracker.update(dets)

    # Drawing lanes & lines
    overlay = frame.copy()
    for lid, pts in lane_polys_cv2.items():
        cv2.polylines(overlay, [pts], True, (60,180,255), 2)
        x1l,x2l,yl = lane_lines[lid]
        cv2.line(overlay, (x1l,yl), (x2l,yl), (0,255,255), 2)
        cv2.putText(overlay, f"Lane {lid}: {lane_counts[lid]}",
                    (int(pts[0][0]), max(20, int(pts[0][1]-6))),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2, cv2.LINE_AA)

    # Processinh tracks
    for x1,y1,x2,y2, tid in tracks:
        tid = int(tid)
        # pretty alias id
        if tid not in id_alias:
            id_alias[tid] = next_alias; next_alias += 1
        disp_id = id_alias[tid]

        bpoly = bbox_polygon(x1,y1,x2,y2)
        curr_lane = assign_lane_by_overlap(bpoly)

        bcx, bcy = int((x1+x2)/2), int(y2)

        st = id_state.get(tid, {"lane":0, "counted":False, "prev_bcx":None, "prev_bcy":None, "above_streak":0})

        st["lane"] = curr_lane if curr_lane != 0 else st["lane"]

        if not st["counted"] and curr_lane != 0:
            line_y = lane_lines[curr_lane][2]
            line_geom = lane_line_geoms[curr_lane]

            if st["prev_bcy"] is not None:
                motion = LineString([(st["prev_bcx"], st["prev_bcy"]), (bcx, bcy)])
                near_now  = abs(bcy - line_y) <= CROSS_TOL
                near_prev = abs(st["prev_bcy"] - line_y) <= CROSS_TOL
                upward    = (st["prev_bcy"] - bcy) >= 1

                crossed = motion.intersects(line_geom) or (near_prev and near_now and upward)
                if crossed and upward:
                    st["counted"] = True
                    lane_counts[curr_lane] += 1
                    rows.append({
                        "vehicle_id": disp_id,
                        "lane": curr_lane,
                        "frame": frame_idx,
                        "timestamp": round(frame_idx / fps, 3)
                    })

            if not st["counted"]:
                if bcy <= line_y:
                    st["above_streak"] += 1
                    if st["above_streak"] >= INSIDE_STREAK:
                        st["counted"] = True
                        lane_counts[curr_lane] += 1
                        rows.append({
                            "vehicle_id": disp_id,
                            "lane": curr_lane,
                            "frame": frame_idx,
                            "timestamp": round(frame_idx / fps, 3)
                        })
                else:
                    st["above_streak"] = 0

        # Save state & draw
        st["prev_bcx"], st["prev_bcy"] = bcx, bcy
        id_state[tid] = st

        color = (50,220,50) if st["counted"] else (60,60,255)
        cv2.rectangle(overlay, (int(x1),int(y1)), (int(x2),int(y2)), color, 2)
        cv2.circle(overlay, (bcx,bcy), 3, (255,255,255), -1)
        label_lane = st["lane"] if st["lane"] else curr_lane
        label = f"ID {disp_id}" + (f" L{label_lane}" if label_lane else "")
        cv2.putText(overlay, label, (int(x1), int(y1)-7),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,0), 3, cv2.LINE_AA)
        cv2.putText(overlay, label, (int(x1), int(y1)-7),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 1, cv2.LINE_AA)

    out.write(overlay)

cap.release(); out.release()
df = pd.DataFrame(rows, columns=["vehicle_id","lane","frame","timestamp"])
df.to_csv(CSV_PATH, index=False)
print("Saved CSV:", CSV_PATH, "| Video:", OUTPUT_VIDEO)

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8s.pt to 'yolov8s.pt'...


100%|██████████| 21.5M/21.5M [00:00<00:00, 75.6MB/s]


YOLOv8s summary (fused): 168 layers, 11,156,544 parameters, 0 gradients, 28.6 GFLOPs
Saved CSV: counts.csv | Video: output_annotated.mp4
