In [None]:
import copy
import gc
import io
import json
import logging
import os
import random
import shutil
import sys
import time
from typing_extensions import Any, Dict, List, Tuple, Union

from sklearnex import patch_sklearn
patch_sklearn()

import joblib
import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision
import torchvision.transforms as T
from fvcore.nn import FlopCountAnalysis
from sklearn import svm
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
from torchsummary import summary
from tqdm_loggable.auto import tqdm

In [None]:
class TqdmToLogger(io.StringIO):
    """File-like sink that forwards tqdm progress text to a logger.

    Pass an instance as tqdm's ``file=`` argument; each non-empty write is
    emitted as one log record at ``level``. Nothing is stored in the
    underlying ``StringIO`` buffer.
    """

    def __init__(self, logger, level=logging.INFO):
        super().__init__()
        self.level = level
        self.logger = logger

    def write(self, buf):
        """Log ``buf`` after trimming carriage returns, newlines, tabs and spaces."""
        # tqdm redraws the bar with \r / \n control characters; drop them and
        # skip writes that contain nothing else.
        message = buf.strip('\r\n\t ')
        if not message:
            return
        self.logger.log(self.level, message)

    def flush(self):
        """No-op: every write is handed to the logger immediately."""
        return None

In [None]:
def get_logger():
    """Create (or reset) the notebook's shared ``main_logger``.

    The logger is set to INFO and given exactly two handlers: one writing to
    ``sys.stdout`` and one appending to ``experiment_progress.log`` in the
    current working directory. Calling the function again replaces the
    previous handlers instead of stacking new ones.

    Returns:
        logging.Logger: the configured ``main_logger`` instance.
    """
    logger = logging.getLogger("main_logger")
    logger.setLevel(logging.INFO)

    # Re-running the cell must not duplicate log lines. Detach every existing
    # handler AND close it: merely clearing the list would leave the previous
    # FileHandler's file descriptor open for the lifetime of the kernel.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()

    # Create a format for your logs
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    # Stream Handler (Standard Output)
    sh = logging.StreamHandler(sys.stdout)
    sh.setFormatter(formatter)
    logger.addHandler(sh)

    # File Handler (Direct logging to a file)
    fh = logging.FileHandler("experiment_progress.log")
    fh.setFormatter(formatter)
    logger.addHandler(fh)

    return logger


# Shared logger used by every cell, plus a file-like adapter that is passed to
# tqdm (file=tqdm_out) so progress updates are emitted as INFO log records.
logger = get_logger()
tqdm_out = TqdmToLogger(logger, level=logging.INFO)

In [None]:
# Base seed applied to every random number generator by set_seed().
SEED = 42
# Dedicated torch generator handed to random_split and the DataLoaders below.
generator = torch.Generator()

# Use the first CUDA device when available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")

def set_seed(seed: int = SEED) -> None:
    """Seed every random number generator used in the notebook.

    Covers Python's ``random``, NumPy's global RNG, torch's global RNG, the
    module-level ``generator`` and, when CUDA is available, all CUDA devices.

    Args:
        seed: value applied to each generator.
    """
    logger.info(f"Setting seed: {seed}")

    # Host-side generators.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    generator.manual_seed(seed)

    # Device-side generators only exist when CUDA is usable.
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

# Seed all generators with the default SEED before any data is split or loaded.
set_seed()

In [None]:
# Lazily filled by load_mnist_dataset() / load_fashion_mnist_dataset() with a
# {"train", "val", "test"} dict of dataset splits.
mnist_dataset = None
fashion_mnist_dataset = None
# Fractions of the combined (official train + test) data assigned to each split.
train_split_ratio = 0.7
val_split_ratio = 0.1
# NOTE: the loaders below never read this value; the test split simply
# receives whatever remains after the train and val splits are taken.
test_split_ratio = 0.2

In [None]:
def _load_and_split(dataset_cls) -> Dict[str, torch.utils.data.Dataset]:
    """Download ``dataset_cls`` and re-split it into train/val/test subsets.

    The official train and test portions are concatenated and then divided
    with ``random_split`` using the module-level ``generator``. Split sizes
    come from ``train_split_ratio`` and ``val_split_ratio``; the test split
    takes the remainder so the three sizes always sum to the total.

    Args:
        dataset_cls: a torchvision dataset class accepting ``root``,
            ``train``, ``transform`` and ``download`` (e.g. ``MNIST``).

    Returns:
        Dict with keys ``"train"``, ``"val"`` and ``"test"``.
    """
    official_parts = [
        dataset_cls(
            root='./data',
            train=is_train,
            transform=torchvision.transforms.ToTensor(),
            download=True
        )
        for is_train in (True, False)
    ]
    combined = torch.utils.data.ConcatDataset(official_parts)

    total_size = len(combined)
    train_size = int(train_split_ratio * total_size)
    val_size = int(val_split_ratio * total_size)
    test_size = total_size - train_size - val_size

    train_split, val_split, test_split = torch.utils.data.random_split(
        combined,
        [train_size, val_size, test_size],
        generator=generator
    )
    return {
        "train": train_split,
        "val": val_split,
        "test": test_split
    }


def load_mnist_dataset() -> None:
    """Populate the global ``mnist_dataset`` split dict (no-op once loaded)."""
    global mnist_dataset
    logger.info("Loading MNIST dataset...")

    # The global is assigned only after the split succeeds, so a failed
    # download or split cannot leave a half-built value in the cache.
    if mnist_dataset is None:
        mnist_dataset = _load_and_split(torchvision.datasets.MNIST)


def load_fashion_mnist_dataset() -> None:
    """Populate the global ``fashion_mnist_dataset`` split dict (no-op once loaded)."""
    global fashion_mnist_dataset
    logger.info("Loading Fashion-MNIST dataset...")

    if fashion_mnist_dataset is None:
        fashion_mnist_dataset = _load_and_split(torchvision.datasets.FashionMNIST)


def load_datasets() -> None:
    """Load MNIST first, then Fashion-MNIST (the order fixes how the shared generator is consumed)."""
    load_mnist_dataset()
    load_fashion_mnist_dataset()

In [None]:
# Download (if needed) and split both datasets into the module-level dicts.
load_datasets()

In [None]:
# Dataset information: size of each split and the per-image tensor shape.
# Each message names the split it reports (val/test were previously all
# labelled "train").
logger.info(f"MNIST train data size - {len(mnist_dataset['train'])}")
logger.info(f"MNIST val data size - {len(mnist_dataset['val'])}")
logger.info(f"MNIST test data size - {len(mnist_dataset['test'])}")
logger.info(f"MNIST image shape - {mnist_dataset['train'][0][0].shape}\n")

logger.info(f"Fashion-MNIST train data size - {len(fashion_mnist_dataset['train'])}")
logger.info(f"Fashion-MNIST val data size - {len(fashion_mnist_dataset['val'])}")
logger.info(f"Fashion-MNIST test data size - {len(fashion_mnist_dataset['test'])}")
logger.info(f"Fashion-MNIST image shape - {fashion_mnist_dataset['train'][0][0].shape}")

In [None]:
# Training pipeline: upsample the image, apply light random geometric
# augmentation, then convert to a normalised 3-channel 224x224 tensor.
train_transform = T.Compose([
    T.ToPILImage(),  # base datasets yield tensors (ToTensor), so go back to PIL first
    T.Resize(256),
    T.RandomResizedCrop(
        224,
        scale=(0.8, 1.0),  # crop covers 80-100% of the image area
        ratio=(0.9, 1.1)  # near-square aspect ratios only
    ),
    T.RandomRotation(15),  # random angle in [-15, 15] degrees
    T.RandomAffine(
        degrees=0,  # no additional rotation here
        translate=(0.1, 0.1)  # shift by up to 10% of width / height
    ),
    T.Grayscale(num_output_channels=3),  # 3 channel for resnet
    T.ToTensor(),
    T.Normalize(
        mean=(0.5, 0.5, 0.5),
        std=(0.5, 0.5, 0.5)
    )  # maps [0, 1] pixel values to [-1, 1]
])


# Evaluation pipeline: deterministic resize to 224 plus the same channel
# expansion and normalisation as training, with no random augmentation.
test_transform = T.Compose([
    T.ToPILImage(),
    T.Resize(224),
    T.Grayscale(num_output_channels=3),
    T.ToTensor(),
    T.Normalize(
        mean=(0.5, 0.5, 0.5),
        std=(0.5, 0.5, 0.5)
    )
])

In [None]:
class customDataset(torch.utils.data.Dataset):
    """Dataset wrapper that applies an optional transform to each image on access.

    Items of ``base_dataset`` are expected to be ``(image, label)`` pairs; the
    label is passed through unchanged.
    """

    def __init__(
        self,
        base_dataset: torch.utils.data.Dataset,
        transform: Union[torchvision.transforms.Compose, None] = None
    ):
        self.transform = transform
        self.base_dataset = base_dataset

    def __len__(self) -> int:
        """Number of samples in the wrapped dataset."""
        return len(self.base_dataset)

    def __getitem__(self, idx: int) -> Any:
        """Return ``(image, label)`` for ``idx``, transforming the image if a transform is set."""
        sample, target = self.base_dataset[idx]
        if not self.transform:
            return sample, target
        return self.transform(sample), target

In [None]:
# Filled by load_augmented_datasets() with {"train", "val", "test"} dicts of
# customDataset wrappers around the base splits.
aug_mnist_dataset = None
aug_fashion_mnist_dataset = None

def load_augmented_datasets() -> None:
    """Wrap the base splits with the ResNet transforms (no-op per dataset once built).

    The training split uses ``train_transform``; validation and test use
    ``test_transform``. Results are stored in the module-level
    ``aug_mnist_dataset`` and ``aug_fashion_mnist_dataset`` dicts.
    """
    global aug_mnist_dataset, aug_fashion_mnist_dataset
    logger.info("Loading augmented datasets...")

    # Split name -> transform applied to that split.
    transform_by_split = {
        "train": train_transform,
        "val": test_transform,
        "test": test_transform,
    }

    if aug_mnist_dataset is None:
        aug_mnist_dataset = {
            split: customDataset(mnist_dataset[split], transform=split_transform)
            for split, split_transform in transform_by_split.items()
        }

    if aug_fashion_mnist_dataset is None:
        aug_fashion_mnist_dataset = {
            split: customDataset(fashion_mnist_dataset[split], transform=split_transform)
            for split, split_transform in transform_by_split.items()
        }

# Build the transform-wrapped splits used by the ResNet experiments.
load_augmented_datasets()

In [None]:
# Dataset information for the transform-wrapped splits. Each message names the
# split it reports (val/test were previously all labelled "train").
logger.info(f"MNIST train data size - {len(aug_mnist_dataset['train'])}")
logger.info(f"MNIST val data size - {len(aug_mnist_dataset['val'])}")
logger.info(f"MNIST test data size - {len(aug_mnist_dataset['test'])}")
logger.info(f"MNIST image shape - {aug_mnist_dataset['train'][0][0].shape}\n")

logger.info(f"Fashion-MNIST train data size - {len(aug_fashion_mnist_dataset['train'])}")
logger.info(f"Fashion-MNIST val data size - {len(aug_fashion_mnist_dataset['val'])}")
logger.info(f"Fashion-MNIST test data size - {len(aug_fashion_mnist_dataset['test'])}")
logger.info(f"Fashion-MNIST image shape - {aug_fashion_mnist_dataset['train'][0][0].shape}")

In [None]:
def get_resnet18_model(num_classes: int = 10) -> torch.nn.Module:
    """Build a randomly initialised ResNet-18 with a ``num_classes``-way head.

    Args:
        num_classes: output size of the replacement fully connected layer.

    Returns:
        The torchvision ResNet-18 with its ``fc`` layer swapped out.
    """
    # `weights=None` is the current spelling of the deprecated
    # `pretrained=False` and likewise yields an untrained network.
    model = torchvision.models.resnet18(weights=None)
    # Modify the final layer to match num_classes
    model.fc = torch.nn.Linear(model.fc.in_features, num_classes)
    return model

def get_resnet32_model(num_classes: int = 10) -> torch.nn.Module:
    """Build a randomly initialised ResNet-34 with a ``num_classes``-way head.

    NOTE: despite the function name, the network constructed here is
    torchvision's ``resnet34``. The name is kept unchanged because callers
    select it by name.

    Args:
        num_classes: output size of the replacement fully connected layer.

    Returns:
        The torchvision ResNet-34 with its ``fc`` layer swapped out.
    """
    # `weights=None` is the current spelling of the deprecated
    # `pretrained=False` and likewise yields an untrained network.
    model = torchvision.models.resnet34(weights=None)
    # Modify the final layer to match num_classes
    model.fc = torch.nn.Linear(model.fc.in_features, num_classes)
    return model

def get_resnet50_model(num_classes: int = 10) -> torch.nn.Module:
    """Build a randomly initialised ResNet-50 with a ``num_classes``-way head.

    Args:
        num_classes: output size of the replacement fully connected layer.

    Returns:
        The torchvision ResNet-50 with its ``fc`` layer swapped out.
    """
    # `weights=None` is the current spelling of the deprecated
    # `pretrained=False` and likewise yields an untrained network.
    model = torchvision.models.resnet50(weights=None)
    # Modify the final layer to match num_classes
    model.fc = torch.nn.Linear(model.fc.in_features, num_classes)
    return model

In [None]:
# Layer-by-layer summary for a 3x224x224 input.
logger.info("ResNet18 Model Summary:")
summary(get_resnet18_model().to(device), (3, 224, 224)) # ~11 million parameters

In [None]:
# NOTE: get_resnet32_model() builds torchvision's resnet34, so this summary
# describes a ResNet-34 even though the label says "ResNet32".
logger.info("ResNet32 Model Summary:")
summary(get_resnet32_model().to(device), (3, 224, 224)) # ~21 million parameters

In [None]:
# Layer-by-layer summary for a 3x224x224 input.
logger.info("ResNet50 Model Summary:")
summary(get_resnet50_model().to(device), (3, 224, 224)) # ~23.5 million parameters with the 10-class head (~25.6 million with the stock 1000-class head)

### Q1 (a): Training ResNet-18 and ResNet-50 on MNIST and Fashion-MNIST

In [None]:
def get_data_loader(
    dataset: torch.utils.data.Dataset,
    batch_size: int,
    drop_last: bool = False,
    shuffle: bool = True,
    num_workers: int = 8
) -> torch.utils.data.DataLoader:
    """Create a DataLoader with the notebook's shared settings.

    Memory pinning is always enabled and the module-level ``generator`` is
    used as the loader's random source.

    Args:
        dataset: dataset to iterate over.
        batch_size: samples per batch.
        drop_last: drop a trailing incomplete batch.
        shuffle: reshuffle the data every epoch.
        num_workers: number of worker processes.

    Returns:
        The configured ``DataLoader``.
    """
    loader_options = {
        "batch_size": batch_size,
        "drop_last": drop_last,
        "shuffle": shuffle,
        "num_workers": num_workers,
        "pin_memory": True,
        "generator": generator,
    }
    return torch.utils.data.DataLoader(dataset, **loader_options)

In [None]:
def calculate_loss_and_accuracy(
    model: torch.nn.Module,
    data_loader: torch.utils.data.DataLoader,
    criterion: torch.nn.Module,
    device: torch.device
) -> Dict[str, float]:
    """Compute the sample-weighted mean loss and the accuracy over ``data_loader``.

    The model is switched to eval mode and left there; callers that continue
    training must call ``model.train()`` afterwards.

    Args:
        model: network producing per-class scores of shape (batch, classes).
        data_loader: iterable of ``(inputs, labels)`` batches.
        criterion: loss returning a scalar per batch. The per-batch value is
            multiplied by the batch size, which assumes a mean-reduced loss.
        device: device the batches are moved to before the forward pass.

    Returns:
        ``{"loss": mean loss per sample, "accuracy": fraction correct}``.

    Raises:
        ValueError: if ``data_loader`` yields no samples. This previously
            surfaced as a bare ``ZeroDivisionError``.
    """
    model.eval()
    total_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            batch_size = inputs.size(0)
            # Weight by batch size so a smaller final batch is not over-counted.
            total_loss += loss.item() * batch_size
            _, preds = torch.max(outputs, 1)
            correct_predictions += torch.sum(preds == labels).item()
            total_samples += batch_size

    if total_samples == 0:
        raise ValueError("data_loader yielded no samples; cannot compute loss and accuracy")

    return {
        "loss": total_loss / total_samples,
        "accuracy": correct_predictions / total_samples
    }

In [None]:
def _save_metric_plot(epochs, series, ylabel: str, title: str, out_path: str) -> None:
    """Draw the given ``(label, values)`` curves against ``epochs`` and save the figure to ``out_path``."""
    # Imported locally so this block does not depend on a new top-level import.
    from matplotlib.ticker import MaxNLocator

    plt.figure()
    for label, values in series:
        plt.plot(epochs, values, label=label)

    # Epochs are whole numbers: keep the x ticks on integers instead of
    # letting matplotlib choose fractional positions such as 1.5.
    plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))

    plt.xlabel('Epochs')
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend()
    plt.grid(True)
    plt.savefig(out_path)
    plt.close()


def save_training_plots(
    training_stats: Dict[str, list],
    save_dir: str
) -> None:
    """Save train/validation loss and accuracy curves as PNG files.

    Writes ``loss_plot.png`` and ``accuracy_plot.png`` into ``save_dir``,
    which must already exist.

    Args:
        training_stats: per-epoch lists under the keys ``train_loss``,
            ``val_loss``, ``train_accuracy`` and ``val_accuracy``.
        save_dir: directory receiving the two image files.
    """
    # Epoch numbers are 1-based on the x axis.
    epochs = range(1, len(training_stats["train_loss"]) + 1)

    _save_metric_plot(
        epochs,
        [
            ('Train Loss', training_stats["train_loss"]),
            ('Validation Loss', training_stats["val_loss"]),
        ],
        ylabel='Loss',
        title='Loss over Epochs',
        out_path=os.path.join(save_dir, 'loss_plot.png')
    )

    _save_metric_plot(
        epochs,
        [
            ('Train Accuracy', training_stats["train_accuracy"]),
            ('Validation Accuracy', training_stats["val_accuracy"]),
        ],
        ylabel='Accuracy',
        title='Accuracy over Epochs',
        out_path=os.path.join(save_dir, 'accuracy_plot.png')
    )

In [None]:
def save_model_and_stats(
    model: torch.nn.Module,
    optimizer: torch.optim.Optimizer,
    config: Dict[str, Any],
    training_stats: Dict[str, list],
    base_dir: str    
) -> Tuple[str, str, str]:
    """Write a training checkpoint into a directory named after ``config``.

    The directory name joins one ``<key>_<value>`` fragment per config entry
    (in the dict's iteration order) with underscores, and is created under
    ``base_dir`` if missing. The checkpoint ``model.pth`` stores the model and
    optimizer state dicts together with ``training_stats`` and ``config``.

    Returns:
        ``(model_path, model_dir, base_dir)``.
    """
    name_fragments = [f'{key}_{value}' for key, value in config.items()]
    model_dir = os.path.join(base_dir, '_'.join(name_fragments))
    os.makedirs(model_dir, exist_ok=True)

    model_path = os.path.join(model_dir, 'model.pth')
    torch.save(
        {
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'training_stats': training_stats,
            'config': config
        },
        model_path
    )

    return model_path, model_dir, base_dir

In [None]:

# @torch.compile(fullgraph=True)
def train_model(config: Dict[str, Any], base_dir: str, device:torch.device) -> List[Dict[str, Any]]:
    """Train one ResNet configuration and checkpoint it periodically.

    A checkpoint (model, optimizer, stats, plots) is written every 5 epochs
    and after the final epoch; the test split is evaluated at each checkpoint.

    Args:
        config: must contain ``dataset_name`` ("mnist" selects MNIST, anything
            else selects Fashion-MNIST), ``model_name`` ("resnet18",
            "resnet32" or "resnet50"), ``learning_rate``, ``batch_size``,
            ``optimizer_name`` ("sgd" or "adam") and ``num_epochs``.
        base_dir: directory under which checkpoint folders are created.
        device: device used for training and evaluation.

    Returns:
        One dict per checkpoint, merging the checkpoint's config (with
        ``num_epochs`` set to the epoch reached) with the per-epoch training
        stats, ``model_path``, ``model_dir``, ``base_dir``, ``training_time``
        (seconds since training began) and ``test_loss`` / ``test_accuracy``.

    Raises:
        ValueError: for an unsupported ``model_name`` or ``optimizer_name``.
    """
    # NOTE: this changes a process-wide torch setting, not just this run.
    torch.set_float32_matmul_precision('high')

    if config["model_name"] == "resnet18":
        model = get_resnet18_model().to(device)
    elif config["model_name"] == "resnet32":
        model = get_resnet32_model().to(device)
    elif config["model_name"] == "resnet50":
        model = get_resnet50_model().to(device)
    else:
        raise ValueError(f"Unsupported model name: {config['model_name']}")

    if config["optimizer_name"] == "sgd":
        optimizer = torch.optim.SGD(
            model.parameters(),
            lr=config["learning_rate"],
            momentum=0.9
        )
    elif config["optimizer_name"] == "adam":
        optimizer = torch.optim.Adam(
            model.parameters(),
            lr=config["learning_rate"]
        )
    else:
        raise ValueError(f"Unsupported optimizer name: {config['optimizer_name']}")

    dataset_name = config["dataset_name"]
    if dataset_name == "mnist":
        splits = aug_mnist_dataset
    else:
        # Existing behaviour: every name other than "mnist" selects
        # Fashion-MNIST. Keep that fallback, but make a typo visible.
        if dataset_name not in ("fashion-mnist", "fashion_mnist"):
            logger.warning(
                f"Unrecognised dataset name {dataset_name!r}; falling back to Fashion-MNIST"
            )
        splits = aug_fashion_mnist_dataset

    train_data_loader = get_data_loader(
        splits["train"],
        batch_size=config["batch_size"],
        shuffle=True,
        drop_last=True
    )
    val_data_loader = get_data_loader(
        splits["val"],
        batch_size=config["batch_size"],
        shuffle=False,
        drop_last=False
    )
    test_data_loader = get_data_loader(
        splits["test"],
        batch_size=config["batch_size"],
        shuffle=False,
        drop_last=False
    )

    training_stats = {
        "train_loss": [],
        "train_accuracy": [],
        "val_loss": [],
        "val_accuracy": []
    }

    # Mixed precision is used on CUDA only. The scaler and autocast share one
    # flag so fp16 autocast can never run with gradient scaling switched off
    # (previously autocast stayed enabled on CPU while the scaler did not).
    use_amp = device.type == "cuda"
    scaler = torch.amp.GradScaler(device=device.type, enabled=use_amp)
    criterion = torch.nn.CrossEntropyLoss()

    start_time = time.time()

    return_data = []

    for epoch in range(config["num_epochs"]):
        model.train()
        running_loss = 0.0
        correct_predictions = 0
        total_samples = 0

        for inputs, labels in tqdm(
            train_data_loader, desc=f"Epoch {epoch+1}/{config['num_epochs']}", file=tqdm_out,
            mininterval=30.0
        ):
            inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)

            optimizer.zero_grad(set_to_none=True)
            with torch.autocast(device_type=device.type, dtype=torch.float16, enabled=use_amp):
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            # With the scaler disabled these calls reduce to a plain
            # backward() / optimizer.step().
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs.float(), 1)
            correct_predictions += torch.sum(preds == labels).item()
            total_samples += inputs.size(0)

        epoch_loss = running_loss / total_samples
        epoch_accuracy = correct_predictions / total_samples

        with torch.no_grad():
            val_metrics = calculate_loss_and_accuracy(
                model,
                val_data_loader,
                criterion,
                device
            )
        # Evaluation leaves the model in eval mode; switch back for training.
        model.train()

        training_stats["train_loss"].append(epoch_loss)
        training_stats["train_accuracy"].append(epoch_accuracy)
        training_stats["val_loss"].append(val_metrics["loss"])
        training_stats["val_accuracy"].append(val_metrics["accuracy"])

        logger.info(
            f"Epoch [{epoch+1}/{config['num_epochs']}], "
            f"Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_accuracy:.4f}, "
            f"Val Loss: {val_metrics['loss']:.4f}, Val Acc: {val_metrics['accuracy']:.4f}, "
        )

        # save intermediate model at every 5 epochs or final epoch
        if (epoch + 1)%5==0 or epoch+1==config["num_epochs"]:
            mid_train_time = time.time()
            mid_elapsed_time = mid_train_time - start_time
            logger.info(f"Mid Training Time after {epoch+1} epochs: {mid_elapsed_time/60:.2f} minutes.")

            # Snapshot stats/config so later epochs cannot mutate what this
            # checkpoint recorded; num_epochs reflects the epoch reached.
            mid_stats = copy.deepcopy(training_stats)
            mid_config = copy.deepcopy(config)
            mid_config["num_epochs"] = epoch + 1

            mid_model_path, mid_model_dir, _ = save_model_and_stats(
                model,
                optimizer,
                mid_config,
                mid_stats,
                base_dir=base_dir
            )
            save_training_plots(mid_stats, mid_model_dir)
            logger.info(f"Mid-training model saved at: {mid_model_path}")

            with torch.no_grad():
                mid_test_metrics = calculate_loss_and_accuracy(
                    model,
                    test_data_loader,
                    criterion,
                    device
                )
            model.train()

            mid_stats["model_path"] = mid_model_path
            mid_stats["model_dir"] = mid_model_dir
            mid_stats["base_dir"] = base_dir
            mid_stats["training_time"] = mid_elapsed_time # in seconds
            mid_stats["test_loss"] = mid_test_metrics["loss"]
            mid_stats["test_accuracy"] = mid_test_metrics["accuracy"]

            return_data.append({**mid_config, **mid_stats})

    end_time = time.time()
    elapsed_time = end_time - start_time

    logger.info(f"Training completed in {elapsed_time/60:.2f} minutes.")

    # Drop references and collect garbage first so the CUDA caching allocator
    # can actually release the blocks those objects were holding.
    del model, optimizer, train_data_loader, val_data_loader, test_data_loader
    gc.collect()
    torch.cuda.empty_cache()
    return return_data

In [None]:
# Grid search over ResNet hyperparameters on MNIST. All experiment artefacts
# (for this cell and the Fashion-MNIST cell) live under base_experiment_dir.
base_experiment_dir = './q1_a_experiments'
os.makedirs(base_experiment_dir, exist_ok=True)

# "dataset_name" is a single string; every other entry lists the values to sweep.
mnist_search_space = {
    "dataset_name": "mnist",
    "model_name": ["resnet18", "resnet50"],
    "learning_rate": [0.001, 0.0001],
    "batch_size": [16, 32],
    "optimizer_name": ["sgd", "adam"],
    "num_epochs": [10]
}

# Collects one entry per saved checkpoint (train_model returns a list), so a
# single configuration can contribute several rows.
results = []

# Full Cartesian product of the search space, trained sequentially.
for model_name in mnist_search_space["model_name"]:
    for learning_rate in mnist_search_space["learning_rate"]:
        for batch_size in mnist_search_space["batch_size"]:
            for optimizer_name in mnist_search_space["optimizer_name"]:
                for num_epochs in mnist_search_space["num_epochs"]:
                    config = {
                        "dataset_name": mnist_search_space["dataset_name"],
                        "model_name": model_name,
                        "learning_rate": learning_rate,
                        "batch_size": batch_size,
                        "optimizer_name": optimizer_name,
                        "num_epochs": num_epochs
                    }
                    logger.info(f"Training with config: {config}")
                    return_data = train_model(
                        config, os.path.join(base_experiment_dir, 'mnist_experiments'),
                        device=device
                    )
                    logger.info(f"Training completed.\n")
                    results.extend(return_data)

# save the results to json file
result_path = os.path.join(base_experiment_dir, 'mnist_experiments_results.json')
with open(result_path, 'w') as f:
    json.dump(results, f, indent=4)

# identify best model based on test accuracy
# NOTE(review): selecting on test accuracy means the reported best score is
# not an unbiased estimate; the per-epoch "val_accuracy" lists are available
# in each result if validation-based selection is preferred. Because results
# holds every checkpoint, the winner may be a mid-training checkpoint.
best_model = max(results, key=lambda x: x['test_accuracy'])
logger.info(f"Best Model Config: {best_model}")
logger.info(f"Best Model Test Accuracy: {best_model['test_accuracy']:.4f}")

# save best model to base_dir/mnist_best_model.pth
best_model_src_path = best_model['model_path']
best_model_dest_path = os.path.join(base_experiment_dir, 'mnist_best_model.pth')
shutil.copy(best_model_src_path, best_model_dest_path)
logger.info(f"Best model saved to: {best_model_dest_path}")

In [None]:
# Same grid as the MNIST cell, run on Fashion-MNIST. Requires
# base_experiment_dir from the MNIST cell above.
fashion_mnist_search_space = {
    "dataset_name": "fashion-mnist",
    "model_name": ["resnet18", "resnet50"],
    "learning_rate": [0.001, 0.0001],
    "batch_size": [16, 32],
    "optimizer_name": ["sgd", "adam"],
    "num_epochs": [10]
}

# Rebinds `results`: the MNIST results are no longer reachable under this
# name (they were already written to JSON by the previous cell).
results = []

# Full Cartesian product of the search space, trained sequentially.
for model_name in fashion_mnist_search_space["model_name"]:
    for learning_rate in fashion_mnist_search_space["learning_rate"]:
        for batch_size in fashion_mnist_search_space["batch_size"]:
            for optimizer_name in fashion_mnist_search_space["optimizer_name"]:
                for num_epochs in fashion_mnist_search_space["num_epochs"]:
                    config = {
                        "dataset_name": fashion_mnist_search_space["dataset_name"],
                        "model_name": model_name,
                        "learning_rate": learning_rate,
                        "batch_size": batch_size,
                        "optimizer_name": optimizer_name,
                        "num_epochs": num_epochs
                    }
                    logger.info(f"Training with config: {config}")
                    return_data = train_model(
                        config, os.path.join(base_experiment_dir, 'fashion_mnist_experiments'),
                        device=device
                    )
                    logger.info(f"Training completed.\n")
                    results.extend(return_data)

# save the results to json file
result_path = os.path.join(base_experiment_dir, 'fashion_mnist_experiments_results.json')
with open(result_path, 'w') as f:
    json.dump(results, f, indent=4)

# identify best model based on test accuracy
# NOTE(review): selecting on test accuracy means the reported best score is
# not an unbiased estimate; the per-epoch "val_accuracy" lists are available
# in each result if validation-based selection is preferred.
best_model = max(results, key=lambda x: x['test_accuracy'])
logger.info(f"Best Model Config: {best_model}")
logger.info(f"Best Model Test Accuracy: {best_model['test_accuracy']:.4f}")

# save best model to base_dir/fashion_mnist_best_model.pth
best_model_src_path = best_model['model_path']
best_model_dest_path = os.path.join(base_experiment_dir, 'fashion_mnist_best_model.pth')
shutil.copy(best_model_src_path, best_model_dest_path)
logger.info(f"Best model saved to: {best_model_dest_path}")

### Q1 (b): Training SVMs on MNIST and Fashion-MNIST

In [None]:
# Filled by load_np_mnist_dataset() / load_np_fashion_mnist_dataset() with
# {"train", "val", "test"} dicts of (X, y) numpy array pairs.
numpy_mnist_dataset = None
numpy_fashion_mnist_dataset = None

In [None]:
def create_numpy_dataset(
    basedata: torch.utils.data.Dataset,
    transform: torchvision.transforms.Compose
) -> Tuple[np.ndarray, np.ndarray]:
    """Materialise a dataset as flat numpy feature vectors and integer labels.

    Every ``(image, label)`` item is passed through ``transform`` once and the
    resulting tensor is flattened to one dimension.

    Args:
        basedata: iterable dataset of ``(image, label)`` pairs.
        transform: callable returning a tensor for each image.

    Returns:
        ``(X, y)`` where ``X`` stacks the flattened images row-wise and ``y``
        holds the labels converted with ``int``.

    Raises:
        ValueError: if ``basedata`` contains no items.
    """
    X_list = []
    y_list = []

    for img, label in tqdm(basedata, desc="Creating numpy dataset", file=tqdm_out, mininterval=30.0):
        img_np = transform(img).numpy().reshape(-1)
        X_list.append(img_np)
        y_list.append(int(label))

    # np.stack rejects an empty list with an opaque message; fail clearly instead.
    if not X_list:
        raise ValueError("Cannot create a numpy dataset from an empty dataset")

    X = np.stack(X_list)
    y = np.array(y_list)
    # Release the per-sample lists before handing back the stacked copies.
    del X_list, y_list
    return X, y

In [None]:
def _build_numpy_splits(splits) -> Dict[str, Tuple[np.ndarray, np.ndarray]]:
    """Convert one dataset's train/val/test splits into flat numpy arrays.

    The training split is randomly rotated (up to 15 degrees) and translated
    (up to 10%) once per image; validation and test images are converted
    unchanged. Images keep their original resolution.

    Args:
        splits: dict with ``"train"``, ``"val"`` and ``"test"`` datasets.

    Returns:
        Dict with the same keys, each mapping to an ``(X, y)`` pair.
    """
    # Local names deliberately differ from the module-level train_transform /
    # test_transform used by the ResNet pipeline.
    augmenting_transform = T.Compose([
        T.ToPILImage(),
        T.RandomRotation(15),
        T.RandomAffine(degrees=0, translate=(0.1, 0.1)),
        T.ToTensor()
    ])
    plain_transform = T.Compose([
        T.ToPILImage(),
        T.ToTensor()
    ])

    # Evaluated in train, val, test order, matching the original call order.
    return {
        "train": create_numpy_dataset(
            basedata=splits["train"],
            transform=augmenting_transform
        ),
        "val": create_numpy_dataset(
            basedata=splits["val"],
            transform=plain_transform
        ),
        "test": create_numpy_dataset(
            basedata=splits["test"],
            transform=plain_transform
        )
    }


def load_np_mnist_dataset() -> None:
    """Populate the global ``numpy_mnist_dataset`` (no-op once built)."""
    global numpy_mnist_dataset

    # Check the cache before doing any work, including building transforms.
    if numpy_mnist_dataset is not None:
        return

    numpy_mnist_dataset = _build_numpy_splits(mnist_dataset)


def load_np_fashion_mnist_dataset() -> None:
    """Populate the global ``numpy_fashion_mnist_dataset`` (no-op once built)."""
    global numpy_fashion_mnist_dataset

    if numpy_fashion_mnist_dataset is not None:
        return

    numpy_fashion_mnist_dataset = _build_numpy_splits(fashion_mnist_dataset)


def load_np_dataset() -> None:
    """Build the numpy versions of MNIST first, then Fashion-MNIST."""
    load_np_mnist_dataset()
    load_np_fashion_mnist_dataset()

In [None]:
# Materialise both datasets as numpy arrays for the SVM experiments.
load_np_dataset()

In [None]:
def save_svm_model_and_stats(
    model: svm.SVC,
    config: Dict[str, Any],
    training_stats: Dict[str, Any],
    base_dir: str    
) -> Tuple[str, str, str]:
    """Persist a fitted SVM and its stats in a directory named after ``config``.

    The directory name joins one ``<key>_<value>`` fragment per config entry
    with underscores and is created under ``base_dir`` if missing. The model
    is written with joblib as ``svm_model.pkl`` and ``training_stats`` as
    ``training_stats.json``.

    Returns:
        ``(model_path, model_dir, base_dir)``.
    """
    name_fragments = [f'{key}_{value}' for key, value in config.items()]
    model_dir = os.path.join(base_dir, '_'.join(name_fragments))
    os.makedirs(model_dir, exist_ok=True)

    model_path = os.path.join(model_dir, 'svm_model.pkl')
    joblib.dump(model, model_path)

    stats_path = os.path.join(model_dir, 'training_stats.json')
    with open(stats_path, 'w') as stats_file:
        json.dump(training_stats, stats_file, indent=4)

    return model_path, model_dir, base_dir

In [None]:
def train_svm(config: Dict[str, Any], base_dir: str) -> Dict[str, Any]:
    """Fit one SVC on standardized features, score every split, and persist it.

    Args:
        config: Experiment description with keys ``dataset_name`` ("mnist" or
            "fashion_mnist"), ``kernel_name`` ("poly" or "rbf") and
            ``kernel_params``. ``kernel_params`` may hold ``C``, ``gamma`` and,
            for "poly", ``degree``; missing entries fall back to 1.0,
            'scale' and 3 respectively.
        base_dir: Directory handed to ``save_svm_model_and_stats``.

    Returns:
        Dict with ``train_accuracy``, ``val_accuracy``, ``test_accuracy``,
        ``model_path``, ``model_dir``, ``base_dir``, ``scaler_path`` and
        ``training_time`` (seconds spent in ``fit``).

    Raises:
        ValueError: If the dataset name or kernel name is not supported.
        RuntimeError: If the selected NumPy dataset has not been loaded yet.
    """
    # config has keys: dataset_name, kernel_name, kernel_params
    dataset_name = config["dataset_name"]
    if dataset_name == "mnist":
        dataset = numpy_mnist_dataset
    elif dataset_name == "fashion_mnist":
        dataset = numpy_fashion_mnist_dataset
    else:
        raise ValueError(f"Unsupported dataset name: {config['dataset_name']}")

    if dataset is None:
        # Fail with an actionable message instead of an opaque
        # "'NoneType' object is not subscriptable".
        raise RuntimeError(
            f"NumPy dataset for '{dataset_name}' is not loaded; "
            "run load_np_dataset() first."
        )

    # Each split is unpacked as a (features, labels) pair.
    X_train, y_train = dataset["train"]
    X_val, y_val = dataset["val"]
    X_test, y_test = dataset["test"]

    # Fit the scaler on the training split only, then reuse it for val/test.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)
    X_test = scaler.transform(X_test)

    logger.info(f"Training SVM with config: {config}")

    kernel_name = config["kernel_name"]
    if kernel_name not in ("poly", "rbf"):
        raise ValueError(f"Unsupported kernel name: {config['kernel_name']}")

    kernel_params = config["kernel_params"]
    svc_kwargs = {
        "kernel": kernel_name,
        "C": kernel_params.get("C", 1.0),
        "gamma": kernel_params.get("gamma", 'scale'),
        "random_state": SEED,
    }
    if kernel_name == "poly":
        # ``degree`` is only forwarded for the polynomial kernel.
        svc_kwargs["degree"] = kernel_params.get("degree", 3)
    model = svm.SVC(**svc_kwargs)

    # perf_counter is monotonic, so the measurement is immune to clock changes.
    start_time = time.perf_counter()
    model.fit(X_train, y_train)
    elapsed_time = time.perf_counter() - start_time

    train_accuracy = accuracy_score(y_train, model.predict(X_train))
    val_accuracy = accuracy_score(y_val, model.predict(X_val))
    test_accuracy = accuracy_score(y_test, model.predict(X_test))

    logger.info(
        f"SVM Training completed. "
        f"Train Acc: {train_accuracy:.4f}, Val Acc: {val_accuracy:.4f}, Test Acc: {test_accuracy:.4f}"
    )

    logger.info(f"SVM Training completed in {elapsed_time/60:.2f} minutes.")

    training_stats = {
        "train_accuracy": train_accuracy,
        "val_accuracy": val_accuracy,
        "test_accuracy": test_accuracy
    }

    model_path, model_dir, _ = save_svm_model_and_stats(
        model,
        config,
        training_stats,
        base_dir=base_dir
    )

    # The SVC was trained on standardized inputs, so the pickled model is only
    # usable together with the fitted scaler; store it alongside the model.
    scaler_path = os.path.join(model_dir, 'scaler.pkl')
    joblib.dump(scaler, scaler_path)

    training_stats["model_path"] = model_path
    training_stats["model_dir"] = model_dir
    training_stats["base_dir"] = base_dir
    training_stats["scaler_path"] = scaler_path
    training_stats["training_time"] = elapsed_time # in seconds

    return training_stats

In [None]:
# Grid search over SVM kernels and hyper-parameters on MNIST.
base_experiment_dir = './q1_b_experiments'
os.makedirs(base_experiment_dir, exist_ok=True)

mnist_search_space = {
    "dataset_name": ["mnist"],
    "kernel_name": ["poly", "rbf"],
    "kernel_params": {
        "poly": {
            "degree": [2, 3],
            "C": [0.1, 1.0],
            "gamma": ['scale', 'auto']
        },
        "rbf": {
            "C": [0.1, 1.0],
            "gamma": ['scale', 'auto']
        }
    }
}

mnist_experiments_dir = os.path.join(base_experiment_dir, 'mnist_svm_experiments')
results = []

for dataset_name in mnist_search_space["dataset_name"]:
    for kernel_name in mnist_search_space["kernel_name"]:
        # Expand this kernel's grid into every parameter combination. The
        # first-listed parameter varies slowest, matching nested for-loops,
        # and every kernel goes through the same code path regardless of
        # which parameters it takes.
        param_combinations = [{}]
        for param_name, candidates in mnist_search_space["kernel_params"][kernel_name].items():
            param_combinations = [
                {**partial, param_name: candidate}
                for partial in param_combinations
                for candidate in candidates
            ]

        for kernel_params in param_combinations:
            config = {
                "dataset_name": dataset_name,
                "kernel_name": kernel_name,
                "kernel_params": kernel_params
            }
            training_stats = train_svm(config, mnist_experiments_dir)
            results.append({**config, **training_stats})

# save the results to json file
with open(os.path.join(base_experiment_dir, 'mnist_svm_experiments_results.json'), 'w') as f:
    json.dump(results, f, indent=4)

# Identify the best model on *validation* accuracy. Selecting on test accuracy
# would tune hyper-parameters on the test split and make the reported test
# score optimistically biased; the test accuracy is still logged below.
best_model = max(results, key=lambda x: x['val_accuracy'])
logger.info(f"Best SVM Model Config: {best_model}")
logger.info(f"Best SVM Model Val Accuracy: {best_model['val_accuracy']:.4f}")
logger.info(f"Best SVM Model Test Accuracy: {best_model['test_accuracy']:.4f}")

# save best model to base_dir/mnist_svm_best_model.pkl
best_model_src_path = best_model['model_path']
best_model_dest_path = os.path.join(base_experiment_dir, 'mnist_svm_best_model.pkl')
shutil.copy(best_model_src_path, best_model_dest_path)
logger.info(f"Best SVM model saved to: {best_model_dest_path}")

In [None]:
# Grid search over SVM kernels and hyper-parameters on Fashion-MNIST.
# Define the output root here rather than relying on another cell:
# `base_experiment_dir` is rebound to the Q2 directory further down, so a
# re-run of this cell after Q2 would otherwise write into the wrong folder.
base_experiment_dir = './q1_b_experiments'
os.makedirs(base_experiment_dir, exist_ok=True)

fashion_mnist_search_space = {
    "dataset_name": ["fashion_mnist"],
    "kernel_name": ["poly", "rbf"],
    "kernel_params": {
        "poly": {
            "degree": [2, 3],
            "C": [0.1, 1.0],
            "gamma": ['scale', 'auto']
        },
        "rbf": {
            "C": [0.1, 1.0],
            "gamma": ['scale', 'auto']
        }
    }
}

fashion_mnist_experiments_dir = os.path.join(base_experiment_dir, 'fashion_mnist_svm_experiments')
results = []

for dataset_name in fashion_mnist_search_space["dataset_name"]:
    for kernel_name in fashion_mnist_search_space["kernel_name"]:
        # Expand this kernel's grid into every parameter combination. The
        # first-listed parameter varies slowest, matching nested for-loops,
        # and every kernel goes through the same code path regardless of
        # which parameters it takes.
        param_combinations = [{}]
        for param_name, candidates in fashion_mnist_search_space["kernel_params"][kernel_name].items():
            param_combinations = [
                {**partial, param_name: candidate}
                for partial in param_combinations
                for candidate in candidates
            ]

        for kernel_params in param_combinations:
            config = {
                "dataset_name": dataset_name,
                "kernel_name": kernel_name,
                "kernel_params": kernel_params
            }
            training_stats = train_svm(config, fashion_mnist_experiments_dir)
            results.append({**config, **training_stats})

# save the results to json file
with open(os.path.join(base_experiment_dir, 'fashion_mnist_svm_experiments_results.json'), 'w') as f:
    json.dump(results, f, indent=4)

# Identify the best model on *validation* accuracy. Selecting on test accuracy
# would tune hyper-parameters on the test split and make the reported test
# score optimistically biased; the test accuracy is still logged below.
best_model = max(results, key=lambda x: x['val_accuracy'])
logger.info(f"Best SVM Model Config: {best_model}")
logger.info(f"Best SVM Model Val Accuracy: {best_model['val_accuracy']:.4f}")
logger.info(f"Best SVM Model Test Accuracy: {best_model['test_accuracy']:.4f}")

# save best model to base_dir/fashion_mnist_svm_best_model.pkl
best_model_src_path = best_model['model_path']
best_model_dest_path = os.path.join(base_experiment_dir, 'fashion_mnist_svm_best_model.pkl')
shutil.copy(best_model_src_path, best_model_dest_path)
logger.info(f"Best SVM model saved to: {best_model_dest_path}")

### Q2) Performance comparison on Fashion-MNIST

In [None]:
# A GPU must be present: the Q2 comparison trains on both 'cpu' and 'cuda'.
if not torch.cuda.is_available():
    logger.error("GPU is not available. Exiting the program.")
    # Raise instead of calling exit(): `exit` is an interactive-shell helper
    # that acts on the session/kernel rather than simply halting execution.
    # An exception stops "Run All" at this cell and leaves the kernel usable.
    raise RuntimeError("GPU is not available; the Q2 experiments require CUDA.")

In [None]:
# Q2: train each model/optimizer pair on CPU and on GPU, then attach FLOPs.
base_experiment_dir = './q2_experiments'
os.makedirs(base_experiment_dir, exist_ok=True)

search_space = {
    "dataset_name": "fashion-mnist",
    "model_name": ["resnet18", "resnet50"],
    "learning_rate": [0.001],
    "batch_size": [16],
    "optimizer_name": ["sgd", "adam"],
    "num_epochs": [1],
    "device": ['cpu', 'cuda']
}

results = []

for model_name in search_space["model_name"]:
    for optimizer_name in search_space["optimizer_name"]:
        for device_type in search_space["device"]:
            config = {
                "dataset_name": search_space["dataset_name"],
                "model_name": model_name,
                "learning_rate": search_space["learning_rate"][0],
                "batch_size": search_space["batch_size"][0],
                "optimizer_name": optimizer_name,
                "num_epochs": search_space["num_epochs"][0],
                "device": device_type
            }
            # NOTE(review): this rebinds the module-level `device`. It is kept
            # as-is because helpers outside this cell may read the global;
            # confirm whether that is relied on before making it local.
            device = torch.device(device_type)
            logger.info(f"Training with config: {config} on device: {device}")
            # Keep what train_model returns: resetting it to {} here dropped
            # every training statistic from the saved comparison. `or {}` only
            # guards against a None return.
            training_stats = train_model(
                config, os.path.join(base_experiment_dir, f'{device_type}_experiments'),
                device=device
            ) or {}
            logger.info("Training completed.\n")
            results.append({**config, **training_stats})

# calculate FLOPs for each configuration
# The count is derived from the traced operators and tensor shapes, so it
# depends only on the architecture: compute it once per model (on CPU) and
# reuse it for every device/optimizer combination.
flops_by_model = {}
for result in results:
    model_name = result["model_name"]
    if model_name not in flops_by_model:
        if model_name == "resnet18":
            model = get_resnet18_model()
        elif model_name == "resnet32":
            model = get_resnet32_model()
        elif model_name == "resnet50":
            model = get_resnet50_model()
        else:
            raise ValueError(f"Unsupported model name: {model_name}")
        model = model.to('cpu')
        model.eval()
        # NOTE(review): assumes the models take 3x224x224 inputs - confirm
        # against the preprocessing used by train_model.
        dummy_input = torch.randn(1, 3, 224, 224)
        flops_by_model[model_name] = FlopCountAnalysis(model, dummy_input).total()
    result['flops'] = flops_by_model[model_name]
    logger.info(f"Model: {model_name}, Device: {result['device']}, FLOPs: {result['flops']}")

# save the results to json file
# train_model's return schema is not visible from this cell; default=str keeps
# the dump from failing on any value that is not natively JSON-serializable.
with open(os.path.join(base_experiment_dir, 'device_comparison_experiments_results.json'), 'w') as f:
    json.dump(results, f, indent=4, default=str)

: 