<a href="https://colab.research.google.com/github/ellenguyen/CIS4496_EY/blob/main/MLModel_FasterRCNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive at /content/drive; the dataset paths used below live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import os
from sklearn.model_selection import train_test_split
import shutil

In [5]:
# Folder that is scanned for loose images (.jpg) and annotation files (.txt).
base_dataset_path = '/content/drive/My Drive/EYProject/CIS4496_EY/given/AugmentedData'

# train/test split subdirectories
train_path = os.path.join(base_dataset_path, 'train')
test_path = os.path.join(base_dataset_path, 'test')

train_images_path = os.path.join(train_path, 'images')
train_annotations_path = os.path.join(train_path, 'annotations')
test_images_path = os.path.join(test_path, 'images')
test_annotations_path = os.path.join(test_path, 'annotations')

# exist_ok=True keeps this cell safe to re-run once the folders exist.
for path in [train_images_path, train_annotations_path, test_images_path, test_annotations_path]:
    os.makedirs(path, exist_ok=True)

# Get list of all image and annotation files.
# os.listdir returns names in arbitrary order, so sort them: without a stable
# input order the fixed random_state below would not reproduce the same split.
base_entries = os.listdir(base_dataset_path)
image_files = sorted(f for f in base_entries if f.endswith('.jpg'))
annotation_files = sorted(f for f in base_entries if f.endswith('.txt'))

# Split into training and testing sets
if image_files:
    train_images, test_images = train_test_split(image_files, test_size=0.2, random_state=42)
else:
    # Nothing left to split (e.g. the files were already moved by an earlier
    # run of this cell); train_test_split raises on an empty list.
    train_images, test_images = [], []

# Move files
def move_files(files, src, dest):
    """Move each named file from directory ``src`` into directory ``dest``.

    Args:
        files: Iterable of file names (not full paths) located in ``src``.
        src: Directory that currently holds the files.
        dest: Existing directory that receives the files.

    Names that do not exist in ``src`` are skipped and counted in a one-line
    report instead of raising, so a single missing file (or a re-run after the
    files were already moved) does not abort the move part-way through.
    """
    skipped = []
    for file in files:
        src_file_path = os.path.join(src, file)
        if not os.path.exists(src_file_path):
            skipped.append(file)
            continue
        dest_file_path = os.path.join(dest, file)
        # shutil.move renames when it can and falls back to copy-and-delete
        # when source and destination are on different filesystems.
        shutil.move(src_file_path, dest_file_path)

    if skipped:
        print(f"move_files: skipped {len(skipped)} missing file(s) in {src}")

# Move image and annotation files to their train and test directories
move_files(train_images, base_dataset_path, train_images_path)
move_files(test_images, base_dataset_path, test_images_path)

# Build each annotation name by swapping only the final extension.
# str.replace('.jpg', '.txt') would also rewrite a '.jpg' that appears earlier
# in the file name and then point at a file that does not exist.
train_annotations = [os.path.splitext(f)[0] + '.txt' for f in train_images]
test_annotations = [os.path.splitext(f)[0] + '.txt' for f in test_images]

move_files(train_annotations, base_dataset_path, train_annotations_path)
move_files(test_annotations, base_dataset_path, test_annotations_path)

In [18]:
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision import transforms
import os
from PIL import Image
import numpy as np


In [7]:
class CustomDataset(Dataset):
    """Object-detection dataset reading ``<root>/images`` and ``<root>/annotations``.

    Every image ``<stem>.<ext>`` is paired with the annotation file
    ``<stem>.txt``. Each non-blank annotation line is read as
    ``<label> <x> <y> <w> <h>`` and turned into an ``[xmin, ymin, xmax, ymax]``
    box by adding ``w``/``h`` to ``x``/``y``.

    NOTE(review): the conversion treats (x, y) as the top-left corner, in the
    same units as the image. If the files are YOLO-style (normalised,
    centre-based) the arithmetic must change; confirm against the data.
    NOTE(review): labels are passed through unchanged. torchvision detection
    models reserve class 0 for background; confirm the annotation ids start at 1.

    Attributes:
        root: Dataset directory containing ``images`` and ``annotations``.
        transforms: Optional callable applied to the loaded image only.
        imgs: Sorted image file names.
        labels: Annotation file names, index-aligned with ``imgs``.
    """

    def __init__(self, root, transforms=None):
        self.root = root
        self.transforms = transforms
        self.imgs = sorted(os.listdir(os.path.join(root, "images")))
        # Derive the annotation name from the image name instead of sorting the
        # annotation folder separately: two independent sorts can disagree
        # (missing file, dotted stems) and silently pair an image with the
        # wrong boxes.
        self.labels = [os.path.splitext(name)[0] + ".txt" for name in self.imgs]

    @staticmethod
    def _parse_annotations(label_path):
        """Read one annotation file into ``(boxes, labels)`` Python lists."""
        boxes = []
        labels = []
        with open(label_path) as f:
            for line in f:
                annot = line.split()
                if not annot:
                    # Blank lines (e.g. a trailing newline) carry no object.
                    continue
                bbox = [float(n) for n in annot[1:]]
                x, y, w, h = bbox[:4]
                # Convert to [xmin, ymin, xmax, ymax]
                boxes.append([x, y, x + w, y + h])
                labels.append(int(annot[0]))
        return boxes, labels

    def _load_target(self, idx):
        """Build the target dict (boxes, labels, image_id, area, iscrowd) for ``idx``."""
        label_path = os.path.join(self.root, "annotations", self.labels[idx])
        raw_boxes, raw_labels = self._parse_annotations(label_path)

        # An explicit (N, 4) shape keeps an image without objects as a (0, 4)
        # tensor, so the column slicing below still works.
        boxes = torch.as_tensor(raw_boxes, dtype=torch.float32).reshape(len(raw_boxes), 4)
        labels = torch.as_tensor(raw_labels, dtype=torch.int64)
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        iscrowd = torch.zeros((len(boxes),), dtype=torch.int64)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = torch.tensor([idx])
        target["area"] = area
        target["iscrowd"] = iscrowd
        return target

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the sample at position ``idx``."""
        img_path = os.path.join(self.root, "images", self.imgs[idx])
        img = Image.open(img_path).convert("RGB")
        target = self._load_target(idx)

        # The transform only sees the image; box coordinates are left as read.
        if self.transforms is not None:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        """Number of images found under ``<root>/images``."""
        return len(self.imgs)


In [8]:
# Image preprocessing: the only step is the PIL -> tensor conversion.
data_transforms = transforms.Compose([transforms.ToTensor()])

# One dataset per split folder; both share the same preprocessing.
dataset_train = CustomDataset(
    root='/content/drive/My Drive/EYProject/CIS4496_EY/given/AugmentedData/train',
    transforms=data_transforms,
)
dataset_test = CustomDataset(
    root='/content/drive/My Drive/EYProject/CIS4496_EY/given/AugmentedData/test',
    transforms=data_transforms,
)

# collate_fn regroups a batch of (image, target) pairs into (images, targets)
# tuples instead of stacking them into one tensor.
data_loader_train = DataLoader(
    dataset=dataset_train,
    batch_size=4,
    shuffle=True,
    collate_fn=lambda x: tuple(zip(*x)),
)
data_loader_test = DataLoader(
    dataset=dataset_test,
    batch_size=4,
    shuffle=False,
    collate_fn=lambda x: tuple(zip(*x)),
)


In [14]:
def get_model_instance_segmentation(num_classes):
    """Build a pre-trained Faster R-CNN (ResNet-50 FPN) with a new box predictor.

    Despite the function name, the network built here is the Faster R-CNN
    object detector, not an instance-segmentation model.

    Args:
        num_classes: Number of classes the replacement predictor outputs.
            NOTE(review): torchvision counts the background as one of these
            classes; confirm that 4 matches the annotation label range.

    Returns:
        The model with ``roi_heads.box_predictor`` swapped for a fresh
        ``FastRCNNPredictor``.
    """
    detector = fasterrcnn_resnet50_fpn(pretrained=True)

    # The new head must accept the same feature width as the one it replaces.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)

    return detector


num_classes = 4
model = get_model_instance_segmentation(num_classes)

In [15]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(

In [None]:
from torch.optim import SGD
from torchvision.ops import batched_nms

# Use SGD optimizer
optimizer = SGD(model.parameters(), lr=0.005, momentum=0.9, weight_decay=0.0005)

# Training loop
num_epochs = 10

for epoch in range(num_epochs):
    # In training mode the model is called with targets and returns a dict of
    # losses (consumed below).
    model.train()
    for i, (images, targets) in enumerate(data_loader_train, start=1):
        # Move the whole batch (images and every target tensor) to the model's device.
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        # Total loss is the sum of all loss components returned by the model.
        losses = sum(loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if i % 50 == 0:
            # .item() logs the plain number rather than the tensor repr
            # (device / grad_fn noise); the epoch makes the log line unambiguous.
            print(f"Epoch {epoch + 1}/{num_epochs} iteration #{i} loss: {losses.item():.4f}")

In [None]:
from torchvision.ops import box_iou

def evaluate(model, data_loader, device, detection_threshold=0.5):
    """Run the detector over ``data_loader`` and report simple box metrics.

    For every image the pairwise IoU between predicted and ground-truth boxes
    is computed, then:

    * precision: fraction of predicted boxes whose best IoU with any
      ground-truth box exceeds ``detection_threshold``;
    * recall: fraction of ground-truth boxes whose best IoU with any predicted
      box exceeds ``detection_threshold``;
    * mean_iou: mean, over ground-truth boxes, of the best IoU achieved.

    Matching is not one-to-one (several predictions may match the same box) and
    class labels and scores are ignored, so these are quick sanity metrics, not
    COCO-style AP.

    When an image has no predictions or no ground-truth boxes, all three
    metrics are 0.0, or 1.0 if both are empty. This keeps NaN out of the
    averages.

    Args:
        model: Detection model; called in eval mode with a list of image tensors
            and expected to return one dict with a ``'boxes'`` entry per image.
        data_loader: Iterable yielding ``(images, targets)`` batches; each target
            needs ``'boxes'`` and a one-element ``'image_id'`` tensor.
        device: Device the images are moved to before inference.
        detection_threshold: IoU above which a box counts as matched.

    Returns:
        A list with one dict per image holding ``'image_id'``, ``'iou'`` (the
        full pairwise IoU matrix), ``'precision'``, ``'recall'`` and
        ``'mean_iou'`` (0-d tensors).
    """
    model.eval()
    results = []

    with torch.no_grad():
        for images, targets in data_loader:
            images = [img.to(device) for img in images]
            outputs = model(images)

            for target, output in zip(targets, outputs):
                output_boxes = output['boxes'].cpu()
                target_boxes = target['boxes'].cpu()
                num_pred = len(output_boxes)
                num_gt = len(target_boxes)

                if num_pred == 0 or num_gt == 0:
                    # Reducing an empty IoU matrix would give NaN (or raise),
                    # so score the degenerate cases explicitly.
                    iou = torch.zeros((num_pred, num_gt))
                    score = 1.0 if num_pred == num_gt else 0.0
                    precision = torch.tensor(score)
                    recall = torch.tensor(score)
                    image_iou = torch.tensor(score)
                else:
                    # Calculate IoU: rows are predictions, columns ground truth.
                    iou = box_iou(output_boxes, target_boxes)
                    best_per_pred = iou.max(dim=1).values
                    best_per_gt = iou.max(dim=0).values

                    # Calculate Precision & Recall
                    precision = (best_per_pred > detection_threshold).float().mean()
                    recall = (best_per_gt > detection_threshold).float().mean()
                    image_iou = best_per_gt.mean()

                results.append({
                    'image_id': target['image_id'].item(),
                    'iou': iou,
                    'precision': precision,
                    'recall': recall,
                    'mean_iou': image_iou,
                })

    def _mean_of(key):
        # Average a per-image metric; NaN only when no image was evaluated.
        if not results:
            return float('nan')
        return torch.stack([r[key] for r in results]).mean().item()

    # Calculate the mean of metrics across all test images.
    mean_iou = _mean_of('mean_iou')
    mean_precision = _mean_of('precision')
    mean_recall = _mean_of('recall')

    print(f"Mean IoU: {mean_iou}")
    print(f"Mean Precision: {mean_precision}")
    print(f"Mean Recall: {mean_recall}")

    return results

# Score the trained detector on the held-out test split.
results = evaluate(model=model, data_loader=data_loader_test, device=device)
