In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import numpy as np
from sklearn.model_selection import train_test_split

In [2]:
# CNN model; see https://arxiv.org/abs/1609.00408 for the reference architecture
class FaceCNN(nn.Module):
    """Three-stage convolutional classifier for single-channel images.

    The network is three Conv2d -> LeakyReLU -> MaxPool2d stages followed by
    two fully connected layers and a ``LogSoftmax``; ``forward`` therefore
    returns log-probabilities, which pair with ``nn.NLLLoss``.

    The first fully connected layer expects ``128 * 7 * 9 = 8064`` flattened
    features. With the pooling used here (2, 2, then 3) that matches, for
    example, a ``1 x 112 x 92`` (C x H x W) input: 112 -> 56 -> 28 -> 9 and
    92 -> 46 -> 23 -> 7. Inputs of another spatial size will fail at that
    layer.

    Args:
        num_classes: Number of output classes (size of the last layer).
    """

    def __init__(self, num_classes=40):
        super(FaceCNN, self).__init__()

        # The layer order is unchanged so the ``model.<index>`` state-dict keys
        # of previously saved checkpoints still line up.
        self.model = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1),   # (1)
            nn.LeakyReLU(0.01),                                                              # (2)
            nn.MaxPool2d(kernel_size=2, stride=2),                                           # (3)

            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),  # (4)
            nn.LeakyReLU(0.01),                                                              # (5)
            nn.MaxPool2d(kernel_size=2, stride=2),                                           # (6)

            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1), # (7)
            nn.LeakyReLU(0.01),                                                              # (8)
            nn.MaxPool2d(kernel_size=3, stride=3),                                           # (9)
            nn.Flatten(),                                                                    # (10)
            nn.Linear(in_features=128 * 7 * 9, out_features=1024),                           # (11)
            nn.LeakyReLU(0.01),                                                              # (12)
            nn.Dropout(p=0.5),                                                               # (13)
            # Previously hard-coded to 40, which silently ignored ``num_classes``.
            nn.Linear(in_features=1024, out_features=num_classes),                           # (14)
            nn.LogSoftmax(dim=1)                                                             # (15)
        )

    def forward(self, x):
        """Return per-class log-probabilities for a batch ``x``."""
        return self.model(x)

In [3]:
# Custom Dataset for loading PGM files
class PGMFaceDataset(Dataset):
    """Dataset of ``.pgm`` images stored in per-class folders ``s1`` .. ``sN``.

    ``samples`` holds ``(file_path, label)`` pairs where the label for folder
    ``s<k>`` is ``k - 1``. Missing class folders are skipped.

    Args:
        root_dir: Directory that contains the ``s<k>`` sub-folders.
        transform: Optional callable applied to the opened PIL image.
        num_classes: Highest folder number to scan (``s1`` .. ``s<num_classes>``).
    """

    def __init__(self, root_dir, transform=None, num_classes=40):
        self.root_dir = root_dir
        self.transform = transform
        self.samples = []

        for class_idx in range(1, num_classes + 1):
            class_dir = os.path.join(root_dir, f"s{class_idx}")
            if not os.path.isdir(class_dir):
                continue
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # index -> file mapping stable so index-based splits are repeatable.
            for file_name in sorted(os.listdir(class_dir)):
                if file_name.endswith('.pgm'):
                    self.samples.append((
                        os.path.join(class_dir, file_name),
                        class_idx - 1  # 0-based index for classes
                    ))

    def __len__(self):
        """Return the number of discovered image files."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is the transformed result when a transform was given,
        otherwise the PIL image itself.
        """
        img_path, label = self.samples[idx]

        image = Image.open(img_path)
        # Read the pixel data now instead of lazily; per the Pillow docs this
        # also releases the underlying file for single-frame images, so no
        # handle stays open when no transform is configured.
        image.load()

        if self.transform:
            image = self.transform(image)

        return image, label

In [4]:
# Training function with top-1 and top-5 metrics and early stopping
def train_model(model, train_loader, val_loader, criterion, optimizer, 
                num_epochs=10, patience=5, device='cuda',
                checkpoint_path='best_face_cnn.pth'):
    """Train ``model`` with per-epoch validation, checkpointing and early stopping.

    After every epoch the model is evaluated on ``val_loader``. Whenever the
    validation top-1 accuracy improves, a checkpoint dict (epoch, model and
    optimizer state, metrics) is written to ``checkpoint_path``. Training stops
    early once ``patience`` consecutive epochs bring no improvement.

    Args:
        model: Network to train; it is moved neither to nor from ``device`` here.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss callable ``criterion(outputs, labels)``. The running
            loss is weighted by batch size, which assumes a mean-reduced loss.
        optimizer: Optimizer stepping ``model``'s parameters.
        num_epochs: Maximum number of epochs.
        patience: Epochs without validation improvement before stopping.
        device: Device the batches are moved to.
        checkpoint_path: Where the best checkpoint is saved.

    Returns:
        ``(model, history, best_epoch)`` where ``model`` holds the weights of
        the LAST epoch run (not the best one), ``history`` maps metric names to
        per-epoch lists, and ``best_epoch`` is 1-based (0 if no epoch ran).
    """
    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint, even when validation accuracy is 0.0.
    best_acc = float('-inf')
    best_epoch = -1
    patience_counter = 0
    epochs_run = 0          # stays 0 when num_epochs == 0
    stopped_early = False

    history = {
        'train_loss': [], 'train_top1_acc': [],
        'val_loss': [], 'val_top1_acc': [], 'val_top5_acc': []
    }

    for epoch in range(num_epochs):
        epochs_run = epoch + 1

        # Training phase
        model.train()
        running_loss = 0.0
        top1_correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            top1_correct += (predicted == labels).sum().item()

        # Average over the samples actually seen; max(..., 1) keeps an empty
        # loader from raising ZeroDivisionError (metrics are then 0.0).
        train_seen = max(total, 1)
        epoch_loss = running_loss / train_seen
        epoch_top1_acc = top1_correct / train_seen

        history['train_loss'].append(epoch_loss)
        history['train_top1_acc'].append(epoch_top1_acc)

        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}, Train Top-1 Acc: {epoch_top1_acc:.4f}')

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_top1_correct = 0
        val_top5_correct = 0
        val_total = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item() * inputs.size(0)

                # Top-1 accuracy
                _, top1_preds = torch.max(outputs, 1)
                val_top1_correct += (top1_preds == labels).sum().item()

                # Top-5 accuracy (or fewer if there are fewer than 5 classes).
                # A sample counts when its label appears anywhere in its top-k row.
                k = min(5, outputs.size(1))
                _, top5_preds = torch.topk(outputs, k, dim=1)
                val_top5_correct += (top5_preds == labels.unsqueeze(1)).any(dim=1).sum().item()

                val_total += labels.size(0)

        val_seen = max(val_total, 1)
        val_epoch_loss = val_loss / val_seen
        val_top1_acc = val_top1_correct / val_seen
        val_top5_acc = val_top5_correct / val_seen

        history['val_loss'].append(val_epoch_loss)
        history['val_top1_acc'].append(val_top1_acc)
        history['val_top5_acc'].append(val_top5_acc)

        print(f'Epoch {epoch+1}/{num_epochs}, Val Loss: {val_epoch_loss:.4f}, '
              f'Val Top-1 Acc: {val_top1_acc:.4f}, Val Top-5 Acc: {val_top5_acc:.4f}')

        # Check if current model is the best
        if val_top1_acc > best_acc:
            best_acc = val_top1_acc
            best_epoch = epoch
            patience_counter = 0  # Reset patience counter

            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_top1_acc': val_top1_acc,
                'val_top5_acc': val_top5_acc,
                'train_loss': epoch_loss,
                'val_loss': val_epoch_loss
            }, checkpoint_path)

            print(f'New best model saved with Val Top-1 Acc: {val_top1_acc:.4f}')
        else:
            patience_counter += 1
            print(f'Patience counter: {patience_counter}/{patience}')

        # Check for early stopping
        if patience_counter >= patience:
            print(f'Early stopping triggered after epoch {epoch+1}')
            print(f'Best model was at epoch {best_epoch+1} with Val Top-1 Acc: {best_acc:.4f}')
            stopped_early = True
            break

    print(f'Training completed after {epochs_run} epochs')
    # The early-stopping branch already reported the best epoch; only report it
    # here for a full run whose best epoch was not the last one.
    if not stopped_early and best_epoch >= 0 and best_epoch != epochs_run - 1:
        print(f'Best model was at epoch {best_epoch+1} with Val Top-1 Acc: {best_acc:.4f}')

    return model, history, best_epoch+1

# Evaluation function with top-1 and top-5 accuracy
def evaluate_model(model, test_loader, device='cuda'):
    """Compute and print top-1 and top-5 accuracy of ``model`` on ``test_loader``.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device the batches are moved to.

    Returns:
        ``(top1_acc, top5_acc)`` as fractions in ``[0, 1]``. Both are 0.0 when
        the loader yields no samples. "Top-5" uses fewer candidates when the
        model has fewer than 5 outputs.
    """
    model.eval()
    top1_correct = 0
    top5_correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)

            # Top-1 accuracy
            _, top1_preds = torch.max(outputs, 1)
            top1_correct += (top1_preds == labels).sum().item()

            # Top-5 accuracy: a sample counts when its label appears anywhere
            # in its row of top-k predictions.
            k = min(5, outputs.size(1))
            _, top5_preds = torch.topk(outputs, k, dim=1)
            top5_correct += (top5_preds == labels.unsqueeze(1)).any(dim=1).sum().item()

            total += labels.size(0)

    # Avoid ZeroDivisionError on an empty loader; the printed counts still
    # show the real (zero) total.
    denom = max(total, 1)
    top1_acc = top1_correct / denom
    top5_acc = top5_correct / denom

    print(f'Test Top-1 Accuracy: {top1_acc:.4f} ({top1_correct}/{total})')
    print(f'Test Top-5 Accuracy: {top5_acc:.4f} ({top5_correct}/{total})')

    return top1_acc, top5_acc

In [15]:
# Main function
def main(root_dir=None, seed=42):
    """Train and evaluate ``FaceCNN`` on a folder of ``.pgm`` face images.

    Steps: build the dataset, make a stratified 70/15/15 train/val/test split,
    train with early stopping, then report test accuracy for both the final
    weights and the best checkpoint saved during training.

    Args:
        root_dir: Dataset directory containing ``s1`` .. ``s40`` sub-folders.
            Defaults to ``../dp_pixelized_faces/block_size_4/epsilon_1``.
        seed: Seed for the PyTorch RNG and for both dataset splits, so that
            repeated runs use the same split and initial weights.

    Raises:
        FileNotFoundError: If no ``.pgm`` files are found under ``root_dir``.
    """
    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Seed weight initialisation, dropout and DataLoader shuffling.
    torch.manual_seed(seed)

    # Set hyperparameters
    batch_size = 32
    num_epochs = 20
    learning_rate = 0.001
    patience = 7
    checkpoint_file = 'best_face_cnn.pth'

    # Define transformations. No resize is applied: FaceCNN's first fully
    # connected layer expects 8064 flattened features, so the images must
    # already have a matching size.
    transform = transforms.Compose([
        transforms.Grayscale(num_output_channels=1),  # Ensure grayscale
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize grayscale images
    ])

    # Create dataset. Alternative dataset locations kept from the original cell:
    #   ../dp_pixelized_faces/block_size_2/epsilon_0.1
    #   ../gaussian_faces/kernel_size_21
    if root_dir is None:
        root_dir = os.path.join("..", "dp_pixelized_faces", "block_size_4", "epsilon_1")
    dataset = PGMFaceDataset(root_dir=root_dir, transform=transform)
    if len(dataset) == 0:
        # Fail with a clear message instead of an obscure splitting error.
        raise FileNotFoundError(f"No .pgm images found under {root_dir!r}")

    # Stratified split: 70% train, then the remaining 30% halved into val/test.
    labels = [label for _, label in dataset.samples]
    train_idx, temp_idx = train_test_split(
        list(range(len(dataset))), test_size=0.3, stratify=labels, random_state=seed
    )
    val_idx, test_idx = train_test_split(
        temp_idx, test_size=0.5, stratify=[labels[i] for i in temp_idx], random_state=seed
    )

    train_dataset = torch.utils.data.Subset(dataset, train_idx)
    val_dataset = torch.utils.data.Subset(dataset, val_idx)
    test_dataset = torch.utils.data.Subset(dataset, test_idx)

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

    # Create model, loss function, and optimizer
    model = FaceCNN(num_classes=40).to(device)
    criterion = nn.NLLLoss()  # Since we're using LogSoftmax as the final layer
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Train the model with early stopping
    model, training_history, best_epoch = train_model(
        model,
        train_loader,
        val_loader,
        criterion,
        optimizer,
        num_epochs=num_epochs,  # Upper bound; early stopping may end training sooner
        patience=patience,      # Stop after this many epochs without improvement
        device=device
    )

    # Evaluate the weights from the last epoch that ran
    print("\n=== Final Model Evaluation ===")
    evaluate_model(model, test_loader, device)

    # Evaluate the best checkpoint saved during training. map_location makes
    # the load independent of the device the checkpoint was written from.
    best_model = FaceCNN(num_classes=40).to(device)
    checkpoint = torch.load(checkpoint_file, map_location=device)
    best_model.load_state_dict(checkpoint['model_state_dict'])
    print("\n=== Best Model Evaluation ===")
    evaluate_model(best_model, test_loader, device)

if __name__ == "__main__":
    main()

Using device: cuda
Epoch 1/20, Train Loss: 3.7460, Train Top-1 Acc: 0.0179
Epoch 1/20, Val Loss: 3.6725, Val Top-1 Acc: 0.0333, Val Top-5 Acc: 0.2000
New best model saved with Val Top-1 Acc: 0.0333
Epoch 2/20, Train Loss: 3.6588, Train Top-1 Acc: 0.0429
Epoch 2/20, Val Loss: 3.5135, Val Top-1 Acc: 0.1500, Val Top-5 Acc: 0.4833
New best model saved with Val Top-1 Acc: 0.1500
Epoch 3/20, Train Loss: 3.1629, Train Top-1 Acc: 0.2107
Epoch 3/20, Val Loss: 2.2849, Val Top-1 Acc: 0.4000, Val Top-5 Acc: 0.7333
New best model saved with Val Top-1 Acc: 0.4000
Epoch 4/20, Train Loss: 2.2192, Train Top-1 Acc: 0.3679
Epoch 4/20, Val Loss: 1.5426, Val Top-1 Acc: 0.6833, Val Top-5 Acc: 0.9333
New best model saved with Val Top-1 Acc: 0.6833
Epoch 5/20, Train Loss: 1.5142, Train Top-1 Acc: 0.5643
Epoch 5/20, Val Loss: 1.1281, Val Top-1 Acc: 0.6833, Val Top-5 Acc: 0.9000
Patience counter: 1/7
Epoch 6/20, Train Loss: 0.9851, Train Top-1 Acc: 0.7036
Epoch 6/20, Val Loss: 0.6621, Val Top-1 Acc: 0.8167, Val

## Test the trained model on the original face dataset (neither blurred nor pixelized)

In [None]:
# Function to test the model on a single image
def test_single_image(model, image_path, transform, class_names=None, device='cuda', top_k=3):
    """Classify one image file and print the most likely classes.

    The model output is treated as log-probabilities (``torch.exp`` is applied
    to obtain probabilities).

    Args:
        model: Network to run; it is switched to eval mode.
        image_path: Path of the image file to open with PIL.
        transform: Callable turning the PIL image into a tensor without a batch
            dimension. Required; ``None`` is reported as an error.
        class_names: Optional mapping/sequence from class index to display
            name. Indices are shown as ``"Class <idx>"`` when omitted.
        device: Device the input tensor is moved to.
        top_k: Number of predictions to print (capped at the number of classes).

    Returns:
        ``(top_class_idx, top_prob)`` for the best prediction, or
        ``(None, None)`` if anything fails (the error and traceback are printed).
    """
    # Set model to evaluation mode
    model.eval()

    try:
        if transform is None:
            # Previously this path fell through to an undefined variable.
            raise ValueError("a transform that converts the image to a tensor is required")

        # The context manager closes the file once the tensor has been built.
        with Image.open(image_path) as image:
            print(f"Successfully loaded image: {image_path}")
            print(f"Original image size: {image.size}")
            image_tensor = transform(image)
        print(f"Transformed tensor shape: {image_tensor.shape}")

        # Add batch dimension
        image_tensor = image_tensor.unsqueeze(0).to(device)

        # Get prediction
        with torch.no_grad():
            output = model(image_tensor)
            probabilities = torch.exp(output)

            # Never request more candidates than there are classes.
            k = max(1, min(top_k, probabilities.size(1)))
            topk_probs, topk_indices = torch.topk(probabilities, k)

        # Convert to numpy for easier handling
        topk_probs = topk_probs.cpu().numpy()[0]
        topk_indices = topk_indices.cpu().numpy()[0]

        # Print results
        print(f"\nTop {k} predictions:")
        for rank, (class_idx, prob) in enumerate(zip(topk_indices, topk_probs), start=1):
            class_name = f"Class {class_idx}" if class_names is None else class_names[class_idx]
            print(f"{rank}. {class_name} - Probability: {prob:.4f} ({prob*100:.2f}%)")

        # Return the top prediction
        return topk_indices[0], topk_probs[0]

    except Exception as e:
        # Best-effort helper: report the failure instead of raising.
        print(f"Error testing image {image_path}: {e}")
        import traceback
        traceback.print_exc()
        return None, None
# Function to test a pre-trained model on a single image (can be called from outside)
def test_pretrained_model(model_path, image_path):
    """Load a saved ``FaceCNN`` and print its prediction for one image.

    Args:
        model_path: Path of the saved model. Either a checkpoint dict with a
            ``'model_state_dict'`` entry or a bare state dict is accepted.
        image_path: Path of the image to classify.

    Returns:
        None. Results and errors are printed.
    """
    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Create transformation
    transform = transforms.Compose([
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5])
    ])

    # Map class index -> display name ("Subject 1" .. "Subject 40")
    class_names = {i: f"Subject {i+1}" for i in range(40)}

    # Load the model
    model = FaceCNN(num_classes=40).to(device)
    try:
        # Load the file the caller asked for (the path used to be hard-coded),
        # remapping tensors so a GPU-saved checkpoint also loads on CPU.
        checkpoint = torch.load(model_path, map_location=device)
        if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
            state_dict = checkpoint['model_state_dict']
        else:
            state_dict = checkpoint
        model.load_state_dict(state_dict)
        print(f"Successfully loaded model from {model_path}")
    except Exception as e:
        print(f"Error loading model: {e}")
        return

    # Test the image
    class_idx, prob = test_single_image(model, image_path, transform, class_names, device)

    if class_idx is not None:
        print(f"\nFinal prediction: {class_names[class_idx]} with probability {prob:.4f}")

In [None]:
# Run the saved checkpoint on one image from the original face folder.
test_pretrained_model(
    model_path="best_face_cnn.pth",
    image_path=os.path.join("..", "att_faces", "s1", "2.pgm"),
)