Step 1: Install & Imports

In [None]:
# !pip install torchvision matplotlib

import os
import cv2
import json
import torch
import torchvision
import numpy as np
from tqdm import tqdm
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision.models.detection.ssd import SSD300_VGG16_Weights
from torchvision.transforms import v2
import matplotlib.pyplot as plt


Step 2: Prepare COCO Subset Dataset (Image + Annotations)

In [None]:
class SSDDataset(Dataset):
    """COCO-style detection dataset yielding ``(image, target)`` pairs.

    ``target`` is a dict with ``"boxes"`` (float32, shape ``(N, 4)``, corner
    format ``[x1, y1, x2, y2]``) and ``"labels"`` (int64, shape ``(N,)``).

    Args:
        img_dir: Directory that contains the image files.
        ann_file: Path to a COCO-format JSON file with ``images``,
            ``annotations`` and ``categories`` entries.
        transform: Optional callable applied to the PIL image. If it changes
            the image size, the boxes are scaled by the same width/height
            ratios; this is only correct for plain resizes (not crops/flips).
    """

    def __init__(self, img_dir, ann_file, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        with open(ann_file, 'r') as f:
            coco = json.load(f)

        self.images = coco['images']
        self.annotations = coco['annotations']
        self.categories = coco['categories']

        # Group annotations by the image they belong to for O(1) lookup.
        self.img_id_to_ann = {}
        for ann in self.annotations:
            self.img_id_to_ann.setdefault(ann['image_id'], []).append(ann)

        # NOTE(review): indices start at 0, but torchvision detection models
        # reserve label 0 for background, so the first category is presumably
        # never learned as foreground. Shifting to 1-based labels must be done
        # together with the name lookup used at inference time.
        self.cat_id_to_index = {cat['id']: idx for idx, cat in enumerate(self.categories)}

    def __len__(self):
        return len(self.images)

    @staticmethod
    def _image_size(image):
        """Return ``(width, height)`` of a tensor or PIL-like image, else ``None``."""
        if isinstance(image, torch.Tensor) and image.dim() >= 2:
            height, width = image.shape[-2:]
            return int(width), int(height)
        size = getattr(image, 'size', None)
        if isinstance(size, tuple) and len(size) == 2:
            return size
        return None

    @staticmethod
    def _scale_boxes(boxes, orig_size, new_size):
        """Scale ``[x1, y1, x2, y2]`` boxes from ``orig_size`` to ``new_size``.

        Both sizes are ``(width, height)``. Returns ``boxes`` unchanged when
        there is nothing to scale.
        """
        orig_w, orig_h = orig_size
        new_w, new_h = new_size
        if boxes.numel() == 0 or (orig_w, orig_h) == (new_w, new_h):
            return boxes
        x_scale = new_w / orig_w
        y_scale = new_h / orig_h
        return boxes * boxes.new_tensor([x_scale, y_scale, x_scale, y_scale])

    def __getitem__(self, idx):
        img_info = self.images[idx]
        img_path = os.path.join(self.img_dir, img_info['file_name'])
        image = Image.open(img_path).convert("RGB")
        orig_size = image.size  # PIL reports (width, height)

        boxes = []
        labels = []
        for ann in self.img_id_to_ann.get(img_info['id'], []):
            x, y, w, h = ann['bbox']
            # Zero-area boxes are rejected by torchvision's detection losses.
            if w <= 0 or h <= 0:
                continue
            boxes.append([x, y, x + w, y + h])
            labels.append(self.cat_id_to_index[ann['category_id']])

        # reshape keeps the (N, 4) layout even when the image has no boxes.
        box_tensor = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        label_tensor = torch.as_tensor(labels, dtype=torch.int64)

        if self.transform:
            image = self.transform(image)
            # Keep the boxes aligned with the (possibly resized) image.
            new_size = self._image_size(image)
            if new_size is not None:
                box_tensor = self._scale_boxes(box_tensor, orig_size, new_size)

        target = {
            "boxes": box_tensor,
            "labels": label_tensor
        }

        return image, target



Step 3: Load Data

In [None]:
from torchvision.models.detection.transform import GeneralizedRCNNTransform
from torchvision import transforms


# Root of the COCO subset. Set the COCO_SUBSET_DIR environment variable to
# point somewhere else; the default is the original hard-coded location.
data_root = os.environ.get(
    "COCO_SUBSET_DIR",
    r"C:/Users/admin/Downloads/Code/ObjectDetection/coco_subset",
)
train_dir = f"{data_root}/train2017"
val_dir = f"{data_root}/val2017"
train_ann = f"{data_root}/annotations/instances_train2017.json"
val_ann = f"{data_root}/annotations/instances_val2017.json"

# Basic transforms compatible with older torchvision.
# ToTensor() already produces a float32 tensor scaled to [0, 1] for PIL input,
# so no separate dtype-conversion stage is needed afterwards.
ssd_transform = transforms.Compose([
    transforms.Resize((300, 300)),                     # Resize to SSD input size
    transforms.ToTensor(),                             # PIL image -> float32 tensor in [0, 1]
])

# Train and validation datasets share the same image transform
train_dataset = SSDDataset(train_dir, train_ann, transform=ssd_transform)
val_dataset = SSDDataset(val_dir, val_ann, transform=ssd_transform)

# DataLoader
def collate_fn(batch):
    """Transpose a list of samples into one tuple per sample field.

    For ``(image, target)`` samples this returns ``(images, targets)``, each a
    tuple, without stacking anything into a single tensor.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# Training batches are reshuffled every epoch; validation keeps dataset order.
train_loader = DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=collate_fn,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=4,
    shuffle=False,
    collate_fn=collate_fn,
)

Step 4: Initialize SSD300 Model and Set Up Training

In [None]:
from torchvision.models.detection import ssd300_vgg16
from torchvision.models.detection.ssd import SSDClassificationHead
from torchvision.models.detection.ssd import SSD300_VGG16_Weights
import torch

# Define device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load pretrained SSD model
weights = SSD300_VGG16_Weights.DEFAULT
model = ssd300_vgg16(weights=weights)

# Read the feature-map configuration off the loaded model instead of
# hard-coding it, so the new head always matches the backbone and anchors.
# For SSD300-VGG16 these are [512, 1024, 512, 256, 256, 256] and [4, 6, 6, 6, 4, 4].
in_channels = [conv.in_channels for conv in model.head.classification_head.module_list]
num_anchors = model.anchor_generator.num_anchors_per_location()
num_classes = len(train_dataset.cat_id_to_index) + 1  # +1 for background

# Replace classification head so it predicts this dataset's classes.
# The new head is freshly initialised; the backbone and box-regression head
# keep their pretrained weights.
model.head.classification_head = SSDClassificationHead(
    in_channels=in_channels,
    num_anchors=num_anchors,
    num_classes=num_classes
)

# Move model to device
model.to(device)
print(f" SSD300 model loaded with {num_classes} classes on {device}")


Step 5: Train SSD300 for 30 Epochs

In [None]:
import torch.optim as optim

# Switch to training mode; with targets supplied, the forward pass yields a
# dict of loss terms (see the loop body) rather than detections.
model.train()

# SGD over every parameter that still requires gradients
params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)

# Train for 30 epochs
num_epochs = 30
for epoch in range(num_epochs):
    print(f"\nEpoch [{epoch+1}/{num_epochs}]")
    epoch_loss = 0.0  # running sum of the per-batch losses

    for images, targets in tqdm(train_loader):
        # Move the whole batch onto the training device
        images = [img.to(device) for img in images]
        targets = [
            {key: value.to(device) for key, value in target.items()}
            for target in targets
        ]

        # Clear stale gradients, then forward / backward / update
        optimizer.zero_grad()
        loss_dict = model(images, targets)
        loss = sum(loss_dict.values())
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    print(f"Epoch {epoch+1} Loss: {epoch_loss:.4f}")


In [None]:
# Directory for the checkpoint (created if it does not exist yet)
output_dir = r"C:/Users/admin/Downloads/Code/ObjectDetection/ssdOutput"
os.makedirs(output_dir, exist_ok=True)

# Persist only the weights (state dict), not the whole model object
model_path = os.path.join(output_dir, "ssd_model.pth")
torch.save(
    model.state_dict(),
    model_path,
)
print(f"Model saved to {model_path}")


Step 6: SSD300 Video Inference and Visualization

In [None]:
import cv2
import numpy as np
from torchvision import transforms
from PIL import Image
from collections import Counter

# Load model
# map_location lets a checkpoint saved on a GPU be restored on a CPU-only
# machine (and vice versa) instead of failing while deserialising.
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()  # inference mode: the model returns detections, not losses

# Category mapping
category_names = [cat['name'] for cat in train_dataset.categories]
# Build the lookup from the dataset's own category-id -> index map so the
# displayed names always follow the label indices used during training.
# NOTE(review): torchvision detection heads reserve label 0 for background;
# confirm the training labels follow that convention.
idx_to_name = {
    train_dataset.cat_id_to_index[cat['id']]: cat['name']
    for cat in train_dataset.categories
}

# Helper: resize + convert frame
def preprocess_frame(frame):
    """Turn an OpenCV BGR frame into a ``(1, 3, 300, 300)`` tensor on ``device``.

    The frame is converted to RGB, resized to 300x300 with PIL's default
    resampling, converted with ``ToTensor`` and given a leading batch axis.
    """
    rgb_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    to_tensor = transforms.ToTensor()
    batch = to_tensor(rgb_image.resize((300, 300))).unsqueeze(0)
    return batch.to(device)

# Helper: scale boxes back to original size
def rescale_boxes(boxes, orig_size, input_size=(300, 300)):
    """Scale ``[x1, y1, x2, y2]`` boxes from network input size to frame size.

    Args:
        boxes: Tensor of shape ``(N, 4)``; modified in place.
        orig_size: ``(height, width)`` of the original frame.
        input_size: ``(width, height)`` of the network input. Note the order
            differs from ``orig_size``.

    Returns:
        The same ``boxes`` tensor after in-place scaling.
    """
    frame_h, frame_w = orig_size
    net_w, net_h = input_size
    x_ratio = frame_w / net_w
    y_ratio = frame_h / net_h
    boxes[:, 0::2].mul_(x_ratio)  # columns 0 and 2 hold x coordinates
    boxes[:, 1::2].mul_(y_ratio)  # columns 1 and 3 hold y coordinates
    return boxes

# Video detection loop
video_dir = r"C:/Users/admin/Downloads/Code/ObjectDetection/videos"
video_files = [f for f in os.listdir(video_dir) if f.endswith(".mp4")]

# Detections scoring below this value are discarded before drawing.
score_threshold = 0.05

for video_file in video_files:
    video_path = os.path.join(video_dir, video_file)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Failed to open {video_file}")
        cap.release()
        continue

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    output_path = os.path.join(output_dir, f"annotated_{video_file}")
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))

    frame_count = 0
    detected_classes = []  # one class index per drawn detection, across all frames

    # try/finally guarantees both handles are released even if a frame fails
    # mid-video; otherwise the reader and the output file would stay open.
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # end of stream (or read error)
            frame_count += 1

            tensor = preprocess_frame(frame)

            with torch.no_grad():
                output = model(tensor)[0]

            boxes = output['boxes'].detach().cpu()
            scores = output['scores'].detach().cpu()
            labels = output['labels'].detach().cpu()

            keep = (scores >= score_threshold)
            boxes = boxes[keep]
            scores = scores[keep]
            labels = labels[keep]

            # Nothing to draw: pass the frame through untouched.
            if boxes.size(0) == 0:
                out.write(frame)
                continue

            # Boxes are in 300x300 network coordinates; map them back to the
            # frame and truncate to integer pixels for drawing.
            boxes = rescale_boxes(boxes.clone(), (height, width)).int()

            for box, score, label in zip(boxes, scores, labels):
                x1, y1, x2, y2 = box.tolist()
                cls_id = int(label.item())
                name = idx_to_name.get(cls_id, "Unknown")
                color = tuple(np.random.randint(0, 255, 3).tolist())
                text = f"{name} {score:.2f}"

                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                # Keep the caption inside the frame when the box touches the top edge.
                cv2.putText(frame, text, (x1, max(y1 - 10, 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                detected_classes.append(cls_id)

            out.write(frame)
    finally:
        cap.release()
        out.release()

    print(f" Saved annotated video to: {output_path}")
    if len(detected_classes) == 0:
        print(" No objects detected in this video.")
    else:
        stats = Counter(detected_classes)
        for cid, count in stats.items():
            print(f" - {idx_to_name.get(cid, 'Unknown')}: {count} detections")


In [None]:
# Single-frame debug (optional)
# Uses `video_path` as left behind by the video loop, i.e. the last video
# that was processed; run that cell first.
cap = cv2.VideoCapture(video_path)
ret, frame = cap.read()
cap.release()

# Fail with a readable message instead of an opaque OpenCV error on a None frame.
if not ret:
    raise RuntimeError(f"Could not read a frame from {video_path}")

tensor = preprocess_frame(frame)
with torch.no_grad():
    output = model(tensor)[0]

boxes = output['boxes'].cpu()
scores = output['scores'].cpu()
labels = output['labels'].cpu()

# Filter low scores
keep = scores >= 0.25
boxes = boxes[keep]
scores = scores[keep]
labels = labels[keep]

# Map boxes from 300x300 network coordinates back to the frame (height, width)
boxes = rescale_boxes(boxes.clone(), (frame.shape[0], frame.shape[1])).int()

# Visualize
for box, score, label in zip(boxes, scores, labels):
    x1, y1, x2, y2 = box.tolist()
    name = idx_to_name.get(label.item(), "Unknown")
    color = tuple(np.random.randint(0, 255, 3).tolist())
    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
    cv2.putText(frame, f"{name} {score:.2f}", (x1, max(y1 - 10, 10)),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

# Show with matplotlib (expects RGB, OpenCV frames are BGR)
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 8))
plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
plt.axis('off')
plt.show()
