
## Rescue Shall Pass: Emergency Vehicle Detection

<br>

## Transfer Learning - FasterRCNN

<br>

## BBM416 Computer Vision. Spring 2023.

<br>

#### Contributors: Hasim Zafer Cicek, Mehmet Giray Nacakci, Enes Yavuz

# INITIALIZATION

### Choose Local or Google Colab

In [1]:
# Runtime switch read by the cells below: "collab" mounts Google Drive and uses it as the
# data root (and enables one DataLoader worker); any other value, e.g. "local", keeps all
# paths relative to the current working directory.
# environment_ = "local"
environment_ = "collab"


In [None]:
# Prefix prepended to every dataset, checkpoint and output path in this notebook.
# The empty string means "relative to the current working directory" (local runs).
root_folder = ""

# Google Colab run: mount Google Drive and work inside the shared project folder.
# The authors mainly trained on a Colab GPU (the original note here quotes a speed-up of
# roughly 5x over local training).
if environment_ == "collab":
    from google.colab import drive

    drive.mount("/content/gdrive")
    root_folder = "/content/gdrive/My Drive/416 Pro/416 ORTAKLAR/giray_yeni_collab/"


In [None]:
!pip3 install torch torchvision

import torch
import torchvision
from torchvision import ops
from torchvision import transforms
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader, Dataset
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
print(torch.__version__)
print(torchvision.__version__)

In [4]:
import os
import cv2
import numpy as np
from PIL import Image
import math
import time
from matplotlib import pyplot
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
# (The authors mainly trained on a Google Colab GPU; the original note here quotes a
# speed-up of around 60x.)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class CustomDataset(Dataset):
    """Object-detection dataset backed by ``root/images`` and ``root/labels``.

    Every label file holds one object per line in YOLO format
    (``class x_center y_center width height``, coordinates relative to the image size).
    Image and label files are paired purely by their position in the sorted directory
    listings, so both folders must contain matching files in the same order.

    Each item is ``(image, target)`` where ``target`` is the dict expected by torchvision
    detection models: ``boxes`` (N, 4) in absolute ``[x_min, y_min, x_max, y_max]`` pixels
    of the *original* image, ``labels`` (N,), ``image_id``, ``area`` and ``iscrowd``.

    NOTE(review): ``transforms`` is applied to the image only. Any geometric transform
    (a resize that changes the size, a flip, a crop) is therefore NOT reflected in the boxes.
    """

    def __init__(self, root, transforms):
        """Index the image and label files found under ``root``.

        Args:
            root: dataset folder containing the ``images`` and ``labels`` sub-folders.
            transforms: callable applied to the image tensor, or ``None`` to return the
                untouched PIL image.
        """
        self.root = root
        self.transforms = transforms
        self.imgs = self._list_files(os.path.join(root, "images"))
        self.labels = self._list_files(os.path.join(root, "labels"))

    @staticmethod
    def _list_files(folder):
        """Return the sorted file names in ``folder``, skipping macOS ``.DS_Store`` entries."""
        return sorted(file for file in os.listdir(folder) if ".DS_Store" not in file)

    @staticmethod
    def _parse_label_lines(lines, img_width, img_height):
        """Convert YOLO label lines into absolute corner boxes and model class ids.

        Blank lines are ignored, so an empty label file yields two empty lists.

        Returns:
            ``(boxes, labels)``: ``boxes`` is a list of ``[x_min, y_min, x_max, y_max]`` and
            ``labels`` the matching list of integer class ids.
        """
        boxes = []
        labels = []
        for line in lines:
            fields = line.split()
            if not fields:
                continue  # blank / trailing line, nothing to parse

            class_label, x_center, y_center, width, height = map(float, fields)
            x_min = (x_center - width / 2) * img_width
            y_min = (y_center - height / 2) * img_height
            x_max = (x_center + width / 2) * img_width
            y_max = (y_center + height / 2) * img_height
            boxes.append([x_min, y_min, x_max, y_max])

            # dataset:        Emergency=0, Non-Emergency=1
            # model predicts: background=0, Emergency=1, Non-Emergency=2
            labels.append(int(class_label) + 1)
        return boxes, labels

    @staticmethod
    def _make_target(boxes, labels, idx):
        """Pack parsed boxes and labels into the target dict of tensors."""
        # reshape(-1, 4) keeps the box tensor two-dimensional even when the image has no
        # objects; a flat (0,) tensor would make the column indexing below fail.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        return {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([idx]),
            "area": area,
            "iscrowd": torch.zeros((len(labels),), dtype=torch.int64),
        }

    def __getitem__(self, idx):
        """Load the ``idx``-th image together with its detection target."""
        img_path = os.path.join(self.root, "images", self.imgs[idx])
        label_path = os.path.join(self.root, "labels", self.labels[idx])
        img = Image.open(img_path).convert("RGB")

        with open(label_path) as txt_file:
            lines = txt_file.readlines()

        boxes, labels = self._parse_label_lines(lines, img.width, img.height)
        target = self._make_target(boxes, labels, idx)

        if self.transforms is not None:
            img = ToTensor()(img)
            img = self.transforms(img)

        return img, target

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.imgs)

# Image-only preprocessing shared by all splits. CustomDataset applies it to the image but
# never to the bounding boxes, so only geometry-preserving transforms are safe here.
#
# RandomHorizontalFlip was removed from this pipeline: it mirrored the image while the
# ground-truth boxes stayed where they were, which corrupts the training labels, and because
# the same pipeline feeds the valid/test/demo splits it also randomly invalidated evaluation
# and the demo overlays. Flip augmentation needs a transform that flips the boxes as well.
transform = transforms.Compose([
    transforms.ToPILImage(),
    # NOTE(review): boxes are computed for the original image size, so this resize is only
    # harmless if the dataset images are already 224x224 - TODO confirm.
    transforms.Resize((224, 224)),
    #transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.ToTensor(),
    # NOTE(review): torchvision's Faster R-CNN presumably normalizes its inputs again
    # internally; kept because existing checkpoints were trained with it - confirm before
    # changing.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load the dataset splits
dataset_train = CustomDataset(root_folder + 'emergency_dataset/train', transforms=transform)
dataset_val = CustomDataset(root_folder + 'emergency_dataset/valid', transforms=transform)
dataset_test = CustomDataset(root_folder + 'emergency_dataset/test', transforms=transform)
dataset_demo = CustomDataset(root_folder + 'emergency_dataset/demo', transforms=transform)

# There can be multiple objects in each image
def my_collate(batch):
    """Collate ``(image, target)`` samples without stacking them.

    Images can carry a different number of objects, so the targets cannot be stacked into
    one tensor; the batch is returned as two parallel lists instead.

    Args:
        batch: sequence of samples, each indexable with the image at 0 and the target at 1.

    Returns:
        ``[images, targets]`` - two lists in the same order as ``batch``.
    """
    data, target = [], []
    for sample in batch:
        data.append(sample[0])
        target.append(sample[1])
    return [data, target]


# Batched input pipelines.
# n = number of DataLoader worker processes: one on Colab, none otherwise (loading then
# happens in the main process).
n = 0
if environment_ == "collab":
    n = 1
batch_size_ = 50

# Options shared by every split; only the training split is shuffled.
loader_options_ = dict(batch_size=batch_size_, num_workers=n, collate_fn=my_collate)
train_dataloader = DataLoader(dataset_train, shuffle=True, **loader_options_)
val_dataloader = DataLoader(dataset_val, shuffle=False, **loader_options_)
test_dataloader = DataLoader(dataset_test, shuffle=False, **loader_options_)
demo_dataloader = DataLoader(dataset_demo, shuffle=False, **loader_options_)


# Plotting the results
# Per-epoch mean losses: the training cell clears both lists and appends one value per epoch;
# draw_loss_plots() reads them to draw the two curves.
training_loss_array = []
validation_loss_array = []


def draw_loss_plots(epochs):
    """Plot training and validation loss per epoch, save the figure and show it.

    Reads the module-level ``training_loss_array`` and ``validation_loss_array`` and writes
    the figure to ``root_folder + "loss_plot.jpg"``.

    Args:
        epochs: number of epochs that were trained; sets the range of the x-axis ticks.
    """
    results_plot = pyplot.figure(figsize=(10, 8))
    pyplot.title(label="Training and Validation Loss vs Epochs", loc="center", y=1.0, fontsize=16, pad=35)
    pyplot.axis('off')

    results_plot.add_subplot(1, 1, 1)
    # Plot against 1-based epoch numbers. Without explicit x values the curves start at
    # x=0, so each point would sit one epoch to the left of its tick label.
    pyplot.plot(range(1, len(training_loss_array) + 1), training_loss_array, color="orange")
    pyplot.plot(range(1, len(validation_loss_array) + 1), validation_loss_array, color="blue")
    pyplot.ylabel('Loss', fontsize=16)
    pyplot.xlabel('Epochs', fontsize=16)
    pyplot.legend(['Training Loss', 'Validation Loss'], fontsize=16)
    # One tick every 5 epochs, starting at epoch 1 and allowing the last epoch itself.
    pyplot.xticks(np.arange(1, epochs + 1, 5))

    results_plot.subplots_adjust(top=0.75)
    pyplot.tight_layout()
    pyplot.savefig(root_folder + "loss_plot.jpg")
    pyplot.show()



# Load a pre-trained Faster R-CNN model (MobileNetV3-Large FPN backbone, default weights)
model = torchvision.models.detection.fasterrcnn_mobilenet_v3_large_fpn(
    weights='DEFAULT',
    trainable_backbone_layers=0,
)

# Class ids follow the label mapping used in CustomDataset:
# 0 = background, 1 = Emergency, 2 = Non-Emergency
num_classes = 3

# Freeze every pre-trained parameter; only the head created below will be trainable.
# The printed number is the count of parameter tensors of the whole model.
pretrained_parameters = list(model.parameters())
print("Total Backbone Network Layers: ", len(pretrained_parameters), "\n")
for pretrained_parameter in pretrained_parameters:
    pretrained_parameter.requires_grad = False

# Replace the pre-trained box-predictor head with a new one sized for our dataset.
# It is created after the freeze above, so its parameters are not frozen by that loop.
number_of_classifier_input_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(number_of_classifier_input_features, num_classes)
print("New model head: ", model.roi_heads.box_predictor, "\n")

# Move the model to the available device.
# (The authors mainly trained on a Google Colab GPU; the original note here quotes a
# speed-up of around 120x.)
print("Device: ", device, "\n")
model.to(device)
print()

Downloading: "https://download.pytorch.org/models/fasterrcnn_mobilenet_v3_large_fpn-fb6a3cc7.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_mobilenet_v3_large_fpn-fb6a3cc7.pth
100%|██████████| 74.2M/74.2M [00:01<00:00, 73.2MB/s]


Total Backbone Network Layers:  100 

New model head:  FastRCNNPredictor(
  (cls_score): Linear(in_features=1024, out_features=3, bias=True)
  (bbox_pred): Linear(in_features=1024, out_features=12, bias=True)
) 

Device:  cuda 




# TRAINING

In [None]:

# Construct an optimizer over the trainable parameters only. Everything except the new
# box-predictor head was frozen during initialization, so this is what actually gets updated.
params = [p for p in model.parameters() if p.requires_grad]

# Alternative that was tried: torch.optim.SGD(params, lr=0.01, momentum=0.9, weight_decay=0.0005)
optimizer = torch.optim.Adam(params, lr=0.001)

# Learning rate scheduler which decreases the learning rate by 10x every 3 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Start from empty loss histories so that re-running this cell does not append to old curves.
training_loss_array.clear()
validation_loss_array.clear()

print("- - - - -   TRAINING STARTED   - - - - - - ")

num_epochs = 20
min_val_loss = math.inf
training_start = time.time()

# The model stays in train mode for the whole cell, including validation (see below).
model.train()
for epoch in range(num_epochs):
    train_loss = 0
    train_epoch_time = time.time()

    for images, targets in train_dataloader:  # for each Batch

        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # In train mode the detection model returns a dict of loss terms.
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        train_loss += losses.item()

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

    # Mean loss per batch for this epoch
    train_loss = round(train_loss / len(train_dataloader), 3)
    print(f"\nEpoch #{epoch+1}  Train Loss: {train_loss}  ", end="  ")
    print("  calculated in ", str(int(time.time() - train_epoch_time)), "seconds. \n ")
    training_loss_array.append(train_loss)

    # VALIDATION
    #
    # model(images, targets) returns losses only in model.train() mode; in model.eval() mode
    # it returns predictions instead. To obtain a validation loss, model.eval() is therefore
    # deliberately not called here. The authors consider this acceptable because all
    # backbone layers are frozen. torch.no_grad() ensures no gradients are computed.
    val_loss = 0
    start_val_time = time.time()
    with torch.no_grad():
        for images, targets in val_dataloader:  # processed in batches

            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)

            losses = sum(loss for loss in loss_dict.values())
            val_loss += losses.item()

    val_loss = round(val_loss / len(val_dataloader), 3)
    print(f"Epoch #{epoch+1} Validation Loss: {val_loss}", end="    ")
    print("calculated in ", str(int(time.time() - start_val_time)), "seconds.")
    validation_loss_array.append(val_loss)

    # Save the model whenever the validation loss has decreased.
    # NOTE(review): the checkpoint is written as 'saved_model.pth', while the testing and
    # demo cells load 'model_<n>.pth' - presumably checkpoints are renamed by hand; confirm.
    if val_loss < min_val_loss:
        print(f"Validation Loss Decreased({min_val_loss:.3f}--->{val_loss:.3f}) \t Saving The Model\n")
        torch.save(model.state_dict(), root_folder + 'saved_model.pth')
        min_val_loss = val_loss
    else:
        print()

    # Update the learning rate
    lr_scheduler.step()


print("\n\n* * * * TRAINING  COMPLETED IN ", str(int(time.time() - training_start)), "seconds. * * * * \n ")

draw_loss_plots(num_epochs)


# TESTING (Saved Model)

<br>

### (Currently: CANNOT run locally if the model was trained in the cloud (Google Colab).)

<br>

## First, Run the "INITIALIZATION" section. But No need to run Training if saved_model.pth exists.

In [8]:

def test_evaluation(actual_classes_and_bboxes, predicted_classes_and_bboxes):
    """Score one batch of detections against its ground truth.

    For every image the predictions are filtered (class-wise non-maximum suppression at
    IoU 0.5, then a score threshold of 0.1) and matched to the ground-truth boxes. Each
    ground-truth box is matched to at most one prediction and vice versa: the unassigned
    prediction with the highest IoU, provided that IoU is at least 0.5. Ground-truth boxes
    are processed in order, so the matching is greedy.

    Args:
        actual_classes_and_bboxes: per-image target dicts with ``'boxes'`` and ``'labels'``.
        predicted_classes_and_bboxes: per-image model outputs with ``'boxes'``, ``'labels'``
            and ``'scores'``.

    Returns:
        A 9-tuple:
        ``(true_positives_for_vehicle, false_positives, false_negatives,
        accurate_emergency_class, wrong_emergency_class, total_actual_vehicle,
        total_predicted_vehicle, actual_labels_list, predicted_labels_list)``.
        The two lists hold the class ids of every matched (true-positive) pair and are
        meant for the classification confusion matrix.
    """
    true_positives_for_vehicle = 0
    false_positives = 0
    false_negatives = 0
    accurate_emergency_class = 0
    wrong_emergency_class = 0
    total_actual_vehicle = 0
    total_predicted_vehicle = 0
    actual_labels_list = []
    predicted_labels_list = []

    for actual_for_image, predicted_for_image in zip(actual_classes_and_bboxes, predicted_classes_and_bboxes):

        actual_boxes = actual_for_image['boxes']
        actual_labels = actual_for_image['labels']

        predicted_boxes = predicted_for_image['boxes']
        predicted_labels = predicted_for_image['labels']
        scores = predicted_for_image['scores']

        # retain only the highest score ones of overlapping detections
        non_maximum_supressed = torchvision.ops.batched_nms(predicted_boxes, scores, predicted_labels, 0.5)
        predicted_boxes, scores, predicted_labels = (predicted_boxes[non_maximum_supressed], scores[non_maximum_supressed], predicted_labels[non_maximum_supressed])

        # remove predicted bounding boxes which has low score
        lowest_removed = torch.where(scores > 0.1)[0]
        predicted_boxes, scores, predicted_labels = (predicted_boxes[lowest_removed], scores[lowest_removed], predicted_labels[lowest_removed])

        # Comparison of predicted and actual
        total_actual_vehicle += len(actual_boxes)
        total_predicted_vehicle += len(predicted_boxes)

        # 2d table of IoU values: one row per ground-truth box, one column per predicted box.
        # Either dimension may be empty; the loops below handle that without special cases
        # (no rows -> every prediction is a false positive, no columns -> every ground truth
        # is a false negative).
        intersection_over_union = (ops.box_iou(actual_boxes, predicted_boxes)).cpu().numpy()

        predicted_boxes_if_assigned = [False] * len(predicted_labels)
        for a, row in enumerate(intersection_over_union):  # for each ground_truth box
            matched = False

            # prioritize assigning highest IoU first
            for p, cell in sorted(enumerate(row), key=lambda x: x[1], reverse=True):
                if cell < 0.5:
                    break  # sorted descending: no remaining candidate overlaps enough
                if predicted_boxes_if_assigned[p]:
                    continue  # prevent assigning same predicted box to multiple ground_truth boxes

                true_positives_for_vehicle += 1
                predicted_boxes_if_assigned[p] = True
                matched = True

                actual_label_ = int(actual_labels[a])
                predicted_label_ = int(predicted_labels[p])
                if actual_label_ == predicted_label_:
                    accurate_emergency_class += 1
                else:
                    wrong_emergency_class += 1
                actual_labels_list.append(actual_label_)
                predicted_labels_list.append(predicted_label_)

                # One prediction per ground-truth box: further overlapping predictions are
                # duplicates and must end up as false positives, not extra true positives.
                break

            if not matched:
                # Nothing overlaps enough, or every candidate is already taken.
                false_negatives += 1

        # Every prediction left unassigned is a false positive.
        false_positives += predicted_boxes_if_assigned.count(False)

    return true_positives_for_vehicle, false_positives, false_negatives, accurate_emergency_class, wrong_emergency_class, total_actual_vehicle, total_predicted_vehicle, actual_labels_list, predicted_labels_list



def print_test_results(tp_for_vehicle, fp, fn, accurate_emergency_class, wrong_emergency_class, actual_vehicle, predicted_vehicle):
    """Print bounding-box precision/recall and classification accuracy.

    Args:
        tp_for_vehicle: predictions matched to a ground-truth box.
        fp: predictions without a matching ground-truth box.
        fn: ground-truth boxes without a matching prediction.
        accurate_emergency_class: matched pairs whose class ids agree.
        wrong_emergency_class: matched pairs whose class ids differ.
        actual_vehicle: total number of ground-truth boxes.
        predicted_vehicle: total number of predicted boxes that were kept.

    Each ratio is rounded to two decimals and reported as 0 when its denominator is zero.
    """

    def ratio(hits, misses):
        # hits / (hits + misses), guarded against an empty denominator
        total = hits + misses
        return round(hits / total, 2) if total != 0 else 0

    print("\nActual Vehicle : ", actual_vehicle, " Predicted Vehicle : ", predicted_vehicle)

    precision = ratio(tp_for_vehicle, fp)
    recall = ratio(tp_for_vehicle, fn)
    accuracy = ratio(accurate_emergency_class, wrong_emergency_class)

    print("Bounding box Precision: ", precision, "  Recall: ", recall, " ;  classification ACCURACY:  ", accuracy, "\n\n")


def classification_confusion_matrix(actual, predicted):
    """Plot, save and show the Emergency / Non-Emergency confusion matrix.

    Args:
        actual: ground-truth class ids of the matched (true-positive) boxes.
        predicted: predicted class ids of the same boxes, in the same order.

    The figure is written to ``root_folder + "confusion_matrix.jpg"``.
    """
    # Pin the class order (1 = Emergency, 2 = Non-Emergency). Without `labels` the matrix
    # only covers the classes that happen to occur, so it shrinks to 1x1 when every matched
    # box belongs to one class and no longer fits the two display labels below.
    confusion_matrix_ = confusion_matrix(actual, predicted, labels=[1, 2])
    cm_display = ConfusionMatrixDisplay(confusion_matrix=confusion_matrix_, display_labels=["Emergency", "Non-Emergency"])  # "background",
    cm_display.plot()
    pyplot.title('Confusion Matrix of Test Set (TruePos. b.boxes)', x=0.2, fontsize=17)
    pyplot.ylabel('Actual Class', fontsize=13)
    pyplot.xlabel('Predicted Class', fontsize=13)
    pyplot.xticks(rotation=90)
    pyplot.tight_layout()

    pyplot.savefig(root_folder + "confusion_matrix.jpg")
    pyplot.show()



In [None]:
""" TESTING """

start_test_time = time.time()

# Load the best model.
# map_location remaps the checkpoint's tensors onto the device used in this session, so a
# checkpoint saved on a Colab GPU can also be loaded on a CPU-only local machine.
model.load_state_dict(torch.load(root_folder + 'model_6.pth', map_location=device))

print("- - - - -   TESTING RESULTS:   - - - - - - ")

# Running totals over all test batches
true_positives_for_vehicle = false_positives = false_negatives = accurate_emergency_class = 0
wrong_emergency_class = total_actual_vehicle = total_predicted_vehicle = 0
actual_labels_list = []
predicted_labels_list = []

# In eval mode the detection model returns predictions instead of losses.
model.eval()
with torch.no_grad():

    for images, targets in test_dataloader:  # processed in Batches
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        results = model(images, targets)

        (
            true_positives_for_vehicle_,
            false_positives_,
            false_negatives_,
            accurate_emergency_class_,
            wrong_emergency_class_,
            total_actual_vehicle_,
            total_predicted_vehicle_,
            actual_labels_list_,
            predicted_labels_list_,
        ) = test_evaluation(targets, results)

        true_positives_for_vehicle += true_positives_for_vehicle_
        false_positives += false_positives_
        false_negatives += false_negatives_
        accurate_emergency_class += accurate_emergency_class_
        wrong_emergency_class += wrong_emergency_class_
        total_actual_vehicle += total_actual_vehicle_
        total_predicted_vehicle += total_predicted_vehicle_
        actual_labels_list.extend(actual_labels_list_)
        predicted_labels_list.extend(predicted_labels_list_)


print("\n\n* * * * TEST RESULTS Calculated in ", str(int(time.time() - start_test_time)), "seconds * * * *")

print_test_results(true_positives_for_vehicle, false_positives, false_negatives, accurate_emergency_class, wrong_emergency_class, total_actual_vehicle, total_predicted_vehicle)

classification_confusion_matrix(actual_labels_list, predicted_labels_list)


# VISUAL DEMO
of Detection on Selected Images

<br>

## First, Run the "INITIALIZATION" section.

In [None]:
# Number of the checkpoint to visualise: 'model_<model_no>.pth' is loaded and the number is
# also appended to the names of the saved figures.
model_no = 6

# Paths of the demo images, sorted the same way CustomDataset sorts its files so that
# position k here corresponds to sample k of demo_dataloader (which is not shuffled).
# (To visualise the test split instead, point this at 'emergency_dataset/test/images/'
# and iterate test_dataloader.)
demo_images_dir = root_folder + 'emergency_dataset/demo/images/'
demo_images_paths = sorted(demo_images_dir + str(file) for file in os.listdir(demo_images_dir) if ".DS_Store" not in file)

# Make sure the output folder exists before the first figure is saved.
demo_output_dir = root_folder + "demo_images/"
os.makedirs(demo_output_dir, exist_ok=True)

# Box colours in OpenCV's BGR order, keyed by class id; any other id is drawn in black.
box_colors = {1: (0, 255, 0), 2: (0, 255, 255)}  # green = Emergency, yellow = Non-Emergency

# Load the best model. map_location remaps the checkpoint's tensors onto the device used in
# this session, so a checkpoint saved on a GPU can also be loaded on a CPU-only machine.
model.load_state_dict(torch.load(root_folder + 'model_' + str(model_no) + '.pth', map_location=device))
model.eval()
with torch.no_grad():

    for iteration, (tensor_images, targets) in enumerate(demo_dataloader):

        tensor_images = list(image.to(device) for image in tensor_images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        results = model(tensor_images, targets)

        for i, (actual_for_image, predicted_for_image) in enumerate(zip(targets, results)):  # each image
            actual_boxes = actual_for_image['boxes']
            actual_labels = actual_for_image['labels']
            predicted_boxes = predicted_for_image['boxes']
            predicted_labels = predicted_for_image['labels']
            scores = predicted_for_image['scores']

            # retain only the highest score ones of overlapping detections
            non_maximum_supressed = torchvision.ops.batched_nms(predicted_boxes, scores, predicted_labels, 0.5)
            predicted_boxes, scores, predicted_labels = (predicted_boxes[non_maximum_supressed], scores[non_maximum_supressed], predicted_labels[non_maximum_supressed])

            # remove predicted bounding boxes which has low score
            lowest_removed = torch.where(scores > 0.1)[0]
            predicted_boxes, scores, predicted_labels = (predicted_boxes[lowest_removed], scores[lowest_removed], predicted_labels[lowest_removed])

            # Read the original image once; draw ground truth on one copy, predictions on the other.
            image_path = demo_images_paths[batch_size_ * iteration + i]
            actual_annotated_img = cv2.imread(image_path)
            predictions_annotated_img = actual_annotated_img.copy()

            for e, box in enumerate(actual_boxes):
                color_ = box_colors.get(int(actual_labels[e]), (0, 0, 0))
                cv2.rectangle(actual_annotated_img, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), color=color_, thickness=2, lineType=cv2.LINE_AA)

            for e, box in enumerate(predicted_boxes):
                color_ = box_colors.get(int(predicted_labels[e]), (0, 0, 0))
                cv2.rectangle(predictions_annotated_img, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), color=color_, thickness=2, lineType=cv2.LINE_AA)
                # confidence score just above the top-left corner of the box
                cv2.putText(predictions_annotated_img, text=str(round(float(scores[e]), 2)), org=(int(box[0]), int(box[1]-5)), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.6, color=color_, thickness=2, lineType=cv2.LINE_AA)

            # concatenate and display side by side: image with Actual annotations, white space, image with Predicted annotations
            # The spacer takes its height from the image so concatenation works for any image size.
            white_space = np.full((actual_annotated_img.shape[0], 50, 3), 255, dtype=np.uint8)
            annotated_image = np.concatenate((actual_annotated_img, white_space, predictions_annotated_img), axis=1, dtype=np.uint8)

            pyplot.imshow(cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB))
            pyplot.title(label="Actual vs Predicted", loc="center", y=1.0, fontsize=16, pad=35)
            pyplot.tick_params(axis='x', which='both', bottom=False)
            pyplot.tick_params(axis='y', which='both', left=False)
            pyplot.xticks([])
            pyplot.yticks([])
            pyplot.xlabel('Green = Emergency , Yellow = Non-Emergency', fontsize=13)
            pyplot.tight_layout()

            # Output name: the part of the file name before the first underscore, plus the model number.
            img_name = image_path.split("/")[-1].split("_")[0]
            pyplot.savefig(demo_output_dir + str(img_name) + "_m" + str(model_no) + ".jpg", dpi=450)
            pyplot.clf()  # clear the figure so drawings do not accumulate across images

print("done")
