# Convolutional Neural Network

In [None]:
''' Needed libraries '''

import numpy as np # For matrix operations and numerical processing
import matplotlib.pyplot as plt # For plotting
import os, sys # For filepaths

# PyTorch libraries (for CNN):
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader

# sklearn and seaborn for metrics:
from sklearn.metrics import confusion_matrix
import seaborn as sns

In [None]:
''' Add the datasets and libraries to the system path '''

# Walk two levels up from the working directory; the 'Libraries' and
# 'Datasets' folders are looked up inside that directory.
current_directory = os.getcwd()
parent_directory = os.path.dirname(current_directory)
home_directory = os.path.dirname(parent_directory)

# Folder holding the datasets, and folder holding our own implementations
datasets_path = os.path.join(home_directory, 'Datasets')
libraries_path = os.path.join(home_directory, 'Libraries')

# Register both on the import search path (datasets first, then libraries)
sys.path.extend([datasets_path, libraries_path])

In [None]:
''' Define the neural network '''

class NeuralNetwork(nn.Module):
    """Small CNN classifier for single-channel 28x28 images.

    Layer order: conv(1->32, 3x3, pad 1) -> ReLU -> max-pool(2)
    -> conv(32->64, 3x3, pad 1) -> ReLU -> max-pool(2) -> flatten
    -> dropout(0.25) -> linear(7*7*64 -> 128) -> ReLU -> dropout(0.5)
    -> linear(128 -> num_classes) -> log-softmax.

    Args:
        num_classes: Number of output classes. Defaults to 5, which keeps the
            parameter shapes (and therefore saved state dicts) unchanged.
    """

    def __init__(self, num_classes=5):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        # Two 2x2 poolings shrink 28x28 to 7x7, with 64 channels after conv2
        self.fc1 = nn.Linear(7*7*64, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return per-class log-probabilities for a batch of images.

        Args:
            x: Tensor whose last two dimensions are 28x28 (e.g. (N, 1, 28, 28)).

        Returns:
            Tensor of shape (N, num_classes) holding log-softmax outputs.

        Raises:
            ValueError: If the spatial size of ``x`` is not 28x28.
        """
        # The flatten below uses view(-1, 7*7*64); with any other image size it
        # would either fail or silently fold pixels into the batch dimension.
        if tuple(x.shape[-2:]) != (28, 28):
            raise ValueError(
                f"Expected input with spatial size 28x28, got {tuple(x.shape)}"
            )
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = x.view(-1, 7*7*64)
        x = F.relu(self.fc1(self.dropout1(x)))
        x = self.fc2(self.dropout2(x))
        return F.log_softmax(x, dim=1)

In [None]:
''' Function to load and preprocess the datasets '''

def load_data(batch_size=64):
    """Load the train and test splits and wrap them in DataLoaders.

    Reads 'fashion_train.npy' and 'fashion_test.npy' from ``datasets_path``.
    In each array the last column is the label and the remaining columns are
    the pixel values, which are reshaped to (N, 1, 28, 28).

    Args:
        batch_size: Number of samples per batch for both loaders.

    Returns:
        (train_loader, test_loader). The training loader shuffles every
        epoch; the test loader keeps the stored order.
    """
    def _to_dataset(array):
        # Features become float32 images, labels become int64 class indices
        features = torch.as_tensor(array[:, :-1], dtype=torch.float32).reshape(-1, 1, 28, 28)
        targets = torch.as_tensor(array[:, -1], dtype=torch.long)
        # TensorDataset indexes the tensors directly instead of materialising
        # a Python list with one tuple per sample.
        return torch.utils.data.TensorDataset(features, targets)

    # Load datasets from .npy files
    train_dataset = np.load(os.path.join(datasets_path, 'fashion_train.npy'))
    test_dataset = np.load(os.path.join(datasets_path, 'fashion_test.npy'))

    # NOTE: no normalisation is applied; pixel values keep their stored scale.
    # If standardisation is added later, compute mean/std on the training
    # split only and reuse those same statistics for the test split.

    # Create DataLoaders
    train_loader = DataLoader(_to_dataset(train_dataset), batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(_to_dataset(test_dataset), batch_size=batch_size, shuffle=False)

    return train_loader, test_loader

In [None]:
''' Function to load the pre-trained model '''

def load_model():
    """Build a NeuralNetwork and load its weights from "model.pth".

    The checkpoint path is relative, so it is resolved against the current
    working directory. The model is returned in its default (training) mode;
    callers switch it to eval mode themselves.

    Returns:
        (model, device): the network with loaded weights, already moved to
        ``device``, and the device that was selected (CUDA if available,
        otherwise CPU).
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Using device:", device)
    model = NeuralNetwork().to(device)
    # map_location remaps the stored tensors onto the selected device, so a
    # checkpoint written on a GPU machine still loads on a CPU-only one.
    state_dict = torch.load("model.pth", map_location=device)
    model.load_state_dict(state_dict)
    return model, device

In [None]:
''' Plotting functions '''

# Learning curve
def plot_learning_curve(train_losses, test_losses):
    """Draw the training and validation loss sequences on one plot and show it.

    Args:
        train_losses: Sequence of training losses, plotted against its index.
        test_losses: Sequence of losses plotted under the 'Validation loss' label.
    """
    curves = (
        (train_losses, 'Training loss'),
        (test_losses, 'Validation loss'),
    )
    for losses, legend_label in curves:
        plt.plot(losses, label=legend_label)

    # Axis labels, title and legend
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Learning Curve')
    plt.legend()

    plt.show()


# Confusion matrix
def plot_confusion_matrix(model, device, test_loader):
    """Plot a confusion-matrix heatmap of ``model``'s predictions.

    Args:
        model: Network to evaluate. If ``None``, the saved model is loaded
            with ``load_model()`` (which also determines ``device``).
        device: Device the model lives on; each batch is moved there.
        test_loader: Iterable yielding (data, target) batches.

    Side effects:
        Puts ``model`` into eval mode and shows a matplotlib figure.
    """
    # Evaluate the caller's model so that a freshly trained, not-yet-saved
    # network is what gets plotted; only fall back to the checkpoint on disk
    # when no model was supplied.
    if model is None:
        model, device = load_model()
    model.eval()

    y_true = []
    y_pred = []
    with torch.no_grad():
        for data, target in test_loader:
            output = model(data.to(device))
            # Index of the largest log-probability is the predicted class
            y_pred.extend(output.argmax(dim=1).tolist())
            y_true.extend(target.tolist())

    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, cmap=plt.cm.Blues, fmt='g')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.show()


# Plot with example images
def plot_example_images():
    """Show up to 100 test images in a 10x10 grid with predicted/actual labels.

    The saved model is loaded with ``load_model()`` and the test split with
    ``load_data()``. Each subplot title is green when the prediction matches
    the target and red otherwise.
    """
    model, device = load_model()
    model.eval()

    # Load the test set
    test_loader = load_data()[1]

    # Lookup from class index to display name. allow_pickle is needed to
    # restore the stored dict; unpickling can execute arbitrary code, so only
    # load this file from a trusted source.
    labels = np.load(os.path.join(datasets_path, 'labels_dict.npy'), allow_pickle=True).item()

    grid_rows, grid_cols = 10, 10
    num_images = min(len(test_loader.dataset), grid_rows * grid_cols)  # Limit the number of images to display

    plt.figure(figsize=(20, 20))

    shown = 0  # Running count of images drawn so far (independent of batch size)
    with torch.no_grad():
        for data, target in test_loader:
            if shown >= num_images:
                break

            # The batch must be on the same device as the model; predictions
            # are brought back to the CPU for plotting.
            pred = model(data.to(device)).argmax(dim=1).cpu()

            for image, predicted, actual in zip(data, pred, target):
                if shown >= num_images:
                    break  # The grid is full

                predicted_idx, actual_idx = predicted.item(), actual.item()
                plt.subplot(grid_rows, grid_cols, shown + 1)
                plt.imshow(image.cpu().numpy().reshape(28, 28), cmap='gray', interpolation='nearest')
                title = f'Predicted: {labels[predicted_idx]}\nActual: {labels[actual_idx]}'
                plt.title(title, color='green' if predicted_idx == actual_idx else 'red')
                plt.axis('off')
                shown += 1

    plt.tight_layout()
    plt.show()

In [None]:
''' Training and testing functions '''

# Training the model
def train(model, device, train_loader, optimizer, criterion, epoch, scheduler=None, verbose=True):
    """Run one pass over ``train_loader`` and return the mean batch loss.

    Args:
        model: Network to optimise; switched to training mode.
        device: Device each batch is moved to before the forward pass.
        train_loader: Sized iterable yielding (data, target) batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss function called as ``criterion(output, target)``.
        epoch: Epoch number, used only in the progress message.
        scheduler: Optional LR scheduler, stepped once after all batches.
        verbose: When true, print the average loss for this epoch.

    Returns:
        Sum of the per-batch losses divided by the number of batches.
    """
    model.train()

    batch_losses = []
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Standard step: clear gradients, forward, backward, update
        optimizer.zero_grad()
        batch_loss = criterion(model(inputs), labels)
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())

    # The schedule advances per epoch, not per batch
    if scheduler:
        scheduler.step()

    avg_loss = sum(batch_losses) / len(train_loader)
    if verbose:
        print(f'Train Epoch: {epoch} \tAverage Loss: {avg_loss:.6f}')
    return avg_loss


# Testing the model
def test(model, device, test_loader, criterion, verbose=True):
    """Evaluate ``model`` on ``test_loader`` without computing gradients.

    Args:
        model: Network to evaluate; switched to eval mode.
        device: Device each batch is moved to before the forward pass.
        test_loader: Sized iterable of (data, target) batches that also
            exposes ``.dataset`` (its length is the accuracy denominator).
        criterion: Loss function called as ``criterion(output, target)``.
        verbose: When true, print the average loss and the accuracy.

    Returns:
        (avg_loss, accuracy): the mean of the per-batch losses, and the
        percentage of correct predictions rounded to two decimals.
    """
    model.eval()

    total_loss = 0
    num_correct = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            scores = model(inputs)
            total_loss += criterion(scores, labels).item()
            # torch.max over dim 1 returns (values, indices); the indices are the predictions
            predictions = torch.max(scores, 1)[1]
            num_correct += (predictions == labels.view_as(predictions)).sum().item()

    avg_loss = total_loss / len(test_loader)
    percent_correct = 100. * num_correct / len(test_loader.dataset)
    accuracy = float(f"{percent_correct:.2f}")

    if verbose:
        print(f'Test Average loss: {avg_loss:.4f}')
        print(f'Test Accuracy: {accuracy}%')
    return avg_loss, accuracy

In [None]:
''' Main function '''

def _make_optimizer(model, opt, learn, wd, mom):
    """Return the optimizer selected by ``opt`` (0 = Adam, 1 = SGD); any other value falls back to Adam."""
    if opt == 1:
        return optim.SGD(model.parameters(), lr=learn, momentum=mom, weight_decay=wd)
    if opt != 0:
        print("Invalid optimizer. Optimizer set to Adam.")
    return optim.Adam(model.parameters(), lr=learn, weight_decay=wd)


def _make_scheduler(optimizer, sched):
    """Return the LR scheduler selected by ``sched`` (0 = none, 1 = StepLR, 2 = CosineAnnealingLR); any other value gives none."""
    if sched == 1:
        return optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.1)
    if sched == 2:
        # NOTE(review): T_max is fixed at 10 regardless of the number of
        # epochs actually trained - confirm this is intended.
        return optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10, eta_min=0, last_epoch=-1)
    if sched != 0:
        print("Invalid scheduler. Scheduler set to None.")
    return None


def main(retrain=True, plot=0, epoch_num=10, learn=1e-3, wd=1e-5, sched=0, opt=0, mom=0.9, testing=False, model=None):
    """Train (or just evaluate) the CNN and optionally save and plot the result.

    Args:
        retrain: If False, skip training: load the saved model, evaluate it,
            plot its confusion matrix and return ``None``.
        plot: 0 = no plots, 1 = learning curve, 2 = also the confusion
            matrix, 3 = also the example images.
        epoch_num: Number of training epochs (must be >= 1 when training).
        learn: Learning rate.
        wd: Weight decay.
        sched: 0 = no scheduler, 1 = StepLR, 2 = CosineAnnealingLR.
        opt: 0 = Adam, 1 = SGD.
        mom: Momentum, used only by SGD.
        testing: If True, train quietly and return only the final accuracy,
            without prompting, saving or plotting.
        model: Optional model to train; only used when ``testing`` is True.

    Returns:
        ``None`` when ``retrain`` is False; the final test accuracy when
        ``testing`` is True; otherwise ``(accuracy, accuracy2)`` where
        ``accuracy2`` is the saved model's accuracy (0 if no "model.pth").

    Raises:
        ValueError: If training is requested with ``epoch_num`` < 1.
    """
    verbose = not testing

    # Load data
    train_loader, test_loader = load_data()

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Using device:", device)
    criterion = nn.CrossEntropyLoss()

    if not retrain:
        # Evaluate-only mode: no fresh model or optimizer is needed
        model, device = load_model()
        model.eval()  # Set model to evaluation mode

        test_loss, accuracy = test(model, device, test_loader, criterion)
        if not testing:
            print(f"Test Loss of the pre-trained model: {test_loss}")
            print(f"Test Accuracy of the pre-trained model: {accuracy}%")
        plot_confusion_matrix(model, device, test_loader)
        return

    if epoch_num < 1:
        raise ValueError("epoch_num must be at least 1")

    # A caller-supplied model is honoured only in testing mode
    if testing and model is not None:
        model = model.to(device)
    else:
        model = NeuralNetwork().to(device)
    optimizer = _make_optimizer(model, opt, learn, wd, mom)
    scheduler = _make_scheduler(optimizer, sched)

    # Training and testing: exactly epoch_num epochs, numbered from 1
    train_losses, test_losses = [], []
    for epoch in range(1, epoch_num + 1):
        train_loss = train(model, device, train_loader, optimizer, criterion, epoch, scheduler, verbose)
        test_loss, accuracy = test(model, device, test_loader, criterion, verbose)
        train_losses.append(train_loss)
        test_losses.append(test_loss)

    if testing:
        return accuracy

    print("Training complete!")

    # Compare against the saved model, if there is one
    summary = []
    accuracy2 = 0
    has_saved_model = os.path.isfile("model.pth")
    if has_saved_model:
        model2 = NeuralNetwork().to(device)
        # map_location keeps a GPU-saved checkpoint loadable on a CPU-only host
        model2.load_state_dict(torch.load("model.pth", map_location=device))
        model2.eval()  # Set model to evaluation mode
        test_loss2, accuracy2 = test(model2, device, test_loader, criterion, verbose=False)
        summary.append(f"Test Loss of the pre-trained model: {test_loss2:.4f}")
        summary.append(f"Test Accuracy of the pre-trained model: {accuracy2}%")
    summary.append(f"Final test loss: {test_loss:.4f}")
    summary.append(f"Final test accuracy: {accuracy}%")

    if has_saved_model:
        difference = accuracy - accuracy2
        acc = "more" if difference > 0 else "less"
        # Report the magnitude; the direction is carried by "more"/"less"
        verdict = f"New model {abs(difference):.2f}% {acc} accurate."
    else:
        verdict = "No saved model found to compare against."

    # Ask before overwriting the checkpoint on disk
    question = input("\n".join(summary) + f"\n\n\n {verdict} Save the model? (y/n)")
    if question == "y":
        torch.save(model.state_dict(), "model.pth")
        print("Model saved!")
    else:
        print("Model not saved.")

    if plot > 0:
        # Plot the learning curve
        plot_learning_curve(train_losses, test_losses)

    if plot > 1:
        # Plot the confusion matrix
        plot_confusion_matrix(model, device, test_loader)

    if plot > 2:
        # Plot with example images (this helper loads "model.pth" itself)
        plot_example_images()

    return accuracy, accuracy2


In [None]:
''' Set the parameters and run the main function '''

# Run configuration (the notes record the settings the saved model was trained with)
retrain = True    # False: skip training and only evaluate the saved model
plot = 0          # 0: no plots, 1: learning curve, 2: + confusion matrix, 3: + example images
epoch_num = 200   # Number of epochs: saved model trained on 200 epochs
learn = 1e-5      # Learning rate: saved model trained with 1e-5
wd = 1e-9         # Weight decay: saved model trained with 1e-9
sched = 0         # Scheduler (0 = None, 1 = StepLR, 2 = CosineAnnealingLR): saved model used none
opt = 0           # Optimizer (0 = Adam, 1 = SGD): saved model trained with Adam
mom = 0.9         # Momentum (only for SGD): default 0.9

# Run the main function
main(
    retrain=retrain,
    plot=plot,
    epoch_num=epoch_num,
    learn=learn,
    wd=wd,
    sched=sched,
    opt=opt,
    mom=mom,
)