# CNN model optimised with Optuna and Transfer learning
- Setting up PyTorch model architecture with efficientnet_b0 backbone
- Optimizing hyperparameters using Optuna 5 trials
- Training final model with most optimal hyperparameters

In [5]:
#imports
import os
import json
import random
import numpy as np
from pathlib import Path
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from tqdm import tqdm
import optuna
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns


In [6]:
#setting random seeds n device
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#setting up paths 
projectRoot = Path().resolve().parent
outputRoot = projectRoot / "ModifiedDataset"

trainImagePath = outputRoot / "train" / "images"
trainLabelPath = outputRoot / "train" / "labels"
testImagePath = outputRoot / "test" / "images"
testLabelPath = outputRoot / "test" / "labels"

with open(outputRoot / "kfold_splits.json", "r") as f:
    kfold_splits = json.load(f)

In [7]:
#loading dataset func
def load_dataset(image_dir, label_dir):
    image_files = sorted([f for f in os.listdir(image_dir) if f.endswith(('.jpg', '.png', '.jpeg'))])
    paths, labels = [], []
    for fname in image_files:
        paths.append(os.path.join(image_dir, fname))
        label_path = os.path.join(label_dir, fname.replace('.jpg', '.txt').replace('.jpeg', '.txt').replace('.png', '.txt'))
        with open(label_path) as f:
            labels.append(int(f.read().strip()))
    return paths, np.array(labels)

train_image_paths, train_labels = load_dataset(trainImagePath, trainLabelPath)
test_image_paths, test_labels = load_dataset(testImagePath, testLabelPath)

augImagePath = outputRoot / "AugmentedData" / "images"
augLabelPath = outputRoot / "AugmentedData" / "labels"
aug_image_paths, aug_labels = load_dataset(augImagePath, augLabelPath)

all_train_image_paths = train_image_paths + aug_image_paths
all_train_labels = np.concatenate([train_labels, aug_labels])


In [8]:
# transforms and dataset class
def get_transforms():
    return transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, transform):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img = cv2.imread(self.image_paths[idx])
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = self.transform(img)
        return img, self.labels[idx]

In [9]:
#model builder and utilities
def build_model(pretrained=True, num_classes=3):
    base = models.efficientnet_b0(weights='IMAGENET1K_V1' if pretrained else None)
    features = base.classifier[1].in_features
    base = nn.Sequential(base.features, nn.AdaptiveAvgPool2d(1))

    model = nn.Sequential(
        base,
        nn.Flatten(),
        nn.Dropout(0.5),
        nn.Linear(features, 256),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(256, num_classes)
    )
    return model.to(device)

def freeze_model(model, freeze_ratio=0.5):
    params = list(model[0].parameters())
    cutoff = int(len(params) * freeze_ratio)
    for i, param in enumerate(params):
        param.requires_grad = i >= cutoff
    return model

def calculate_class_weights(labels):
    counts = np.bincount(labels)
    total = len(labels)
    return torch.tensor(total / (len(counts) * counts), dtype=torch.float32).to(device)

def evaluate_model(model, loader):
    model.eval()
    preds, targets = [], []
    with torch.no_grad():
        for x, y in tqdm(loader, desc="Evaluating", leave=False):
            x, y = x.to(device), y.to(device)
            out = model(x)
            preds.extend(out.argmax(1).cpu().numpy())
            targets.extend(y.cpu().numpy())
    acc = accuracy_score(targets, preds)
    prec, rec, f1, _ = precision_recall_fscore_support(targets, preds, average='weighted')
    cm = confusion_matrix(targets, preds)
    report = classification_report(targets, preds, target_names=['No Mask', 'Mask', 'Improper Mask'])
    return acc, prec, rec, f1, cm, report

In [10]:
#optuna objective
def objective(trial):
    batch_size = trial.suggest_categorical('batch_size', [16, 32])
    lr = trial.suggest_float('learning_rate', 1e-5, 5e-4, log=True)
    weight_decay = trial.suggest_float('weight_decay', 1e-5, 1e-3, log=True)
    freeze_ratio = trial.suggest_float('freeze_ratio', 0.5, 0.9)
    use_class_weights = trial.suggest_categorical('use_class_weights', [True, False])
    scheduler_type = trial.suggest_categorical('scheduler_type', ['step', 'cosine', 'none'])

    scores = []

    for fold_idx in kfold_splits:
        train_idx = kfold_splits[fold_idx]['train']
        val_idx = kfold_splits[fold_idx]['val']

        X_train = [all_train_image_paths[i] for i in train_idx]
        y_train = all_train_labels[train_idx]
        X_val = [all_train_image_paths[i] for i in val_idx]
        y_val = all_train_labels[val_idx]

        train_loader = DataLoader(CustomDataset(X_train, y_train, get_transforms()), batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(CustomDataset(X_val, y_val, get_transforms()), batch_size=batch_size, shuffle=False)

        model = build_model()
        model = freeze_model(model, freeze_ratio)

        class_weights = calculate_class_weights(y_train) if use_class_weights else None
        criterion = nn.CrossEntropyLoss(weight=class_weights)
        optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)

        if scheduler_type == 'step':
            scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.5)
        elif scheduler_type == 'cosine':
            scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)
        else:
            scheduler = None

        for epoch in range(10):
            model.train()
            for xb, yb in tqdm(train_loader, desc=f"Fold {fold_idx} Epoch {epoch+1}", leave=False):
                xb, yb = xb.to(device), yb.to(device)
                optimizer.zero_grad()
                loss = criterion(model(xb), yb)
                loss.backward()
                optimizer.step()
            if scheduler:
                scheduler.step()

        acc, *_ = evaluate_model(model, val_loader)
        scores.append(acc)

        del model
        torch.cuda.empty_cache()

    return np.mean(scores)


In [11]:
#running Optuna search
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=5)

best_params = study.best_trial.params
print("\nBest Hyperparameters:", best_params)


[I 2025-04-27 06:04:29,422] A new study created in memory with name: no-name-6c3ef741-a653-4928-8daa-d3edde251f2d
[I 2025-04-27 06:27:37,147] Trial 0 finished with value: 0.9407457821762113 and parameters: {'batch_size': 16, 'learning_rate': 0.0003081036640811829, 'weight_decay': 0.0003620014114001303, 'freeze_ratio': 0.5890893944032941, 'use_class_weights': False, 'scheduler_type': 'step'}. Best is trial 0 with value: 0.9407457821762113.
[I 2025-04-27 06:49:33,471] Trial 1 finished with value: 0.8687605762248156 and parameters: {'batch_size': 16, 'learning_rate': 4.305161439365348e-05, 'weight_decay': 6.229649916553409e-05, 'freeze_ratio': 0.7072290223746327, 'use_class_weights': True, 'scheduler_type': 'cosine'}. Best is trial 0 with value: 0.9407457821762113.
[I 2025-04-27 07:08:26,682] Trial 2 finished with value: 0.7601442250857076 and parameters: {'batch_size': 32, 'learning_rate': 1.440407281858524e-05, 'weight_decay': 3.736507515202722e-05, 'freeze_ratio': 0.8191615918797701, '


Best Hyperparameters: {'batch_size': 16, 'learning_rate': 0.0003081036640811829, 'weight_decay': 0.0003620014114001303, 'freeze_ratio': 0.5890893944032941, 'use_class_weights': False, 'scheduler_type': 'step'}


In [12]:
#final model training and validation
batch_size = best_params['batch_size']
lr = best_params['learning_rate']
weight_decay = best_params['weight_decay']
freeze_ratio = best_params['freeze_ratio']
use_class_weights = best_params['use_class_weights']
scheduler_type = best_params['scheduler_type']

model = build_model()
model = freeze_model(model, freeze_ratio)

val_split = 0.1
val_size = int(len(all_train_image_paths) * val_split)
train_paths = all_train_image_paths[val_size:]
train_labels_ = all_train_labels[val_size:]
val_paths = all_train_image_paths[:val_size]
val_labels_ = all_train_labels[:val_size]

train_loader_final = DataLoader(CustomDataset(train_paths, train_labels_, get_transforms()), batch_size=batch_size, shuffle=True)
val_loader_final = DataLoader(CustomDataset(val_paths, val_labels_, get_transforms()), batch_size=batch_size, shuffle=False)

class_weights = calculate_class_weights(train_labels_) if use_class_weights else None
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)

if scheduler_type == 'step':
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.5)
elif scheduler_type == 'cosine':
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)
else:
    scheduler = None

best_val_loss = float('inf')
patience = 5
counter = 0

for epoch in range(10):
    model.train()
    for xb, yb in tqdm(train_loader_final, desc=f"Final Train Epoch {epoch+1}", leave=False):
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
    if scheduler:
        scheduler.step()

    model.eval()
    val_loss = 0
    with torch.no_grad():
        for xb, yb in val_loader_final:
            xb, yb = xb.to(device), yb.to(device)
            outputs = model(xb)
            loss = criterion(outputs, yb)
            val_loss += loss.item()

    val_loss /= len(val_loader_final)
    print(f"Epoch {epoch+1}: Validation Loss = {val_loss:.4f}")

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_model_wts = model.state_dict()
        counter = 0
    else:
        counter += 1

    if counter >= patience:
        print("Early stopping triggered.")
        break

model.load_state_dict(best_model_wts)

                                                                                                                        

Epoch 1: Validation Loss = 0.3883


                                                                                                                        

Epoch 2: Validation Loss = 0.4119


                                                                                                                        

Epoch 3: Validation Loss = 0.2984


                                                                                                                        

Epoch 4: Validation Loss = 0.3092


                                                                                                                        

Epoch 5: Validation Loss = 0.2975


                                                                                                                        

Epoch 6: Validation Loss = 0.3152


                                                                                                                        

Epoch 7: Validation Loss = 0.3273


                                                                                                                        

Epoch 8: Validation Loss = 0.3123


                                                                                                                        

Epoch 9: Validation Loss = 0.2963


                                                                                                                        

Epoch 10: Validation Loss = 0.3099


<All keys matched successfully>

In [13]:
#evaluation
print("\n=== Evaluation on Test Set ===")
test_loader = DataLoader(CustomDataset(test_image_paths, test_labels, get_transforms()), batch_size=32, shuffle=False)
test_results = evaluate_model(model, test_loader)
print(test_results[-1])
print(f"Test Set Accuracy: {test_results[0]*100:.2f}%")



=== Evaluation on Test Set ===


                                                                                                                        

               precision    recall  f1-score   support

      No Mask       0.75      0.65      0.69        51
         Mask       0.94      0.97      0.96       388
Improper Mask       0.83      0.53      0.65        19

     accuracy                           0.92       458
    macro avg       0.84      0.72      0.77       458
 weighted avg       0.91      0.92      0.91       458

Test Set Accuracy: 91.92%




In [14]:
#saving model and params
save_path = projectRoot / "Models"
save_path.mkdir(parents=True, exist_ok=True)

torch.save(model.state_dict(), save_path / "CNN_EfficientNet.pth")

with open(save_path / "CNN_EfficientNet_params.json", "w") as f:
    json.dump(best_params, f, indent=4)

print("\nModel and hyperparameters saved successfully!")



Model and hyperparameters saved successfully!


Development of CNN Model: Major Findings

- **Transfer learning works effectively**: Pre-trained efficientnet_b0 vastly outperforms resnet18 and MobileNetV2, suggesting architecture matters more than model size

- **Class imbalance** still remained an issue: but this was mititgated as much as possible by using augmentation, detection of the minority class ("Improper Mask 2") is still lagging behind, with only 53% recall compared to 97% for the majority class


- **Hyperparameter significance**: Learning rate and freeze ratio were critical, freezing pre-trained layers helped to provide the right balance between feature reuse and adaptation

- **Training efficiency**: Early stopping typically triggered between epochs 5-7, indicating diminishing returns beyond this point and potential for faster training cycles, and this was significantly helpful at reducing training time