In [None]:
import os
import shutil
import random
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import accuracy_score
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torch.utils.data import DataLoader, SubsetRandomSampler

In [None]:
# Split into training and testing

# Load the labels from the Excel file
labels_path = './Facial Database/labels.csv'
labels_df = pd.read_csv(labels_path)

# Split the data into training and testing sets
train_df, test_df = train_test_split(labels_df, test_size=0.2, random_state=42)

train_dir = './database/train'
test_dir = './database/test'

os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

# Create subdirectories for each label
for label in labels_df['label'].unique():
    os.makedirs(os.path.join(train_dir, label), exist_ok=True)
    os.makedirs(os.path.join(test_dir, label), exist_ok=True)

def move_images(dataframe, split_dir):
    for index, row in dataframe.iterrows():
        src_path = os.path.join('./Facial Database', row['pth'])  # Ensure this path is correct
        dst_path = os.path.join(split_dir, row['label'], os.path.basename(row['pth']))  # Use basename to avoid directory duplication
        if os.path.exists(src_path):  # Check if the source file exists
            shutil.copy2(src_path, dst_path)
        else:
            print(f"File {src_path} does not exist.")  # Debugging line to check missing files

# Move images to train and test directories
move_images(train_df, train_dir)
move_images(test_df, test_dir)

print("Files have been distributed into train and test directories.")

Files have been distributed into train and test directories.


In [None]:
# Prepare the data
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
])

dataset = ImageFolder(root='./database/train', transform=transform)

# Set the device
device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")

In [None]:
# CNN model
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=9, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.dropout = nn.Dropout(0.4)
        self.conv2 = nn.Conv2d(16, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.dropout = nn.Dropout(0.4)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.dropout = nn.Dropout(0.2)
        self.fc1 = nn.Linear(128*3*3, 8)

    def forward(self, x):
        x = nn.functional.relu(self.bn1(self.conv1(x)))
        x = nn.functional.max_pool2d(x, kernel_size=2, stride=2)
        x = self.dropout(x)
        x = nn.functional.relu(self.bn2(self.conv2(x)))
        x = nn.functional.max_pool2d(x, kernel_size=2, stride=2)
        x = self.dropout(x)
        x = nn.functional.relu(self.bn3(self.conv3(x)))
        x = nn.functional.adaptive_max_pool2d(x, (3, 3))
        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        x = self.fc1(x)
        return x

In [None]:
def train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, epochs=70, patience=10):
    best_val_loss = float('inf')
    best_model_wts = model.state_dict()
    patience_counter = 0

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        train_targets = []
        train_outputs = []

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            _, preds = torch.max(outputs, 1)
            train_targets.extend(labels.cpu().numpy())
            train_outputs.extend(preds.cpu().numpy())

        train_accuracy = accuracy_score(train_targets, train_outputs)
        train_loss = running_loss / len(train_loader)

        model.eval()
        val_loss = 0.0
        val_targets = []
        val_outputs = []
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                _, preds = torch.max(outputs, 1)
                val_targets.extend(labels.cpu().numpy())
                val_outputs.extend(preds.cpu().numpy())

        val_accuracy = accuracy_score(val_targets, val_outputs)
        val_loss /= len(val_loader)

        print(f'Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_wts = model.state_dict()
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print("Early stopping")
            break

    model.load_state_dict(best_model_wts)
    return best_val_loss, model


In [8]:
k = 2
kf = KFold(n_splits=k, shuffle=True, random_state=42)
num_classes = len(dataset.classes)
best_val_loss = float('inf')
best_model_path = 'best_model.pth'
batch_size = 64

for fold, (train_idx, val_idx) in enumerate(kf.split(np.arange(len(dataset)))):
    print(f'Fold {fold+1}')

    train_sampler = SubsetRandomSampler(train_idx)
    val_sampler = SubsetRandomSampler(val_idx)

    train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
    val_loader = DataLoader(dataset, batch_size=batch_size, sampler=val_sampler)

    model = CNN().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0025)

    fold_val_loss, trained_model = train_and_evaluate(model, train_loader, val_loader, criterion, optimizer)

    if fold_val_loss < best_val_loss:
        best_val_loss = fold_val_loss
        random_number = random.randint(0000, 9999)
        best_model_path = f'models/model_{random_number}.pth'
        torch.save(trained_model.state_dict(), best_model_path)
        print(f'Saved best model with validation loss: {best_val_loss:.4f}')


Fold 1
Epoch 1, Train Loss: 2.2242, Train Accuracy: 0.1719, Val Loss: 2.3000, Val Accuracy: 0.1952
Epoch 2, Train Loss: 1.9983, Train Accuracy: 0.2348, Val Loss: 1.9963, Val Accuracy: 0.2335
Epoch 3, Train Loss: 1.8464, Train Accuracy: 0.3081, Val Loss: 1.8327, Val Accuracy: 0.2978
Epoch 4, Train Loss: 1.7048, Train Accuracy: 0.3664, Val Loss: 1.8425, Val Accuracy: 0.3196
Epoch 5, Train Loss: 1.5761, Train Accuracy: 0.4099, Val Loss: 1.6031, Val Accuracy: 0.3925
Epoch 6, Train Loss: 1.4613, Train Accuracy: 0.4507, Val Loss: 1.5316, Val Accuracy: 0.4174
Epoch 7, Train Loss: 1.3795, Train Accuracy: 0.4846, Val Loss: 1.3852, Val Accuracy: 0.4893
Epoch 8, Train Loss: 1.3136, Train Accuracy: 0.5123, Val Loss: 1.3739, Val Accuracy: 0.4918
Epoch 9, Train Loss: 1.2625, Train Accuracy: 0.5302, Val Loss: 1.4365, Val Accuracy: 0.4637
Epoch 10, Train Loss: 1.2074, Train Accuracy: 0.5510, Val Loss: 1.2285, Val Accuracy: 0.5394
Epoch 11, Train Loss: 1.1809, Train Accuracy: 0.5563, Val Loss: 1.2262, 