In [1]:
import os
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.utils import save_image
from sklearn.model_selection import train_test_split
from livelossplot import PlotLosses
import random
import itertools
from PIL import Image, UnidentifiedImageError
from sklearn.metrics import precision_score
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data.dataloader import default_collate






# Import torch.nn.functional separately
import torch.nn.functional as F




def set_device(device="cpu", idx=0):
    if device != "cpu":
        if torch.cuda.device_count() > idx and torch.cuda.is_available():
            print("Cuda installed! Running on GPU {} {}!".format(idx, torch.cuda.get_device_name(idx)))
            device="cuda:{}".format(idx)
        elif torch.cuda.device_count() > 0 and torch.cuda.is_available():
            print("Cuda installed but only {} GPU(s) available! Running on GPU 0 {}!".format(torch.cuda.device_count(), torch.cuda.get_device_name()))
            device="cuda:0"
        else:
            device="cpu"
            print("No GPU available! Running on CPU")
    return device

device = set_device("cuda")

def set_seed(seed):
    """
    Use this to set ALL the random seeds to a fixed value and take out any randomness from cuda kernels
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    torch.backends.cudnn.benchmark = False  ##uses the inbuilt cudnn auto-tuner to find the fastest convolution algorithms. -
    torch.backends.cudnn.enabled   = False

    return True

set_seed(42)

Cuda installed! Running on GPU 0 Tesla V100-SXM2-16GB!


True

In [2]:
# Load the Drive helper and mount
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
!pip install livelossplot



In [4]:
class CustomHandDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        try:
            image = Image.open(img_path).convert('L')  # Convert to grayscale
            if self.transform:
                image = self.transform(image)
        except UnidentifiedImageError:
            # print(f"UnidentifiedImageError: cannot identify image file {img_path}. It will be skipped.")
            return None, None
        return image, label


def custom_collate_fn(batch):
    batch = [x for x in batch if x[0] is not None]
    return default_collate(batch)

In [5]:

# Define your transformations
train_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),  # Convert to grayscale
    transforms.Resize((32, 32)),  # Resize the images to a consistent size as some images are larger than others
    transforms.RandomRotation(degrees=10),  # Rotate image by up to 10 degrees
    transforms.RandomHorizontalFlip(),      # Flip image horizontally
    transforms.ToTensor(),  # Convert to tensor
    transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize to the range [-1, 1]
])


test_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),  # Convert to grayscale
    transforms.Resize((32, 32)),  # Resize the images to a consistent size as some images are larger than others
    transforms.ToTensor(),  # Convert to tensor
    transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize to the range [-1, 1]
])


directories = ['drive/MyDrive/Coursework2/real_hands', 'drive/MyDrive/Coursework2/VAE_hands', 'drive/MyDrive/Coursework2/GAN_hands']
image_paths = []
labels = []

# Assign labels to images: 0 for real hands, 1 for VAE hands, 2 for GAN hands
for i, directory in enumerate(directories):
    dir_path = directory  # Use the directory name directly if it's in the root directory
    images = [os.path.join(dir_path, img) for img in os.listdir(dir_path) if img.endswith('.jpeg')]
    image_paths.extend(images)
    labels.extend([i] * len(images))

# Split the paths and labels into train, validation, and test sets
train_paths, test_paths, train_labels, test_labels = train_test_split(image_paths, labels, test_size=0.2, random_state=42)
train_paths, val_paths, train_labels, val_labels = train_test_split(train_paths, train_labels, test_size=0.2, random_state=42) # 0.2 x 0.8 = 0.16



train_dataset = CustomHandDataset(train_paths, train_labels, transform=train_transform)
val_dataset = CustomHandDataset(val_paths, val_labels, transform=test_transform)
test_dataset = CustomHandDataset(test_paths, test_labels, transform=test_transform)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, collate_fn=custom_collate_fn)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, collate_fn=custom_collate_fn)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, collate_fn=custom_collate_fn)

Upto now I have copied the code but here is where i changed my class to be more suitable for hyperparamter tuning. I did this because it will make the main notebook less complicated instead of having all the hyperparameters become arguments etc.

In [6]:
class CustomCNN(nn.Module):
    def __init__(self, dropout_rate=0.3, kernel_size=3):
        super(CustomCNN, self).__init__()
        # Convolutional layers
        self.conv1 = nn.Conv2d(1, 16, kernel_size=kernel_size, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv2 = nn.Conv2d(16, 32, kernel_size=kernel_size, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv2d(32, 64, kernel_size=kernel_size, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        # New convolutional layer
        self.conv4 = nn.Conv2d(64, 128, kernel_size=kernel_size, padding=1)
        self.bn4 = nn.BatchNorm2d(128)
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Global Average Pooling
        self.gap = nn.AdaptiveAvgPool2d((1, 1))

        # Fully connected layers
        self.fc1 = nn.Linear(128, 256)
        self.fc2 = nn.Linear(256, 3)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool1(x)
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool2(x)
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.pool3(x)

        x = F.relu(self.bn4(self.conv4(x)))
        x = self.pool4(x)

        x = self.gap(x)
        x = x.view(-1, 128)

        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        return x


In [7]:
def train(model, device, train_loader, optimizer, criterion , epoch):
    model.train()
    train_loss = 0
    for (data, target) in train_loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        train_loss += loss.item()
        optimizer.step()

    train_loss /= len(train_loader)
    print(f'Epoch: {epoch:02d}, Train Loss: {train_loss:.4f}')
    return train_loss

def validate(model, device, test_loader, criterion):
    model.eval()
    test_loss = 0
    correct = 0

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()  # Sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # Get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader)
    accuracy = 100. * correct / len(test_loader.dataset)

    print(f'Validation Loss: {test_loss:.4f}, Accuracy: {accuracy:.2f}%')
    return test_loss, accuracy

# Initialize the plot
liveloss = PlotLosses()

Ideally I would have a much better range of parameters and more types of parameters such as kernel size, learning schedule paramters etc

In [8]:
# Will use 25 epochs for grid search
num_epochs = 25

In [9]:
param_grid = {
    'dropout_rate': [0.3, 0.5, 0.7],
    'learning_rate': [0.001, 0.0005, 0.0001],
    'weight_decay': [0 ,1e-4, 1e-5, 1e-6, 1e-7],
    'scheduler_patience': [2, 4, 6],
}

In [10]:
class EarlyStopping:
    def __init__(self, patience=5, min_delta=0): # pa
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif self.best_loss - val_loss > self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

# Initialize early stopping
early_stopping = EarlyStopping(patience=4, min_delta=0.01)

best_val_loss = float('inf')
best_params = {}
criterion = nn.CrossEntropyLoss()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')



# Grid Search Loop
for dropout_rate, learning_rate, weight_decay, scheduler_patience in itertools.product(*param_grid.values()):
    # Initialize the model
    model = CustomCNN(dropout_rate=dropout_rate).to(device)  # Ensure CustomCNN can accept kernel_size if it's a parameter
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    scheduler = ReduceLROnPlateau(optimizer, 'min', patience=scheduler_patience, factor=0.7, min_lr=1e-7, verbose=True)

    early_stopping = EarlyStopping(patience=4, min_delta=0.01)
    best_val_loss = float('inf')

    for epoch in range(1, num_epochs + 1):
        train_loss = train(model, device, train_loader, optimizer, criterion, epoch)
        val_loss, _ = validate(model, device, val_loader, criterion)

        scheduler.step(val_loss)  # Update the scheduler

        if val_loss is not None:  # Ensure val_loss is not None
            early_stopping(val_loss)

            if early_stopping.early_stop:
                print(f"Early stopping triggered at epoch {epoch}!")
                break

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_params = {
                'dropout_rate': dropout_rate,
                'learning_rate': learning_rate,
                'weight_decay': weight_decay,
                'scheduler_patience': scheduler_patience,
            }
            best_model = model.state_dict()

print(f"Best Validation Loss: {best_val_loss}")
print(f"Best Hyperparameters: {best_params}")

Epoch: 01, Train Loss: 0.8670
Validation Loss: 1.4841, Accuracy: 31.03%
Epoch: 02, Train Loss: 0.6651
Validation Loss: 3.2369, Accuracy: 31.03%
Epoch: 03, Train Loss: 0.5789
Validation Loss: 1.4508, Accuracy: 44.40%
Epoch: 04, Train Loss: 0.5557
Validation Loss: 0.6031, Accuracy: 74.14%
Epoch: 05, Train Loss: 0.5333
Validation Loss: 0.6162, Accuracy: 75.00%
Epoch: 06, Train Loss: 0.4981
Validation Loss: 0.6129, Accuracy: 76.29%
Epoch: 07, Train Loss: 0.4987
Validation Loss: 0.8529, Accuracy: 59.91%
Epoch 00007: reducing learning rate of group 0 to 7.0000e-04.
Epoch: 08, Train Loss: 0.4368
Validation Loss: 0.5245, Accuracy: 79.31%
Epoch: 09, Train Loss: 0.4357
Validation Loss: 0.5145, Accuracy: 78.45%
Epoch: 10, Train Loss: 0.4301
Validation Loss: 0.6019, Accuracy: 76.29%
Epoch: 11, Train Loss: 0.4099
Validation Loss: 0.5002, Accuracy: 79.74%
Epoch: 12, Train Loss: 0.3835
Validation Loss: 0.4380, Accuracy: 82.76%
Epoch: 13, Train Loss: 0.3898
Validation Loss: 0.6021, Accuracy: 73.71%
Ep

- The best model showed a tendency towards higher dropout rates, indicating a need for regularization due to the complexity of the task.
- A lower learning rate and a small amount of weight decay seemed to balance the model's learning capacity and generalization.
- The scheduler's patience allowed for adequate reaction to plateaus in the learning process.