In [7]:
import os
import random
import shutil
import glob
import tarfile
from PIL import Image, ImageDraw
import xml.etree.ElementTree as ET
import pickle

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models

from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin_min
from sklearn.metrics import davies_bouldin_score
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix
from sklearn.decomposition import PCA


import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm.auto import tqdm

In [None]:
#importing and unzipping datasets for Google Colab integration

from google.colab import files
uploaded = files.upload()
!unzip file.zip -d /content/my_folder

In [None]:
gzImagesFilePath = 'images.tar.gz'
gzAnnotationsFilePath = 'annotations.tar.gz'

outputImagesFolder = 'oxfordPetImages/'
outputAnnotationsFolder = 'oxfordPetAnnotations/'

with tarfile.open(gzImagesFilePath, 'r:gz') as tar:
    tar.extractall(path=outputImagesFolder)

with tarfile.open(gzAnnotationsFilePath, 'r:gz') as tar:
    tar.extractall(path=outputAnnotationsFolder)

print(f"Images extracted to {outputImagesFolder}")
print(f"Images extracted to {outputAnnotationsFolder}")

In [None]:
petImagesFilePath = 'oxfordPetImages/images/' #path to images
petAnnotations = 'oxfordPetAnnotations/annotations/trimaps' #path to trimaps
petImageswithAnnotations = 'oxfordPetImages&Annotations/' #output for images with VGG16

os.makedirs(petImageswithAnnotations, exist_ok=True) #ensures image output dir EXISTS
features = []
filenames = []
groundTruthLabels =[]

VGG = VGG16Classifier(num_classes=5)
VGG.eval()

checkpointFile = 'checkpoint.pkl'

def saveProgress(features, filenames, groundTruthLabels):
    with open(checkpointFile, 'wb') as f:
        pickle.dump({'features': features, 'filenames': filenames, 'groundTruthLabels': groundTruthLabels}, f)
    print(f"Progress saved: {len(filenames)} images.")

def loadProgress():
    if os.path.exists(checkpointFile):
        with open(checkpointFile, 'rb') as f:
            checkpoint = pickle.load(f)
        return checkpoint['features'], checkpoint['filenames'], checkpoint['groundTruthLabels']
    else:
        return [], [], []

features, filenames, groundTruthLabels = loadProgress()

startIndex = len(filenames)

transform = transform.Compose([ #initating transformations
     transform.Resize((224,224)),
     transform.ToTensor(),
     transform.Normalize(mean, std)
])

imagefilenames = [filename for filename in os.listdir(petImagesFilePath) if filename.endswith('.jpg') or filename.endswith('.png')]

chunk_size = 100
for i in tqdm(range(startIndex, len(imagefilenames), chunk_size), desc='Processing Chunks', unit='chunk'):
    chunk = imagefilenames[i:i + chunk_size]
    chunkFeatures = []
    chunkFilenames = []
    chunkLabels = []

    with tqdm(chunk, desc="Processing images within chunk", unit="image", leave=False) as chunkBar:
        for filename in chunkBar:
                petImagePath = os.path.join(petImagesFilePath, filename)
                try:
                        img = Image.open(petImagePath).convert('RGB') #opens image in RGB mode
                        imgResize = img.resize((224,224)) #resizes image for VGG16
                        
                        petImageTensor = transform(img).unsqueeze(0) #apply transformations to image
                        with torch.no_grad(): #running image through VGG16 model
                            feature = VGG(petImageTensor)
                            feature = feature.view(-1).cpu().numpy()
                            chunkFeatures.append(feature)
                            chunkFilenames.append(filename)

                        trimapFilename = filename.replace('.jpg', '.png').replace('.png', '.png')
                        trimapPath = os.path.join(petAnnotations, trimapFilename)

                        if os.path.exists(trimapPath):
                            trimap = Image.open(trimapPath).convert('L')
                            trimapResized = trimap.resize((224,224), Image.NEAREST) #resize to match image dimensions
                            trimapArray = np.array(trimapResized)

                            mostFreqLabel = np.bincount(trimapArray.flatten()).argmax()
                            chunkLabels.append(trimapArray.flatten())
                        else:
                            print(f'Trimap for {filename} not found!')
                except Exception as e:
                        print(f'Error Processing {filename}: {e}')
        
        features.extend(chunkFeatures)
        filenames.extend(chunkFilenames)
        groundTruthLabels.extend(chunkLabels)

        saveProgress(features, filenames, groundTruthLabels)

features = np.array(features)
groundTruthLabels = np.array(groundTruthLabels)
    

In [None]:
IMG_SIZE = 224 #IMAGE SIZE

source_dir = 'iROADSDataset'  # The original dataset directory
train_dir = 'iRoads/train'
validation_dir = 'iRoads/validation'


# Create train and validation directories
os.makedirs(train_dir, exist_ok=True)
os.makedirs(validation_dir, exist_ok=True)

def remove_macos_resource_forks(directory): #removes macOS resource forks
    for filepath in glob.iglob(os.path.join(directory, '**', '._*'), recursive=True):
        try:
            os.remove(filepath)
            print(f"Removed: {filepath}")
        except Exception as e:
            print(f"Error removing file {filepath}: {e}")

directory = 'iRoads'  
remove_macos_resource_forks(directory) #removes macOS resource forks (if present)
categories = ['Daylight', 'Night', 'RainyDay', 'RainyNight', 'Snowy', 'SunStroke', 'Tunnel']

#80% train, 20% validation
split_ratio = 0.8
for category in tqdm(categories, desc="Splitting Data", unit="category"):
    source_class_dir = os.path.join(source_dir, category) #source for class
    os.makedirs(os.path.join(train_dir, category), exist_ok=True)
    os.makedirs(os.path.join(validation_dir, category), exist_ok=True)
    files = [file for file in os.listdir(source_class_dir) if not file.startswith('._')]  #exclude system files like ._ files
    random.shuffle(files)  # Shuffle the files to randomize split
    
    split_index = int(len(files) * split_ratio) #calc. split index
    
    for i, file in enumerate(tqdm(files, desc=f"Processing {category}", unit="file", leave=False)):
        source_file = os.path.join(source_class_dir, file)
        
        # If the index is less than the split index, move to train
        if i < split_index:
            shutil.move(source_file, os.path.join(train_dir, category, file))
        else:
            shutil.move(source_file, os.path.join(validation_dir, category, file))

print("Data split complete.")

transform = transforms.Compose([ #transformation init defintion
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ImageNet mean and std
])

trainDataset = datasets.ImageFolder(train_dir, transform=transform) #loading train dataset
validationDataset = datasets.ImageFolder(validation_dir, transform=transform) #load validation dataset

trainLoader = DataLoader(train_dataset, batch_size=32, shuffle=True) # DATA load for both training and validation
validationLoader = DataLoader(validation_dataset, batch_size=32, shuffle=False)

model = models.resnet18(pretrained=True) #load ResNet18 model

for param in model.parameters(): #freezing ResNet layers, minus FCL
    param.requires_grad = False

model.fc = nn.Linear(model.fc.in_features, len(categories))  #adjust ResNet FCL TO number of classes
model = model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

criterion = nn.CrossEntropyLoss() #entropy loss function

optimizer = optim.Adam(model.fc.parameters(), lr=0.0001) #optimiser 

def train_model(model, train_loader, criterion, optimizer, numEpochs=2):
    model.train()
    for epoch in tqdm(range(numEpochs), desc="Training Epochs", unit="epoch"):
        running_loss = 0.0
        correct = 0
        total = 0
        for inputs, labels in tqdm(trainLoader, desc="Training Batches", unit="batch", leave=False):
            inputs, labels = inputs.to(torch.device("cuda" if torch.cuda.is_available() else "cpu")), labels.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)  #cross entropy loss
            loss.backward()
            optimizer.step()

            runningLoss += loss.item()

            _, predicted = torch.max(outputs, 1) #output -> predicted
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {runningloss/len(trainLoader):.4f}, Accuracy: {100 * correct / total:.2f}%") #stats of epoch

def evaluateModel(model, validationLoader, criterion):
    model.eval()
    correct = 0
    total = 0
    y_true = []
    y_pred = []
    
    with torch.no_grad():
        runningloss = 0.0
        for inputs, labels in tqdm(validationLoader, desc="Evaluating Validation", unit="batch"):
            inputs, labels = inputs.to(torch.device("cuda" if torch.cuda.is_available() else "cpu")), labels.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            runningloss += loss.item()

            _, predicted = torch.max(outputs, 1) #output -> predicted
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            y_true.extend(labels.cpu().numpy()) #store true labels for confusionMatrix
            y_pred.extend(predicted.cpu().numpy()) #store predicted labels for confusionMatrix

        print(f"Validation Loss: {runningLoss/len(validationLoader):.4f}, Accuracy: {100 * correct / total:.2f}%")

    return y_true, y_pred

trainModel(model, train_loader, criterion, optimizer, num_epochs=2) #model training

y_true, y_pred = evaluateModel(model, validationLoader, criterion) #true and predicted labels from evaluated model


In [None]:

accuracy = np.sum(np.array(y_true) == np.array(y_pred)) / len(y_true) #accuracy metric
precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='weighted') #other performance metrics

#print block for performance metrics
print(f"Accuracy: {accuracy*100:.2f}%")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1:.2f}")

confusionMatrix = confusion_matrix(y_true, y_pred) #creation of confusionMatrix
name = 'ResNet-18 iRoads' #name to link to output.png

plt.figure(figsize=(8, 6))
sns.set(style='whitegrid', palette='Blues')
ax = sns.heatmap(confusionMatrix, annot=True, fmt='d', cmap='Blues', cbar=False, 
                 xticklabels=categories, yticklabels=categories, linewidths=.8, linecolor='black')

ax.set_xlabel("Predicted Labels", fontsize=10)
ax.set_ylabel("True Labels", fontsize=10)
ax.set_title(f"Confusion Matrix for {name}", fontsize=12, fontweight='bold')
plt.savefig('confusionMatrix.png', bbox_inches='tight')
plt.tight_layout()
plt.show()

modelSavePath = 'resnet18_iroads_model.pth'
torch.save(model.state_dict(), modelSavePath)
print(f"\nModel saved at: {modelSavePath}")

In [None]:
def extractFeatures(model, dataloader): #feature extraction for clustering
    model.eval()
    features = []
    labels = []
    with torch.no_grad():
        for inputs, targets in tqdm(dataloader, desc="Extracting Features", unit="batch"):
            inputs = inputs.to(torch.device("cuda" if torch.cuda.is_available() else "cpu")) #extract features from model before FCL
            feature_vector = model.conv1(inputs)  #convolution layer
            feature_vector = model.bn1(feature_vector)  #normalization
            feature_vector = model.relu(feature_vector)  #ReLU activation
            feature_vector = model.maxpool(feature_vector)  

            feature_vector = model.layer1(feature_vector)
            feature_vector = model.layer2(feature_vector)
            feature_vector = model.layer3(feature_vector)
            feature_vector = model.layer4(feature_vector)

            feature_vector = feature_vector.mean(dim=[2, 3])  #Global average pooling
            features.append(feature_vector.cpu().numpy())  
            labels.extend(targets.cpu().numpy())  #Collect corresponding labels
    return np.concatenate(features), np.array(labels)

trainFeatures, trainLabels = extract_features(model, trainLoader) #feature extraction from training data

kmeans = KMeans(n_clusters=len(categories), random_state=42) #k-means application to extracted features
kmeans.fit(trainFeatures)

DBindex = davies_bouldin_score(trainFeatures, kmeans.labels_) #DBI evalulation 
print(f"Davies-Bouldin Index: {DBindex:.4f}")


In [None]:
pca = PCA(n_components=2) #PCA dimensionality reduction and visualisation
reducedFeatures = pca.fit_transform(trainFeatures)

sns.set(style='whitegrid')
plt.figure(figsize=(10, 8))
scatter = sns.scatterplot(x=reduced_features[:, 0], y=reduced_features[:, 1], 
                          hue=kmeans.labels_, palette="Set1", s=50, edgecolor="black")
plt.title("K-Means Clustering (2D PCA Projection)", fontsize=16, fontweight='bold')
plt.xlabel("Principal Component 1", fontsize=10)
plt.ylabel("Principal Component 2", fontsize=10)
plt.legend(title='Cluster', title_fontsize='11', fontsize='9', loc='upper right')
plt.tight_layout()
plt.savefig('K-Means Clustering (2D PCA Projection).png', dpi=300)
plt.show()

def purity_score(y_true, y_pred): #clustering purity score
    contingency_matrix = np.histogram2d(y_true, y_pred, bins=(len(np.unique(y_true)), len(np.unique(y_pred))))[0]
    return np.sum(np.amax(contingency_matrix, axis=0)) / np.sum(contingency_matrix)

purity = purity_score(train_labels, kmeans.labels_)
print(f"Purity Score: {purity:.4f}")