# MIIA Pothole Image Classification Challenge  

This code was developed as part of the [Zindi Image Classification Challenge](https://zindi.africa/competitions/miia-pothole-image-classification-challenge). The goal of this ML project is to reduce the time and money required to repair potholes and improve road infrastructure in South Africa. Through image recognition, we reduce the amount of manual effort needed to identify potholes, enabling faster maintenance and promoting safer roadways.  

## Table of Contents

1. [Import and Setup](#import-and-setup)
2. [Cropping Images](#cropping-images)
3. [Load the Remaining Data](#load-the-remaining-data)
4. [Preprocessing Images (Data Transformations)](#preprocessing-images-data-transformations)
5. [Model Definition](#model-definition)
6. [Model Training & Evaluation](#model-training-and-evaluation)
7. [Model Testing](#model-testing)

## Import and Setup

In [None]:
import os
import pandas as pd
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from torch.utils.data.dataset import random_split
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import roc_auc_score
import numpy as np
import random

# Set a seed for consistent results
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [8]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


## Cropping Images

We can see that nearly all the images are taken from inside the car, with the dashboard and sky taking up much of the photo. To reduce noise we can crop out irrelevant data, leaving only the road.
We have seperated cropping from other data transformations, as we can then focus on a "new" dataset which can be easily reused in the future.

* To crop the image, make sure you update the path (to the image folder) accordingly.
* We can try a variety of ranges for the cropping and compare results.

In [None]:
import cv2, os, glob

# path to original image folder
# TODO: Replace with local path to the folder containing the images
data_path = r'D:\MMAI\MMAI894\Zindi\data\all_data'

# path to the folder to save the cropped image
save_path = './cropped_data2/'
imgfiles = glob.iglob(os.path.join(data_path, '**/*.JPG'), recursive=True)
for imgfile in imgfiles:
    image = cv2.imread(imgfile)
    img_name = imgfile.split('/')[-1]

    # Define the coordinates of the top-left and bottom-right points of the cropping rectangle
    # Format: (x, y)
    top_left = (50, 240)
    bottom_right = (750, 430)

    # Crop the image using array slicing
    # Note: OpenCV images are accessed with (y, x) notation
    cropped_image = image[top_left[1]:bottom_right[1], top_left[0]:bottom_right[0]]

    # Save or display the cropped image
    cv2.imwrite(save_path + img_name, cropped_image)

    # or
    # cv2.imshow('Cropped Image', cropped_image)
    # cv2.waitKey(0)
    # cv2.destroyAllWindows()

## Load the Remaining Data

Reminder to update the following paths, for your local machine. In this case we are referencing the cropped dataset, to use the raw dataset please update the path accordingly.

In [None]:

# change the following path for your own training

# path to image folder
data_path = r'D:\MMAI\MMAI894\Zindi\data\cropped_data1'

# path to train csv
train_path = r'D:\MMAI\MMAI894\Zindi\data\train_ids_labels.csv'

# path to test csv
test_path = r'D:\MMAI\MMAI894\Zindi\data\test_ids_only.csv'

# Create a custom dataset class for training and testing
class CustomDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, index):
        img_path = os.path.join(self.root_dir, self.annotations.iloc[index, 0] + '.JPG')
        image = Image.open(img_path).convert("RGB")
        if len(self.annotations.iloc[index]) > 1:
            label = int(self.annotations.iloc[index, 1])
        else:label = 1

        if self.transform:
            image = self.transform(image)

        return (image, label)


# Preprocessing Images (Data Transformations)

Here we can test a variety of transformations. We have commented out several of the transformations as they reduced model quality, but left them listed in as potential parameters. To see a full list of commonly used transfomations, please checkout: https://pytorch.org/vision/main/auto_examples/transforms/plot_transforms_illustrations.html#sphx-glr-auto-examples-transforms-plot-transforms-illustrations-py


In [None]:
## Transform the training and validation images

train_transform = transforms.Compose([
    # transforms.Resize((800, 600)),
    # transforms.RandomCrop((800, 400)),
    transforms.Resize((250, 250)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    # transforms.RandomRotation(degrees=(-3, 3), expand=True),
    transforms.RandomHorizontalFlip(p=0.5),
])

val_transform = transforms.Compose([
    # transforms.Resize((800, 600)),
    # transforms.RandomCrop((800, 400)),
    transforms.Resize((250, 250)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    # transforms.RandomRotation(degrees=(-3, 3), expand=True),
    # transforms.RandomHorizontalFlip(p=0.5),
])

# Create an instance of the CustomDataset with no transforms
dataset = CustomDataset(csv_file=train_path, root_dir=data_path, transform=None)
# Splitting dataset into train and validation
train_size = int(0.80 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Wrap the subsets to apply the transformations
class TransformWrapper(Dataset):
    def __init__(self, dataset, transform):
        self.dataset = dataset
        self.transform = transform

    def __getitem__(self, index):
        x, y = self.dataset[index]
        x = self.transform(x)
        return x, y

    def __len__(self):
        return len(self.dataset)

# Apply specific transformations to each subset
train_dataset = TransformWrapper(train_dataset, train_transform)
val_dataset = TransformWrapper(val_dataset, val_transform)

# Create DataLoader for each dataset
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

## Model Definition

In the following code we will leverage ResNet34 a very common CNN (convolutional neural network) for image classification. We have also provided code if you are interested in using the VGG16 CNN. 
Another model that we have seen success with (though the configuration is not provided here) is MaxVit (Multi-Axis Vision Transformer).

In [5]:
def resnet34():
    model = models.resnet152(pretrained=True)
    # freeze all params
    for params in model.parameters():
        params.requires_grad_ = False
    model.conv1 = nn.Conv2d(3, 64, kernel_size=5, stride=2, padding=4, bias=False)
    # add a new final layer
    nr_filters = model.fc.in_features  # number of input features of last layer
    model.fc = nn.Linear(nr_filters, 1)
    return model

In [6]:
def vggMod():
    # Load the pre-trained VGG16 model
    model = models.vgg16(pretrained=True)
    
    # Freeze all parameters
    for params in model.parameters():
        params.requires_grad_ = False
    
    # Add a Conv2d layer with 64 output channels, kernel_size=3, and padding=1
    model.conv1 = nn.Conv2d(3, 64, kernel_size=5, stride=2, padding=4, bias=False)
    
    # Add a pooling layer (you can use MaxPooling or AveragePooling)
    model.pool1  = nn.MaxPool2d(kernel_size=5, stride=2)
    
    # Modify the classifier to have 1 output neuron
    nr_features = model.classifier[-1].in_features
    model.classifier[-1] = nn.Linear(nr_features, 1)
    
    return model


In [7]:
model = resnet34()
# model = vggMod()
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=6, eta_min=0.00002)

def evaluate(model, device, val_loader, criterion):
    model.eval()
    final_outputs = []
    final_targets = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            final_outputs.extend(outputs.sigmoid().cpu().detach().numpy())
            final_targets.extend(labels.cpu().detach().numpy())

    # Calculate AUC
    auc = roc_auc_score(final_targets, final_outputs)
    return auc





## Model Training and Evaluation

In addition to training the model, we will save a version of the model (each epoch) with the best validation accuracy. During training watch for losses that are too small, the model may be overfitting.

Note: 
- Please change the epochs as necessary, taking into account both training time and resulting performance.
- Feel free to update the saved "Model Name", as to avoid overwriting previous iterations and experiments.

In [None]:
# Train the Model
model.train()
best_auc, best_epoch, best_model = 0, 0, model

for epoch in range(20):
    epoch_loss = 0


    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs).squeeze()

        # Ensure outputs and labels are correctly shaped
        labels = labels.float()  # Ensure labels are float for BCELoss

        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    # Evaluate the model after each epoch
    val_auc = evaluate(model, device, val_loader, criterion)
    scheduler.step()
    if val_auc > best_auc:
        best_auc = val_auc
        best_model = model
        best_epoch = epoch + 1

    # Print the results for each epoch
    print(f'Epoch: {epoch+1}, Train Loss: {epoch_loss:.4f}, Val. AUC: {val_auc:.4f}')

# Save the best model
torch.save(best_model.state_dict(), 'BestModel_' + str(best_epoch) + '.ckpt')

Epoch: 1, Train Loss: 25.0141, Val. AUC: 0.9801
Epoch: 2, Train Loss: 42.7203, Val. AUC: 0.9753
Epoch: 3, Train Loss: 24.9684, Val. AUC: 0.9767
Epoch: 4, Train Loss: 15.3840, Val. AUC: 0.9871
Epoch: 5, Train Loss: 9.4232, Val. AUC: 0.9913


## Model Testing

Test the model (against the test dataset), the result of this cell will be an excel spreadsheet that can be submitted to the [Zindi Competition](https://zindi.africa/competitions/miia-pothole-image-classification-challenge/submit).

In [None]:
# Function for testing the model
def test(model, device, test_loader):
    model.eval()
    test_probs = []

    with torch.no_grad():
        for inputs, _ in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            # Get probabilities for the positive class
            probs = outputs.sigmoid().cpu().numpy()
            test_probs.extend(probs.squeeze())

    return test_probs

# Load the Test DataSet
test_dataset = CustomDataset(csv_file=test_path, root_dir=data_path, transform=val_transform)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Test the model
test_probs = test(best_model, device, test_loader)

# Save the results to a CSV file
test_images = [test_dataset.annotations.iloc[idx, 0] for idx in range(len(test_dataset))]
test_results = pd.DataFrame({'Image_ID': test_images, 'Label': test_probs})
test_results.to_csv(r'D:\MMAI\MMAI894\Zindi\out\test-results-01.csv', index=False)