In [None]:
# TODO: decide which settings to vary -- e.g. loss function, optimizer, number of epochs.
# Could also try another pretrained CNN or a small custom CNN.

# Setup and Imports

In [None]:
# Colab only: mount Google Drive at /content/drive so files stored on Drive
# are reachable through the local filesystem.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import cv2
import tarfile
import os
import numpy as np
import pandas as pd
import pickle
from tqdm import tqdm #progress bars
import torch

#sampling
from collections import defaultdict # can group class without checking if key already exists
import random

#normalize and resize
import torchvision.transforms as transforms

#dataset
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from PIL import Image
from torch.utils.data import DataLoader
from pycocotools.coco import COCO
from collections import Counter

#pretrained CNN
import torchvision.models as models
import torch.nn as nn

#training
from sklearn.metrics import f1_score
import torch.optim as optim

#plot
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Seed the torch, NumPy and stdlib RNGs so repeated runs start from the same
# random state (the three calls are independent of each other).
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

<torch._C.Generator at 0x7c44a32524f0>

# Load iCub

In [None]:
def _sorted_subdirs(parent):
    """Return the names of the sub-directories of `parent`, sorted alphabetically."""
    return sorted(
        name for name in os.listdir(parent)
        if os.path.isdir(os.path.join(parent, name))
    )


def load_icub_data(dataset_path, max_per_class=500):
    """Collect left-camera 'MIX' images from an iCubWorld-style folder tree.

    Expected layout (only directories matching it are visited):
        <dataset_path>/part*/<class>/<instance>/MIX/<day>/left/<image>

    Every directory level is traversed in sorted order. `os.listdir` returns
    entries in arbitrary, filesystem-dependent order, and the subsequent
    seeded sub-sampling is only reproducible if the list it shuffles is
    always built in the same order.

    Args:
        dataset_path: Root folder containing the `part*` directories.
        max_per_class: Maximum number of images kept per class; forwarded to
            `sample_dataset`.

    Returns:
        Whatever `sample_dataset(image_paths, labels, max_per_class)` returns
        for the collected image paths and their class-folder names.
    """
    labels, image_paths = [], []
    image_extensions = ('.jpg', '.jpeg', '.png')

    for part in _sorted_subdirs(dataset_path):
        # Only 'part*' folders (e.g. part1, part2, ...) hold data.
        if not part.startswith('part'):
            continue
        part_path = os.path.join(dataset_path, part)

        # Class folders (e.g. book, pencilcase, ...).
        for class_name in _sorted_subdirs(part_path):
            class_path = os.path.join(part_path, class_name)

            # Object instances (e.g. book1, book2, ...).
            for instance_name in _sorted_subdirs(class_path):
                # Only the 'MIX' transformation is used.
                mix_path = os.path.join(class_path, instance_name, 'MIX')
                if not os.path.isdir(mix_path):
                    continue

                # Acquisition days (e.g. day5, day6, ...).
                for day_name in _sorted_subdirs(mix_path):
                    # Only the left camera is used.
                    left_path = os.path.join(mix_path, day_name, 'left')
                    if not os.path.isdir(left_path):
                        continue

                    for file in sorted(os.listdir(left_path)):
                        if not file.lower().endswith(image_extensions):
                            continue
                        image_paths.append(os.path.join(left_path, file))
                        labels.append(class_name)  # label = high-level class name

    return sample_dataset(image_paths, labels, max_per_class)


In [None]:
def sample_dataset(image_paths, labels, max_per_class=500, seed=42):
    """Randomly keep at most `max_per_class` images per class, then shuffle.

    A private `random.Random(seed)` generator is used, so the result is
    reproducible for a given input order and seed without re-seeding (and
    thereby disturbing) the global `random` module state.

    Args:
        image_paths: Sequence of image paths.
        labels: Sequence of class labels, aligned with `image_paths`.
        max_per_class: Upper bound on the number of images kept per class.
        seed: Seed for the private random generator.

    Returns:
        `(sampled_image_paths, sampled_labels)` as two aligned lists. Both are
        empty when the input is empty.
    """
    rng = random.Random(seed)

    # Group image paths by class (insertion order of classes is preserved).
    class_to_images = defaultdict(list)
    for path, label in zip(image_paths, labels):
        class_to_images[label].append(path)

    # Sub-sample each class independently.
    combined = []
    for label, paths in class_to_images.items():
        rng.shuffle(paths)
        combined.extend((path, label) for path in paths[:max_per_class])

    # Shuffle across classes to avoid any ordering bias downstream.
    rng.shuffle(combined)

    # Explicit comprehensions (instead of zip(*combined)) also work when
    # `combined` is empty.
    sampled_image_paths = [path for path, _ in combined]
    sampled_labels = [label for _, label in combined]
    return sampled_image_paths, sampled_labels


# Load COCO

In [None]:

def build_image_label_map(coco, top_k=20, target_count=8000, label_to_index=None):
    """Assign one class index per COCO image, restricted to a set of categories.

    Each annotated image is labelled with its most frequent annotation
    category. Only images whose category is in the selected category set are
    kept, and at most `target_count` images are returned (in the order the
    images are listed by `coco.getImgIds()`).

    Class indices are contiguous, starting at 0, in ascending order of COCO
    category id.

    Args:
        coco: Object exposing `getImgIds`, `getAnnIds(imgIds=...)`,
            `loadAnns` and `loadImgs` (as `pycocotools.coco.COCO` is used
            here).
        top_k: Number of most frequent categories to keep. Ignored when
            `label_to_index` is given.
        target_count: Maximum number of images in the returned map.
        label_to_index: Optional existing `{category_id: class_index}`
            mapping. Pass the mapping returned for another split so that both
            splits use the same index for the same category; otherwise each
            call derives its own top-k set and its own indices.

    Returns:
        `(image_label_map, label_to_index)` where `image_label_map` maps
        file name -> class index and `label_to_index` maps category id ->
        class index.
    """
    dominant_cat_by_file = {}
    cat_count = Counter()

    # Pass 1: most common annotation category per image, no filtering yet.
    for img_id in coco.getImgIds():
        anns = coco.loadAnns(coco.getAnnIds(imgIds=img_id))
        if not anns:
            continue
        most_common_cat = Counter(ann['category_id'] for ann in anns).most_common(1)[0][0]
        fname = coco.loadImgs(img_id)[0]['file_name']
        dominant_cat_by_file[fname] = most_common_cat
        cat_count[most_common_cat] += 1

    # Derive the category -> index mapping from the top-k categories unless
    # the caller supplied one.
    if label_to_index is None:
        top_cats = {cat for cat, _ in cat_count.most_common(top_k)}
        label_to_index = {cat_id: idx for idx, cat_id in enumerate(sorted(top_cats))}

    # Pass 2: keep images of the selected categories until target_count is reached.
    image_label_map = {}
    for fname, cat_id in dominant_cat_by_file.items():
        # Checked before inserting so target_count <= 0 yields an empty map.
        if len(image_label_map) >= target_count:
            break
        if cat_id in label_to_index:
            image_label_map[fname] = label_to_index[cat_id]

    return image_label_map, label_to_index


In [None]:
def paths_and_labels(image_label_map, img_dir):
    """Turn a {file name: label} map into aligned path and label lists.

    Each file name is joined onto `img_dir`; entries whose resulting path
    does not exist on disk are skipped. Order follows the map's iteration
    order.

    Returns:
        `(paths, labels)` - two lists of equal length.
    """
    candidates = (
        (os.path.join(img_dir, fname), label)
        for fname, label in image_label_map.items()
    )
    existing = [(path, label) for path, label in candidates if os.path.exists(path)]

    paths = [path for path, _ in existing]
    labels = [label for _, label in existing]
    return paths, labels

# Data preprocessing

In [None]:
#make dataset class to feed data into pytorch
class ImageDataset(Dataset):
    """Dataset that loads images from disk on demand and pairs them with labels.

    Args:
        image_paths: Sequence of image file paths.
        labels: Sequence of labels aligned with `image_paths`.
        transform: Optional callable applied to the loaded RGB PIL image.

    Raises:
        ValueError: If `image_paths` and `labels` differ in length.
    """

    def __init__(self, image_paths, labels, transform=None):
        if len(image_paths) != len(labels):
            raise ValueError(
                f"image_paths and labels must have the same length "
                f"(got {len(image_paths)} and {len(labels)})"
            )
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return `(image, label)` for index `idx`, with the transform applied if set."""
        # convert() returns a new, fully loaded image, so the file opened by
        # Image.open can be closed right away instead of waiting for garbage
        # collection (avoids accumulating open file handles in long runs).
        with Image.open(self.image_paths[idx]) as img:
            image = img.convert("RGB")
        label = self.labels[idx]
        if self.transform is not None:
            image = self.transform(image)
        return image, label

# Load pretrained model

In [None]:
def build_model(num_classes, dropout=.5):
    """Build an ImageNet-pretrained ResNet-18 with a new classification head.

    The backbone is frozen except for the last residual stage (`layer4`),
    which is fine-tuned together with the new head. The head replaces
    `model.fc` with `Dropout(dropout)` followed by
    `Linear(in_features, num_classes)`.

    Args:
        num_classes: Number of output classes of the new head.
        dropout: Dropout probability applied before the final linear layer.

    Returns:
        The model, moved to the notebook-level `device`.
    """
    # `pretrained=True` is deprecated in recent torchvision releases in favour
    # of the `weights=` argument. IMAGENET1K_V1 is the checkpoint that
    # `pretrained=True` loads; fall back to the old argument on torchvision
    # versions that do not have the weights enum yet.
    weights_enum = getattr(models, "ResNet18_Weights", None)
    if weights_enum is not None:
        model = models.resnet18(weights=weights_enum.IMAGENET1K_V1)
    else:
        model = models.resnet18(pretrained=True)

    # Freeze every pretrained weight first.
    for param in model.parameters():
        param.requires_grad = False

    # Replace the classifier (fully connected layer) and add dropout in front
    # of it. Parameters of newly created layers are trainable by default.
    model.fc = nn.Sequential(
        nn.Dropout(dropout),
        nn.Linear(model.fc.in_features, num_classes)
    )

    # Un-freeze the last residual stage so it is fine-tuned with the head.
    for name, param in model.named_parameters():
        if name.startswith(("layer4.", "fc.")):
            param.requires_grad = True

    return model.to(device)

# Training

In [None]:
def train_model(model, train_loader, val_loader, optimizer, scheduler, criterion, num_epochs=10, dropout=None, lr=None, optimizer_name=None, augment=None):
    """Train `model` and checkpoint the weights with the best validation macro-F1.

    After every epoch the scheduler is stepped and the model is evaluated on
    `val_loader` with `evaluate_model`. Whenever the validation F1 improves
    (and always after the first evaluated epoch, so a checkpoint exists even
    if F1 never rises above 0), the state dict is saved to
    `best_model_dropout{dropout}_lr{lr}_{optimizer_name}_aug{augment}.pth`
    in the current working directory.

    Args:
        model: Network to train (already on `device`).
        train_loader: Iterable of `(images, labels)` training batches.
        val_loader: Validation batches, passed to `evaluate_model`.
        optimizer: Optimizer updating the trainable parameters.
        scheduler: Learning-rate scheduler, stepped once per epoch.
        criterion: Loss function called as `criterion(outputs, labels)`.
        num_epochs: Number of passes over `train_loader`.
        dropout, lr, optimizer_name, augment: Only used to build the
            checkpoint file name.

    Returns:
        The best validation F1 observed (0.0 when `num_epochs` is 0).
    """
    checkpoint_path = f"best_model_dropout{dropout}_lr{lr}_{optimizer_name}_aug{augment}.pth"
    best_f1 = 0.0
    checkpoint_saved = False

    for epoch in range(num_epochs):
        model.train()  # training mode (enables dropout / batch-norm updates)
        correct, total = 0, 0

        for images, labels in tqdm(train_loader,
                               desc=f"Epoch {epoch+1}/{num_epochs}",
                               leave=False):
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()  # reset gradients from the previous batch
            outputs = model(images)  # forward pass
            loss = criterion(outputs, labels)
            loss.backward()  # backward pass
            optimizer.step()  # update weights

            _, predicted = outputs.max(1)  # class with the highest score
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        # Advance the learning-rate schedule once per epoch.
        scheduler.step()

        _, val_f1, _, _ = evaluate_model(model, val_loader)
        if val_f1 > best_f1 or not checkpoint_saved:
            best_f1 = val_f1
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
            print(f"new best model with F1 = {val_f1:.4f}")

        # Guard against an empty train loader (total == 0).
        train_acc = 100 * correct / total if total else 0.0
        print(f"Epoch {epoch+1}/{num_epochs}, Train Acc: {train_acc:.2f}%, Val F1: {val_f1:.4f}")
    return best_f1

In [None]:
def evaluate_model(model, data_loader):
    """Run `model` over `data_loader` and score its predictions.

    The model is switched to eval mode and gradients are disabled. The
    predicted class of each sample is the index of its highest output score.

    Returns:
        `(accuracy, macro_f1, targets, predictions)` where `accuracy` is a
        percentage, `macro_f1` comes from `f1_score(..., average='macro')`,
        and `targets` / `predictions` are flat lists over all batches.
    """
    model.eval()
    predictions, targets = [], []

    with torch.no_grad():  # inference only, no gradient tracking
        for batch_images, batch_labels in data_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_images)
            batch_preds = logits.max(dim=1).indices

            predictions.extend(batch_preds.cpu().numpy())
            targets.extend(batch_labels.cpu().numpy())

    hits = np.asarray(predictions) == np.asarray(targets)
    accuracy = np.mean(hits) * 100
    macro_f1 = f1_score(targets, predictions, average='macro')
    return accuracy, macro_f1, targets, predictions

In [None]:
##if the dataset is icub only, then take the best model and test it against the test set
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torchvision.transforms as transforms

def evaluate_best_icub_model(test_paths, test_labels, best_config,
                             batch_size=32, num_workers=4):
    """Load the checkpoint of the best configuration and evaluate it.

    The checkpoint file name is rebuilt from `best_config` using the same
    pattern `train_model` uses when saving.

    Args:
        test_paths: Image paths of the evaluation split.
        test_labels: Encoded labels aligned with `test_paths`.
        best_config: Mapping with keys 'dropout', 'lr', 'optimizer' and
            optionally 'augment' (accessed with `[]` / `.get`).
        batch_size: Evaluation batch size.
        num_workers: Number of DataLoader worker processes.

    Returns:
        `(accuracy, macro_f1, confusion_matrix)`; the confusion matrix has
        one row/column per class index of the loaded model.
    """
    # 1) Deterministic, validation-style transforms (no augmentation).
    test_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
    ])

    # 2) Dataset & DataLoader.
    test_dataset = ImageDataset(test_paths, test_labels, transform=test_transform)
    test_loader  = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True
    )

    # 3) Load the checkpoint first so the head size can be read from it.
    ckpt_filename = (
        f"best_model_dropout{best_config['dropout']}"
        f"_lr{best_config['lr']}_{best_config['optimizer']}"
        f"_aug{best_config.get('augment', False)}.pth"
    )
    state = torch.load(ckpt_filename, map_location=device)

    # 4) Rebuild the model with the class count the checkpoint was trained
    #    with. build_model's head is Sequential(Dropout, Linear), so the
    #    linear weights are stored under "fc.1.weight". Counting distinct
    #    labels in the evaluation split is only a fallback: it is wrong
    #    whenever the split happens to miss a class.
    head_weight = state.get("fc.1.weight")
    if head_weight is not None:
        num_classes = head_weight.shape[0]
    else:
        num_classes = len(set(test_labels))
    model = build_model(num_classes, dropout=best_config['dropout'])
    model.load_state_dict(state)
    model.eval()

    # 5) Evaluate. Passing `labels` pins the matrix to num_classes x
    #    num_classes even if a class never occurs in this split.
    test_acc, test_f1, all_labels, all_preds = evaluate_model(model, test_loader)
    cm = confusion_matrix(all_labels, all_preds, labels=list(range(num_classes)))

    return test_acc, test_f1, cm

# Run experiment -- with different settings (dropout, learning rate, optimizer, augmented data, number of epochs)

In [None]:
from torchvision.datasets import ImageFolder
def run_experiment(dropout, lr, optimizer_name, augment=False, num_epochs=10):
    """Train and validate one hyper-parameter configuration.

    Uses the notebook-level `train_paths` / `train_labels`, `val_paths` /
    `val_labels` and `label_encoder`.

    Args:
        dropout: Dropout probability of the classification head.
        lr: Initial learning rate.
        optimizer_name: 'adam' or 'sgd' (case-insensitive).
        augment: When true, random flip and colour jitter are applied to the
            training images; validation images are never augmented.
        num_epochs: Number of training epochs.

    Returns:
        Dict with the configuration, the best validation F1 ("val_f1"), the
        trained "model" and the "criterion".

    Raises:
        ValueError: If `optimizer_name` is not supported.
    """
    def make_transform(extra_steps=()):
        # Images are resized to 224x224 and normalised with the ImageNet
        # mean/std so the input matches what the pretrained network saw.
        return transforms.Compose([
            transforms.Resize((224, 224)),
            *extra_steps,
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

    # Validation data always gets the plain resize + normalise pipeline.
    val_transform = make_transform()
    if augment:
        train_transform = make_transform([
            transforms.RandomHorizontalFlip(),
            transforms.ColorJitter(0.2, 0.2, 0.2),
        ])
    else:
        train_transform = val_transform

    # DataLoaders feed the model in batches; only the training set is shuffled.
    loader_kwargs = dict(batch_size=32, num_workers=4, pin_memory=True)
    train_loader = DataLoader(
        ImageDataset(train_paths, train_labels, train_transform),
        shuffle=True, **loader_kwargs
    )
    val_loader = DataLoader(
        ImageDataset(val_paths, val_labels, val_transform),
        shuffle=False, **loader_kwargs
    )

    model = build_model(num_classes=len(label_encoder.classes_), dropout=dropout)

    # Only parameters left trainable by build_model are optimised.
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer_key = optimizer_name.lower()
    if optimizer_key == 'sgd':
        optimizer = optim.SGD(trainable_params, lr=lr, momentum=0.9)
    elif optimizer_key == 'adam':
        optimizer = optim.Adam(trainable_params, lr=lr)
    else:
        raise ValueError("Unsupported optimizer")

    # Learning rate is multiplied by 0.1 every 5 epochs.
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)
    criterion = nn.CrossEntropyLoss()

    best_val_f1 = train_model(
        model, train_loader, val_loader, optimizer, scheduler, criterion, num_epochs,
        dropout=dropout, lr=lr, optimizer_name=optimizer_name, augment=augment
    )

    summary = {
        "dropout": dropout,
        "lr": lr,
        "optimizer": optimizer_name,
        "augment": augment,
        "val_f1": best_val_f1,
        "model": model,
        "criterion": criterion
    }
    return summary


# Run all experiments (24)

In [None]:
# - Dropout rate: [0.0, 0.3, 0.5] → to see how regularization affects overfitting
# - Learning rate: [1e-3, 1e-4] → to compare fast vs slow learning
# - Optimizer: ['adam', 'sgd'] → to test adaptive vs momentum-based optimization
# - Data augmentation: [False, True] → to see if transforms help generalization
# In total: 3 x 2 x 2 x 2 = 24 experiments

results = []
##CHANGE THIS
data = 'icub'

if data == 'icub':
    dataset_path = "/content/drive/MyDrive/U of Manchester/Robotics Assignment/data/icubworld_data"
    #get the data
    sampled_image_paths, sampled_labels = load_icub_data(dataset_path, max_per_class=500)
    label_encoder = LabelEncoder()
    encoded_labels = label_encoder.fit_transform(sampled_labels)

    #split into training, validation, testing
    trainval_paths, test_paths, trainval_labels, test_labels = train_test_split(
        sampled_image_paths, encoded_labels, test_size=0.1, stratify=encoded_labels, random_state=42)
    train_paths, val_paths, train_labels, val_labels = train_test_split(
        trainval_paths, trainval_labels, test_size=0.2, stratify=trainval_labels, random_state=42)

elif data == 'coco':
    annotation_path_val = '/content/drive/MyDrive/U of Manchester/Robotics Assignment/data/coco/annotations/instances_val2017.json'
    annotation_path_train = '/content/drive/MyDrive/U of Manchester/Robotics Assignment/data/coco/annotations/instances_train2017.json'
    coco_val = COCO(annotation_path_val)
    coco_train = COCO(annotation_path_train)


    #coco only has train and val
    train_image_label_map, train_label_to_index = build_image_label_map(coco_train, target_count=8000)
    val_image_label_map,   val_label_to_index   = build_image_label_map(coco_val, target_count=2000)

    all_labels = set(train_image_label_map.values()) | set(val_image_label_map.values())
    label_encoder = LabelEncoder()
    label_encoder.fit(list(sorted(all_labels)))

    local_train_path = "/content/train2017"
    local_val_path   = "/content/val2017"

    #unzip
    if not os.path.exists(local_train_path):
        print("Unzipping training images...")
        !unzip -q "/content/drive/MyDrive/U of Manchester/Robotics Assignment/data/train2017.zip" -d /content/
    if not os.path.exists(local_val_path):
        print("Unzipping validation images...")
        !unzip -q "/content/drive/MyDrive/U of Manchester/Robotics Assignment/data/val2017.zip" -d /content/

    train_paths, train_labels = paths_and_labels(train_image_label_map, local_train_path)
    val_paths, val_labels     = paths_and_labels(val_image_label_map, local_val_path)

    # encode them
    train_labels = label_encoder.transform(train_labels)
    val_labels  = label_encoder.transform(val_labels)

#debug
print("Train images:", len(train_paths))
print("Val images:  ", len(val_paths))
if data == 'icub':
    print("Test images: ", len(test_paths))
#RUN the experiments
for dropout in [0.0, 0.3, 0.5]:
    for lr in [1e-3, 1e-4]:
        for opt in ['adam', 'sgd']:
            for augment in [False, True]:
                print(f"Running: dropout={dropout}, lr={lr}, optimizer={opt}, augment={augment}")
                result = run_experiment(dropout, lr, opt, augment)
                results.append(result)
#save
df = pd.DataFrame(results)
df.to_csv("experiment_results.csv", index=False)
df


['/content/drive/MyDrive/U of Manchester/Robotics Assignment/data/icubworld_data/part2/hairbrush/hairbrush6/MIX/day6/left/00007137.jpg', '/content/drive/MyDrive/U of Manchester/Robotics Assignment/data/icubworld_data/part2/hairclip/hairclip1/MIX/day8/left/00000772.jpg', '/content/drive/MyDrive/U of Manchester/Robotics Assignment/data/icubworld_data/part1/mouse/mouse4/MIX/day8/left/00004080.jpg', '/content/drive/MyDrive/U of Manchester/Robotics Assignment/data/icubworld_data/part3/soapdispenser/soapdispenser3/MIX/day4/left/00002609.jpg', '/content/drive/MyDrive/U of Manchester/Robotics Assignment/data/icubworld_data/part1/pencilcase/pencilcase7/MIX/day5/left/00001671.jpg']
[ 5  6  7 14 10]

['/content/drive/MyDrive/U of Manchester/Robotics Assignment/data/icubworld_data/part4/squeezer/squeezer9/MIX/day1/left/00006051.jpg', '/content/drive/MyDrive/U of Manchester/Robotics Assignment/data/icubworld_data/part1/mouse/mouse6/MIX/day7/left/00001940.jpg', '/content/drive/MyDrive/U of Mancheste

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 235MB/s]


new best model with F1 = 0.8644
Epoch 1/10, Train Acc: 73.26%, Val F1: 0.8644




new best model with F1 = 0.8672
Epoch 2/10, Train Acc: 92.89%, Val F1: 0.8672




new best model with F1 = 0.8985
Epoch 3/10, Train Acc: 95.99%, Val F1: 0.8985




new best model with F1 = 0.9233
Epoch 4/10, Train Acc: 97.86%, Val F1: 0.9233




Epoch 5/10, Train Acc: 98.49%, Val F1: 0.9187




new best model with F1 = 0.9495
Epoch 6/10, Train Acc: 99.46%, Val F1: 0.9495




new best model with F1 = 0.9549
Epoch 7/10, Train Acc: 99.90%, Val F1: 0.9549




Epoch 8/10, Train Acc: 99.93%, Val F1: 0.9544




Epoch 9/10, Train Acc: 99.96%, Val F1: 0.9538




new best model with F1 = 0.9576
Epoch 10/10, Train Acc: 99.99%, Val F1: 0.9576




Micro-averaged Precision: 0.9577777777777777
Micro-averaged Recall:    0.9577777777777777
Macro-averaged Precision: 0.9581798674939552
Macro-averaged Recall:    0.9577777777777776


In [None]:
#read in file if not already done
#df = pd.read_csv("experiment_results.csv")

#get the best model-- test it
# best_config = df.sort_values(by="val_f1", ascending=False).iloc[0]
# print("Best configuration:\n", best_config)

# print(best_config)


In [None]:
# --- Usage example, right after your grid-run ---
# Pick the configuration with the highest validation F1 from the grid run.
best_idx    = df['val_f1'].idxmax()
best_config = df.loc[best_idx]

# The best configuration was selected on the validation split, so scoring it
# on that same split and calling the result "test" overstates performance.
# iCub has a held-out test split; COCO (as loaded here) only has validation,
# so in that case the numbers are labelled as validation results.
if data == 'icub':
    eval_paths, eval_labels, split_name = test_paths, test_labels, "test"
else:
    eval_paths, eval_labels, split_name = val_paths, val_labels, "validation"

test_acc, test_f1, cm = evaluate_best_icub_model(
    eval_paths, eval_labels, best_config,
    batch_size=best_config.get('batch_size', 32),
    num_workers=4
)

print(f"Best {split_name} accuracy: {test_acc:.2f}%")
print(f"Best {split_name} F1      : {test_f1:.4f}")


disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(xticks_rotation='vertical')
plt.title("Confusion Matrix")
plt.show()

In [None]:
import seaborn as sns

# Row-normalise the confusion matrix so each row shows the distribution of
# predictions for one true class. Rows with no samples are left at zero
# instead of producing NaNs from a division by zero.
row_totals = cm.sum(axis=1, keepdims=True)
cm_normalized = np.divide(
    cm.astype('float'), row_totals,
    out=np.zeros(cm.shape, dtype=float),
    where=row_totals != 0
)

plt.figure(figsize=(12, 10))
sns.heatmap(cm_normalized, cmap='viridis', xticklabels=False, yticklabels=False, cbar=True)
plt.title('Normalized Confusion Matrix')
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Per-class counts read off the confusion matrix (rows = true class,
# columns = predicted class).
support = cm.sum(axis=1)       # actual instances per class
predicted = cm.sum(axis=0)     # predicted instances per class
true_positives = np.diag(cm)

# Guarded division: classes with a zero denominator get 0 instead of NaN.
precision = np.divide(
    true_positives, predicted,
    out=np.zeros_like(true_positives, dtype=float),
    where=predicted != 0,
)
recall = np.divide(
    true_positives, support,
    out=np.zeros_like(true_positives, dtype=float),
    where=support != 0,
)
accuracy_per_class = recall  # per-class accuracy equals recall for single-label data

# Summary
print("Mean per-class accuracy:", np.mean(accuracy_per_class))
print("Mean precision:", np.mean(precision))
print("Mean recall:", np.mean(recall))

# Per-class accuracy curve
fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(accuracy_per_class, label='Per-class Accuracy')
ax.set_xlabel('Class Index')
ax.set_ylabel('Accuracy')
ax.set_title('Per-class Accuracy from Confusion Matrix')
ax.grid(True)
ax.legend()
plt.show()



In [None]:
# Micro-averaged: global TP, FP, FN
import numpy as np
# Global counts over all classes: every sample is counted once as a
# prediction and once as a ground-truth label.
total_predicted = cm.sum(axis=0).sum()   # total number of predictions
total_actual = cm.sum(axis=1).sum()      # total number of ground-truth labels
true_positives = np.trace(cm)            # correctly classified samples (sum of the diagonal)

precision_micro = true_positives / total_predicted
recall_micro = true_positives / total_actual

print("Micro-averaged Precision:", precision_micro)
print("Micro-averaged Recall:   ", recall_micro)


In [None]:
import numpy as np
# --- Micro average: pool all classes, then divide once ---
total_predicted = cm.sum(axis=0).sum()   # total number of predictions
total_actual = cm.sum(axis=1).sum()      # total number of ground-truth labels
true_positives = np.trace(cm)            # correctly classified samples (sum of the diagonal)

precision_micro = true_positives / total_predicted
recall_micro = true_positives / total_actual

print("Micro-averaged Precision:", precision_micro)
print("Micro-averaged Recall:   ", recall_micro)

# --- Macro average: score each class separately, then take the plain mean ---
support = cm.sum(axis=1)       # ground-truth count per class
predicted = cm.sum(axis=0)     # predicted count per class
tp_per_class = np.diag(cm)

# Guarded division: classes with a zero denominator get 0 instead of NaN.
recall_per_class = np.divide(
    tp_per_class, support,
    out=np.zeros_like(tp_per_class, dtype=float),
    where=support != 0,
)
precision_per_class = np.divide(
    tp_per_class, predicted,
    out=np.zeros_like(tp_per_class, dtype=float),
    where=predicted != 0,
)

precision_macro = precision_per_class.mean()
recall_macro = recall_per_class.mean()

print("Macro-averaged Precision:", precision_macro)
print("Macro-averaged Recall:   ", recall_macro)
