In [None]:
# Mount Google Drive when running inside Colab. On any other machine the
# `google.colab` package is not installed, so skip the mount instead of
# aborting a Restart & Run All.
try:
    from google.colab import drive
except ImportError:
    drive = None
    print("google.colab not available - skipping Drive mount")
else:
    drive.mount('/content/drive')

In [None]:
!pip install optuna

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim

import optuna
from tqdm.auto import tqdm
from copy import deepcopy
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

from torchvision import transforms
from torchvision.models import efficientnet_v2_s, EfficientNet_V2_S_Weights

In [None]:
# Directory holding the preprocessed .npy arrays. Defaults to the Colab Drive
# location; set the CAPSTONE_DATA_DIR environment variable to point somewhere
# else without editing the notebook.
FILEPATH = os.environ.get(
    "CAPSTONE_DATA_DIR",
    "/content/drive/MyDrive/Colab Notebooks/capstone_data",
)

In [None]:
# Read the preprocessed image array and its label array from FILEPATH and
# report their shapes as a quick sanity check.
faces_path = f"{FILEPATH}/preprocessed_faces.npy"
labels_path = f"{FILEPATH}/preprocessed_labels.npy"
faces, labels = np.load(faces_path), np.load(labels_path)
print("Faces shape:", faces.shape)
print("Labels shape:", labels.shape)

In [None]:
# Preview up to 12 samples in a 3x4 grid, starting at index `preview_start`.
# The count is clamped against the samples that actually exist after the
# offset, so a small dataset no longer raises IndexError on faces[i + 5].
preview_start = 5
preview_count = max(0, min(12, len(faces) - preview_start))

plt.figure(figsize=(12, 8))
for i in range(preview_count):
    plt.subplot(3, 4, i + 1)
    plt.imshow(faces[preview_start + i])
    plt.title(f"Label: {labels[preview_start + i]}")
    plt.axis("off")
plt.show()

In [None]:
EPOCHS = 20  # epochs per training run; raise to 50 or more for a more thorough hyperparameter search
BATCH = 32  # batch size passed to every DataLoader
OUT_CLASSES = 4  # output width of the replacement classifier layer
IMG_SIZE = 224  # images are resized to IMG_SIZE x IMG_SIZE by the transforms

In [None]:
# Per-channel normalisation statistics shared by both pipelines.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]


def _make_pipeline(*augmentations):
    """Build the preprocessing pipeline: PIL conversion -> tensor -> resize ->
    (optional augmentations) -> normalisation."""
    return transforms.Compose([
        transforms.ToPILImage(),
        transforms.ToTensor(),
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        *augmentations,
        transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ])


# Training adds a random vertical flip (p=0.6); evaluation is deterministic.
train_transform = _make_pipeline(transforms.RandomVerticalFlip(0.6))
transform = _make_pipeline()


In [None]:
# Class name -> integer id, and the inverse lookup derived from it.
label_index = {"combination": 0, "dry": 1, "normal": 2, "oily": 3}
index_label = {idx: name for name, idx in label_index.items()}

# 80/10/10 split: hold out 20 % of the samples first, then cut that hold-out
# in half to obtain the validation and test sets.
split_seed = 42
faces_train, faces_temp, labels_train, labels_temp = train_test_split(
    faces, labels, test_size=0.2, random_state=split_seed,
)
faces_val, faces_test, labels_val, labels_test = train_test_split(
    faces_temp, labels_temp, test_size=0.5, random_state=split_seed,
)


In [None]:
class NPYDataset(Dataset):
    """Dataset over two parallel indexable sequences (samples and labels).

    Args:
        data: indexable collection of samples; its length is the dataset length.
        labels: indexable collection of labels, indexed in step with ``data``.
        transform: optional callable applied to each sample on access.
    """

    def __init__(self, data, labels, transform=None):
        self.data, self.labels, self.transform = data, labels, transform

    def __len__(self):
        """Number of samples, taken from ``data``."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` at ``idx``, transforming the sample if a
        transform was supplied."""
        sample, target = self.data[idx], self.labels[idx]
        return (self.transform(sample) if self.transform else sample), target

In [None]:
# Wrap each split in a Dataset; only the training split uses the augmenting
# transform, validation and test use the plain one.
train_ds = NPYDataset(data=faces_train, labels=labels_train, transform=train_transform)
val_ds = NPYDataset(data=faces_val, labels=labels_val, transform=transform)
test_ds = NPYDataset(data=faces_test, labels=labels_test, transform=transform)

# Training batches are reshuffled each epoch; validation and test keep dataset
# order so predictions line up with faces_val / faces_test by position.
train_dl = DataLoader(dataset=train_ds, batch_size=BATCH, shuffle=True)
val_dl = DataLoader(dataset=val_ds, batch_size=BATCH, shuffle=False)
test_dl = DataLoader(dataset=test_ds, batch_size=BATCH, shuffle=False)

In [None]:
# Pretrained EfficientNetV2-S with its final linear layer replaced by a fresh
# head that has OUT_CLASSES outputs. This instance is the template that the
# search and the retraining cell deep-copy.
effnet = efficientnet_v2_s(weights=EfficientNet_V2_S_Weights.IMAGENET1K_V1)
num_ftrs = effnet.classifier[1].in_features
effnet.classifier[1] = nn.Linear(num_ftrs, OUT_CLASSES)

device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)

# Release cached GPU memory left over from earlier runs of the notebook.
# Only touch the CUDA API when a GPU was actually selected: ipc_collect()
# initialises CUDA and fails on machines without a usable GPU.
if device == "cuda":
    torch.cuda.empty_cache()
    torch.cuda.ipc_collect()

In [None]:
def objective(trial):
    """Optuna objective for the SGD learning rate.

    Samples ``lr`` log-uniformly from [1e-2, 1e-1], trains a deep copy of
    ``effnet`` for ``EPOCHS`` epochs on ``train_dl`` and evaluates it on
    ``val_dl`` after every epoch.

    Args:
        trial: object providing ``suggest_float`` (an Optuna trial).

    Returns:
        The lowest per-sample mean validation loss observed in any epoch.
    """
    lr = trial.suggest_float("lr", 1e-2, 1e-1, log=True)
    net = deepcopy(effnet).to(device)
    loss_fn = nn.CrossEntropyLoss()
    sgd = optim.SGD(net.parameters(), lr=lr)
    best_val_loss = float('inf')

    for epoch in range(1, EPOCHS + 1):
        # ---- training pass ----
        net.train()
        loss_sum, n_hit, n_seen = 0.0, 0, 0
        for inputs, targets in tqdm(train_dl, desc=f"Epoch {epoch} Training"):
            inputs = inputs.to(device)
            targets = targets.to(device)
            sgd.zero_grad()
            logits = net(inputs)
            batch_loss = loss_fn(logits, targets)
            batch_loss.backward()
            sgd.step()

            # Weight the batch-mean loss by batch size so the epoch figure is
            # a true per-sample mean even when the last batch is smaller.
            n_batch = inputs.size(0)
            loss_sum += batch_loss.item() * n_batch
            n_hit += (logits.argmax(1) == targets).sum().item()
            n_seen += n_batch
        train_loss = loss_sum / n_seen
        train_acc = n_hit / n_seen

        # ---- validation pass (no gradients) ----
        net.eval()
        val_sum, val_hit, val_seen = 0.0, 0, 0
        with torch.no_grad():
            for inputs, targets in tqdm(val_dl, desc=f"Epoch {epoch} Validation"):
                inputs = inputs.to(device)
                targets = targets.to(device)
                logits = net(inputs)
                batch_loss = loss_fn(logits, targets)

                n_batch = inputs.size(0)
                val_sum += batch_loss.item() * n_batch
                val_hit += (logits.argmax(1) == targets).sum().item()
                val_seen += n_batch
        val_loss = val_sum / val_seen
        val_acc = val_hit / val_seen

        # Keep the running minimum; a NaN loss never replaces it.
        best_val_loss = min(best_val_loss, val_loss)

        print(f"Epoch {epoch} | LR: {lr:.6f} | "
              f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc*100:.2f}% | "
              f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc*100:.2f}%")

    return best_val_loss

In [None]:
# Search for the learning rate that minimises the value returned by
# `objective` (the best validation loss of a trial).
# The sampler is seeded so the sequence of suggested learning rates is
# repeatable across notebook runs; the training itself is not seeded here.
study = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=5)
print("Best hyperparameters:", study.best_params)
print("Best value (validation loss):", study.best_value)

In [None]:
# Retrain from the pretrained template with the learning rate selected by the
# study, keeping a snapshot of the model from the epoch with the lowest
# validation loss in `best_model`.
best_lr = study.best_params["lr"]
print("Retraining with best hyperparameters...")
final_model = deepcopy(effnet).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(final_model.parameters(), lr=best_lr)
best_final_loss = float('inf')
# Reset explicitly so a snapshot left over from an earlier run of this cell
# can never be mistaken for the result of this run.
best_model = None

for epoch in range(1, EPOCHS + 1):
    # Training
    final_model.train()
    running_loss, correct, total = 0.0, 0, 0
    for data, target in tqdm(train_dl, desc=f"Retrain Epoch {epoch}"):
        optimizer.zero_grad()
        data, target = data.to(device), target.to(device)
        outputs = final_model(data)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()
        # Batch-mean loss weighted by batch size -> per-sample mean per epoch.
        running_loss += loss.item() * data.size(0)
        correct += (outputs.argmax(1) == target).sum().item()
        total += data.size(0)

    # Validation
    final_model.eval()
    val_running, val_corr, val_tot = 0.0, 0, 0
    with torch.no_grad():
        for data, target in tqdm(val_dl, desc="Validation"):
            data, target = data.to(device), target.to(device)
            outputs = final_model(data)
            loss = criterion(outputs, target)
            val_running += loss.item() * data.size(0)
            val_corr += (outputs.argmax(1) == target).sum().item()
            val_tot += data.size(0)
    val_loss, val_acc = val_running/val_tot, val_corr/val_tot

    # Snapshot the weights whenever validation loss reaches a new minimum.
    if val_loss < best_final_loss:
        best_final_loss = val_loss
        best_model = deepcopy(final_model)

    print(f"Epoch {epoch} | LR: {best_lr:.6f} | "
          f"Train Loss: {running_loss/total:.4f} | Train Acc: {correct/total*100:.2f}% | "
          f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc*100:.2f}%")

# `val_loss < best_final_loss` is False for NaN, so a diverged run would
# otherwise leave `best_model` unset and only fail later, in the evaluation
# cell, with an unrelated-looking error.
if best_model is None:
    raise RuntimeError(
        "No epoch produced a finite validation loss, so no model snapshot "
        "was kept. Try a smaller learning rate."
    )

In [None]:
# Run the best snapshot over the test loader, collecting the predicted class
# ids and the ground-truth labels in loader order.
best_model.eval()
test_preds, truth = [], []
with torch.no_grad():
    for batch, target in test_dl:
        batch = batch.to(device)
        logits = best_model(batch)
        test_preds += list(logits.argmax(1).cpu().numpy())
        truth += list(target.numpy())

In [None]:
# Test-set metrics: confusion matrix, per-class text report, overall accuracy.
cm = confusion_matrix(y_true=truth, y_pred=test_preds)
report = classification_report(y_true=truth, y_pred=test_preds)
score = accuracy_score(y_true=truth, y_pred=test_preds)

In [None]:
# Show the accuracy as a percentage with two decimals, then the per-class report.
accuracy_pct = round(score * 100, 2)
print(f"Test Accuracy: {accuracy_pct} %")
print(report)

In [None]:
# Confusion-matrix heatmap with integer cell annotations, titled with the
# overall test accuracy.
heatmap_ax = sns.heatmap(cm, annot=True, fmt='d')
heatmap_ax.set_title(f"Accuracy: {round(score * 100, 2)}%")
plt.show()

In [None]:
# Show the first test images with their predicted and true class names in a
# 5x5 grid. The number of filled panels is clamped to the samples available,
# so a test split smaller than 25 no longer raises IndexError; unused panels
# are simply left blank.
grid_rows, grid_cols = 5, 5
fig, axes = plt.subplots(nrows=grid_rows, ncols=grid_cols, figsize=(10, 6))
n_show = min(grid_rows * grid_cols, len(test_preds), len(faces_test))

for index, ax in enumerate(axes.flat):  # row-major order, same as nested i/j loops
    ax.axis("off")
    if index >= n_show:
        continue
    # NOTE(review): the *255 scaling assumes faces are floats in [0, 1] - confirm.
    img = Image.fromarray(np.uint8(faces_test[index] * 255))
    ax.imshow(img)
    ax.set_title("Pred: {}\nTruth: {}".format(
        index_label[test_preds[index]], index_label[truth[index]]
    ))
plt.tight_layout()
plt.show()