## BBM418 Computer Vision Lab. Assignment 3.
## by Mehmet Giray Nacakci, 21989009.
<br>

## Place the Micro_Organism/ folder (which contains the image sets) next to the code/ folder,
## so that code/part_1.ipynb can read the images from ../Micro_Organism/.
<br>

## After running the "Initialisation Code" section,
## you can choose a model to train and display its results in the next section.
## Results are printed in this notebook, and plots are saved to the ../results/ folder.


# INITIALISATION CODE

## Loading Libraries, Defining Functions, Pre-Processing the Dataset

In [None]:

""" Python version 3.11 """
""" PyTorch 2.0.1 """
import torch  # 2.0.1
import torch.nn as nn
import torch.optim as optim

import os
import time
import random
import numpy as np
from PIL import Image
from matplotlib import pyplot
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay

root = "../Micro_Organism/"

# Every non-hidden sub-directory of the dataset root is one class.
# Keeping directories only drops stray files such as macOS' ".DS_Store", and sorting makes the
# class -> index mapping identical on every machine (os.listdir returns entries in arbitrary order).
classes_folders = sorted(
    entry for entry in os.listdir(root)
    if os.path.isdir(os.path.join(root, entry)) and not entry.startswith(".")
)

classes_to_indices = {key: index for index, key in enumerate(classes_folders)}  # enumerated such as 0, 1, ..., 7

# Dataset Partitions
train_images_files, validation_images_files, test_images_files = [], [], []
train_images, validation_images, test_images = [], [], []
train_labels, validation_labels, test_labels = [], [], []  # enumerated as a number such as 0, 1, ..., 7

# For plotting the results
training_loss_array = []
training_accuracy_array = []
validation_loss_array = []
validation_accuracy_array = []


def show_an_image_from_each_class(image_categories):
    """Display the first listed image of every class folder in a single figure.

    Args:
        image_categories: names of the class folders inside ``../Micro_Organism/``.
    """
    # macOS may leave a ".DS_Store" entry next to the class folders; it is not a class.
    categories = [class_ for class_ in image_categories if "DS_Store" not in class_]

    # Four images per row; at least two rows so the usual 8-class layout stays 2 x 4,
    # and more rows are added instead of failing when there are more than 8 classes.
    columns = 4
    rows = max(2, -(-len(categories) // columns))

    pyplot.figure(figsize=(12, 8))

    for position, class_ in enumerate(categories, start=1):
        class_path = "../Micro_Organism/" + class_

        images_in_folder = os.listdir(class_path)
        if not images_in_folder:
            continue  # nothing to show for an empty class folder

        pyplot.subplot(rows, columns, position)
        # Image.open keeps the file open until it is closed; the context manager releases it.
        with Image.open(class_path + '/' + images_in_folder[0]) as first_image:
            pyplot.imshow(first_image)
        pyplot.title(class_)
        pyplot.axis('off')

    pyplot.tight_layout()
    pyplot.show()


# Visual sanity check: one example image per class.
print("\nExamples of Dataset Classes:\n")
show_an_image_from_each_class(image_categories=classes_folders)


def pre_process_image(img):
    """Turn a PIL image into a float32 array of shape (3, 224, 224) with values in [0, 1].

    Args:
        img: a PIL image in any mode.

    Returns:
        A numpy array laid out channels-first, ready to be fed into the CNN.
    """
    # Some images are read with a single channel or with 4 channels (png); force 3 channels.
    rgb_image = img if img.mode == "RGB" else img.convert("RGB")

    # High-quality down-scaling: the Lanczos filter is used for resampling.
    resized_image = rgb_image.resize((224, 224), resample=Image.LANCZOS, reducing_gap=3.0)

    # Scale the 8-bit channel values to the range [0, 1].
    pixels = np.array(resized_image, dtype=np.float32) / 255.0

    # (height x width x 3) -> (3 x height x width)
    return np.transpose(pixels, (2, 0, 1))


def _load_partition(image_paths, images_out, labels_out):
    """Load, pre-process and label every image of one dataset partition.

    Args:
        image_paths: paths of the form ``<dataset root>/<class folder>/<file name>``.
        images_out: list that receives one pre-processed array per path.
        labels_out: list that receives the class index of each image.
    """
    for image_path in image_paths:
        # Image.open keeps the file handle open; the context manager releases it as soon as
        # the pixels have been copied into the pre-processed array.
        with Image.open(image_path) as image:
            images_out.append(pre_process_image(image))

        # The parent folder name of an image is its class name.
        labels_out.append(classes_to_indices.get(image_path.split('/')[-2]))


def read_and_preprocess_dataset():
    """Split the dataset into train / validation / test partitions and load all images.

    Fills the module-level ``*_images_files``, ``*_images`` and ``*_labels`` lists in place
    and returns nothing.

    According to the original author's note the classes hold 72 to 168 images each, so for
    balance (less bias towards any class) only 70 images per class are used:
    50 for training, 10 for validation and 10 for testing.
    """
    # Collect image file paths, class by class.
    for class_ in classes_folders:
        class_path = "../Micro_Organism/" + class_
        if "DS_Store" in class_path:
            continue   # Fix for macOS

        images_in_folder = os.listdir(class_path)[0:70]  # use only 70 images
        random.shuffle(images_in_folder)
        image_paths = [class_path + '/' + file_name for file_name in images_in_folder]

        train_images_files.extend(image_paths[:50])
        validation_images_files.extend(image_paths[50:60])
        test_images_files.extend(image_paths[60:])

    # Shuffle, so that every batch holds a roughly even mix of the classes.
    random.shuffle(train_images_files)
    random.shuffle(validation_images_files)
    random.shuffle(test_images_files)

    # Load and pre-process the images of each partition.
    _load_partition(train_images_files, train_images, train_labels)
    _load_partition(validation_images_files, validation_images, validation_labels)
    _load_partition(test_images_files, test_images, test_labels)


# Fill the module-level partition lists.
read_and_preprocess_dataset()

# Hand the partitions over to PyTorch.
# Image tensors have size (dataset partition size x 3 x height x width);
# label tensors hold one class index per image.

training_images_tensor = torch.stack(list(map(torch.from_numpy, train_images)))
training_labels_tensor = torch.tensor(train_labels)

validation_images_tensor = torch.stack(list(map(torch.from_numpy, validation_images)))
validation_labels_tensor = torch.tensor(validation_labels)

test_images_tensor = torch.stack(list(map(torch.from_numpy, test_images)))
test_labels_tensor = torch.tensor(test_labels)

print("\nDataset pre-processed.")


def draw_results_plots(model_no, model_title):
    """Plot the recorded loss and accuracy curves side by side, save the figure and show it.

    The curves are read from the module-level ``training_*_array`` / ``validation_*_array``
    lists. The figure is written to ``../results/model_<model_no>.jpg``.

    Args:
        model_no: number of the model, used in the output file name.
        model_title: text drawn as the figure title.
    """
    figure = pyplot.figure(figsize=(12, 8))
    pyplot.title(label=model_title, loc="center", y=1.0, fontsize=16, pad=35)
    pyplot.axis('off')

    # One panel per metric: (title, y-axis label, training curve, validation curve, legend entries)
    panels = (
        ("Loss vs Epochs", 'Loss', training_loss_array, validation_loss_array,
         ['Training Loss', 'Validation Loss']),
        ("Accuracy vs Epochs", 'Accuracy', training_accuracy_array, validation_accuracy_array,
         ['Training Accuracy', 'Validation Accuracy']),
    )

    for position, (panel_title, y_label, training_curve, validation_curve, legend_entries) in enumerate(panels, start=1):
        figure.add_subplot(1, 2, position)
        pyplot.plot(training_curve, color="orange")
        pyplot.plot(validation_curve, color="blue")
        pyplot.title(panel_title, fontsize=16)
        pyplot.ylabel(y_label, fontsize=16)
        pyplot.xlabel('Epochs', fontsize=16)
        pyplot.legend(legend_entries, fontsize=16)

    figure.subplots_adjust(top=0.75)
    pyplot.tight_layout()

    # Create the output folder on first use.
    output_folder = "../results/"
    if not os.path.exists(output_folder):
        os.mkdir(output_folder)

    pyplot.savefig(f"{output_folder}model_{model_no}.jpg")
    pyplot.show()


def plot_confusion_matrix_heatmap(model_no, predictions_, labels_tensor_):
    """Plot the confusion matrix of the test set, save it and show it.

    The figure is written to ``../results/model_<model_no>_confusion.jpg``.

    Args:
        model_no: number of the model, used in the plot title and the output file name.
        predictions_: predicted class indices; must provide ``tolist()``.
        labels_tensor_: actual class indices; must provide ``tolist()``.
    """
    predictions_ = predictions_.tolist()
    labels_ = labels_tensor_.tolist()

    # Pin the label set explicitly: without it the matrix only covers classes that occur in
    # the data, and would no longer line up with `display_labels` if a class were missing
    # from both the actual and the predicted labels.
    class_indices = list(range(len(classes_folders)))
    confusion_matrix_ = confusion_matrix(labels_, predictions_, labels=class_indices)
    cm_display = ConfusionMatrixDisplay(confusion_matrix=confusion_matrix_, display_labels=classes_folders)
    cm_display.plot()
    pyplot.title('Model_no = ' + str(model_no) + '   Confusion Matrix of Test Set', x=0.2, fontsize=17)
    pyplot.ylabel('Actual Class', fontsize=13)
    pyplot.xlabel('Predicted Class', fontsize=13)
    pyplot.xticks(rotation=90)
    pyplot.tight_layout()

    # Create the output folder on first use.
    results_folder = "../results/"
    if not os.path.exists(results_folder):
        os.mkdir(results_folder)
    pyplot.savefig(results_folder + "model_" + str(model_no) + "_confusion.jpg")
    pyplot.show()


class ResidualBlock(nn.Module):
    """Two 3x3 convolutions (16 -> 32 -> 16 channels) with an identity skip connection.

    The block can be seen as branching into two paths that merge again: the convolutional
    path, and a residual connection that carries the block input unchanged to the block
    output. Channel count and spatial size are preserved, so both paths can be summed.
    """

    def __init__(self):
        super().__init__()
        # Both convolutions keep the spatial size (kernel 3, stride 1, padding 1).
        widen = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        narrow = nn.Conv2d(in_channels=32, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.model = nn.Sequential(widen, nn.ReLU(), narrow, nn.ReLU())

    def forward(self, x):
        """Return the convolutional path's output plus the untouched input (identity)."""
        convolved = self.model(x)
        return convolved + x


class CustomCNN(nn.Module):
    """A small custom CNN classifier for 3 x 224 x 224 images.

    Args:
        use_residual: if true, the middle 16 -> 32 -> 16 convolution pair is wrapped in a
            ``ResidualBlock`` (with a skip connection); otherwise the same two convolutions
            are used without a skip connection.
        dropout_probability: probability of the dropout layer placed before the final
            pooling / fully connected stage. Any value greater than 0 enables dropout;
            with the default of 0.0 no dropout layer is added.
        num_classes: number of output scores (defaults to the 8 classes of the dataset).
    """

    def __init__(self, use_residual, dropout_probability=0.0, num_classes=8):
        super().__init__()
        self.use_residual, self.dropout_probability = use_residual, dropout_probability

        layers = [
            # Convolutional Layers
            # image size: 3 x 224 x 224
            nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # image size: 16 x 112 x 112
            nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            # image size: 16 x 56 x 56
            nn.Conv2d(16, 16, kernel_size=3, stride=2, padding=1),
            nn.ReLU()
            # image size: 16 x 28 x 28
        ]

        if self.use_residual:
            layers.append(ResidualBlock())
        else:
            layers.extend([
                nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
                nn.ReLU(),
                nn.Conv2d(32, 16, kernel_size=3, stride=1, padding=1),
                nn.ReLU(),
            ])

        layers.extend([
            nn.Conv2d(16, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
        ])

        # Any positive probability enables dropout (small values such as 0.05 or 0.1 must not
        # be ignored silently).
        if self.dropout_probability > 0.0:
            # A dropout layer randomly (differently for each forward pass) sets some of its
            # inputs to zero during training. This helps regularization and thus reduces the
            # risk of over-fitting.
            # It is placed towards the end, close to the fully connected layer, so that the
            # network relies less heavily on specific features or neurons.
            layers.append(nn.Dropout(p=self.dropout_probability))

        layers.extend([
            # image size: 16 x 28 x 28
            nn.MaxPool2d(kernel_size=2, stride=2),
            # output size: 16 x 14 x 14 = 3136
            nn.Flatten(),
            # Fully Connected Layer
            nn.Linear(in_features=3136, out_features=num_classes)
        ])

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the raw class scores (one row of ``num_classes`` values per input image)."""
        return self.model(x)


def _evaluate_partition(model, criterion, images_tensor, labels_tensor):
    """Return ``(loss, accuracy, predictions)`` of ``model`` on one whole dataset partition.

    Gradient tracking is disabled to save cpu and memory. The caller is responsible for
    putting the model into evaluation mode.
    """
    with torch.no_grad():
        outputs = model(images_tensor)
        loss = criterion(outputs, labels_tensor).item()
        _, predictions = torch.max(outputs, 1)
        accuracy = (predictions == labels_tensor).float().mean().item()
    return loss, accuracy, predictions


def _evaluate_and_record(model, criterion):
    """Evaluate on the training and validation partitions and append the results to the plot arrays.

    Returns:
        ``(training_loss, validation_loss, training_accuracy, validation_accuracy)``
    """
    model.eval()  # Set the model to evaluation mode
    training_loss, training_accuracy, _ = _evaluate_partition(
        model, criterion, training_images_tensor, training_labels_tensor)
    validation_loss, validation_accuracy, _ = _evaluate_partition(
        model, criterion, validation_images_tensor, validation_labels_tensor)

    # plotting purposes
    training_loss_array.append(training_loss)
    training_accuracy_array.append(training_accuracy)
    validation_loss_array.append(validation_loss)
    validation_accuracy_array.append(validation_accuracy)

    return training_loss, validation_loss, training_accuracy, validation_accuracy


def _print_metrics(training_loss, validation_loss, training_accuracy, validation_accuracy):
    """Print one line with the training / validation loss and accuracy."""
    print(f'Training Loss: {training_loss:.3f} , Validation Loss: {validation_loss:.3f}', end=" ")
    print(f', Training Accuracy: {training_accuracy:.3f} , Validation Accuracy: {validation_accuracy:.3f}')


def train_and_evaluate_model(model_no, use_residual, epoch_number, batch_size, learning_rate, dropout_probability=0.0):
    """Train a fresh ``CustomCNN``, report its metrics per epoch and evaluate it on the test set.

    Training uses the Adam optimizer and cross-entropy loss on the module-level dataset
    tensors. Loss and accuracy on the training and validation partitions are printed and
    recorded before training and after every epoch. Afterwards the test loss and accuracy
    are printed, and the learning curves and the test confusion matrix are plotted and saved.

    Args:
        model_no: number identifying the model in the printed output and file names.
        use_residual: whether the network uses the residual block.
        epoch_number: number of passes over the training partition.
        batch_size: number of images per optimisation step.
        learning_rate: learning rate of the Adam optimizer.
        dropout_probability: dropout probability forwarded to ``CustomCNN``.
    """
    print("\n - - - - - - - - - - - - - \nTraining started for   Model_no =", model_no,  "\n")

    global_start = time.time()

    # The plot arrays are shared between runs; start this run with empty histories.
    training_loss_array.clear()
    training_accuracy_array.clear()
    validation_loss_array.clear()
    validation_accuracy_array.clear()

    model = CustomCNN(use_residual, dropout_probability)

    model_title = "model_no=" + str(model_no) + " , use_residual=" + str(use_residual) + " , epoch=" + str(epoch_number) + "\n"
    model_title += "batch_size=" + str(batch_size) + " , learning_rate=" + str(learning_rate)
    # Mention dropout in the title exactly when the built model contains a dropout layer.
    if any(isinstance(layer, nn.Dropout) for layer in model.modules()):
        model_title += " , dropout_probability=" + str(dropout_probability)

    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()

    # Initial Performance
    initial_metrics = _evaluate_and_record(model, criterion)
    print("Before Training (0 Epochs):     ", end="")
    _print_metrics(*initial_metrics)

    for epoch in range(epoch_number):
        model.train()  # training mode

        epoch_start_time = time.time()

        # Batches are consecutive slices of the training tensors, in the same order every epoch.
        for batch_start in range(0, len(training_images_tensor), batch_size):

            batch_images = training_images_tensor[batch_start:batch_start + batch_size]  # batch size x 3 x height x width
            batch_labels = training_labels_tensor[batch_start:batch_start + batch_size]

            optimizer.zero_grad()  # clear gradients of previous iteration, because pytorch keeps them

            # Forward pass
            outputs = model(batch_images)
            loss = criterion(outputs, batch_labels)

            # Backward pass
            loss.backward()
            optimizer.step()

        epoch_metrics = _evaluate_and_record(model, criterion)

        # The reported duration covers both the training and the evaluation of this epoch.
        print(f'Epoch [{epoch+1}/{epoch_number}],', end=" ")
        print(" took ", str(int(time.time() - epoch_start_time)), "seconds.  ", end="")
        _print_metrics(*epoch_metrics)

    # Final evaluation on the held-out test partition: the loss is computed from the test
    # outputs, in evaluation mode and without gradient tracking.
    model.eval()
    test_loss, test_accuracy, test_predictions = _evaluate_partition(
        model, criterion, test_images_tensor, test_labels_tensor)
    print(f'\nAFTER TRAINING:  model_no={model_no},  TEST SET RESULTS: , Test Loss: {test_loss:.2f}, Test ACCURACY:  {(test_accuracy*100):.2f}%\n')

    draw_results_plots(model_no, model_title)
    plot_confusion_matrix_heatmap(model_no, test_predictions, test_labels_tensor)

    print("* *  Execution completed. Total time:  ", int(time.time() - global_start), " seconds.  * * \n")
    # never tried: Save the trained model: torch.save(model.state_dict(), 'custom_cnn_model.pth')


# Choose and Train a model.
## Models differ by Hyper-Parameters and some Architectural differences.
<br>

Training and evaluating one model takes around 700 seconds for 80 epochs.

#### Models without Residual Connection

In [None]:
# Model 1: no residual connection, batch size 20, learning rate 0.0001.
train_and_evaluate_model(model_no=1, use_residual=False, epoch_number=80, batch_size=20, learning_rate=0.0001)

In [None]:
# Model 2: no residual connection, batch size 20, learning rate 0.0003.
train_and_evaluate_model(model_no=2, use_residual=False, epoch_number=80, batch_size=20, learning_rate=0.0003)

In [None]:
# Model 3: no residual connection, batch size 40, learning rate 0.0001.
train_and_evaluate_model(model_no=3, use_residual=False, epoch_number=80, batch_size=40, learning_rate=0.0001)

In [None]:
# Model 4: no residual connection, batch size 40, learning rate 0.0003.
train_and_evaluate_model(model_no=4, use_residual=False, epoch_number=80, batch_size=40, learning_rate=0.0003)

#### Models with Residual Connection

In [None]:
# Model 5: residual connection, batch size 20, learning rate 0.0001.
train_and_evaluate_model(model_no=5, use_residual=True, epoch_number=80, batch_size=20, learning_rate=0.0001)

In [None]:
# Model 6: residual connection, batch size 20, learning rate 0.0003.
train_and_evaluate_model(model_no=6, use_residual=True, epoch_number=80, batch_size=20, learning_rate=0.0003)

In [None]:
# Model 7: residual connection, batch size 40, learning rate 0.0001.
train_and_evaluate_model(model_no=7, use_residual=True, epoch_number=80, batch_size=40, learning_rate=0.0001)

In [None]:
# Model 8: residual connection, batch size 40, learning rate 0.0003.
train_and_evaluate_model(model_no=8, use_residual=True, epoch_number=80, batch_size=40, learning_rate=0.0003)

# DROPOUT

### The best model among those Without Residual Connection is model_4,
### and the best model among those With Residual Connection is model_8.

### Now, let's try different dropout probability values on these best models to obtain model_9, model_10, model_11 and model_12.

In [None]:
# Model 9: hyper-parameters of model 4 (no residual connection) with dropout probability 0.2.
train_and_evaluate_model(model_no=9, use_residual=False, epoch_number=80, batch_size=40, learning_rate=0.0003, dropout_probability=0.2)

In [None]:
# Model 10: hyper-parameters of model 4 (no residual connection) with dropout probability 0.5.
train_and_evaluate_model(model_no=10, use_residual=False, epoch_number=80, batch_size=40, learning_rate=0.0003, dropout_probability=0.5)

In [None]:
# Model 11: hyper-parameters of model 8 (residual connection) with dropout probability 0.2.
train_and_evaluate_model(model_no=11, use_residual=True, epoch_number=80, batch_size=40, learning_rate=0.0003, dropout_probability=0.2)

In [None]:
# Model 12: hyper-parameters of model 8 (residual connection) with dropout probability 0.5.
train_and_evaluate_model(model_no=12, use_residual=True, epoch_number=80, batch_size=40, learning_rate=0.0003, dropout_probability=0.5)

### End of Part 1.