In [None]:
import os
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transform
import torchmetrics
import random
import matplotlib
import matplotlib.pyplot as matplot

from torchvision.datasets import ImageFolder, DatasetFolder
from torch.utils.data import DataLoader, Subset, random_split, ConcatDataset
from torchvision.utils import make_grid
from torchvision.transforms import ToTensor, Normalize, Compose
from torchmetrics import F1Score, Accuracy
from PIL import Image

from sklearn.model_selection import KFold

from sklearn.metrics import confusion_matrix, accuracy_score
import seaborn as sn
import pandas as pd
import numpy as np

%matplotlib inline

In [None]:
project_name='rock-paper-scissors'

In [None]:
# Data directory
data_dir = './dataset'
print(os.listdir(data_dir))

# Classes
# Place all the images according to there class in 3 different folders (Rock, Paper, Scissors)
classes = os.listdir(data_dir)
print(classes)

In [None]:
stats=((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))

# Transform for training dataset
train_transform = transform.Compose([
                         transform.Resize((32,32)), 
                         transform.RandomHorizontalFlip(), 
                         transform.ToTensor(), 
                         transform.Normalize(*stats,inplace=True)])

# Transform for validation dataset
valid_transform = transform.Compose([transform.Resize((32,32)),transform.ToTensor(), transform.Normalize(*stats)])

In [None]:
# Complete data (Includes images from four datasets)
rock_paper_scissors_data = ImageFolder(data_dir, train_transform)

In [None]:
# Diving the dataset in the ratio of 85% and 15%
# Training dataset - 85%
# Validation dataset - 15%
train_size = int(0.85 * len(rock_paper_scissors_data))
val_size = len(rock_paper_scissors_data) - train_size
train_dataset, val_dataset = random_split(rock_paper_scissors_data, [train_size, val_size])

In [None]:
# Batch Size
batch_size = 64

# Number of classes (3)
# Rock, Paper, Scissors
num_of_classes = len(classes)

In [None]:
# Custom validation dataset from university
# **Remove custom_val_dataset if you are not using any custom dataset**
custom_val_dataset = ImageFolder("./custom-dataset/validation", valid_transform)

# Merging the custom validation dataset with the validation dataset
complete_val_dataset = ConcatDataset([val_dataset,custom_val_dataset])

In [None]:
# Custom test dataset from university
test_dataset = ImageFolder("./custom-dataset/test", valid_transform)

In [None]:
# Dataloaders for train, test & validation
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=3, pin_memory=True)
val_dataloader = DataLoader(complete_val_dataset, batch_size=batch_size, num_workers=3, pin_memory=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, num_workers=3, pin_memory=True)

In [None]:
# Function to show images
def show_images(dataloader):
    for images, labels in dataloader:
        fig, ax = matplot.subplots(figsize=(12, 12))
        ax.set_xticks([]); ax.set_yticks([])
        ax.imshow(make_grid(images[:64], nrow=8).permute(1, 2, 0))
        break


In [None]:
show_images(train_dataloader)

In [None]:
# Function to get the device GPU or CPU
# Install Cuda if you want use GPU

def get_default_device():
    """Pick GPU if available, else CPU"""
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')

# Function to move the data on to the device
def to_device(data, device):
    """Move tensor(s) to chosen device"""
    if isinstance(data, (list,tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)

class DeviceDataLoader():
    """Wrap a dataloader to move data to a device"""
    def __init__(self, dl, device):
        self.dl = dl
        self.device = device
        
    def __iter__(self):
        """Yield a batch of data after moving it to device"""
        for b in self.dl: 
            yield to_device(b, self.device)

    def __len__(self):
        """Number of batches"""
        return len(self.dl)

In [None]:
device = get_default_device()
device

In [None]:
# Resnet9 model

def conv_block(in_channels, out_channels, pool=False):
    layers = [nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1), 
              nn.BatchNorm2d(out_channels), 
              nn.ReLU(inplace=True)]
    if pool: layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)

class ResNet9(nn.Module):
    def __init__(self, num_classes=3):
        super().__init__()
        
        self.conv1 = conv_block(3, 64)
        self.conv2 = conv_block(64, 128, pool=True)
        self.res1 = nn.Sequential(conv_block(128, 128), conv_block(128, 128))
        
        self.conv3 = conv_block(128, 256, pool=True)
        self.conv4 = conv_block(256, 512, pool=True)
        self.res2 = nn.Sequential(conv_block(512, 512), conv_block(512, 512))
        
        self.classifier = nn.Sequential(nn.MaxPool2d(4), 
                                        nn.Flatten(), 
                                        nn.Dropout(0.2),
                                        nn.Linear(512, num_classes))
        
    def forward(self, xb):
        out = self.conv1(xb)
        out = self.conv2(out)
        out = self.res1(out)
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.res2(out)
        out = self.classifier(out)

        return out

In [None]:
# k = Number of folds
k = 10
number_of_epochs = 8
learning_rate = 0.001
weight_decay = 1e-4

In [None]:
# F-1 Score
def get_f1score(preds, labels):
    f1_score = F1Score(task="multiclass", num_classes=num_of_classes, average='weighted')
    return f1_score(preds, labels)

In [None]:
# Accuracy
def get_accuracy(preds, labels):
    accuracy = Accuracy(task="multiclass", num_classes=num_of_classes).to(device)
    return accuracy(preds, labels)

In [None]:
# Training function
def train(model, train_loader, criterion, optimizer):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * inputs.size(0)
    epoch_loss = running_loss / len(train_loader.dataset)
    return epoch_loss

In [None]:
# Evaluate model
def evaluate(model, val_loader):
    model.eval()
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_labels.append(labels.cpu())
            all_preds.append(preds.cpu())
    all_labels = torch.cat(all_labels)
    all_preds = torch.cat(all_preds)
    return all_preds, all_labels

In [None]:
# Without K-fold cross-validation
    
# Model
model = ResNet9()
model.to(device)
    
# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
    
# To store training losses and accuracies
train_losses = []
train_accuracies = []
    
# To store validation losses and accuracies
val_losses = []
val_accuracies = []
    
# Train and validate
for epoch in range(number_of_epochs):
    train_loss = train(model, train_dataloader, criterion, optimizer)
    val_loss = train(model,val_dataloader,criterion,optimizer)
        
    train_losses.append(train_loss)
    val_losses.append(val_loss)
        
    # Training Accuracy
    preds, labels = evaluate(model, train_dataloader)
    train_accuracy = get_accuracy(preds, labels)

    # Validation Accuracy
    preds, labels = evaluate(model, val_dataloader)
    val_accuracy = get_accuracy(preds, labels)

    train_accuracies.append(train_accuracy)
    val_accuracies.append(val_accuracy)
        
    print(f'Epoch {epoch + 1} - Train loss: {train_loss:.4f} - Train Accuracy: {train_accuracy:.4f} - Val loss: {val_loss:.4f} - Val accuracy: {val_accuracy:.4f}')
    
# Saving the Current Fold Model
torch.save(model, f'withoutKFold.pth')

In [None]:
# K-fold cross-validation
kf = KFold(n_splits=k, shuffle=True)

# Train and validate for each fold
for fold, (train_indices, val_indices) in enumerate(kf.split(rock_paper_scissors_data)):
    print(f'Fold {fold + 1}/{k}')
    
    # Create subset datasets and dataloaders for this fold
    train_subset = Subset(rock_paper_scissors_data, train_indices)
    val_subset = Subset(rock_paper_scissors_data, val_indices) + custom_val_dataset
    train_loader = DataLoader(train_subset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_subset, batch_size=64, shuffle=False)
    
    # Model
    model = ResNet9()
    model.to(device)
    
    # Define the loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
    
    # To store training losses and accuracies
    train_losses = []
    train_accuracies = []
        
    # To store validation losses and accuracies
    val_losses = []
    val_accuracies = []
    
    # Train and validate for this fold
    for epoch in range(number_of_epochs):
        train_loss = train(model, train_loader, criterion, optimizer)
        val_loss = train(model,val_loader,criterion,optimizer)
        
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        
        # Training Accuracy
        preds, labels = evaluate(model, train_loader)
        train_accuracy = get_accuracy(preds, labels)

        # Validation Accuracy
        preds, labels = evaluate(model, val_loader)
        val_accuracy = get_accuracy(preds, labels)

        train_accuracies.append(train_accuracy)
        val_accuracies.append(val_accuracy)
        
        print(f'Epoch {epoch + 1} - Train loss: {train_loss:.4f} - Train Accuracy: {train_accuracy:.4f} - Val loss: {val_loss:.4f} - Val accuracy: {val_accuracy:.4f}')
    
    # Saving the Current Fold Model
    torch.save(model, f'fold{fold}.pth')

In [None]:
# Function to plot the losses
def plot_losses():
    matplot.plot(train_losses, '-bx')
    matplot.plot(val_losses, '-rx')
    matplot.xlabel('epoch')
    matplot.ylabel('loss')
    matplot.legend(['Training', 'Validation'])
    matplot.title('Loss vs. No. of epochs');

In [None]:
plot_losses()

In [None]:
# Testing the model
test_model = ResNet9()
PATH = './model/10Fold-40Epochs-Dropout0.2/fold2.pth'
test_model = torch.load(PATH)
test_model.to(device)

In [None]:
# Evaluating Test Data
preds, labels = evaluate(test_model, test_dataloader)

# Accuracy
accuracy = get_accuracy(preds, labels)
accuracy

In [None]:
# F-1 SCORE
f1_score = get_f1score(preds, labels)
f1_score

In [None]:
# Confusion Matrix
confusion_matrix = np.zeros((num_of_classes, num_of_classes))
with torch.no_grad():
    for i, (inputs, labels) in enumerate(test_dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = test_model(inputs)
        _, preds = torch.max(outputs, 1)
        for t, p in zip(labels.view(-1), preds.view(-1)):
                confusion_matrix[t.long(), p.long()] += 1



In [None]:
matplot.figure(figsize=(12,7))
df_cm = pd.DataFrame(confusion_matrix, index=classes, columns=classes)
heatmap = sn.heatmap(df_cm, annot=True)