In [12]:
import cv2
import torch

def get_density_label(count):
    if count < 5:
        return "Low"
    elif count < 10:
        return "Medium"
    else:
        return "High"

def draw_count_and_density(frame, vehicle_count, x=10, y=30):
    density_label = get_density_label(vehicle_count)
    text = f"Vehicles: {vehicle_count} | Density: {density_label}"
    cv2.putText(frame, text, (x, y), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)


device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print("Using device:", device)


Using device: mps


YOLO

In [2]:
import cv2
import os
from ultralytics import YOLO

model_path = "/Users/rajvijayvargiya/Downloads/yolov8n.pt"
if not os.path.exists(model_path):
    print(f"Model file not found at {model_path}")
    exit(1)

model = YOLO(model_path)

input_video = "/Users/rajvijayvargiya/Downloads/Vehicle_Detection_Image_Dataset/sample_video.mp4"
output_video = "/Users/rajvijayvargiya/Downloads/yolo_video_output.mp4"

cap = cv2.VideoCapture(input_video)
if not cap.isOpened():
    print("Error opening video file")
    exit(1)

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))

while True:
    ret, frame = cap.read()
    if not ret:
        break
    
    results = model(frame)
    annotated = results[0].plot()

    # Count the total detections (vehicles)
    vehicle_count = len(results[0].boxes)

    # Use the previously-defined helper function to draw info
    draw_count_and_density(annotated, vehicle_count, x=50, y=50)

    out.write(annotated)
    cv2.imshow("Detection", annotated)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
out.release()
cv2.destroyAllWindows()


0: 384x640 1 car, 53.1ms
Speed: 1.6ms preprocess, 53.1ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 car, 44.3ms
Speed: 1.4ms preprocess, 44.3ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 car, 44.3ms
Speed: 1.7ms preprocess, 44.3ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 car, 38.1ms
Speed: 1.2ms preprocess, 38.1ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 41.9ms
Speed: 1.2ms preprocess, 41.9ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 44.1ms
Speed: 1.3ms preprocess, 44.1ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 40.1ms
Speed: 1.4ms preprocess, 40.1ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 42.8ms
Speed: 1.4ms preprocess, 42.8ms inference, 0.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 

2025-04-10 00:26:47.610 Python[65427:7105953] +[IMKClient subclass]: chose IMKClient_Modern
2025-04-10 00:26:47.610 Python[65427:7105953] +[IMKInputSession subclass]: chose IMKInputSession_Modern



0: 384x640 2 cars, 135.1ms
Speed: 1.5ms preprocess, 135.1ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 46.7ms
Speed: 1.3ms preprocess, 46.7ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 49.0ms
Speed: 1.5ms preprocess, 49.0ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 46.5ms
Speed: 1.5ms preprocess, 46.5ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 1 truck, 45.1ms
Speed: 1.5ms preprocess, 45.1ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 45.5ms
Speed: 1.6ms preprocess, 45.5ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 61.5ms
Speed: 1.5ms preprocess, 61.5ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 52.9ms
Speed: 1.5ms preprocess, 52.9ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 64

RetinaNet

In [3]:
import torchvision
import numpy as np
from PIL import Image
import torchvision.transforms as T


model = torchvision.models.detection.retinanet_resnet50_fpn(pretrained=True).eval().to(device)
transform = T.Compose([T.ToTensor()])

def draw_boxes(frame, boxes, scores, threshold=0.5):
    count = 0
    for box, score in zip(boxes, scores):
        if score >= threshold:
            count += 1
            x1, y1, x2, y2 = map(int, box)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
    draw_count_and_density(frame, count, x=10, y=30)
    return frame

def process_frame_retinanet(frame, threshold=0.5):
    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    with torch.no_grad():
        pred = model([transform(img).to(device)])[0]
    boxes = pred['boxes'].cpu().numpy()
    scores = pred['scores'].cpu().numpy()
    return boxes, scores

def run_retinanet_video(input_path, output_path, threshold=0.5):
    cap = cv2.VideoCapture(input_path)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        boxes, scores = process_frame_retinanet(frame, threshold)
        frame = draw_boxes(frame, boxes, scores, threshold)
        out.write(frame)
        cv2.imshow('RetinaNet', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    out.release()
    cv2.destroyAllWindows()

run_retinanet_video("/Users/rajvijayvargiya/Downloads/Vehicle_Detection_Image_Dataset/sample_video.mp4", "/Users/rajvijayvargiya/Downloads/retina_net_video_output.mp4")



DETR (Detection Transformer)

In [4]:
import torch
import numpy as np
from PIL import Image
import torchvision.transforms as T


detr = torch.hub.load('facebookresearch/detr', 'detr_resnet50', pretrained=True).eval().to(device)
transform = T.Compose([
    T.ToTensor(),
    T.Normalize([0.485, 0.456, 0.406],[0.229, 0.224, 0.225])
])

def box_cxcywh_to_xyxy(box):
    x_c, y_c, w, h = box
    x1, y1 = x_c - w/2, y_c - h/2
    x2, y2 = x_c + w/2, y_c + h/2
    return [x1, y1, x2, y2]

def process_frame_detr(frame, threshold=0.7):
    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    t = transform(img).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = detr(t)
    probas = outputs['pred_logits'].softmax(-1)[0,:, :-1]
    keep = probas.max(-1).values > threshold
    boxes = outputs['pred_boxes'][0, keep].cpu()
    scores = probas[keep].max(-1).values.cpu().numpy()
    w, h = img.size
    scaled_boxes = []
    for box in boxes:
        bx = box_cxcywh_to_xyxy(box)
        bx[0] *= w; bx[1] *= h; bx[2] *= w; bx[3] *= h
        scaled_boxes.append(bx)
    return np.array(scaled_boxes), scores

def draw_boxes_detr(frame, boxes, scores, threshold=0.7):
    count = 0
    for (x1, y1, x2, y2), score in zip(boxes, scores):
        if score >= threshold:
            count += 1
            x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0,255,0), 2)
    draw_count_and_density(frame, count, x=10, y=30)
    return frame

def run_detr_video(input_path, output_path, threshold=0.7):
    cap = cv2.VideoCapture(input_path)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out_vid = cv2.VideoWriter(output_path, fourcc, fps, (w, h))

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        boxes, scores = process_frame_detr(frame, threshold)
        frame = draw_boxes_detr(frame, boxes, scores, threshold)
        out_vid.write(frame)
        cv2.imshow('DETR', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    out_vid.release()
    cv2.destroyAllWindows()

run_detr_video("/Users/rajvijayvargiya/Downloads/Vehicle_Detection_Image_Dataset/sample_video.mp4", "/Users/rajvijayvargiya/Downloads/detr_video_output.mp4")



Using cache found in /Users/rajvijayvargiya/.cache/torch/hub/facebookresearch_detr_main


SSD (Single Shot MultiBox Detector)

In [5]:
import torch
import torchvision
import numpy as np
from PIL import Image
import torchvision.transforms as T


model = torchvision.models.detection.ssd300_vgg16(pretrained=True).eval().to(device)
transform = T.Compose([T.ToTensor()])

def process_frame_ssd(frame, threshold=0.5):
    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    with torch.no_grad():
        pred = model([transform(img).to(device)])[0]
    boxes = pred['boxes'].cpu().numpy()
    scores = pred['scores'].cpu().numpy()
    idxs = scores >= threshold
    return boxes[idxs], scores[idxs]

def draw_boxes_ssd(frame, boxes, scores):
    count = 0
    for box, score in zip(boxes, scores):
        count += 1
        x1, y1, x2, y2 = map(int, box)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0,255,0), 2)
    draw_count_and_density(frame, count, x=10, y=30)
    return frame

def run_ssd_video(input_path, output_path, threshold=0.5):
    cap = cv2.VideoCapture(input_path)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out_vid = cv2.VideoWriter(output_path, fourcc, fps, (w, h))

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        boxes, scores = process_frame_ssd(frame, threshold)
        frame = draw_boxes_ssd(frame, boxes, scores)
        out_vid.write(frame)
        cv2.imshow('SSD', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    out_vid.release()
    cv2.destroyAllWindows()

run_ssd_video("/Users/rajvijayvargiya/Downloads/Vehicle_Detection_Image_Dataset/sample_video.mp4", "/Users/rajvijayvargiya/Downloads/ssd_video_output.mp4")



EfficientDet

In [6]:
import torch
import numpy as np
from PIL import Image
import torchvision.transforms as T
from effdet import create_model


model = create_model('tf_efficientdet_d0', pretrained=True, bench_task='predict').eval().to(device)
transform = T.Compose([
    T.ToTensor(),
    T.Resize((512,512)),
    T.Normalize(mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225])
])

def process_frame_effdet(frame, threshold=0.5):
    h, w = frame.shape[:2]
    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    inp = transform(img).unsqueeze(0).to(device)
    with torch.no_grad():
        out = model(inp)
    if out.ndim == 3:
        out = out[0]
    boxes = out[:, :4].cpu().numpy()
    scores = out[:, 4].cpu().numpy()
    keep = scores >= threshold
    boxes = boxes[keep]
    scores = scores[keep]
    scale = np.array([w/512.0, h/512.0, w/512.0, h/512.0])
    boxes *= scale
    return boxes, scores

def draw_boxes_effdet(frame, boxes, scores):
    count = 0
    for box, score in zip(boxes, scores):
        count += 1
        x1, y1, x2, y2 = map(int, box)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0,255,0), 2)
    draw_count_and_density(frame, count, x=10, y=30)
    return frame

def run_effdet_video(input_path, output_path, threshold=0.5):
    cap = cv2.VideoCapture(input_path)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out_vid = cv2.VideoWriter(output_path, fourcc, fps, (w, h))

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        boxes, scores = process_frame_effdet(frame, threshold)
        frame = draw_boxes_effdet(frame, boxes, scores)
        out_vid.write(frame)
        cv2.imshow('EfficientDet', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    out_vid.release()
    cv2.destroyAllWindows()

run_effdet_video("/Users/rajvijayvargiya/Downloads/Vehicle_Detection_Image_Dataset/sample_video.mp4", "/Users/rajvijayvargiya/Downloads/efficientdet_video_output.mp4")

R-CNN (Selective Search + VGG16)

In [None]:
# import os
# import cv2
# import torch
# import numpy as np
# import selectivesearch
# import pickle
# import torchvision.transforms as T
# from torchvision.models import vgg16
# from torchvision.ops import nms
# from PIL import Image


# vgg = vgg16(pretrained=True).features.eval().to(device)
# transform = T.Compose([
#     T.ToPILImage(),
#     T.Resize((224, 224)),
#     T.ToTensor(),
#     T.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
# ])

# try:
#     with open('vehicle_classifier.pkl', 'rb') as f:
#         classifier = pickle.load(f)
#     print("Loaded SVM/classifier.")
# except:
#     print("No classifier found, using dummy classifier.")
#     classifier = None

# def classify_region(features):
#     if classifier:
#         return classifier.decision_function(features.cpu().numpy())[0]
#     return features.norm().item()

# def extract_features(roi):
#     roi_t = transform(roi).unsqueeze(0).to(device)
#     with torch.no_grad():
#         feats = vgg(roi_t).view(-1)
#     return feats

# def selective_search_regions(frame):
#     # scale/sigma/min_size can be tuned
#     _, regions = selectivesearch.selective_search(frame, scale=300, sigma=0.9, min_size=30)
#     candidates = []
#     for r in regions:
#         x, y, w, h = r['rect']
#         if w >= 30 and h >= 30:
#             candidates.append((x, y, x+w, y+h))
#     return candidates

# def rcnn_process_frame(frame, score_thresh=5.0, iou_thresh=0.3):
#     rects = selective_search_regions(frame)
#     boxes, scores = [], []
#     for (x1, y1, x2, y2) in rects:
#         roi = frame[y1:y2, x1:x2]
#         if roi.size == 0:
#             continue
#         feats = extract_features(roi)
#         score = classify_region(feats)
#         if score > score_thresh:
#             boxes.append([x1, y1, x2, y2])
#             scores.append(score)
#     if boxes:
#         keep = nms(torch.tensor(boxes, dtype=torch.float32), 
#                    torch.tensor(scores, dtype=torch.float32), iou_thresh)
#         keep = keep.cpu().numpy()
#     else:
#         keep = []

#     count = len(keep)
#     for idx in keep:
#         x1, y1, x2, y2 = boxes[idx]
#         cv2.rectangle(frame, (x1, y1), (x2, y2), (0,255,0), 2)
#     draw_count_and_density(frame, count, x=10, y=30)
#     return frame

# def run_rcnn_video(input_path, output_path):
#     cap = cv2.VideoCapture(input_path)
#     if not cap.isOpened():
#         print("Error opening video.")
#         return
#     w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
#     h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
#     fps = cap.get(cv2.CAP_PROP_FPS)
#     fourcc = cv2.VideoWriter_fourcc(*'mp4v')
#     out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))

#     while True:
#         ret, frame = cap.read()
#         if not ret:
#             break
#         processed = rcnn_process_frame(frame)
#         out.write(processed)
#         cv2.imshow("R-CNN", processed)
#         if cv2.waitKey(1) & 0xFF == ord('q'):
#             break
#     cap.release()
#     out.release()
#     cv2.destroyAllWindows()

# run_rcnn_video("/Users/rajvijayvargiya/Downloads/Vehicle_Detection_Image_Dataset/sample_video.mp4", "/Users/rajvijayvargiya/Downloads/rcnn_video_output.mp4")


Faster R-CNN

In [13]:
import torch
import torchvision
import cv2
import numpy as np
from PIL import Image
import torchvision.transforms as T

# Use CPU to avoid the MPS error on Apple M2
device = torch.device("cpu")

model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True).eval().to(device)
transform = T.Compose([T.ToTensor()])

def faster_process_frame(frame, threshold=0.5):
    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    with torch.no_grad():
        preds = model([transform(img).to(device)])[0]
    boxes = preds['boxes'].cpu().numpy()
    scores = preds['scores'].cpu().numpy()
    idxs = scores >= threshold
    return boxes[idxs], scores[idxs]

def draw_boxes_faster(frame, boxes, scores):
    count = len(boxes)
    for box in boxes:
        x1, y1, x2, y2 = map(int, box)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0,255,0), 2)
    draw_count_and_density(frame, count, x=10, y=30)
    return frame

def run_faster_video(input_video, output_video, threshold=0.5):
    cap = cv2.VideoCapture(input_video)
    if not cap.isOpened():
        print("Error opening video.")
        return
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video, fourcc, fps, (w, h))

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        boxes, scores = faster_process_frame(frame, threshold)
        frame = draw_boxes_faster(frame, boxes, scores)
        out.write(frame)
        cv2.imshow("Faster R-CNN", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    out.release()
    cv2.destroyAllWindows()

# Run the video processing
run_faster_video(
    "/Users/rajvijayvargiya/Downloads/Vehicle_Detection_Image_Dataset/sample_video.mp4",
    "/Users/rajvijayvargiya/Downloads/fasterrcnn_video_output.mp4"
)


2025-04-10 02:31:04.129 Python[67464:7197103] +[IMKClient subclass]: chose IMKClient_Modern
2025-04-10 02:31:04.129 Python[67464:7197103] +[IMKInputSession subclass]: chose IMKInputSession_Modern
