## Convolutional Networks

We'll see how to build a **convolutional network** to classify CIFAR-10 images. By using weight sharing - multiple units with the same weights - convolutional layers are able to learn repeated patterns in your data. For example, a unit could learn the pattern for an eye, or a face, or lower-level features like edges.


In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
# The last cell of this notebook writes the model checkpoint under
# 'drive/MyDrive/...', so this cell has to run first for that save to work.
from google.colab import drive
drive.mount('/content/drive')

In [13]:
import numpy as np
import time

import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
import torch.utils.data as utils
from torchvision import datasets, transforms
from torch.utils.data.sampler import SubsetRandomSampler
import matplotlib.pyplot as plt
%matplotlib inline

In [14]:
# Human-readable class names. plot_images() indexes this list with the
# integer class label, so position i must be the name of class i
# (presumably the CIFAR-10 label order -- confirm against the dataset).
label_names = [
    'airplane',
    'automobile',
    'bird',
    'cat',
    'deer',
    'dog',
    'frog',
    'horse',
    'ship',
    'truck'
]


def plot_images(images, cls_true, cls_pred=None):
    """
    Show up to nine images in a 3x3 grid, captioned with their classes.

    Adapted from https://github.com/Hvass-Labs/TensorFlow-Tutorials/

    Params
    ------
    - images: indexable batch of images. ``images[i]`` is handed to
      ``imshow``, so each image must be channels-last (H, W, C).
    - cls_true: sequence of integer class labels (plain ints or 0-d
      tensors); each one is used as an index into ``label_names``.
    - cls_pred: optional sequence of predicted labels. When given, the
      caption shows both the true and the predicted class name.

    If fewer than nine images are supplied, the unused grid cells are
    left blank instead of raising an IndexError.
    """
    _fig, axes = plt.subplots(3, 3)
    n_images = len(images)

    for i, ax in enumerate(axes.flat):
        ax.set_xticks([])
        ax.set_yticks([])

        # Nothing to draw in this cell: hide it and move on.
        if i >= n_images:
            ax.axis('off')
            continue

        # plot img
        ax.imshow(images[i], interpolation='spline16')

        # Convert to a plain int so tensor labels index the list cleanly
        # and are printed as "3" rather than "tensor(3)".
        true_idx = int(cls_true[i])
        cls_true_name = label_names[true_idx]

        # show true & predicted classes
        if cls_pred is None:
            xlabel = "{0} ({1})".format(cls_true_name, true_idx)
        else:
            cls_pred_name = label_names[int(cls_pred[i])]
            xlabel = "True: {0}\nPred: {1}".format(
                cls_true_name, cls_pred_name
            )
        ax.set_xlabel(xlabel)

    plt.show()

In [None]:
def get_train_valid_loader(data_dir='data',
                           batch_size=64,
                           augment=True,
                           random_seed = 1,
                           valid_size=0.02,
                           shuffle=True,
                           show_sample=False,
                           num_workers=4,
                           pin_memory=False):
    """
    Utility function for loading and returning train and valid
    multi-process iterators over the CIFAR-10 training split. A sample
    3x3 grid of the images can be optionally displayed.
    If using CUDA, num_workers should be set to 1 and pin_memory to True.

    The train/validation split is made on the *image* indices of the
    training split. With ``augment=True`` every training image is then
    exposed through three views (see below), while the held-out images
    are only ever served un-augmented by the validation loader. This
    keeps all views of a validation image out of the training set.

    Params
    ------
    - data_dir: path directory to the dataset.
    - batch_size: how many samples per batch to load.
    - augment: when True, the training set is the concatenation of
      three views of the training images: unchanged, with
      RandomHorizontalFlip, and with RandomCrop(32, padding=4). The
      random transforms are re-drawn every time a sample is fetched.
      Only applied on the train split.
    - random_seed: fix seed for reproducibility. Note that this seeds
      NumPy's global random state.
    - valid_size: fraction of the training images used for the
      validation set. Should be a float in the range [0, 1].
    - shuffle: whether to shuffle the train/validation indices.
    - show_sample: plot 3x3 sample grid of the dataset.
    - num_workers: number of subprocesses to use when loading the dataset.
    - pin_memory: whether to copy tensors into CUDA pinned memory. Set it to
      True if using GPU.
    Returns
    -------
    - train_loader: training set iterator.
    - valid_loader: validation set iterator.
    """
    error_msg = "[!] valid_size should be in the range [0, 1]."
    assert ((valid_size >= 0) and (valid_size <= 1)), error_msg

    # Per-channel statistics used to normalise (and, for the preview,
    # un-normalise) the images.
    channel_mean = [0.4914, 0.4822, 0.4465]
    channel_std = [0.2023, 0.1994, 0.2010]
    normalize = transforms.Normalize(mean=channel_mean, std=channel_std)

    def _load_train_split(transform):
        # Every dataset below wraps the same CIFAR-10 training images and
        # differs only in the transform applied when a sample is fetched.
        return datasets.CIFAR10(
            root=data_dir, train=True,
            download=True, transform=transform,
        )

    plain_transform = transforms.Compose([
        transforms.ToTensor(),
        normalize,
    ])

    # The validation loader always sees un-augmented images.
    valid_dataset = _load_train_split(plain_transform)
    num_images = len(valid_dataset)

    if augment:
        flip_transform = transforms.Compose([
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ])
        crop_transform = transforms.Compose([
            transforms.RandomCrop(32, padding=4),
            transforms.ToTensor(),
            normalize,
        ])
        train_views = [
            _load_train_split(plain_transform),
            _load_train_split(flip_transform),
            _load_train_split(crop_transform),
        ]
        train_dataset = torch.utils.data.ConcatDataset(train_views)
        num_views = len(train_views)
    else:
        train_dataset = _load_train_split(plain_transform)
        num_views = 1

    # Split on image indices, not on indices of the concatenated dataset.
    indices = list(range(num_images))
    split = int(np.floor(valid_size * num_images))

    if shuffle:
        np.random.seed(random_seed)
        np.random.shuffle(indices)

    base_train_idx, valid_idx = indices[split:], indices[:split]

    # In the concatenated dataset, view k of image i lives at position
    # k * num_images + i, so expand each training image to all its views.
    train_idx = [
        view * num_images + i
        for view in range(num_views)
        for i in base_train_idx
    ]

    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=train_sampler,
        num_workers=num_workers, pin_memory=pin_memory,
    )
    valid_loader = torch.utils.data.DataLoader(
        valid_dataset, batch_size=batch_size, sampler=valid_sampler,
        num_workers=num_workers, pin_memory=pin_memory,
    )

    # visualize some images
    if show_sample:
        sample_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=9, shuffle=shuffle,
            num_workers=num_workers, pin_memory=pin_memory,
        )
        data_iter = iter(sample_loader)
        images, labels = next(data_iter)
        # (N, C, H, W) -> (N, H, W, C) for matplotlib.
        X = images.numpy().transpose([0, 2, 3, 1])
        # Undo the normalisation so the preview shows real colours
        # instead of values clipped to [0, 1] by imshow.
        X = np.clip(X * np.array(channel_std) + np.array(channel_mean), 0.0, 1.0)
        plot_images(X, labels)

    return (train_loader, valid_loader)

# Build the loaders once with the default settings (the function passes
# download=True, so the dataset files are fetched into ./data if needed).
# NOTE(review): the device-setup cell further down calls
# get_train_valid_loader() again and overwrites both names.
trainloader, valloader = get_train_valid_loader()

In [None]:
class ConvNet(nn.Module):
    """
    Six-layer convolutional classifier followed by two linear layers.

    The input passes through two 2x2 max-pools, and the first linear
    layer expects 256 * 8 * 8 features, so inputs must be 32x32 images.

    Params
    ------
    - n_input_channels: number of channels of the input images.
    - n_output: number of classes (size of the logit vector).
    """

    def __init__(self, n_input_channels=3, n_output=10):
        super().__init__()
        # Define convolutional layers (3x3 kernels, padding keeps H and W)
        self.conv1 = nn.Conv2d(n_input_channels, 16, 3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv4 = nn.Conv2d(64, 128, 3, padding=1)
        self.conv5 = nn.Conv2d(128, 256, 3, padding=1)
        self.conv6 = nn.Conv2d(256, 256, 3, padding=1)
        # Define batch normalization layers
        self.bn1 = nn.BatchNorm2d(16)
        self.bn2 = nn.BatchNorm2d(32)
        self.bn3 = nn.BatchNorm2d(64)
        self.bn4 = nn.BatchNorm2d(128)
        self.bn5 = nn.BatchNorm2d(256)
        # NOTE(review): there is no separate bn6; forward() applies bn5
        # after both conv5 and conv6, so the two layers share one set of
        # BN parameters and running statistics. Giving conv6 its own layer
        # would change the state_dict keys of already-saved checkpoints.
        # Define max pooling layer
        self.pool = nn.MaxPool2d(2, 2)
        # Define fully connected layers
        self.fc1 = nn.Linear(256 * 8 * 8, 512)
        self.fc2 = nn.Linear(512, n_output)
        # Define dropout layer
        self.dropout = nn.Dropout(0.25)

    def forward(self, x):
        """Return the raw class logits, shape (batch, n_output)."""
        # Forward pass through convolutional layers (32 -> 16 -> 8 pixels)
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = F.relu(self.bn4(self.conv4(x)))
        x = F.relu(self.bn5(self.conv5(x)))
        x = F.relu(self.bn5(self.conv6(x)))
        # Flatten the output of the last convolutional layer
        x = x.reshape(x.size(0), -1)
        # Apply dropout
        x = self.dropout(x)
        # Forward pass through fully connected layers
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

    def predict(self, x):
        """Return class probabilities: softmax over the class dimension."""
        logits = self.forward(x)
        # dim is given explicitly; relying on the implicit choice is
        # deprecated and emits a warning.
        return F.softmax(logits, dim=1)


In [16]:
# https://github.com/meng1994412/VGGNet_from_scratch/blob/master/pipeline/nn/conv/minivggnet.py
# I took inspiration from the MiniVggNet implementation in the above link

class MiniVGG(nn.Module):
    """
    Small VGG-style classifier: two (conv-conv-pool-dropout) stages
    followed by three linear layers with batch normalisation.

    The first linear layer expects 64 * 8 * 8 features after two 2x2
    max-pools, so inputs must be 32x32 images.

    Params
    ------
    - n_input_channels: number of channels of the input images.
    - n_output: number of classes (size of the logit vector).
    """

    def __init__(self, n_input_channels=3, n_output=10):
        super().__init__()
        # Stage 1: 32 feature maps, 32x32 -> 16x16
        self.conv1 = nn.Conv2d(n_input_channels, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout1 = nn.Dropout(0.25)

        # Stage 2: 64 feature maps, 16x16 -> 8x8
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.conv4 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout2 = nn.Dropout(0.25)

        # Classifier head
        self.fc1 = nn.Linear(64 * 8 * 8, 512)
        self.bn5 = nn.BatchNorm1d(512)
        self.dropout3 = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 256)
        self.bn6 = nn.BatchNorm1d(256)
        self.fc3 = nn.Linear(256, n_output)

    def forward(self, x):
        """Return the raw class logits, shape (batch, n_output)."""
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool1(F.relu(self.bn2(self.conv2(x))))
        x = self.dropout1(x)

        x = F.relu(self.bn3(self.conv3(x)))
        x = self.pool2(F.relu(self.bn4(self.conv4(x))))
        x = self.dropout2(x)

        # Flatten everything but the batch dimension. Unlike
        # view(-1, 64 * 8 * 8), a wrong input size then fails loudly in
        # fc1 instead of silently changing the batch dimension.
        x = x.flatten(1)
        x = F.relu(self.bn5(self.fc1(x)))
        x = self.dropout3(x)
        x = F.relu(self.bn6(self.fc2(x)))
        # The same Dropout(0.5) module is applied after both hidden layers.
        x = self.dropout3(x)
        x = self.fc3(x)
        return x

    def predict(self, x):
        """Return class probabilities: softmax over the class dimension."""
        logits = self.forward(x)
        # dim is given explicitly; relying on the implicit choice is
        # deprecated and emits a warning.
        return F.softmax(logits, dim=1)


In [None]:
# Choose the architecture to train: leave exactly one of these active.
#net = ConvNet()
net = MiniVGG()

# Train on the first CUDA device when one is available, else on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Loader settings follow the device: one worker plus pinned memory on GPU,
# four workers and no pinning on CPU.
num_workers, pin_memory = (1, True) if device.type == 'cuda' else (4, False)

# Load the data
trainloader, valloader = get_train_valid_loader(
    num_workers=num_workers,
    pin_memory=pin_memory,
)

# Put the model on the chosen device.
net.to(device)

# Run through each loader once and keep every batch on the device as a
# plain list of (images, labels) pairs.
# NOTE(review): because the loaders are consumed only here, the batch
# composition and any random augmentation are drawn once and then reused
# unchanged in every epoch.
trainloader = [
    (batch_images.to(device), batch_labels.to(device))
    for batch_images, batch_labels in trainloader
]
valloader = [
    (batch_images.to(device), batch_labels.to(device))
    for batch_images, batch_labels in valloader
]

In [None]:
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Define the loss function (categorical cross-entropy on raw logits)
criterion = nn.CrossEntropyLoss()

# Specify the optimizer (SGD with learning rate of 0.01)
optimizer = optim.SGD(net.parameters(), lr=0.01)
#optimizer = optim.Adam(net.parameters(), lr=0.001)

# Learning rate scheduler: cut the LR by 10x once the monitored value
# (the validation loss, see scheduler.step below) has stopped decreasing
# for more than `patience` epochs.
scheduler = ReduceLROnPlateau(optimizer, 'min', patience=4, factor=0.1)

# Initialize variables
train_loss = 0.0
valid_loss = 0.0
epochs = 100
steps = 0
running_loss = 0
print_every = 80
max_no_improve = 5  # Maximum number of epochs with no improvement
no_improve_count = 0  # Counter for epochs with no improvement
best_accuracy = 0.0  # Best validation accuracy

# Per-epoch history consumed by the plotting cell
epoch_list = []
accuracy_list = []
lr_values = []

for e in range(epochs):
    start = time.time()

    # ---- training pass -------------------------------------------------
    # Dropout active, batch-norm uses and updates batch statistics.
    net.train()
    running_loss = 0.0
    for images, labels in trainloader:
        # No-op when the batches already live on the device.
        images, labels = images.to(device), labels.to(device)
        steps += 1
        # Zero the parameter gradients
        optimizer.zero_grad()
        # Forward pass
        output = net(images)
        # Calculate the loss
        loss = criterion(output, labels)
        # Backward pass
        loss.backward()
        # Perform a single optimization step (parameter update)
        optimizer.step()
        running_loss += loss.item()
    train_loss = running_loss / max(len(trainloader), 1)

    # ---- validation pass -----------------------------------------------
    # eval() turns dropout off and makes batch-norm use its running
    # statistics; no_grad() skips building the autograd graph.
    net.eval()
    valid_loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for images, labels in valloader:
            images, labels = images.to(device), labels.to(device)
            logits = net(images)
            batch_size = labels.size(0)
            # criterion returns the batch mean; weight it by the batch
            # size so a smaller last batch does not skew the average.
            valid_loss_sum += criterion(logits, labels).item() * batch_size
            n_correct += (logits.argmax(dim=1) == labels).sum().item()
            n_seen += batch_size
    # max(..., 1) keeps an empty validation set from dividing by zero.
    valid_loss = valid_loss_sum / max(n_seen, 1)
    accuracy = n_correct / max(n_seen, 1)

    # Print accuracy and loss at the end of each epoch
    print("Epoch: {}/{}..".format(e + 1, epochs),
          "Loss: {:.4f}..".format(train_loss),
          "Validation accuracy: {:.4f}".format(accuracy),
          "Time: {:.1f}s".format(time.time() - start))

    # Append epoch number and accuracy (a plain float) for plotting
    epoch_list.append(e)
    accuracy_list.append(accuracy)

    # Feed the scheduler the validation loss measured this epoch, then
    # record the resulting LR. Done before the early-stop check so
    # lr_values always has one entry per entry in accuracy_list.
    scheduler.step(valid_loss)
    current_lr = optimizer.param_groups[0]['lr']
    lr_values.append(current_lr)

    # Track the best validation accuracy for early stopping
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        no_improve_count = 0
    else:
        no_improve_count += 1

    # Check for early stopping
    if no_improve_count >= max_no_improve:
        print("Validation accuracy did not improve for {} epochs. Stopping training.".format(max_no_improve))
        break


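Plot the validation accuracy over the epochs, then save the trained model (the weights as they are after the last completed epoch).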

In [None]:
# Validation-accuracy curve, one point per completed epoch.
plt.plot(epoch_list, accuracy_list, label='Validation Accuracy')
plt.title('Validation Accuracy over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

# Add grid
plt.grid(True)

# Annotate every epoch whose recorded learning rate differs from the one
# recorded for the previous epoch.
for epoch, (prev_lr, lr) in enumerate(zip(lr_values, lr_values[1:]), start=1):
    if lr != prev_lr:
        plt.text(epoch, accuracy_list[epoch], f'LR: {lr:.5f}', fontsize=8, rotation=45)

# Label the final point, slightly below the curve and right-aligned.
last_epoch, last_accuracy = epoch_list[-1], accuracy_list[-1]
plt.text(
    last_epoch,
    last_accuracy - 0.02,
    f'Last Accuracy: {last_accuracy:.4f}',
    fontsize=10,
    ha='right',
)

plt.show()

In [23]:
# Save only the parameters (state_dict) of `net` as it is when this cell
# runs, i.e. after the training cell has finished or stopped early.
# The path is relative to the current working directory.
# NOTE(review): presumably this resolves inside the Drive mounted at
# /content/drive because Colab starts in /content -- confirm.
# Use the first line instead when `net` is a ConvNet.
#torch.save(net.state_dict(), 'drive/MyDrive/Colab Notebooks/CNN_ConvNet_final2.ckpt')
torch.save(net.state_dict(), 'drive/MyDrive/Colab Notebooks/CNN_MiniVGG_deeperfully.ckpt')