In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
from torchvision.io import read_image
import glob
import time
import os
import torchmetrics
import matplotlib.pyplot as plt
import pickle
from torchmetrics.classification import MulticlassF1Score, MulticlassAccuracy
from torchvision.datasets import ImageFolder

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [12]:
# part A : Batch Normalisation
class BatchNorm2d(nn.Module): #My definition of Batch Normalisation.
    """Batch normalisation over dims (0, 2, 3) of a 4-D input with channels on dim 1.

    In training mode each call normalises with the statistics of the current
    batch and adds that batch's mean and variance to running accumulators. In
    eval mode the input is normalised with the plain average of every
    accumulated batch mean / batch variance.

    The accumulators are registered as buffers so that they
      * follow the module through ``.to(...)`` / ``.cuda()``,
      * are written to and restored from ``state_dict()`` (checkpoints), and
      * never take part in autograd.
    Checkpoints written before the accumulators were buffers do not contain
    these keys, so loading them with ``strict=True`` reports missing keys.

    Args:
        size: number of channels (length of dim 1 of the input).
    """
    def __init__(self, size):
        super(BatchNorm2d, self).__init__()
        self.epsilon = 1e-5
        shape = (1, size, 1, 1)
        self.gamma = nn.Parameter(torch.ones(shape)) # the scaling factor that determines the new standard deviation.
        self.beta = nn.Parameter(torch.zeros(shape)) # the bias that is the new mean.
        # Sum of per-batch means / per-batch variances seen in training mode,
        # and the number of batches that contributed to those sums.
        self.register_buffer("running_sum", torch.zeros(shape))
        self.register_buffer("running_square_sum", torch.zeros(shape))
        self.register_buffer("total", torch.zeros((), dtype=torch.long))

    def forward(self, X):
        """Normalise ``X`` per channel, then scale by ``gamma`` and shift by ``beta``."""
        # With no accumulated statistics (total == 0) the eval-mode average
        # would be 0/0, so fall back to the statistics of the batch at hand.
        if self.training or int(self.total) == 0:
            mean = torch.mean(X, dim = (0,2,3), keepdim = True)
            var = torch.var(X, dim = (0,2,3), keepdim = True)
            if self.training:
                # Detached, no-grad accumulation: otherwise every step would
                # chain its autograd graph onto the accumulators.
                with torch.no_grad():
                    self.total += 1
                    self.running_sum += mean.detach()
                    self.running_square_sum += var.detach()
        else:
            mean = self.running_sum / self.total
            var = self.running_square_sum / self.total
        X_transformed = (X - mean) / torch.sqrt(var + self.epsilon) #epsilon is added for non-zero denominator
        X_transformed = X_transformed * self.gamma + self.beta
        return X_transformed
# part B : Instance Normalisation.
class InstanceNormalisation2d(nn.Module):
    """Instance normalisation: statistics are taken over dims (2, 3) only.

    Every (sample, channel) plane is normalised with its own mean and variance,
    so the layer behaves the same in training and eval mode. A per-channel
    scale ``gamma`` and shift ``beta`` are applied afterwards.

    Args:
        size: number of channels (length of dim 1 of the input).
    """
    def __init__(self, size):
        super().__init__()
        self.epsilon = 1e-5
        param_shape = (1, size, 1, 1)
        self.gamma = nn.Parameter(torch.ones(param_shape))
        self.beta = nn.Parameter(torch.zeros(param_shape))

    def forward(self, X):
        """Return ``X`` normalised per (sample, channel) plane, then scaled and shifted."""
        plane_dims = (2, 3)
        centred = X - X.mean(dim=plane_dims, keepdim=True)
        # epsilon keeps the denominator away from zero
        spread = torch.sqrt(X.var(dim=plane_dims, keepdim=True) + self.epsilon)
        return centred / spread * self.gamma + self.beta
class BatchInstanceNormalisation2d(nn.Module):
    """Per-channel blend of a batch-normalised and an instance-normalised view of the input.

    The output is ``(rho * BN(X) + (1 - rho) * IN(X)) * gamma + beta``.
    ``rho`` starts at 1, so the layer initially passes the batch-norm branch.
    Nothing in this class restricts ``rho`` to a range; any clamping has to be
    done by the code that updates the parameters.

    Args:
        size: number of channels (length of dim 1 of the input).
    """
    def __init__(self, size):
        super().__init__()
        # The two branches are full sub-modules with their own state.
        self.batch_norm = BatchNorm2d(size)
        self.instance_norm = InstanceNormalisation2d(size)
        param_shape = (1, size, 1, 1)
        self.rho = nn.Parameter(torch.ones(param_shape))  # blend weight of the batch-norm branch
        self.epsilon = 1e-5
        self.gamma = nn.Parameter(torch.ones(param_shape))  # scale applied to the blended result
        self.beta = nn.Parameter(torch.zeros(param_shape))  # shift applied to the blended result

    def forward(self, X):
        """Blend both normalised branches with ``rho``, then scale and shift."""
        batch_branch = self.batch_norm(X)
        instance_branch = self.instance_norm(X)
        blended = self.rho * batch_branch + (1 - self.rho) * instance_branch
        return blended * self.gamma + self.beta
    
class LayerNormalisation2d(nn.Module):
    """Layer normalisation: one mean/variance per sample over dims (1, 2, 3).

    Because the channel dimension is part of the reduction, the learnable
    scale and shift are single scalars (shape ``(1, 1, 1, 1)``).

    Args:
        size: accepted so the class can be constructed like the other
            normalisation layers; it is not used.
    """
    def __init__(self, size = None):
        super().__init__()
        self.epsilon = 1e-5
        scalar_shape = (1, 1, 1, 1)
        self.gamma = nn.Parameter(torch.ones(scalar_shape))
        self.beta = nn.Parameter(torch.zeros(scalar_shape))

    def forward(self, X):
        """Return ``X`` normalised per sample, then scaled and shifted."""
        sample_dims = (1, 2, 3)  # channels and both spatial dims
        centred = X - X.mean(dim=sample_dims, keepdim=True)
        spread = torch.sqrt(X.var(dim=sample_dims, keepdim=True) + self.epsilon)
        return centred / spread * self.gamma + self.beta

class GroupNormalisation2d(nn.Module):
    """Group normalisation: channels are split into ``groups`` consecutive chunks.

    Each (sample, group) chunk is normalised with its own mean and variance;
    a per-channel scale ``gamma`` and shift ``beta`` are applied afterwards.

    Args:
        size: number of channels (length of dim 1 of the input).
        groups: number of channel groups the channel dimension is split into.
    """
    def __init__(self, size, groups = 8):
        super().__init__()
        self.epsilon = 1e-5
        param_shape = (1, size, 1, 1)
        self.gamma = nn.Parameter(torch.ones(param_shape))
        self.beta = nn.Parameter(torch.zeros(param_shape))
        self.groups = groups

    def forward(self, X):
        """Return ``X`` normalised per (sample, group), then scaled and shifted per channel."""
        in_shape = X.shape
        batch, channels = in_shape[0], in_shape[1]
        height, width = in_shape[2], in_shape[3]
        # Expose the groups as their own dimension: (N, G, C // G, H, W).
        grouped = X.view(batch, self.groups, channels // self.groups, height, width)
        group_dims = (2, 3, 4)
        centred = grouped - grouped.mean(dim=group_dims, keepdim=True)
        spread = torch.sqrt(grouped.var(dim=group_dims, keepdim=True) + self.epsilon)
        # Back to the input layout before the per-channel affine step.
        normalised = (centred / spread).view(in_shape)
        return normalised * self.gamma + self.beta

### Defining the ResNet block and the ResNet class

In [2]:
class ResNetBlock(nn.Module):
    """Two 3x3 convolutions with a residual (shortcut) connection.

    ``conv1 -> norm -> ReLU -> conv2 -> norm``, then the block input is added
    and a final ReLU is applied. When the block changes the spatial size
    (``stride != 1``) or the channel count, the input is first passed through a
    1x1 convolution plus batch norm so that both operands of the addition have
    the same shape.

    The block does not choose a device itself; move it (or the model that
    contains it) with ``.to(...)``.

    Args:
        in_channels: channels of the block input.
        out_channels: channels produced by both convolutions.
        stride: stride of the first convolution and of the 1x1 shortcut.
        norm: callable ``norm(num_channels)`` that builds the normalisation
            layer placed after each 3x3 convolution.
    """
    def __init__(self, in_channels, out_channels, stride = 1, norm = nn.BatchNorm2d):
        super(ResNetBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 3, stride = stride, padding = 1, bias=False)
        self.bn1 = norm(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, stride = 1, padding = 1, bias=False)
        self.bn2 = norm(out_channels)
        self.stride = stride
        self.conv1x1 = None; self.bn1x1 = None  # identity shortcut unless a projection is needed
        if stride != 1 or in_channels != out_channels:
            self.conv1x1 = nn.Conv2d(in_channels, out_channels, kernel_size = 1, stride = stride, padding = 0, bias=False)
            # NOTE(review): the shortcut always uses nn.BatchNorm2d, even when a
            # different `norm` is passed in -- confirm this is intended.
            self.bn1x1 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        """Return ``relu(F(x) + shortcut(x))`` for the block input ``x``."""
        o = self.conv1(x)
        o = self.bn1(o)
        o = self.relu(o)  # first conv layer of the block
        o = self.conv2(o)
        o = self.bn2(o)
        if self.conv1x1 is not None:
            # Project the input so it matches the shape of `o`.
            x = self.bn1x1(self.conv1x1(x))
        o += x
        return self.relu(o)  # second conv layer of the block ends here


In [3]:
class ResNet(nn.Module):
    """ResNet with three stages of ``n`` residual blocks (16, 32 and 64 channels).

    Layout: a 3x3 stem convolution, ``n`` blocks at 16 channels, ``n`` blocks
    at 32 channels (the first one with stride 2), ``n`` blocks at 64 channels
    (the first one with stride 2), global average pooling and a linear
    classifier. That is ``6n + 2`` weight layers on the main path.

    The constructor does not place anything on a device; call
    ``ResNet(...).to(device)`` so that every layer, including ``fc``, ends up
    on the same device.

    Args:
        in_channels: channels of the input images.
        num_classes: size of the output logit vector.
        n: number of residual blocks per stage.
        norm: callable ``norm(num_channels)`` used for the stem and inside
            every residual block.
    """
    def __init__(self, in_channels, num_classes, n, norm=nn.BatchNorm2d):
        super(ResNet, self).__init__()
        self.n = n
        self.conv1 = nn.Conv2d(in_channels, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = norm(16)
        self.relu = nn.ReLU()

        self.res16 = nn.ModuleList()
        for _ in range(n):
            self.res16.append(ResNetBlock(16, 16, norm=norm))

        # The first block of each later stage halves the resolution (stride 2)
        # and doubles the channel count.
        self.res32 = nn.ModuleList()
        self.res32.append(ResNetBlock(16, 32, 2, norm=norm))
        for _ in range(n - 1):
            self.res32.append(ResNetBlock(32, 32, norm=norm))

        self.res64 = nn.ModuleList()
        self.res64.append(ResNetBlock(32, 64, 2, norm=norm))
        for _ in range(n - 1):
            self.res64.append(ResNetBlock(64, 64, norm=norm))

        self.final_mean_pool = nn.AdaptiveAvgPool2d(output_size=(1,1))
        self.fc = nn.Linear(64, num_classes)

    def forward(self, o):
        """Return class logits of shape ``(batch, num_classes)`` for an image batch."""
        o = self.relu(self.bn1(self.conv1(o)))
        for stage in (self.res16, self.res32, self.res64):
            for block in stage:
                o = block(o)
        o = self.final_mean_pool(o)
        o = torch.flatten(o, start_dim=1)  # (batch, 64, 1, 1) -> (batch, 64)
        return self.fc(o)
        

In [4]:
class bird_dataset(Dataset):
    """Image dataset read from a ``<datapath>/<label>/<image>`` folder tree.

    Every sub-directory of ``datapath`` is one class, named after the
    directory. Classes are numbered in sorted name order, so the
    label -> index mapping is identical for every split and every run.
    Entries of ``datapath`` that are not directories are ignored.

    Attributes set by the constructor:
        datapath: the folder that was scanned.
        data: list of ``(file_path, label_name)`` pairs in sorted order.
        labels: sorted list of label names.
        label_to_index: dict mapping a label name to its integer class index.
    """
    def __init__(self, datapath): #Either test, train, or val datafolder.
        self.datapath = datapath
        self.data = []
        self.labels = []
        # sorted(): glob order depends on the filesystem, and a set of strings
        # has no stable iteration order between interpreter runs.
        for folder in sorted(glob.glob(datapath + "/*")):
            if not os.path.isdir(folder):
                continue  # a stray file is not a class
            label = os.path.basename(folder) # the folder name is the label.
            self.labels.append(label)
            for file in sorted(glob.glob(folder + "/*")):
                self.data.append((file, label))
        self.label_to_index = {label: i for i, label in enumerate(self.labels)}

    def __len__(self):
        """Number of ``(file, label)`` pairs found under ``datapath``."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, class_index)`` for sample ``idx``.

        The image is whatever ``read_image`` returns, divided by 255.
        """
        img_path, label = self.data[idx]
        img = read_image(img_path)
        img = img/255
        label = self.label_to_index[label] # class index instead of class name.
        return img, label



### Creating the datasets and dataloaders

In [5]:
## Parameters for the network and the training run.
num_classes = 25  # size of the classifier output
n = 2  # residual blocks per stage -> 6n + 2 layers
in_channels = 3  # RGB images
batch_size = 32  # Probably won't run on my laptop with just 4GB of VRAM.
initial_learning_rate = 0.01
num_epochs = 50

In [6]:
# One ImageFolder per split; ToTensor converts every image to a tensor.
train_dataset = ImageFolder(
    root="/kaggle/input/birds-25/Birds_25/train",
    transform=transforms.ToTensor(),
)
test_dataset = ImageFolder(
    root="/kaggle/input/birds-25/Birds_25/test",
    transform=transforms.ToTensor(),
)
val_dataset = ImageFolder(
    root="/kaggle/input/birds-25/Birds_25/val",
    transform=transforms.ToTensor(),
)

# All three loaders share batch_size and reshuffle on every pass; the training
# loader gets more worker processes than the two evaluation loaders.
Train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
Test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
Val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, num_workers=2)

# Alternative: the hand-written bird_dataset class can replace ImageFolder, e.g.
#   Train_loader = DataLoader(bird_dataset("Birds_25/train"), batch_size=batch_size, shuffle=True)

In [13]:
# Choose the normalisation layer through `norm`, e.g. norm=BatchNorm2d for the
# hand-written batch norm. This run uses batch-instance normalisation.
model = ResNet(in_channels, num_classes, n, norm=BatchInstanceNormalisation2d)
model = model.to(device)
print("DOING MY IMPLEMENTATION OF Batch-Instance NORMALISATION.")

DOING MY IMPLEMENTATION OF Batch-Instance NORMALISATION.


In [14]:
# Cross-entropy on the raw logits; Adam starts at initial_learning_rate.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=initial_learning_rate)
# Alternative: optim.SGD(model.parameters(), lr = initial_learning_rate, momentum = 0.92)

In [15]:
def load_checkpoint(model, optimizer, filename):
    """Restore ``model`` and ``optimizer`` in place from a checkpoint file.

    The file must hold a dict with the keys ``'model_state_dict'``,
    ``'optimizer_state_dict'``, ``'epoch'`` and ``'loss'`` (the layout written
    by ``store_checkpoint``). Tensors are mapped onto the device the model
    currently lives on, so a checkpoint written on a GPU can also be loaded on
    a CPU-only machine.

    Args:
        model: module whose ``load_state_dict`` receives the saved weights.
        optimizer: optimizer whose ``load_state_dict`` receives the saved state.
        filename: path of the checkpoint file.

    Returns:
        ``(model, optimizer, epoch, loss)`` -- the two objects passed in, plus
        the epoch and loss values stored in the checkpoint.
    """
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")
    # SECURITY: torch.load unpickles the file -- only load checkpoints you trust.
    checkpoint = torch.load(filename, map_location=target_device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']
    return model, optimizer, epoch, loss

def store_checkpoint(model, optimizer, epoch, loss, filename):
    """Write a training snapshot to ``filename`` with ``torch.save``.

    The saved dict holds the model state dict, the optimizer state dict and
    the given ``epoch`` and ``loss`` values under the keys
    ``'model_state_dict'``, ``'optimizer_state_dict'``, ``'epoch'``, ``'loss'``.
    """
    snapshot = dict(
        model_state_dict=model.state_dict(),
        optimizer_state_dict=optimizer.state_dict(),
        epoch=epoch,
        loss=loss,
    )
    torch.save(snapshot, filename)

In [16]:
# Metric objects used to score predictions on the train, val and test splits.
# NOTE(review): MulticlassAccuracy is created with the library's default
# averaging -- confirm that this is the accuracy you intend to report.
macroF1 = MulticlassF1Score(average='macro', num_classes=num_classes)
microF1 = MulticlassF1Score(average='micro', num_classes=num_classes)
accuracy = MulticlassAccuracy(num_classes=num_classes)
def check_eval(loader, model):
    """Score ``model`` on every batch of ``loader``.

    The model is switched to eval mode for the pass and put back into the
    mode it was in before the call. Predictions are the arg-max over dim 1 of
    the model output. Inputs are moved to the notebook-level ``device``;
    predictions and labels are gathered on the CPU.

    Args:
        loader: iterable yielding ``(inputs, labels)`` batches.
        model: the module to evaluate.

    Returns:
        ``(acc, macF1, micF1)`` as returned by the notebook-level ``accuracy``,
        ``macroF1`` and ``microF1`` metric objects for the whole loader.
    """
    was_training = model.training
    model.eval()  # eval mode: layers such as batch norm use their stored statistics
    pred_batches = []; label_batches = []
    try:
        with torch.no_grad():
            for x, y in loader:
                scores = model(x.to(device))
                _, predictions = scores.max(1)
                # Keep whole batches on the CPU; a list of 0-dim device tensors
                # would need one host/device sync per sample to convert.
                pred_batches.append(predictions.cpu())
                label_batches.append(y.cpu())
    finally:
        model.train(was_training)  # restore the caller's mode even on error

    if pred_batches:
        preds = torch.cat(pred_batches); labels = torch.cat(label_batches)
    else:
        # Empty loader: hand the metrics empty tensors instead of failing in torch.cat.
        preds = torch.empty(0, dtype=torch.long); labels = torch.empty(0, dtype=torch.long)
    acc = accuracy(preds, labels); macF1 = macroF1(preds, labels); micF1 = microF1(preds, labels)
    return (acc, macF1, micF1)

In [17]:
# print(check_eval(Train_loader, model));
# print(check_eval(Val_loader, model));
# print(check_eval(Test_loader, model));

In [18]:

def train_model(model, traindata):
    """Train ``model`` for ``num_epochs`` epochs and record per-epoch metrics.

    Relies on notebook-level objects: ``Train_loader``, ``Val_loader``,
    ``Test_loader``, ``criterion``, ``optimizer``, ``device``, ``num_epochs``,
    the metric objects ``accuracy`` / ``macroF1`` / ``microF1`` and the helpers
    ``store_checkpoint`` / ``check_eval``.

    Per epoch it
      * runs one pass over ``Train_loader`` with an optimizer step per batch,
      * divides every param group's learning rate by 1.06,
      * writes ``checkpoint_epoch<epoch>.pth`` (loss = loss of the last batch),
      * appends ``(accuracy, macro F1, micro F1)`` for the training predictions
        and for the val / test loaders to ``traindata``.

    Every parameter whose name ends in ``rho`` is clamped to ``[0, 1]`` right
    after each optimizer step (needed for batch-instance normalisation).

    Args:
        model: the network to train (already on ``device``).
        traindata: dict with list values under ``'train'``, ``'val'`` and
            ``'test'``; one tuple is appended to each list per epoch.

    Raises:
        ValueError: if ``Train_loader`` yields no batches.
    """
    # Collect the blend parameters once. They are clamped in place on the real
    # Parameter objects; setattr(model, "a.b.rho", tensor) would only create a
    # stray attribute on the model and leave the parameters untouched.
    rho_params = [param for name, param in model.named_parameters()
                  if name.split('.')[-1] == 'rho']

    for epoch in range(num_epochs):
        start = time.time()
        model.train()
        mean_loss = 0; total_batches = 0
        print("epoch: ", epoch+1)
        epochlabels = []; epochoutputs = []
        for i, (images, labels) in enumerate(Train_loader):
            total_batches += 1
            images = images.to(device)
            labels = labels.to(device)
            #Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            #Backward pass
            optimizer.zero_grad() #Zeroes the gradients before backpropagation.
            loss.backward() #Backpropagation.
            optimizer.step() #Updates the weights.
            if rho_params:
                with torch.no_grad():
                    for rho in rho_params:
                        rho.clamp_(0, 1) #keep rho a valid blend weight after every update
            mean_loss += loss.item()
            _, preds = outputs.max(1)
            # Whole batches on the CPU: avoids one device sync per sample later.
            epochlabels.append(labels.detach().cpu())
            epochoutputs.append(preds.detach().cpu()) #stores the values predicted each epoch.
            print("batch: ", i+1, "loss: ", mean_loss/total_batches, end = "          \r")
        if total_batches == 0:
            raise ValueError("Train_loader produced no batches")
        epoch_time = time.time() - start
        for g in optimizer.param_groups:
            g['lr'] = g['lr']/1.06 #Decay the learning rate by a constant after each epoch.
        store_checkpoint(model, optimizer, epoch, loss.item(), "checkpoint_epoch" + str(epoch) + ".pth")
        epochlabels = torch.cat(epochlabels)
        epochoutputs = torch.cat(epochoutputs)
        mic = microF1(epochoutputs, epochlabels); mac = macroF1(epochoutputs, epochlabels); acc = accuracy(epochoutputs, epochlabels)
        traindata['train'].append((acc, mac, mic))
        valacc, valmac, valmic = check_eval(Val_loader, model)
        traindata['val'].append((valacc, valmac, valmic))
        testacc, testmac, testmic = check_eval(Test_loader, model)
        traindata['test'].append((testacc, testmac, testmic))
        end = time.time()
        print(epoch+1, "th epoch: ", epoch_time, "s, total:", end - start ,"mean loss: ", mean_loss/total_batches, "          ")
        print("VAL acc, mac, mic :", (valacc, valmac, valmic))


In [None]:
# Per-epoch metric tuples, filled in by train_model.
traindata = {'val': [], 'train': [], 'test': []}

train_start = time.time()
train_model(model, traindata)
duration = time.time() - train_start
print("\n\n---------Training finished after ", duration, " seconds--------\n\n")

epoch:  1
batch:  280 loss:  2.7381926689829146          

In [None]:
# with open("/kaggle/input/checkpoints-part1a/traindata.pickle", "rb") as file:
#     traindata = pickle.load(file)

In [None]:
# Persist the recorded metrics next to the notebook.
with open('traindata.pickle', mode='wb') as file:
    pickle.dump(traindata, file)

# Final model snapshot. NOTE(review): epoch=51 and loss=2 are hard-coded
# values, not numbers taken from the training run.
store_checkpoint(model, optimizer, epoch=51, loss=2, filename="final_chkpnt.pth")

In [None]:
# Final (accuracy, macro F1, micro F1) of the trained model on each split.
for _tag, _loader in (("VAL: ", Val_loader), ("TEST: ", Test_loader), ("TRAIN: ", Train_loader)):
    print(_tag, check_eval(_loader, model))

In [None]:
# import matplotlib.pyplot as plt
# traindata['val'] = [];
# for i in range(50):
#     print(i, " doing     \r")
#     file_name = '/kaggle/input/checkpoints-part1a/checkpoint_epoch' + str(i) + '.pth'
#     load_checkpoint(model, optimizer, file_name)
#     valacc, valmac, valmic = check_eval(Val_loader, model);
#     traindata['val'].append((valacc, valmac, valmic));


In [None]:
def plot_epoch(data, name):
    """Plot accuracy, micro F1 and macro F1 per entry of ``data`` and save the figure.

    Args:
        data: non-empty sequence of ``(accuracy, macro_f1, micro_f1)`` tuples,
            one per epoch -- the order in which ``train_model`` and
            ``check_eval`` produce them. Values may be numbers or 0-dim tensors.
        name: used in the plot title and as the file name for ``plt.savefig``.

    Raises:
        ValueError: if ``data`` is empty.
    """
    if len(data) == 0:
        raise ValueError("plot_epoch needs at least one (accuracy, macro F1, micro F1) tuple")
    index = list(range(1, len(data) + 1))
    # Tuples are stored as (accuracy, macro, micro); unpack in that order so each
    # curve gets the right legend entry. float() also turns 0-dim tensors into
    # plain numbers for matplotlib.
    acc_values, macro_f1, micro_f1 = (
        [float(value) for value in series] for series in zip(*data)
    )

    # Plotting
    fig, ax = plt.subplots(figsize=(10, 8))

    ax.plot(index, acc_values, marker='o', label='Accuracy')
    ax.plot(index, micro_f1, marker='s', label='Micro F1')
    ax.plot(index, macro_f1, marker='^', label='Macro F1')

    ax.set_title('Performance Metrics for ' + name)
    ax.set_xlabel('List Index')
    ax.set_ylabel('Score')
    ax.set_xticks(index)
    ax.legend()
    ax.grid(True)

    plt.tight_layout()
    plt.savefig(name) #The fig should be saved before doing plt.show().
    plt.show()
# One figure per split, in the order train, val, test.
for _split, _title in (('train', 'train plot'), ('val', 'val plot'), ('test', 'test plot')):
    plot_epoch(traindata[_split], _title)