In [1]:
import os
import re
import time
import copy
import optuna

from pathlib import Path
from collections import defaultdict
import matplotlib.pyplot as plt
from PIL import Image
import seaborn as sns
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split

import torch
from torch.cuda.amp import autocast, GradScaler
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import torchvision
from torchvision import models, datasets, transforms


print("PyTorch Version: ",torch.__version__)
print("Torchvision Version: ",torchvision.__version__)
print(torch.cuda.is_available())

  from .autonotebook import tqdm as notebook_tqdm


PyTorch Version:  2.1.0+cu118
Torchvision Version:  0.16.0+cu118
True


input dir

In [2]:
CURRENT_DIR = os.getcwd()
MAIN_FOLDER = Path(CURRENT_DIR).parent
OUTPUT_FOLDER = os.path.join(CURRENT_DIR, 'augmented_dataset')  
FOLD_DATA = os.path.join(CURRENT_DIR, 'fold_data') 

BATCH_SIZE = 32
NUM_CLASSES = 1

DEVICE = torch.device("cuda")


Data processing

In [3]:
# Data Transforms
def get_data_transforms():
    normalize = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

    return {
        'train': transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(*normalize)
        ]),
        'val': transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(*normalize)
        ]),
        'test': transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(*normalize)
        ]),
    }

data_transforms = get_data_transforms()

In [5]:
class BasicImageDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image = Image.open(self.image_paths[idx]).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(self.labels[idx], dtype=torch.float32)

In [4]:
def load_gender_dict_from_txt(fold_data_dir):
    gender_dict = {}
    for fname in os.listdir(fold_data_dir):
        if not fname.endswith('.txt'):
            continue
        with open(os.path.join(fold_data_dir, fname), 'r') as f:
            next(f)  # skip header
            for line in f:
                parts = line.strip().split('\t')
                if len(parts) < 5:
                    continue
                user_id = parts[0]
                gender = parts[4].lower()
                if gender in ['m', 'f']:
                    gender_dict[user_id] = gender
    return gender_dict

In [None]:
def collect_images_and_labels(split_folder, gender_dict):
    image_paths, labels = [], []
    for person_id in os.listdir(split_folder):
        person_path = os.path.join(split_folder, person_id)
        if not os.path.isdir(person_path):
            continue
        gender = gender_dict.get(person_id)
        if gender not in ['m', 'f']:
            continue
        label = 0 if gender == 'm' else 1
        for img_file in os.listdir(person_path):
            full_path = os.path.join(person_path, img_file)
            if os.path.isfile(full_path):
                image_paths.append(full_path)
                labels.append(label)
    return image_paths, labels

In [6]:
def get_dataloaders(main_image_folder, fold_data_dir, batch_size, transforms):
    gender_dict = load_gender_dict_from_txt(fold_data_dir)

    data_splits = {}
    for split in ['train', 'val', 'test']:
        folder = os.path.join(main_image_folder, split)
        image_paths, labels = collect_images_and_labels(folder, gender_dict)
        dataset = BasicImageDataset(image_paths, labels, transform=transforms[split])
        data_splits[split] = DataLoader(dataset, batch_size=batch_size, shuffle=(split == 'train'), num_workers=0)

        print(f"{split.upper()} — {len(dataset)} images")

    return data_splits

Train

In [7]:
# intialize model resnet18
def load_model(num_classes):
    model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, num_classes)
    model.num_classes = num_classes

    for param in model.parameters():
        param.requires_grad = True

    return model.to(DEVICE)

In [8]:
def train_model(model, dataloaders, optimizer, num_epochs=25, patience=5, trial=None):
    criterion = nn.BCEWithLogitsLoss()
    scaler = GradScaler() 

    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float("inf")
    epochs_no_improve = 0
    history = {
        'train_loss': [], 'val_loss': [],
        'train_acc': [], 'val_acc': [],
        'train_prec': [], 'val_prec': [],
        'train_rec': [], 'val_rec': [],
        'train_f1': [], 'val_f1': [],
    }

    for epoch in range(num_epochs):
        print(f'\nEpoch {epoch + 1}/{num_epochs}')
        for phase in ['train', 'val']:
            model.train() if phase == 'train' else model.eval()
            running_loss, running_corrects = 0.0, 0
            all_preds, all_labels = [], []

            for inputs, labels in dataloaders[phase]:
                inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    with autocast():  
                        outputs = model(inputs)
                        outputs = outputs.view(-1)
                        labels = labels.view(-1)
                        loss = criterion(outputs, labels)
                        preds = (outputs > 0.5).long()

                    if phase == 'train':
                        scaler.scale(loss).backward()             
                        scaler.step(optimizer)                   
                        scaler.update()                         

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = accuracy_score(all_labels, all_preds)
            epoch_prec = precision_score(all_labels, all_preds, zero_division=0)
            epoch_rec = recall_score(all_labels, all_preds, zero_division=0)
            epoch_f1 = f1_score(all_labels, all_preds, zero_division=0)

            history[f'{phase}_loss'].append(epoch_loss)
            history[f'{phase}_acc'].append(epoch_acc)
            history[f'{phase}_prec'].append(epoch_prec)
            history[f'{phase}_rec'].append(epoch_rec)
            history[f'{phase}_f1'].append(epoch_f1)

            print(f"{phase.upper()} — Loss: {epoch_loss:.4f} | Acc: {epoch_acc:.4f} | "
                  f"Prec: {epoch_prec:.4f} | Rec: {epoch_rec:.4f} | F1: {epoch_f1:.4f}")

            if phase == 'val':
                if trial is not None:
                    trial.report(epoch_acc, epoch)
                    if trial.should_prune():
                        print("Trial pruned.")
                        raise optuna.TrialPruned()

                if epoch_loss < best_loss:
                    best_loss = epoch_loss
                    best_model_wts = copy.deepcopy(model.state_dict())
                    epochs_no_improve = 0
                else:
                    epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print("Early stopping triggered.")
            break
        
    # Plot val acc and val loss
    def plot_training_curves(history):
        plt.figure(figsize=(12, 5))

        # Accuracy
        plt.subplot(1, 2, 1)
        plt.plot(history['val_acc'], label='Validation Accuracy')
        plt.xlabel("Epoch")
        plt.ylabel("Accuracy")
        plt.title("Validation Accuracy over Epochs")
        plt.legend()

        # Loss
        plt.subplot(1, 2, 2)
        plt.plot(history['val_loss'], label='Validation Loss', color='orange')
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.title("Validation Loss over Epochs")
        plt.legend()

        plt.tight_layout()
        plt.show()

    plot_training_curves(history)
    model.load_state_dict(best_model_wts)
    print(f"\nTraining complete — Best Val Loss: {best_loss:.4f}")
    return model, history


In [9]:
def objective(trial):
    lr = trial.suggest_float("lr", 1e-6, 1e-3, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-3, log=True)
    batch_size = trial.suggest_categorical("batch_size", [32, 64, 128, 256])

    print(f"\n--- Starting Trial {trial.number} ---")
    print(f"Learning Rate: {lr}")
    print(f"Weight Decay: {weight_decay}")
    print(f"Batch Size: {batch_size}\n")

    dataloaders = get_dataloaders(
        main_image_folder=OUTPUT_FOLDER,
        fold_data_dir=FOLD_DATA,
        batch_size=batch_size,
        transforms=data_transforms
    )

    model = load_model(NUM_CLASSES)
    params_to_update = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.Adam(params_to_update, lr=lr, weight_decay=weight_decay)

    model, history = train_model(model, dataloaders, optimizer, num_epochs=25, trial=trial)

    best_val_acc = max(history['val_acc'])

    return best_val_acc

In [10]:
# Hyperparameter Optimization Optuna
study = optuna.create_study(direction="maximize")  
study.optimize(objective, n_trials=20)

print("Best trial:")
trial = study.best_trial
print(f"  Val Accuracy: {trial.value:.4f}")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

[I 2025-04-02 21:30:40,236] A new study created in memory with name: no-name-9a305c7a-d071-4e7f-8abf-ddb2a83d15ef



--- Starting Trial 0 ---
Learning Rate: 0.0003552567400665288
Weight Decay: 0.0001575501166479809
Batch Size: 128

Reading fold file: fold_0_data.txt
Reading fold file: fold_1_data.txt
Reading fold file: fold_2_data.txt
Reading fold file: fold_3_data.txt
Reading fold file: fold_4_data.txt
Train size: 149036, Val size: 37259, Test size: 20700

Epoch 1/25
TRAIN — Loss: 0.5736 | Acc: 0.6850 | Prec: 0.7621 | Rec: 0.6231 | F1: 0.6856
VAL — Loss: 0.5669 | Acc: 0.7292 | Prec: 0.7459 | Rec: 0.7716 | F1: 0.7585

Epoch 2/25
TRAIN — Loss: 0.5313 | Acc: 0.7121 | Prec: 0.7929 | Rec: 0.6465 | F1: 0.7123
VAL — Loss: 0.5379 | Acc: 0.7265 | Prec: 0.7605 | Rec: 0.7354 | F1: 0.7477

Epoch 3/25


[W 2025-04-02 21:45:08,239] Trial 0 failed with parameters: {'lr': 0.0003552567400665288, 'weight_decay': 0.0001575501166479809, 'batch_size': 128} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/home/user/mambaforge/envs/training/lib/python3.10/site-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
  File "/tmp/ipykernel_5399/1732800389.py", line 17, in objective
    model, history = train_model(model, dataloaders, optimizer, num_epochs=25, trial=trial)
  File "/tmp/ipykernel_5399/3080167636.py", line 23, in train_model
    for inputs, labels in dataloaders[phase]:
  File "/home/user/mambaforge/envs/training/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 630, in __next__
    data = self._next_data()
  File "/home/user/mambaforge/envs/training/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 674, in _next_data
    data = self._dataset_fetcher.fetch(index) 

KeyboardInterrupt: 

test evaluation

In [None]:
def evaluate_model(model, dataloader, device=DEVICE):
    model.eval()
    criterion = nn.BCEWithLogitsLoss()
    
    running_loss, running_corrects = 0.0, 0
    all_preds, all_labels = [], []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            with autocast():
                outputs = model(inputs)
                outputs = outputs.view(-1)
                labels = labels.view(-1)
                loss = criterion(outputs, labels)
                preds = (torch.sigmoid(outputs) > 0.5).long()

            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    dataset_size = len(dataloader.dataset)
    test_loss = running_loss / dataset_size
    test_acc = accuracy_score(all_labels, all_preds)
    test_prec = precision_score(all_labels, all_preds, zero_division=0)
    test_rec = recall_score(all_labels, all_preds, zero_division=0)
    test_f1 = f1_score(all_labels, all_preds, zero_division=0)

    print(f"Test Evaluation:")
    print(f"Loss: {test_loss:.4f}")
    print(f"Accuracy: {test_acc:.4f}")
    print(f"Precision: {test_prec:.4f}")
    print(f"Recall: {test_rec:.4f}")
    print(f"F1 Score: {test_f1:.4f}")

    return {
        "loss": test_loss,
        "accuracy": test_acc,
        "precision": test_prec,
        "recall": test_rec,
        "f1": test_f1
    }

In [None]:
def get_test_loader(batch_size):
    test_image_paths, test_labels = load_folds_dataset(OUTPUT_FOLDER, FOLD_DATA, ["fold_4_data.txt"])
    test_dataset = BasicImageDataset(test_image_paths, test_labels, transform=data_transforms['val'])
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    print(f"Test size: {len(test_dataset)}")
    return test_loader

In [None]:
# After Optuna
best_model = load_model(NUM_CLASSES)
best_model.load_state_dict(torch.load("best_model.pth"))  # if saved
best_model.to(DEVICE)

test_loader = get_test_loader(batch_size=study.best_trial.params['batch_size'])
evaluate_model(best_model, test_loader)

In [None]:
# # Save the model
# torch.save(model, 'efficientnet_test.pth')
# print("Model saved as efficientnet_test.pth")