# DIPA Project

In [10]:
import os
import pandas as pd
import torchvision
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import functional as F
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from PIL import Image
import matplotlib.pyplot as plt
from datetime import datetime
from ultralytics import YOLO
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from matplotlib.backends.backend_pdf import PdfPages
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report
from torchmetrics.detection.mean_ap import MeanAveragePrecision


# Label names for the detector; the background entry comes first.
CLASSES = ['__background__', 'pothole']
NUM_CLASSES = len(CLASSES)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')
print("Device used ", DEVICE)

Device used  cuda


# 1 Model Initialization

#### Initialize the bounding-box dataset class for FasterRCNN
- _limit_ - number of images to load into dataset class
- .csv file structure - filename, width, height, class, xmin, ymin, xmax, ymax


In [11]:
import os
import pandas as pd
import torch
from torch.utils.data import Dataset
from PIL import Image
from torchvision import transforms

class PotholeDataset(Dataset):
    """Pothole detection dataset that resizes images and rescales boxes to match.

    Annotations come from a CSV with one row per bounding box; the columns
    ``filename``, ``xmin``, ``ymin``, ``xmax`` and ``ymax`` are read. Each item
    is an ``(image, target)`` pair where ``target`` is a dict with the keys
    ``boxes``, ``labels``, ``image_id``, ``area`` and ``iscrowd``.

    NOTE: a later cell defines another ``PotholeDataset``; when the notebook is
    run top to bottom, that later definition replaces this one.

    Args:
        csv_file: Path to the annotation CSV.
        image_dir: Directory holding the image files named in the CSV.
        limit: If given, only the first ``limit`` distinct filenames are used.
        img_size: Output image size as ``(width, height)``.
    """

    def __init__(self, csv_file, image_dir, limit=None, img_size=(224, 224)):
        self.df = pd.read_csv(csv_file)
        self.image_dir = image_dir
        self.image_files = self.df['filename'].unique()
        if limit is not None:
            self.image_files = self.image_files[:limit]

        self.img_size = img_size  # (width, height)
        self.transform = transforms.Compose([
            # Resize expects (height, width), i.e. the reverse of img_size.
            # Passing img_size directly would mis-scale boxes for any
            # non-square target size.
            transforms.Resize(self._resize_hw(self.img_size)),
            transforms.ToTensor(),
            # NOTE(review): torchvision detection models document 0-1 range
            # inputs and normalize internally - confirm this extra Normalize
            # is intended before feeding these tensors to Faster R-CNN.
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])

    @staticmethod
    def _resize_hw(img_size):
        """Convert a ``(width, height)`` size into ``(height, width)`` order."""
        width, height = img_size
        return (height, width)

    @staticmethod
    def _scale_boxes(boxes, orig_size, new_size):
        """Rescale ``[xmin, ymin, xmax, ymax]`` rows between two image sizes.

        Args:
            boxes: Array-like of shape ``(N, 4)`` in pixels of the original image.
            orig_size: Original image size as ``(width, height)``.
            new_size: Resized image size as ``(width, height)``.

        Returns:
            A ``float32`` tensor of shape ``(N, 4)`` in resized-image pixels.
        """
        orig_w, orig_h = orig_size
        new_w, new_h = new_size
        scale_x = new_w / orig_w
        scale_y = new_h / orig_h
        # Scale in float64 first, then cast, to match the precision of the
        # annotation values before the final float32 conversion.
        factors = torch.tensor([scale_x, scale_y, scale_x, scale_y], dtype=torch.float64)
        scaled = torch.as_tensor(boxes, dtype=torch.float64) * factors
        return scaled.to(torch.float32)

    def __len__(self):
        """Return the number of distinct images in the dataset."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load image ``idx`` and return ``(image_tensor, target_dict)``."""
        image_id = self.image_files[idx]
        img_path = os.path.join(self.image_dir, image_id)

        # Close the underlying file as soon as the RGB copy exists.
        with Image.open(img_path) as raw:
            img = raw.convert("RGB")

        # PIL reports size as (width, height); remember it before resizing.
        orig_size = img.size
        img = self.transform(img)

        # All annotation rows that belong to this image.
        records = self.df[self.df['filename'] == image_id]
        raw_boxes = records[['xmin', 'ymin', 'xmax', 'ymax']].values.astype(float)
        boxes = self._scale_boxes(raw_boxes, orig_size, self.img_size)

        # Labels: 1 for pothole
        labels = torch.ones((boxes.shape[0],), dtype=torch.int64)

        target = {
            'boxes': boxes,
            'labels': labels,
            'image_id': torch.tensor([idx]),
            'area': (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1]),
            'iscrowd': torch.zeros((boxes.shape[0],), dtype=torch.int64)
        }

        return img, target


In [12]:
class PotholeDataset(Dataset):
    """Pothole detection dataset that serves images at their original size.

    Reads a CSV with one row per bounding box (columns ``filename``, ``xmin``,
    ``ymin``, ``xmax``, ``ymax`` are used) and yields ``(image_tensor, target)``
    pairs, where ``target`` holds ``boxes``, ``labels``, ``image_id``, ``area``
    and ``iscrowd``. No resizing or normalization is applied here.

    Args:
        csv_file: Path to the annotation CSV.
        image_dir: Directory holding the image files named in the CSV.
        limit: If given, only the first ``limit`` distinct filenames are used.
    """

    def __init__(self, csv_file, image_dir, limit=None):
        annotations = pd.read_csv(csv_file)
        filenames = annotations['filename'].unique()

        # Optionally keep only the first `limit` distinct images.
        self.image_files = filenames if limit is None else filenames[:limit]
        self.df = annotations
        self.image_dir = image_dir

    def __len__(self):
        """Return the number of distinct images."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load image ``idx`` together with all of its annotated boxes."""
        filename = self.image_files[idx]
        picture = Image.open(os.path.join(self.image_dir, filename)).convert("RGB")

        # Every CSV row for this file is one bounding box.
        rows = self.df[self.df['filename'] == filename]
        n_boxes = rows.shape[0]
        coords = torch.as_tensor(
            rows[['xmin', 'ymin', 'xmax', 'ymax']].values, dtype=torch.float32
        )
        widths = coords[:, 2] - coords[:, 0]
        heights = coords[:, 3] - coords[:, 1]

        target = dict(
            boxes=coords,
            labels=torch.ones((n_boxes,), dtype=torch.int64),  # every box is class 1
            image_id=torch.tensor([idx]),
            area=widths * heights,
            iscrowd=torch.zeros((n_boxes,), dtype=torch.int64),
        )

        return transforms.ToTensor()(picture), target

##### Build the data loaders for the train, valid and test splits

In [13]:
def _collate_detection_batch(batch):
    """Regroup a list of ``(image, target)`` samples into ``(images, targets)`` tuples.

    Samples are kept as tuples rather than stacked into one tensor. A named
    function (instead of a lambda) can be pickled, which DataLoader worker
    processes require on some platforms.
    """
    return tuple(zip(*batch))


def get_loaders(batch_size=16, train_limit=None, valid_limit=None, test_limit=None,
                data_root="dataset", shuffle_eval=True):
    """Build DataLoaders for the train, valid and test splits.

    Each split is expected at ``<data_root>/<split>/_annotations.csv`` with its
    images under ``<data_root>/<split>/images``.

    Args:
        batch_size: Number of samples per batch for every loader.
        train_limit: Optional cap on the number of training images.
        valid_limit: Optional cap on the number of validation images.
        test_limit: Optional cap on the number of test images.
        data_root: Directory that contains the ``train``, ``valid`` and ``test``
            folders.
        shuffle_eval: Whether the valid/test loaders are shuffled too. The
            default ``True`` keeps the existing behaviour; pass ``False`` for a
            fixed evaluation order.

    Returns:
        Dict mapping ``'train'``, ``'valid'`` and ``'test'`` to a DataLoader
        whose batches are ``(images, targets)`` tuples.
    """
    limits = {'train': train_limit, 'valid': valid_limit, 'test': test_limit}

    loaders = {}
    for split, limit in limits.items():
        dataset = PotholeDataset(
            f"{data_root}/{split}/_annotations.csv",
            f"{data_root}/{split}/images",
            limit=limit,
        )
        loaders[split] = DataLoader(
            dataset,
            batch_size=batch_size,
            # Training data is always shuffled; evaluation splits only on request.
            shuffle=bool(split == 'train' or shuffle_eval),
            collate_fn=_collate_detection_batch,
        )
    return loaders

Retrieve the FasterRCNN model

In [14]:
def get_fasterrcnn_model(num_classes):
    """Return a pretrained Faster R-CNN (ResNet-50 FPN) with a new box predictor.

    The model's existing box predictor is replaced by a ``FastRCNNPredictor``
    that outputs ``num_classes`` classes; everything else is left as loaded.
    In this notebook the count passed in includes the background class.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

    # The new head must accept the same feature size as the one it replaces.
    head_input_dim = detector.roi_heads.box_predictor.cls_score.in_features
    replacement_head = FastRCNNPredictor(head_input_dim, num_classes)
    detector.roi_heads.box_predictor = replacement_head

    return detector

# 2 Model Training

##### Train the FasterRCNN model

In [15]:
def _batch_to_device(imgs, targets, device):
    """Move a batch of image tensors and target dicts onto ``device``."""
    imgs = [img.to(device) for img in imgs]
    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
    return imgs, targets


def trainFasterRCNN(model, dataloader, optimizer, device, epochs=2):
    """Train a detection model and record the mean loss of every epoch.

    Args:
        model: Model that returns a dict of loss tensors when called as
            ``model(images, targets)`` in training mode.
        dataloader: Mapping from split name to loader. ``'train'`` is required.
            ``'valid'`` is optional; when present and non-empty, a validation
            loss is computed after each epoch.
        optimizer: Optimizer over the model's parameters.
        device: Device the model and batches are moved to.
        epochs: Number of passes over the training loader.

    Returns:
        ``(train_losses, val_losses)``: two lists holding the mean per-batch
        loss of each epoch. ``val_losses`` is empty when no validation loader
        is available.
    """
    model.to(device)
    train_loader = dataloader['train']
    valid_loader = dataloader['valid'] if 'valid' in dataloader else None
    has_validation = valid_loader is not None and len(valid_loader) > 0

    train_losses = []
    val_losses = []
    for epoch in range(epochs):
        model.train()
        start_time = datetime.now()

        total_loss = 0.0
        for imgs, targets in train_loader:
            imgs, targets = _batch_to_device(imgs, targets, device)

            loss_dict = model(imgs, targets)
            loss = sum(loss_dict.values())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        # Average over the number of training batches. `dataloader` itself is
        # the dict of splits, so its length is not the batch count.
        avg = total_loss / max(len(train_loader), 1)
        train_losses.append(avg)
        d = str(datetime.now() - start_time).split(".")[0]  # drop microseconds
        print(f"Epoch({epoch+1}) loss: {avg}, time: {d}")

        if has_validation:
            running_val_loss = 0.0
            # The model stays in train mode on purpose: torchvision detection
            # models only return the loss dict in that mode. Gradients are not
            # needed here, so autograd is disabled to save memory.
            with torch.no_grad():
                for imgs, targets in valid_loader:
                    imgs, targets = _batch_to_device(imgs, targets, device)
                    loss_dict = model(imgs, targets)
                    running_val_loss += sum(loss_dict.values()).item()

            avg_val_loss = running_val_loss / len(valid_loader)
            val_losses.append(avg_val_loss)
            print(f"Epoch({epoch+1}) validation_loss: {avg_val_loss:.4f}")
    return train_losses, val_losses


### Run training for the FasterRCNN model

In [None]:
# Number of training epochs; also read later when plotting the loss curves.
epochs = 50

# Full dataset (no per-split limits), batches of 16.
loaders = get_loaders(batch_size=16)
fasterrcnn = get_fasterrcnn_model(num_classes=NUM_CLASSES)
optimizer1 = torch.optim.SGD(params=fasterrcnn.parameters(), lr=0.005, momentum=0.9)

# The second returned list holds the per-epoch validation losses, despite
# being stored under the name `test_losses`.
fastercnn_losses, test_losses = trainFasterRCNN(
    model=fasterrcnn,
    dataloader=loaders,
    optimizer=optimizer1,
    device=DEVICE,
    epochs=epochs,
)



# 3 Model Evaluation

#### Visualize FasterRCNN results

In [None]:
from matplotlib.backends.backend_pdf import PdfPages

def _compute_detection_metrics(model, dataloader, device):
    """Run ``model`` over ``dataloader`` and return the mAP/mAR results at IoU 0.5."""
    metric = MeanAveragePrecision(iou_thresholds=[0.5], class_metrics=True)
    with torch.no_grad():
        for images, targets in dataloader:
            images = [img.to(device) for img in images]
            # Inference only needs the images; targets are used as ground truth below.
            outputs = model(images)
            # The metric is updated with CPU tensors.
            preds = [{k: v.cpu() for k, v in out.items()} for out in outputs]
            gts = [{k: v.cpu() for k, v in tgt.items()} for tgt in targets]
            metric.update(preds, gts)
    return metric.compute()


def _draw_boxes_panel(ax, image, title, boxes, color, scores=None, score_threshold=0.0):
    """Show ``image`` on ``ax`` and outline ``boxes``.

    When ``scores`` is given, boxes scoring below ``score_threshold`` are
    skipped and each drawn box is annotated with its score.
    """
    ax.imshow(image)
    ax.set_title(title)
    for idx, box in enumerate(boxes):
        score = None if scores is None else scores[idx]
        if score is not None and not (score >= score_threshold):
            continue
        x1, y1, x2, y2 = box.int().tolist()
        ax.add_patch(plt.Rectangle(
            (x1, y1), x2 - x1, y2 - y1,
            edgecolor=color, fill=False, linewidth=2
        ))
        if score is not None:
            ax.text(x1, y1 - 5, f"{score:.2f}", color=color, fontsize=8)
    ax.axis('off')


def _append_summary_pages(pdf, train_losses, val_losses, metrics_text):
    """Append the loss-curve page (if any losses are known) and the metrics page."""
    if train_losses or val_losses:
        fig, ax = plt.subplots(figsize=(8, 5))
        # The x axis comes from each list's own length, so a mismatch with any
        # external epoch counter cannot break the plot.
        if train_losses:
            ax.plot(range(1, len(train_losses) + 1), train_losses,
                    label='train loss', color='blue')
        if val_losses:
            ax.plot(range(1, len(val_losses) + 1), val_losses,
                    label='Validation loss', color='red')
        ax.set_xlabel("Epoch")
        ax.set_ylabel("Loss")
        ax.set_title("Training Box Loss")
        ax.legend()
        ax.grid(True)
        pdf.savefig(fig)
        plt.close(fig)

    fig, ax = plt.subplots(figsize=(8, 5))
    ax.axis('off')
    ax.set_title("Evaluation Metrics", fontsize=14)
    ax.text(0, 0.8, metrics_text, fontsize=12, verticalalignment='top')
    pdf.savefig(fig)
    plt.close(fig)


def visualizeFasterRCNNPredictions(
    model, dataloader, device, num_images=10, score_threshold=0.70,
    output_pdf_path="predicted_vs_gt_FasterRCNN.pdf",
    train_losses=None, val_losses=None,
):
    """Evaluate a detector and write a PDF comparing ground truth with predictions.

    The function prints every metric returned by ``MeanAveragePrecision``
    (IoU threshold 0.5). It then writes up to ``num_images`` pages with ground
    truth (green) next to predictions (red), a loss-curve page and a metrics
    page. The summary pages are written even when the loader holds fewer than
    ``num_images`` images.

    Args:
        model: Detection model returning one dict with ``boxes`` and ``scores``
            per image in eval mode.
        dataloader: Loader yielding ``(images, targets)`` batches.
        device: Device used for inference; the model is moved there.
        num_images: Maximum number of image pages to write.
        score_threshold: Minimum score for a predicted box to be drawn. It does
            not affect the printed metrics.
        output_pdf_path: Destination of the PDF; an existing file is replaced.
        train_losses: Per-epoch training losses for the loss page. When
            ``None``, the notebook global ``fastercnn_losses`` is used if defined.
        val_losses: Per-epoch validation losses for the loss page. When
            ``None``, the notebook global ``test_losses`` is used if defined.
    """
    if os.path.exists(output_pdf_path):
        os.remove(output_pdf_path)
        print("File deleted.")

    # Existing calls rely on the losses being notebook globals; keep that
    # working while allowing callers to pass them explicitly.
    if train_losses is None:
        train_losses = globals().get('fastercnn_losses')
    if val_losses is None:
        val_losses = globals().get('test_losses')

    # Images are moved to `device` below, so the model has to live there too.
    model.to(device)
    model.eval()

    results = _compute_detection_metrics(model, dataloader, device)
    for k, v in results.items():
        print(f"{k}: {v}")

    mean_ap = float(results.get('map', 0.0))
    mar_100 = float(results.get('mar_100', 0.0))

    metrics_text = (
    f"Averag recall:    {mar_100:.4f}\n"
    f"Average precision:   {mean_ap:.4f}"
    )

    images_visualized = 0
    with PdfPages(output_pdf_path) as pdf:
        with torch.no_grad():
            for imgs, targets in dataloader:
                if images_visualized >= num_images:
                    break
                imgs = [img.to(device) for img in imgs]
                outputs = model(imgs)

                for img, target, output in zip(imgs, targets, outputs):
                    if images_visualized >= num_images:
                        break

                    # Convert the channel-first tensor to HxWxC for imshow.
                    img_np = img.permute(1, 2, 0).cpu().numpy()
                    fig, axes = plt.subplots(1, 2, figsize=(14, 7))

                    _draw_boxes_panel(axes[0], img_np, "Ground Truth",
                                      target['boxes'], 'green')
                    _draw_boxes_panel(axes[1], img_np, "Predictions",
                                      output['boxes'], 'red',
                                      scores=output['scores'],
                                      score_threshold=score_threshold)

                    pdf.savefig(fig)
                    plt.close(fig)
                    images_visualized += 1

        # Written after the loop so small loaders still get the summary pages.
        _append_summary_pages(pdf, train_losses, val_losses, metrics_text)

    print(f"PDF report saved as {output_pdf_path}")


In [None]:
# Evaluate on the validation split and write the PDF report with the default settings.
visualizeFasterRCNNPredictions(
    model=fasterrcnn,
    dataloader=loaders['valid'],
    device=DEVICE,
)

File deleted.
map: 0.2211872637271881
map_50: 0.2211872637271881
map_75: -1.0
map_small: 0.24667920172214508
map_medium: 0.17499582469463348
map_large: 0.3931579887866974
mar_1: 0.17164179682731628
mar_10: 0.43283581733703613
mar_100: 0.6156716346740723
mar_small: 0.5581395626068115
mar_medium: 0.6666666865348816
mar_large: 0.7857142686843872
map_per_class: 0.2211872637271881
mar_100_per_class: 0.6156716346740723
classes: 1
PDF report saved as predicted_vs_gt_FasterRCNN.pdf
