In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
# The archive and checkpoint paths used further down all live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import zipfile

# Imagenette archive stored on the mounted Drive
dataset_path = '/content/drive/MyDrive/CNN_tutorial/imagenette2-160.zip'

# Unpack the whole archive into ./data; the file handle is closed when the block exits
with zipfile.ZipFile(dataset_path) as archive:
    archive.extractall(path='./data')

### Section 4: Training CNNs

#### 4.1 Getting our dataset
We will be using [Imagenette](https://github.com/fastai/imagenette) as our classification dataset. This is a 10-class subset of the popular ImageNet dataset.


In [None]:
import torchvision
import matplotlib.pyplot as plt

In [None]:
# Training split loaded without any transform; used below for visualisation only
train_data = torchvision.datasets.Imagenette(
    root='./data',
    split='train',
    size='160px',
)

In [None]:
# Bare expression: the notebook displays the dataset object's repr
train_data

In [None]:
# Human-readable name for each integer class index (0-9).
# These strings are shown in plot titles and confusion-matrix tick labels.
class_labels = {
    0: "tench",
    1: "english springer",
    2: "cassette player",
    3: "chain saw",
    4: "church",
    5: "french horn",
    6: "garbage truck",
    7: "gas pump",
    8: "golf ball",
    9: "parachute"
}


In [None]:
# visualising sample images and counting images per class
def visualise_dataset(ds, tensor=False):
    """Show one sample image per class, titled with the class name and image count.

    Args:
        ds: iterable of ``(img, label)`` pairs. Every ``label`` must be a key of
            the notebook-level ``class_labels`` dict.
        tensor: set to True when each ``img`` is a channels-first tensor; it is
            then permuted to channels-last before being handed to ``imshow``.

    The whole dataset is iterated once in order to count the images per class.
    """
    imgs_to_visualize = {}
    class_count = {}

    for img, label in ds:
        # keep the first image seen for each label ...
        if label not in imgs_to_visualize:
            imgs_to_visualize[label] = img
        # ... and count every occurrence of the label
        class_count[label] = class_count.get(label, 0) + 1

    # Size the grid from the number of classes actually found (5 per row), so the
    # function neither overflows the axes array nor is tied to exactly 10 classes.
    # For 10 classes this is the same 2x5 grid with figsize (10, 6) as before.
    n_cols = 5
    n_rows = max(1, -(-len(imgs_to_visualize) // n_cols))  # ceiling division
    fig, axs = plt.subplots(n_rows, n_cols, figsize=(10, 3 * n_rows), squeeze=False)
    axs = axs.flatten() # since axes are 2D, we can flatten them to easily access

    # hide every axis up front so unused cells of the grid stay blank
    for ax in axs:
        ax.axis('off')

    for ax, (label_name, img) in zip(axs, imgs_to_visualize.items()):
        if tensor:
            img = img.permute(1, 2, 0)  # channels-first -> channels-last

        ax.imshow(img)
        # class name and class count as title
        ax.set_title(f"{class_labels[label_name]} ({class_count[label_name]})", fontsize=10)

    fig.suptitle("Sample images from Imagenette", fontsize=18)
    plt.show()


# Preview the dataset that was loaded without a transform (tensor flag left at its default)
visualise_dataset(train_data)

#### 4.2 Getting our data ready for the CNN

Problems with our dataset currently:

- each image is stored as an Image object
- each image is a rectangle of non-uniform size

We can apply a set of transformations to the dataset to solve both of these problems.

In [None]:
import torch
import torchvision.transforms as transforms

# Preprocessing pipeline; the steps run in the order listed
transform = transforms.Compose(
    [
        transforms.ToTensor(),               # Image -> Tensor
        transforms.Resize(size=(160, 160)),  # force a square 160x160 image
        transforms.Normalize(                # ImageNet channel-wise mean and SD
            mean=(0.485, 0.456, 0.406),
            std=(0.229, 0.224, 0.225),
        ),
    ]
)

In [None]:
# apply these transformations to the dataset
# (the held-out data used for testing comes from Imagenette's 'val' split)
train_ds = torchvision.datasets.Imagenette(root='./data', size='160px',  split='train', transform=transform)
test_ds = torchvision.datasets.Imagenette(root='./data', size='160px',  split='val', transform=transform)

# create data loaders (optimized for CNN access - batched loading into memory, parallelization, shuffling, etc)
train_loader = torch.utils.data.DataLoader(train_ds, batch_size=32, shuffle=True)
# Evaluation gains nothing from shuffling; a fixed order keeps test runs repeatable.
test_loader = torch.utils.data.DataLoader(test_ds, batch_size=32, shuffle=False)

In [None]:
# Same preview on the transformed dataset; tensor=True makes the function permute each image before plotting
visualise_dataset(train_ds, tensor=True)

#### 4.3 Building our CNN


CNN Architecture

![Image0](https://www.researchgate.net/publication/336805909/figure/fig1/AS:817888827023360@1572011300751/Schematic-diagram-of-a-basic-convolutional-neural-network-CNN-architecture-26.ppm)

Convolutional Layer

![Image](https://miro.medium.com/v2/resize:fit:1400/1*L1SVH2rBxGvJx3L4aB59Cg.png)

Pooling Layer

![Image2](https://pyimagesearch.com/wp-content/uploads/2021/05/Convolutional-Neural-Networks-CNNs-and-Layer-Types.png)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
class SimpleCNN(nn.Module):
    """Small CNN: three conv -> ReLU -> max-pool stages, then two linear layers.

    Each pooling step halves height and width, so the ``128 * 20 * 20`` input
    size of ``fc1`` matches 160x160 inputs (160 -> 80 -> 40 -> 20).
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # Feature extractor: channels go 3 -> 32 -> 64 -> 128; 3x3 kernels with
        # padding 1 and stride 1 leave the spatial size unchanged.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        # One shared 2x2 pooling layer, reused after every conv stage
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Classifier head
        self.fc1 = nn.Linear(in_features=128 * 20 * 20, out_features=512)
        self.fc2 = nn.Linear(in_features=512, out_features=num_classes)

    def forward(self, x):
        """Return the raw class scores (no softmax) for a batch of images."""
        # Conv blocks 1-3: convolution, ReLU, then pooling
        for conv in (self.conv1, self.conv2, self.conv3):
            x = self.pool(F.relu(conv(x)))

        # Flatten everything except the batch dimension for the linear layers
        flat = x.reshape(x.size(0), -1)

        representation = F.relu(self.fc1(flat))  # representation layer
        return self.fc2(representation)          # final classification layer

In [None]:
simple_model = SimpleCNN()
loss_fn = nn.CrossEntropyLoss() # loss function
# optimizer that will update the weights of our CNN.
# The learning rate is spelled out (same value as the fine-tuning cells use) so the
# run does not depend on the installed PyTorch version's default for SGD.
optimizer = optim.SGD(simple_model.parameters(), lr=0.001)
num_training_epochs = 5

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

simple_model.to(device)  # move the model onto the selected device
print(f"Using device {device}")

#### Training our Network

In [None]:
from tqdm import tqdm

def train_loop(model, epochs, train_dataloader):
    """Train ``model`` for ``epochs`` passes over ``train_dataloader``.

    Relies on the notebook-level globals ``optimizer``, ``loss_fn`` and ``device``.
    The global ``optimizer`` must have been created from *this* model's
    parameters; otherwise ``optimizer.step()`` updates some other model.

    Args:
        model: the network to train; it is put into training mode each epoch.
        epochs: number of passes over the data.
        train_dataloader: yields ``(imgs, labels)`` batches; must expose
            ``.dataset`` so the number of samples can be read.

    Returns:
        ``(losses, accuracies)``: two lists with one Python float per epoch -
        the mean of the per-batch losses and the fraction of correctly
        classified samples.
    """
    accuracies = []
    losses = []
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        running_correct = 0  # plain int, so no device tensor is kept across batches

        print(f"Epoch {epoch+1}/{epochs}")

        for imgs, labels in tqdm(train_dataloader):
            # forward pass
            imgs, labels = imgs.to(device), labels.to(device)
            optimizer.zero_grad()  # reset the gradients
            output = model(imgs)
            loss = loss_fn(output, labels)

            # backward pass
            loss.backward()   # gradients are computed for each parameter/weight
            optimizer.step()  # update the parameters with SGD

            running_loss += loss.item()  # keep track of batch loss
            _, preds = torch.max(output, 1)  # find the max indices (preds)
            running_correct += (preds == labels).sum().item()  # correct predictions in this batch

        epoch_loss = running_loss / len(train_dataloader)  # loss is averaged per batch
        epoch_acc = running_correct / len(train_dataloader.dataset)  # acc is computed per sample

        print(f"Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}")
        losses.append(epoch_loss)
        accuracies.append(epoch_acc)

    return losses, accuracies

In [None]:
# Use the epoch count defined next to the model setup instead of repeating the literal,
# so the training run and the x-range of the plot below cannot drift apart.
losses, accuracies = train_loop(simple_model, num_training_epochs, train_loader)

In [None]:
def plot_train_loss_accuracies(loss, accuracies):
    """Plot per-epoch training loss (left) and accuracy (right) side by side.

    Args:
        loss: sequence with one loss value per epoch.
        accuracies: sequence with one accuracy per epoch, expected in [0, 1]
            (the y-axis of the accuracy plot is fixed to that range).
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(15, 5))
    loss_ax.plot(loss)
    loss_ax.set_title('Train Loss')
    loss_ax.set_xlabel('Epochs')

    acc_ax.plot(accuracies)
    acc_ax.set_title('Train Accuracy')
    acc_ax.set_ylim(0, 1)
    # Take the x-range from the data being plotted rather than from a notebook
    # global, so the function is also correct for runs of a different length.
    acc_ax.set_xlim(0, max(len(accuracies), 1))
    acc_ax.set_xlabel('Epochs')

    plt.show()

# Plot the loss / accuracy curves returned by the simple model's training run
plot_train_loss_accuracies(losses, accuracies)

In [None]:
def get_test_accuracy(model, test_dataloader):
    """Return the accuracy of ``model`` on ``test_dataloader`` as a percentage (0-100).

    The model is switched to evaluation mode and gradients are disabled while
    the batches are scored. Uses the notebook-level ``device``.
    """
    model.eval()
    n_correct, n_seen = 0, 0

    with torch.no_grad():
        for images, labels in tqdm(test_dataloader):
            images = images.to(device)
            labels = labels.to(device)
            # index of the highest score along dim 1 is the predicted class
            predicted = model(images).max(dim=1).indices
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()

    return 100 * n_correct / n_seen

In [None]:
# Accuracy (in percent) of the simple CNN on the test loader
get_test_accuracy(simple_model, test_loader)

#### Go deeper!

In [None]:
def _conv_bn_relu(in_channels, out_channels, pool=False):
    """Build one VGG stage: 3x3 conv -> batch norm -> ReLU (-> 2x2 max-pool if ``pool``)."""
    stage = [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(),
    ]
    if pool:
        stage.append(nn.MaxPool2d(kernel_size=2, stride=2))
    return nn.Sequential(*stage)


class VGG16(nn.Module):
    """VGG16-style network with batch norm: 13 conv stages and 3 linear layers.

    The submodules keep the names ``layer1`` ... ``layer13``, ``fc``, ``fc1`` and
    ``fc2`` (and the same ordering inside each ``nn.Sequential``), so state
    dicts saved from the previous, fully written-out definition still load.

    Five of the stages end in a 2x2 max-pool, so a 160x160 input is reduced to
    5x5 (160 / 2**5), which is what the ``5*5*512`` input size of ``fc`` expects.
    """

    # (in_channels, out_channels, followed_by_pool) for layer1 ... layer13
    _CONV_PLAN = (
        (3, 64, False), (64, 64, True),
        (64, 128, False), (128, 128, True),
        (128, 256, False), (256, 256, False), (256, 256, True),
        (256, 512, False), (512, 512, False), (512, 512, True),
        (512, 512, False), (512, 512, False), (512, 512, True),
    )

    def __init__(self, num_classes=10):
        super().__init__()
        # Register the conv stages in order as layer1..layer13
        for idx, (c_in, c_out, pool) in enumerate(self._CONV_PLAN, start=1):
            setattr(self, f"layer{idx}", _conv_bn_relu(c_in, c_out, pool))

        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(5*5*512, 4096),
            nn.ReLU())
        self.fc1 = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU())
        self.fc2 = nn.Sequential(
            nn.Linear(4096, num_classes))

    def forward(self, x):
        """Return the raw class scores (no softmax) for a batch of images."""
        out = x
        for idx in range(1, len(self._CONV_PLAN) + 1):
            out = getattr(self, f"layer{idx}")(out)
        out = out.reshape(out.size(0), -1)  # flatten all but the batch dimension
        out = self.fc(out)
        out = self.fc1(out)
        out = self.fc2(out)
        return out

In [None]:
vgg_model = VGG16()
loss_fn = nn.CrossEntropyLoss()
# Learning rate spelled out (same value as the fine-tuning cells) so the run does not
# depend on the installed PyTorch version's default for SGD.
# Note: this rebinds the global `optimizer` that train_loop reads.
optimizer = optim.SGD(vgg_model.parameters(), lr=0.001)
num_training_epochs = 5
vgg_model = vgg_model.to(device)

In [None]:
# Use the epoch count set in the setup cell above instead of repeating the literal
vgg_losses, vgg_accuracies = train_loop(vgg_model, num_training_epochs, train_loader)

In [None]:
# Load a previously saved VGG16 state dict from Drive.
# map_location remaps the stored tensors onto the device chosen earlier, so a
# checkpoint saved on a GPU also loads when this notebook runs on the CPU fallback.
model_state_dict = torch.load(
    '/content/drive/MyDrive/CNN_tutorial/vgg_model_e20.pth',
    map_location=device,
)
better_vgg = VGG16()
better_vgg.load_state_dict(model_state_dict)
better_vgg = better_vgg.to(device)

In [None]:
# Accuracy (in percent) of the checkpoint-loaded VGG16 on the test loader
get_test_accuracy(better_vgg, test_loader)

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

def plot_confusion_matrix(model, test_loader, class_names):
    """Plot a confusion-matrix heatmap of ``model``'s predictions on ``test_loader``.

    Args:
        model: network to evaluate; it is switched to evaluation mode.
        test_loader: yields ``(images, labels)`` batches.
        class_names: iterable of display names, ordered by class index. Labels
            are assumed to be the integers ``0 .. len(class_names) - 1``.

    Uses the notebook-level ``device``.
    """
    # Accept any iterable (e.g. a dict ``.values()`` view) and make it indexable
    class_names = list(class_names)

    model.eval()
    true_labels = []
    predicted_labels = []

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            true_labels.extend(labels.cpu().tolist())
            predicted_labels.extend(predicted.cpu().tolist())

    # creates a confusion matrix using scikit-learn.
    # Passing `labels` pins the matrix to one row/column per class even when a
    # class never occurs in the data or predictions, so it stays aligned with
    # the tick labels below.
    cm = confusion_matrix(
        true_labels,
        predicted_labels,
        labels=list(range(len(class_names))),
    )

    plt.figure(figsize=(10, 8))
    # plotting the confusion matrix as a heatmap
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')

    plt.show()


In [None]:
# Confusion matrix for the simple CNN; tick labels come from the class_labels values
plot_confusion_matrix(simple_model, test_loader, class_labels.values())

In [None]:
# Confusion matrix for the checkpoint-loaded VGG16, for comparison with the simple CNN
plot_confusion_matrix(better_vgg, test_loader, class_labels.values())

In [None]:
def visualize_model_pred(model, index):
    """Show training image ``index`` with its true label and ``model``'s prediction.

    The picture is taken from the untransformed ``train_data``; the model input
    and the label come from the transformed ``train_ds`` at the same index.
    Uses the notebook-level ``device`` and ``class_labels``.
    """
    img, _ = train_data[index]
    img_model, label_model = train_ds[index]

    # Predict in evaluation mode and without gradients: otherwise dropout makes
    # the prediction random and batch norm would update its running statistics
    # from this single image. The previous mode is restored afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            scores = model(img_model.unsqueeze(0).to(device))  # add a batch dimension
    finally:
        model.train(was_training)
    _, pred = torch.max(scores, 1)

    true_label = class_labels[label_model]
    pred_label = class_labels[pred.item()]

    plt.figure(figsize=(5, 5))
    plt.imshow(img)
    plt.title(f"True: {true_label},  Pred:{pred_label}")
    plt.axis('off')
    plt.show()

In [None]:
# Show the prediction for training image 100 using the checkpoint-loaded VGG16.
# Alternative (disabled): the same image with the simple CNN.
# visualize_model_pred(simple_model, 100)
visualize_model_pred(better_vgg, 100)


### 4.4 Finetuning your model

In [None]:
import zipfile
# Archive with the fine-tuning dataset, stored on the mounted Drive
ft_ds_path = '/content/drive/MyDrive/CNN_tutorial/hymenoptera_data.zip'

# Unpack it next to the Imagenette data under ./data
with zipfile.ZipFile(ft_ds_path) as archive:
    archive.extractall(path='./data')

In [None]:
input_size = 224

# One preprocessing pipeline per split. Both currently run the same steps;
# the two disabled lines are optional augmentations for the training split.
data_transforms = dict(
    train=transforms.Compose([
        # transforms.RandomResizedCrop(input_size), # Augmentation 1
        # transforms.RandomHorizontalFlip(),        # Augmentation 2
        transforms.Resize(size=input_size),
        transforms.CenterCrop(size=input_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    val=transforms.Compose([
        transforms.Resize(size=input_size),
        transforms.CenterCrop(size=input_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
)

In [None]:
# Fine-tuning datasets: one sub-folder per class under train/ and val/
ft_train_ds = torchvision.datasets.ImageFolder(root='./data/hymenoptera_data/train', transform=data_transforms['train'])
ft_test_ds = torchvision.datasets.ImageFolder(root='./data/hymenoptera_data/val', transform=data_transforms['val'])


ft_train_loader = torch.utils.data.DataLoader(ft_train_ds, batch_size=32, shuffle=True)
# Evaluation gains nothing from shuffling; a fixed order keeps test runs repeatable.
ft_test_loader = torch.utils.data.DataLoader(ft_test_ds, batch_size=32, shuffle=False)

In [None]:
# Display names for the two class indices of the fine-tuning dataset
ft_class_labels = {0: "ants", 1: "bees"}

imgs_to_visualize = {}
class_count = {}

# One pass over the dataset: keep the first image of every label and count all of them
for img, label in ft_train_ds:
    if label in imgs_to_visualize.keys():
        class_count[label] += 1
        continue
    imgs_to_visualize[label] = img
    class_count[label] = 1

# One subplot per class, side by side
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(10, 5))
axs = axs.flatten()

for i, (label_name, img) in enumerate(imgs_to_visualize.items()):
    img = img.permute(1, 2, 0)  # channels-first tensor -> channels-last for imshow
    ax = axs[i]
    ax.imshow(img)
    ax.set_title(f"{ft_class_labels[label_name]} ({class_count[label_name]})")
    ax.axis('off')

fig.suptitle("Sample images", fontsize=18)
plt.show()

In [None]:
# VGG16 from torchvision with its default pretrained (ImageNet) weights
ft_vgg = torchvision.models.vgg16(weights='DEFAULT')

# Freeze every weight: no parameter of the loaded network will receive gradients
ft_vgg.requires_grad_(False)
ft_vgg

In [None]:
# Swap the final classification layer for a fresh 2-output layer (our dataset has 2 classes).
# A newly created layer is trainable, unlike the frozen layers around it.
num_features = ft_vgg.classifier[6].in_features
ft_vgg.classifier[6] = nn.Linear(in_features=num_features, out_features=2)
ft_vgg.to(device)

# Collect (and list) the parameters that are still trainable -
# this should only be the final layer, "classifier.6"
params_to_update = []
for name, param in ft_vgg.named_parameters():
    if not param.requires_grad:
        continue
    params_to_update.append(param)
    print("\t", name, param.shape)

In [None]:
# train_loop reads the notebook-level `loss_fn` and `optimizer`, so both are (re)bound here
loss_fn = nn.CrossEntropyLoss()
# only the "params_to_update" are handed to the optimizer
optimizer = optim.SGD(params=params_to_update, lr=1e-3)

ft_loss, ft_accuracy = train_loop(model=ft_vgg, epochs=5, train_dataloader=ft_train_loader)

In [None]:
# Baseline for comparison: the same architecture created without pretrained weights
vgg_random = torchvision.models.vgg16(weights=None)

# Give it a 2-output final layer as well (our dataset has 2 classes)
num_features = vgg_random.classifier[6].in_features
vgg_random.classifier[6] = nn.Linear(in_features=num_features, out_features=2)
vgg_random.to(device)

# Nothing is frozen here, so all parameters go to the optimizer.
# This rebinds the notebook-level `optimizer` that train_loop reads.
optimizer = optim.SGD(params=vgg_random.parameters(), lr=1e-3)

rand_loss, rand_accuracy = train_loop(model=vgg_random, epochs=5, train_dataloader=ft_train_loader)


### 4.5 Extracting Activations

In [None]:
# Display the model's layer structure to pick the layer whose activations will be hooked
ft_vgg

In [None]:
# Filled by the forward hook: layer name -> most recent output of that layer
activations = {}

def get_activation(name):
    """Return a forward hook that stores the hooked layer's output in ``activations[name]``."""
    def hook(module, inputs, output):
        # detach - drop the autograd history, cpu - move to host memory,
        # squeeze - drop every size-1 dimension (here: the batch dim of a single input).
        # Done with tensor methods only, so no NumPy import is needed.
        activations[name] = output.detach().cpu().squeeze()
    return hook

# register_forward_hook requires a "hook" function, which it calls internally
# with (module, inputs, output) after the layer's forward pass
ft_vgg.classifier[3].register_forward_hook(get_activation('classifier.linear'))

# feeding a random input, could be an image tensor
X = torch.rand(1, 3, 224, 224).to(device)

# train_loop leaves the model in training mode; switch to evaluation mode so
# dropout does not randomly zero parts of the activations, and skip gradient
# tracking since only the forward pass is needed.
ft_vgg.eval()
with torch.no_grad():
    output = ft_vgg(X)

In [None]:
# Activations captured by the hook during the forward pass above
activations['classifier.linear']

In [None]:
import scipy.io as sio

# Write the captured activations to a MATLAB .mat file under the variable name 'activations'
mat_data = dict(activations=activations['classifier.linear'].numpy())
sio.savemat(file_name='activations.mat', mdict=mat_data)

### 4.6 Moving a model to MATLAB

In [None]:
# Pretrained ResNet-18. The `weights=` argument replaces the deprecated
# `pretrained=True` (which loaded these IMAGENET1K_V1 weights) and matches the
# way the VGG models are created earlier in this notebook.
# (original note: tracing only works with 'modern' networks)
model = torchvision.models.resnet18(weights='IMAGENET1K_V1')
model.eval()
model.to("cpu")
X = torch.rand(1, 3, 224, 224) # example input required to trace
# Trace the forward pass with the example input and save it as TorchScript,
# a serialized format that can be loaded without the Python class definition
traced_model = torch.jit.trace(model.forward, X)
traced_model.save('traced_model.pt')
