<a href="https://colab.research.google.com/github/abhigoel25/AV_TrafficDetectionModel/blob/main/AV_TrafficSignDetection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import pandas as pd
import numpy as np
import torchvision

from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import datasets, transforms, models
from torchvision.transforms import ToTensor, transforms
import torchvision.transforms as T
from torchvision.transforms import functional as F

from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

from google.colab import drive
import os
from PIL import Image
drive.mount('/content/drive')
from torch.nn.utils.rnn import pad_sequence

# Run on the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print("Using {} device".format(device))

In [None]:
# Dataset locations under the Drive mount point, grouped by split.
# Training split: images and their label files.
trainingImage_folder_path = '/content/drive/MyDrive/Stanford_Dataset/Images/Train_Images'
trainingLabel_folder_path = '/content/drive/MyDrive/Stanford_Dataset/Labels/Train_Labels'
# Testing split: images and their label files.
testingImage_folder_path = '/content/drive/MyDrive/Stanford_Dataset/Images/Test_Images'
testingLabel_folder_path = '/content/drive/MyDrive/Stanford_Dataset/Labels/Test_Labels'

In [None]:
class YoloDataset(Dataset):
    """Detection dataset pairing images with YOLO-format label files.

    Every image in ``images_dir`` is matched with the ``.txt`` file of the
    same stem in ``annotations_dir``.  Each non-blank label line is read as
    ``class_id x_center y_center width height`` and converted to absolute
    ``[x_min, y_min, x_max, y_max]`` corners with ``yolo_to_rcnn``.

    Args:
        images_dir: Directory containing the ``.jpg`` / ``.png`` images.
        annotations_dir: Directory containing the matching label files.
        transform: Optional callable applied to the RGB PIL image.
    """

    # Extensions (compared case-insensitively) treated as images.
    IMAGE_EXTENSIONS = ('.jpg', '.png')

    def __init__(self, images_dir, annotations_dir, transform=None):
        self.images_dir = images_dir
        self.annotations_dir = annotations_dir
        self.transform = transform
        # os.listdir gives no ordering guarantee; sorting keeps the
        # index -> file mapping identical from one run to the next.
        self.image_files = sorted(
            f for f in os.listdir(images_dir)
            if f.lower().endswith(self.IMAGE_EXTENSIONS)
        )

    def __len__(self):
        """Return the number of image files found in ``images_dir``."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            ``(image, target)`` where ``target`` holds ``'boxes'`` (float32,
            one row of four corners per box) and ``'labels'`` (int64), or
            ``(None, None)`` when the label file is missing or contains no
            boxes, so that the collate function can drop the sample.
        """
        image_file = self.image_files[idx]
        image_path = os.path.join(self.images_dir, image_file)
        # splitext swaps only the final extension; str.replace would also
        # rewrite any earlier '.jpg' / '.png' inside the file name.
        stem = os.path.splitext(image_file)[0]
        annotation_path = os.path.join(self.annotations_dir, stem + '.txt')

        # Check the labels first so unusable samples never pay for decoding.
        if not os.path.exists(annotation_path):
            print(f"Annotation file missing for image: {image_file}")
            return None, None

        # The context manager releases the file handle as soon as the pixel
        # data has been copied into the converted image.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        boxes = []
        labels = []
        with open(annotation_path, 'r') as file:
            for line in file:
                parts = line.split()
                if not parts:  # tolerate blank / trailing empty lines
                    continue
                # Class ids are shifted by one; torchvision detection models
                # reserve label 0 for the background class.
                labels.append(int(parts[0]) + 1)
                bbox = list(map(float, parts[1:]))
                boxes.append(yolo_to_rcnn(bbox, image.width, image.height))

        if not boxes:  # Skip if no boxes
            return None, None

        target = {
            'boxes': torch.tensor(boxes, dtype=torch.float32),
            'labels': torch.tensor(labels, dtype=torch.int64),
        }

        if self.transform:
            image = self.transform(image)

        return image, target

def collate_fn(batch, max_width=640, max_height=360):
    """Collate detection samples into one fixed-size image batch.

    Samples reported as ``(None, None)`` by the dataset are dropped.  Every
    remaining image is resized to ``(max_height, max_width)`` and its boxes
    are scaled by the same per-axis factors so they still line up.

    Args:
        batch: Sequence of ``(image, target)`` pairs.  ``image`` is indexed
            as ``shape[1]`` = height and ``shape[2]`` = width; ``target`` is
            a dict with ``'boxes'`` (``[x_min, y_min, x_max, y_max]`` rows)
            and ``'labels'``.
        max_width: Output image width in pixels.
        max_height: Output image height in pixels.

    Returns:
        ``(images, targets)`` with ``images`` a stacked tensor and
        ``targets`` a list of dicts with rescaled float32 ``'boxes'`` and the
        original ``'labels'``; ``(None, None)`` if no usable sample remains.
    """
    batch = [sample for sample in batch
             if sample[0] is not None and sample[1] is not None]
    if not batch:
        return None, None

    resized_images = []
    adjusted_targets = []
    for image, target in batch:
        original_height, original_width = image.shape[1], image.shape[2]
        boxes = target['boxes']

        # One (x, y, x, y) scale row replaces the per-box Python loop.
        scale = torch.tensor(
            [max_width / original_width, max_height / original_height] * 2,
            dtype=torch.float64,
            device=boxes.device,
        )
        # Multiply in float64 and round to float32 once at the end, which is
        # the same rounding the element-by-element Python-float version had.
        # The explicit row count keeps an empty tensor reshapeable to (0, 4).
        scaled_boxes = (
            boxes.to(torch.float64).reshape(boxes.numel() // 4, 4) * scale
        ).to(torch.float32)

        resized_images.append(resize_image(image, (max_height, max_width)))
        adjusted_targets.append({
            'boxes': scaled_boxes,
            'labels': target['labels'],
        })

    images = torch.stack(resized_images, dim=0)
    return images, adjusted_targets

def resize_image(image, target_size):
    """Resize ``image`` to ``target_size`` with torchvision's functional API.

    ``target_size`` is forwarded unchanged to ``F.resize``; the callers in
    this file pass it as ``(height, width)``.
    """
    resized = F.resize(image, target_size)
    return resized

def yolo_to_rcnn(bbox, image_width, image_height):
    """Convert a YOLO box to absolute corner coordinates.

    Args:
        bbox: ``(x_center, y_center, width, height)``, each expressed as a
            fraction of the image size.
        image_width: Image width used to scale the x coordinates.
        image_height: Image height used to scale the y coordinates.

    Returns:
        ``[x_min, y_min, x_max, y_max]`` in the units of the image size.
    """
    x_center, y_center, width, height = bbox
    half_width = width / 2
    half_height = height / 2
    return [
        (x_center - half_width) * image_width,
        (y_center - half_height) * image_height,
        (x_center + half_width) * image_width,
        (y_center + half_height) * image_height,
    ]

# Training data: images are only converted to tensors here; resizing and box
# rescaling happen later inside collate_fn.
images_dir, annotations_dir = trainingImage_folder_path, trainingLabel_folder_path
transform = transforms.Compose([transforms.ToTensor()])

dataset = YoloDataset(images_dir, annotations_dir, transform=transform)
dataloader = DataLoader(
    dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=collate_fn,
)


In [None]:
class TestingDataset(Dataset):
    """Test-split detection dataset pairing images with YOLO label files.

    Every image in ``images_dir`` is matched with the ``.txt`` file of the
    same stem in ``annotations_dir``.  Each non-blank label line is read as
    ``class_id x_center y_center width height`` and converted to absolute
    ``[x_min, y_min, x_max, y_max]`` corners with ``yolo_to_rcnn``.

    Args:
        images_dir: Directory containing the ``.jpg`` / ``.png`` images.
        annotations_dir: Directory containing the matching label files.
        transform: Optional callable applied to the RGB PIL image.
    """

    # Extensions (compared case-insensitively) treated as images.
    IMAGE_EXTENSIONS = ('.jpg', '.png')

    def __init__(self, images_dir, annotations_dir, transform=None):
        self.images_dir = images_dir
        self.annotations_dir = annotations_dir
        self.transform = transform
        # os.listdir gives no ordering guarantee; sorting keeps the
        # index -> file mapping identical from one run to the next.
        self.image_files = sorted(
            f for f in os.listdir(images_dir)
            if f.lower().endswith(self.IMAGE_EXTENSIONS)
        )

    def __len__(self):
        """Return the number of image files found in ``images_dir``."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            ``(image, target)`` where ``target`` holds ``'boxes'`` (float32,
            one row of four corners per box) and ``'labels'`` (int64), or
            ``(None, None)`` when the label file is missing or contains no
            boxes, so that the collate function can drop the sample.
        """
        image_file = self.image_files[idx]
        image_path = os.path.join(self.images_dir, image_file)
        # splitext swaps only the final extension; str.replace would also
        # rewrite any earlier '.jpg' / '.png' inside the file name.
        stem = os.path.splitext(image_file)[0]
        annotation_path = os.path.join(self.annotations_dir, stem + '.txt')

        # Check the labels first so unusable samples never pay for decoding.
        if not os.path.exists(annotation_path):
            print(f"Annotation file missing for image: {image_file}")
            return None, None

        # The context manager releases the file handle as soon as the pixel
        # data has been copied into the converted image.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        boxes = []
        labels = []
        with open(annotation_path, 'r') as file:
            for line in file:
                parts = line.split()
                if not parts:  # tolerate blank / trailing empty lines
                    continue
                # Class ids are shifted by one; torchvision detection models
                # reserve label 0 for the background class.
                labels.append(int(parts[0]) + 1)
                bbox = list(map(float, parts[1:]))
                boxes.append(yolo_to_rcnn(bbox, image.width, image.height))

        if not boxes:  # Skip if no boxes
            return None, None

        target = {
            'boxes': torch.tensor(boxes, dtype=torch.float32),
            'labels': torch.tensor(labels, dtype=torch.int64),
        }

        if self.transform:
            image = self.transform(image)

        return image, target

def collate_fn(batch, max_width=640, max_height=360):
    """Collate detection samples into one fixed-size image batch.

    Samples reported as ``(None, None)`` by the dataset are dropped.  Every
    remaining image is resized to ``(max_height, max_width)`` and its boxes
    are scaled by the same per-axis factors so they still line up.

    Args:
        batch: Sequence of ``(image, target)`` pairs.  ``image`` is indexed
            as ``shape[1]`` = height and ``shape[2]`` = width; ``target`` is
            a dict with ``'boxes'`` (``[x_min, y_min, x_max, y_max]`` rows)
            and ``'labels'``.
        max_width: Output image width in pixels.
        max_height: Output image height in pixels.

    Returns:
        ``(images, targets)`` with ``images`` a stacked tensor and
        ``targets`` a list of dicts with rescaled float32 ``'boxes'`` and the
        original ``'labels'``; ``(None, None)`` if no usable sample remains.
    """
    batch = [sample for sample in batch
             if sample[0] is not None and sample[1] is not None]
    if not batch:
        return None, None

    resized_images = []
    adjusted_targets = []
    for image, target in batch:
        original_height, original_width = image.shape[1], image.shape[2]
        boxes = target['boxes']

        # One (x, y, x, y) scale row replaces the per-box Python loop.
        scale = torch.tensor(
            [max_width / original_width, max_height / original_height] * 2,
            dtype=torch.float64,
            device=boxes.device,
        )
        # Multiply in float64 and round to float32 once at the end, which is
        # the same rounding the element-by-element Python-float version had.
        # The explicit row count keeps an empty tensor reshapeable to (0, 4).
        scaled_boxes = (
            boxes.to(torch.float64).reshape(boxes.numel() // 4, 4) * scale
        ).to(torch.float32)

        resized_images.append(resize_image(image, (max_height, max_width)))
        adjusted_targets.append({
            'boxes': scaled_boxes,
            'labels': target['labels'],
        })

    images = torch.stack(resized_images, dim=0)
    return images, adjusted_targets

def resize_image(image, target_size):
    """Return ``image`` resized to ``target_size``.

    Thin wrapper over ``F.resize``; ``target_size`` is handed through as-is
    (the callers in this file supply ``(height, width)``).
    """
    output = F.resize(image, target_size)
    return output

def yolo_to_rcnn(bbox, image_width, image_height):
    """Turn a centre/size YOLO box into corner coordinates.

    ``bbox`` is ``(x_center, y_center, width, height)`` given as fractions of
    the image size; the result is ``[x_min, y_min, x_max, y_max]`` scaled by
    ``image_width`` (x values) and ``image_height`` (y values).
    """
    cx, cy, box_w, box_h = bbox

    # Horizontal extent, scaled by the image width.
    left = (cx - box_w / 2) * image_width
    right = (cx + box_w / 2) * image_width

    # Vertical extent, scaled by the image height.
    top = (cy - box_h / 2) * image_height
    bottom = (cy + box_h / 2) * image_height

    return [left, top, right, bottom]

# Testing data: same tensor-only transform as the training split.
images_dir = testingImage_folder_path
annotations_dir = testingLabel_folder_path
transform = transforms.Compose([
    transforms.ToTensor()])

testing_dataset = TestingDataset(images_dir, annotations_dir, transform)
# Fix: this loader previously wrapped the training `dataset`, so anything
# read from it was training data rather than the test split.
test1_dataloader = DataLoader(testing_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import torchvision.transforms.functional as F

def plot_image_with_boxes(image, target, predictions=None, ax=None):
    """Draw an image with its ground-truth and, optionally, predicted boxes.

    Args:
        image: Image tensor; it is moved to the CPU, transposed with
            ``(1, 2, 0)``, multiplied by 255 and cast to ``uint8`` for display.
        target: Dict whose ``'boxes'`` entries are drawn as solid red outlines.
        predictions: Optional dict whose ``'boxes'`` entries are drawn as
            dashed blue outlines.
        ax: Matplotlib axes to draw on; a new 12x9 figure is created when
            omitted.
    """
    if ax is None:
        _, ax = plt.subplots(1, 1, figsize=(12, 9))

    pixels = (image.cpu().numpy().transpose((1, 2, 0)) * 255.0).astype(np.uint8)
    ax.imshow(pixels)

    def draw_outline(corners, **style):
        # corners is [x_min, y_min, x_max, y_max]; Rectangle wants the
        # anchor point plus width and height.
        x_min, y_min, x_max, y_max = corners
        ax.add_patch(patches.Rectangle(
            (x_min, y_min), x_max - x_min, y_max - y_min,
            linewidth=1, facecolor='none', **style))

    for box in target['boxes']:
        draw_outline(box.cpu().numpy(), edgecolor='r')

    if predictions:
        for box in predictions['boxes']:
            draw_outline(box.cpu().detach().numpy(), edgecolor='b', linestyle='dashed')

    ax.axis('off')

def visualize_batch(images, targets, predictions=None, index=0):
    """Show one sample of a batch with its boxes in a new 12x9 figure.

    Args:
        images: Indexable batch of images.
        targets: Indexable batch of target dicts, aligned with ``images``.
        predictions: Optional indexable batch of prediction dicts; when
            falsy, only the ground-truth boxes are drawn.
        index: Position of the sample to display.
    """
    _, axis = plt.subplots(1, 1, figsize=(12, 9))
    sample_image = images[index]
    sample_target = targets[index]
    if predictions:
        sample_prediction = predictions[index]
    else:
        sample_prediction = None
    plot_image_with_boxes(sample_image, sample_target, sample_prediction, axis)
    plt.tight_layout()
    plt.show()

# Sanity check: print the targets of the first usable training batch and
# display its first image with the ground-truth boxes.
for batch in dataloader:
    # collate_fn returns (None, None) when every sample in a batch was
    # filtered out; skip such batches instead of crashing in zip().
    if batch[0] is None or batch[1] is None:
        continue
    for img, tgt in zip(batch[0], batch[1]):
        print(tgt)
    visualize_batch(batch[0], batch[1])
    break

In [None]:
# Split sizes: 80% / 20% of the training data for train / dev, and 61% of the
# testing data for the held-out test set.
train_size = int(0.80 * len(dataset))
dev_size = int(0.20 * len(dataset))
test_size = int(0.61 * len(testing_dataset))

# int() truncates, so the sizes may not add up to the dataset length;
# random_split needs the lengths to sum exactly, hence the leftover buckets.
extra_size = len(dataset) - dev_size - train_size
extra_testing_size = len(testing_dataset) - test_size

print(train_size)
print(dev_size)
print(test_size)

train_dataset, dev_dataset, extra_dataset = random_split(dataset, [train_size, dev_size, extra_size])
# Separate name so the training leftovers in `extra_dataset` are not overwritten.
test_dataset, extra_testing_dataset = random_split(testing_dataset, [test_size, extra_testing_size])

train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn) # training set
dev_dataloader = DataLoader(dev_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn) # validation set
# Fix: load the `test_dataset` split computed above; the loader previously
# wrapped the whole `testing_dataset`, leaving the split unused.
test2_dataloader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn) # test set

In [None]:
# Size of the classification head. The datasets in this file emit labels as
# class id + 1, so this presumably counts the background class too — confirm
# against the label set.
num_classes = 19

# Load the pretrained Faster R-CNN and replace its box predictor with a fresh
# one that outputs `num_classes` classes.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
model.roi_heads.score_thresh = 5e-7
model.to(device)

# Only parameters that still require gradients are handed to the optimizer.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005,
)

lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1,
)

# Per-epoch history of the label-agreement scores computed below.
train_accuracies = []
val_accuracies = []

num_epochs = 4
for epochs in range(num_epochs):
    # ---- Training pass ----
    model.train()
    correct_train = 0
    total_train = 0
    running_loss = 0.0
    train_batches = 0  # batches that actually contributed a loss

    for step, batch in enumerate(train_dataloader):
        # collate_fn returns (None, None) when every sample was filtered out.
        if batch[0] is None or batch[1] is None:
            continue

        images, targets = batch
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        if any(target['boxes'].numel() == 0 for target in targets):
            continue  # Skip batches with no boxes

        # In train mode the detection model returns a dict of loss terms.
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        running_loss += losses.item()
        train_batches += 1

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        ground_truth_labels = [target['labels'].to(device) for target in targets]
        ground_truth_boxes = [target['boxes'].to(device) for target in targets]

        # Switch to eval mode briefly to obtain predictions for this batch.
        model.eval()
        with torch.no_grad():
            predictions = model(images)
        model.train()

        # NOTE: predictions and ground truth are paired by position only (no
        # box matching), so this score is a rough indicator, not a detection
        # metric.
        for prediction, ground_truth in zip(predictions, ground_truth_labels):
            predicted_labels = prediction['labels']
            total_train += len(predicted_labels)
            matched = sum(1 for pl, gl in zip(predicted_labels, ground_truth) if pl == gl)
            correct_train += matched

        visualize_batch(images, targets, predictions)

        print(f'Loss: {losses.item()}')

        if step % 50 == 0:
            print(f'Epoch [{epochs+1}/{num_epochs}], Step [{step+1}/{len(train_dataloader)}], Training Loss: {losses.item():.4f}')

    # Fix: step the scheduler once per epoch. It used to run inside the batch
    # loop, so StepLR(step_size=3) cut the learning rate tenfold every three
    # batches and training effectively stalled almost immediately.
    lr_scheduler.step()

    # Fix: record the training score, which was computed but never stored.
    train_accuracy = correct_train / total_train if total_train > 0 else 0
    train_accuracies.append(train_accuracy)
    mean_train_loss = running_loss / max(train_batches, 1)
    print(f'Epoch [{epochs+1}/{num_epochs}], Training Loss: {mean_train_loss:.4f}, Training Accuracy: {train_accuracy:.2%}')

    #Validation Loop
    model.eval()
    correct_val = 0
    total_val = 0
    val_loss = 0.0
    val_batches = 0  # batches that actually contributed a loss
    with torch.no_grad():
        for batch in dev_dataloader:
            if batch[0] is None or batch[1] is None:
                continue

            images, targets = batch
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            if any(target['boxes'].numel() == 0 for target in targets):
                continue  # Skip batches with no boxes

            ground_truth_labels = [target['labels'].to(device) for target in targets]
            predictions = model(images)

            for prediction, ground_truth in zip(predictions, ground_truth_labels):
                predicted_labels = prediction['labels']
                matched = sum(1 for pl, gl in zip(predicted_labels, ground_truth) if pl == gl)
                total_val += len(predicted_labels)
                correct_val += matched

                for pl, gl in zip(predicted_labels, ground_truth):
                    print(f'Predicted: {pl.item()}, Ground Truth: {gl.item()}')

            # The model only returns losses in train mode, so flip modes for
            # this one forward pass; no_grad is still active, so no gradients
            # are recorded.
            model.train()
            loss_dict = model(images, targets)
            model.eval()
            losses = sum(loss for loss in loss_dict.values())
            # .item() keeps val_loss a plain float instead of a tensor.
            val_loss += losses.item()
            val_batches += 1
            print(f'Loss: {losses.item()}')

    val_accuracy = correct_val / total_val if total_val > 0 else 0
    # Average over the batches that were evaluated; skipped batches used to
    # be counted in the divisor and deflated the reported loss.
    val_loss /= max(val_batches, 1)
    val_accuracies.append(val_accuracy)

    # Fix: the score is a fraction, so format it as a percentage rather than
    # printing the raw fraction followed by '%'.
    print(f'Epoch [{epochs+1}/{num_epochs}], Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2%}')

In [None]:
test_accuracies = []
#Testing Loop
model.eval()
correct_test = 0
total_test = 0
test_loss = 0.0
test_batches = 0  # batches that actually contributed a loss
with torch.no_grad():
    for batch in test2_dataloader:
        # collate_fn returns (None, None) when every sample was filtered out.
        if batch[0] is None or batch[1] is None:
            continue

        images, targets = batch
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        if any(target['boxes'].numel() == 0 for target in targets):
            continue  # Skip batches with no boxes

        ground_truth_labels = [target['labels'].to(device) for target in targets]
        predictions = model(images)

        # NOTE: predictions and ground truth are paired by position only (no
        # box matching), so this score is a rough indicator, not a detection
        # metric.
        for prediction, ground_truth in zip(predictions, ground_truth_labels):
            predicted_labels = prediction['labels']
            matched = sum(1 for pl, gl in zip(predicted_labels, ground_truth) if pl == gl)
            total_test += len(predicted_labels)
            correct_test += matched

            for pl, gl in zip(predicted_labels, ground_truth):
                print(f'Predicted: {pl.item()}, Ground Truth: {gl.item()}')

        visualize_batch(images, targets, predictions)

        # The model only returns losses in train mode, so flip modes for this
        # one forward pass; no_grad is still active, so no gradients are
        # recorded.
        model.train()
        loss_dict = model(images, targets)
        model.eval()
        losses = sum(loss for loss in loss_dict.values())
        # .item() keeps test_loss a plain float instead of a tensor.
        test_loss += losses.item()
        test_batches += 1
        print(f'Loss: {losses.item()}')

test_accuracy = correct_test / total_test if total_test > 0 else 0
# Average over the batches that were evaluated; skipped batches used to be
# counted in the divisor and deflated the reported loss.
test_loss /= max(test_batches, 1)
test_accuracies.append(test_accuracy)

# `epochs` / `num_epochs` are left over from the training cell. The score is a
# fraction, so it is formatted as a percentage rather than printed raw with '%'.
print(f'Epoch [{epochs+1}/{num_epochs}], Testing Loss: {test_loss:.4f}, Testing Accuracy: {test_accuracy:.2%}')