<a href="https://colab.research.google.com/github/TrishKedi/MLDL-case-study/blob/main/DataPreprocessor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Step 1: Mount Google Drive at /content/drive so the dataset archive stored
# in Drive can be read from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Step 2: Path of the dataset ZIP archive inside the mounted Drive.
# Change this value so it matches the location of your own ZIP file.
zip_path = '/content/drive/My Drive/ML_CASE_STUDY/dataset.zip'

In [3]:
# Step 3: Unpack the archive into the Colab working directory
import os
import zipfile

extract_dir = '/content'
os.makedirs(extract_dir, exist_ok=True)  # harmless when the directory already exists

# ZipFile opens in read mode by default; the context manager closes the archive.
with zipfile.ZipFile(zip_path) as zip_ref:
    zip_ref.extractall(path=extract_dir)

print(" Dataset extracted to:", extract_dir)

 Dataset extracted to: /content


In [4]:
import os
import torch
from PIL import Image
from pycocotools.coco import COCO
from torch.utils.data import Dataset, DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np


class DataPreProcessor(Dataset):
    """COCO-style detection dataset that preprocesses samples on the fly.

    Each item is an ``(image, target)`` pair. ``target`` is a dict with:

    - ``boxes``: float32 tensor of shape ``(N, 4)`` in Pascal VOC order
      ``[x_min, y_min, x_max, y_max]``; ``(0, 4)`` when no box survives
      filtering or augmentation,
    - ``labels``: int64 tensor holding the ``category_id`` of each box,
    - ``image_id``: one-element tensor holding the COCO image id.

    Args:
        image_dir: Directory that each annotation ``file_name`` is joined to.
        annotation_file: Path handed to ``pycocotools.coco.COCO``.
        transforms: Optional callable invoked as
            ``transforms(image=..., bboxes=..., class_labels=...)`` that
            returns a mapping with the keys ``image``, ``bboxes`` and
            ``class_labels``. When absent the image is returned as the RGB
            NumPy array read from disk.
        category_ids: Optional collection of category ids to keep. When
            given, annotations of other categories and crowd annotations
            are dropped; when ``None`` no category/crowd filtering happens.
    """

    def __init__(self, image_dir, annotation_file, transforms=None, category_ids=None):
        self.image_dir = image_dir
        self.coco = COCO(annotation_file)
        # Sorted ids give a deterministic index -> image mapping.
        self.ids = sorted(self.coco.imgs.keys())
        self.transforms = transforms
        self.category_ids = category_ids

    @staticmethod
    def _select_boxes(anns, category_ids=None):
        """Return ``(boxes, labels)`` for the annotations worth keeping.

        Boxes are converted from COCO ``[x, y, w, h]`` to Pascal VOC
        ``[x_min, y_min, x_max, y_max]``. Boxes whose width or height is not
        greater than 1 are discarded. Category/crowd filtering is applied
        only when ``category_ids`` is not ``None``.
        """
        # A set makes the per-annotation membership test O(1).
        allowed = None if category_ids is None else set(category_ids)

        boxes = []
        labels = []
        for ann in anns:
            if allowed is not None:
                if ann['category_id'] not in allowed:
                    continue
                # Some COCO exports omit 'iscrowd'; treat a missing key as "not crowd"
                # instead of failing with KeyError.
                if ann.get('iscrowd', 0) != 0:
                    continue

            x, y, w, h = ann['bbox']
            if w > 1 and h > 1:
                boxes.append([x, y, x + w, y + h])  # Convert to Pascal VOC format
                labels.append(ann['category_id'])
        return boxes, labels

    @staticmethod
    def _build_target(boxes, labels, img_id):
        """Pack boxes, labels and the image id into the target dictionary."""
        # len() rather than truthiness: ``boxes`` may be a list or an array.
        if len(boxes) == 0:
            # An empty list would otherwise become a tensor of shape (0,),
            # so build the (0, 4) shape explicitly for box-free images.
            box_tensor = torch.zeros((0, 4), dtype=torch.float32)
        else:
            box_tensor = torch.tensor(boxes, dtype=torch.float32)

        return {
            'boxes': box_tensor,
            'labels': torch.tensor(labels, dtype=torch.int64),
            'image_id': torch.tensor([img_id]),
        }

    def __getitem__(self, index):
        """Load, filter and (optionally) augment the sample at ``index``."""
        img_id = self.ids[index]
        ann_ids = self.coco.getAnnIds(imgIds=img_id)
        anns = self.coco.loadAnns(ann_ids)

        img_info = self.coco.loadImgs(img_id)[0]
        img_path = os.path.join(self.image_dir, img_info['file_name'])
        # Close the file handle as soon as the pixels are copied out rather
        # than leaving it to garbage collection.
        with Image.open(img_path) as img:
            image = np.array(img.convert("RGB"))

        boxes, labels = self._select_boxes(anns, self.category_ids)

        if self.transforms:
            transformed = self.transforms(image=image, bboxes=boxes, class_labels=labels)
            image = transformed['image']
            boxes = transformed['bboxes']
            labels = transformed['class_labels']

        return image, self._build_target(boxes, labels, img_id)

    def __len__(self):
        """Number of images listed in the annotation file."""
        return len(self.ids)


def get_train_transforms():
    """Build the augmentation pipeline used for training samples.

    Steps, in order:

    1. Resize so the longest side is 800 (aspect ratio preserved).
    2. Pad up to 800x800 where needed.
    3. Horizontal flip, applied with probability 0.5.
    4. Random brightness/contrast, applied with probability 0.2.
    5. Blur, applied with probability 0.1.
    6. Normalize with the ImageNet mean and std.
    7. Convert the image to a PyTorch tensor.

    Bounding boxes are handled in ``pascal_voc`` format, with their labels
    passed through the ``class_labels`` field.
    """
    box_config = A.BboxParams(format='pascal_voc', label_fields=['class_labels'])

    steps = [
        A.LongestMaxSize(max_size=800),
        A.PadIfNeeded(min_height=800, min_width=800, border_mode=0),
        A.HorizontalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.2),
        A.Blur(p=0.1),
        A.Normalize(
            mean=(0.485, 0.456, 0.406),
            std=(0.229, 0.224, 0.225),
        ),
        ToTensorV2(),
    ]
    return A.Compose(steps, bbox_params=box_config)


# Custom collate_fn for DataLoader
def collate_fn(batch):
    """Regroup a batch of ``(image, target)`` samples into two parallel lists.

    Samples are kept in lists instead of being stacked, so each image can
    carry a different number of boxes.
    """
    images, targets = map(list, zip(*batch))
    return images, targets


# Locations of the extracted training split
image_dir = "/content/dataset/train/images"
annotation_file = "/content/dataset/train/coco_annotations.json"

# Training dataset limited to category ids 1-9, augmented per sample
dataset = DataPreProcessor(
    image_dir,
    annotation_file,
    transforms=get_train_transforms(),
    category_ids=list(range(1, 10)),
)

# Shuffled loader; collate_fn keeps images and targets as lists
dataloader = DataLoader(dataset, batch_size=10, shuffle=True, num_workers=4, collate_fn=collate_fn)

# Smoke test: fetch a single batch, report it, then stop
for imgs, targets in dataloader:
    print("Batch of images:", len(imgs))
    print("First target sample:", targets[0])
    break


  check_for_updates()


loading annotations into memory...
Done (t=3.52s)
creating index...
index created!




Batch of images: 10
First target sample: {'boxes': tensor([[310.6250, 410.0000, 318.7500, 426.2500],
        [370.6250, 434.3750, 375.0000, 443.1250],
        [282.5000, 419.3750, 302.5000, 425.6250],
        [568.7500, 396.2500, 579.3750, 405.6250],
        [564.3750, 410.6250, 578.7500, 422.5000],
        [308.7500, 452.5000, 335.0000, 473.7500],
        [265.0000, 441.8750, 311.2500, 483.7500],
        [383.7500, 384.3750, 566.8750, 496.2500],
        [110.0000, 455.6250, 198.7500, 504.3750],
        [179.3750, 448.1250, 293.7500, 521.2500],
        [321.2500, 445.6250, 408.1250, 515.0000],
        [335.0000, 443.7500, 363.7500, 450.0000],
        [633.7500, 428.7500, 799.3750, 525.6250],
        [605.6250, 436.8750, 619.3750, 480.6250],
        [568.7500, 440.0000, 577.5000, 470.0000]]), 'labels': tensor([1, 1, 2, 2, 2, 4, 4, 8, 7, 4, 4, 4, 4, 3, 3]), 'image_id': tensor([18502])}
