In [None]:
import torch 

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
report_dir = "docs/reports/model"

In [None]:
from torchvision import transforms, datasets

from torch.utils.data import DataLoader, random_split

class GetDataset:
    input_dir = ""

    def __init__(self, input_dir, batch_size=32):
        transform = transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
        )

        train_valid_data = datasets.CIFAR10(
            f"{input_dir}/train", train=True, download=True, transform=transform
        )
        test_dataset = datasets.CIFAR10(
            f"{input_dir}/test", train=False, download=True, transform=transform
        )

        train_dataset, valid_dataset = random_split(train_valid_data, (45000, 5000))

        print(
            "Image shape of a random sample image : {}".format(
                train_dataset[0][0].numpy().shape
            ),
            end="\n\n",
        )

        print("Training Set:   {} images".format(len(train_dataset)))
        print("Validation Set:   {} images".format(len(valid_dataset)))
        print("Test Set:       {} images".format(len(test_dataset)))

        batch_size = 32

        self.train_loader = DataLoader(
            train_dataset, batch_size=batch_size, shuffle=True
        )
        self.valid_loader = DataLoader(
            valid_dataset, batch_size=batch_size, shuffle=True
        )
        self.test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
        
        self.num_classes = len(train_valid_data.classes)


In [None]:
from torch import nn, flatten


class Auxiliary(nn.Module):
    def __init__(self, in_channels, num_classes):
        super(Auxiliary, self).__init__()

        self.pool = nn.AdaptiveAvgPool2d((4, 4))
        self.conv = nn.Conv2d(in_channels, 128, kernel_size=1, stride=1, padding=0)
        self.act = nn.ReLU()

        self.fc1 = nn.Linear(2048, 1024)
        self.dropout = nn.Dropout(0.7)
        self.fc2 = nn.Linear(1024, num_classes)

    def forward(self, x):
        out = self.pool(x)
        out = self.conv(out)
        out = self.act(out)

        out = flatten(out, 1)  # reshaping to one dim tensor

        out = self.fc1(out)
        out = self.act(out)

        out = self.dropout(out)

        out = self.fc2(out)

        return out


In [None]:
from torch import nn


# the starter convolutional block
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(ConvBlock, self).__init__()

        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        self.bn = nn.BatchNorm2d(out_channels)
        self.act = nn.ReLU()

    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        x = self.act(x)

        return x


In [None]:
from torch import nn, cat

class Inception(nn.Module):
    def __init__(
        self,
        in_channels,
        num1x1,
        num3x3_reduce,
        num3x3,
        num5x5_reduce,
        num5x5,
        pool_proj,
    ):
        super(Inception, self).__init__()

        self.block1 = nn.Sequential(
            ConvBlock(in_channels, num1x1, kernel_size=1, stride=1, padding=0)
        )

        self.block2 = nn.Sequential(
            ConvBlock(in_channels, num3x3_reduce, kernel_size=1, stride=1, padding=0),
            ConvBlock(num3x3_reduce, num3x3, kernel_size=3, stride=1, padding=1),
        )

        self.block3 = nn.Sequential(
            ConvBlock(in_channels, num5x5_reduce, kernel_size=1, stride=1, padding=0),
            ConvBlock(num5x5_reduce, num5x5, kernel_size=5, stride=1, padding=2),
        )

        self.block4 = nn.Sequential(
            nn.MaxPool2d(
                3, stride=1, padding=1, ceil_mode=True
            ),  # adds extra cols (as padding to cover the stride used)
            ConvBlock(in_channels, pool_proj, kernel_size=1, stride=1, padding=0),
        )

    def forward(self, x):
        out1 = self.block1(x)
        out2 = self.block2(x)
        out3 = self.block3(x)
        out4 = self.block4(x)

        return cat([out1, out2, out3, out4], 1)


In [None]:
import torch
import torch.nn as nn
from torch import optim

class GoogleLeNet(nn.Module):
    def __init__(self, num_classes):
        super(GoogleLeNet, self).__init__()

        self.conv1 = ConvBlock(3, 64, kernel_size=7, stride=2, padding=3)
        self.pool1 = nn.MaxPool2d(3, stride=2, padding=0, ceil_mode=True)
        self.conv2 = ConvBlock(64, 64, kernel_size=1, stride=1, padding=0)
        self.conv3 = ConvBlock(64, 192, kernel_size=3, stride=1, padding=1)
        self.pool2 = nn.MaxPool2d(3, stride=2, padding=0, ceil_mode=True)

        self.inception3A = Inception(
            in_channels=192,
            num1x1=64,
            num3x3_reduce=96,
            num3x3=128,
            num5x5_reduce=16,
            num5x5=32,
            pool_proj=32,
        )

        self.inception3B = Inception(
            in_channels=256,
            num1x1=128,
            num3x3_reduce=128,
            num3x3=192,
            num5x5_reduce=32,
            num5x5=96,
            pool_proj=64,
        )

        self.pool3 = nn.MaxPool2d(3, stride=2, padding=0, ceil_mode=True)

        self.inception4A = Inception(
            in_channels=480,
            num1x1=192,
            num3x3_reduce=96,
            num3x3=208,
            num5x5_reduce=16,
            num5x5=48,
            pool_proj=64,
        )

        self.inception4B = Inception(
            in_channels=512,
            num1x1=160,
            num3x3_reduce=112,
            num3x3=224,
            num5x5_reduce=24,
            num5x5=64,
            pool_proj=64,
        )

        self.inception4C = Inception(
            in_channels=512,
            num1x1=128,
            num3x3_reduce=128,
            num3x3=256,
            num5x5_reduce=24,
            num5x5=64,
            pool_proj=64,
        )

        self.inception4D = Inception(
            in_channels=512,
            num1x1=112,
            num3x3_reduce=144,
            num3x3=288,
            num5x5_reduce=32,
            num5x5=64,
            pool_proj=64,
        )

        self.inception4E = Inception(
            in_channels=528,
            num1x1=256,
            num3x3_reduce=160,
            num3x3=320,
            num5x5_reduce=32,
            num5x5=128,
            pool_proj=128,
        )

        self.pool4 = nn.MaxPool2d(3, stride=2, padding=0, ceil_mode=True)

        self.inception5A = Inception(
            in_channels=832,
            num1x1=256,
            num3x3_reduce=160,
            num3x3=320,
            num5x5_reduce=32,
            num5x5=128,
            pool_proj=128,
        )

        self.inception5B = Inception(
            in_channels=832,
            num1x1=384,
            num3x3_reduce=192,
            num3x3=384,
            num5x5_reduce=48,
            num5x5=128,
            pool_proj=128,
        )

        self.pool5 = nn.AdaptiveAvgPool2d((1, 1))

        self.dropout = nn.Dropout(0.4)
        self.fc = nn.Linear(1024, num_classes)

        self.aux4A = Auxiliary(512, num_classes)
        self.aux4D = Auxiliary(528, num_classes)

    def forward(self, x):
        out = self.conv1(x)
        out = self.pool1(out)
        out = self.conv2(out)
        out = self.conv3(out)
        out = self.pool2(out)

        out = self.inception3A(out)
        out = self.inception3B(out)
        out = self.pool3(out)
        out = self.inception4A(out)

        aux1 = self.aux4A(out)

        out = self.inception4B(out)
        out = self.inception4C(out)
        out = self.inception4D(out)

        aux2 = self.aux4D(out)

        out = self.inception4E(out)
        out = self.pool4(out)
        out = self.inception5A(out)
        out = self.inception5B(out)
        out = self.pool5(out)

        out = torch.flatten(out, 1)  # flattent to one dim tensor
        out = self.dropout(out)
        out = self.fc(out)

        return out, aux1, aux2

    def get_criterion(self):
        return nn.CrossEntropyLoss()

    def get_optimizer(self, learning_rate=0.01):
        return optim.Adam(self.parameters(), lr=learning_rate)


In [None]:
import torch

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker

class ModelTrainer:
    train_loss = []
    val_loss = []

    train_acc = []
    val_acc = []

    def __init__(
        self,
        model,
        train_data,
        validation_data,
        test_data,
        optimizer,
        criterion,
        epochs,
    ):
        self.model = model
        self.optimizer = optimizer
        self.criterion = criterion

        self.train_data = train_data
        self.validation_data = validation_data
        self.test_data = test_data

        self.epochs = epochs

    def fit(self):
        num_train_samples = len(self.train_data)
        num_validation_samples = len(self.validation_data)

        for epoch in range(self.epochs):
            curr_train_loss = 0
            correct_in_train = 0

            self.model.train()

            for inputs, labels in self.train_data:
                inputs, labels = inputs.to(device), labels.to(device)

                # set gradients to zero again, to not accumulate old gradients in the new clalculations
                self.optimizer.zero_grad()

                # generate predictions on the current set of inputs
                # recall that we also extract the predictions from the auxialiaries
                # and count them in the loss calculation
                pred, aux_pred1, aux_pred2 = self.model(inputs)

                # calculate loss
                real_loss = self.criterion(pred, labels)
                loss_aux1 = self.criterion(aux_pred1, labels)
                loss_aux2 = self.criterion(aux_pred2, labels)

                loss = real_loss + (0.3 * loss_aux1) + (0.3 * loss_aux2)

                loss.backward()  # run backpropagation
                self.optimizer.step()  # compute the new weights

                # now store correctly predicted, and loss of this iteration

                # first, find the predicted class by looking for the maximum in the dimension 1
                _, predicted = torch.max(pred.data, 1)

                correct_in_train += (predicted == labels).float().sum().item()
                curr_train_loss += (
                    loss.data.item() * inputs.shape[0]
                )  # multiplied with batch length

            train_epoch_loss = curr_train_loss / num_train_samples
            self.train_loss.append(train_epoch_loss)

            train_acc_curr = correct_in_train / num_train_samples
            self.train_acc.append(train_acc_curr)

            # Now check trained weights on the validation set
            val_running_loss = 0
            correct_val = 0

            self.model.eval()

            with torch.no_grad():
                for inputs, labels in self.validation_data:
                    inputs, labels = inputs.to(device), labels.to(device)

                    # Forward pass.
                    prediction, prediction_aux1, prediction_aux2 = self.model(inputs)

                    # Compute the loss.
                    real_loss = self.criterion(prediction, labels)
                    loss_aux1 = self.criterion(prediction_aux1, labels)
                    loss_aux2 = self.criterion(prediction_aux2, labels)

                    loss = real_loss + (0.3 * loss_aux1) + (0.3 * loss_aux2)

                    # Compute validation accuracy.
                    _, predicted_outputs = torch.max(prediction.data, 1)
                    correct_val += (predicted_outputs == labels).float().sum().item()

                # Compute batch loss.
                val_running_loss += loss.data.item() * inputs.shape[0]

                val_epoch_loss = val_running_loss / num_validation_samples
                self.val_loss.append(val_epoch_loss)

                val_acc_curr = correct_val / num_validation_samples
                self.val_acc.append(val_acc_curr)

            info = "[Epoch {}/{}]: train-loss = {:0.6f} | train-acc = {:0.3f} | val-loss = {:0.6f} | val-acc = {:0.3f}"

            print(
                info.format(
                    epoch + 1,
                    self.epochs,
                    train_epoch_loss,
                    train_acc_curr,
                    val_epoch_loss,
                    val_acc_curr,
                )
            )

            torch.save(self.model.state_dict(), f"{report_dir}/checkpoint_{epoch + 1}")

        torch.save(self.model.state_dict(), f"{report_dir}/resnet-56_weights")

    def test(self):
        num_test_samples = len(self.test_data)
        correctly_classified = 0

        self.model.eval()

        # no gradient calculation on evaluation
        with torch.no_grad():
            for inputs, labels in self.test_data:
                inputs, labels = inputs.to(device), labels.to(device)

                pred = self.model(inputs)

                _, predicted_labels = torch.max(pred.data, 1)

                correctly_classified += (
                    (predicted_labels == labels).float().sum().item()
                )

        self.test_accuracy = correctly_classified / num_test_samples
        print("Test accuracy: {}".format(self.test_accuracy))

    def plot_trainning_report(self):
        epochs = np.arange(self.epochs) + 1

        plt.figure(figsize=(20, 5))

        plt.subplot(121)

        plt.plot(epochs, self.train_loss, "blue", label="Training Losses")
        plt.plot(epochs, self.val_loss, "r", label="Validation Losses")

        plt.gca().xaxis.set_major_locator(mticker.MultipleLocator(1))
        plt.title("Training and Validation Losses vs Epochs")
        plt.xlabel("Epochs")
        plt.ylabel("Losses")
        plt.legend()
        plt.grid("off")
        plt.show()

        plt.subplot(122)

        plt.plot(epochs, self.train_acc, "blue", label="Training Accuracies")
        plt.plot(epochs, self.val_acc, "r", label="Validation Accuracies")

        plt.gca().xaxis.set_major_locator(mticker.MultipleLocator(1))

        plt.title("Training and Validation Accuracies vs Epochs")
        plt.xlabel("Epochs")
        plt.ylabel("Accuracies")
        plt.legend()
        plt.grid("off")
        plt.savefig(f"{report_dir}/training_classification_report.png")

    def plot_testing_report(self):
        plt.figure(figsize=(10, 6))
        plt.plot(self.epochs, self.test_accuracy, "b", label="Testing Accuracy")
        plt.xlabel("Epochs")
        plt.ylabel("Accuracy")
        plt.title("Testing Accuracy Over Epochs")
        plt.legend()
        plt.grid(True)
        plt.show()

        plt.savefig(f"{report_dir}/testing_classification_report.png")


In [None]:
data = GetDataset(input_dir="./data/raw", batch_size=32)

model = GoogleLeNet(num_classes=data.num_classes).to(device)

model_trainer = ModelTrainer(
    model=model,
    train_data=data.train_loader,
    validation_data=data.valid_loader,
    test_data=data.test_loader,
    optimizer=model.get_optimizer(),
    criterion=model.get_criterion(),
    epochs=15,
)
model_trainer.fit()
model_trainer.plot_trainning_report()

model_trainer.test()
model_trainer.plot_testing_report()
