In [None]:
%pip install --force-reinstall lapx==0.5.11 supervision==0.24.0 ultralytics==8.3.12 gdown==5.2.0 

In [None]:
%pip install -q "numpy<2" "matplotlib<3.9"

In [None]:
pip install torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 --index-url https://download.pytorch.org/whl/cu118

In [None]:
import numpy, scipy, supervision, ultralytics, gdown, lap

# Print one "label: version" line for every package imported above.
for label, module in (
    ("numpy", numpy),
    ("scipy", scipy),
    ("supervision", supervision),
    ("ultralytics", ultralytics),
    ("gdown", gdown),
    ("lapx", lap),  # installed as `lapx`, imported under the name `lap`
):
    print(f"{label}:", module.__version__)

In [None]:
import torch

# Report whether a CUDA device is usable. The device name is only queried when
# CUDA is available, because torch.cuda.get_device_name raises otherwise and
# would abort a "Run All" on a CPU-only machine.
cuda_available = torch.cuda.is_available()
print(cuda_available)
if cuda_available:
    print(torch.cuda.get_device_name(0))
else:
    print("CUDA not available - running on CPU")

In [None]:
import os
import csv
import math
import warnings
import cv2 as cv
import subprocess
import numpy as np
import pandas as pd
import seaborn as sns
import supervision as sv
import matplotlib.pyplot as plt
from ultralytics import YOLO
from IPython.display import Video
from collections import defaultdict, deque
from scipy.signal import savgol_filter
import warnings
warnings.simplefilter("ignore")

In [None]:
# Path of the input clip used by the preview, metadata and frame-reading cells.
video_url = "input.mp4"
# True when the KAGGLE_KERNEL_RUN_TYPE environment variable is set.
on_kaggle = "KAGGLE_KERNEL_RUN_TYPE" in os.environ

In [None]:
# Preview the input clip inside the notebook at 960x540.
Video(video_url, width=960, height=540, embed=True)

In [None]:
class Cam2WorldMapper:
    """Maps image-plane points to world-plane points with a perspective transform.

    The transform matrix lives in ``M`` and stays ``None`` until
    ``find_perspective_transform`` has been called.
    """

    def __init__(self):
        self.M = None

    def __call__(self, image_pts):
        """Shorthand for :meth:`map`."""
        return self.map(image_pts)

    @staticmethod
    def _as_point_array(pts):
        """Convert points to the float32 ``(N, 1, 2)`` layout handed to OpenCV."""
        return np.float32(pts).reshape(-1, 1, 2)

    def find_perspective_transform(self, image_pts, world_pts):
        """Estimate the image -> world transform, store it in ``M`` and return it."""
        src = self._as_point_array(image_pts)
        dst = self._as_point_array(world_pts)
        self.M = cv.getPerspectiveTransform(src, dst)
        return self.M

    def map(self, image_pts):
        """Transform image points to world points, returned with shape ``(N, 2)``.

        Raises:
            ValueError: if no transform has been estimated yet.
        """
        if self.M is not None:
            pts = self._as_point_array(image_pts)
            return cv.perspectiveTransform(pts, self.M).reshape(-1, 2)
        raise ValueError("Perspective transformation has not been estimated")

In [None]:
# Pixel coordinates of the four reference points in the frame, listed in the
# same order as their world-plane counterparts below.
image_pts = [(830, 410), (1090, 410), (1920, 850), (0, 850)]

# road is roughly 32 meters wide and approx. 50 meters long there
road_width_m, road_length_m = 32, 50
world_pts = [
    (0, 0),
    (road_width_m, 0),
    (road_width_m, road_length_m),
    (0, road_length_m),
]

mapper = Cam2WorldMapper()
homography = mapper.find_perspective_transform(image_pts, world_pts)
print(homography)

In [None]:
class LocalScaleSpeedometer:
    """Estimates per-track speed from image centroids using a local pixel scale.

    For every pair of consecutive centroids of a track, the pixel displacement
    is converted to meters with the meters-per-pixel scale measured at the
    midpoint of the move (via ``mapper.map``), divided by the time elapsed
    between the two samples and converted with ``unit``.

    Args:
        mapper: object exposing ``map(image_pts)`` that returns world points.
        fps: frames per second of the video.
        unit: multiplier applied to meters/second (3.6 gives km/h).
        window: number of recent centroids (and their frame indices) kept per track.
        max_kph: estimates above this value are treated as spikes.
    """

    def __init__(self, mapper, fps, unit=3.6, window=5, max_kph=300):
        self.mapper = mapper
        self.fps = float(fps)
        self.unit = float(unit)    # m/s -> km/h multiplier (3.6)
        self.window = int(window)  # how many centroids to keep per track (deque)
        self.max_kph = float(max_kph)
        self.pos_hist = defaultdict(lambda: deque(maxlen=self.window))    # image centroids (cx, cy)
        self.frame_hist = defaultdict(lambda: deque(maxlen=self.window))  # frame index of each stored centroid
        self.speed_hist = defaultdict(lambda: deque(maxlen=8))            # recent speed estimates for smoothing

    def _local_mpp(self, point):
        """Return ``(mpp_x, mpp_y)``: meters covered by a one-pixel step in x and in y.

        The (rounded) pixel and its +1 neighbours in x and y are mapped to the
        world plane; each scale is the distance between the mapped points.
        """
        cx, cy = int(round(point[0])), int(round(point[1]))
        img_pts = np.array([[cx, cy], [cx + 1, cy], [cx, cy + 1]], dtype=np.float32)
        try:
            world_pts = self.mapper.map(img_pts)
        except Exception:
            # Best-effort fallback kept from the original design: a failing
            # mapper yields a tiny scale (speed ~0) instead of aborting the run.
            return (1e-6, 1e-6)

        origin, step_x, step_y = world_pts[0], world_pts[1], world_pts[2]
        mpp_x = float(np.linalg.norm(step_x - origin))
        mpp_y = float(np.linalg.norm(step_y - origin))
        # A zero scale is replaced by a tiny positive one.
        return (mpp_x or 1e-6, mpp_y or 1e-6)

    def update_with_centroid(self, frame_idx: int, track_id: int, centroid: tuple):
        """Record a centroid for ``track_id`` and append a new speed estimate.

        Args:
            frame_idx: index of the frame the centroid was observed in. The
                difference to the previous sample's index is the elapsed time
                in frames, so gaps in tracking do not inflate the speed.
            track_id: tracker id of the object.
            centroid: ``(x, y)`` position in image pixels.

        Nothing is estimated until the track has at least two centroids.
        """
        tid = int(track_id)
        cx, cy = int(round(centroid[0])), int(round(centroid[1]))
        self.pos_hist[tid].append((cx, cy))
        self.frame_hist[tid].append(int(frame_idx))

        # need at least two points to compute motion
        if len(self.pos_hist[tid]) < 2:
            return

        (x_prev, y_prev), (x_curr, y_curr) = self.pos_hist[tid][-2], self.pos_hist[tid][-1]
        dx_px = float(x_curr - x_prev)
        dy_px = float(y_curr - y_prev)

        # Frames elapsed between the two samples. Falls back to one frame when
        # the indices do not advance (e.g. a caller passing a constant index).
        frames_elapsed = 1
        if len(self.frame_hist[tid]) >= 2:
            frames_elapsed = max(1, self.frame_hist[tid][-1] - self.frame_hist[tid][-2])

        # compute local meters-per-pixel at the midpoint
        mid = ((x_prev + x_curr) / 2.0, (y_prev + y_curr) / 2.0)
        mpp_x, mpp_y = self._local_mpp(mid)

        ds_m = math.hypot(dx_px * mpp_x, dy_px * mpp_y)

        # meters per second: distance / (frames elapsed / fps)
        m_s = ds_m * self.fps / frames_elapsed
        kph = max(0.0, m_s * self.unit)

        # Spike handling: reuse the previous estimate when there is one,
        # otherwise cap at max_kph.
        if kph > self.max_kph:
            history = self.speed_hist[tid]
            kph = float(history[-1]) if history else self.max_kph

        # smoothing: push and keep history; median used on get_speed
        self.speed_hist[tid].append(kph)

    def get_speed(self, track_id: int):
        """Return the median of the recent estimates, rounded to int (0 if none)."""
        history = self.speed_hist.get(int(track_id))
        if not history:
            return 0
        return int(round(float(np.median(np.array(history, dtype=float)))))

    def reset(self, track_id: int):
        """Forget all stored positions, frame indices and speeds of ``track_id``."""
        tid = int(track_id)
        for store in (self.pos_hist, self.frame_hist, self.speed_hist):
            if tid in store:
                store[tid].clear()

In [None]:
# Shades of blue used by every annotator; colors are looked up per track.
colors = ("#007fff", "#0072e6", "#0066cc", "#0059b3", "#004c99", "#004080", "#003366", "#00264d")
color_palette = sv.ColorPalette([sv.Color.from_hex(hex_code) for hex_code in colors])

# The supervision VideoInfo provides some metadata about the video
video_info = sv.VideoInfo.from_video_path(video_url)
fps = video_info.fps

# Polygonal zone that masks out detected objects that are outside it
# NOTE(review): the anchor tuple is passed positionally - confirm it binds to
# the triggering-anchors parameter in the pinned supervision version.
poly = np.array([(0, 410), (1920, 410), (1920, 850), (0, 850)])
zone = sv.PolygonZone(poly, (sv.Position.TOP_CENTER, sv.Position.BOTTOM_CENTER))

# Keyword arguments shared by all three annotators.
track_style = {"color": color_palette, "color_lookup": sv.ColorLookup.TRACK}

bbox_annotator = sv.BoxAnnotator(thickness=2, **track_style)
trace_annotator = sv.TraceAnnotator(
    position=sv.Position.CENTER,
    thickness=2,
    trace_length=fps,
    **track_style,
)
label_annotator = sv.RichLabelAnnotator(
    border_radius=2,
    font_size=16,
    text_padding=6,
    **track_style,
)

In [None]:
# Detect and track vehicles frame by frame, estimate their speed, count
# line crossings, write an annotated video and log one CSV row per
# tracked vehicle per frame.
yolo = YOLO("yolo11m.pt", task="detect")
speedometer = LocalScaleSpeedometer(mapper, fps)

# ---------------- parameters ----------------
max_reuse_gap = 30             # frames after which reappearing ByteTrack id is considered new
min_track_frames_for_speed = 3 # only compute speed after this many frames for a track
csv_path = "vehicle_speeds_unique.csv"
output_video = "annotated.mp4"

# ---------------- state ----------------
last_seen_frame = dict()       # {byte_track_id: last_frame_index_seen}
first_seen_frame = dict()      # {byte_track_id: first_frame_index_when_current_instance_started}
track_history = defaultdict(list)  # recent centroids per ByteTrack id, used for counting
seen_unique_labels = set()     # set of unique labels assigned (for summary)

unique_id_counter = defaultdict(int)   # e.g., {"Car": 5, "Bus": 2}
tracker_to_unique_label = dict()       # maps current ByteTrack id -> "Car#5"

class_map = {2: "Car", 5: "Bus", 7: "Truck"}

# Real frame width is used for drawing; width/height below are only the
# inference size handed to YOLO.
frame_width = video_info.resolution_wh[0]
width, height = video_info.resolution_wh
width, height = round(width / 32) * 32, round(height / 32) * 32    # YOLO expects the image size to be a multiple of 32

classes = [2, 5, 7]  # Car, Bus, Truck
conf = 0.4           # Detection confidence threshold

# ---------------- Vehicle Counting (In / Out / Total) ----------------
in_line_y = 700   # line near bottom (entry)
out_line_y = 500  # line near top (exit)
mid_x = frame_width // 2  # IN is counted left of this x, OUT right of it

count_in = 0
count_out = 0
counted_in_ids = set()
counted_out_ids = set()

font = cv.FONT_HERSHEY_SIMPLEX
font_scale = 0.9
thickness = 2
color_in = sv.Color.from_hex("#004080")
color_out = sv.Color.from_hex("#f78923")
font_color = sv.Color.from_hex("#004c99")
color_in_bgr = color_in.as_bgr()
color_out_bgr = color_out.as_bgr()
font_color_bgr = font_color.as_bgr()

# ---------------- blinking line state ----------------
blink_duration = 5     # frames a line stays highlighted after a crossing
in_blink_frames = 0
out_blink_frames = 0


def draw_count_line(canvas, start, end, blinking, idle_color_bgr):
    """Draw a counting line on ``canvas`` and return the resulting image.

    While ``blinking`` is true a thick white line is blended over the frame
    (70% overlay / 30% frame); otherwise a thin line in ``idle_color_bgr`` is
    drawn in place. The frame is only copied when a blend is actually needed.
    """
    if blinking:
        overlay = canvas.copy()
        cv.line(overlay, start, end, (255, 255, 255), 6)
        return cv.addWeighted(overlay, 0.7, canvas, 0.3, 0)
    cv.line(canvas, start, end, idle_color_bgr, 3)
    return canvas


# ---------------- main loop ----------------
frame_idx = 0
# Both outputs are managed by the same `with`, so the CSV file is closed even
# if inference or annotation raises part-way through the video.
with open(csv_path, "w", newline="") as csvfile, sv.VideoSink(output_video, video_info) as sink:
    csv_writer = csv.DictWriter(csvfile, fieldnames=["frame", "tracker_id", "class", "speed_kmh", "cx", "cy"])
    csv_writer.writeheader()

    for frame in sv.get_video_frames_generator(video_url):
        frame_idx += 1

        result = yolo.track(
            frame,
            classes=classes,
            conf=conf,
            imgsz=(height, width),
            persist=True,
            verbose=False,
            tracker="bytetrack.yaml",
        )

        det = sv.Detections.from_ultralytics(result[0])
        det = det[zone.trigger(detections=det)]            # filter by polygon zone

        # tracker_id is None when the result carries no track ids; such
        # detections cannot be labelled or colored by track, so the frame is
        # written without per-vehicle annotations.
        trace_ids = [int(t) for t in det.tracker_id] if det.tracker_id is not None else []
        class_ids = det.class_id if det.class_id is not None else [None] * len(trace_ids)
        labels = []

        for byte_tid, box, class_id in zip(trace_ids, det.xyxy, class_ids):
            # bounding box -> centroid
            x1, y1, x2, y2 = map(int, box)
            cx, cy = (x1 + x2) // 2, (y1 + y2) // 2

            class_name = class_map.get(int(class_id) if class_id is not None else None, "Vehicle")

            # ----------------- Unique label / instance handling -----------------
            # A ByteTrack id starts a new instance on first sight or when it
            # reappears after more than max_reuse_gap frames: it gets a fresh
            # label and its speed and centroid histories are reset.
            prev_seen = last_seen_frame.get(byte_tid)
            if prev_seen is None or frame_idx - prev_seen > max_reuse_gap:
                unique_id_counter[class_name] += 1
                tracker_to_unique_label[byte_tid] = f"{class_name}#{unique_id_counter[class_name]}"
                first_seen_frame[byte_tid] = frame_idx
                speedometer.reset(byte_tid)
                track_history[byte_tid].clear()
            last_seen_frame[byte_tid] = frame_idx

            unique_label = tracker_to_unique_label[byte_tid]
            seen_unique_labels.add(unique_label)

            # --------- update centroid history used for counting (last 30 points) ----------
            track_history[byte_tid].append((cx, cy))
            if len(track_history[byte_tid]) > 30:
                track_history[byte_tid] = track_history[byte_tid][-30:]

            # --------- compute speed after stable tracking ----------
            seen_duration = frame_idx - first_seen_frame.get(byte_tid, frame_idx)
            if seen_duration >= min_track_frames_for_speed:
                # Use centroid-based local-scale speed estimator (LocalScaleSpeedometer)
                speedometer.update_with_centroid(frame_idx, byte_tid, (cx, cy))
                current_speed = speedometer.get_speed(byte_tid)
            else:
                current_speed = 0

            labels.append(f"{unique_label} {current_speed} km/h")

            csv_writer.writerow({
                "frame": frame_idx,
                "tracker_id": unique_label,
                "class": class_name,
                "speed_kmh": current_speed,
                "cx": cx,
                "cy": cy
            })

        # ---------- Vehicle Counting Logic ----------
        for byte_tid in trace_ids:
            if byte_tid not in track_history:
                continue

            curr_cx, curr_cy = track_history[byte_tid][-1]
            prev_cx, prev_cy = track_history[byte_tid][-2] if len(track_history[byte_tid]) >= 2 else (None, None)

            # ---------- IN count (left half, moving UP or first frame) ----------
            if byte_tid not in counted_in_ids and curr_cx <= mid_x:
                if prev_cy is None:
                    # first detection frame, vehicle already above IN line
                    if curr_cy <= in_line_y:
                        count_in += 1
                        counted_in_ids.add(byte_tid)
                else:
                    # line crossed upward
                    if prev_cy > in_line_y >= curr_cy:
                        count_in += 1
                        counted_in_ids.add(byte_tid)
                        in_blink_frames = blink_duration

            # ---------- OUT count (right half, moving DOWN or already below OUT line at first detection) ----------
            if byte_tid not in counted_out_ids and curr_cx > mid_x:
                if prev_cy is None:
                    # first detection and already below or on OUT line
                    if curr_cy >= out_line_y:
                        count_out += 1
                        counted_out_ids.add(byte_tid)
                else:
                    # normal downward crossing
                    if prev_cy <= out_line_y <= curr_cy:
                        count_out += 1
                        counted_out_ids.add(byte_tid)
                        out_blink_frames = blink_duration

        # ---------- Annotations / drawing ----------
        # Grayscale background expanded back to three channels so the colored
        # annotations stand out.
        frame_rgb = cv.cvtColor(cv.cvtColor(frame, cv.COLOR_BGR2GRAY), cv.COLOR_GRAY2RGB)

        if trace_ids:
            frame_rgb = bbox_annotator.annotate(frame_rgb, det)
            frame_rgb = trace_annotator.annotate(frame_rgb, det)
            frame_rgb = label_annotator.annotate(frame_rgb, det, labels=labels)

        # IN line (left part of the frame), highlighted while blinking
        frame_rgb = draw_count_line(
            frame_rgb, (0, in_line_y), (mid_x - 50, in_line_y), in_blink_frames > 0, color_in_bgr
        )
        in_blink_frames = max(in_blink_frames - 1, 0)

        # OUT line (right part of the frame), highlighted while blinking
        frame_rgb = draw_count_line(
            frame_rgb, (mid_x + 20, out_line_y), (frame_width, out_line_y), out_blink_frames > 0, color_out_bgr
        )
        out_blink_frames = max(out_blink_frames - 1, 0)

        # ---------- Draw Counter Overlay ----------
        cv.putText(frame_rgb, f"Vehicles Entered: {count_in}", (40, 60), font, font_scale, font_color_bgr, thickness)
        cv.putText(frame_rgb, f"Vehicles Left: {count_out}", (40, 100), font, font_scale, font_color_bgr, thickness)
        cv.putText(frame_rgb, f"Total Vehicles: {len(seen_unique_labels)}", (40, 140), font, font_scale, font_color_bgr, thickness)

        sink.write_frame(frame_rgb)

print(f"Done — unique vehicles assigned: {len(seen_unique_labels)}. CSV saved to: {csv_path}")

In [None]:
compressed = "annotated_compressed.mp4"

# Re-encode the annotated video with libx264. Global options (-y, -loglevel)
# go before the input/output files, and check=True makes a failed encode raise
# instead of silently leaving a missing or stale file for the player below.
# Only ffmpeg errors are printed so the reason for a failure stays visible.
subprocess.run(
    [
        "ffmpeg",
        "-y",
        "-loglevel",
        "error",
        "-i",
        output_video,
        "-vcodec",
        "libx264",
        "-crf",
        "18",
        "-preset",
        "veryfast",
        compressed,
    ],
    check=True,
)

Video(compressed, width=960, height=540)

In [None]:
# Load the per-frame speed log written by the tracking cell. Reusing csv_path
# (instead of a hard-coded /kaggle/working path) keeps the notebook runnable
# outside Kaggle.
df = pd.read_csv(csv_path)
df.head()

In [None]:
# --- Clean ---
# Keep rows with a positive speed reading, ordered per vehicle and then by
# frame, with a fresh 0..n-1 index. `fps` is reused from the video metadata.
df = (
    df.dropna(subset=["speed_kmh"])
    .loc[lambda rows: rows["speed_kmh"] > 0]
    .sort_values(["tracker_id", "frame"])
    .reset_index(drop=True)
)

# --- Safe smoothing function ---
def safe_savgol(x, fps):
    """Smooth a speed series with a Savitzky-Golay filter (polyorder 2).

    Args:
        x: pandas Series of speeds for one vehicle.
        fps: frames per second; caps the filter window at about one second.

    Returns:
        A Series with the same index as ``x``. Series shorter than 5 samples
        are returned unsmoothed.

    The window is the largest odd integer not exceeding ``min(fps, len(x))``
    and is at least 3. It is always an odd int, so an even or fractional
    ``fps`` never reaches ``savgol_filter`` as ``window_length``.
    """
    n = len(x)
    if n < 5:
        return pd.Series(x, index=x.index)
    window = min(int(fps), n)
    if window % 2 == 0:
        window -= 1
    window = max(window, 3)
    smoothed = savgol_filter(x, window_length=window, polyorder=2)
    return pd.Series(smoothed, index=x.index)

# --- Apply smoothing per vehicle ---
# transform() returns the smoothed values on df's own index, so the assignment
# aligns row by row without depending on how groupby.apply shapes its index.
smooth_series = df.groupby("tracker_id")["speed_kmh"].transform(lambda x: safe_savgol(x, fps))
df["speed_smooth"] = smooth_series

# --- Pivot to wide layout (frames x vehicles) ---
wide_df = df.pivot(index="frame", columns="tracker_id", values="speed_smooth")

# --- Clip extreme outliers ---
# Bounds are the 1st and 99th percentile over all vehicles, ignoring NaNs.
wide_df = wide_df.clip(
    lower=float(np.nanpercentile(wide_df, 1)),
    upper=float(np.nanpercentile(wide_df, 99))
)

# --- Plot ---
fig, axes = plt.subplots(2, 1, figsize=(20, 10), tight_layout=True)

# legend=False: no legend is created (one entry per vehicle is unreadable),
# which also avoids calling methods on a legend that may not exist.
sns.lineplot(data=wide_df, palette="viridis", linewidth=1.25, legend=False, ax=axes[0])
axes[0].set_xlabel("Frame", color="#000000")
axes[0].set_ylabel("Speed (km/h)", color="#000000")
axes[0].set_ylim(10, 140)
axes[0].set_title("Vehicle Speed Traces (Smoothed)", color="#000000", loc="center", pad=40)
axes[0].tick_params(colors="#000000")

# Density of all smoothed speed samples pooled across vehicles.
sns.kdeplot(wide_df.to_numpy().ravel(), fill=True, color="#004080", linewidth=1, ax=axes[1])
axes[1].set_xlabel("Speed (km/h)",color="#000000")
axes[1].set_ylabel("Density", color="#000000")
axes[1].set_title("Speed Distribution Across All Vehicles", color="#000000", loc="center", pad=20)
axes[1].tick_params(colors="#000000")

plt.savefig("distribution.png", dpi=100)
plt.show()

In [None]:
# Mean and maximum smoothed speed for every vehicle label.
vehicle_stats = (
    df.groupby("tracker_id")["speed_smooth"]
    .agg(avg_speed="mean", max_speed="max")
    .reset_index()
)

fig, axes = plt.subplots(2, 1, figsize=(20, 10), tight_layout=True)

# One bar chart per statistic: (column in vehicle_stats, panel title).
panels = (
    ("avg_speed", "Average Speed per Vehicle"),
    ("max_speed", "Max Speed per Vehicle"),
)
for ax, (stat_column, panel_title) in zip(axes, panels):
    sns.barplot(data=vehicle_stats, x="tracker_id", y=stat_column, palette="viridis", ax=ax)
    ax.set_title(panel_title, color="#000000", fontsize=14)
    ax.set_xlabel("Vehicle ID", color="#000000")
    ax.set_ylabel("Speed (km/h)", color="#000000")
    ax.tick_params(colors="#000000", rotation=90)

plt.savefig("speed.png", dpi=100)
plt.show()