In [None]:
!pip install kagglehub

In [None]:
import kagglehub

data_dir = kagglehub.dataset_download("andrewmvd/dog-cat-detection")
print("Path to dataset file: ", data_dir)

In [None]:
import os
import torch
import random
import numpy as np
import pandas as pd
import seaborn as sns
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import xml.etree.ElementTree as ET 

from tqdm import tqdm
from PIL import Image
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from torchvision.models.resnet import ResNet50_Weights

In [None]:
class MyDataset(Dataset):
    def __init__(self, annotations_dir, image_dir, transform=None):
        self.annotations_dir = annotations_dir
        self.image_dir = image_dir
        self.transform = transform
        self.image_files = self.filter_images_with_multiple_objects()
        
    def filter_images_with_multiple_objects(self):
        valid_image_files = []
        for f in os.listdir(self.image_dir):
            if os.path.isfile(os.path.join(self.image_dir, f)):
                img_name = f
                annotation_name = os.path.splitext(img_name)[0] +".xml"
                annotation_path = os.path.join(self.annotations_dir, annotation_name)
                
                # Keep iomages that have single object
                if self.count_objects_in_annotation(annotation_path) == 1:
                    valid_image_files.append(img_name)
        return valid_image_files
    
    def count_objects_in_annotation(self, annotation_path):
        try:
            tree = ET.parse(annotation_path)
            root = tree.getroot()
            count = 0
            for obj in root.findall("object"):
                count += 1
        except FileNotFoundError:
            return 0
        
    def __len__(self):
        return len(self.image_files)
    
    def __getitem__(self, idx):
        img1_name = self.image_files[idx]
        img1_path = os.path.join(self.image_dir, img1_name)
        
        annotation_name = os.path.splitext(img1_name)
        img1_annotations = self.parse_annotation(
            os.path.join(self.annotations_dir, annotation_name)
        )
        idx2 = random.randint(0, len(self.image_files) - 1)
        img2_file = self.image_files[idx2]
        img2_path = os.path.join(self.image_dir, img2_file)
        
        annotation_name = os.path.splitext(img2_file)[0] + ".xml"
        img2_annotations = self.parse_annotation(
            os.path.join(self.annotations_dir, annotation_name)
        )
        
        img1 = Image.open(img1_path).convert("RGB")
        img2 = Image.open(img1_path).convert("RGB")

        merged_image = Image.new("RGB", (img1.width + img2.width, max(img1.height, img2.height)))
        
        merged_image.paste(img1, (0, 0))
        merged_image.paste(img2, (img1.width, 0))
        merged_w = img1.width + img2.width
        merged_h = max(img1.height, img2.height)
        
        merged_annotations = []
        
        merged_annotations.append(
            {
                "bbox": img1_annotations[1].tolist(),
                "label": img1_annotations[0]
            }
        )
        
        new_bbox = [
            (img2_annotations[1][0]*img2.width + img1.width) / merged_w,
            img2_annotations[1][1]*img2.height/merged_h,
            (img2_annotations[1][2]*img2.width + img1.width) / merged_w,
            img2_annotations[1][3]*img2.height/merged_h,
        ]
        
        merged_annotations.append(
            {
                "bbox": new_bbox,
                "label": img2_annotations
            }
        )
        
        if self.transform:
            merged_image = self.transform(merged_image)
        else:
            merged_image = transforms.ToTensor()(merged_image)
            
        annotations = torch.zeros(len(merged_annotations), 5)
        
        for i, ann in enumerate(merged_annotations):
            annotations[i] = torch.cat((torch.tensor(ann["bbox"]), torch.tensor([ann["label"]])))
            
        return merged_image
    
    def parse_annotation(self, annotation_path):
        tree = ET.parse(annotation_path)
        root = tree.getroot()
        
        image_width = int(root.find('size/width').text)
        image_height = int(root.find('size/height').text)
        
        label = None
        bbox = None
        for obj in root.findall("object"):
            name = obj.find("name").text
            
            if label is None:
                label = name
                xmin = int(obj.find('bndbox/xmin').text)
                ymin = int(obj.find('bndbox/ymin').text)
                xmax = int(obj.find('bndbox/xmax').text)
                ymax = int(obj.find('bndbox/ymax').text)
                
                bbox = [
                    xmin / image_width,
                    ymin / image_height,
                    xmax / image_width,
                    ymax / image_height,
                ]
        
        label_num = 0 if label == 'cat' else 1 if label =='dog' else -1
        
        return label_num, torch.tensor(bbox, dtype=torch.float32)

In [None]:
annotations_dir = os.path.join(data_dir, 'annotations')
image_dir = os.path.join(data_dir, 'images')

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

dataset = MyDataset(annotations_dir, image_dir, transform=transform)
train_dataset, val_dataset = train_test_split(dataset, test_size=0.2, random_state=42)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

In [None]:
class SimpleYOLO(nn.Module):
    def __init__(self, num_classes):
        super(SimpleYOLO, self).__init__()
        self.backbone = models.resnet50(weights=ResNet50_Weights.DEFAULT)
        self.num_classes = num_classes
        
        self.backbone = nn.Sequential(*list(self.backbone.children())[:-2])
        
        self.fcs = nn.Linear(2048, 2*2*(4 + self.num_classes))
        
    def forward(self, x):
        features = self.backbone(x)
        features = F.adaptive_avg_pool2d(features, (1, 1))
        features = features.view(features.size(0), -1)
        features = self.fcs(features)
        
        return features

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
num_classes = 2
class_to_idx = {
    "dog": 0,
    "cat": 1
}

model = SimpleYOLO(num_classes=num_classes).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
def calculate_loss(output, targets, device, num_classes):
    mse_loss = nn.MSELoss()
    ce_loss = nn.CrossEntropyLoss()
    batch_size = output.shape[0]
    total_los = 0
    
    output = output.view(batch_size, 2, 2, 4 + num_classes)
    
    for i in range(batch_size):
        for j in range(len(targets[i])):
            bbox_center_x = (targets[i][j][0] + targets[i][j][2]) / 2
            bbox_center_y = (targets[i][j][1] + targets[i][j][3]) / 2
            
            grid_x = int(bbox_center_x*2)
            grid_y = int(bbox_center_y*2)
            
            # 1. Classification loss
            label_one_hot = torch.zeros(num_classes, device=device)
            label_one_hot[int(targets[i][j][4])] = 1
            
            classification_loss = ce_loss(output[i, grid_y, grid_x, 4:], label_one_hot)
            
            # 2. Regression Loss for the responsible grid cell
            bbox_target = targets[i][j][:4].to(device)
            regression_loss = mse_loss(output[i, grid_y, grid_x, :4], bbox_target)
            
            # 3. No object Loss
            no_obj_loss = 0
            for other_grid_y in range(2):
                for other_grid_x in range(2):
                    if other_grid_y != grid_y or other_grid_x != grid_x:
                        no_obj_loss += mse_loss(output[i, other_grid_y, other_grid_x, :4], torch.zeros(4, device=device))
                        
            total_loss += classification_loss + regression_loss + no_obj_loss
            
    return total_los / batch_size

In [None]:
def evaluate_model(model, data_loader, device, num_classes):
    model.eval()
    running_loss = 0.0
    all_predictions = []
    all_targets = []
    
    with torch.no_grad():
        for images, targets in tqdm(data_loader, desc="Validaton", leave=False):
            images = images.to(device)
            
            output = model(images)
            total_loss = calculate_loss(output, targets, device, num_classes)
            running_loss += total_loss.item()
            
            output = output.view(images.shape[0], 2, 2, 4 + num_classes)
            
            for batch_idx in range(images.shape[0]):
                for target in targets[batch_idx]:
                    bbox_center_x = (target[0] + target[2]) / 2
                    bbox_center_y = (target[1] + target[3]) / 2
                    grid_x = int(bbox_center_x*2)
                    grid_y = int(bbox_center_y*2)

                    prediction = output[batch_idx, grid_y, grid_x, :4].argmax().item()
                    all_predictions.append(prediction)
                    all_targets.append(target[4].item())
        
    val_loss = running_loss / len(data_loader)
    
    all_predictions = torch.tensor(all_predictions, device=device)
    all_targets = torch.tensor(all_targets, device=device)
    
    val_accuracy = (all_predictions == all_targets).float().mean()
    
    return val_loss, val_accuracy.item()

In [None]:
def train_model(model, train_loader, val_loader, optimizer, num_epochs, device, num_classes):
    best_val_accuracy = 0.0
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []
    
    for epoch in tqdm(range(num_epochs), desc="Epochs"):
        model.train()
        running_loss = 0.0
        
        for images, targets in tqdm(train_loader, desc="Batches", leave=False):
            images = images.to(device)
            optimizer.zero_grad()
            output = model(images)
            
            total_loss = calculate_loss(output, targets, device, num_classes)
            
            total_loss.backward()
            optimizer.step()
            running_loss += total_loss.item()
            
        epoch_loss = running_loss / len(train_loader)
        train_losses.append(epoch_loss)
        
        val_loss, val_accuracy = evaluate_model(model, val_loader, device, num_classes)
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)
        
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss: .4f}")
        print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")
        
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(model.state_dict(), "best_model.pth")
            
    return train_losses, val_losses, train_accuracies, val_accuracies

In [None]:
def inference(model, image_path, transform, device, class_to_idx, threshold=0.5):
    model.eval()