In [302]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch import Tensor
import torchvision
import PIL



#### Setting device (checking if gpu is available for faster training)

In [303]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

### Loading the dataset

In [304]:
def load_cifar10(batch_size):
    # Transformations for the training data
    train_trans = transforms.Compose([
        # transforms.RandomAffine(degrees=(-5, 5), translate=(0.1, 0.1), scale=(0.9, 1.1), resample=PIL.Image.BILINEAR),
        transforms.RandomRotation(10),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    
    # Transformations for the test data (minimal, without augmentation)
    test_trans = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    
    # Loading the datasets
    train = torchvision.datasets.CIFAR10(root="../data", train=True, transform=train_trans, download=True)
    test = torchvision.datasets.CIFAR10(root="../data", train=False, transform=test_trans, download=True)
    
    # Creating the DataLoaders
    train_loader = DataLoader(train, batch_size, shuffle=True, pin_memory=False)
    test_loader = DataLoader(test, batch_size, shuffle=False, pin_memory=False)
    
    return train_loader, test_loader


In [305]:
BATCH_SIZE = 128
train_iter, test_iter = load_cifar10(BATCH_SIZE)
for data in train_iter:
    print(data[0].size())
    print(data[1].size())
    break

Files already downloaded and verified
Files already downloaded and verified
torch.Size([128, 3, 32, 32])
torch.Size([128])


### CIFAR-10 Dataset Analysis
Cifar-10 holds images if dimension 32x32
Cifar images also contain colour using the rgb format
Therefore each item in the dataset is of size 3x32x32


In [306]:
# hyper params
B1_OUT = 32
B2_OUT = 64
B3_OUT = 128
B4_OUT = B3_OUT
L1_OUT = 1024
L2_OUT = 512


class Model(torch.nn.Module):
    def __init__(self, in_channels, out_features):
        super(Model, self).__init__()
        self.maxpool = nn.MaxPool2d(2,2) 
        self.maxpool2 = nn.MaxPool2d(2) 
        self.block1 = Block(6,in_channels,B1_OUT, kernel_size=5, padding=2) 
        # self.block1 = Block(16,in_channels,B1_OUT, kernel_size=7, padding=3) 
        self.resid1 = Block(6,B1_OUT,B1_OUT,kernel_size=5,padding=2,residual=True)
        self.block2 = Block(6,B1_OUT,B2_OUT, kernel_size=5, padding=2)
        self.resid2 = Block(6,B2_OUT,B2_OUT,kernel_size=5,padding=2,residual=True)
        self.block3 = Block(6,B2_OUT,B3_OUT,kernel_size=3, padding=1)
        self.resid3 = Block(6,B3_OUT,B3_OUT,kernel_size=3,padding=1, residual=True)
        # NEW HERE
        self.resid3_2 = Block(6,B3_OUT,B3_OUT,kernel_size=3,padding=1, residual=True)
        self.block4 = Block(6,B3_OUT,B4_OUT,kernel_size=3,padding=1)
        self.resid4 = Block(6,B4_OUT,B4_OUT,kernel_size=3,padding=1, residual=True)
        ##
        self.spatial_pool = nn.AdaptiveAvgPool2d(1)
        # Linear Layers
        self.activation = nn.ReLU()
        # self.activation = nn.Sigmoid()
        
        self.linear1 = nn.Linear(B4_OUT,L1_OUT)
        self.dropout1 = nn.Dropout(0.15)
        self.linear2 = nn.Linear(L1_OUT,L2_OUT)
        self.dropout2 = nn.Dropout(0.15)
        self.linear3 = nn.Linear(L2_OUT, out_features)

    def forward(self, x):
        out = self.activation(self.block1(x))
        # out = self.activation(self.resid1(out))
        out = self.maxpool(out) #16x16
        out = self.activation(self.block2(out))
        out = self.activation(self.resid2(out))
        out = self.maxpool(out) # 8x8
        out = self.activation(self.block3(out))
        out = self.activation(self.resid3(out))
        # NEW HERE
        out = self.activation(self.resid3_2(out))
        out = self.maxpool(out) # 4x4
        # out = self.activation(self.block4(out))
        ##

        out = self.spatial_pool(out)
        # flatten for linear layers
        out = out.reshape(-1 , B4_OUT)

        out = self.linear1(out)
        out = self.activation(out)
        # out = self.dropout1(out)

        out = self.linear2(out)
        out = self.activation(out)
        # out = self.dropout2(out)

        out = self.linear3(out)
        return out


class Block(torch.nn.Module):
    def __init__(self, k, in_channels, out_channels, padding=2, kernel_size=5, stride=1, residual=False):
        super(Block, self).__init__()
        self.residual = residual
        L1_OUT = 512
        L2_OUT = 512
        
        self.k = k
        self.out_channels = out_channels
        self.in_channels = in_channels
        # defining weight calculations (a)
        self.spatialAvgPool = nn.AdaptiveAvgPool2d(1)
        # flatten
        self.aLinear1 = nn.Linear(in_features=in_channels, out_features=L1_OUT)
        self.aLinear2 = nn.Linear(in_features=L1_OUT,out_features=L2_OUT)
        self.aLinear3 = nn.Linear(L2_OUT,k)
        self.activation = nn.ReLU()
        # defining the convolution section
        # creates k convolutions
        self.convs = nn.ModuleList([nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size ,padding=padding, stride=stride) for _ in range(k)])
        # self.batch_norms = nn.ModuleList([nn.BatchNorm2d(out_channels) for _ in range(k)])
        self.batch_norm = nn.BatchNorm2d(out_channels)

    def forward(self, x: Tensor) -> Tensor:

        # apply spatial average pooling
        pooled: Tensor = self.spatialAvgPool(x)
        # flatten the output for the linear layer
        pooled = pooled.view(pooled.size(0), -1)
        a: Tensor = self.activation(self.aLinear1(pooled))
        a = self.activation(self.aLinear2(a))
        a = self.activation(self.aLinear3(a))

        O: Tensor = torch.zeros_like(self.convs[0](x))

        # Apply the convolutions and accumulate the weighted sum
        for i in range(self.k):
            conv = self.convs[i](x)
            conv = self.activation(conv)
            # conv = self.batch_norms[i](conv)
            O = O + a[:, i].view(-1, 1, 1, 1) * conv # this calculates a_1 conv_1 + ... + a_k conv_k, also applies activation function to it
        O = self.batch_norm(O)
        
        if self.residual:
            O= O + x
        
        return O # returns [batchsize, c, h, w]


### Training The Model

In [307]:
class Accumulator:
    
    def __init__(self, n) -> None:
        self.data = [0.0] * n
    
    # deletes all data and information stored
    def reset(self) -> None:
        self.data = [0.0] * len(self.data)
    
    # takes in n inputs, for each arg it adds it to its corresponding index in data
    def add(self, *args) -> None:
        self.data = [a + float(b) for a,b in zip(self.data, args)]
    
    # allows the indexing operator to be used
    def __getitem__(self, idx) -> any:
        return self.data[idx]
    
    def toList(self) -> list:
        return self.data
    
    def percentage(self, index, total):
        return 100 * self[index] / total if total > 0 else 0

In [308]:
import matplotlib.pyplot as plt

def trainingLoop(num_epoch, train_iter, test_iter, net, loss_function, optimizer) -> tuple[int, list[float], list[float], list[float]]:
    loss_values = [] 
    training_accuracy_values = []  
    testing_accuracy_values = []

    for epoch in range(num_epoch):
        # train on trianing set to obtain updated weights
        train_metrics = train(train_iter, net, loss_function, optimizer) # tuple (loss , accuracy)
        # testing new weights on unseen data
        test_accuracy = test(test_iter, net)
        # data for plotting
        loss_values.append(train_metrics[0])
        training_accuracy_values.append(train_metrics[1]) 
        testing_accuracy_values.append(test_accuracy)

        print(f'Epoch {epoch+1}, Average Loss: {loss_values[-1]}, Training Accuracy: {training_accuracy_values[-1]}, Testing Accuracy {testing_accuracy_values[-1]}')
        
    return(num_epoch, loss_values, training_accuracy_values, testing_accuracy_values)


def test(data_iter, net) -> float:
    net.eval() # set to testing mode
    
    metrics = Accumulator(2) # [correct, total]
    
    with torch.no_grad():
        for X, y in data_iter:
            # Move data to the specified device
            X, y = X.to(device), y.to(device)
            # Get model predictions
            y_hat = net(X)
            # Convert probabilities to predicted class labels
            _, predicted_labels = torch.max(y_hat, 1)
            # Accumulate the number of correct predictions and the total
            metrics.add((predicted_labels == y).sum().item(), y.size(0))
    
    # Calculate accuracy
    accuracy = metrics[0] / metrics[1]
    
    # Set the network back to training mode
    net.train()
    
    return accuracy

def train(data_iter, net, loss_function, optimizer) -> tuple[float , float]:
    net.train()
    
    metrics = Accumulator(3)  # [sum of losses, correct predictions, total predictions]
    for X, Y in data_iter:  # get x and corresponding y value
        X, Y = X.to(device), Y.to(device)
        
        y_hat = net(X)  # get prediction
        loss = loss_function(y_hat, Y)  # loss of this specific value
        optimizer.zero_grad()  # clear gradient
        loss.backward()  # calculate derivative of loss w.r.t the weight
        optimizer.step()  # change weight values accordingly

        metrics.add(loss.item(), (torch.max(y_hat, 1)[1] == Y).float().sum().item(), Y.size(0))
    # Calculate average loss and accuracy from the accumulated values
    avg_loss = metrics[0] / metrics[2]
    accuracy = metrics[1] / metrics[2]
    
    return (avg_loss, accuracy)


In [309]:
# Function to plot the data
def plotTraining(num_epoch, loss_values, train_accuracy_values, test_accuracy_values):
    fig, ax1 = plt.subplots()

    # Training Loss
    color = 'tab:red'
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Train Loss', color=color)
    ax1.plot(range(1, num_epoch+1), loss_values, '-o', color=color)
    ax1.tick_params(axis='y', labelcolor=color)

    # Create a second y-axis to share the same x-axis
    ax2 = ax1.twinx()  

    # Training Accuracy and Testing Accuracy
    color_train = 'tab:blue'
    color_test = 'tab:green'
    ax2.set_ylabel('Accuracy', color=color_train)  # We already handled the x-label with ax1
    # Plot training accuracy on ax2
    ax2.plot(range(1, num_epoch+1), train_accuracy_values, '-s', color=color_train, label='Train Accuracy')
    # Plot testing accuracy on ax2 as well
    ax2.plot(range(1, num_epoch+1), test_accuracy_values, '-^', color=color_test, label='Test Accuracy')
    ax2.tick_params(axis='y', labelcolor=color_train)
    ax2.legend(loc='upper left')

    fig.tight_layout()  # Otherwise the right y-label is slightly clipped
    plt.title('Training Loss and Accuracy')
    plt.show()


In [310]:
def init_weights(m):
    if type(m) == nn.Linear or type(m) == nn.Conv2d: # by checking the type we can init different layers in different ways
        torch.nn.init.xavier_normal_(m.weight)
        # torch.nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')


In [311]:
model = Model(3, 10).to(device)
# model = Model(3, 10)
model.apply(init_weights)

loss = nn.CrossEntropyLoss()
lr = 0.001
# optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=0)
optimizer = torch.optim.Adam(model.parameters(), lr=lr,weight_decay=1e-4 ) #

In [312]:
num_epochs = 20
# mu.train_ch3(model, train_iter, test_iter, loss, num_epochs, optimizer)
params = trainingLoop(num_epochs,train_iter,test_iter ,model, loss, optimizer)
plotTraining(*params) # spreads tuple

"""
increase mlp layer width -> double
increase channels
increase k


=======

sgd

1 max 2 max 3 (2 resids)

"""

Epoch 1, Average Loss: 0.01532512929201126, Training Accuracy: 0.28592, Testing Accuracy 0.3768
Epoch 2, Average Loss: 0.01217972076177597, Training Accuracy: 0.41876, Testing Accuracy 0.4284
Epoch 3, Average Loss: 0.01099575785636902, Training Accuracy: 0.48424, Testing Accuracy 0.5011
Epoch 4, Average Loss: 0.010141730097532272, Training Accuracy: 0.52794, Testing Accuracy 0.5357
Epoch 5, Average Loss: 0.009533973472118379, Training Accuracy: 0.55994, Testing Accuracy 0.5707
Epoch 6, Average Loss: 0.008932209870815278, Training Accuracy: 0.58984, Testing Accuracy 0.5705
Epoch 7, Average Loss: 0.008575738742351532, Training Accuracy: 0.60816, Testing Accuracy 0.6162
Epoch 8, Average Loss: 0.008249762881994247, Training Accuracy: 0.61988, Testing Accuracy 0.6471
Epoch 9, Average Loss: 0.007975986238718032, Training Accuracy: 0.63702, Testing Accuracy 0.602


KeyboardInterrupt: 