In [None]:
# This project was developed by following the YouTube tutorial provided by NeuralNine at: 'https://youtu.be/vBlO87ZAiiw?si=CvjeWbPVdDf4YlJI'




from torchvision import datasets
from torchvision.transforms import ToTensor

In [None]:
#MNIST training split, downloaded into ./data if missing; ToTensor converts each image to a tensor
train_data = datasets.MNIST('data', train=True, transform=ToTensor(), download=True)

#MNIST test split, prepared the same way as the training split
test_data = datasets.MNIST('data', train=False, transform=ToTensor(), download=True)

In [None]:
from torch.utils.data import DataLoader

#Build one DataLoader per split with identical settings:
#batches of 100 samples, reshuffled on every pass, loaded by a single worker process
loaders = {
    split_name: DataLoader(split_data, batch_size=100, shuffle=True, num_workers=1)
    for split_name, split_data in (('train', train_data), ('test', test_data))
}

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class CNN(nn.Module):
    """Small convolutional classifier: two conv layers followed by two fully connected layers.

    The flatten step hard-codes 320 features, i.e. 20 channels x 4 x 4, which is
    what two 5x5 convolutions and two 2x2 poolings produce for a 1x28x28 input.
    """

    def __init__(self):
        super().__init__()

        self.conv1 = nn.Conv2d(1, 10, kernel_size=5) #1 input channel -> 10 feature maps
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5) #10 feature maps -> 20 feature maps
        self.conv2_drop = nn.Dropout2d() #Channel-wise dropout applied after conv2
        self.fc1 = nn.Linear(320, 50) #Flattened features (320) -> 50 hidden units
        self.fc2 = nn.Linear(50, 10) #50 hidden units -> 10 class scores

    def forward(self, x):
        """Return the raw class scores (logits), one row of 10 values per input sample.

        No softmax is applied here: nn.CrossEntropyLoss expects unnormalised
        logits and applies log-softmax itself, so normalising here as well would
        squash the gradients. argmax over the logits gives the same predicted
        class as argmax over probabilities; call F.softmax(logits, dim=1) on the
        result when actual probabilities are needed.
        """
        x = F.relu(F.max_pool2d(self.conv1(x), 2)) #conv1 -> 2x2 max pooling -> ReLU
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) #conv2 -> dropout -> 2x2 max pooling -> ReLU
        x = x.view(-1, 320) #Flatten each sample to a 320-element vector
        x = F.relu(self.fc1(x)) #First fully connected layer
        x = F.dropout(x, training=self.training) #Dropout is only active in training mode
        return self.fc2(x) #Logits for the 10 classes

In [None]:
import torch

#Use the GPU when one is available, otherwise fall back to the CPU so the notebook still runs
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = CNN().to(device) #Create model on the selected device

optimizer = optim.Adam(model.parameters(), lr=0.001) #Adam optimizer over all model parameters

loss_fn = nn.CrossEntropyLoss() #Cross-entropy loss, averaged over each batch by default

def train(epoch):
    """Run one pass over the training loader, updating the model after every batch.

    `epoch` is only used to label the progress line printed every 20 batches.
    """
    model.train() #Training mode: dropout layers are active
    train_loader = loaders['train']

    for batch_idx, (data, target) in enumerate(train_loader):
        #Move the batch to the same device as the model
        data = data.to(device)
        target = target.to(device)

        optimizer.zero_grad() #Clear gradients left over from the previous batch
        loss = loss_fn(model(data), target) #Forward pass and loss against the labels
        loss.backward() #Backpropagate to compute fresh gradients
        optimizer.step() #Apply the parameter update

        if batch_idx % 20 != 0:
            continue

        #Progress report: samples seen so far, percentage of batches done, current loss
        print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(loaders['train'].dataset)} ({100. * batch_idx / len(loaders['train']):.0f}%)]\t{loss.item():.6f}")

def test():
    """Evaluate the model on the test loader and print the average loss and accuracy."""
    model.eval() #Evaluation mode: dropout layers are disabled

    test_loss = 0.0
    correct = 0
    num_samples = len(loaders['test'].dataset)

    with torch.no_grad(): #Gradients are not needed for evaluation
        for data, target in loaders['test']:
            data, target = data.to(device), target.to(device)
            output = model(data)
            #loss_fn (CrossEntropyLoss with its default reduction) returns the mean over the batch,
            #so weight it by the batch size; dividing the total by the sample count below
            #then yields a true per-sample average
            test_loss += loss_fn(output, target).item() * data.size(0)
            pred = output.argmax(dim=1) #Predicted class = index of the highest score
            correct += pred.eq(target).sum().item() #Count matches with the labels

    test_loss /= num_samples #Per-sample average loss over the whole test set
    #Print progress
    print(f"\nTest set: Average loss: {test_loss:.4f}, Accuracy {correct}/{num_samples} ({100.*correct/num_samples:.0f}%)\n")
    

In [None]:
#Number of full passes over the training set; each pass is followed by an evaluation on the test set
NUM_EPOCHS = 10

for epoch in range(1, NUM_EPOCHS + 1):
    train(epoch)
    test()

In [None]:
import matplotlib.pyplot as plt

#Try out the model on an arbitrary datapoint from the test dataset

model.eval() #Evaluation mode: dropout layers are disabled
data, target = test_data[16]
data = data.unsqueeze(0).to(device) #Add a batch dimension and move to the model's device

with torch.no_grad(): #No gradients are needed for a single prediction
    output = model(data)
prediction = output.argmax(dim=1, keepdim = True).item()

print(f"Prediction: {prediction}")

image = data.squeeze(0).squeeze(0).cpu().numpy() #Drop batch and channel dimensions for plotting

plt.imshow(image, cmap ='gray')
plt.show() #Call show (the original referenced it without calling it)