In [1]:
from collections import OrderedDict
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch import optim
from torchvision import datasets
from torch.utils.data import DataLoader
from torchvision import transforms
import torch.multiprocessing as mp
from torchinfo import summary
from torch.utils.tensorboard import SummaryWriter

# One seed for every RNG this notebook creates, so that Restart & Run All starts
# from the same random state each time.
# NOTE(review): DataLoader worker processes and some CUDA kernels may still
# introduce run-to-run variation; confirm if bit-exact reproducibility is required.
SEED = 0
torch.manual_seed(SEED)  # seeds torch's default generator (weight init, dropout, randn)
rng = np.random.default_rng(SEED)

In [2]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [3]:
class AddGaussianNoise(nn.Module):
    """Transform that adds Gaussian noise to a tensor.

    The result is ``tensor + z * std + mean`` where ``z`` is standard-normal
    noise with the same shape as ``tensor``.

    Args:
        mean: Constant offset added to every element.
        std: Scale applied to the standard-normal sample.
        *args, **kwargs: Forwarded unchanged to ``nn.Module.__init__``.
    """

    def __init__(self, mean=0., std=1., *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.std = std
        self.mean = mean

    def forward(self, tensor):
        """Return ``tensor`` with freshly sampled noise added.

        Implemented as ``forward`` (not ``__call__``) so that calling the
        instance goes through ``nn.Module``'s normal dispatch.
        """
        # Sample on the input's own device rather than a notebook-level global,
        # so CPU tensors work on a CUDA machine and vice versa.
        noise = torch.randn(tensor.size(), device=tensor.device)
        return tensor + noise * self.std + self.mean

    def __repr__(self):
        return '{0}(mean={1}, std={2})'.format(self.__class__.__name__, self.mean, self.std)

In [4]:
def _resize_then_tensor():
    """Return a new Compose that applies Resize(32) followed by ToTensor()."""
    return transforms.Compose([transforms.Resize(32), transforms.ToTensor()])


# Per-sample pipelines handed to the datasets; train and test use the same steps.
test_input_transform = _resize_then_tensor()
train_input_transform = _resize_then_tensor()

# Augmentation applied to tensors at run time (the model's forward pass calls it
# while in training mode): a random affine jitter, then low-amplitude Gaussian noise.
affine_jitter = transforms.RandomAffine(degrees=(-25,25), translate=(0.1,0.1), scale=(0.8, 1.2))
pixel_noise = AddGaussianNoise(mean=0.0,std=0.02)
train_run_transform = transforms.Compose([affine_jitter, pixel_noise])

In [5]:
# Keyword arguments shared by both EMNIST splits (storage root, "digits" subset,
# download enabled).
emnist_common = dict(root="Torch MNIST", split="digits", download=True)

training_data = datasets.EMNIST(train=True, transform=train_input_transform, **emnist_common)
test_data = datasets.EMNIST(train=False, transform=test_input_transform, **emnist_common)

In [6]:
batch_size = int(2**12)
train_dataloader = DataLoader(training_data, batch_size = batch_size, shuffle=True, num_workers=16, pin_memory=True)
test_dataloader = DataLoader(test_data, batch_size = batch_size, shuffle=True, num_workers=16, pin_memory=True)
# Number of batches the training loader yields per epoch. Asking the loader
# directly avoids the off-by-one that int(len/batch_size) + 1 produces when the
# dataset size is an exact multiple of batch_size.
batch_count = len(train_dataloader)
# Trailing expression so the notebook displays the batch size.
batch_size

4096

In [7]:
class NeuralNetwork(nn.Module):
    """Convolutional classifier with four conv stages and a dense head.

    Each conv stage is two 3x3 convolutions (each followed by GELU and batch
    norm) and a 2x2 max-pool. The head flattens the result and maps it through
    two 2048-wide linear layers to 10 outputs. ``linear1`` expects 2*2*128
    features, which corresponds to a 1x32x32 input after the four pooling steps.

    In training mode ``forward`` first applies the notebook-level
    ``train_run_transform`` and returns the raw head outputs; in eval mode it
    returns the softmax of those outputs.
    """

    def __init__(self,):
        super().__init__()
        self.Softmax = nn.Softmax(dim=1)
        self.Max_Pool_2D = nn.MaxPool2d(2)
        self.Activation = nn.GELU()

        # Channel progression 1 -> 16 -> 32 -> 64 -> 128.
        self.conv_0 = self._conv_stage(1, 16)
        self.conv_1 = self._conv_stage(16, 32)
        self.conv_2 = self._conv_stage(32, 64)
        self.conv_3 = self._conv_stage(64, 128)

        head_layers = OrderedDict()
        head_layers["flatten"] = nn.Flatten()
        head_layers["dropout1"] = nn.Dropout(0.3)
        head_layers["linear1"] = nn.Linear(2*2*128, 2048)
        head_layers["act1"] = nn.GELU()
        head_layers["batch_norm1"] = nn.BatchNorm1d(2048)
        head_layers["dropout2"] = nn.Dropout(0.3)
        head_layers["linear2"] = nn.Linear(2048, 2048)
        head_layers["act2"] = nn.GELU()
        head_layers["batch_norm2"] = nn.BatchNorm1d(2048)
        head_layers["linear3"] = nn.Linear(2048,10)
        self.linear_stack = nn.Sequential(head_layers)

    @staticmethod
    def _conv_stage(channels_in, channels_out):
        """Build one conv stage: (conv3x3 -> GELU -> BatchNorm) twice, then MaxPool(2)."""
        return nn.Sequential(OrderedDict([
            ("conv0", nn.Conv2d(channels_in, channels_out, 3, padding=1)),
            ("act0", nn.GELU()),
            ("batch_norm0", nn.BatchNorm2d(channels_out)),
            ("conv1", nn.Conv2d(channels_out, channels_out, 3, padding=1)),
            ("act1", nn.GELU()),
            ("batch_norm1", nn.BatchNorm2d(channels_out)),
            ("max_pool", nn.MaxPool2d(2)),
        ]))

    def forward(self, x):
        # Augment only while training; evaluation sees the input unchanged.
        if self.training:
            x = train_run_transform(x)

        for stage in (self.conv_0, self.conv_1, self.conv_2, self.conv_3):
            x = stage(x)
        head_out = self.linear_stack(x)

        # Raw outputs while training, softmax-normalised outputs otherwise.
        return head_out if self.training else self.Softmax(head_out)

In [8]:
# Build the network, then move it to the selected device.
model = NeuralNetwork()
model = model.to(device)

# Trailing expression: torchinfo's per-layer report for one full batch of
# single-channel 32x32 inputs.
summary_input_shape = (batch_size, 1, 32, 32)
summary(model, input_size=summary_input_shape)

Layer (type:depth-idx)                   Output Shape              Param #
NeuralNetwork                            [4096, 10]                --
├─Sequential: 1-1                        [4096, 16, 16, 16]        --
│    └─Conv2d: 2-1                       [4096, 16, 32, 32]        160
│    └─GELU: 2-2                         [4096, 16, 32, 32]        --
│    └─BatchNorm2d: 2-3                  [4096, 16, 32, 32]        32
│    └─Conv2d: 2-4                       [4096, 16, 32, 32]        2,320
│    └─GELU: 2-5                         [4096, 16, 32, 32]        --
│    └─BatchNorm2d: 2-6                  [4096, 16, 32, 32]        32
│    └─MaxPool2d: 2-7                    [4096, 16, 16, 16]        --
├─Sequential: 1-2                        [4096, 32, 8, 8]          --
│    └─Conv2d: 2-8                       [4096, 32, 16, 16]        4,640
│    └─GELU: 2-9                         [4096, 32, 16, 16]        --
│    └─BatchNorm2d: 2-10                 [4096, 32, 16, 16]        64
│    └─C

In [9]:
# Ask for a free-text label; it is passed to SummaryWriter as the run comment.
run_comment = input("Run Comment: ")
writer = SummaryWriter(comment=f" {run_comment}")

# Give every chart an initial point at step 0.
for tag, initial_value in (("Loss/Train", 1.0), ("Accuracy/Test", 0), ("Accuracy/Train", 0)):
    writer.add_scalar(tag, initial_value, 0)

# Epochs completed so far; the training cell increments it.
epoch_count = 0

In [10]:
loss_fn = nn.CrossEntropyLoss()
Optimiser = optim.AdamW(model.parameters(), lr=1e-2, eps=1e-4, weight_decay=1e-6)
model.train(True)
accuracy_test_freq = 1

# Log about eight times per epoch. Clamped to 1 so that an epoch with fewer than
# eight batches cannot trigger a modulo-by-zero in the logging check below.
log_num = max(1, batch_count // 8)

for epoch in range(0,1000):

    # Accumulators for the current logging window; they are reset only after a
    # log line is written, so the logged accuracy covers the whole window.
    batch_loss = 0.0
    total = 0
    correct = 0

    for batch_num, batch_data in enumerate(train_dataloader):
        Optimiser.zero_grad()

        data, labels = batch_data
        data = data.to(device)
        labels = labels.to(device)

        prediction = model(data)
        _, predicted = torch.max(prediction, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        loss = loss_fn(prediction,labels)

        loss.backward()
        Optimiser.step()

        batch_loss += loss.item()

        # Fires on the last batch of every window of log_num batches.
        if batch_num % log_num == (log_num - 1):
            global_step = epoch_count * batch_count + batch_num
            print(f'[{epoch + 1}, {batch_num + 1:5d}] loss: {batch_loss / log_num:.3f}')
            writer.add_scalar("Loss/Train", batch_loss / log_num, global_step)
            writer.add_scalar("Accuracy/Train", (correct/total) * 100, global_step)
            batch_loss = 0.0
            total = 0
            correct = 0

    if (epoch+1) % accuracy_test_freq == 0:
        # Start the test tally from zero so leftover training counts from the
        # tail of the epoch do not leak into the test accuracy.
        total = 0
        correct = 0
        with torch.no_grad():
            model.train(False)
            for loader_data in test_dataloader:
                data, labels = loader_data
                data = data.to(device)
                labels = labels.to(device)
                output = model(data)
                _, predicted = torch.max(output, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
            model.train(True)
        writer.add_scalar("Accuracy/Test", (correct/total) * 100, epoch_count + 1)

    epoch_count += 1
    

[1,     7] loss: 6.616
[1,    14] loss: 2.733
[1,    21] loss: 1.766
[1,    28] loss: 1.185
[1,    35] loss: 0.859
[1,    42] loss: 0.654
[1,    49] loss: 0.644
[1,    56] loss: 0.518
[2,     7] loss: 0.384
[2,    14] loss: 0.342
[2,    21] loss: 0.261
[2,    28] loss: 0.261
[2,    35] loss: 0.257
[2,    42] loss: 0.226
[2,    49] loss: 0.152
[2,    56] loss: 0.132
[3,     7] loss: 0.118
[3,    14] loss: 0.118
[3,    21] loss: 0.130
[3,    28] loss: 0.123
[3,    35] loss: 0.118
[3,    42] loss: 0.120
[3,    49] loss: 0.120
[3,    56] loss: 0.098
[4,     7] loss: 0.101
[4,    14] loss: 0.089
[4,    21] loss: 0.078
[4,    28] loss: 0.073
[4,    35] loss: 0.070
[4,    42] loss: 0.085
[4,    49] loss: 0.063
[4,    56] loss: 0.071
[5,     7] loss: 0.059
[5,    14] loss: 0.064
[5,    21] loss: 0.068
[5,    28] loss: 0.066
[5,    35] loss: 0.062
[5,    42] loss: 0.064
[5,    49] loss: 0.052
[5,    56] loss: 0.052
[6,     7] loss: 0.060
[6,    14] loss: 0.049
[6,    21] loss: 0.050
[6,    28] 

KeyboardInterrupt: 

In [None]:
# Final pass over the test loader: count correct top-1 predictions with the
# model in eval mode and gradients disabled, then print the accuracy in percent.
model.eval()
total = 0
correct = 0

with torch.no_grad():
    for images, targets in test_dataloader:
        images = images.to(device)
        targets = targets.to(device)
        scores = model(images)
        best_class = torch.max(scores, 1).indices
        total += targets.size(0)
        correct += (best_class == targets).sum().item()

print((correct/total) * 100)