# Object Detection
This notebook will serve as the primary means of documentation for creating an object detection model.
_______________________________________________________________________________________________________________________________________________

### Full group includes:
- Jordan Brown
- Dylan Roy
- Maxwell Barret
- Julia Dewhurt

### Primary notebook contributors:
- ""
- ""
_______________________________________________________________________________________________________________________________________________

***The primary notebook contributors are the group members who were assigned to this specific task. All group members will work in collaboration to create a final working product. However, the nature of this project calls for the full group to be assigned to primary tasks.***

## Notebook Goal/Purpose

This notebook will be used to create a model for traffic-based object detection. This will involve using labeled image and video feeds to determine what traffic objects look like. This model will be able to detect and track traffic counts at individual intersections. This tracking will be used to create a dataset which can hopefully be used to train a traffic prediction model for the traffic analysis portion of this project.
_______________________________________________________________________________________________________________________________________________

In [1]:
import os
import zipfile
import requests

def download_file(url, destination, chunk_size=1024 * 1024, timeout=60):
    """
    Download a file from a URL to a specified destination.

    Args:
        url: Address of the file to fetch.
        destination: Local path the response body is written to.
        chunk_size: Number of bytes requested per streamed chunk.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``.

    Returns:
        True when the server answered 200 and the body was written,
        False when any other status code was returned (a message is printed
        and nothing is written).
    """
    # Using the response as a context manager releases the streamed
    # connection even if writing the file fails part-way through.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            print(f"Failed to download {url}")
            return False

        with open(destination, 'wb') as f:
            for chunk in response.iter_content(chunk_size):
                f.write(chunk)

    print(f"Downloaded {destination}")
    return True

def extract_zip(zip_file, extract_to):
    """
    Unpack every member of a zip archive into the given directory,
    then print a confirmation line.
    """
    archive = zipfile.ZipFile(zip_file, 'r')
    try:
        archive.extractall(extract_to)
    finally:
        # Mirror the context-manager behaviour: always close the archive.
        archive.close()
    print(f"Extracted {zip_file} to {extract_to}")

def download_and_extract_coco(destination_dir):
    """
    Download and extract the COCO dataset (2017 version) images and annotations.

    Each archive is downloaded into ``destination_dir``, extracted there and
    deleted once its extraction has finished.

    Args:
        destination_dir: Directory that receives the archives and their
            extracted contents; created if it does not exist.

    Raises:
        FileNotFoundError: If an archive is not present after its download
            step, i.e. the download did not succeed.
    """
    # Create destination directory if it doesn't exist
    os.makedirs(destination_dir, exist_ok=True)

    # Define COCO dataset URLs
    coco_urls = {
        'train2017': 'http://images.cocodataset.org/zips/train2017.zip',
        'val2017': 'http://images.cocodataset.org/zips/val2017.zip',
        'annotations': 'http://images.cocodataset.org/annotations/annotations_trainval2017.zip',
    }

    # Download images and annotations
    for name, url in coco_urls.items():
        file_path = os.path.join(destination_dir, f"{name}.zip")

        # An archive left behind by an interrupted run may be truncated; remove
        # it so a failed download below is not mistaken for a fresh file.
        if os.path.exists(file_path):
            os.remove(file_path)

        print(f"Downloading {name}...")
        download_file(url, file_path)

        # download_file reports a non-200 response by printing rather than
        # raising, so confirm the archive exists before trying to unpack it.
        if not os.path.isfile(file_path):
            raise FileNotFoundError(
                f"Download of {name} from {url} did not produce {file_path}"
            )

        # Extract the downloaded zip files
        print(f"Extracting {name}...")
        extract_zip(file_path, destination_dir)

        # Remove zip files after extraction
        os.remove(file_path)

    print("COCO dataset downloaded and extracted successfully!")

# Specify the directory to save the dataset
destination_dir = 'COCO'

# Sub-directories the training cell reads from. Checking for each of them
# (rather than just the top-level folder) means an interrupted earlier run,
# which already created 'COCO', is not mistaken for a complete dataset.
required_dirs = [
    os.path.join(destination_dir, part)
    for part in ('train2017', 'val2017', 'annotations')
]

# Check if the COCO dataset has already been downloaded.
# NOTE: no exit() in the skip branch; inside a notebook that call shuts the
# kernel down, which would stop every later cell from running.
if all(os.path.isdir(path) for path in required_dirs):
    print("COCO dataset already exists. Skipping download and extraction.")
else:
    # Start downloading and extracting COCO dataset
    download_and_extract_coco(destination_dir)


COCO dataset already exists. Skipping download and extraction.


In [None]:
import os
import torch
import torchvision
from torchvision.models.detection.faster_rcnn import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.transforms import functional as F
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import numpy as np
from pycocotools.coco import COCO


class COCODataset(Dataset):
    """Detection dataset backed by a COCO-format annotation file.

    Indexing returns ``(image, target)`` where ``target`` is a dict with:

    * ``boxes``    - float32 tensor, one ``[x1, y1, x2, y2]`` row per kept box
    * ``labels``   - int64 tensor of contiguous labels starting at 1
    * ``image_id`` - one-element tensor holding the COCO image id
    * ``area``     - box width times box height
    * ``iscrowd``  - int64 tensor taken from each annotation's ``iscrowd``
                     field (0 when the field is absent)

    Label numbering: when ``selected_classes`` is given, a class's label is
    its position in that list plus one. When it is omitted (or empty), every
    category in the annotation file is kept and numbered in ascending
    category-id order.
    """

    def __init__(self, root, ann_file, transforms=None, selected_classes=None):
        """
        Args:
            root: Directory containing the image files.
            ann_file: Path to the COCO annotation JSON.
            transforms: Optional callable applied to the loaded RGB image.
            selected_classes: Optional list of category names to keep.

        Raises:
            ValueError: If a name in ``selected_classes`` is not a category
                of the annotation file.
        """
        self.root = root
        self.coco = COCO(ann_file)
        self.ids = list(sorted(self.coco.imgs.keys()))
        self.transforms = transforms
        self.selected_classes = selected_classes

        if self.selected_classes:
            category_ids = []
            for cls in self.selected_classes:
                matches = self.coco.getCatIds(catNms=[cls])
                if not matches:
                    raise ValueError(f"Unknown COCO category name: {cls!r}")
                category_ids.append(matches[0])
            # Kept as a list of category ids, as before.
            self.selected_classes = category_ids
            label_order = category_ids
        else:
            # No filter requested: keep every category so __getitem__ still
            # has a label for each annotation.
            label_order = sorted(self.coco.getCatIds())

        # category id -> contiguous label (1-based; 0 is left for background).
        # setdefault keeps the first position if a class is listed twice.
        self._label_by_cat_id = {}
        for position, cat_id in enumerate(label_order):
            self._label_by_cat_id.setdefault(cat_id, position + 1)

    def __getitem__(self, index):
        """Return the ``(image, target)`` pair for the image at ``index``."""
        img_id = self.ids[index]
        ann_ids = self.coco.getAnnIds(imgIds=img_id)
        anns = self.coco.loadAnns(ann_ids)
        path = self.coco.loadImgs(img_id)[0]['file_name']

        # Load image
        img = Image.open(os.path.join(self.root, path)).convert("RGB")

        # Extract bounding boxes, labels and crowd flags
        boxes = []
        labels = []
        crowd_flags = []
        for ann in anns:
            label = self._label_by_cat_id.get(ann["category_id"])
            if label is None:  # category not among the kept classes
                continue
            x, y, w, h = ann["bbox"]  # COCO format: [x, y, width, height]
            if w > 0 and h > 0:  # Ensure valid boxes
                boxes.append([x, y, x + w, y + h])
                labels.append(label)
                crowd_flags.append(ann.get("iscrowd", 0))

        # An empty list would become a 1-D tensor, so build the (0, 4) shape
        # explicitly when no box survived the filters.
        if boxes:
            boxes = torch.as_tensor(boxes, dtype=torch.float32)
        else:
            boxes = torch.zeros((0, 4), dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        iscrowd = torch.as_tensor(crowd_flags, dtype=torch.int64)

        # Image ID
        image_id = torch.tensor([img_id])

        # Box area: height * width
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": image_id,
            "area": area,
            "iscrowd": iscrowd,
        }

        if self.transforms:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        """Number of images listed in the annotation file."""
        return len(self.ids)


def collate_fn(batch):
    """Regroup a list of per-sample tuples into one tuple per field."""
    per_field = zip(*batch)
    return tuple(per_field)


def get_model(num_classes):
    """
    Build a Faster R-CNN (ResNet-50 FPN) detector from pre-trained weights and
    replace its box predictor with one sized for ``num_classes`` outputs.
    """
    detection = torchvision.models.detection

    # NOTE(review): newer torchvision releases prefer a `weights=` argument
    # over `pretrained=True` - confirm against the installed version.
    detector = detection.fasterrcnn_resnet50_fpn(pretrained=True)

    # The new head must accept the same feature width as the one it replaces.
    head_input_width = detector.roi_heads.box_predictor.cls_score.in_features
    replacement_head = detection.faster_rcnn.FastRCNNPredictor(
        head_input_width, num_classes
    )
    detector.roi_heads.box_predictor = replacement_head

    return detector


def train_model(model, train_loader, val_loader, device, num_epochs=1, lr=0.005):
    """
    Train ``model`` with SGD for ``num_epochs`` passes over ``train_loader``.

    After each epoch the summed batch loss is printed and, when ``val_loader``
    is not None, ``validate_model`` is run. Returns the same model object.
    """
    model.to(device)
    sgd = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=0.0005)
    schedule = torch.optim.lr_scheduler.StepLR(sgd, step_size=3, gamma=0.1)

    for epoch_number in range(1, num_epochs + 1):
        model.train()
        running_loss = 0

        for batch_images, batch_targets in train_loader:
            # Move the whole batch onto the training device.
            device_images = [image.to(device) for image in batch_images]
            device_targets = []
            for target in batch_targets:
                device_targets.append(
                    {key: value.to(device) for key, value in target.items()}
                )

            # The model returns a dict of loss terms; optimise their sum.
            batch_loss = sum(model(device_images, device_targets).values())
            running_loss += batch_loss.item()

            sgd.zero_grad()
            batch_loss.backward()
            sgd.step()

        schedule.step()
        print(f"Epoch {epoch_number}, Loss: {running_loss:.4f}")

        # Validation (optional)
        if val_loader is not None:
            validate_model(model, val_loader, device)

    return model


def validate_model(model, val_loader, device):
    """
    Switch ``model`` to eval mode and run it over every batch of
    ``val_loader`` with gradients disabled. Outputs are discarded.
    """
    model.eval()
    with torch.no_grad():
        for batch in val_loader:
            batch_images, _unused_targets = batch
            device_images = [image.to(device) for image in batch_images]
            # No loss calculation, only forward pass
            model(device_images)


# Seed PyTorch's global RNG before the model and loaders are built so that
# repeated runs draw the same random numbers from it.
SEED = 42
torch.manual_seed(SEED)

# Paths to COCO dataset
root = "COCO/train2017"
ann_file = "COCO/annotations/instances_train2017.json"

# Define the classes to detect
selected_classes = ["car", "truck", "motorcycle", "bus"]

# Define dataset and dataloaders
transforms = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor()
])
dataset = COCODataset(root, ann_file, transforms=transforms, selected_classes=selected_classes)
train_loader = DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)

val_root = "COCO/val2017"
val_ann_file = "COCO/annotations/instances_val2017.json"
val_dataset = COCODataset(val_root, val_ann_file, transforms=transforms, selected_classes=selected_classes)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)

# Get model
num_classes = len(selected_classes) + 1  # Add 1 for the background class
model = get_model(num_classes)

# Train. The result is bound to `model` (train_model returns the same object)
# so the cell does not echo the full model repr as its output.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = train_model(model, train_loader, val_loader, device)


loading annotations into memory...
Done (t=15.93s)
creating index...
index created!
loading annotations into memory...
Done (t=0.55s)
creating index...
index created!


