# Importing Data

In [None]:
from google.colab import drive
import zipfile

# Mount Google Drive inside the Colab VM; force_remount=True re-mounts even
# when a previous mount is still present.
drive.mount('/content/drive', force_remount=True)

# Project folder on Drive, plus the archive and extraction locations below it.
# NOTE: `output_path` is reassigned later in the notebook to the train split.
base_directory = '/content/drive/MyDrive/NNDL_HW3/'
zip_file_path = f'{base_directory}archive.zip'
output_path = f'{base_directory}Data/'


Mounted at /content/drive


# Dataset

In [None]:
import torch

def yolo_to_pascal_voc_format(yolo_annotations, image_width, image_height):
    """Convert normalized YOLO boxes to absolute Pascal VOC corner boxes.

    Args:
        yolo_annotations: A tensor of shape (N, 4) or a sequence of N rows,
            each row being ``(center_x, center_y, width, height)`` expressed
            as fractions of the image size.
        image_width: Width in pixels used to scale the x coordinates.
        image_height: Height in pixels used to scale the y coordinates.

    Returns:
        A float32 tensor of shape (N, 4) holding ``(x_min, y_min, x_max, y_max)``
        in pixels. Empty input yields a tensor of shape (0, 4).

    Raises:
        ValueError: If any row does not contain exactly four values.
    """
    if torch.is_tensor(yolo_annotations):
        boxes = yolo_annotations.to(torch.float32)
        if boxes.numel() == 0:
            return boxes.new_zeros((0, 4))
        if boxes.dim() != 2 or boxes.shape[1] != 4:
            raise ValueError(
                f'expected YOLO boxes of shape (N, 4), got {tuple(boxes.shape)}'
            )
    else:
        rows = []
        for row in yolo_annotations:
            row = torch.as_tensor(row, dtype=torch.float32).flatten()
            if row.numel() != 4:
                raise ValueError(
                    f'expected 4 values per YOLO box, got {row.numel()}'
                )
            rows.append(row)
        if not rows:
            return torch.zeros((0, 4), dtype=torch.float32)
        boxes = torch.stack(rows)

    # Out-of-place scaling: the caller's tensor must not be modified (the
    # per-element `*=` form wrote through views into the input tensor).
    scale = boxes.new_tensor([image_width, image_height, image_width, image_height])
    scaled = boxes * scale

    centers = scaled[:, :2]
    half_extent = scaled[:, 2:] / 2
    return torch.cat([centers - half_extent, centers + half_extent], dim=1)


In [None]:
import torch
from PIL import Image
import os
import torchvision
from torchvision.models.detection import FasterRCNN
import torchvision.models as models
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone
from torch.utils.data import DataLoader, RandomSampler
from torchvision.models.detection import backbone_utils
import random
import matplotlib.pyplot as plt


class CustomDataset(torch.utils.data.Dataset):
    """Detection dataset over a YOLO-style ``images/`` + ``labels/`` folder pair.

    Every file in ``<root_dir>/images/`` is paired with the ``.txt`` file of the
    same stem in ``<root_dir>/labels/``; images without a label file are skipped.
    Each label line is read as ``<class_id> <cx> <cy> <w> <h>`` with normalized
    coordinates.

    Args:
        root_dir: Directory containing the ``images`` and ``labels`` folders.
        transforms: Callable applied to the PIL image. When ``None`` a default
            of ``Resize((256, 256))`` followed by ``ToTensor()`` is used.
    """

    def __init__(self, root_dir, transforms=None):
        self.root_dir = root_dir
        self.images = []
        self.labels = []

        images_dir = os.path.join(root_dir, 'images/')
        labels_dir = os.path.join(root_dir, 'labels/')

        print(f'images_dir : {images_dir}')
        print(f'labels_dir : {labels_dir}')

        # Sorted so that sample indices are stable between runs.
        for image_name in sorted(os.listdir(images_dir)):
            image_path = os.path.join(images_dir, image_name)
            # splitext handles extensions of any length (.jpg, .jpeg, .png, ...),
            # and the label path is built from labels_dir directly instead of
            # string-replacing '/images/' anywhere in the full path.
            stem, _ = os.path.splitext(image_name)
            label_path = os.path.join(labels_dir, stem + '.txt')

            if os.path.isfile(image_path) and os.path.isfile(label_path):
                self.images.append(image_path)
                self.labels.append(label_path)

        if transforms is None:
            # The argument is None here, so the default pipeline has to come
            # from the torchvision package rather than from `transforms`.
            tv_transforms = torchvision.transforms
            self.transforms = tv_transforms.Compose([
                tv_transforms.Resize((256, 256)),  # Example resize, adjust as needed
                tv_transforms.ToTensor()           # Convert images to PyTorch tensors
            ])
        else:
            self.transforms = transforms

    def eda_analysis(self, labels_dir):
        """Plot per-class object counts and the distribution of objects per file.

        Args:
            labels_dir: Directory whose ``.txt`` files are scanned. The class id
                is taken as the first whitespace-separated token of each
                non-blank line.
        """
        txt_files = sorted(
            file for file in os.listdir(labels_dir) if file.endswith('.txt')
        )
        if not txt_files:
            print(f'No .txt label files found in {labels_dir}')
            return

        class_counts = {}
        object_counts = {}

        for file in txt_files:
            # Counter is per file; it must restart for every label file.
            objects_in_file = 0
            with open(os.path.join(labels_dir, file), 'r') as f:
                for line in f:
                    parts = line.split()
                    if not parts:
                        continue  # ignore blank lines
                    objects_in_file += 1
                    class_id = parts[0]  # full token, so ids >= 10 are kept intact
                    class_counts[class_id] = class_counts.get(class_id, 0) + 1
            object_counts[file] = objects_in_file

        # Extract values from the dictionary
        keys = list(class_counts.keys())
        values = list(class_counts.values())
        print(f'keys : {keys}')
        print(f'values : {values}')

        plt.bar(keys, values)
        plt.xlabel('Classes')
        plt.ylabel('Counts')
        plt.title('Bar Plot of Class Counts')
        plt.show()

        values = list(object_counts.values())
        # One unit-wide bin per integer count; the "+ 2" keeps at least one bin
        # when every file holds the same number of objects.
        plt.hist(values, bins=range(min(values), max(values) + 2), edgecolor='black')
        plt.xlabel('Files')
        plt.ylabel('Frequency')
        plt.title('Histogram of Files & Frequency')
        plt.show()

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        ``target`` is a dict with ``'boxes'`` (float32, shape (N, 4), Pascal VOC
        corners in pixels of the returned image) and ``'labels'`` (int64, shape (N,)).
        """
        image_path = self.images[idx]
        label_path = self.labels[idx]

        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
        img_width, img_height = image.size

        image = self.transforms(image)

        # The boxes have to match the image the model actually receives. YOLO
        # coordinates are normalized, so they are scaled by the post-transform
        # size. This is exact for pure resizes; crops or flips in `transforms`
        # would need box-aware handling.
        if torch.is_tensor(image):
            img_height, img_width = image.shape[-2:]
        elif isinstance(image, Image.Image):
            img_width, img_height = image.size

        # Processing labels to create the expected dictionary format
        boxes = []
        classes = []
        with open(label_path, 'r') as f:
            for line in f:
                parts = line.split()
                if not parts:
                    continue  # tolerate blank / trailing empty lines
                classes.append(int(parts[0]))                  # Class ID
                boxes.append([float(v) for v in parts[1:]])    # Bounding box coordinates

        boxes = torch.tensor(boxes, dtype=torch.float32)

        # Convert YOLO to Pascal VOC; reshape keeps an empty label file at (0, 4).
        boxes = yolo_to_pascal_voc_format(boxes, img_width, img_height).reshape(-1, 4)

        # NOTE(review): class ids are passed through unchanged. torchvision
        # detection models appear to reserve label 0 for background - confirm
        # whether ids should be shifted by one (and num_classes increased).
        classes = torch.tensor(classes, dtype=torch.long)

        # Construct the target dictionary expected by models
        target = {'boxes': boxes, 'labels': classes}

        return image, target

    def __len__(self):
        """Number of image/label pairs found under ``root_dir``."""
        return len(self.images)


# Root of the training split; CustomDataset looks for `images/` and `labels/`
# beneath this directory. This rebinds the `output_path` set in the first cell.
output_path = '/content/drive/MyDrive/NNDL_HW3/Data/aquarium_pretrain/train/'
# dataset = CustomDataset(output_path)



# Import Libraries

In [None]:
import torch
import torchvision.models as models
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone
from torchvision.ops import MultiScaleRoIAlign
from torchvision.models.detection.transform import GeneralizedRCNNTransform
from torchvision.ops import boxes as box_ops


# Building Model

In [None]:
# !pip install timm

In [None]:
import torch.nn as nn
import timm
from torchvision.models.detection.backbone_utils import BackboneWithFPN

def create_res2net_fpn_backbone():
    """Wrap a pre-trained timm ``res2net101_26w_4s`` in a torchvision FPN.

    The timm model is created in ``features_only`` mode for stages 1-4, and its
    ``layer1``..``layer4`` outputs are exposed to the FPN under the names
    ``s2``..``s5``.

    Returns:
        A ``BackboneWithFPN`` whose pyramid levels each have 256 channels.
    """
    stage_names = ('s2', 's3', 's4', 's5')

    # Pre-trained feature extractor from timm
    feature_extractor = timm.create_model(
        'res2net101_26w_4s',
        pretrained=True,
        features_only=True,
        out_indices=(1, 2, 3, 4),
    )

    # Channel count of every selected stage, as reported by timm
    stage_channels = feature_extractor.feature_info.channels()

    # layer1 -> s2, layer2 -> s3, layer3 -> s4, layer4 -> s5
    layer_to_stage = {
        f'layer{index}': stage
        for index, stage in enumerate(stage_names, start=1)
    }

    return BackboneWithFPN(
        feature_extractor,
        return_layers=layer_to_stage,
        in_channels_list=stage_channels,
        out_channels=256,
    )

def build_model(num_classes):
    """Assemble a Faster R-CNN detector on the Res2Net-FPN backbone.

    Args:
        num_classes: Number of output classes of the box predictor, passed
            straight to ``FasterRCNN``.

    Returns:
        The constructed ``FasterRCNN`` model.
    """
    backbone = create_res2net_fpn_backbone()

    # The anchor generator needs exactly one (sizes, aspect_ratios) entry per
    # feature map the backbone returns. BackboneWithFPN yields the four mapped
    # stages plus the extra max-pool level it adds by default, i.e. five maps;
    # a single 5-element tuple describes only ONE map and triggers the
    # "Anchors should be Tuple[Tuple[int]]" assertion at forward time.
    anchor_sizes = ((32,), (64,), (128,), (256,), (512,))
    aspect_ratios = ((0.5, 1.0, 2.0),) * len(anchor_sizes)
    anchor_generator = AnchorGenerator(sizes=anchor_sizes,
                                       aspect_ratios=aspect_ratios)

    # RoI pooling only uses the four named backbone stages.
    roi_pooler = MultiScaleRoIAlign(featmap_names=['s2', 's3', 's4', 's5'],
                                    output_size=7,
                                    sampling_ratio=2)
    model = FasterRCNN(backbone,
                       num_classes=num_classes,
                       rpn_anchor_generator=anchor_generator,
                       box_roi_pool=roi_pooler)
    return model

In [None]:
def ohem_loss(class_logits, box_regression, labels, regression_targets):
    """Placeholder for an OHEM loss; not implemented.

    All arguments are currently ignored and the function returns ``None``.
    """
    return None

In [None]:
def train_one_epoch(model, optimizer, data_loader, device):
    """Run one optimisation pass over ``data_loader``.

    For every batch the images and target tensors are moved to ``device``, the
    model is called with both, the values of the returned loss dict are summed,
    and one optimizer step is taken on that sum.

    Args:
        model: Module that returns a dict of losses when called with
            ``(images, targets)`` in training mode.
        optimizer: Optimizer over the model parameters.
        data_loader: Iterable yielding ``(images, targets)`` batches, where
            each target is a dict of tensors.
        device: Device the batch is moved to.
    """
    model.train()

    for batch_images, batch_targets in data_loader:
        device_images = [img.to(device) for img in batch_images]
        device_targets = [
            {key: value.to(device) for key, value in target.items()}
            for target in batch_targets
        ]

        # Sum every loss component into one scalar to backpropagate.
        total_loss = sum(model(device_images, device_targets).values())

        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

def validate_model(model, data_loader, device):
    """Run the model in eval mode over ``data_loader`` and collect predictions.

    Args:
        model: Module that, in eval mode, returns one dict of tensors per input
            image when called with a list of images.
        data_loader: Iterable yielding ``(images, targets)`` batches; the
            targets are not used here.
        device: Device the images are moved to before inference.

    Returns:
        A flat list with one prediction dict per image, in iteration order.
        Tensors are moved to the CPU so results from many batches do not
        accumulate in accelerator memory.
    """
    model.eval()
    predictions = []
    with torch.no_grad():
        for images, _targets in data_loader:
            images = [image.to(device) for image in images]
            outputs = model(images)
            # Keep the results instead of discarding them after each batch.
            predictions.extend(
                {key: value.cpu() for key, value in output.items()}
                for output in outputs
            )
    return predictions


# Collate_fn

In [None]:
def collate_fn(batch):
    """Collate ``(image, target)`` samples into a detection batch.

    Images are stacked into one tensor along a new leading dimension, so they
    must all share the same shape. Targets stay a list with one dict per image
    because the number of boxes differs between images.

    Boxes are deliberately NOT padded to a common length: zero-filled rows are
    degenerate boxes with no matching label, which leaves ``'boxes'`` and
    ``'labels'`` with different lengths.

    Args:
        batch: Sequence of ``(image_tensor, target_dict)`` pairs, where each
            target has ``'boxes'`` and ``'labels'`` entries.

    Returns:
        ``(images, targets)`` - the stacked image tensor and a list of dicts
        containing only the ``'boxes'`` and ``'labels'`` of each sample.
    """
    images, targets = zip(*batch)
    images = torch.stack(images, dim=0)

    collated_targets = [
        {'boxes': target['boxes'], 'labels': target['labels']}
        for target in targets
    ]

    return images, collated_targets



# Training Model

In [None]:
import torchvision.transforms as T

def main():
    """Build the detector and the data loaders, then run the epoch loop.

    Each of the 20 epochs currently runs only the validation pass; the
    training call is left disabled.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Update num_classes based on your dataset
    model = build_model(num_classes=7)
    model.to(device)

    # Only parameters that require gradients are handed to the optimizer.
    trainable_params = list(filter(lambda p: p.requires_grad, model.parameters()))
    optimizer = torch.optim.Adam(trainable_params, lr=0.0001)

    # Fixed-size resize, tensor conversion, then per-channel normalization.
    preprocessing = T.Compose([
        T.Resize((1024, 1024)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    dataset = CustomDataset(output_path, transforms=preprocessing)

    # NOTE(review): both loaders read the same dataset and both shuffle.
    loader_options = dict(batch_size=4, shuffle=True, collate_fn=collate_fn)
    data_loader = DataLoader(dataset, **loader_options)
    val_loader = DataLoader(dataset, **loader_options)

    num_epochs = 20
    for _ in range(num_epochs):
        # Training is disabled for now:
        # train_one_epoch(model, optimizer, data_loader, device)
        validate_model(model, val_loader, device)


if __name__ == "__main__":
    main()


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


model.safetensors:   0%|          | 0.00/181M [00:00<?, ?B/s]

images_dir : /content/drive/MyDrive/NNDL_HW3/Data/aquarium_pretrain/train/images/
labels_dir : /content/drive/MyDrive/NNDL_HW3/Data/aquarium_pretrain/train/labels/


AssertionError: Anchors should be Tuple[Tuple[int]] because each feature map could potentially have different sizes and aspect ratios. There needs to be a match between the number of feature maps passed and the number of sizes / aspect ratios specified.