# Assignment 4: Wheres Waldo?
### Name: Eileanor LaRocco
In this assignment, you will develop an object detection algorithm to locate Waldo in a set of images. You will develop a model to detect the bounding box around Waldo. Your final task is to submit your predictions on Kaggle for evaluation.

### Process/Issues
- Double-checked that the images we were given were correctly bounded (did this by visualizing the boxes on the images - they look good!)
- Complication: Originally when I creating augmented images, the bounding box labels did not also augment. I also had to try out a few types of augmentation to see what made sense for waldo. The augmented images may still not be as different from one another as they could be which could allow the model to favor the training images that occur more frequently.
- Complication: Similarly, when resizing the images, ensuring the bounding boxes not only are also adjusted if necessary, but ensuring they do not get cut off and the image is not stretched/shrunk too much.

### Imports

In [1]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import torch
from torchvision.io import read_image
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.transforms import functional as F
from tqdm import tqdm
import csv
import opendatasets as od
import cv2
import albumentations as A
import random
import shutil
from sklearn.model_selection import train_test_split
from ultralytics import YOLO
import torch
import torch.nn as nn

  data = fetch_version_info()


In [2]:
SEED = 1

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

device = device = torch.device("mps")
print(device)

mps


### Download Data

In [3]:
od.download('https://www.kaggle.com/competitions/2024-fall-ml-3-hw-4-wheres-waldo/data')

Skipping, found downloaded files in "./2024-fall-ml-3-hw-4-wheres-waldo" (use force=True to force download)


### Paths

In [4]:
train_folder = "2024-fall-ml-3-hw-4-wheres-waldo/train/train" # Original Train Images
test_folder = "2024-fall-ml-3-hw-4-wheres-waldo/test/test" # Original Test Images
annotations_file = "2024-fall-ml-3-hw-4-wheres-waldo/annotations.csv" # Original Annotations File

# Preprocess Images (Crop/Augment)

### Check Image Sizes

In [5]:
# Train Images

# Iterate over all images in the folder
for image_name in os.listdir(train_folder):
    if image_name.endswith((".jpg")):  # Check for common image extensions
        image_path = os.path.join(train_folder, image_name)
        
        # Read the image using OpenCV
        img = cv2.imread(image_path)
        if img is not None:
            height, width, channels = img.shape  # Get image size (height, width, channels)
            print(f"Image: {image_name}, Width: {width}, Height: {height}")
        else:
            print(f"Could not read image: {image_name}")

# Test Images

# Iterate over all images in the folder
for image_name in os.listdir(test_folder):
    if image_name.endswith((".jpg")):  # Check for common image extensions
        image_path = os.path.join(test_folder, image_name)
        
        # Read the image using OpenCV
        img = cv2.imread(image_path)
        if img is not None:
            height, width, channels = img.shape  # Get image size (height, width, channels)
            print(f"Image: {image_name}, Width: {width}, Height: {height}")
        else:
            print(f"Could not read image: {image_name}")


Image: 8.jpg, Width: 2800, Height: 1760
Image: 9.jpg, Width: 1298, Height: 951
Image: 14.jpg, Width: 1700, Height: 2340
Image: 15.jpg, Width: 1600, Height: 1006
Image: 17.jpg, Width: 1599, Height: 1230
Image: 16.jpg, Width: 1525, Height: 3415
Image: 12.jpg, Width: 1276, Height: 1754
Image: 13.jpg, Width: 1280, Height: 864
Image: 11.jpg, Width: 2828, Height: 1828
Image: 10.jpg, Width: 1600, Height: 980
Image: 21.jpg, Width: 2048, Height: 1515
Image: 20.jpg, Width: 2953, Height: 2088
Image: 22.jpg, Width: 500, Height: 256
Image: 23.jpg, Width: 325, Height: 300
Image: 27.jpg, Width: 591, Height: 629
Image: 26.jpg, Width: 600, Height: 374
Image: 18.jpg, Width: 1590, Height: 981
Image: 24.jpg, Width: 456, Height: 256
Image: 25.jpg, Width: 413, Height: 500
Image: 19.jpg, Width: 1280, Height: 864
Image: 4.jpg, Width: 2048, Height: 1272
Image: 5.jpg, Width: 2100, Height: 1760
Image: 7.jpg, Width: 1949, Height: 1419
Image: 6.jpg, Width: 2048, Height: 1454
Image: 2.jpg, Width: 1286, Height: 946


### Resize Images

In [6]:
# Paths
resized_folder = "2024-fall-ml-3-hw-4-wheres-waldo/train/train/resized"
resized_annotations_file = "2024-fall-ml-3-hw-4-wheres-waldo/resized_annotations.csv"
target_size = (1000, 600)  # Target size for resizing images

# Read the annotations CSV file
annotations_df = pd.read_csv(annotations_file)

# Create the output folder if it doesn't exist
os.makedirs(resized_folder, exist_ok=True)

# List to store updated bounding boxes
updated_annotations = []

# Iterate over all images in the annotation file
for index, row in annotations_df.iterrows():
    image_name = row["filename"]
    xmin, ymin, xmax, ymax = row["xmin"], row["ymin"], row["xmax"], row["ymax"]
    
    # Load the image
    image_path = os.path.join(train_folder, image_name)
    img = cv2.imread(image_path)
    
    if img is not None:
        original_height, original_width = img.shape[:2]
        
        # Calculate the resizing scale factors
        scale_x = target_size[0] / original_width
        scale_y = target_size[1] / original_height
        
        # Resize the image
        resized_img = cv2.resize(img, target_size)
        
        # Adjust bounding boxes based on the scaling factors
        xmin_new = int(xmin * scale_x)
        ymin_new = int(ymin * scale_y)
        xmax_new = int(xmax * scale_x)
        ymax_new = int(ymax * scale_y)
        
        # Save the resized image
        resized_image_path = os.path.join(resized_folder, image_name)
        cv2.imwrite(resized_image_path, resized_img)
        
        # Add the updated annotation to the list
        updated_annotations.append([image_name, xmin_new, ymin_new, xmax_new, ymax_new])

# Save the updated annotations to a new CSV file
updated_annotations_df = pd.DataFrame(updated_annotations, columns=["filename", "xmin", "ymin", "xmax", "ymax"])
updated_annotations_df.to_csv(resized_annotations_file, index=False)

print("Images and annotations resized and saved.")


Images and annotations resized and saved.


# Preprocessing (for model)

In [7]:
# Paths
yolo_train_dir = "datasets/yolo_dataset/train"
yolo_val_dir = "datasets/yolo_dataset/val"

#Saved Predictions
yolo_test_dir = "yolo_test_predictions"

# Create necessary folders
os.makedirs(yolo_train_dir, exist_ok=True)
os.makedirs(yolo_val_dir, exist_ok=True)
os.makedirs(yolo_test_dir, exist_ok=True)

# Load annotations
annotations = pd.read_csv(resized_annotations_file)

# Function to convert annotations to YOLO format
def convert_to_yolo_format(row, img_width, img_height):
    x_center = (row["xmin"] + row["xmax"]) / 2 / img_width
    y_center = (row["ymin"] + row["ymax"]) / 2 / img_height
    width = (row["xmax"] - row["xmin"]) / img_width
    height = (row["ymax"] - row["ymin"]) / img_height
    return f"0 {x_center} {y_center} {width} {height}"

# Split training data into train and validation sets
image_files = annotations["filename"].unique()
train_images, val_images = train_test_split(image_files, test_size=0.2, random_state=42)

# Function to prepare YOLO format data
def prepare_yolo_data(image_list, output_dir):
    for img_name in image_list:
        img_path = os.path.join(resized_folder, img_name)
        img = cv2.imread(img_path)
        if img is None:
            continue
        img_height, img_width, _ = img.shape

        # Filter annotations for this image
        image_annotations = annotations[annotations["filename"] == img_name]

        # YOLO annotations file
        yolo_annotations = []
        for _, row in image_annotations.iterrows():
            yolo_line = convert_to_yolo_format(row, img_width, img_height)
            yolo_annotations.append(yolo_line)

        # Save image and annotation
        base_name = os.path.splitext(img_name)[0]
        shutil.copy(img_path, os.path.join(output_dir, f"{base_name}.jpg"))
        with open(os.path.join(output_dir, f"{base_name}.txt"), "w") as f:
            f.write("\n".join(yolo_annotations))

# Prepare training and validation data
prepare_yolo_data(train_images, yolo_train_dir)
prepare_yolo_data(val_images, yolo_val_dir)


In [8]:
def filter_csv_by_column(input_csv, output_csv, column_name, values_list):
    """
    Filters rows in a CSV file and keeps only those where the specified column's value is in a given list.

    Parameters:
        input_csv (str): Path to the input CSV file.
        output_csv (str): Path to save the filtered CSV file.
        column_name (str): Column to filter on.
        values_list (list): List of values to keep.
    """
    # Load the CSV into a DataFrame
    df = pd.read_csv(input_csv)

    # Filter the DataFrame
    filtered_df = df[df[column_name].isin(values_list)]

    # Save the filtered DataFrame to a new CSV file
    filtered_df.to_csv(output_csv, index=False)

In [9]:
#split annotations into train and val
values_list = []
directory = "datasets/yolo_dataset/train"
for filename in os.listdir(directory):
    if filename.endswith('.jpg'):
        values_list.append(filename)

# Example usage
input_csv = "2024-fall-ml-3-hw-4-wheres-waldo/resized_annotations.csv"
output_csv = "2024-fall-ml-3-hw-4-wheres-waldo/train_annotations.csv"
column_name = "filename"

filter_csv_by_column(input_csv, output_csv, column_name, values_list)




values_list = []
directory = "datasets/yolo_dataset/val"
for filename in os.listdir(directory):
    if filename.endswith('.jpg'):
        values_list.append(filename)

# Example usage
input_csv = "2024-fall-ml-3-hw-4-wheres-waldo/resized_annotations.csv"
output_csv = "2024-fall-ml-3-hw-4-wheres-waldo/test_annotations.csv" 
column_name = "filename" 

filter_csv_by_column(input_csv, output_csv, column_name, values_list)



In [10]:
from PIL import Image
import torchvision

class WaldoDataset(torch.utils.data.Dataset):
    def __init__(self, annotations_file, img_dir):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = torchvision.transforms.Compose([
            torchvision.transforms.ToPILImage(),
            torchvision.transforms.ToTensor()
        ])

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        # Load image
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        image = Image.open(img_path).convert("RGB")
        image = F.to_tensor(image)
        image = np.array(image)
        
        # Read bounding box data, ensuring all are converted to float
        box_data = self.img_labels.iloc[idx, 1:5].values
        boxes = []
        for item in box_data:
            try:
                boxes.append(float(item))
            except ValueError as e:
                raise ValueError(f"Error converting bounding box data to float: {e}")

        # Create tensors
        boxes = torch.as_tensor([boxes], dtype=torch.float32)
        labels = torch.ones((1,), dtype=torch.int64)
        image_id = torch.tensor([idx])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        iscrowd = torch.zeros((1,), dtype=torch.int64)
        
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = image_id
        target["area"] = area
        target["iscrowd"] = iscrowd

        image, target = self.transform(image, target)

        target = F.to_tensor(target)

        return image, target


# Example usage:
# Create the dataset
train_dataset = WaldoDataset(annotations_file= "2024-fall-ml-3-hw-4-wheres-waldo/train_annotations.csv", img_dir="datasets/yolo_dataset/train")
test_dataset = WaldoDataset(annotations_file= "2024-fall-ml-3-hw-4-wheres-waldo/test_annotations.csv", img_dir="datasets/yolo_dataset/val")

# Now, you can use this dataset with a DataLoader to train your model
from torch.utils.data import DataLoader

train_loader = DataLoader(
    train_dataset,
    batch_size=2,
    shuffle=True,
    #collate_fn=lambda x: tuple(zip(*x))
)

test_loader = DataLoader(
    test_dataset,
    batch_size=2,
    shuffle=True,
    #collate_fn=lambda x: tuple(zip(*x))
)

# Model

In [86]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SimpleBBoxModel(nn.Module):
    def __init__(self):
        super(SimpleBBoxModel, self).__init__()
        
        # Define a simple CNN architecture
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        
        # Define a fully connected layer to output 4 values for the bounding box
        self.fc1 = nn.Linear(64 * 16 * 16, 128)  # Flattening 64 channels of 16x16 feature maps
        self.fc2 = nn.Linear(128, 4)  # Outputting the 4 bounding box coordinates

    def forward(self, x):
        # Pass the input through the convolutional layers
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)  # Pooling to reduce the spatial dimensions
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)  # Pooling again
        
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x, 2)  # Final pooling
        
        # Flatten the output for the fully connected layer
        x = torch.flatten(x, 1)  # Flatten all dimensions except batch
        
        # Pass through the fully connected layers
        x = F.relu(self.fc1(x))
        bbox = self.fc2(x)  # Output 4 values for the bounding box
        
        return bbox

# Example usage:
model = SimpleBBoxModel()

# Input a batch of 128x128 RGB images (batch_size=2)
images = torch.randn(2, 3, 128, 128)  # Random images for demonstration

# Forward pass
output = model(images)
print("Bounding box predictions: ", output)


Bounding box predictions:  tensor([[-0.0638,  0.0943, -0.0380,  0.0581],
        [-0.0603,  0.1020, -0.0275,  0.0441]], grad_fn=<AddmmBackward0>)


### Architecture

In [None]:
import torch
import torch.nn as nn
import torchvision
from torchvision.ops import roi_align, nms

# Define the Backbone
class Backbone(nn.Module):
    def __init__(self):
        super().__init__()
        resnet50 = torchvision.models.resnet50(pretrained=False)
        self.backbone = nn.Sequential(*list(resnet50.children())[:-2])  # Remove classification head
    
    def forward(self, x):
        return self.backbone(x)

# Define the RPN (Region Proposal Network) with custom anchor generation
class RPN(nn.Module):
    def __init__(self, in_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, 512, kernel_size=3, stride=1, padding=1)
        self.cls_logits = nn.Conv2d(512, 9 * 2, kernel_size=1)  # 9 anchors per location
        self.bbox_pred = nn.Conv2d(512, 9 * 4, kernel_size=1)   # 9 anchors per location

    def generate_anchors(self, feature_map_size, aspect_ratios, sizes):
        # Generate anchors in (xmin, ymin, xmax, ymax) format
        anchors = []
        for size in sizes:
            for aspect_ratio in aspect_ratios:
                w = size * aspect_ratio**0.5
                h = size / aspect_ratio**0.5
                # Generate anchors with (xmin, ymin, xmax, ymax)
                anchors.append([-w/2, -h/2, w/2, h/2])  # Anchors centered at (0, 0)
        return torch.tensor(anchors).float()

    def forward(self, feature_map, image_size):
        x = self.conv(feature_map)  # Convolution layer output
        logits = self.cls_logits(x)  # Classification logits
        bbox_deltas = self.bbox_pred(x)  # Bounding box deltas

        # Generate custom anchors (9 anchors for each location)
        anchors = self.generate_anchors(feature_map.shape[-2:], aspect_ratios=[0.5, 1.0, 2.0], sizes=[8, 16, 32])

        # Reshape logits to [batch_size, num_anchors, height, width]
        logits = logits.permute(0, 2, 3, 1).contiguous().view(logits.shape[0], -1, 2)
        bbox_deltas = bbox_deltas.permute(0, 2, 3, 1).contiguous().view(bbox_deltas.shape[0], -1, 4)

        return logits, bbox_deltas, anchors

# Define the Detection Head
class DetectionHead(nn.Module):
    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(in_channels * 7 * 7, 1024)
        self.fc2 = nn.Linear(1024, 1024)
        self.cls_score = nn.Linear(1024, num_classes)  # Classification scores
        self.bbox_pred = nn.Linear(1024, num_classes * 4)  # Bounding box regression
    
    def forward(self, x):
        x = torch.flatten(x, start_dim=1)
        x = nn.ReLU()(self.fc1(x))
        x = nn.ReLU()(self.fc2(x))
        cls_scores = self.cls_score(x)
        bbox_deltas = self.bbox_pred(x)
        return cls_scores, bbox_deltas

# Define Faster R-CNN with Custom RPN
class FasterRCNN(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.backbone = Backbone()
        self.rpn = RPN(2048)  # 2048 channels from ResNet50 backbone
        self.roi_align = roi_align
        self.head = DetectionHead(2048, num_classes)
    
    def forward(self, images, targets=None):
        # Backbone Forward Pass
        features = self.backbone(images)
        batch_size, _, height, width = images.shape

        # RPN Forward Pass
        rpn_logits, rpn_bbox_deltas, anchors = self.rpn(features, (height, width))

        # Decode Proposals (Using anchors and bbox deltas)
        proposals = self.decode_proposals(anchors, rpn_bbox_deltas, (height, width))

        # Apply NMS on Proposals
        proposals = self.apply_nms(proposals, rpn_logits, threshold=0.7)

        # RoI Align
        pooled_features = self.roi_align(features, proposals, output_size=(4, 4), spatial_scale=1/16)

        # Detection Head Forward Pass
        cls_scores, bbox_deltas = self.head(pooled_features)

        # Loss Computation
        if targets is not None:
            losses = self.compute_losses(rpn_logits, rpn_bbox_deltas, cls_scores, bbox_deltas, anchors, proposals, targets)
            return losses
        else:
            return self.post_process(cls_scores, bbox_deltas, proposals)

    
    
    
    def decode_proposals(self, anchors, bbox_deltas, feature_map_shape):
        # Ensure bbox_deltas has the expected shape [batch_size, num_anchors * height * width, 4]
        batch_size = bbox_deltas.shape[0]
        height, width = feature_map_shape
        num_anchors = 9  # Number of anchors per location (e.g., 9)

        # Reshape bbox_deltas to match the number of anchors (batch_size, num_anchors * height * width, 4)
        bbox_deltas = bbox_deltas.view(batch_size, num_anchors, height, width, 4)
        bbox_deltas = bbox_deltas.permute(0, 3, 1, 2).contiguous().view(batch_size, -1, 4)  # [batch_size, num_anchors * height * width, 4]

        # Expand anchors to match the number of proposals (batch_size, num_anchors * height * width, 4)
        anchors = anchors.view(1, -1, 4).expand(batch_size, -1, 4)

        # Apply deltas to anchors (decode proposals)
        proposals = self.apply_bbox_deltas(anchors, bbox_deltas)
        
        return proposals

    
    def apply_bbox_deltas(self, anchors, bbox_deltas):
        """
        Apply bounding box deltas to anchors to get proposals.
        
        Arguments:
        - anchors: Tensor of shape [batch_size, num_anchors, 4] representing the base boxes.
        - bbox_deltas: Tensor of shape [batch_size, num_anchors, 4] representing the bounding box deltas (4 per anchor).
        
        Returns:
        - proposals: Tensor of shape [batch_size, num_anchors, 4] representing the decoded bounding box proposals.
        """
        # Apply deltas to the anchors
        anchors = anchors.float()  # Make sure anchors are float32 for numerical stability

        # Get the current box coordinates
        anchor_x, anchor_y, anchor_w, anchor_h = anchors.split(1, dim=-1)

        # Split the bbox_deltas into 4 parts (dx, dy, dw, dh)
        dx, dy, dw, dh = bbox_deltas.split(1, dim=-1)

        # Apply the deltas
        pred_ctr_x = dx * anchor_w + anchor_x
        pred_ctr_y = dy * anchor_h + anchor_y
        pred_w = torch.exp(dw) * anchor_w
        pred_h = torch.exp(dh) * anchor_h

        # Convert the predictions back to x1, y1, x2, y2 format (bounding box corners)
        proposals = torch.cat([
            pred_ctr_x - 0.5 * pred_w,  # x1
            pred_ctr_y - 0.5 * pred_h,  # y1
            pred_ctr_x + 0.5 * pred_w,  # x2
            pred_ctr_y + 0.5 * pred_h   # y2
        ], dim=-1)

        return proposals




    def apply_nms(self, proposals, logits, threshold=0.7):
        # Convert logits to scores (foreground probability)
        scores = torch.sigmoid(logits[:, :, 1])  # Use the foreground class score

        # Apply NMS: proposals are anchors, scores are foreground probabilities
        keep = nms(proposals, scores, threshold)
        return proposals[keep]

    def compute_losses(self, rpn_logits, rpn_bbox_deltas, cls_scores, bbox_deltas, anchors, proposals, targets):
        # Compute RPN and detection head losses (simplified)
        return {
            "rpn_cls_loss": torch.tensor(0.0),
            "rpn_bbox_loss": torch.tensor(0.0),
            "head_cls_loss": torch.tensor(0.0),
            "head_bbox_loss": torch.tensor(0.0),
        }

    def post_process(self, cls_scores, bbox_deltas, proposals):
        # Simplified post-processing
        return cls_scores, bbox_deltas, proposals

# Instantiate the Model
model = FasterRCNN(num_classes=2)  # Example: 1 class + background

# Example image: A random 128x128 image with 3 color channels (simulating an RGB image)
image = torch.randn(2, 3, 128, 128)

# Put the model in evaluation mode
model.eval()

# Run the model on the image
with torch.no_grad():
    output = model(image)  # If no targets are passed, it will return the predictions
    print("Predictions:", output)


RuntimeError: shape '[2, 9, 128, 128, 4]' is invalid for input of size 1152

In [73]:
import torch
import torch.nn as nn
import torchvision
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.ops import roi_align, nms

# Define the Backbone
class Backbone(nn.Module):
    def __init__(self):
        super().__init__()
        resnet50 = torchvision.models.resnet50(pretrained=False)
        self.backbone = nn.Sequential(*list(resnet50.children())[:-2])  # Remove classification head
    
    def forward(self, x):
        return self.backbone(x)

# Define the RPN (Region Proposal Network)
class RPN(nn.Module):
    def __init__(self, in_channels, anchor_generator):
        super().__init__()
        self.anchor_generator = anchor_generator
        self.conv = nn.Conv2d(in_channels, 512, kernel_size=3, stride=1, padding=1)
        self.cls_logits = nn.Conv2d(512, anchor_generator.num_anchors_per_location()[0] * 2, kernel_size=1)
        self.bbox_pred = nn.Conv2d(512, anchor_generator.num_anchors_per_location()[0] * 4, kernel_size=1)
    
    def forward(self, feature_map, image_size):
        print(f"Feature map type: {type(feature_map)}")
        print(f"Feature map shape: {getattr(feature_map, 'shape', 'No shape attribute')}")
        print("image size:", image_size)
        x = self.conv(feature_map)  # Convolution layer output
        logits = self.cls_logits(x)  # Classification logits
        bbox_deltas = self.bbox_pred(x)  # Bounding box deltas

        # Debugging shapes
        print("RPN feature map shape:", x.shape)
        print("Image size passed to AnchorGenerator:", image_size)

        from torchvision.models.detection.image_list import ImageList

        # Example image tensor (e.g., a batch of images)
        image_tensor = torch.randn(1, 3, 128, 128)  # Example tensor of shape [batch_size, channels, height, width]
        image_size = (128, 128)  # The corresponding image size (height, width)

        # Create an ImageList object
        image_list = ImageList(image_tensor, image_size)

        # Now, you can pass the ImageList object to the AnchorGenerator
        anchors = self.anchor_generator([x], [image_list])[0]

        print(f"Anchors type: {type(anchors)}")
        print(f"Anchors shape: {anchors.shape if isinstance(anchors, torch.Tensor) else 'Unknown'}")

        return logits, bbox_deltas, anchors

# Define the Detection Head
class DetectionHead(nn.Module):
    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(in_channels * 7 * 7, 1024)
        self.fc2 = nn.Linear(1024, 1024)
        self.cls_score = nn.Linear(1024, num_classes)  # Classification scores
        self.bbox_pred = nn.Linear(1024, num_classes * 4)  # Bounding box regression
    
    def forward(self, x):
        x = torch.flatten(x, start_dim=1)
        x = nn.ReLU()(self.fc1(x))
        x = nn.ReLU()(self.fc2(x))
        cls_scores = self.cls_score(x)
        bbox_deltas = self.bbox_pred(x)
        return cls_scores, bbox_deltas

# Define Faster R-CNN
class FasterRCNN(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.backbone = Backbone()
        self.anchor_generator = AnchorGenerator(
            sizes=((8, 16, 32, 64, 128),),  # 128 since 128x128 image
            aspect_ratios=((0.5, 1.0, 2.0),)  # Aspect ratios remain the same
        )
        self.rpn = RPN(2048, self.anchor_generator)
        self.roi_align = roi_align
        self.head = DetectionHead(2048, num_classes)
    
    def forward(self, images, targets=None):
        # Backbone Forward Pass
        features = self.backbone(images)
        print("Backbone features shape:", features.shape)
        batch_size, _, height, width = images.shape

        # RPN Forward Pass
        rpn_logits, rpn_bbox_deltas, anchors = self.rpn(features, (height, width))

        # Decode Proposals
        proposals = self.decode_proposals(anchors, rpn_bbox_deltas)

        # Apply NMS on Proposals
        proposals = self.apply_nms(proposals, rpn_logits, threshold=0.7)

        # RoI Align
        pooled_features = self.roi_align(features, proposals, output_size=(4, 4), spatial_scale=1/16) #match output from resnet ((2048, 4, 4))


        # Detection Head Forward Pass
        cls_scores, bbox_deltas = self.head(pooled_features)

        # Loss Computation
        if targets is not None:
            losses = self.compute_losses(rpn_logits, rpn_bbox_deltas, cls_scores, bbox_deltas, anchors, proposals, targets)
            return losses
        else:
            return self.post_process(cls_scores, bbox_deltas, proposals)

    def decode_proposals(self, anchors, bbox_deltas):
        # Apply deltas to anchors to get proposals (not shown here for brevity)
        return proposals

    def apply_nms(self, proposals, logits, threshold=0.7):
        scores = torch.sigmoid(logits.squeeze(1))
        keep = nms(proposals, scores, threshold)
        return proposals[keep]

    def compute_losses(self, rpn_logits, rpn_bbox_deltas, cls_scores, bbox_deltas, anchors, proposals, targets):
        # Compute RPN and detection head losses
        # (e.g., Binary Cross-Entropy, Smooth L1 Loss)
        return {
            "rpn_cls_loss": rpn_cls_loss,
            "rpn_bbox_loss": rpn_bbox_loss,
            "head_cls_loss": head_cls_loss,
            "head_bbox_loss": head_bbox_loss,
        }

    def post_process(self, cls_scores, bbox_deltas, proposals):
        # Decode final predictions and apply NMS (not shown here for brevity)
        return final_detections

# Instantiate the Model
model = FasterRCNN(num_classes=2)  # Example: 1 class + background

# Example image: A random 128x128 image with 3 color channels (simulating an RGB image)
image = torch.randn(2, 3, 128, 128)

# Example target: A dictionary with ground truth data
# The target should include the bounding boxes and labels for the object(s) in the image
target = [{
    'boxes': torch.tensor([[30.0, 30.0, 100.0, 100.0],[30.0, 30.0, 100.0, 100.0]]),  # Example bounding box [xmin, ymin, xmax, ymax]
    'labels': torch.tensor([1, 1]),  # Example class label (1 for the object class, 0 is for background)
    'image_id': torch.tensor([0, 1]),  # Image id (if using a dataset with multiple images)
    'area': torch.tensor([100, 100]),  # Area of the bounding box (width * height)
    'iscrowd': torch.tensor([0, 0])  # Indicates whether the object is a crowd (used in COCO dataset)
}]

# Instantiate the model (use the code from previous steps)
model = FasterRCNN(num_classes=2)  # 1 class + background

# Put the model in evaluation mode
model.eval()

# Run the model on the image and target
with torch.no_grad():
    # Forward pass: predictions and losses if targets are provided
    #losses = model(image, target)
    #print("Losses:", losses)

    # Inference mode: Get predictions if no target is provided
    output = model(image)  # If no targets are passed, it will return the predictions
    print("Predictions:", output)


Backbone features shape: torch.Size([2, 2048, 4, 4])
Feature map type: <class 'torch.Tensor'>
Feature map shape: torch.Size([2, 2048, 4, 4])
image size: (128, 128)
RPN feature map shape: torch.Size([2, 512, 4, 4])
Image size passed to AnchorGenerator: (128, 128)


AttributeError: 'ImageList' object has no attribute 'shape'

In [37]:
import torch
import torch.nn as nn
import torchvision
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.ops import roi_align, nms

# Define the Backbone
class Backbone(nn.Module):
    def __init__(self):
        super().__init__()
        resnet50 = torchvision.models.resnet50(pretrained=False)
        self.backbone = nn.Sequential(*list(resnet50.children())[:-2])  # Remove classification head
    
    def forward(self, x):
        return self.backbone(x)

# Define the RPN (Region Proposal Network)
class RPN(nn.Module):
    def __init__(self, in_channels, anchor_generator):
        super().__init__()
        self.anchor_generator = anchor_generator
        self.conv = nn.Conv2d(in_channels, 512, kernel_size=3, stride=1, padding=1)
        self.cls_logits = nn.Conv2d(512, anchor_generator.num_anchors_per_location()[0] * 2, kernel_size=1)
        self.bbox_pred = nn.Conv2d(512, anchor_generator.num_anchors_per_location()[0] * 4, kernel_size=1)
    
    def forward(self, feature_map, image_size):
        print(feature_map.shape)
        print(image_size)
        x = self.conv(feature_map)  # Convolution layer output
        logits = self.cls_logits(x)  # Classification logits
        bbox_deltas = self.bbox_pred(x)  # Bounding box deltas

        # Debugging shapes
        print("RPN feature map shape:", x.shape)
        print("Image size passed to AnchorGenerator:", image_size)

        # Ensure feature map is passed as a list
        anchors = self.anchor_generator([x], [(image_size[1], image_size[0])])[0]
        print("Generated anchors type:", type(anchors))
        print("Generated anchors shape:", anchors.shape if isinstance(anchors, torch.Tensor) else anchors)

        return logits, bbox_deltas, anchors

# Define the Detection Head
class DetectionHead(nn.Module):
    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(in_channels * 4 * 4, 1024)  # Adjusting for smaller feature map size
        self.fc2 = nn.Linear(1024, 1024)
        self.cls_score = nn.Linear(1024, num_classes)  # Classification scores
        self.bbox_pred = nn.Linear(1024, num_classes * 4)  # Bounding box regression
    
    def forward(self, x):
        x = torch.flatten(x, start_dim=1)
        x = nn.ReLU()(self.fc1(x))
        x = nn.ReLU()(self.fc2(x))
        cls_scores = self.cls_score(x)
        bbox_deltas = self.bbox_pred(x)
        return cls_scores, bbox_deltas

# Define Faster R-CNN
class FasterRCNN(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.backbone = Backbone()
        self.anchor_generator = AnchorGenerator(
            sizes=((8, 16, 32, 64, 128),),  # Adjust anchor sizes for small image size
            aspect_ratios=((0.5, 1.0, 2.0),)  # Aspect ratios remain the same
        )
        self.rpn = RPN(2048, self.anchor_generator)
        self.roi_align = roi_align
        self.head = DetectionHead(2048, num_classes)
    
    def forward(self, images, targets=None):
        # Backbone Forward Pass
        features = self.backbone(images)
        print("Backbone features shape:", features.shape)
        batch_size, _, height, width = images.shape

        # RPN Forward Pass
        rpn_logits, rpn_bbox_deltas, anchors = self.rpn(features, (height, width))

        # Decode Proposals
        proposals = self.decode_proposals(anchors, rpn_bbox_deltas)

        # Apply NMS on Proposals
        proposals = self.apply_nms(proposals, rpn_logits, threshold=0.7)

        # RoI Align
        pooled_features = self.roi_align(features, proposals, output_size=(4, 4), spatial_scale=1/16)  # Adjust for smaller image size

        # Detection Head Forward Pass
        cls_scores, bbox_deltas = self.head(pooled_features)

        # Loss Computation
        if targets is not None:
            losses = self.compute_losses(rpn_logits, rpn_bbox_deltas, cls_scores, bbox_deltas, anchors, proposals, targets)
            return losses
        else:
            return self.post_process(cls_scores, bbox_deltas, proposals)

    def decode_proposals(self, anchors, bbox_deltas):
        # Apply deltas to anchors to get proposals (not shown here for brevity)
        return proposals

    def apply_nms(self, proposals, logits, threshold=0.7):
        scores = torch.sigmoid(logits.squeeze(1))
        keep = nms(proposals, scores, threshold)
        return proposals[keep]

    def compute_losses(self, rpn_logits, rpn_bbox_deltas, cls_scores, bbox_deltas, anchors, proposals, targets):
        # Compute RPN and detection head losses
        # (e.g., Binary Cross-Entropy, Smooth L1 Loss)
        return {
            "rpn_cls_loss": rpn_cls_loss,
            "rpn_bbox_loss": rpn_bbox_loss,
            "head_cls_loss": head_cls_loss,
            "head_bbox_loss": head_bbox_loss,
        }

    def post_process(self, cls_scores, bbox_deltas, proposals):
        # Decode final predictions and apply NMS (not shown here for brevity)
        return final_detections

# Instantiate the Model
model = FasterRCNN(num_classes=2)  # Example: 1 class + background

"""
# Example Inputs
images = torch.randn(1, 3, 128, 128)  # Batch of one image
targets = [
    {
        "boxes": torch.tensor([[50, 50, 200, 200]], dtype=torch.float32),
        "labels": torch.tensor([1], dtype=torch.int64)
    }
]

# Forward Pass (Training)
losses = model(images, targets)
print("Losses:", losses)

# Forward Pass (Inference)
detections = model(images)
print("Detections:", detections)
"""

# Example image: A random 128x128 image with 3 color channels (simulating an RGB image)
image = torch.randn(1, 3, 128, 128)

# Example target: A dictionary with ground truth data
# The target should include the bounding boxes and labels for the object(s) in the image
target = [{
    'boxes': torch.tensor([[30.0, 30.0, 100.0, 100.0]]),  # Example bounding box [xmin, ymin, xmax, ymax]
    'labels': torch.tensor([1]),  # Example class label (1 for the object class, 0 is for background)
    'image_id': torch.tensor([0]),  # Image id (if using a dataset with multiple images)
    'area': torch.tensor([4900.0]),  # Area of the bounding box (width * height)
    'iscrowd': torch.tensor([0])  # Indicates whether the object is a crowd (used in COCO dataset)
}]

# Instantiate the model (use the code from previous steps)
model = FasterRCNN(num_classes=2)  # 1 class + background

# Put the model in evaluation mode
model.eval()

# Run the model on the image and target
with torch.no_grad():
    # Forward pass: predictions and losses if targets are provided
    #losses = model(image, target)
    #print("Losses:", losses)

    # Inference mode: Get predictions if no target is provided
    output = model(image)  # If no targets are passed, it will return the predictions
    print("Predictions:", output)



Backbone features shape: torch.Size([1, 2048, 4, 4])
torch.Size([1, 2048, 4, 4])
(128, 128)
RPN feature map shape: torch.Size([1, 512, 4, 4])
Image size passed to AnchorGenerator: (128, 128)


AttributeError: 'tuple' object has no attribute 'shape'

In [78]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import random

model = SimpleYOLOv3(num_classes=1)

# IoU calculation
def compute_iou(pred_boxes, true_boxes):
    # pred_boxes and true_boxes should be in (x_min, y_min, x_max, y_max)
    inter_xmin = torch.max(pred_boxes[:, 0], true_boxes[:, 0])
    inter_ymin = torch.max(pred_boxes[:, 1], true_boxes[:, 1])
    inter_xmax = torch.min(pred_boxes[:, 2], true_boxes[:, 2])
    inter_ymax = torch.min(pred_boxes[:, 3], true_boxes[:, 3])

    inter_area = torch.clamp(inter_xmax - inter_xmin, min=0) * torch.clamp(inter_ymax - inter_ymin, min=0)
    pred_area = (pred_boxes[:, 2] - pred_boxes[:, 0]) * (pred_boxes[:, 3] - pred_boxes[:, 1])
    true_area = (true_boxes[:, 2] - true_boxes[:, 0]) * (true_boxes[:, 3] - true_boxes[:, 1])

    union_area = pred_area + true_area - inter_area
    iou = inter_area / union_area
    return iou

# Simple IoU loss function
def iou_loss(pred_boxes, true_boxes):
    iou = compute_iou(pred_boxes, true_boxes)
    return 1 - iou.mean()  # We want to maximize IoU, so minimize 1 - IoU

# Custom YOLOv3 training loop
def train(model, train_loader, optimizer, device):
    model.train()
    total_loss = 0
    for images, targets in train_loader:
        print(images.shape())
        print(images)
        print(targets.shape())
        images = images.to(device)
        targets = targets.to(device)

        # Forward pass
        predictions = model(images)

        # Extract predicted boxes and target boxes (for simplicity, assuming one grid cell)
        pred_boxes = predictions[:, :4]  # first 4 are bounding box coordinates
        pred_conf = predictions[:, 4]    # 5th is objectness confidence
        pred_class = predictions[:, 5:]  # remaining are class predictions

        true_boxes = targets[:, :4]  # Ground truth boxes
        true_conf = targets[:, 4]    # Objectness confidence
        true_class = targets[:, 5:]  # Ground truth class

        # Losses
        loss_loc = iou_loss(pred_boxes, true_boxes)  # IoU loss
        loss_conf = torch.nn.BCEWithLogitsLoss()(pred_conf, true_conf)  # Confidence loss
        loss_class = torch.nn.BCEWithLogitsLoss()(pred_class, true_class)  # Classification loss

        # Total loss (sum or weighted sum)
        loss = loss_loc + loss_conf + loss_class
        total_loss += loss.item()

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    avg_loss = total_loss / len(train_loader)
    print(f"Training loss: {avg_loss}")

# Evaluation (testing) function
def evaluate(model, test_loader, device):
    model.eval()
    total_iou = 0
    with torch.no_grad():
        for images, targets in test_loader:
            images = images.to(device)
            targets = targets.to(device)

            predictions = model(images)

            # Extract predicted boxes and target boxes
            pred_boxes = predictions[:, :4]
            true_boxes = targets[:, :4]

            # Calculate IoU for the batch
            iou = compute_iou(pred_boxes, true_boxes)
            total_iou += iou.mean().item()

    avg_iou = total_iou / len(test_loader)
    print(f"Average IoU on test set: {avg_iou}")

# Initialize model, optimizer, and device
model = SimpleYOLOv3(num_classes=1).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Train the model
epochs = 10
for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")
    train(model, train_loader, optimizer, device)
    evaluate(model, test_loader, device)


Epoch 1/10


TypeError: Compose.__call__() takes 2 positional arguments but 3 were given

### Training

In [11]:
# Train YOLO model
#model = YOLO("yolov5su.pt")  # Load pretrained weights
#model.train(data="yolo.yaml", epochs=15, imgsz=640, pretrained=True, augment=True,)

# Submission File 

In [None]:
test_folder = "2024-fall-ml-3-hw-4-wheres-waldo/test/test"

# Predict on test images
test_images = [os.path.join(test_folder, img) for img in os.listdir(test_folder) if img.endswith(".jpg")]
results = model.predict(source=test_images, save=True, save_txt=True, project="yolo_test_predictions")

# Prepare to save the predictions
output_csv_path = os.path.join("yolo_test_predictions", "predictions.csv")
predictions = []

# Process results
for result in results:
    image_name = os.path.basename(result.path)  # Get the image name
    if result.boxes is not None and len(result.boxes) > 0:  # Check if there are predictions
        # Convert result.boxes to tensor for easier access
        boxes = result.boxes.xyxy.cpu().numpy()  # Convert bounding boxes to array
        confidences = result.boxes.conf.cpu().numpy()  # Convert confidence scores to array

        # Find the index of the box with the highest confidence
        best_idx = confidences.argmax()
        best_box = boxes[best_idx]
        conf = confidences[best_idx]

        # Extract bounding box coordinates
        x_min, y_min, x_max, y_max = best_box
        predictions.append([image_name, x_min, y_min, x_max, y_max, conf])
    else:
        # No predictions for this image
        predictions.append([image_name, None, None, None, None, None])

# Save predictions to CSV
df = pd.DataFrame(predictions, columns=["filename", "xmin", "ymin", "xmax", "ymax", "confidence"])
df.to_csv(output_csv_path, index=False)

print(f"Predictions saved to {output_csv_path}")



0: 640x640 (no detections), 210.5ms
1: 640x640 (no detections), 210.5ms
2: 640x640 (no detections), 210.5ms
3: 640x640 (no detections), 210.5ms
4: 640x640 (no detections), 210.5ms
5: 640x640 (no detections), 210.5ms
6: 640x640 (no detections), 210.5ms
7: 640x640 (no detections), 210.5ms
8: 640x640 (no detections), 210.5ms
Speed: 1.5ms preprocess, 210.5ms inference, 0.1ms postprocess per image at shape (1, 3, 640, 640)
Results saved to [1myolo_test_predictions/train2[0m
0 label saved to yolo_test_predictions/train2/labels
Predictions saved to yolo_test_predictions/predictions.csv
