In [1]:
import numpy as np
import pandas as pd
import torch 
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from PIL import Image
import matplotlib.pyplot as plt

# My dataset loader and all my tested models including my final model

In [2]:
class CustomDataset(Dataset):
    """Dataset of 28x28 images stored one flattened row per image in a CSV file.

    Args:
        image_file: Path to a CSV whose rows each hold the 784 values of one image.
        label_file: Optional path to a CSV of labels; the first column of row
            ``idx`` is used as the label of image ``idx``.
        transform: Optional callable applied to the PIL image before it is returned.
        train: When true (and labels were loaded) items are ``(image, label)``
            pairs; otherwise only the image is returned.
    """

    def __init__(self, image_file, label_file=None, transform=None, train=True):
        self.images = pd.read_csv(image_file)
        # Labels are optional so the same class can serve unlabeled data.
        self.labels = None if not label_file else pd.read_csv(label_file)
        self.transform = transform
        self.train = train

    def __len__(self):
        """Return the number of image rows."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the image at ``idx``, paired with its label when available."""
        row = self.images.iloc[idx, :].values
        # Reshape the flat row to 28x28 and divide by 255 before building the PIL image.
        pixels = row.astype(np.float32).reshape(28, 28) / 255.0
        image = Image.fromarray(pixels)

        if self.transform:
            image = self.transform(image)

        # Unlabeled mode: hand back the bare image.
        if not self.train or self.labels is None:
            return image

        label = self.labels.iloc[idx].values[0]
        return image, label
    


# class FullyConnectedNN(nn.Module):
#     def __init__(self):
#         super(FullyConnectedNN, self).__init__()

#         self.classifier = nn.Sequential(
#             nn.Linear(784, 128),
#             nn.ReLU(),
#             nn.Linear(128, 64),
#             nn.ReLU(),
#             nn.Linear(64, 10),
#             nn.LogSoftmax(dim=1)
#         )
        
#     def forward(self, x):
#         x = x.view(-1, 28*28)
#         x = self.classifier(x)
#         return x
    


# class ConvolutionalNN(nn.Module):
#     def __init__(self):
#         super(ConvolutionalNN, self).__init__()

#         self.conv1 = nn.Sequential(
#             nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1),
#             nn.BatchNorm2d(32),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2)
#         )

#         self.conv2 = nn.Sequential(
#             nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
#             nn.BatchNorm2d(64),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2)
#         )

#         self.conv3 = nn.Sequential(
#             nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1),
#             nn.BatchNorm2d(128),
#             nn.ReLU()
#         )

#         self.classifier = nn.Sequential(
#             nn.Linear(128 * 7 * 7, 128),
#             nn.ReLU(),
#             nn.Dropout(0.4),
#             nn.Linear(128, 64),
#             nn.ReLU(),
#             nn.Linear(64, 10),
#             nn.LogSoftmax(dim=1)
#         )

#     def forward(self, x):
#         x = self.conv1(x)
#         x = self.conv2(x)
#         x = self.conv3(x)
#         x = x.view(x.size(0), -1)
#         x = self.classifier(x)
#         return x
    


# class ComplexCNN(nn.Module):
#     def __init__(self):
#         super(ComplexCNN, self).__init__()

#         self.conv1 = nn.Sequential(
#             nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1),
#             nn.BatchNorm2d(32),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2)
#         )

#         self.conv2 = nn.Sequential(
#             nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
#             nn.BatchNorm2d(64),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2)
#         )

#         self.conv3 = nn.Sequential(
#             nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1),
#             nn.BatchNorm2d(128),
#             nn.ReLU()
#         )

#         self.conv4 = nn.Sequential(
#             nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1),
#             nn.BatchNorm2d(256),
#             nn.ReLU()
#         )

#         self.classifier = nn.Sequential(
#             nn.Linear(256 * 7 * 7, 256),
#             nn.ReLU(),
#             nn.Dropout(0.4),
#             nn.Linear(256, 128),
#             nn.ReLU(),
#             nn.Linear(128, 10),
#             nn.LogSoftmax(dim=1)
#         )

#     def forward(self, x):
#         x = self.conv1(x)
#         x = self.conv2(x)
#         x = self.conv3(x)
#         x = self.conv4(x)
#         x = x.view(x.size(0), -1)
#         x = self.classifier(x)
#         return x



# class LargeFeatureCNN(nn.Module):
#     def __init__(self):
#         super(LargeFeatureCNN, self).__init__()

#         self.conv1 = nn.Sequential(
#             nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, padding=2),
#             nn.BatchNorm2d(32),
#             nn.ReLU()
#         )

#         self.conv2 = nn.Sequential(
#             nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5, padding=2),
#             nn.BatchNorm2d(64),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2)
#         )

#         self.conv3 = nn.Sequential(
#             nn.Conv2d(in_channels=64, out_channels=128, kernel_size=5, padding=2),
#             nn.BatchNorm2d(128),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2)
#         )

#         self.classifier = nn.Sequential(
#             nn.Linear(128 * 7 * 7, 128),
#             nn.ReLU(),
#             nn.Dropout(0.4),
#             nn.Linear(128, 64),
#             nn.ReLU(),
#             nn.Linear(64, 10),
#             nn.LogSoftmax(dim=1)
#         )

#     def forward(self, x):
#         x = self.conv1(x)
#         x = self.conv2(x)
#         x = self.conv3(x)
#         x = x.view(x.size(0), -1)
#         x = self.classifier(x)
#         return x



# class ResNetCNN(nn.Module):
#     def __init__(self):
#         super(ResNetCNN, self).__init__()

#         self.conv1 = nn.Sequential(
#             nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3, padding=1),
#             nn.BatchNorm2d(64),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2)
#         )

#         self.conv2 = nn.Sequential(
#             nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1),
#             nn.BatchNorm2d(128),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2)
#         )

#         self.conv3 = nn.Sequential(
#             nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1),
#             nn.BatchNorm2d(256),
#             nn.ReLU()
#         )

#         self.res_block = nn.Sequential(
#             nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1),
#             nn.BatchNorm2d(256),
#             nn.ReLU(),
#             nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1),
#             nn.BatchNorm2d(256)
#         )

#         self.classifier = nn.Sequential(
#             nn.Linear(256 * 7 * 7, 256),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(256, 128),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(128, 10),
#             nn.LogSoftmax(dim=1)
#         )

#     def forward(self, x):
#         x = self.conv1(x)
#         x = self.conv2(x)
#         x = self.conv3(x)

#         res = x
#         x = self.res_block(x)
#         x += res

#         x = x.view(x.size(0), -1)
#         x = self.classifier(x)
#         return x
    


class FinalCNN(nn.Module):
    """Two-stage convolutional classifier producing log-probabilities for 10 classes.

    Each convolutional stage is conv-BN-ReLU twice, followed by a 2x2 max-pool
    and dropout. The classifier head expects 64 * 7 * 7 flattened features,
    i.e. single-channel 28x28 inputs after the two pooling steps.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: 1 -> 32 channels; stage 2: 32 -> 64 channels.
        self.conv1 = self._conv_stage(1, 32)
        self.conv2 = self._conv_stage(32, 64)

        head_layers = [
            nn.Linear(64 * 7 * 7, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 10),
            nn.LogSoftmax(dim=1),
        ]
        self.classifier = nn.Sequential(*head_layers)

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Build one (conv-BN-ReLU) x2 + max-pool + dropout stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.25),
        )

    def forward(self, x):
        """Run both conv stages, flatten per sample, and apply the classifier head."""
        features = self.conv2(self.conv1(x))
        flattened = features.view(features.size(0), -1)
        return self.classifier(flattened)


# Splitting up the training data into train, test, and val sets

In [12]:
train_data = pd.read_csv('train.csv')

X = train_data.drop('label', axis=1)
y = train_data['label']

# Hold out 10% of the labelled data as the test set.
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y,
                                                            test_size=0.1, random_state=1)

# Carve the validation set out of the REMAINING 90% only. Splitting the full
# X / y again (as before) lets validation rows overlap the held-out test rows.
# 0.1111 of the remaining 90% is roughly 10% of the full data.
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val,
                                                  test_size=0.1111, random_state=1)

# The three splits must be disjoint and together cover every labelled row.
assert len(X_train) + len(X_val) + len(X_test) == len(X)
assert X_train.index.intersection(X_test.index).empty
assert X_val.index.intersection(X_test.index).empty
assert X_train.index.intersection(X_val.index).empty

# Persist the splits under the file names the dataset-loading cell reads, so
# the notebook works on a fresh kernel. index=False keeps the pixel CSVs at
# exactly 784 columns; to_frame() guarantees a header row for the label CSVs.
X_train.to_csv('X_train.csv', index=False)
y_train.to_frame().to_csv('y_train.csv', index=False)
X_val.to_csv('X_val.csv', index=False)
y_val.to_frame().to_csv('y_val.csv', index=False)
X_test.to_csv('X_test.csv', index=False)
y_test.to_frame().to_csv('y_test.csv', index=False)

# Augmenting and loading the datasets

In [13]:
# Training pipeline: small random rotations for augmentation, then tensor
# conversion and normalisation with mean 0.5 / std 0.5.
transform = transforms.Compose([
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

# Evaluation pipeline: identical normalisation but NO random augmentation, so
# validation/test metrics and submission predictions are deterministic.
eval_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

train_data = CustomDataset('X_train.csv', 'y_train.csv', transform=transform)
test_data = CustomDataset('X_test.csv', 'y_test.csv', transform=eval_transform)
val_data = CustomDataset('X_val.csv', 'y_val.csv', transform=eval_transform)
sub_test_data = CustomDataset('test.csv', transform=eval_transform, train=False)
batch_size = 8

# Only the training loader needs shuffling; evaluation order does not affect
# the metrics, and the submission loader must keep row order.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
sub_test_loader = DataLoader(sub_test_data, batch_size=batch_size, shuffle=False)



# Training loop 

In [None]:
# Use the GPU when one is available; everything below also runs on CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.cuda.empty_cache()

model = FinalCNN().to(device)

optimiser = optim.Adam(model.parameters(), lr=0.001)

# Multiplies the learning rate by 0.1 when the validation loss stops improving.
# It only acts when scheduler.step(val_loss) is called once per epoch (see below).
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimiser, mode='min', factor=0.1, patience=5)

# NOTE: FinalCNN already ends in LogSoftmax and CrossEntropyLoss applies
# log-softmax again. Log-softmax is idempotent, so the loss value is unchanged;
# the extra application is redundant but harmless.
criterion = nn.CrossEntropyLoss()

# Per-epoch history, kept for later plotting/inspection.
train_losses = []
val_losses = []
train_accs = []
val_accs =  []
best_val_acc = 0

n_epochs = 100


for epoch in range(n_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    tot = 0

    # Training Step
    for batch_idx, (features, labels) in enumerate(train_loader):
        features, labels = features.to(device), labels.to(device)

        # Set parameter gradients to zero
        optimiser.zero_grad()

        # Forward pass, backward pass, parameter update
        outputs = model(features)
        loss = criterion(outputs, labels)
        loss.backward()
        optimiser.step()

        # Tracking the loss
        running_loss += loss.item()

        # Tracking the accuracy (index of the largest log-probability per sample)
        _, predicted = torch.max(outputs, 1)
        tot += labels.size(0)
        correct += (predicted == labels).sum().item()

    train_acc = correct / tot
    train_loss = running_loss / len(train_loader)

    # Validation Step
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for val_features, val_labels in val_loader:
            val_features, val_labels = val_features.to(device), val_labels.to(device)
            val_outputs = model(val_features)
            val_loss += criterion(val_outputs, val_labels).item()

            _, val_predicted = torch.max(val_outputs, 1)
            val_total += val_labels.size(0)
            val_correct += (val_predicted == val_labels).sum().item()

    val_acc = val_correct / val_total
    val_loss = val_loss / len(val_loader)

    # Feed the monitored metric to the scheduler; without this call the
    # learning rate is never reduced.
    scheduler.step(val_loss)

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accs.append(train_acc)
    val_accs.append(val_acc)

    # Accuracies are fractions in [0, 1]; the % format spec scales them by 100
    # so the printed percent sign is truthful.
    print(f'Epoch {epoch+1}/{n_epochs}')
    print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2%}')
    print(f'Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.2%}')

    # Checkpoint whenever validation accuracy improves (always on the first epoch).
    if epoch == 0 or val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), 'finalcnn_model.pth')
        print('new best!')


# Run the model on the final test set and prepare for submission

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.cuda.empty_cache()


# Rebuild the architecture and restore the saved weights. map_location makes
# the load succeed even when the checkpoint was written on a GPU machine and
# this cell runs on a CPU-only one.
model = FinalCNN().to(device)
model.load_state_dict(torch.load('finalcnn_model.pth', map_location=device))
model.eval()

# Collect the predicted class index for every unlabeled image, in loader order.
preds = []
with torch.no_grad():
    for images in sub_test_loader:
        images = images.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        preds.extend(predicted.cpu().numpy())

# Prepare the results in the correct format (IDs are 1-based row numbers)
submission_df = pd.DataFrame({
    'ImageID': range(1, len(preds) + 1),
    'label': preds
})

# submission_df.to_csv('final_submission.csv', index=False)

# Some code I made to visualise the kernels of my model

In [None]:
def visualise_kernels(conv_layer, num_input_channels=32, num_output_channels=64):
    """Plot a grid of a convolution layer's kernels, one subplot per (output, input) pair.

    Args:
        conv_layer: Layer whose ``weight`` tensor is indexed as
            ``[output_channel, input_channel, ...]``.
        num_input_channels: Maximum number of input channels (grid columns) to show.
        num_output_channels: Maximum number of output channels (grid rows) to show.

    Both counts are clamped to what the layer actually holds, so the defaults
    work for layers with fewer than 32 / 64 channels instead of raising
    ``IndexError``.
    """
    # Copy over the weights of the convolution layer
    kernels = conv_layer.weight.data.cpu()

    # Never index past the channels the layer really has.
    n_out = min(num_output_channels, kernels.shape[0])
    n_in = min(num_input_channels, kernels.shape[1])

    # Normalize kernels for visualization (min/max taken over the whole layer).
    min_wt = kernels.min()
    max_wt = kernels.max()
    span = max_wt - min_wt
    if span > 0:
        kernels = (kernels - min_wt) / span
    else:
        # Constant weights: avoid 0/0 (NaN) and show a flat image instead.
        kernels = kernels - min_wt

    # Creating figure for the feature plot. squeeze=False keeps `axs` 2-D even
    # when the grid has a single row or column, so axs[i, j] always works.
    fig, axs = plt.subplots(n_out, n_in, figsize=(n_in, n_out), squeeze=False)

    # Loop for each output channel
    for i in range(n_out):
        # Loop for each input channel
        for j in range(n_in):
            # Gets the kernel for each input-output pair
            kernel = kernels[i, j].squeeze()
            axs[i, j].imshow(kernel, cmap='gray')
            axs[i, j].axis('off')

    plt.show()

# Restore the weights saved at 'finalcnn_model.pth' onto the CPU and plot the
# kernels of the first convolution in the second stage. map_location='cpu'
# lets a checkpoint written on a GPU load on a machine without one.
model = FinalCNN().to('cpu')
model.load_state_dict(torch.load('finalcnn_model.pth', map_location='cpu'))
visualise_kernels(model.conv2[0])