In [None]:
import os
import pickle
import numpy as np
import pandas as pd

from glob import glob
from datetime import datetime
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
from torchvision import transforms

from PIL import Image

In [None]:

def fetch_cifar_sets(base_dir="cifar-10-batches-py", seed=42):
    """Load the pickled CIFAR-10 batch files and build train/val/test datasets.

    Reads ``data_batch_1`` .. ``data_batch_5`` and ``test_batch`` from
    ``base_dir``. Pixel rows are converted to float32, divided by 255 and
    reshaped to ``(N, 3, 32, 32)``. The five training batches are concatenated
    and split 80/20 into a training and a validation subset.

    Args:
        base_dir: Directory that contains the batch files.
        seed: Seed for the train/validation split. A fixed seed keeps the
            validation subset identical across kernel restarts, so a
            checkpoint trained earlier is never scored on samples it was
            trained on. Pass ``None` to draw the split from the global torch
            RNG instead.

    Returns:
        tuple: ``(train_set, val_set, test_set)``. The first two are the
        subsets produced by ``random_split``; the third is a ``TensorDataset``.
    """

    def _read_batch(batch_name):
        # NOTE: pickle.load can execute arbitrary code; only point base_dir
        # at batch files obtained from a trusted source.
        with open(os.path.join(base_dir, batch_name), "rb") as fh:
            block = pickle.load(fh, encoding="bytes")

        pixels = np.asarray(block[b"data"]).astype(np.float32) / 255.0
        return pixels.reshape(-1, 3, 32, 32), list(block[b"labels"])

    # collect training batches
    image_parts = []
    label_store = []

    for index in range(1, 6):
        pixels, labels = _read_batch(f"data_batch_{index}")
        image_parts.append(pixels)
        label_store.extend(labels)

    train_pool = TensorDataset(
        torch.from_numpy(np.concatenate(image_parts, axis=0)),
        torch.LongTensor(label_store)
    )

    split_point = int(len(train_pool) * 0.8)
    lengths = [split_point, len(train_pool) - split_point]

    if seed is None:
        train_set, val_set = random_split(train_pool, lengths)
    else:
        train_set, val_set = random_split(
            train_pool,
            lengths,
            generator=torch.Generator().manual_seed(seed)
        )

    # load test set
    test_pixels, test_labels = _read_batch("test_batch")

    test_set = TensorDataset(
        torch.from_numpy(test_pixels),
        torch.LongTensor(test_labels)
    )

    return train_set, val_set, test_set

def fetch_catdog_sets(image_dir="dogs-vs-cats/train", resize=(64, 64), seed=42):
    """Load cat/dog JPEGs from a directory into train/val tensor datasets.

    Every ``*.jpg`` file in ``image_dir`` whose lower-cased file name starts
    with ``cat`` (label 0) or ``dog`` (label 1) is converted to RGB, resized,
    turned into a tensor and normalised with mean 0.5 / std 0.5 per channel.
    Other files are ignored. All images are stacked in memory and split 80/20.

    Args:
        image_dir: Directory containing the ``.jpg`` files.
        resize: Target size passed to ``transforms.Resize``.
        seed: Seed for the train/validation split, so the validation subset
            stays the same across kernel restarts. Pass ``None`` to draw the
            split from the global torch RNG instead.

    Returns:
        tuple: ``(train_subset, val_subset)`` produced by ``random_split``.

    Raises:
        RuntimeError: If the directory holds no ``.jpg`` files, or none of
            them has a ``cat``/``dog`` prefix. Previously this surfaced as an
            opaque error from ``torch.stack`` on an empty list.
    """

    # Sorted because glob order depends on the filesystem; without it the
    # same seed could still select different images on another machine.
    image_files = sorted(glob(os.path.join(image_dir, "*.jpg")))

    if not image_files:
        raise RuntimeError(f"No .jpg files found in {image_dir!r}")

    pipeline = transforms.Compose([
        transforms.Resize(resize),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5),
                             (0.5, 0.5, 0.5))
    ])

    tensor_list = []
    label_list = []

    for path in image_files:

        fname = os.path.basename(path).lower()

        if fname.startswith("cat"):
            cls = 0
        elif fname.startswith("dog"):
            cls = 1
        else:
            continue

        # Context manager releases the file handle as soon as the pixels
        # have been converted.
        with Image.open(path) as img:
            tensor_list.append(pipeline(img.convert("RGB")))

        label_list.append(cls)

    if not tensor_list:
        raise RuntimeError(
            f"No .jpg files with a 'cat' or 'dog' prefix found in {image_dir!r}"
        )

    dataset_all = TensorDataset(
        torch.stack(tensor_list),
        torch.LongTensor(label_list)
    )

    cutoff = int(len(dataset_all) * 0.8)
    lengths = [cutoff, len(dataset_all) - cutoff]

    if seed is None:
        train_subset, val_subset = random_split(dataset_all, lengths)
    else:
        train_subset, val_subset = random_split(
            dataset_all,
            lengths,
            generator=torch.Generator().manual_seed(seed)
        )

    return train_subset, val_subset


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim


class CNNModel(nn.Module):
    """Three-stage convolutional classifier with a two-layer dense head.

    Each stage is ``Conv2d(3x3, padding=1) -> BatchNorm2d`` followed by the
    selected activation and a 2x2 max-pool. The flattened feature map feeds
    ``Linear -> activation -> Dropout(0.5) -> Linear``.

    Args:
        num_classes: Size of the output layer.
        act_choice: ``"relu"``, ``"tanh"`` or ``"leaky_relu"``
            (case-insensitive).
        input_dims: ``(channels, height, width)`` of one input sample. The
            channel count sets the first convolution's input channels and the
            spatial size determines the width of the first dense layer.

    Raises:
        ValueError: If ``act_choice`` is not a supported activation name.
    """

    def __init__(self,
                 num_classes=10,
                 act_choice="relu",
                 input_dims=(3, 32, 32)):

        super().__init__()

        self.act_fn = self._select_activation(act_choice)

        in_channels = input_dims[0]

        # ----- convolution backbone -----
        self.stage_a = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32)
        )

        self.stage_b = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64)
        )

        self.stage_c = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128)
        )

        self.pool = nn.MaxPool2d(2, 2)

        flat_features = self._compute_flatten(input_dims)

        # ----- dense head -----
        self.fc1 = nn.Linear(flat_features, 256)
        self.dropout = nn.Dropout(0.5)
        self.fc_out = nn.Linear(256, num_classes)

    # -------------------------------------------------

    def _select_activation(self, name):
        """Return a new activation module for ``name`` (case-insensitive)."""

        # Classes rather than instances: only the requested one is built.
        table = {
            "relu": nn.ReLU,
            "tanh": nn.Tanh,
            "leaky_relu": nn.LeakyReLU
        }

        key = name.lower()

        if key not in table:
            raise ValueError("Unsupported activation")

        return table[key]()

    # -------------------------------------------------

    def _compute_flatten(self, dims):
        """Return the element count of the backbone output for one sample.

        The zero probe is pushed through the backbone in eval mode. In
        training mode the BatchNorm layers would fold the probe into their
        running statistics, so every new model would start with skewed
        ``running_mean``/``running_var`` and a non-zero ``num_batches_tracked``.
        """

        was_training = self.training
        self.eval()

        try:
            with torch.no_grad():
                probe = self._extract_features(torch.zeros(1, *dims))
        finally:
            # restore whatever mode the module was in before probing
            self.train(was_training)

        return probe.numel()

    # -------------------------------------------------

    def _extract_features(self, x):
        """Apply the three conv stages, each followed by activation and pool."""

        for block in (self.stage_a,
                      self.stage_b,
                      self.stage_c):

            x = self.pool(self.act_fn(block(x)))

        return x

    # -------------------------------------------------

    def forward(self, x):
        """Map an input batch to class logits of shape ``(batch, num_classes)``."""

        x = self._extract_features(x)
        x = torch.flatten(x, 1)

        x = self.fc1(x)
        x = self.act_fn(x)
        x = self.dropout(x)

        return self.fc_out(x)

def initialize_model_weights(model, mode="xavier"):
    """Re-initialise every ``Conv2d`` and ``Linear`` layer of ``model`` in place.

    Args:
        model: Module whose sub-modules are visited via ``model.modules()``.
        mode: Weight scheme, case-insensitive:
            ``"xavier"`` uses ``xavier_uniform_``,
            ``"kaiming"`` uses ``kaiming_uniform_`` with ``nonlinearity="relu"``,
            ``"random"`` uses a normal distribution with mean 0 and std 0.05.

    Biases of the visited layers are set to zero. Other module types are left
    untouched.

    Raises:
        ValueError: If ``mode`` is not one of the schemes above. Previously an
            unknown name left the weights at their defaults while still
            zeroing the biases, so a typo produced a mislabelled run.
    """

    initializers = {
        "xavier": nn.init.xavier_uniform_,
        "kaiming": lambda weight: nn.init.kaiming_uniform_(
            weight,
            nonlinearity="relu"
        ),
        "random": lambda weight: nn.init.normal_(
            weight,
            mean=0.0,
            std=0.05
        )
    }

    strategy = mode.lower()

    # Validate before touching any parameter so a bad name changes nothing.
    if strategy not in initializers:
        raise ValueError("Unsupported initialization")

    init_weight = initializers[strategy]

    for module in model.modules():

        if not isinstance(module, (nn.Conv2d, nn.Linear)):
            continue

        init_weight(module.weight)

        if module.bias is not None:
            nn.init.zeros_(module.bias)

def create_optimizer(model, opt_name="adam", lr=1e-3):
    """Build an optimizer over ``model.parameters()``.

    Args:
        model: Module whose parameters are optimised.
        opt_name: ``"sgd"`` (momentum 0.9), ``"adam"`` or ``"rmsprop"``;
            matched case-insensitively.
        lr: Learning rate handed to the optimizer.

    Returns:
        The constructed ``torch.optim`` optimizer.

    Raises:
        ValueError: If ``opt_name`` is not one of the supported names.
    """

    choice = opt_name.lower()

    if choice == "sgd":
        return optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    if choice == "adam":
        return optim.Adam(model.parameters(), lr=lr)

    if choice == "rmsprop":
        return optim.RMSprop(model.parameters(), lr=lr)

    raise ValueError("Unsupported optimizer")


In [None]:
from tqdm import tqdm
from datetime import datetime
import pandas as pd
import os
import torch
import torch.nn as nn

def execute_training(model,
                     train_loader,
                     val_loader,
                     optimizer,
                     criterion,
                     epochs=10,
                     dataset_name="default",
                     run_id="default",
                     device="cuda"):
    """Train ``model`` and checkpoint the best and the final weights.

    After every epoch the model is scored on ``val_loader``. Whenever the
    validation accuracy improves, the state dict is written to
    ``models/<dataset_name>/model_<run_id>_best.pth``. After the last epoch
    the state dict is also written to
    ``models/<dataset_name>/model_<run_id>_final.pth``.

    Args:
        model: Network to train; moved to ``device``.
        train_loader: Iterable of batches whose first two items are
            ``(inputs, targets)``.
        val_loader: Iterable with the same batch layout, used for validation.
        optimizer: Optimizer already bound to ``model``'s parameters.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``.
        epochs: Number of passes over ``train_loader``.
        dataset_name: Sub-directory of ``models/`` for the checkpoints.
        run_id: Tag embedded in the checkpoint file names.
        device: Device that the model and every batch are moved to.
    """

    model_dir = os.path.join("models", dataset_name)
    os.makedirs(model_dir, exist_ok=True)

    best_file = os.path.join(model_dir, f"model_{run_id}_best.pth")
    final_file = os.path.join(model_dir, f"model_{run_id}_final.pth")

    model.to(device)

    # None means "nothing saved yet". Starting at 0.0 with a strict ">" would
    # never write a best checkpoint for a run whose accuracy stays at 0.0.
    best_accuracy = None

    progress = tqdm(range(epochs))

    for _ in progress:

        # -------- training --------
        model.train()
        running_loss = 0.0
        batch_count = 0

        for batch in train_loader:

            inputs, targets = batch[:2]
            inputs = inputs.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss_value = criterion(outputs, targets)

            loss_value.backward()
            optimizer.step()

            running_loss += loss_value.item()
            batch_count += 1

        # -------- validation --------
        model.eval()

        correct = 0
        seen = 0

        with torch.no_grad():

            for batch in val_loader:

                inputs, targets = batch[:2]
                inputs = inputs.to(device)
                targets = targets.to(device)

                predictions = model(inputs).argmax(dim=1)

                seen += targets.size(0)
                correct += (predictions == targets).sum().item()

        accuracy = (100.0 * correct / seen) if seen else 0.0
        mean_loss = (running_loss / batch_count) if batch_count else 0.0

        # Surface the per-epoch numbers on the progress bar; the training
        # loss used to be accumulated and then dropped.
        progress.set_postfix(
            train_loss=f"{mean_loss:.4f}",
            val_acc=f"{accuracy:.2f}%"
        )

        if best_accuracy is None or accuracy > best_accuracy:
            best_accuracy = accuracy
            torch.save(model.state_dict(), best_file)

    reported_best = best_accuracy if best_accuracy is not None else 0.0

    print(
        f"Best Validation Accuracy for {run_id}: "
        f"{reported_best:.2f}%\n"
        f"Finished: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
    )

    torch.save(model.state_dict(), final_file)

def evaluate_saved_runs(model_builder,
                        config_list,
                        dataloaders,
                        device="mps",
                        csv_path="results.csv"):
    """Score saved best checkpoints on validation data and export a CSV.

    For each config the checkpoint
    ``models/<dataset>/model_<activation>_<init>_<optimizer>_best.pth`` is
    loaded into a freshly built model and evaluated on
    ``dataloaders[<dataset>]["val"]``. Configs without a checkpoint are
    reported and skipped.

    Args:
        model_builder: Callable invoked with the keywords ``num_classes``,
            ``act_choice`` and ``input_dims``; returns the model to evaluate.
        config_list: Iterable of dicts with the keys ``dataset``,
            ``activation``, ``init``, ``optimizer``, ``num_classes`` and
            ``input_shape``.
        dataloaders: Mapping ``dataset key -> {"val": loader}``.
        device: Device used for the model and the batches.
        csv_path: Destination of the results table (written without index).

    The table has the columns ``dataset``, ``activation``, ``init``,
    ``optimizer``, ``accuracy`` (percent) and ``val_loss`` (mean loss per
    sample).
    """

    # Unknown keys fall back to the key itself instead of being labelled
    # as the cats/dogs dataset.
    display_names = {
        "cifar": "Cifar-10",
        "dvc": "Dogs vs Cats",
        "catdog": "Dogs vs Cats"
    }

    records = []
    loss_fn = nn.CrossEntropyLoss()

    for cfg in config_list:

        dataset_key = cfg["dataset"]
        val_loader = dataloaders[dataset_key]["val"]

        run_tag = f"{cfg['activation']}_{cfg['init']}_{cfg['optimizer']}"

        checkpoint = os.path.join(
            "models",
            dataset_key,
            f"model_{run_tag}_best.pth"
        )

        # Check first: no point building a model and moving it to the device
        # when there is nothing to load.
        if not os.path.exists(checkpoint):
            print(f"Missing checkpoint: {checkpoint} — skipping")
            continue

        net = model_builder(
            num_classes=cfg["num_classes"],
            act_choice=cfg["activation"],
            input_dims=cfg["input_shape"]
        ).to(device)

        # NOTE: torch.load unpickles the file; only load checkpoints written
        # by this notebook or obtained from a trusted source.
        net.load_state_dict(torch.load(checkpoint, map_location=device))
        net.eval()

        loss_total = 0.0
        correct_preds = 0
        total_samples = 0

        with torch.no_grad():

            for batch in val_loader:

                inputs, targets = batch[:2]
                inputs = inputs.to(device)
                targets = targets.to(device)

                logits = net(inputs)
                batch_size = targets.size(0)

                # Weight each batch mean by its size so a short final batch
                # does not count as much as a full one.
                loss_total += loss_fn(logits, targets).item() * batch_size

                total_samples += batch_size
                correct_preds += (logits.argmax(dim=1) == targets).sum().item()

        # Same empty-loader guard as the validation step in execute_training.
        if total_samples:
            avg_loss = loss_total / total_samples
            accuracy = 100.0 * correct_preds / total_samples
        else:
            avg_loss = 0.0
            accuracy = 0.0

        records.append({
            "dataset": display_names.get(dataset_key, dataset_key),
            "activation": cfg["activation"],
            "init": cfg["init"],
            "optimizer": cfg["optimizer"],
            "accuracy": accuracy,
            "val_loss": avg_loss
        })

        print(
            f"[{dataset_key}] "
            f"{run_tag} | "
            f"Accuracy: {accuracy:.2f}% | "
            f"Loss: {avg_loss:.4f}"
        )

    results_df = pd.DataFrame(records)
    results_df.to_csv(csv_path, index=False)

    print(f"Results exported → {csv_path}")


In [None]:
def _make_loader_pair(train_set, val_set, batch_size=64):
    """Return {"train": ..., "val": ...} loaders; only the train loader shuffles."""
    return {
        "train": DataLoader(
            train_set,
            batch_size=batch_size,
            shuffle=True
        ),
        "val": DataLoader(
            val_set,
            batch_size=batch_size,
            shuffle=False
        )
    }


cifar_train, cifar_val, cifar_test = fetch_cifar_sets()
cifar_loaders = _make_loader_pair(cifar_train, cifar_val)

catdog_train, catdog_val = fetch_catdog_sets()
catdog_loaders = _make_loader_pair(catdog_train, catdog_val)


# unified loader registry (for benchmarking shell)
# "dvc" is the key used by the training and evaluation cells; "catdog" is
# kept as an alias so existing lookups keep working.
all_loaders = {
    "cifar": cifar_loaders,
    "dvc": catdog_loaders,
    "catdog": catdog_loaders
}


# hyper-parameter grid explored by the training cell
activation_space = [
    "relu",
    "tanh",
    "leaky_relu"
]

init_space = [
    "xavier",
    "kaiming",
    "random"
]

optimizer_space = [
    "sgd",
    "adam",
    "rmsprop"
]

# torch builds without the MPS backend do not expose torch.backends.mps,
# so look it up defensively before asking whether it is available.
_mps_backend = getattr(torch.backends, "mps", None)

if _mps_backend is not None and _mps_backend.is_available():
    device_target = "mps"

elif torch.cuda.is_available():
    device_target = "cuda"

else:
    device_target = "cpu"

print(f"Compute backend: {device_target}")


  batch = pickle.load(f, encoding='bytes')
  batch = pickle.load(f, encoding='bytes')


Using device: mps


In [None]:

# Per-dataset settings for the training grid.
dataset_specs = {
    "cifar": {
        "classes": 10,
        "train_loader": cifar_loaders["train"],
        "val_loader": cifar_loaders["val"],
        "input_shape": (3, 32, 32)
    },
    "dvc": {
        "classes": 2,
        "train_loader": catdog_loaders["train"],
        "val_loader": catdog_loaders["val"],
        "input_shape": (3, 64, 64)
    }
}

# Every run re-seeds the global RNG so weight initialisation and batch
# shuffling are reproducible and do not depend on which runs were skipped.
RUN_SEED = 42

# Flattened activation x init x optimizer x dataset grid, in the same order
# as four nested loops would produce.
run_grid = (
    (act, init_mode, opt_name, dataset_key, spec)
    for act in activation_space
    for init_mode in init_space
    for opt_name in optimizer_space
    for dataset_key, spec in dataset_specs.items()
)

for act, init_mode, opt_name, dataset_key, spec in run_grid:

    run_label = f"{act}_{init_mode}_{opt_name}"

    save_path = os.path.join(
        "models",
        dataset_key,
        f"model_{run_label}_best.pth"
    )

    # An existing best checkpoint marks the run as done, so re-running the
    # cell resumes the grid instead of retraining everything.
    if os.path.exists(save_path):
        print(
            f"✅ Model already trained → "
            f"{dataset_key} | {run_label} — skipping"
        )
        continue

    print(
        f"\n🚀 Launching run → {dataset_key} | {run_label}\n"
        f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
    )

    torch.manual_seed(RUN_SEED)

    model = CNNModel(
        num_classes=spec["classes"],
        act_choice=act,
        input_dims=spec["input_shape"]
    )

    initialize_model_weights(
        model,
        mode=init_mode
    )

    optimizer = create_optimizer(
        model,
        opt_name=opt_name,
        lr=1e-3
    )

    loss_fn = nn.CrossEntropyLoss()

    execute_training(
        model,
        spec["train_loader"],
        spec["val_loader"],
        optimizer,
        loss_fn,
        epochs=10,
        dataset_name=dataset_key,
        run_id=run_label,
        device=device_target
    )


âœ… Model already trained: cifar with config relu_xavier_sgd, skipping...
âœ… Model already trained: dvc with config relu_xavier_sgd, skipping...
âœ… Model already trained: cifar with config relu_xavier_adam, skipping...
âœ… Model already trained: dvc with config relu_xavier_adam, skipping...
âœ… Model already trained: cifar with config relu_xavier_rmsprop, skipping...
âœ… Model already trained: dvc with config relu_xavier_rmsprop, skipping...
âœ… Model already trained: cifar with config relu_kaiming_sgd, skipping...
âœ… Model already trained: dvc with config relu_kaiming_sgd, skipping...
âœ… Model already trained: cifar with config relu_kaiming_adam, skipping...
âœ… Model already trained: dvc with config relu_kaiming_adam, skipping...
âœ… Model already trained: cifar with config relu_kaiming_rmsprop, skipping...
âœ… Model already trained: dvc with config relu_kaiming_rmsprop, skipping...
âœ… Model already trained: cifar with config relu_random_sgd, skipping...
âœ… Model already traine

In [None]:
# Validation loaders keyed the way evaluate_saved_runs looks them up.
val_loader_map = {
    "cifar": {"val": cifar_loaders["val"]},
    "dvc": {"val": catdog_loaders["val"]}
}

# Model-construction settings per dataset.
dataset_templates = {
    "cifar": dict(num_classes=10, input_shape=(3, 32, 32)),
    "dvc": dict(num_classes=2, input_shape=(3, 64, 64))
}

# One entry per activation x init x optimizer x dataset combination,
# with the dataset varying fastest.
experiment_specs = [
    {
        "dataset": dataset_key,
        "activation": act,
        "init": init_mode,
        "optimizer": opt_name,
        "num_classes": meta["num_classes"],
        "input_shape": meta["input_shape"]
    }
    for act in activation_space
    for init_mode in init_space
    for opt_name in optimizer_space
    for dataset_key, meta in dataset_templates.items()
]

evaluate_saved_runs(
    CNNModel,
    experiment_specs,
    val_loader_map,
    device=device_target,
    csv_path="experiment_results.csv"
)


[cifar] Config: relu_xavier_sgd | Accuracy: 73.53% | Loss: 0.7427
[dvc] Config: relu_xavier_sgd | Accuracy: 87.50% | Loss: 0.3020
[cifar] Config: relu_xavier_adam | Accuracy: 77.60% | Loss: 0.6383
[dvc] Config: relu_xavier_adam | Accuracy: 87.70% | Loss: 0.2929
[cifar] Config: relu_xavier_rmsprop | Accuracy: 80.74% | Loss: 0.5714
[dvc] Config: relu_xavier_rmsprop | Accuracy: 86.14% | Loss: 0.3036
[cifar] Config: relu_kaiming_sgd | Accuracy: 70.41% | Loss: 0.8383
[dvc] Config: relu_kaiming_sgd | Accuracy: 87.68% | Loss: 0.3176
[cifar] Config: relu_kaiming_adam | Accuracy: 78.00% | Loss: 0.6295
[dvc] Config: relu_kaiming_adam | Accuracy: 88.64% | Loss: 0.2590
[cifar] Config: relu_kaiming_rmsprop | Accuracy: 80.14% | Loss: 0.5628
[dvc] Config: relu_kaiming_rmsprop | Accuracy: 86.48% | Loss: 0.3128
[cifar] Config: relu_random_sgd | Accuracy: 70.15% | Loss: 0.8540
[dvc] Config: relu_random_sgd | Accuracy: 83.46% | Loss: 0.3798
[cifar] Config: relu_random_adam | Accuracy: 80.31% | Loss: 0.55

In [None]:
# Load the table exported by evaluate_saved_runs; read_csv already returns
# a DataFrame, so no extra wrapping is needed.
df = pd.read_csv("experiment_results.csv")
df

Unnamed: 0,dataset,activation,init,optimizer,accuracy,val_loss
0,Cifar-10,relu,xavier,sgd,73.53,0.742685
1,Dogs vs Cats,relu,xavier,sgd,87.5,0.302032
2,Cifar-10,relu,xavier,adam,77.6,0.638299
3,Dogs vs Cats,relu,xavier,adam,87.7,0.292867
4,Cifar-10,relu,xavier,rmsprop,80.74,0.571371
5,Dogs vs Cats,relu,xavier,rmsprop,86.14,0.303589
6,Cifar-10,relu,kaiming,sgd,70.41,0.838343
7,Dogs vs Cats,relu,kaiming,sgd,87.68,0.317576
8,Cifar-10,relu,kaiming,adam,78.0,0.629464
9,Dogs vs Cats,relu,kaiming,adam,88.64,0.259028


In [None]:
# Highest-accuracy configuration among the CIFAR-10 rows.
best_cifar = df.loc[df["dataset"] == "Cifar-10"].nlargest(n=1, columns="accuracy")
best_cifar

Unnamed: 0,dataset,activation,init,optimizer,accuracy,val_loss
52,Cifar-10,leaky_relu,random,rmsprop,84.03,0.463258


In [None]:
# Highest-accuracy configuration among the Dogs vs Cats rows.
best_dvc = df.loc[df["dataset"] == "Dogs vs Cats"].nlargest(n=1, columns="accuracy")
best_dvc

Unnamed: 0,dataset,activation,init,optimizer,accuracy,val_loss
45,Dogs vs Cats,leaky_relu,kaiming,adam,89.74,0.234952
