In [None]:
import os


import matplotlib.pyplot as plt
from torchsummary import summary
from torch.utils.data import Subset, DataLoader


import numpy as np
from sklearn.metrics import (
    confusion_matrix,
    ConfusionMatrixDisplay,
    classification_report,
)


import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

In [None]:
# Select the compute device: CUDA when PyTorch can see a GPU, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using {device.type.upper()} for computation.")

In [None]:
# Define constants
BATCH_SIZE = 64
IMAGE_SIZE = (128, 128)
# Root folder of the image dataset. The DATASET_PATH environment variable, when
# set, overrides the original hard-coded default so the notebook can run on
# another machine without editing this cell.
DATASET_PATH = os.environ.get(
    "DATASET_PATH", r"C:\Users\aliseydi\Git\Assignment 3\dataset\raw-img"
)

# Output folders used later in the notebook:
#   graphs -> saved figures, models -> saved checkpoints, logs -> training logs.
# exist_ok=True makes the cell safely re-runnable without a separate
# exists-then-create check.
for output_dir in ("graphs", "models", "logs"):
    os.makedirs(output_dir, exist_ok=True)

In [None]:
# Only sub-directories are treated as classes; stray files in DATASET_PATH are
# skipped so they are neither counted as a class nor passed to os.listdir
# (which would raise on a non-directory). Sorting gives a stable print order.
class_folders = sorted(
    entry
    for entry in os.listdir(DATASET_PATH)
    if os.path.isdir(os.path.join(DATASET_PATH, entry))
)
print(f"Number of classes: {len(class_folders)}")

# Print number of images per class
for folder_name in class_folders:
    path = os.path.join(DATASET_PATH, folder_name)
    print(f"{folder_name}: {len(os.listdir(path))} images")

In [None]:
# Preprocessing pipeline attached to the single `dataset` object below:
#   resize to IMAGE_SIZE -> convert to tensor -> random horizontal flip -> normalize.
# NOTE(review): the dataset is split into train/val/test later, so the random
# flip is also applied to validation and test images - confirm that is intended.
preprocessing_steps = [
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
data_transforms = transforms.Compose(preprocessing_steps)


# Load every image under DATASET_PATH and record how many classes were found.
dataset = datasets.ImageFolder(root=DATASET_PATH, transform=data_transforms)
NUM_CLASSES = len(dataset.classes)

In [None]:
# Splitting dataset into train (80%), validation (10%), and test (remainder) sets.
# A seeded generator makes the split identical on every Restart & Run All, so
# metrics from different runs are computed on the same samples.
SPLIT_SEED = 42
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Sample a subset for each dataset to speed up training during development.
# The sizes are capped at the split length so a small dataset does not produce
# out-of-range indices.
train_subset = Subset(train_dataset, range(min(500, len(train_dataset))))
val_subset = Subset(val_dataset, range(min(100, len(val_dataset))))
test_subset = Subset(test_dataset, range(min(100, len(test_dataset))))


# Define data loaders. Only the training loader is shuffled; evaluation loaders
# keep a fixed order because shuffling does not change the metrics.
train_dataloader = DataLoader(
    train_subset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2
)
val_dataloader = DataLoader(
    val_subset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2
)
test_dataloader = DataLoader(
    test_subset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2
)

In [None]:
def find_examples(loader, num_classes=None):
    """Collect the first image seen for each class label.

    Args:
        loader: Iterable yielding ``(images, labels)`` batches, where ``labels``
            is a tensor of integer class ids and ``images[i]`` is the image
            belonging to ``labels[i]``.
        num_classes: Number of distinct classes to look for. Iteration stops as
            soon as this many classes have an example. Defaults to the
            notebook-level ``NUM_CLASSES``.

    Returns:
        dict mapping class id (int) to the first image found for that class.
        Classes that never occur in ``loader`` are simply absent.
    """
    target_count = NUM_CLASSES if num_classes is None else num_classes
    examples_per_class = {}
    for images, labels in loader:
        for image, label in zip(images, labels.tolist()):
            if label in examples_per_class:
                continue
            examples_per_class[label] = image  # store the first occurrence
            if len(examples_per_class) == target_count:
                # Every requested class is covered; skip the remaining batches.
                return examples_per_class
    return examples_per_class

In [None]:
def _to_display_array(image):
    """Convert a channel-first image tensor to a channel-last array for imshow."""
    # detach/cpu make the conversion work for GPU tensors and tensors with grad.
    array = image.detach().cpu().numpy()
    if image.dtype == torch.float32:
        low, high = array.min(), array.max()
        if low < 0 or high > 1:
            span = high - low
            # Rescale to [0, 1]; a constant image has zero span, so map it to 0
            # instead of dividing by zero.
            array = (array - low) / span if span > 0 else np.zeros_like(array)
    elif image.dtype == torch.uint8:
        array = array / 255.0  # Scale to [0, 1]
    return array.transpose(1, 2, 0)  # (C, H, W) -> (H, W, C) for matplotlib


def show_images(examples, title, class_names=None):
    """Plot one image per class in a grid, save it under ``graphs/`` and show it.

    Args:
        examples: dict mapping class index to a channel-first image tensor.
        title: Figure title; also used as the file name ``graphs/{title}.png``.
        class_names: Optional sequence indexed by class index that provides the
            subplot titles. Defaults to ``dataset.classes`` from the notebook.
    """
    names = dataset.classes if class_names is None else class_names

    # sort examples by class index
    examples = dict(sorted(examples.items()))

    # Five columns; at least two rows (the original layout), more rows when
    # there are more than ten classes so subplot indices never overflow.
    n_cols = 5
    n_rows = max(2, -(-len(examples) // n_cols))
    fig = plt.figure(figsize=(15, 5 * n_rows))

    for idx, (label, image) in enumerate(examples.items()):
        ax = plt.subplot(n_rows, n_cols, idx + 1)
        ax.axis("off")
        ax.set_title(names[label])
        ax.imshow(_to_display_array(image))  # Show the image

    plt.suptitle(title, fontsize=16, weight="bold")
    plt.tight_layout()

    # save the plot to the graphs folder using title as the name
    plt.savefig(f"graphs/{title}.png")
    plt.show()
    plt.close(fig)  # release the figure so repeated calls do not accumulate

In [None]:
# For each split in turn: pick one example image per class, then plot them.
split_examples = {}
for split_key, split_loader, figure_title in (
    ("train", train_dataloader, "Examples from Each Class - Training Set"),
    ("val", val_dataloader, "Examples from Each Class - Validation Set"),
    ("test", test_dataloader, "Examples from Each Class - Testing Set"),
):
    split_examples[split_key] = find_examples(split_loader)
    show_images(split_examples[split_key], figure_title)

# Keep the per-split results available under their original names.
train_examples = split_examples["train"]
val_examples = split_examples["val"]
test_examples = split_examples["test"]

# Part 1


## Without Dropout


In [None]:
class CustomCNN(nn.Module):
    """Six-convolution CNN with a global-average-pooled classification head.

    Every convolution uses a 3x3 kernel with padding 1 (spatial size preserved)
    and every max-pool uses kernel 2 / stride 2 (spatial size halved). For the
    notebook's 128x128 input the feature maps therefore shrink
    128 -> 64 -> 32 -> 16 -> 8 before global average pooling reduces each of the
    32 final channels to a single value.

    Args:
        num_classes: Size of the final linear layer's output.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        def conv3x3(in_channels, out_channels):
            # 3x3 convolution that keeps height and width unchanged.
            return nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)

        def halve():
            # 2x2 max-pool that halves height and width.
            return nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 1: 3 -> 32 channels, then downsample.
        self.conv1 = conv3x3(3, 32)
        self.relu1 = nn.ReLU()
        self.pool1 = halve()

        # Stage 2: 32 -> 64 channels, then downsample.
        self.conv2 = conv3x3(32, 64)
        self.relu2 = nn.ReLU()
        self.pool2 = halve()

        # Stage 3: 64 -> 128 channels, then downsample.
        self.conv3 = conv3x3(64, 128)
        self.relu3 = nn.ReLU()
        self.pool3 = halve()

        # Stages 4-6: three convolutions at one resolution, narrowing the
        # channel count 128 -> 128 -> 64 -> 32, followed by a final downsample.
        self.conv4 = conv3x3(128, 128)
        self.relu4 = nn.ReLU()

        self.conv5 = conv3x3(128, 64)
        self.relu5 = nn.ReLU()

        self.conv6 = conv3x3(64, 32)
        self.relu6 = nn.ReLU()
        self.pool6 = halve()

        # Global average pooling: each channel becomes a 1x1 map.
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))

        # Classification head: 32 pooled features -> 128 -> num_classes.
        self.fc1 = nn.Linear(32, 128)
        self.relu7 = nn.ReLU()
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for an image batch."""
        feature_layers = (
            self.conv1, self.relu1, self.pool1,
            self.conv2, self.relu2, self.pool2,
            self.conv3, self.relu3, self.pool3,
            self.conv4, self.relu4,
            self.conv5, self.relu5,
            self.conv6, self.relu6, self.pool6,
        )
        for layer in feature_layers:
            x = layer(x)

        pooled = self.global_avg_pool(x)
        flat = pooled.view(pooled.size(0), -1)  # (batch, 32)

        hidden = self.relu7(self.fc1(flat))
        return self.fc2(hidden)


# Instantiate the model with one output per class found in the dataset
# (NUM_CLASSES) instead of a hard-coded 10, so the head always matches the labels.
model = CustomCNN(num_classes=NUM_CLASSES).to(device)

# Print the model summary for the notebook's input size (3, IMAGE_SIZE[0], IMAGE_SIZE[1])
summary(model, input_size=(3, IMAGE_SIZE[0], IMAGE_SIZE[1]))

In [None]:
# Define the loss function (the optimizer is created per run inside train_model)
criterion = nn.CrossEntropyLoss()


def _plot_training_history(
    train_losses,
    val_accuracies,
    val_precisions,
    val_recalls,
    val_f1s,
    batch_size,
    learning_rate,
    save_path,
):
    """Plot per-epoch loss/accuracy and precision/recall/F1, then save the figure."""
    plt.figure(figsize=(12, 8))
    plt.subplot(2, 1, 1)
    plt.plot(train_losses, label="Training Loss")
    plt.plot(val_accuracies, label="Validation Accuracy")

    # use batch size and learning rate as the name of the plot
    plt.title(
        f"Training Loss, Validation Accuracy, and F1 Score - {batch_size}_{learning_rate}"
    )

    plt.xlabel("Epoch")
    plt.ylabel("Metrics")
    plt.legend()

    plt.subplot(2, 1, 2)
    plt.plot(val_precisions, label="Validation Precision")
    plt.plot(val_recalls, label="Validation Recall")
    plt.plot(val_f1s, label="Validation F1 Score")

    # use batch size and learning rate as the name of the plot
    plt.title(f"Validation Precision and Recall - {batch_size}_{learning_rate}")

    plt.xlabel("Epoch")
    plt.ylabel("Metrics")
    plt.legend()

    plt.tight_layout()

    # save graphs to the graphs folder using batch size, learning rate and model name
    plt.savefig(f"graphs/{batch_size}_{learning_rate}_{save_path}.png")


def train_model(
    logger,
    num_epochs,
    save_path="best_model.pth",
    batch_size=64,
    learning_rate=0.001,
):
    """Train the notebook-level ``model`` and checkpoint its best validation F1.

    Reads the globals ``model``, ``criterion``, ``device``, ``train_subset`` and
    ``val_subset``. After every epoch the model is evaluated on the validation
    subset; the weights are written to ``models/{save_path}`` after the first
    epoch and whenever the weighted F1 improves. A two-panel metrics figure is
    saved to ``graphs/{batch_size}_{learning_rate}_{save_path}.png``.

    Args:
        logger: ``logging.Logger`` that receives per-epoch metrics. Handlers
            attached directly to it are closed and removed when training ends.
        num_epochs: Number of passes over the training subset.
        save_path: Checkpoint file name inside the ``models`` folder.
        batch_size: Batch size used for both the training and validation loader.
        learning_rate: Learning rate of the Adam optimizer.
    """
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Build loaders with the requested batch size so that batch-size sweeps
    # actually vary it (previously the global BATCH_SIZE was always used).
    train_loader = DataLoader(
        train_subset, batch_size=batch_size, shuffle=True, num_workers=2
    )
    val_loader = DataLoader(
        val_subset, batch_size=batch_size, shuffle=False, num_workers=2
    )
    # The loss is averaged over the samples that are iterated (the subset),
    # not over the full training split.
    num_train_samples = max(len(train_subset), 1)

    best_f1 = 0.0  # Best validation F1 score seen so far

    # Lists to store metrics for plotting
    train_losses, val_accuracies, val_f1s, val_precisions, val_recalls = (
        [],
        [],
        [],
        [],
        [],
    )

    for epoch in range(num_epochs):
        # Re-enable training mode every epoch: the validation phase below
        # switches to eval mode, which would otherwise stay active (and e.g.
        # disable dropout) for all subsequent epochs.
        model.train()
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # loss is a batch mean; weight it by batch size to sum per-sample loss
            running_loss += loss.item() * images.size(0)

        # Calculate training epoch loss
        epoch_loss = running_loss / num_train_samples
        train_losses.append(epoch_loss)

        logger.info(f"Epoch [{epoch+1}/{num_epochs}], Training Loss: {epoch_loss:.4f}")

        # Validation phase
        model.eval()
        val_labels = []
        val_preds = []
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                val_labels.extend(labels.cpu().numpy())
                val_preds.extend(predicted.cpu().numpy())

        # Calculate metrics
        accuracy = accuracy_score(val_labels, val_preds)
        precision, recall, f1, _ = precision_recall_fscore_support(
            val_labels, val_preds, average="weighted"
        )

        val_accuracies.append(accuracy)
        val_f1s.append(f1)
        val_precisions.append(precision)
        val_recalls.append(recall)

        logger.info(
            f"Validation Metrics - Acc: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}"
        )

        # Save the model if it has the best F1 score so far. The first epoch is
        # always saved so a checkpoint exists even if F1 never rises above 0.
        is_first_epoch = epoch == 0
        if is_first_epoch or f1 > best_f1:
            previous_best = best_f1
            best_f1 = f1
            torch.save(model.state_dict(), f"models/{save_path}")
            if is_first_epoch:
                logger.info(f"Model saved: Initial F1 {f1:.4f}")
            else:
                logger.info(
                    f"Model saved: Improved F1 from {previous_best:.4f} to {f1:.4f}"
                )

    # Close and detach handlers attached directly to this logger so their
    # files are flushed and released.
    for handler in list(logger.handlers):
        handler.close()
        logger.removeHandler(handler)

    _plot_training_history(
        train_losses,
        val_accuracies,
        val_precisions,
        val_recalls,
        val_f1s,
        batch_size,
        learning_rate,
        save_path,
    )

In [None]:
def evaluate_model(model_path, test_dataloader):
    """Load a checkpoint into the notebook-level ``model`` and report test metrics.

    Prints a classification report and saves a confusion-matrix figure to
    ``graphs/conf_matrix_{model_path}.png``.

    Args:
        model_path: Checkpoint file name inside the ``models`` folder. It must
            match the architecture of the current global ``model``.
        test_dataloader: Iterable of ``(images, labels)`` batches to evaluate on.
    """
    # map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine.
    model.load_state_dict(torch.load(f"models/{model_path}", map_location=device))
    model.to(device)
    model.eval()

    all_labels = []
    all_preds = []

    with torch.no_grad():
        for images, labels in test_dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, preds = torch.max(outputs, 1)

            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())

    # Class ids occurring in either the ground truth or the predictions. Naming
    # classes from this explicit list keeps "Class i" tied to the real class id
    # and keeps the name count equal to the label count even when a class is
    # absent from the sample or appears only in the predictions.
    class_ids = np.union1d(all_labels, all_preds).astype(int)
    class_names = [f"Class {i}" for i in class_ids]

    # Display classification report
    print("Classification Report:")
    print(
        classification_report(
            all_labels,
            all_preds,
            labels=class_ids,
            target_names=class_names,
        )
    )

    # Calculate the confusion matrix over the same class ids
    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)

    # Display the confusion matrix
    fig, ax = plt.subplots(figsize=(8, 8))
    ConfusionMatrixDisplay(cm, display_labels=class_names).plot(
        values_format="d", ax=ax
    )

    # use model path as the title of the plot
    plt.title(f"Confusion Matrix - {model_path}")
    plt.tight_layout()

    # save the confusion matrix to the graphs folder using model name as the name
    plt.savefig(f"graphs/conf_matrix_{model_path}.png")

In [None]:
# create logger function to save model training logs using python logging which is save to a file
import logging


def create_logger(filename="logs.log"):
    """Route INFO-level log records to ``logs/{filename}`` and return a logger.

    ``logging.basicConfig`` only takes effect the first time it is called (and
    not at all if the root logger already has handlers), so calling it once per
    training run left every run writing to the first file. This version
    installs its own file handler on the root logger and replaces the handler
    installed by a previous call, so each call switches output to the new file.

    Args:
        filename: Log file name inside the ``logs`` folder (created if missing).

    Returns:
        The module-level ``logging.Logger``; its records propagate to the root
        logger and therefore to the file handler installed here.
    """
    handler_tag = "create_logger_file_handler"
    os.makedirs("logs", exist_ok=True)

    root_logger = logging.getLogger()
    # Remove (and close) only the handler a previous create_logger call added;
    # handlers installed by the environment are left untouched.
    for handler in list(root_logger.handlers):
        if handler.get_name() == handler_tag:
            root_logger.removeHandler(handler)
            handler.close()

    file_handler = logging.FileHandler(f"logs/{filename}")
    file_handler.set_name(handler_tag)
    file_handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    root_logger.addHandler(file_handler)
    root_logger.setLevel(logging.INFO)

    # create a logger
    logger = logging.getLogger(__name__)
    return logger

# Model Training


In [None]:
# Grid search over learning rate and batch size for the plain CNN.
learning_rates = [0.001, 0.0001, 0.000001]
batch_sizes = [32, 64]

for learning_rate in learning_rates:
    for batch_size in batch_sizes:
        # One name per configuration, shared by the log file and the checkpoint.
        run_name = f"lr_{learning_rate}_bs_{batch_size}"
        checkpoint_name = f"best_model_{run_name}.pth"

        # train_model / evaluate_model operate on the global `model`, so a fresh
        # network is assigned to it for every configuration.
        model = CustomCNN(num_classes=NUM_CLASSES).to(device)
        logger = create_logger(f"{run_name}.log")
        train_model(
            num_epochs=100,
            save_path=checkpoint_name,
            batch_size=batch_size,
            learning_rate=learning_rate,
            logger=logger,
        )

        evaluate_model(checkpoint_name, test_dataloader)

## With Dropout


In [None]:
class CustomDropCNN(nn.Module):
    """Six-convolution CNN with dropout in front of the classification head.

    Every convolution uses a 3x3 kernel with padding 1 (spatial size preserved)
    and every max-pool uses kernel 2 / stride 2 (spatial size halved). For the
    notebook's 128x128 input the feature maps shrink 128 -> 64 -> 32 -> 16 -> 8
    before global average pooling. Dropout is applied to the 32 pooled features
    just before the first fully connected layer.

    Args:
        num_classes: Size of the final linear layer's output.
        dropout_rate: Probability passed to ``nn.Dropout``.
    """

    def __init__(self, num_classes=10, dropout_rate=0.5):
        super().__init__()

        def conv3x3(in_channels, out_channels):
            # 3x3 convolution that keeps height and width unchanged.
            return nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)

        def halve():
            # 2x2 max-pool that halves height and width.
            return nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 1: 3 -> 32 channels, then downsample.
        self.conv1 = conv3x3(3, 32)
        self.relu1 = nn.ReLU()
        self.pool1 = halve()

        # Stage 2: 32 -> 64 channels, then downsample.
        self.conv2 = conv3x3(32, 64)
        self.relu2 = nn.ReLU()
        self.pool2 = halve()

        # Stage 3: 64 -> 128 channels, then downsample.
        self.conv3 = conv3x3(64, 128)
        self.relu3 = nn.ReLU()
        self.pool3 = halve()

        # Stages 4-6: three convolutions at one resolution, narrowing the
        # channel count 128 -> 128 -> 64 -> 32, followed by a final downsample.
        self.conv4 = conv3x3(128, 128)
        self.relu4 = nn.ReLU()

        self.conv5 = conv3x3(128, 64)
        self.relu5 = nn.ReLU()

        self.conv6 = conv3x3(64, 32)
        self.relu6 = nn.ReLU()
        self.pool6 = halve()

        # Dropout applied to the pooled feature vector (see forward).
        self.dropout = nn.Dropout(dropout_rate)

        # Global average pooling: each channel becomes a 1x1 map.
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))

        # Classification head: 32 pooled features -> 128 -> num_classes.
        self.fc1 = nn.Linear(32, 128)
        self.relu7 = nn.ReLU()
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for an image batch."""
        feature_layers = (
            self.conv1, self.relu1, self.pool1,
            self.conv2, self.relu2, self.pool2,
            self.conv3, self.relu3, self.pool3,
            self.conv4, self.relu4,
            self.conv5, self.relu5,
            self.conv6, self.relu6, self.pool6,
        )
        for layer in feature_layers:
            x = layer(x)

        pooled = self.global_avg_pool(x)
        flat = pooled.view(pooled.size(0), -1)  # (batch, 32)

        # Dropout first, then the two fully connected layers.
        regularized = self.dropout(flat)
        hidden = self.relu7(self.fc1(regularized))
        return self.fc2(hidden)


# Instantiate the dropout variant with one output per dataset class
# (NUM_CLASSES) instead of a hard-coded 10, then print its layer summary.
model = CustomDropCNN(num_classes=NUM_CLASSES).to(device)

summary(model, input_size=(3, IMAGE_SIZE[0], IMAGE_SIZE[1]))

In [None]:
# Sweep over dropout probabilities with a fixed batch size and learning rate.
# NOTE(review): a rate of 1.0 zeroes every pooled feature during training, so
# that run cannot learn from the input - confirm it is meant as a baseline.
dropout_rates = [0.25, 0.5, 0.75, 1.0]

for rate in dropout_rates:
    checkpoint_name = f"best_model_dropout_{rate}.pth"

    # train_model / evaluate_model operate on the global `model`.
    model = CustomDropCNN(num_classes=NUM_CLASSES, dropout_rate=rate).to(device)
    logger = create_logger(f"dropout_{rate}.log")
    train_model(
        logger=logger,
        num_epochs=100,
        save_path=checkpoint_name,
        batch_size=64,
        learning_rate=0.001,
    )

    evaluate_model(checkpoint_name, test_dataloader)

# Part 2


## Train FC Layer


In [None]:
import torchvision.models as models


class EfficientNetB0Custom(nn.Module):
    """Pre-trained EfficientNet-B0 with a new two-layer classification head.

    This model belongs to the "Train FC Layer" experiment, so by default the
    pre-trained backbone is frozen and only the newly created fully connected
    layers receive gradients. Previously nothing was frozen and the whole
    network was fine-tuned.

    Args:
        num_classes: Size of the final linear layer's output.
        dropout_rate: Currently unused; kept so existing calls keep working.
        freeze_backbone: When True (default) all pre-trained parameters are
            frozen. Pass False to fine-tune the entire network.
    """

    def __init__(self, num_classes=10, dropout_rate=0.5, freeze_backbone=True):
        super(EfficientNetB0Custom, self).__init__()
        # Load the pre-trained EfficientNet-B0 model.
        # NOTE(review): newer torchvision releases prefer `weights=` over
        # `pretrained=` - confirm against the installed version before changing.
        self.efficientnet_b0 = models.efficientnet_b0(pretrained=True)

        if freeze_backbone:
            # Freeze everything that exists so far; the classifier created
            # below is new and therefore trainable.
            for param in self.efficientnet_b0.parameters():
                param.requires_grad = False

        # Replace the classifier part of the EfficientNet-B0. The input width
        # is read from the original classifier's linear layer (index 1).
        in_features = self.efficientnet_b0.classifier[1].in_features
        self.efficientnet_b0.classifier = nn.Sequential(
            nn.Linear(in_features, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes),
        )

    def forward(self, x):
        """Return class logits produced by the modified EfficientNet-B0."""
        x = self.efficientnet_b0(x)
        return x


# Instantiate the EfficientNet-B0 variant with one output per dataset class
# (NUM_CLASSES) instead of a hard-coded 10, then print its layer summary.
model = EfficientNetB0Custom(num_classes=NUM_CLASSES).to(device)

summary(model, input_size=(3, IMAGE_SIZE[0], IMAGE_SIZE[1]))

In [None]:
# Train and evaluate the EfficientNet-B0 model from the "Train FC Layer" section.
model = EfficientNetB0Custom(num_classes=NUM_CLASSES).to(device)

# Create a dedicated logger for this run instead of reusing the `logger`
# variable left over from the dropout sweep (train_model clears its handlers).
logger = create_logger("efficientnet_fc.log")

# NOTE(review): the checkpoint name says "1e-4" but the learning rate passed
# here is 0.001 - confirm which one is intended.
train_model(
    logger=logger,
    num_epochs=100,
    save_path="best_model_efficient_64_1e-4.pth",
    batch_size=64,
    learning_rate=0.001,
)

evaluate_model("best_model_efficient_64_1e-4.pth", test_dataloader)

In [None]:
class EfficientNetB0CustomConv(nn.Module):
    """Pre-trained EfficientNet-B0 fine-tuned on its last two feature blocks.

    All pre-trained parameters are frozen except those in the final two entries
    of ``base_model.features``; the classifier is replaced by a new, trainable
    two-layer head.

    Args:
        num_classes: Size of the final linear layer's output.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.base_model = models.efficientnet_b0(pretrained=True)
        self.setup_base_model()
        self.redefine_classifier(num_classes)

    def setup_base_model(self):
        """Freeze the whole network, then unfreeze the last two feature blocks."""
        self.base_model.requires_grad_(False)
        self.base_model.features[-2:].requires_grad_(True)

    def redefine_classifier(self, num_classes):
        """Swap the classifier for Linear(in, 128) -> ReLU -> Linear(128, num_classes)."""
        # Input width comes from the original classifier's linear layer (index 1).
        head_input_width = self.base_model.classifier[1].in_features
        head_layers = [
            nn.Linear(head_input_width, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes),
        ]
        self.base_model.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return class logits from the underlying EfficientNet-B0."""
        logits = self.base_model(x)
        return logits


# Instantiate the partially unfrozen EfficientNet-B0 with one output per
# dataset class (NUM_CLASSES) instead of a hard-coded 10.
model = EfficientNetB0CustomConv(num_classes=NUM_CLASSES).to(device)

# Display the model summary
summary(model, input_size=(3, IMAGE_SIZE[0], IMAGE_SIZE[1]))

In [None]:
# Train and evaluate the variant that fine-tunes the last two feature blocks.
# This cell previously instantiated EfficientNetB0Custom, so the "conv"
# experiment never used the EfficientNetB0CustomConv model it is named after.
model = EfficientNetB0CustomConv(num_classes=NUM_CLASSES).to(device)

# Create a dedicated logger for this run instead of reusing the `logger`
# variable left over from an earlier cell (train_model clears its handlers).
logger = create_logger("efficientnet_conv.log")

# NOTE(review): the checkpoint name says "1e-4" but the learning rate passed
# here is 0.001 - confirm which one is intended.
train_model(
    logger=logger,
    num_epochs=100,
    save_path="best_model_efficient_64_1e-4_conv.pth",
    batch_size=64,
    learning_rate=0.001,
)

evaluate_model("best_model_efficient_64_1e-4_conv.pth", test_dataloader)