<a href="https://colab.research.google.com/github/globalit6679/AI-Project-Monitoring/blob/main/Deep%20Learning%20%26%20Machine%20Learning/CW3/DLCV_VGG16.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch.nn.functional as F
import torch
import torchvision
from matplotlib import pyplot as plt
from torchvision import utils
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

import time
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets
from torchvision import transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

# When a CUDA device is present, ask cuDNN to use deterministic algorithms.
# NOTE(review): no random seed is set in the visible cells, so runs are
# presumably still not bit-for-bit reproducible — confirm whether seeding
# (torch / numpy) was intended alongside this flag.
if torch.cuda.is_available():
    torch.backends.cudnn.deterministic = True

In [2]:
class VGGBlock(nn.Module):
    """Two 3x3 convolutions with ReLU, followed by a 2x2 max pooling.

    Both convolutions use stride 1 and padding 1, so they keep the spatial
    size; the final max pooling (kernel 2, stride 2) halves it.

    Args:
        in_channels: number of channels of the incoming feature map.
        out_channels: number of channels produced by both convolutions.
        batch_norm: when True a ``BatchNorm2d`` follows each convolution,
            otherwise an ``nn.Identity`` placeholder is used.
    """

    def __init__(self, in_channels, out_channels,batch_norm=False):

        super().__init__()

        conv2_params = {'kernel_size': (3, 3),
                        'stride'     : (1, 1),
                        'padding'   : 1
                        }

        self._batch_norm = batch_norm

        self.conv1 = nn.Conv2d(in_channels=in_channels,out_channels=out_channels , **conv2_params)
        # nn.Identity (instead of a bare lambda) keeps the placeholder a real
        # nn.Module: the block stays picklable and shows up in repr()/children().
        # It has no parameters, so state_dict keys are unaffected.
        self.bn1 = nn.BatchNorm2d(out_channels) if batch_norm else nn.Identity()

        self.conv2 = nn.Conv2d(in_channels=out_channels,out_channels=out_channels, **conv2_params)
        self.bn2 = nn.BatchNorm2d(out_channels) if batch_norm else nn.Identity()

        self.max_pooling = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

    @property
    def batch_norm(self):
        """Whether the block was built with batch normalisation layers."""
        return self._batch_norm

    def forward(self,x):
        """Apply conv -> (bn) -> ReLU twice, then max-pool the result."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)

        x = self.conv2(x)
        x = self.bn2(x)
        x = F.relu(x)

        x = self.max_pooling(x)

        return x

In [3]:
class VGG16(nn.Module):
    """VGG-style classifier: four ``VGGBlock`` stages and a three-layer MLP head.

    Note that, despite the name, this network stacks four two-convolution
    blocks (8 convolutions) plus three linear layers.

    Args:
        input_size: ``(channels, width, height)`` triple. Only the channel
            count influences the architecture; width and height are stored
            and exposed through :attr:`input_size`.
        num_classes: size of the output logit vector.
        batch_norm: forwarded to every ``VGGBlock``.
    """

    def __init__(self, input_size, num_classes=10,batch_norm=False):
        super(VGG16, self).__init__()

        self.in_channels,self.in_width,self.in_height = input_size

        self.block_1 = VGGBlock(self.in_channels,64,batch_norm=batch_norm)
        self.block_2 = VGGBlock(64, 128,batch_norm=batch_norm)
        self.block_3 = VGGBlock(128, 256,batch_norm=batch_norm)
        self.block_4 = VGGBlock(256,512,batch_norm=batch_norm)

        # The head expects 512 * 2 * 2 features. For 32x32 inputs the four
        # stride-2 poolings already leave a 2x2 map, so this layer is a no-op;
        # for larger inputs it averages the map down to 2x2 instead of
        # crashing on a shape mismatch. It has no parameters.
        self.avgpool = nn.AdaptiveAvgPool2d((2, 2))

        self.classifier = nn.Sequential(
            nn.Linear(512 * 2 * 2, 4096),
            nn.ReLU(True),
            nn.Dropout(p=0.65),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(p=0.65),
            nn.Linear(4096, num_classes)
        )

    @property
    def input_size(self):
        """The ``(channels, width, height)`` triple given at construction."""
        return self.in_channels,self.in_width,self.in_height

    def forward(self, x):
        """Return class logits for a batch of images."""

        x = self.block_1(x)
        x = self.block_2(x)
        x = self.block_3(x)
        x = self.block_4(x)
        x = self.avgpool(x)
        # Flatten everything but the batch dimension for the linear head.
        x = x.view(x.size(0), -1)
        x = self.classifier(x)

        return x

In [4]:
# Evaluation pipeline: resize to 32 pixels and convert to a tensor, with no
# augmentation.
test_transform = transforms.Compose([transforms.Resize(32), transforms.ToTensor()])

# Training pipeline: same as above plus a random horizontal flip as augmentation.
data_transform = transforms.Compose([
    transforms.Resize(32),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])

In [5]:
from torchvision import datasets
from torchvision import transforms
from torch.utils.data import DataLoader

# CIFAR10 dataset
# Training data goes through the augmenting transform, test data through the
# plain one; both are downloaded into ./data when missing.
c_train_dataset = datasets.CIFAR10(root='./data',
                                 train=True,
                                 transform=data_transform,
                                 download=True)

c_test_dataset = datasets.CIFAR10(root='./data',
                                train=False,
                                transform=test_transform,
                                download=True)


c_train_loader = DataLoader(dataset=c_train_dataset,
                          num_workers=2,
                          batch_size=64,
                          shuffle=True)

c_test_loader = DataLoader(dataset=c_test_dataset,
                          num_workers=2,
                          batch_size=64,
                          shuffle=False)

# MNIST dataset
train_dataset = datasets.MNIST(root='./data',
                                 train=True,
                                 transform=data_transform,
                                 download=True)

test_dataset = datasets.MNIST(root='./data',
                                train=False,
                                transform=test_transform,
                                download=True)


train_loader = DataLoader(dataset=train_dataset,
                          num_workers=2,
                          batch_size=64,
                          shuffle=True)

# drop_last is deliberately left at its default (False), matching the CIFAR10
# test loader: dropping the final partial batch would silently exclude test
# samples from evaluation.
test_loader = DataLoader(dataset=test_dataset,
                         num_workers=2,
                         batch_size=64,
                         shuffle=False)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:02<00:00, 58604152.13it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to ./data/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:00<00:00, 45632184.26it/s]


Extracting ./data/MNIST/raw/train-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to ./data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 3536189.10it/s]


Extracting ./data/MNIST/raw/train-labels-idx1-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:00<00:00, 15045176.26it/s]


Extracting ./data/MNIST/raw/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 5778140.36it/s]

Extracting ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw






In [6]:
# Split name -> DataLoader lookups, keyed by "train" and "test".
loaders = dict(train=train_loader, test=test_loader)        # MNIST
c_loaders = dict(train=c_train_loader, test=c_test_loader)  # CIFAR10

In [None]:
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
import logging
import time
import datetime

# One network per dataset: 3 input channels for CIFAR10, 1 for MNIST.
# The declared spatial size is 32x32 to match the Resize(32) transforms and
# the 512 * 2 * 2 = 2048 features the classifier head expects (the previous
# 64x64 declaration contradicted both).
net = VGG16(input_size=(3, 32, 32), batch_norm=True)
net2 = VGG16(input_size=(1, 32, 32), batch_norm=True)
# The loss is stateless, so one instance is shared by both training runs.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr = 0.01)
optimizer2 = optim.SGD(net2.parameters(), lr = 0.01)

# Must be defined before train()/visualize_samples(), which use it as a
# default argument value.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

class EarlyStopping:
    """Signal when a monitored loss has stopped improving.

    Call the instance once per epoch with the current loss and the model.
    Whenever the loss improves, the model's ``state_dict`` is saved to
    ``path`` and the patience counter is reset; otherwise the counter is
    incremented and ``early_stop`` becomes True once it reaches ``patience``.

    Args:
        patience: number of consecutive non-improving calls tolerated.
        delta: minimum amount by which the negated loss must exceed the best
            one to count as an improvement.
        verbose: print counter / checkpoint messages when True.
        path: destination handed to ``torch.save`` for the best weights.
    """

    def __init__(self, patience=5, delta=0, verbose=False, path='checkpoint.pt'):
        self.patience = patience
        self.delta = delta
        self.verbose = verbose
        self.path = path
        self.counter = 0          # consecutive calls without improvement
        self.best_score = None    # best negated loss seen so far
        self.early_stop = False   # set once the patience is exhausted

    def __call__(self, test_loss, model):
        """Record ``test_loss`` and checkpoint ``model`` if it improved."""
        import math  # local import keeps the class self-contained in the notebook

        # Higher score == lower loss.
        score = -test_loss

        if math.isnan(score):
            # A NaN loss must never count as an improvement: every comparison
            # with NaN is False, so accepting it as best_score would disable
            # early stopping for the rest of the run.
            improved = False
        elif self.best_score is None:
            improved = True
        else:
            improved = not (score < self.best_score + self.delta)

        if improved:
            self.best_score = score
            self.save_checkpoint(test_loss, model)
            self.counter = 0
        else:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True

    def save_checkpoint(self, test_loss, model):
        """Write ``model.state_dict()`` to ``self.path`` (``test_loss`` is unused)."""
        if self.verbose:
            print('Saving model')
        torch.save(model.state_dict(), self.path)


def train(net, loaders, optimizer, criterion, tensorboard, epochs=100, dev=device, save_param = False, model_name='vgg16'):
    """Train ``net`` and evaluate it after every epoch, with early stopping.

    Each epoch runs one pass over ``loaders["train"]`` (with parameter
    updates) and one pass over ``loaders["test"]`` (without gradients).
    Per-epoch loss and accuracy are printed, logged to TensorBoard and
    plotted once training ends, including when it ends through an exception.

    Args:
        net: model to optimise; it is moved to ``dev``.
        loaders: mapping with "train" and "test" entries, each a sized
            iterable yielding ``(inputs, labels)`` batches.
        optimizer: optimizer updating the parameters of ``net``.
        criterion: callable computing the loss from ``(predictions, labels)``.
        tensorboard: suffix of the run directory ``runs/vgg16_<tensorboard>``.
        epochs: maximum number of epochs.
        dev: device on which the model and batches are placed.
        save_param: unused; kept for interface compatibility.
        model_name: unused; kept for interface compatibility.

    Note:
        Early stopping monitors the loss on the "test" split (patience 5),
        and the best weights are saved to the ``EarlyStopping`` default path.
    """
    splits = ("train", "test")
    # Created before the try block so the plotting code in `finally` can
    # always reach them, even if set-up below raises.
    history_loss = {split: [] for split in splits}
    history_accuracy = {split: [] for split in splits}
    writer = None
    try:
        early_stopping = EarlyStopping(patience=5, verbose=True)
        net = net.to(dev)
        writer = SummaryWriter(f"runs/vgg16_{tensorboard}")
        timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        writer.add_text('Timestamp: ',timestamp)

        for epoch in range(epochs):
            sum_loss = {split: 0.0 for split in splits}
            num_correct = {split: 0 for split in splits}
            num_samples = {split: 0 for split in splits}
            for split in splits:
                is_train = split == "train"
                # train(False) is equivalent to eval().
                net.train(is_train)
                # Skip autograd bookkeeping during evaluation.
                with torch.set_grad_enabled(is_train):
                    for inputs, labels in loaders[split]:
                        inputs = inputs.to(dev)
                        labels = labels.to(dev).long()
                        if is_train:
                            optimizer.zero_grad()
                        pred = net(inputs)
                        loss = criterion(pred, labels)
                        sum_loss[split] += loss.item()
                        if is_train:
                            loss.backward()
                            optimizer.step()
                        pred_label = pred.argmax(dim=1)
                        num_correct[split] += (pred_label == labels).sum().item()
                        num_samples[split] += inputs.size(0)
            # Loss: mean of the per-batch losses. Accuracy: fraction of
            # correctly classified samples, so a smaller final batch is not
            # over-weighted. max(..., 1) guards against an empty loader.
            epoch_loss = {split: sum_loss[split]/max(len(loaders[split]), 1) for split in splits}
            epoch_accuracy = {split: num_correct[split]/max(num_samples[split], 1) for split in splits}

            # global_step gives every epoch its own point on the curves.
            writer.add_scalar("Training Loss", epoch_loss['train'], global_step=epoch)
            writer.add_scalar("Training Accuracy", epoch_accuracy['train'], global_step=epoch)
            writer.add_scalar("Test Loss", epoch_loss['test'], global_step=epoch)
            writer.add_scalar("Test Accuracy", epoch_accuracy['test'], global_step=epoch)

            for split in splits:
                history_loss[split].append(epoch_loss[split])
                history_accuracy[split].append(epoch_accuracy[split])

            print(f"Epoch {epoch+1}:",
                  f"Train Loss={epoch_loss['train']:.4f},",
                  f"Train Accuracy={epoch_accuracy['train']:.4f},",
                  f"Test Loss={epoch_loss['test']:.4f},",
                  f"Test Accuracy={epoch_accuracy['test']:.4f},")

            # Checked after logging so the epoch that triggers the stop is
            # still recorded in the history, TensorBoard and the printout.
            early_stopping(epoch_loss['test'], net)
            if early_stopping.early_stop:
                print("Early stopping triggered")
                break
    finally:
        if writer is not None:
            # Flush pending events and release the event file.
            writer.close()

        plt.title("Loss")
        for split in splits:
            plt.plot(history_loss[split], label=split)
        plt.legend()
        plt.show()

        plt.title("Accuracy")
        for split in splits:
            plt.plot(history_accuracy[split], label=split)
        plt.legend()
        plt.show()

# CIFAR10: train the 3-channel network, logging under the 'cifar10' run name.
train(net=net,
      loaders=c_loaders,
      optimizer=optimizer,
      criterion=criterion,
      tensorboard='cifar10',
      dev=device)

  self.pid = os.fork()


In [None]:
import matplotlib.pyplot as plt
import torchvision.transforms.functional as TF

# Visualisation samples (CIFAR10)
def imshow(img):
    """Display one image tensor with matplotlib, axes hidden.

    The tensor's first axis is moved last before plotting
    (assumes a channel-first image tensor — TODO confirm against callers).
    """
    channels_last = img.permute(1, 2, 0)
    pixels = channels_last.cpu().numpy()
    plt.imshow(pixels)
    plt.axis('off')
    plt.show()

def visualize_samples(net, loader, dev=device, num_samples=5):
    """Show the first correctly and incorrectly classified samples of ``loader``.

    Args:
        net: trained model; switched to evaluation mode.
        loader: iterable yielding ``(inputs, labels)`` batches.
        dev: device on which inference runs.
        num_samples: maximum number of examples shown per category.
    """
    net.eval()
    correct_samples = []
    incorrect_samples = []

    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(dev)
            labels = labels.to(dev)

            predicted = net(inputs).argmax(dim=1)
            correct_mask = (predicted == labels).tolist()

            for image, label, guess, is_correct in zip(inputs, labels, predicted, correct_mask):
                bucket = correct_samples if is_correct else incorrect_samples
                # Keep only what will be displayed, stored on the CPU as plain
                # ints, instead of hoarding the whole dataset on the device.
                if len(bucket) < num_samples:
                    bucket.append((image.cpu(), label.item(), guess.item()))

            # Stop scanning as soon as both categories are full.
            if len(correct_samples) >= num_samples and len(incorrect_samples) >= num_samples:
                break

    print("Correctly classified samples:")
    for image, label, guess in correct_samples:
        print(f"True Label: {label}, Predicted Label: {guess}")
        imshow(image)

    print("\nIncorrectly classified samples:")
    for image, label, guess in incorrect_samples:
        # Labels are plain ints here, so both sections print the same format
        # (previously this section printed raw tensor reprs).
        print(f"True Label: {label}, Predicted Label: {guess}")
        imshow(image)

# Inspect predictions of the CIFAR10 network on its test split.
visualize_samples(net=net, loader=c_loaders['test'], dev=device)

In [None]:
# MNIST: train the 1-channel network, logging under the 'mnist' run name.
train(net=net2,
      loaders=loaders,
      optimizer=optimizer2,
      criterion=criterion,
      tensorboard='mnist',
      dev=device)

In [None]:
# Visualisation samples (MNIST): inspect predictions on the MNIST test split.
visualize_samples(net=net2, loader=loaders['test'], dev=device)