# Modern CNNs
### Implementations with interpretability

## Creating some directories 

In [2]:
import os

# Output folders used by the notebook: saved state dicts, pickled models and
# TensorBoard runs.  exist_ok=True makes the cell safe to re-run and avoids the
# check-then-create race of os.path.exists() followed by os.makedirs().
for directory in ('./state_dicts', './models', './runs'):
    os.makedirs(directory, exist_ok=True)

### Importing necessary packages and libraries

In [3]:
import torch
import torch.nn as nn
import torch.optim as opt
torch.set_printoptions(linewidth=120)
import torch.nn.functional as F

from torch.utils.data import  DataLoader
from torch.utils.tensorboard import SummaryWriter
import torchvision
import torchvision.transforms as transforms

import matplotlib.pyplot as plt

import numpy as np
from tqdm.notebook import tqdm
from IPython.display import clear_output

In [4]:
def get_num_correct(preds, labels):
    """Count the rows of `preds` whose arg-max along dim 1 equals the matching entry of `labels`.

    The count is returned as a plain Python number (via ``.item()``).
    """
    predicted_classes = torch.argmax(preds, dim=1)
    hits = torch.eq(predicted_classes, labels)
    return hits.sum().item()

In [5]:
# Number of input channels given to the first convolution of LeNet and AlexNet
# below (3 for RGB images).
# NOTE(review): the Fashion-MNIST batch printed later in this notebook is 1-channel,
# so this presumably has to be 1 for that dataset — confirm before training on it.
color = 3

## LeNet

In [5]:
class LeNet(nn.Module):
    """LeNet-style CNN producing 10 logits.

    Two sigmoid-activated convolution + average-pooling stages (`NN`) feed a
    two-layer sigmoid MLP (`classifier`) and a final linear layer (`out`).
    The first convolution reads `color` input channels (module-level constant).
    The dense head expects 16*5*5 flattened features, which is what the
    feature extractor yields for e.g. 28x28 inputs.
    """

    def __init__(self):
        super().__init__()

        # Feature extractor: conv(5x5, pad 2) -> pool -> conv(5x5) -> pool.
        feature_layers = [
            nn.Conv2d(color, 6, kernel_size=5, padding=2),
            nn.Sigmoid(),
            nn.AvgPool2d(2, stride=2),
            nn.Conv2d(6, 16, kernel_size=5),
            nn.Sigmoid(),
            nn.AvgPool2d(2, stride=2),
        ]
        self.NN = nn.Sequential(*feature_layers)

        # Fully connected head on the flattened feature maps.
        dense_layers = [
            nn.Linear(16 * 5 * 5, 120),
            nn.Sigmoid(),
            nn.Linear(120, 84),
            nn.Sigmoid(),
        ]
        self.classifier = nn.Sequential(*dense_layers)

        # Final projection to the 10 class logits.
        self.out = nn.Linear(84, 10)

    def forward(self, x):
        """Return the class logits for a batch of images `x`."""
        features = self.NN(x)
        flat = features.flatten(start_dim=1)  # keep the batch dimension
        hidden = self.classifier(flat)
        return self.out(hidden)

## AlexNet

In [6]:
class AlexNet(nn.Module):
    """AlexNet-style CNN.

    `NN` is the convolutional feature extractor, `avgpool` forces a 6x6
    spatial grid and `classifier` is a three-layer MLP with dropout that
    emits `num_classes` logits.  The first convolution reads `color` input
    channels (module-level constant).
    """

    def __init__(self, num_classes=10):
        super().__init__()

        conv_stack = [
            nn.Conv2d(color, 96, kernel_size=11, stride=4, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(96, 256, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(256, 384, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(384, 384, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
        ]
        self.NN = nn.Sequential(*conv_stack)

        # Adaptive pooling fixes the feature map at 6x6 so the first Linear
        # layer always receives 256*6*6 inputs.
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))

        head = [
            nn.Dropout(p=0.5),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(4096, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return the class logits for a batch of images `x`."""
        pooled = self.avgpool(self.NN(x))
        return self.classifier(pooled.flatten(1))

## VGG

In [None]:
# Layer plans for the VGG variants: an int is the number of output channels of
# a 3x3 convolution block, 'M' is a 2x2 max-pooling layer.
VGG_types = {
    'VGG11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG13': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
    'VGG19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M'],
}


class VGG_net(nn.Module):
    """VGG network with batch normalisation after every convolution.

    Args:
        in_channels: number of channels of the input images.
        num_classes: size of the final logits vector.
        vgg_type: key of `VGG_types` selecting the layer plan; defaults to
            'VGG11', the variant that was previously hard-coded.

    Raises:
        ValueError: if `vgg_type` is not a key of `VGG_types`.

    The classifier expects 512*7*7 features.  Every plan in `VGG_types`
    contains five stride-2 poolings, so that corresponds to 224x224 inputs.
    """

    def __init__(self, in_channels: int = 3, num_classes: int = 1000, vgg_type: str = 'VGG11') -> None:
        super().__init__()
        if vgg_type not in VGG_types:
            raise ValueError(f"Unknown vgg_type {vgg_type!r}; expected one of {sorted(VGG_types)}")
        self.in_channels = in_channels
        self.NN = self.create_conv_layers(VGG_types[vgg_type])

        self.classifier = nn.Sequential(
            nn.Linear(512*7*7, 4096),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(4096, num_classes)
            )

    def forward(self, x):
        """Return the class logits for a batch of images `x`."""
        x = self.NN(x)
        x = x.reshape(x.shape[0], -1)  # flatten everything except the batch dimension
        x = self.classifier(x)
        return x

    def create_conv_layers(self, architecture):
        """Build the convolutional part of the network from a layer plan.

        Args:
            architecture: iterable of ints (conv output channels) and 'M'
                markers (max pooling), as in `VGG_types`.

        Returns:
            An `nn.Sequential` with Conv2d -> BatchNorm2d -> ReLU for every
            int and a MaxPool2d for every 'M'.

        Raises:
            ValueError: if the plan contains anything else (such entries were
                previously dropped silently).
        """
        layers = []
        in_channels = self.in_channels

        for entry in architecture:
            if isinstance(entry, int):
                layers += [nn.Conv2d(in_channels=in_channels, out_channels=entry,
                                     kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                           nn.BatchNorm2d(entry),
                           nn.ReLU()]
                # the next convolution consumes what this one produced
                in_channels = entry
            elif entry == 'M':
                layers += [nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))]
            else:
                raise ValueError(f"Unsupported architecture entry: {entry!r}")

        return nn.Sequential(*layers)

In [None]:
class BasicBlock(nn.Module):
    """Two-layer residual block (3x3 conv -> norm -> ReLU -> 3x3 conv -> norm, plus skip).

    Args:
        inplanes: channels of the input tensor.
        planes: channels produced by both convolutions.
        stride: stride of the first convolution; values > 1 downsample.
        downsample: optional module applied to the input so that the skip
            connection matches the shape of the main branch.
        groups: must be 1.
        base_width: must be 64.
        dilation: must be 1.
        norm_layer: normalisation layer factory; defaults to nn.BatchNorm2d.

    Raises:
        ValueError: if `groups != 1` or `base_width != 64`.
        NotImplementedError: if `dilation > 1`.
    """

    # Ratio between the block's output channels and `planes`.
    expansion: int = 1

    def __init__(
        self,
        inplanes: int,
        planes: int,
        stride: int = 1,
        downsample = None,
        groups: int = 1,
        base_width: int = 64,
        dilation: int = 1,
        norm_layer = None,
    ) -> None:
        super().__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        if groups != 1 or base_width != 64:
            raise ValueError("BasicBlock only supports groups=1 and base_width=64")
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
        # Both self.conv1 and self.downsample layers downsample the input when stride != 1
        self.conv1 = nn.Conv2d(
            inplanes,
            planes,
            kernel_size=3,
            stride=stride,
            padding=dilation,
            groups=groups,
            bias=False,
            dilation=dilation,
        )
        self.bn1 = norm_layer(planes)
        self.relu = nn.ReLU(inplace=True)
        # The second convolution always uses stride 1.  Re-using `stride` here
        # would downsample a second time and make `out += identity` fail with
        # a shape mismatch for every stride > 1 block.
        self.conv2 = nn.Conv2d(
            planes,
            planes,
            kernel_size=3,
            stride=1,
            padding=dilation,
            groups=groups,
            bias=False,
            dilation=dilation,
        )
        self.bn2 = norm_layer(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return relu(main_branch(x) + skip(x))."""
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        # Project the input when the main branch changed resolution/channels.
        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out



### Importing CIFAR-100 with preprocessing

In [3]:
# Side length the images are resized to before tensor conversion; a falsy
# value (0 / None) skips the Resize step.
resize = 224

# Optional augmentations that can be placed ahead of ToTensor():
#   transforms.CenterCrop(224), transforms.RandomHorizontalFlip(),
#   transforms.RandomRotation(10),
#   transforms.RandomAffine(0, shear=10, scale=(0.8, 1.2)),
#   transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2)
trans = [
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
if resize:
    trans = [transforms.Resize(resize), *trans]

# CIFAR-100 training split, downloaded to ../data when missing.
train_set = torchvision.datasets.CIFAR100(
    root="../data",
    train=True,
    download=True,
    transform=transforms.Compose(trans),
)
train_loader = DataLoader(train_set, batch_size=100, shuffle=True)

# CIFAR-100 test split with the same preprocessing.
test_set = torchvision.datasets.CIFAR100(
    root="../data",
    train=False,
    download=True,
    transform=transforms.Compose(trans),
)
test_loader = DataLoader(test_set, batch_size=100, shuffle=True)

Files already downloaded and verified
Files already downloaded and verified


### Importing CIFAR-10 

In [7]:
# Side length the images are resized to; set to 0 to keep the native size.
resize = 224

# Optional augmentations that can be placed ahead of ToTensor():
#   transforms.CenterCrop(224), transforms.RandomHorizontalFlip(),
#   transforms.RandomRotation(10),
#   transforms.RandomAffine(0, shear=10, scale=(0.8, 1.2)),
#   transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2)
trans = []
if resize:
    trans.append(transforms.Resize(resize))
trans.append(transforms.ToTensor())
trans.append(transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]))

# Training split (downloaded to ../data when missing), shuffled batches of 100.
train_set = torchvision.datasets.CIFAR10(
    "../data", train=True, download=True, transform=transforms.Compose(trans)
)
train_loader = DataLoader(train_set, batch_size=100, shuffle=True)

# Test split with identical preprocessing and batching.
test_set = torchvision.datasets.CIFAR10(
    "../data", train=False, download=True, transform=transforms.Compose(trans)
)
test_loader = DataLoader(test_set, batch_size=100, shuffle=True)

Files already downloaded and verified
Files already downloaded and verified


### Importing Fashion MNIST

In [6]:
# Fashion MNIST
# resize is falsy here, so the images keep their native size and only the
# tensor conversion is applied.
resize = 0
trans = [transforms.ToTensor()]
if resize:
    trans = [transforms.Resize(resize)] + trans

# Arguments shared by the train and test splits.
fashion_mnist_args = {"root": "../data", "download": True}

train_set = torchvision.datasets.FashionMNIST(
    train=True, transform=transforms.Compose(trans), **fashion_mnist_args
)
train_loader = DataLoader(train_set, batch_size=100, shuffle=True)

test_set = torchvision.datasets.FashionMNIST(
    train=False, transform=transforms.Compose(trans), **fashion_mnist_args
)
test_loader = DataLoader(test_set, batch_size=100, shuffle=True)

In [24]:
# Sanity check: fetch one batch from the training loader and print the shape
# of its image tensor (first element of the batch).
a = next(iter(train_loader))
print(a[0].shape)

torch.Size([100, 1, 28, 28])


### Importing image for testing

In [None]:
# Folder-based image dataset used for testing: every image is centre-cropped to
# 224x224 and converted to a tensor.
hymenoptera_transform = transforms.Compose([
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    # Optional normalisation (currently disabled):
    # transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
dataset = torchvision.datasets.ImageFolder(
    '../data/hymenoptera_data',
    transform=hymenoptera_transform,
)

# Batch size 1: images are served one at a time.
dataloader = torch.utils.data.DataLoader(dataset, 1)

### Initialising a model for training

In [8]:
# Load torchvision's AlexNet with pretrained weights.
# NOTE(review): `model` is rebound to the notebook's own AlexNet class in the next cell and
# later cells access `model.NN`, so this looks like an optional alternative — confirm before
# relying on it.
model = torchvision.models.alexnet(pretrained=True)

In [8]:
# Instantiate the AlexNet class defined above with its default num_classes=10.
# model_name is used below to build the checkpoint and saved-model paths.
model = AlexNet()
model_name = "AlexNet"

In [9]:
# Label of the dataset being trained on; used below when building checkpoint paths.
# Keep it in sync with whichever data-loading cell was executed.
dataset_name = "CIFAR-10"

In [10]:
# Optional TensorBoard writer for this model/dataset combination:
# tb = SummaryWriter(log_dir=f"./runs/{model_name}/{dataset_name}/")

# Output folders for this run.  makedirs creates the intermediate
# ./state_dicts/{model_name} folder as well, and exist_ok=True keeps the cell
# re-runnable without a separate os.path.exists() check.
for directory in (f'./{model_name}', f'./state_dicts/{model_name}/{dataset_name}'):
    os.makedirs(directory, exist_ok=True)

# Latest weights of the run, and the fully pickled model written after training.
model_state_path = f'./state_dicts/{model_name}/{dataset_name}/state_dict_model.pth'
model_path = f'./models/{model_name}_{dataset_name}_saved_model.pt'
# To resume from a previous run:
# model.load_state_dict(torch.load(model_state_path))
epochs_start_from = 0

# One sample batch arranged as an image grid (used by the optional TensorBoard calls below).
images, labels = next(iter(train_loader))
grid = torchvision.utils.make_grid(images)
# tb.add_image("images", grid)
# tb.add_graph(model, images)
# tb.close()

### Choosing device

In [11]:
def select_device(device=''):
    """Translate a device name into a ``torch.device``.

    'cuda' (any letter case) gives ``cuda:0`` when CUDA is available; otherwise
    a notice is printed and the CPU device is returned.  'dml' gives
    ``torch.device('dml')``.  Every other value gives the CPU device.
    """
    requested = device.lower()

    if requested == 'cuda':
        if torch.cuda.is_available():
            return torch.device('cuda:0')
        print ("torch.cuda not available")
        return torch.device('cpu')

    if requested == 'dml':
        return torch.device('dml')

    return torch.device('cpu')

In [12]:
# Request the GPU; select_device returns the CPU device (after printing a notice)
# when CUDA is not available.
device = select_device("cuda")
print(device)


cuda:0


In [13]:
# Flatten the feature extractor into a list of layers, expanding one level of
# nested nn.Sequential containers, and remember every convolution's weight
# tensor.  Both lists are used by the visualisations in the training loop.
model_children = list(model.NN.children())

layers = []
for child in model_children:
    if type(child) is nn.Sequential:
        layers.extend(child.children())
    else:
        layers.append(child)

# Convolution weights, in the same order as the layers appear.
model_weights = [layer.weight for layer in layers if type(layer) is nn.Conv2d]
no_of_layers = len(layers)

print(no_of_layers)

13


## Hyperparameter tuning:
* Loss criterion
* Optimizer
* Batch sizes
* Learning Rate
    

In [14]:
print('training on', device)
model = model.to(device)

# Smaller batches can be tried by rebuilding the loaders:
# train_loader = DataLoader(train_set,batch_size = 20, shuffle = True)
# test_loader = DataLoader(test_set,batch_size = 20, shuffle = True)

# Loss: cross-entropy on the raw logits returned by the model.
criterion = nn.CrossEntropyLoss().to(device)

# Optimizer: Adam with learning rate 1e-3.
# Alternatives that were tried:
# optimizer = opt.SGD(model.parameters(), lr= 0.01)
# scheduler = opt.lr_scheduler.CyclicLR(optimizer, 0.0001, 0.001, 100,100, cycle_momentum=False)
# scheduler = opt.lr_scheduler.ReduceLROnPlateau(optimizer)
optimizer = opt.Adam(model.parameters(), lr=0.001)

training on cuda:0


### Xavier Initialisation of weights

In [15]:
def init_weights(m):
    """Xavier-uniform initialise the weight of a Linear or Conv2d module in place.

    Meant to be passed to ``Module.apply``.  Biases and every other module
    type are left untouched.  ``isinstance`` is used so that subclasses of
    Linear/Conv2d are initialised too (an exact ``type(m) ==`` comparison
    skipped them).
    """
    if isinstance(m, (nn.Linear, nn.Conv2d)):
        nn.init.xavier_uniform_(m.weight)

# Module.apply calls init_weights on every sub-module of the model (and on the model itself).
model.apply(init_weights)

AlexNet(
  (NN): Sequential(
    (0): Conv2d(3, 96, kernel_size=(11, 11), stride=(4, 4), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(96, 256, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(256, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU()
    (8): Conv2d(384, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): ReLU()
    (10): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU()
    (12): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (avgpool): AdaptiveAvgPool2d(output_size=(6, 6))
  (classifier): Sequential(
    (0): Dropout(p=0.5, inplace=False)
    (1): Linear(in_features=9216, out_features=4096, bias=True)
    (2): ReLU()
    (3): Dropout(p=0.5, inplace=False)
    (4): Lin

In [17]:
# Use this to test the model training process, by overfitting a mini-batch of the dataset.
# The loss should fall towards zero if the model and optimizer are wired up correctly.

train_loader_1 = DataLoader(train_set, batch_size=20, shuffle=True)
images, labels = next(iter(train_loader_1))
# The same mini-batch is reused at every step, so move it to the device once
# instead of on every iteration.
images, labels = images.to(device), labels.to(device)

epochs = int(5e2)
epochs_start_from = 0
model.train()
# tb = SummaryWriter()
for epoch in tqdm(range(epochs_start_from, epochs)):
    optimizer.zero_grad()
    preds = model(images)
    loss = criterion(preds, labels)
    loss.backward()
    optimizer.step()
    # tb.add_scalar("loss", loss, epoch)
    # scheduler.step()

    # Report the loss every 100 steps, replacing the previous output.
    if epoch % 100 == 0:
        clear_output(wait=True)
        print(loss)
        # print(scheduler.get_last_lr(), scheduler.get_lr())

tensor(2.3842e-07, device='cuda:0', grad_fn=<NllLossBackward0>)


In [20]:
# NOTE(review): scratch cell — `a` is never used as a loss by the training code shown in
# this notebook (the loops use `criterion`); presumably safe to delete, confirm.
a = torch.nn.BCEWithLogitsLoss()

### Fully training Model

In [16]:
def func(e):
    """Sort key: the norm of tensor `e`, as returned by ``e.norm()``."""
    magnitude = e.norm()
    return magnitude

In [17]:
epochs = 2
# model.load_state_dict(torch.load(model_state_path))

# The loop below logs to `tb`, but the writer's creation is commented out in the
# set-up cell, so create it here to avoid a NameError.
tb = SummaryWriter(log_dir=f"./runs/{model_name}/{dataset_name}/")


def _as_model_input(batch, size=224):
    """Return `batch` as channels-first images of spatial size `size` x `size`.

    Channels-last batches (N, H, W, C) are transposed; batches that are already
    (N, C, H, W) are left alone, because an unconditional movedim(3, 1) would
    scramble them.  Resizing is skipped when the size already matches.
    """
    if batch.shape[1] not in (1, 3) and batch.shape[-1] in (1, 3):
        batch = torch.movedim(batch, 3, 1)
    if tuple(batch.shape[-2:]) != (size, size):
        batch = F.interpolate(batch, size, mode="nearest")
    return batch


def _grid_shape(count):
    """Return (rows, cols) of a near-square subplot grid holding `count` images."""
    rows = int(np.floor(np.sqrt(count))) if count > 1 else 1
    cols = -(-count // rows)  # ceiling division
    return rows, cols


epochs_start_from = 0
steps_per_epoch = len(train_loader)
for epoch in tqdm(range(epochs_start_from, epochs)):

    train_loss = 0
    train_correct = 0

    # ---- training ----
    model.train()
    for batch, (images, labels) in tqdm(enumerate(train_loader), total=steps_per_epoch):
        images = _as_model_input(images)
        images, labels = images.to(device), labels.to(device)
        preds = model(images)
        loss = criterion(preds, labels)
        train_loss += loss.item()
        train_correct += get_num_correct(preds, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # scheduler.step()
        global_step = epoch * steps_per_epoch + batch
        # These two curves are running totals over the current epoch.
        tb.add_scalar("Batch_Train Loss", train_loss, global_step)
        tb.add_scalar("Batch_Train Accuracy", train_correct / len(train_set), global_step)
        # tb.add_scalar("Learning Rate", scheduler.get_lr()[0], global_step)

    tb.add_scalar("Train Loss", train_loss, epoch)
    tb.add_scalar("Train Accuracy", train_correct / len(train_set), epoch)

    valid_loss = 0.0
    valid_correct = 0.0

    model.eval()

    # ---- interpretability: activations of every feature-extractor layer ----
    # Uses the first image of the last training batch.  No gradients are needed.
    with torch.no_grad():
        img = images[0].unsqueeze(0)
        tb.add_image("Image", img.squeeze(0), epoch)
        outputs = []
        activation = img
        for layer in layers:
            activation = layer(activation)
            outputs.append(activation)

    # Parameter and gradient distributions (gradients are those of the last training step).
    for name, weight in model.named_parameters():
        tb.add_histogram(name, weight, epoch)
        tb.add_histogram(f'{name}.grad', weight.grad, epoch)

    # One figure per layer with all of its feature maps.
    for num_layer, layer_output in enumerate(tqdm(outputs)):
        figu = plt.figure(figsize=(50, 10))
        layer_viz = layer_output[0].cpu()
        rows, cols = _grid_shape(len(layer_viz))
        for i, feature_map in enumerate(layer_viz):
            plt.subplot(rows, cols, i + 1)
            plt.imshow(feature_map.numpy())
            plt.axis("off")
        tb.add_figure(f"({num_layer}): {layers[num_layer]}_applied", figu, global_step=epoch, close=True)

    # One figure per convolution with all of its kernels.
    for j, conv_weight in enumerate(tqdm(model_weights)):
        figu = plt.figure()
        kernels = conv_weight.detach()
        rows, cols = _grid_shape(len(kernels))
        for i, kernel in enumerate(kernels):
            kernel = .5 * (kernel + 1)
            if kernel.shape[0] > 3:
                # Too many input channels to draw: keep the three with the largest norm.
                strongest = sorted(kernel, key=lambda channel: channel.norm().item(), reverse=True)[:3]
                picture = torch.stack(strongest).permute(1, 2, 0)
            elif kernel.shape[0] == 3:
                picture = kernel.permute(1, 2, 0)
            else:
                # Fewer than three input channels: draw the first one as a gray-scale image.
                picture = kernel[0]
            plt.subplot(rows, cols, i + 1)
            plt.imshow(picture.cpu().numpy(), cmap='gray')
            plt.axis('off')
        tb.add_figure(f"Conv{[j]}", figu, global_step=epoch, close=True)

    # ---- validation ----
    with torch.no_grad():
        for images, labels in tqdm(test_loader):
            images = _as_model_input(images)
            images, labels = images.to(device), labels.to(device)

            # Forward Pass
            target = model(images)
            # Find the Loss
            loss = criterion(target, labels)
            # Calculate Loss
            valid_loss += loss.item()
            valid_correct += get_num_correct(target, labels)

    # Log the validation metrics once per epoch (not the training ones).
    tb.add_scalar("Validation Loss", valid_loss, epoch)
    tb.add_scalar("Validation Accuracy", valid_correct / len(test_set), epoch)

    clear_output(wait=True)
    print("epoch:", epoch, "train_accuracy:", train_correct/len(train_set), "train_loss:",train_loss, "valid_loss:", valid_loss,  "valid_accuracy:", valid_correct/len(test_set))

    # Latest weights plus a per-epoch checkpoint.
    torch.save(model.state_dict(), model_state_path)
    torch.save(model.state_dict(), f'./state_dicts/{model_name}/state_dict_model_epoch-{epochs_start_from}.pth')
    epochs_start_from += 1

torch.save(model, model_path)

epoch: 1 train_accuracy: 0.4565 train_loss: 743.7565760612488 valid_loss: 140.38032925128937 valid_accuracy: 0.4985


In [None]:
# Close the TensorBoard writer used by the training loop.
tb.close()