In [None]:
import kagglehub

# Fetch the most recent copy of the FER2013 dataset through kagglehub and
# keep the local directory it reports in `path` for the following cells.
path = kagglehub.dataset_download("msambare/fer2013")

print(f"Path to dataset files: {path}")

In [None]:
import torch
from torch import nn
import torchvision
from torchvision import datasets, transforms

In [None]:
import os

# Show the top-level entries of the downloaded dataset directory; the next
# cell expects "train" and "test" sub-folders to be among them.
os.listdir(path)

In [None]:
# Locations of the two dataset splits inside the downloaded directory.
train_dir = "{}/train".format(path)
test_dir = "{}/test".format(path)

In [None]:
from torchvision import transforms

# Preprocessing values shared by the training and evaluation pipelines.
IMAGE_SIZE = (224, 224)
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]


def _tensor_and_normalize():
    """Return fresh ToTensor + Normalize steps that end both pipelines."""
    return [
        transforms.ToTensor(),
        transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ]


# Training pipeline: resize, random augmentation, then tensor + normalization.
data_transforms = transforms.Compose(
    [
        transforms.Resize(IMAGE_SIZE),
        transforms.TrivialAugmentWide(num_magnitude_bins=31),
        *_tensor_and_normalize(),
    ]
)

# Evaluation pipeline: same as training but without the random augmentation.
test_transform = transforms.Compose(
    [
        transforms.Resize(IMAGE_SIZE),
        *_tensor_and_normalize(),
    ]
)

In [None]:
# Build one ImageFolder dataset per split; targets are left untransformed
# (target_transform defaults to None).
train_data = datasets.ImageFolder(train_dir, transform=data_transforms)
test_data = datasets.ImageFolder(test_dir, transform=test_transform)

In [None]:
# Class labels as reported by the training ImageFolder dataset; later cells
# use this list to size the classifier head and to label plots.
class_names = train_data.classes
class_names

In [None]:
from torch.utils.data import DataLoader

# Settings common to both loaders; only the training split is shuffled.
loader_kwargs = {"batch_size": 32, "num_workers": 2}

train_data_loader = DataLoader(train_data, shuffle=True, **loader_kwargs)
test_data_loader = DataLoader(test_data, shuffle=False, **loader_kwargs)

In [None]:
# Take the first test sample (after test_transform has been applied) and
# display the image tensor's shape together with its label index.
img, label = test_data[0]
img.shape, label

In [None]:
import matplotlib.pyplot as plt

# `img` went through transforms.Normalize, so its values fall outside [0, 1]
# and imshow would clip them into a distorted picture. Undo the normalization
# (same mean/std as the transform pipeline) before displaying.
norm_mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
norm_std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
img_display = (img * norm_std + norm_mean).clamp(0, 1)

# imshow expects channels last, hence the (C, H, W) -> (H, W, C) permute.
plt.imshow(img_display.permute(1, 2, 0))
plt.title(class_names[label])

In [None]:
# Prefer the GPU when one is visible to PyTorch, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
device

In [None]:
# Instantiate EfficientNet-B2 with torchvision's default pretrained weights,
# then move it to the selected device.
weights = torchvision.models.EfficientNet_B2_Weights.DEFAULT
model = torchvision.models.efficientnet_b2(weights=weights)
model = model.to(device)

In [None]:
# Number of trailing backbone parameter tensors that remain trainable.
NUM_TRAINABLE_FEATURE_PARAMS = 7

# Freeze the whole feature extractor first ...
feature_params = list(model.features.parameters())
for param in feature_params:
    param.requires_grad = False

# ... then re-enable gradients only for the LAST few parameter tensors.
# The previous slice `[:-7]` did the opposite: it unfroze everything except
# the last seven tensors, which made the freeze loop above pointless.
for param in feature_params[-NUM_TRAINABLE_FEATURE_PARAMS:]:
    param.requires_grad = True


# Seed before building the new head so its initial weights are reproducible.
torch.cuda.manual_seed(42)
torch.manual_seed(42)

# Read the head's input width from the existing classifier instead of
# hard-coding it, so the cell keeps working if the backbone is swapped.
classifier_in_features = next(
    module.in_features
    for module in model.classifier.modules()
    if isinstance(module, nn.Linear)
)

# Replace the ImageNet head with one sized for this dataset's classes.
model.classifier = nn.Sequential(
    nn.Dropout(p=0.6, inplace=True),
    nn.Linear(
        in_features=classifier_in_features,
        out_features=len(class_names),
        bias=True,
    ),
)

# The new head is created on the CPU; move the full model to the device.
model.to(device)

In [None]:
# Smoke test: push one random image-shaped tensor through the model to check
# that the new head produces one logit per class.
x = torch.rand((1, 3, 224, 224)).to(device)

# Run in eval + inference mode so this throwaway batch does not update the
# BatchNorm running statistics, apply dropout, or build an autograd graph.
was_training = model.training
model.eval()
with torch.inference_mode():
    smoke_logits = model(x)
model.train(was_training)  # restore whatever mode the model was in

smoke_logits

In [None]:
# Per-class weights for the training loss, one entry per class.
# NOTE(review): presumably derived from the class frequencies of the training
# split (second class is weighted far above the rest) — confirm the source.
class_weight_values = [1.02, 9.37, 0.99, 0.57, 0.82, 0.85, 1.28]
train_weights = torch.tensor(
    class_weight_values, dtype=torch.float32, device=device
)

In [None]:
# Evaluation uses plain cross-entropy; training uses the class-weighted
# variant, so the two reported losses are not on the same scale.
test_loss_fn = nn.CrossEntropyLoss()
train_loss_fn = nn.CrossEntropyLoss(weight=train_weights)

# Two parameter groups: a smaller learning rate for the pretrained backbone
# and a larger one for the freshly initialised classifier head.
param_groups = [
    {"params": model.features.parameters(), "lr": 1e-4},
    {"params": model.classifier.parameters(), "lr": 5e-4},
]
optimizer = torch.optim.Adam(param_groups, weight_decay=1e-4)

# Cut every group's learning rate by 10x when the monitored loss plateaus.
# NOTE(review): this patience (5) is larger than the early-stopping patience
# (3) used by the training loop, so a reduction will rarely happen before
# training stops — confirm this is intended.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.1, patience=5
)

In [None]:
def acc_fn(y_true, y_pred):
    """Return the percentage (0-100) of positions where y_pred equals y_true.

    Args:
        y_true: tensor of reference labels.
        y_pred: tensor of predicted labels, compared element-wise to y_true.

    Returns:
        float: matching elements divided by len(y_true), times 100.
    """
    num_matching = (y_true == y_pred).sum().item()
    total = len(y_true)
    return num_matching / total * 100

In [None]:
results_path = "./emotions_results (b2_5e-4)"

import os

# Create the output directory on the first run only, and say so.
results_dir_exists = os.path.exists(results_path)
if not results_dir_exists:
    os.mkdir(results_path)
    print("created")

# Store the class labels, one per line, next to the other run artifacts.
class_names_file = f"{results_path}/class_names.txt"
with open(class_names_file, "w") as handle:
    handle.write("\n".join(class_names))

In [None]:
# Training budget and early-stopping threshold.
epochs = 10  # maximum number of epochs
patience = 3  # non-improving epochs tolerated before stopping

# Early-stopping state updated by the training loop.
best_loss = None  # lowest test loss seen so far (None until the first epoch)
early_stop = 0  # epochs counted since the last improvement

In [None]:
from torch.utils.tensorboard import SummaryWriter

# TensorBoard event files are written under the results directory.
runs_dir = f"{results_path}/runs"
writer = SummaryWriter(log_dir=runs_dir)

In [None]:
from tqdm.auto import tqdm
import time

# Main fine-tuning loop. Each epoch does: one training pass, one evaluation
# pass on the test loader, logging (TensorBoard + text file), an LR-scheduler
# step on the test loss, then checkpointing and early stopping.
# NOTE(review): the test split drives both checkpoint selection and early
# stopping, so the reported test metrics are optimistically biased — consider
# carving out a separate validation split.
for epoch in tqdm(range(1, epochs + 1)):
    # ---- training pass ----
    train_acc, train_loss = 0, 0
    model.train()
    for X, y in tqdm(
        train_data_loader,
        leave=False,
        total=len(train_data_loader),
        desc=f"Training Epoch {epoch}",
    ):
        X, y = X.to(device), y.to(device)
        logits = model(X)
        loss = train_loss_fn(logits, y)
        train_loss += loss.item()
        train_acc += acc_fn(y, logits.argmax(dim=1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # Metrics are averaged over batches, not over samples.
    train_loss /= len(train_data_loader)
    train_acc /= len(train_data_loader)

    # ---- evaluation pass ----
    model.eval()
    test_acc, test_loss = 0, 0
    with torch.inference_mode():
        for X, y in tqdm(
            test_data_loader,
            leave=False,
            total=len(test_data_loader),
            desc=f"Testing Epoch {epoch}",
        ):
            X, y = X.to(device), y.to(device)
            logits = model(X)
            test_loss += test_loss_fn(logits, y).item()
            test_acc += acc_fn(y, logits.argmax(dim=1))
        test_acc /= len(test_data_loader)
        test_loss /= len(test_data_loader)

    # ---- logging ----
    writer.add_scalars(
        main_tag="Loss",
        tag_scalar_dict={"train_loss": train_loss, "test_loss": test_loss},
        global_step=epoch,
    )
    writer.add_scalars(
        main_tag="Accuracy",
        tag_scalar_dict={"train_acc": train_acc, "test_acc": test_acc},
        global_step=epoch,
    )
    # Only the first parameter group (the backbone) is logged here.
    writer.add_scalar(
        tag="Learning Rate",
        scalar_value=optimizer.param_groups[0]["lr"],
        global_step=epoch,
    )

    info = f"Epoch: {epoch} | Train acc: {train_acc:.5f} | Train loss: {train_loss:.5f} | Test acc: {test_acc:.5f} | Test loss: {test_loss:.5f}"

    with open(f"{results_path}/training_info.txt", "a") as f:
        f.write(info + "\n")

    print(info)

    # ---- learning-rate schedule ----
    old_lr = optimizer.param_groups[0]["lr"]
    scheduler.step(test_loss)
    new_lr = optimizer.param_groups[0]["lr"]

    if new_lr < old_lr:
        print(
            f"Learning rate is reduced from: {old_lr} -> {new_lr} after epoch: {epoch}"
        )

    # ---- checkpointing / early stopping ----
    # The first epoch always counts as an improvement. Previously it set
    # best_loss and then fell through to the "no improvement" branch, which
    # used up one epoch of patience before training had really started.
    if best_loss is None or test_loss < best_loss:
        best_loss = test_loss
        torch.save(model, f"{results_path}/model.pth")
        print(f"Best model saved after epoch: {epoch}")
        early_stop = 0
    else:
        early_stop += 1
        # ">=" so a counter carried over from an earlier run of this cell
        # cannot step past the threshold unnoticed.
        if early_stop >= patience:
            print(f"Early stopping after epoch: {epoch}")
            break

# Make sure all buffered TensorBoard events reach disk.
writer.flush()

In [None]:
from tqdm.auto import tqdm

# Reload the best checkpoint written by the training loop.
# map_location lets a checkpoint saved on CUDA load in a CPU-only session.
# NOTE: weights_only=False unpickles arbitrary Python objects — only load
# checkpoint files that this notebook produced itself.
model = torch.load(
    f"{results_path}/model.pth", map_location=device, weights_only=False
)
model.to(device)

# Per-batch predictions, concatenated below in dataset order (the test
# loader is not shuffled).
test_preds = []

test_loss, test_acc = 0, 0
model.eval()
with torch.inference_mode():
    for X, y in tqdm(
        test_data_loader,
        leave=False,
        total=len(test_data_loader),
        desc="Testing",
    ):
        X, y = X.to(device), y.to(device)
        logits = model(X)
        y_pred = logits.argmax(dim=1)  # computed once, reused below
        test_loss += test_loss_fn(logits, y).item()
        test_acc += acc_fn(y, y_pred)
        test_preds.append(y_pred.cpu())

    # Metrics are averaged over batches, not over samples.
    test_loss /= len(test_data_loader)
    test_acc /= len(test_data_loader)

test_preds = torch.cat(test_preds)
test_loss, test_acc

In [None]:
try:
    from torchmetrics import ConfusionMatrix
except:
    !pip install torchmetrics
    from torchmetrics import ConfusionMatrix

cm = ConfusionMatrix(task="multiclass", num_classes=len(class_names))
conf_mat = cm(test_preds, torch.Tensor(test_data.targets).type(torch.int64))

In [None]:
from mlxtend.plotting import plot_confusion_matrix
import matplotlib.pyplot as plt

# Render the confusion matrix with class labels on the axes and save a
# high-resolution copy alongside the other run artifacts.
confusion_counts = conf_mat.numpy()
fig, ax = plot_confusion_matrix(
    conf_mat=confusion_counts,
    class_names=class_names,
    figsize=(7, 7),
)
ax.set_title("Confusion matrix")
fig.savefig(f"{results_path}/confusion_matrix.png", dpi=1000)

In [None]:
# Export a CPU copy of the model. Note that .cpu() moves the notebook's
# `model` itself off the GPU as a side effect.
cpu_model_path = f"{results_path}/cpu_model.pth"
torch.save(model.cpu(), cpu_model_path)