# **IMPORT**

In [53]:
import os
import json
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset, random_split
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import f1_score, accuracy_score

# **FUNCTIONS**

In [54]:
def save_json(path, content):
    """Write ``content`` to ``path`` as UTF-8 JSON.

    The output is indented by 3 spaces and non-ASCII characters are written
    as-is rather than escaped. An existing file at ``path`` is overwritten.
    """
    with open(path, mode="w", encoding="utf-8") as json_out:
        json.dump(content, json_out, indent=3, ensure_ascii=False)


# **LOAD DATA**

## ***Load CIFAR10***

In [55]:
BATCH_SIZE = 128  # mini-batch size used by train_loader

In [56]:
# Per-channel mean / std shared by the train and test normalisation steps.
_NORM_MEAN = (0.4914, 0.4822, 0.4465)
_NORM_STD = (0.2470, 0.2435, 0.2616)

# Training pipeline: random flip + padded random crop, then tensor + normalise.
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
])

# Test pipeline: deterministic, only tensor conversion + normalisation.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
])

# Both splits live under the same root and are downloaded when missing.
_cifar10_kwargs = dict(root="./data", download=True)
train_dataset = torchvision.datasets.CIFAR10(train=True, transform=transform_train, **_cifar10_kwargs)
test_dataset = torchvision.datasets.CIFAR10(train=False, transform=transform_test, **_cifar10_kwargs)


## ***Train - Dev - Test***

In [57]:
# 80% of the training split is kept for training; the rest becomes the dev set.
train_size = int(0.8 * len(train_dataset))
dev_size = len(train_dataset) - train_size  # remainder, so both sizes sum to the dataset length

In [58]:
# Seed the split so the train/dev partition is identical after every kernel restart.
train_dataset, dev_dataset = random_split(
    train_dataset,
    [train_size, dev_size],
    generator=torch.Generator().manual_seed(42),
)
# Both halves returned by random_split share the same underlying dataset, so the
# dev samples would also receive the random flip/crop of transform_train.
# Re-wrap the dev indices around a view of the same images that uses the
# deterministic transform_test, so validation metrics are not augmentation-noisy.
dev_dataset = Subset(
    torchvision.datasets.CIFAR10(root="./data", train=True, download=True, transform=transform_test),
    dev_dataset.indices,
)

In [59]:
_NUM_WORKERS = 2      # worker processes used by every loader
_EVAL_BATCH_SIZE = 100  # batch size for the dev and test loaders

# Single batch containing the whole training subset, in fixed order.
train_loader_fullset = DataLoader(
    train_dataset,
    batch_size=len(train_dataset),
    shuffle=False,
    num_workers=_NUM_WORKERS,
)
# Shuffled mini-batches of BATCH_SIZE samples.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=_NUM_WORKERS,
)
# Evaluation loaders keep a fixed order.
dev_loader = DataLoader(dev_dataset, batch_size=_EVAL_BATCH_SIZE, shuffle=False, num_workers=_NUM_WORKERS)
test_loader = DataLoader(test_dataset, batch_size=_EVAL_BATCH_SIZE, shuffle=False, num_workers=_NUM_WORKERS)

# **BUILD MODEL**

## ***Build Model***

In [60]:
class SimpleCNN(nn.Module):
    """
    A basic CNN for image classification.

    Structure:
      - 3 stages of convolution + batch norm + ReLU + max pool
        (3 -> 32 -> 64 -> 128 channels)
      - 2 fully connected layers that produce ``num_classes`` logits
    """
    def __init__(self, num_classes=10):
        super().__init__()

        # Three identical-shape stages; each max pool halves height and width.
        self.block1 = self._conv_stage(3, 32)
        self.block2 = self._conv_stage(32, 64)
        self.block3 = self._conv_stage(64, 128)

        # Classification head. The first Linear layer expects 128 channels on a
        # 4x4 grid, i.e. a 32x32 input after the three 2x2 pools above.
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128 * 4 * 4, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes),
        )

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Build one Conv(3x3, padding 1) -> BatchNorm -> ReLU -> MaxPool(2x2) stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        features = x
        for stage in (self.block1, self.block2, self.block3):
            features = stage(features)
        return self.classifier(features)

## ***Optimizer***

In [61]:
EPOCHS = 30  # number of epochs passed to every Trainer below
LR = 1e-4  # learning rate shared by every entry of optimizers_list

In [62]:
# Maps an experiment label to (optimizer class, constructor kwargs).
# Every optimizer gets the same learning rate; "GD" and "SGD" share the
# optim.SGD class.
optimizers_list = {
    label: (optimizer_cls, {"lr": LR})
    for label, optimizer_cls in (
        ("GD", optim.SGD),           # Gradient Descent
        ("SGD", optim.SGD),          # Stochastic Gradient Descent
        ("AdaGrad", optim.Adagrad),  # AdaGrad
        ("Adam", optim.Adam),        # Adam
    )
}

# **TRAINING**

## ***Trainer Class***

In [80]:
class Trainer:
    """Train, validate, checkpoint and evaluate a ``SimpleCNN`` with one optimizer.

    Constructing a Trainer builds the model, optimizer and loss function, then
    restores the "best" checkpoint for ``optimizer_name`` from ``save_dir`` when
    that file exists.

    Args:
        optimizer_name: Label embedded in the checkpoint file name.
        optimizer_cls: Optimizer class, called as
            ``optimizer_cls(model.parameters(), **optimizer_params)``.
        optimizer_params: Keyword arguments forwarded to ``optimizer_cls``.
        epochs: Number of epochs run by :meth:`train`.
        save_dir: Directory that holds the checkpoint file.
    """

    def __init__(self, optimizer_name, optimizer_cls, optimizer_params, epochs, save_dir) -> None:
        self.epochs = epochs
        self.save_dir = save_dir
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.optimizer_name = optimizer_name
        self.optimizer_cls = optimizer_cls
        self.optimizer_params = optimizer_params

        self.load()

    #-- Load
    def load(self):
        """Build model, optimizer and loss, then restore a checkpoint if present."""
        self.load_model()
        self.load_optimizer()
        self.load_loss_fn()
        self.load_ckpt()

    def load_model(self):
        """Create a fresh ``SimpleCNN`` on the selected device."""
        self.model = SimpleCNN()
        self.model = self.model.to(self.device)

    def load_optimizer(self):
        """Create the optimizer over the current model's parameters."""
        self.optimizer = self.optimizer_cls(self.model.parameters(), **self.optimizer_params)

    def load_loss_fn(self):
        """Use cross-entropy as the training and evaluation loss."""
        self.criterion = nn.CrossEntropyLoss()

    #-- Shared helpers
    def _checkpoint_path(self):
        """Return the path of this optimizer's "best" checkpoint file."""
        return os.path.join(self.save_dir, f"cnn_model_checkpoint_{self.optimizer_name}_best.pt")

    def _evaluate(self, loader):
        """Score the model on ``loader`` in eval mode, without gradients.

        Returns:
            Tuple ``(mean batch loss, accuracy, macro F1, weighted F1)``.
        """
        # Eval mode disables dropout and makes batch norm use its running stats.
        self.model.eval()
        all_predicts = []
        all_targets = []
        batch_losses = []
        with torch.no_grad():
            for inputs, targets in loader:
                inputs, targets = inputs.to(self.device), targets.to(self.device)
                outputs = self.model(inputs)
                _, predicts = torch.max(outputs, 1)

                all_predicts.extend(predicts.cpu().tolist())
                all_targets.extend(targets.cpu().tolist())
                batch_losses.append(self.criterion(outputs, targets).item())

        loss_avg = sum(batch_losses) / len(batch_losses)
        acc = float(accuracy_score(all_targets, all_predicts))
        f1_macro = float(f1_score(all_targets, all_predicts, average='macro'))
        f1_weighted = float(f1_score(all_targets, all_predicts, average='weighted'))
        return loss_avg, acc, f1_macro, f1_weighted

    #-- Training
    def train(
            self,
            train_loader,
            val_loader
        ):
        """Train for ``self.epochs`` epochs, validating after each one.

        A checkpoint is written only when the validation macro F1 improves on
        the best value seen so far in this call.

        Args:
            train_loader: Iterable of ``(inputs, targets)`` training batches.
            val_loader: Iterable of ``(inputs, targets)`` validation batches.

        Returns:
            Dict mapping the epoch index to that epoch's metrics:
            ``train_loss``, ``val_loss``, ``val_acc``, ``val_f1_macro`` and
            ``val_f1_weighted`` (all floats; losses are means over the batches
            of that epoch).
        """
        epoch_val_logs = {}
        best_f1 = float("-inf")

        pbar = tqdm(range(self.epochs), desc="Training Process ...")
        for epoch in pbar:
            #~ Training
            self.model.train()
            train_losses = []  # reset every epoch so the average is per-epoch
            for inputs, targets in train_loader:
                inputs, targets = inputs.to(self.device), targets.to(self.device)
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, targets)
                loss.backward()
                self.optimizer.step()
                train_losses.append(loss.item())

            #~ Validation
            val_loss_avg, val_acc, val_f1_macro, val_f1_weighted = self._evaluate(val_loader)

            # Keep only the best-scoring weights on disk.
            if val_f1_macro > best_f1:
                best_f1 = val_f1_macro
                self.save_checkpoint()

            #~ Save Log
            train_loss_avg = sum(train_losses) / len(train_losses)

            epoch_log = {
                "train_loss": train_loss_avg,
                "val_loss": val_loss_avg,
                "val_acc": val_acc,
                "val_f1_macro": val_f1_macro,
                "val_f1_weighted": val_f1_weighted
            }
            pbar.set_postfix(epoch_log)
            epoch_val_logs[epoch] = epoch_log

        return epoch_val_logs


    def inference(self, test_loader):
        """Evaluate the current model on ``test_loader``.

        Returns:
            Dict with ``test_loss`` (mean batch loss), ``test_acc``,
            ``test_f1_macro`` and ``test_f1_weighted``.
        """
        test_loss_avg, test_acc, test_f1_macro, test_f1_weighted = self._evaluate(
            tqdm(test_loader, desc="Inferencing")
        )
        return {
            "test_loss": test_loss_avg,
            "test_acc": test_acc,
            "test_f1_macro": test_f1_macro,
            "test_f1_weighted": test_f1_weighted
        }


    #-- Save Model
    def save_checkpoint(self):
        """Write the model and optimizer state dicts to the checkpoint file."""
        checkpoint_path = self._checkpoint_path()
        # torch.save does not create missing parent directories.
        os.makedirs(self.save_dir, exist_ok=True)
        torch.save({
            "model_state_dict": self.model.state_dict(),
            "optimizer_state_dict": self.optimizer.state_dict(),
        }, checkpoint_path)
        print(f"\n\n✅ Model checkpoint saved to: {checkpoint_path}")


    def load_ckpt(self):
        """Restore model and optimizer state from the checkpoint file, if any.

        When the file is missing, the freshly built model and optimizer are
        left untouched. After a successful load the model is in eval mode.
        """
        checkpoint_path = self._checkpoint_path()
        if not os.path.exists(checkpoint_path):
            print(f"✅ No {checkpoint_path} founded")
            return

        # Load the checkpoint onto the active device and restore both states.
        checkpoint = torch.load(checkpoint_path, map_location=self.device)
        self.model.load_state_dict(checkpoint["model_state_dict"])
        self.optimizer.load_state_dict(checkpoint["optimizer_state_dict"])

        self.model.eval()  # put the model in evaluation mode
        print(f"✅ Model loaded from {checkpoint_path}")



    #-- Get Model
    def get_model(self):
        """Return the underlying model."""
        return self.model

In [81]:
# NOTE(review): this repeats the optimizers_list defined in the "Optimizer"
# section with the same entries; one of the two definitions can be dropped.
optimizers_list = {
    "GD": (optim.SGD, {"lr": LR}), # Gradient Descent
    "SGD": (optim.SGD, {"lr": LR}), # Stochastic Gradient Descent
    "AdaGrad": (optim.Adagrad, {"lr": LR}), # AdaGrad
    "Adam": (optim.Adam, {"lr": LR}) # Adam
}

## ***Training GD***

In [78]:
# Trainer for the "GD" entry; the (class, kwargs) pair is unpacked once.
_gd_optimizer_cls, _gd_optimizer_params = optimizers_list["GD"]
gd_trainer = Trainer(
    optimizer_name="GD",
    optimizer_cls=_gd_optimizer_cls,
    optimizer_params=_gd_optimizer_params,
    epochs=EPOCHS,
    save_dir="/content/drive/MyDrive/QUÝNHEA/Project/models",
)

✅ No /content/drive/MyDrive/QUÝNHEA/Project/models/cnn_model_checkpoint_GD_best.pt founded


In [66]:
# gd_logs = gd_trainer.train(train_loader_fullset, dev_loader)

## ***Training SGD***

In [82]:
# Trainer for the "SGD" entry; the (class, kwargs) pair is unpacked once.
_sgd_optimizer_cls, _sgd_optimizer_params = optimizers_list["SGD"]
sgd_trainer = Trainer(
    optimizer_name="SGD",
    optimizer_cls=_sgd_optimizer_cls,
    optimizer_params=_sgd_optimizer_params,
    epochs=EPOCHS,
    save_dir="/content/drive/MyDrive/QUÝNHEA/Project/models",
)

✅ Model loaded from /content/drive/MyDrive/QUÝNHEA/Project/models/cnn_model_checkpoint_SGD_best.pt


In [83]:
# sgd_logs = sgd_trainer.train(train_loader, dev_loader)

In [None]:
# sgd_save_path = os.path.join("/content/drive/MyDrive/QUÝNHEA/Project/models", "sgd_log.json")
# save_json(sgd_save_path, sgd_logs)

In [87]:
# Score sgd_trainer's model on test_loader and write the returned metrics to JSON.
test_sgd_logs = sgd_trainer.inference(test_loader)
test_sgd_save_path = os.path.join("/content/drive/MyDrive/QUÝNHEA/Project/models", "test_sgd_log.json")
save_json(test_sgd_save_path, test_sgd_logs)

Inferencing: 100%|██████████| 100/100 [00:02<00:00, 45.12it/s]


In [85]:
test_sgd_logs

{'test_loss': 1.8440232408046722,
 'test_acc': 0.3657,
 'test_f1_macro': 0.3566435823626392,
 'test_f1_weighted': 0.3566435823626392}

## ***Training Adam***

In [86]:
# Trainer for the "Adam" entry; the (class, kwargs) pair is unpacked once.
_adam_optimizer_cls, _adam_optimizer_params = optimizers_list["Adam"]
adam_trainer = Trainer(
    optimizer_name="Adam",
    optimizer_cls=_adam_optimizer_cls,
    optimizer_params=_adam_optimizer_params,
    epochs=EPOCHS,
    save_dir="/content/drive/MyDrive/QUÝNHEA/Project/models",
)

✅ Model loaded from /content/drive/MyDrive/QUÝNHEA/Project/models/cnn_model_checkpoint_Adam_best.pt


In [None]:
# adam_logs = adam_trainer.train(train_loader, dev_loader)

In [None]:
# adam_save_path = os.path.join("/content/drive/MyDrive/QUÝNHEA/Project/models", "adam_log.json")
# save_json(adam_save_path, adam_logs)

In [88]:
# Score adam_trainer's model on test_loader and write the returned metrics to JSON.
test_adam_logs = adam_trainer.inference(test_loader)
test_adam_save_path = os.path.join("/content/drive/MyDrive/QUÝNHEA/Project/models", "test_adam_log.json")
save_json(test_adam_save_path, test_adam_logs)

Inferencing: 100%|██████████| 100/100 [00:04<00:00, 21.38it/s]


In [91]:
test_adam_logs

{'test_loss': 0.6607854717969894,
 'test_acc': 0.7686,
 'test_f1_macro': 0.7666959863863341,
 'test_f1_weighted': 0.7666959863863342}

## ***Training AdaGrad***

In [89]:
# Trainer for the "AdaGrad" entry; the (class, kwargs) pair is unpacked once.
_adagrad_optimizer_cls, _adagrad_optimizer_params = optimizers_list["AdaGrad"]
adaGrad_trainer = Trainer(
    optimizer_name="AdaGrad",
    optimizer_cls=_adagrad_optimizer_cls,
    optimizer_params=_adagrad_optimizer_params,
    epochs=EPOCHS,
    save_dir="/content/drive/MyDrive/QUÝNHEA/Project/models",
)

✅ Model loaded from /content/drive/MyDrive/QUÝNHEA/Project/models/cnn_model_checkpoint_AdaGrad_best.pt


In [None]:
# Train on the mini-batch loader, validating on dev_loader; keeps the per-epoch log returned by train().
adaGrad_logs = adaGrad_trainer.train(train_loader, dev_loader)

In [None]:
# Persist the AdaGrad training log as JSON in the models directory.
adaGrad_save_path = os.path.join("/content/drive/MyDrive/QUÝNHEA/Project/models", "adaGrad_log.json")
save_json(adaGrad_save_path, adaGrad_logs)

In [90]:
# Score adaGrad_trainer's model on test_loader and write the returned metrics to JSON.
test_adaGrad_logs = adaGrad_trainer.inference(test_loader)
test_adaGrad_save_path = os.path.join("/content/drive/MyDrive/QUÝNHEA/Project/models", "test_adaGrad_log.json")
save_json(test_adaGrad_save_path, test_adaGrad_logs)

Inferencing: 100%|██████████| 100/100 [00:04<00:00, 20.31it/s]


In [92]:
test_adaGrad_logs

{'test_loss': 1.4940692114830016,
 'test_acc': 0.4732,
 'test_f1_macro': 0.46892948540150964,
 'test_f1_weighted': 0.46892948540150975}

# **EVALUATE**

# **EDA**