# DataLoader

In [None]:
import json
import os

import numpy as np
import selectivesearch
import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms

class CustomDataset(Dataset):
    """Dataset yielding a resized image, its labels/boxes and region-proposal crops.

    Image names are read from a text file (one path per line) and matched to
    the annotations of a COCO-style JSON file through a numeric id derived
    from the file name (see ``_image_id_from_name``).
    """

    # Side length (pixels) of the square every input image is resized to.
    _TARGET_SIZE = 320

    def __init__(self, txt_file, img_dir, coco_json_file, aug=False):
        """
        :param txt_file: Text file listing one image path per line.
        :param img_dir: Directory containing the image files.
        :param coco_json_file: JSON file with 'annotations' and 'images' lists.
        :param aug: If True, ``aug_transform`` is used instead of ``base_transform``.
        """
        with open(txt_file, 'r') as f:
            # Skip blank lines so that e.g. a trailing newline does not become
            # a sample whose id cannot be parsed.
            self.image_paths = [line.strip() for line in f if line.strip()]

        self.img_dir = img_dir

        with open(coco_json_file, 'r') as f:
            coco_data = json.load(f)

        # image_id -> list of category ids / list of 4-float boxes, kept in
        # the order the annotations appear in the JSON file.
        self.image_annotations = {}
        self.image_bboxes = {}

        for annotation in coco_data['annotations']:
            image_id = annotation['image_id']
            bbox = self._parse_bbox(annotation['bbox'])
            self.image_annotations.setdefault(image_id, []).append(annotation['category_id'])
            self.image_bboxes.setdefault(image_id, []).append(bbox)

        # Numeric id (derived from the file name) -> file name.
        self.image_info = {
            self._image_id_from_name(image['file_name']): image['file_name']
            for image in coco_data['images']
        }

        target = (self._TARGET_SIZE, self._TARGET_SIZE)
        self.base_transform = transforms.Compose([
            transforms.Resize(target),
            transforms.ToTensor(),
        ])

        # NOTE(review): currently identical to base_transform, so aug=True
        # applies no augmentation yet.
        self.aug_transform = transforms.Compose([
            transforms.Resize(target),
            transforms.ToTensor(),
        ])

        self.aug = aug

    @staticmethod
    def _image_id_from_name(file_name):
        """Derive the integer image id by stripping '_', '.jpg' and 'img' from the name.

        :raises ValueError: If the remaining text is not an integer.
        """
        return int(file_name.replace('_', '').replace('.jpg', '').replace('img', ''))

    @staticmethod
    def _parse_bbox(raw_bbox):
        """Return a bounding box as a list of floats.

        Accepts either a sequence of numbers or a string such as
        ``"[1.0, 2.0, 3.0, 4.0]"`` (with or without spaces after the commas).

        :raises ValueError: If a component cannot be converted to float.
        """
        if isinstance(raw_bbox, str):
            raw_bbox = raw_bbox.strip().strip('[]').split(',')
        return [float(value) for value in raw_bbox]

    @staticmethod
    def _scale_proposals(proposals, scale_x, scale_y):
        """Map (x_min, y_min, x_max, y_max) boxes into a rescaled image.

        The minimum corner is rounded down and the maximum corner rounded up
        so that a non-empty box never collapses to zero size.
        """
        return [
            (
                int(x_min * scale_x),
                int(y_min * scale_y),
                int(np.ceil(x_max * scale_x)),
                int(np.ceil(y_max * scale_y)),
            )
            for x_min, y_min, x_max, y_max in proposals
        ]

    def __len__(self):
        """Return the number of image paths read from the text file."""
        return len(self.image_paths)

    def __getitem__(self, index):
        """Load one sample.

        :param index: Position in the list of image paths.
        :return: Dict with keys "image" (resized image tensor), "labels"
            (int64 tensor), "bboxes" (list of 4-element float tensors) and
            "regions" (list of resized region crops).
        :raises ValueError: If the image is missing from the COCO file or on disk.
        """
        img_name = os.path.basename(self.image_paths[index])
        img_id = self._image_id_from_name(img_name)

        if img_id not in self.image_info:
            raise ValueError(f"Image {img_name} not found in the COCO file.")

        img_path = os.path.join(self.img_dir, img_name)
        if not os.path.exists(img_path):
            raise ValueError(f"Image not found at path: {img_path}")

        # convert() returns a fully loaded copy, so the underlying file can be
        # closed as soon as the with-block exits.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        original_width, original_height = image.size

        transform = self.aug_transform if self.aug else self.base_transform
        image_tensor = transform(image)

        categories = self.image_annotations.get(img_id, [])
        bboxes = self.image_bboxes.get(img_id, [])
        # NOTE(review): non-int categories are dropped here while their boxes
        # are kept, so "labels" and "bboxes" may differ in length.
        categories = [c for c in categories if isinstance(c, int)]
        if not categories:
            categories = [-1]  # Special label for images without annotations

        # Scale the bounding boxes into the resized image's coordinate frame.
        scale_x = self._TARGET_SIZE / original_width
        scale_y = self._TARGET_SIZE / original_height
        # Components 0/2 are horizontal and 1/3 vertical. Whether 2/3 hold a
        # far corner or a width/height depends on the annotation file; the
        # scaling is correct for both.
        scaled_bboxes = [
            torch.tensor([
                bbox[0] * scale_x,
                bbox[1] * scale_y,
                bbox[2] * scale_x,
                bbox[3] * scale_y,
            ], dtype=torch.float32)
            for bbox in bboxes
        ] if bboxes else [torch.zeros(4, dtype=torch.float32)]

        labels = torch.tensor(categories, dtype=torch.int64)

        # Proposals are found and filtered on the original-size image, then
        # mapped into the resized frame before cropping from image_tensor;
        # cropping with unscaled coordinates would pick the wrong pixels.
        proposals = self._generate_region_proposals(image)
        proposals = self._filter_proposals(proposals, original_width, original_height)
        proposals = self._scale_proposals(proposals, scale_x, scale_y)
        processed_proposals = self._process_proposals(image_tensor, proposals)

        return {
            "image": image_tensor,               # Scaled input image
            "labels": labels,                    # Object class labels
            "bboxes": scaled_bboxes,             # Scaled bounding boxes
            "regions": processed_proposals       # Processed region proposals
        }

    def _generate_region_proposals(self, image):
        """Run selective search and return boxes at least 10 px wide and high.

        :param image: Image convertible with ``np.array`` (e.g. a PIL image).
        :return: List of [x_min, y_min, x_max, y_max], clipped to the image.
        :raises ValueError: If the array has more than three channels.
        """
        img_np = np.array(image)

        if len(img_np.shape) == 3 and img_np.shape[0] == 3:
            img_np = np.transpose(img_np, (1, 2, 0))  # Convert [C, H, W] to [H, W, C]
        elif len(img_np.shape) == 2:
            img_np = np.stack([img_np] * 3, axis=-1)  # Grayscale to RGB
        elif img_np.shape[2] < 3:
            img_np = np.repeat(img_np, 3, axis=2)  # Single-channel to 3-channel
        elif img_np.shape[2] != 3:
            raise ValueError(f"Invalid image shape: {img_np.shape}")

        _, regions = selectivesearch.selective_search(img_np, scale=500, sigma=0.9, min_size=10)
        img_height, img_width = img_np.shape[0], img_np.shape[1]
        proposals = []
        for region in regions:
            x, y, w, h = region['rect']
            if w >= 10 and h >= 10:
                proposals.append([x, y, min(x + w, img_width), min(y + h, img_height)])

        return proposals

    def _filter_proposals(self, proposals, img_width, img_height, min_area=100, max_area_ratio=0.8):
        """Drop duplicate boxes and boxes whose area is out of range.

        :param proposals: Iterable of (x_min, y_min, x_max, y_max).
        :param img_width: Width of the image the boxes refer to.
        :param img_height: Height of the image the boxes refer to.
        :param min_area: Smallest box area kept (inclusive).
        :param max_area_ratio: Largest box area kept, as a fraction of the image area (inclusive).
        :return: List of unique box tuples in first-seen order.
        """
        max_area = max_area_ratio * (img_width * img_height)
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        unique_proposals = dict.fromkeys(tuple(p) for p in proposals)
        filtered = []
        for x_min, y_min, x_max, y_max in unique_proposals:
            area = (x_max - x_min) * (y_max - y_min)
            if min_area <= area <= max_area:
                filtered.append((x_min, y_min, x_max, y_max))
        return filtered

    def _process_proposals(self, image_tensor, proposals, output_size=(227, 227)):
        """Crop each proposal from the image tensor and resize it.

        :param image_tensor: Tensor indexed as [channel, row, column].
        :param proposals: Boxes (x_min, y_min, x_max, y_max) in the coordinate
            frame of ``image_tensor``.
        :param output_size: (height, width) of every returned crop.
        :return: List of tensors, one per proposal that has a non-empty
            intersection with the image.
        """
        processed_proposals = []
        for proposal in proposals:
            # Best effort: a malformed proposal is reported and skipped
            # instead of aborting the whole sample.
            try:
                _, H, W = image_tensor.shape
                x_min, y_min, x_max, y_max = map(int, proposal)
                # Clip the box to the tensor bounds.
                x_min, y_min = max(0, x_min), max(0, y_min)
                x_max, y_max = min(W, x_max), min(H, y_max)

                if x_min < x_max and y_min < y_max:
                    cropped_region = image_tensor[:, y_min:y_max, x_min:x_max]
                    if cropped_region.numel() == 0 or cropped_region.ndim != 3:
                        continue

                    # interpolate expects a batch dimension; add it, then remove it.
                    resized_region = torch.nn.functional.interpolate(
                        cropped_region.unsqueeze(0), size=output_size, mode='bilinear', align_corners=False
                    ).squeeze(0)
                    processed_proposals.append(resized_region)
            except Exception as e:
                print(f"Error processing proposal: {proposal}. Error: {e}")

        return processed_proposals

## Check DataLoader

# SPPNet (ZF-5)

In [None]:
# Define the SPP Layer
class SpatialPyramidPooling(nn.Module):
    """Pool a feature map at several grid resolutions and concatenate the results.

    Each pyramid level applies adaptive max pooling, so the length of the
    output vector depends only on the channel count and ``output_sizes``,
    not on the spatial size of the input.
    """

    def __init__(self, output_sizes):
        """
        :param output_sizes: List of output sizes for pyramid levels (e.g., [1, 2, 4]).
        """
        super().__init__()
        self.output_sizes = output_sizes

    def forward(self, x):
        """
        :param x: 4-D feature map (batch, channels, height, width).
        :return: Tensor of shape (batch, channels * sum(s * s for s in output_sizes))
            when every output size s is an int.
        :raises ValueError: If ``x`` is not 4-dimensional.
        """
        if x.dim() != 4:
            raise ValueError(f"Expected a 4-D input (N, C, H, W), got {x.dim()}-D")

        pooled_outputs = []
        for output_size in self.output_sizes:
            pooled = F.adaptive_max_pool2d(x, output_size)
            # flatten (unlike view) also works when the pooled tensor is not
            # contiguous, e.g. for channels_last inputs.
            pooled_outputs.append(pooled.flatten(start_dim=1))  # Flatten each level
        return torch.cat(pooled_outputs, dim=1)  # Concatenate pyramid levels

# Define the ZFNet Backbone with SPP Layer
class SPPNetZF5(nn.Module):
    """Five convolutional layers, a spatial pyramid pooling layer and three fully connected layers."""

    def __init__(self, num_classes=1000, spp_output_sizes=(1, 2, 4)):
        """
        :param num_classes: Number of outputs of the last fully connected layer.
        :param spp_output_sizes: Grid sizes of the pyramid levels (e.g., (1, 2, 4)).
            The default is an immutable tuple so it cannot be shared and
            mutated across instances.
        """
        super().__init__()
        # Private copy: later changes to the caller's sequence cannot affect the model.
        spp_output_sizes = list(spp_output_sizes)

        # Convolutional layers
        self.conv1 = nn.Conv2d(3, 96, kernel_size=7, stride=2, padding=3)
        self.conv2 = nn.Conv2d(96, 256, kernel_size=5, stride=2, padding=2)
        self.conv3 = nn.Conv2d(256, 384, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(384, 384, kernel_size=3, stride=1, padding=1)
        self.conv5 = nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1)

        self.spp = SpatialPyramidPooling(output_sizes=spp_output_sizes)

        # Fully connected layers
        self.fc1 = nn.Linear(self._calculate_fc_input_size(spp_output_sizes), 4096)
        self.fc2 = nn.Linear(4096, 4096)
        self.fc3 = nn.Linear(4096, num_classes)

    def _calculate_fc_input_size(self, spp_output_sizes):
        """
        Calculate the total size of the output vector from the SPP layer.

        :param spp_output_sizes: Iterable of integer grid sizes.
        :return: sum(size * size) * 256, where 256 is the number of output
            channels of ``conv5``.
        """
        return sum(size * size for size in spp_output_sizes) * 256

    def forward(self, x):
        """
        :param x: Batch of 3-channel images.
        :return: Raw (un-normalised) scores with ``num_classes`` values per sample.
        """
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, kernel_size=3, stride=2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, kernel_size=3, stride=2)
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = F.relu(self.conv5(x))
        # The SPP output feeds fc1, whose input width was sized from spp_output_sizes.
        x = self.spp(x)
        x = F.relu(self.fc1(x))
        # Dropout is active only in training mode.
        x = F.dropout(x, p=0.5, training=self.training)
        x = F.relu(self.fc2(x))
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.fc3(x)
        return x


In [None]:
class Trainer:
    """Run training and validation epochs for a classification model.

    Both loaders must yield ``(inputs, targets)`` pairs of tensors and
    support ``len()``.
    """

    def __init__(self, model, device, optimizer, criterion, train_loader, val_loader):
        """
        :param model: The SPPNet model.
        :param device: The device (CPU or GPU).
        :param optimizer: Optimizer for training.
        :param criterion: Loss function.
        :param train_loader: DataLoader for training.
        :param val_loader: DataLoader for validation.
        """
        self.model = model.to(device)
        self.device = device
        self.optimizer = optimizer
        self.criterion = criterion
        self.train_loader = train_loader
        self.val_loader = val_loader

    @staticmethod
    def _percentage(correct, total):
        """Return 100 * correct / total, or 0.0 when no sample was seen."""
        return 100. * correct / total if total else 0.0

    @staticmethod
    def _mean_loss(running_loss, num_batches):
        """Return the average per-batch loss, or 0.0 for an empty loader."""
        return running_loss / num_batches if num_batches else 0.0

    def train_one_epoch(self, epoch):
        """Train for one pass over ``train_loader``.

        :param epoch: Epoch number, used only in the progress output.
        :return: (mean batch loss, accuracy in percent); both are 0.0 when
            the loader is empty.
        """
        self.model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        num_batches = len(self.train_loader)

        for batch_idx, (inputs, targets) in enumerate(self.train_loader):
            inputs, targets = inputs.to(self.device), targets.to(self.device)

            # Forward
            outputs = self.model(inputs)
            loss = self.criterion(outputs, targets)

            # Backward
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

            # Metrics: the predicted class is the index of the largest output.
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

            if batch_idx % 10 == 0:
                print(
                    f'Epoch {epoch}, Batch {batch_idx}/{num_batches}, '
                    f'Loss: {loss.item():.4f}, Accuracy: {self._percentage(correct, total):.2f}%'
                )

        train_loss = self._mean_loss(running_loss, num_batches)
        train_accuracy = self._percentage(correct, total)
        return train_loss, train_accuracy

    def validate(self):
        """Evaluate on ``val_loader`` without computing gradients.

        :return: (mean batch loss, accuracy in percent); both are 0.0 when
            the loader is empty.
        """
        self.model.eval()
        running_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for inputs, targets in self.val_loader:
                inputs, targets = inputs.to(self.device), targets.to(self.device)

                # Forward
                outputs = self.model(inputs)
                loss = self.criterion(outputs, targets)

                # Metrics
                running_loss += loss.item()
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()

        val_loss = self._mean_loss(running_loss, len(self.val_loader))
        val_accuracy = self._percentage(correct, total)
        return val_loss, val_accuracy

    def fit(self, epochs):
        """Alternate training and validation for ``epochs`` epochs, printing a summary after each.

        :param epochs: Number of epochs to run.
        """
        for epoch in range(epochs):
            train_loss, train_accuracy = self.train_one_epoch(epoch)
            val_loss, val_accuracy = self.validate()

            print(
                f'Epoch {epoch}: Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, '
                f'Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%'
            )
