In [16]:
# List the GPUs visible to this session together with their current memory usage.
!nvidia-smi

Mon Apr  7 06:51:29 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 560.35.03              Driver Version: 560.35.03      CUDA Version: 12.6     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   63C    P0             27W /   70W |     335MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
|   1  Tesla T4                       Off |   00

In [17]:
!pip install gdown



In [18]:
# Delete any video.mp4 left over in the working directory.
# NOTE(review): the download cell that follows saves the clip as tr.mp4, so this
# does not clear the file that is actually processed — confirm the intended name.
! rm -rf video.mp4

In [19]:
import gdown

# Google Drive ID of the clip to analyse; swap in the ID of your own file.
file_id = "1r0AVu38_1k2lA8unOhE0OYVDivQvbaVu"
url = "https://drive.google.com/uc?id=" + file_id

# File name the clip is stored under, relative to the current working directory.
output = "tr.mp4"
gdown.download(url, output, quiet=False)

Downloading...
From: https://drive.google.com/uc?id=1r0AVu38_1k2lA8unOhE0OYVDivQvbaVu
To: /kaggle/working/tr.mp4
100%|██████████| 7.04M/7.04M [00:00<00:00, 37.5MB/s]


'tr.mp4'

In [22]:
# Absolute path of the input clip; the analysis code reads the video from here.
# NOTE(review): hard-coded to the Kaggle working directory — adjust when running elsewhere.
SOURCE_VIDEO_PATH = "/kaggle/working/tr.mp4"

In [23]:
!pip install "ultralytics<=8.3.40"
!pip install opencv-python
!pip install supervision==0.3.0



In [24]:
# Imported in this cell because it executes before the notebook's main import
# cell; without it a fresh-kernel "Run All" stops here with a NameError.
from ultralytics import YOLO

model = YOLO('yolov8s.pt')  # YOLOv8 "small" checkpoint (the nano variant would be yolov8n.pt)

In [26]:
import supervision as sv
from ultralytics import YOLO
import os
import json
import cv2
import numpy as np
from sklearn.cluster import KMeans

# -- Configs --
# Annotated output video written through sv.VideoSink.
TARGET_VIDEO_PATH = 'output_video.mp4'
# Directory that receives one annotated JPEG per processed frame.
FRAME_SAVE_DIR = 'frames/'
# JSON file that receives the per-frame detection records at the end of the run.
FRAME_DATA_PATH = 'frame_data.json'
os.makedirs(FRAME_SAVE_DIR, exist_ok=True)

# Constants
# Multiplier applied to the pixels-per-second displacement in calculate_speed.
# NOTE(review): the label later prints the result as km/h — confirm this calibration.
SCALE_FACTOR = 0.05
# Frame rate used to turn a frame-count difference into seconds.
# NOTE(review): hard-coded; verify it matches the actual frame rate of the clip.
FPS = 30
# Size in pixels (width, height) of the perspective-warped output image.
TARGET_WIDTH, TARGET_HEIGHT = 25, 250

# Detection polygon & perspective
# Four corners of the region of interest in source-frame pixel coordinates.
# They are mapped, in order, onto the corners of the target rectangle below.
SOURCE = np.array([
    [1252, 787],
    [2298, 803],
    [5039, 2159],
    [-550, 2159],
])
perspective_transform = cv2.getPerspectiveTransform(
    SOURCE.astype(np.float32),
    np.array(
        [
            [0, 0],
            [TARGET_WIDTH - 1, 0],
            [TARGET_WIDTH - 1, TARGET_HEIGHT - 1],
            [0, TARGET_HEIGHT - 1],
        ],
        np.float32,
    ),
)

# Annotator
# Shared annotator used to draw each accepted detection's box and text label on
# the frame; box line thickness, label text thickness and text scale are set here.
box_annotator = sv.BoxAnnotator(thickness=4, text_thickness=4, text_scale=2)

# Vehicle color map
# Reference palette: colour name -> (R, G, B). closest_color() picks the entry
# nearest to a measured colour; insertion order decides ties, so keep it stable.
color_dict = {
    "red": (255, 0, 0),
    "green": (0, 255, 0),
    "yellow": (255, 255, 0),
    "blue": (0, 0, 255),
    "purple": (128, 0, 128),
    "orange": (255, 165, 0),
    "white": (255, 255, 255),
    "black": (0, 0, 0),
    "gray": (128, 128, 128),
    "brown": (139, 69, 19),
    "pink": (255, 192, 203),
    "violet": (238, 130, 238),
    "light_red": (255, 102, 102),
    "light_green": (102, 255, 102),
    "light_blue": (102, 102, 255),
    "light_yellow": (255, 255, 102),
}

def closest_color(rgb):
    """Return the name of the color_dict entry nearest to ``rgb``.

    Nearness is the Euclidean distance between ``rgb`` and each palette
    value; on a tie the entry that appears first in color_dict is returned.
    """
    target = np.array(rgb)

    def _distance_to(name):
        return np.linalg.norm(np.array(color_dict[name]) - target)

    return min(color_dict, key=_distance_to)

def get_exact_vehicle_color(bbox, frame, k=1):
    """Name the dominant colour of the pixels inside a bounding box.

    Args:
        bbox: ``(x1, y1, x2, y2)`` box in pixel coordinates of ``frame``.
        frame: image array indexed as ``frame[y, x]`` whose last axis holds
            three channels in BGR order.
        k: number of colour clusters. With ``k > 1`` the centre of the
            cluster that owns the most pixels is used.

    Returns:
        The ``closest_color`` name for the dominant colour, or ``"Unknown"``
        when the box covers no pixels of the frame.
    """
    frame_h, frame_w = frame.shape[:2]
    x1, y1, x2, y2 = map(int, bbox)

    # Clamp to the frame. A negative coordinate would otherwise be treated as
    # an index from the far edge, yielding an empty or unrelated region for
    # boxes that overhang the left/top border.
    x1, x2 = max(0, min(x1, frame_w)), max(0, min(x2, frame_w))
    y1, y2 = max(0, min(y1, frame_h)), max(0, min(y2, frame_h))

    roi = frame[y1:y2, x1:x2]
    if roi.size == 0:
        return "Unknown"

    pixels = roi.reshape(-1, 3)
    if k == 1:
        # A single cluster's centre is simply the mean pixel, so skip the
        # (repeated-initialisation) KMeans fit in the default case.
        center = pixels.mean(axis=0)
    else:
        kmeans = KMeans(n_clusters=k, n_init=10).fit(pixels)
        # Cluster indices carry no ordering; pick the most populated one.
        dominant = int(np.bincount(kmeans.labels_, minlength=k).argmax())
        center = kmeans.cluster_centers_[dominant]

    c = center.astype(int)
    return closest_color((c[2], c[1], c[0]))  # BGR->RGB

def is_in_target_polygon(x,y,poly):
    """Report whether the point (x, y) is inside ``poly`` or on its boundary.

    ``poly`` is passed to cv2.pointPolygonTest as the contour; the third
    argument (False) requests only the sign of the test rather than a distance.
    """
    side = cv2.pointPolygonTest(poly, (x, y), False)
    return side >= 0

def calculate_speed(x,y,tid,fn):
    """Estimate the speed of track ``tid`` from its previous sighting.

    The last sighting ``(x, y, frame_number)`` of every track is kept in the
    module-level VEHICLE_POSITIONS dict and replaced on each call.

    Returns:
        0 on the first sighting of ``tid`` or when no frames have elapsed;
        otherwise pixel distance / elapsed seconds (frames / FPS) * SCALE_FACTOR.
    """
    previous = VEHICLE_POSITIONS.get(tid)
    if previous is None:
        VEHICLE_POSITIONS[tid] = (x, y, fn)
        return 0

    prev_x, prev_y, prev_frame = previous
    distance = np.hypot(x - prev_x, y - prev_y)
    elapsed = (fn - prev_frame) / FPS
    VEHICLE_POSITIONS[tid] = (x, y, fn)

    if elapsed > 0:
        return (distance / elapsed) * SCALE_FACTOR
    return 0

def get_vehicle_direction(x,y,tid):
    """Classify the vertical motion of track ``tid`` since its last sighting.

    The previous centre is looked up in ``vehicle_positions`` and, when the
    track is absent there, in ``vehicle_positions2`` — the dict the processing
    loop actually fills. Reading only ``vehicle_positions`` (which nothing
    writes) made every call return "Unknown".

    Args:
        x: current centre x (unused; kept for the existing call signature).
        y: current centre y in image coordinates.
        tid: track identifier.

    Returns:
        "Down" if ``y`` is larger than the previous y, "Up" if smaller,
        "Stationary" if equal, "Unknown" when the track has no previous position.
    """
    previous = vehicle_positions.get(tid)
    if previous is None:
        previous = vehicle_positions2.get(tid)
    if previous is None:
        return "Unknown"

    _, prev_y = previous
    if y > prev_y:
        return "Down"
    if y < prev_y:
        return "Up"
    return "Stationary"

def get_lane(x,le,re):
    """Map a horizontal position to a lane name.

    ``le`` and ``re`` are the x positions of the left and right lane edges;
    positions exactly on either edge count as the middle lane.
    """
    if x < le:
        return "Left Lane"
    if x > re:
        return "Right Lane"
    return "Middle Lane"

def detect_crossing_box(frame):
    """Locate the widest strongly horizontal edge contour in ``frame``.

    The frame is converted to grayscale, blurred and run through Canny; among
    contours whose bounding rectangle is more than three times wider than it
    is tall, the widest one is chosen (the first one found wins ties).

    Returns:
        ``((x1, y1), (x2, y2))`` corners of the chosen rectangle, or the fixed
        fallback ``((200, 430), (600, 460))`` when no contour qualifies.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    edge_map = cv2.Canny(blurred, 50, 150)
    contours, _ = cv2.findContours(edge_map, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

    widest = None
    for contour in contours:
        rect = cv2.boundingRect(contour)  # (x, y, width, height)
        if rect[2] > rect[3] * 3 and (widest is None or rect[2] > widest[2]):
            widest = rect

    if widest is None:
        return (200, 430), (600, 460)

    left, top, width, height = widest
    return (left, top), (left + width, top + height)

def detect_light_color(frame,box):
    """Classify the colour of a traffic light from the pixels inside ``box``.

    The region ``(x1, y1, x2, y2)`` is converted from BGR to HSV and pixels are
    counted inside fixed HSV ranges for red (two hue bands), green and yellow.

    Returns:
        "red", "green" or "yellow" when that colour has strictly more matching
        pixels than both others and more than 50 of them; otherwise "unknown"
        (also returned for an empty region).
    """
    left, top, right, bottom = (int(v) for v in box)
    patch = frame[top:bottom, left:right]
    if patch.size == 0:
        return "unknown"

    hsv = cv2.cvtColor(patch, cv2.COLOR_BGR2HSV)
    # Red hues sit at both ends of the hue axis, hence the two bands.
    red_mask = (cv2.inRange(hsv, (0, 70, 50), (10, 255, 255))
                | cv2.inRange(hsv, (160, 70, 50), (180, 255, 255)))
    counts = {
        "red": cv2.countNonZero(red_mask),
        "green": cv2.countNonZero(cv2.inRange(hsv, (40, 40, 40), (80, 255, 255))),
        "yellow": cv2.countNonZero(cv2.inRange(hsv, (20, 100, 100), (30, 255, 255))),
    }

    best = max(counts, key=counts.get)
    rivals = [count for name, count in counts.items() if name != best]
    # A winner must beat both rivals strictly (ties give "unknown") and pass the floor.
    if counts[best] > 50 and all(counts[best] > count for count in rivals):
        return best
    return "unknown"

# Init video info & counters
video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

# Lane edges at one third and two thirds of the frame width (integer pixels).
left_lane = video_info.width // 3
right_lane = 2 * (video_info.width // 3)
# Horizontal counting line at mid-height; stop_line sits 50 px below it.
line_y = video_info.height // 2
stop_line = line_y + 50

# Tracker ID -> compact sequential ID, plus the next sequential ID to hand out.
id_map = {}
id_ctr = 1
# One record per processed frame; dumped to JSON after the run.
frame_data_list = []
# vehicle_positions is what get_vehicle_direction looks up;
# vehicle_positions2 is where the processing loop stores each track's last centre.
vehicle_positions = {}
vehicle_positions2 = {}
# Per-track crossing state and the running entry/exit totals.
crossing_tracker = {}
vehicle_crossings = {'entered': 0, 'exited': 0}
# Last (x, y, frame_number) per track, maintained by calculate_speed.
VEHICLE_POSITIONS = {}
# Tracks already reported for a red-light violation.
red_violators = set()

# Compute crossing box once
# The box is derived from the first frame only and reused for every frame.
cap_tmp=cv2.VideoCapture(SOURCE_VIDEO_PATH)
try:
    ok,first=cap_tmp.read()
finally:
    # Release the capture even if reading raises.
    cap_tmp.release()
if not ok or first is None:
    # Fail with a clear message instead of passing None into cv2.cvtColor.
    raise RuntimeError(f"Could not read the first frame of {SOURCE_VIDEO_PATH!r}")
crossing_box = detect_crossing_box(first)

# Load YOLO
# Builds the detector from the 'yolov8s.pt' checkpoint; model.track() and
# model.model.names (class id -> class name) are used by the loop that follows.
model=YOLO('yolov8s.pt')

# Main pass: track objects frame by frame, annotate each frame, write it to the
# output video and to disk, and collect one JSON-serialisable record per frame.
with sv.VideoSink(TARGET_VIDEO_PATH,video_info) as sink:
    # fn is the 0-based index of the result yielded by the tracker stream.
    for fn,result in enumerate(model.track(
        source=SOURCE_VIDEO_PATH,tracker='bytetrack.yaml',
        show=False,stream=True,agnostic_nms=True,persist=True
    )):
        frame=result.orig_img
        dets=sv.Detections.from_yolov8(result)

        # Traffic light detection: classify the first box whose class name is
        # 'traffic light' and outline it (red outline if red, green otherwise).
        light="unknown"
        if result.boxes.id is not None:
            for box,cls in zip(result.boxes.xyxy,result.boxes.cls):
                if model.model.names[int(cls)]=='traffic light':
                    light=detect_light_color(frame,box)
                    cv2.rectangle(frame,tuple(map(int,box[:2])),tuple(map(int,box[2:])),
                                  (0,0,255) if light=='red' else (0,255,0),2)
                    break

        # Remap tracker IDs to compact sequential IDs in order of first appearance
        if result.boxes.id is not None:
            tids=result.boxes.id.cpu().numpy().astype(int); upd=[]
            for t in tids:
                if t not in id_map: id_map[t]=id_ctr; id_ctr+=1
                upd.append(id_map[t])
            dets.tracker_id=upd

        annots=[]
        for bbox,conf,cid,tid in dets:
            # A frame can carry detections without track IDs (result.boxes.id is
            # None); tid is then None and int(tid) would raise, so skip them.
            if tid is None:
                continue
            tid=int(tid); bbox=list(map(float,bbox))
            cx,cy=(bbox[0]+bbox[2])/2,(bbox[1]+bbox[3])/2

            # Init if new (last_y starts at the box's top edge, later holds the centre y)
            if tid not in crossing_tracker:
                crossing_tracker[tid]={'crossed':False,'last_y':bbox[1],'inside':False}

            direction=get_vehicle_direction(cx,cy,tid)
            vcolor=get_exact_vehicle_color(bbox,frame)
            lane=get_lane(cx,left_lane,right_lane)
            speed=calculate_speed(cx,cy,tid,fn)

            # --- NEW VIOLATION: ENTERING the blue box while red ---
            # NOTE(review): this flags any track whose centre is inside the box
            # while the light is red, once per track; the stored previous
            # 'inside' state is not consulted — confirm that is intended.
            inside = (crossing_box[0][0] <= cx <= crossing_box[1][0]
                      and crossing_box[0][1] <= cy <= crossing_box[1][1])
            violation = False
            if inside and light=='red' and tid not in red_violators:
                violation = True
                red_violators.add(tid)
            crossing_tracker[tid]['inside'] = inside

            # Entry/exit count: each track is counted at most once, as 'entered'
            # when it moves to below line_y and 'exited' when it moves to above it.
            last_y=crossing_tracker[tid]['last_y']
            if not crossing_tracker[tid]['crossed']:
                if cy>line_y and last_y<=line_y:
                    vehicle_crossings['entered']+=1; crossing_tracker[tid]['crossed']=True
                elif cy<line_y and last_y>=line_y:
                    vehicle_crossings['exited']+=1; crossing_tracker[tid]['crossed']=True
            crossing_tracker[tid]['last_y']=cy
            vehicle_positions2[tid]=(cx,cy)

            # Only detections whose centre lies in the SOURCE polygon are drawn and recorded.
            if is_in_target_polygon(cx,cy,SOURCE):
                lbl=f"ID:{tid}|{model.model.names[cid]} {conf:.2f}|{speed:.1f}km/h"
                frame=box_annotator.annotate(
                    scene=frame,
                    detections=sv.Detections(
                        xyxy=np.array([bbox]),confidence=np.array([conf]),
                        class_id=np.array([cid]),tracker_id=np.array([tid])
                    ),labels=[lbl]
                )
                annots.append({
                    "tracker_id":tid,"class_id":int(cid),"class_name":model.model.names[cid],
                    "direction":direction,"lane":lane,"vehicle_color":vcolor,
                    "stopped":bool(speed==0),"confidence":float(conf),"bbox":bbox,
                    "speed_kmh":speed,"red_light_violation":bool(violation)
                })

        # Draw overlays
        cv2.line(frame,(0,line_y),(video_info.width,line_y),(0,255,0),2)
        cv2.polylines(frame,[SOURCE.astype(np.int32)],True,(0,255,0),2)
        cv2.putText(frame,f"Ent:{vehicle_crossings['entered']} Ex:{vehicle_crossings['exited']}",
                    (30,50),cv2.FONT_HERSHEY_SIMPLEX,1,(255,0,0),2)
        cv2.putText(frame,f"Light:{light.upper()}",(30,150),
                    cv2.FONT_HERSHEY_SIMPLEX,1,(0,255,255),2)
        # Blue crossing box
        cv2.rectangle(frame,crossing_box[0],crossing_box[1],(255,0,0),2)

        warped=cv2.warpPerspective(frame,perspective_transform,(TARGET_WIDTH,TARGET_HEIGHT))
        cv2.imwrite(f"warped_frame_{fn:04d}.jpg",warped)
        # cv2.imwrite expects BGR data, which is what `frame` already holds;
        # converting to RGB first saved the JPEGs with red and blue swapped.
        cv2.imwrite(os.path.join(FRAME_SAVE_DIR,f"frame_{fn:04d}.jpg"),frame)

        sink.write_frame(frame)
        # congestion_level counts every detection in the frame, not only those
        # that were annotated.
        frame_data_list.append({
            "frame_number":fn,"traffic_light":light,
            "congestion_level":len(dets),"detections":annots
        })

# Report the final entry/exit totals.
print(f"Done: entered={vehicle_crossings['entered']} exited={vehicle_crossings['exited']}")

# Persist the per-frame records as indented JSON.
with open(FRAME_DATA_PATH, mode='w') as json_file:
    json.dump(frame_data_list, json_file, indent=4)



video 1/1 (frame 1/453) /kaggle/working/tr.mp4: 384x640 1 car, 1 traffic light, 11.5ms
video 1/1 (frame 2/453) /kaggle/working/tr.mp4: 384x640 1 car, 1 traffic light, 10.8ms
video 1/1 (frame 3/453) /kaggle/working/tr.mp4: 384x640 1 car, 1 traffic light, 10.9ms
video 1/1 (frame 4/453) /kaggle/working/tr.mp4: 384x640 1 car, 1 traffic light, 10.9ms
video 1/1 (frame 5/453) /kaggle/working/tr.mp4: 384x640 1 car, 1 traffic light, 10.9ms
video 1/1 (frame 6/453) /kaggle/working/tr.mp4: 384x640 1 car, 1 traffic light, 10.9ms
video 1/1 (frame 7/453) /kaggle/working/tr.mp4: 384x640 1 car, 1 traffic light, 10.8ms
video 1/1 (frame 8/453) /kaggle/working/tr.mp4: 384x640 1 car, 1 traffic light, 10.8ms
video 1/1 (frame 9/453) /kaggle/working/tr.mp4: 384x640 1 car, 1 traffic light, 10.8ms
video 1/1 (frame 10/453) /kaggle/working/tr.mp4: 384x640 1 car, 1 traffic light, 10.9ms
video 1/1 (frame 11/453) /kaggle/working/tr.mp4: 384x640 1 car, 1 traffic light, 10.9ms
video 1/1 (frame 12/453) /kaggle/working