In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import ImageFolder
from facenet_pytorch import InceptionResnetV1
from PIL import Image
import os
import json

# Select the compute device: prefer Apple MPS, then CUDA, and fall back to CPU
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Load datasets
def load_data(datapath, batch_size=32):
    """Build a shuffled DataLoader over the image folder at ``datapath``.

    Every image is resized to 160x160 and converted to a tensor. The dataset
    is created with ``ImageFolder``, so labels come from the sub-folders of
    ``datapath`` (the male and female folders).

    Args:
        datapath: Root directory handed to ``ImageFolder``.
        batch_size: Number of samples per batch.

    Returns:
        A ``DataLoader`` that yields shuffled batches of the dataset.
    """
    preprocessing = transforms.Compose(
        [transforms.Resize((160, 160)), transforms.ToTensor()]
    )
    image_dataset = ImageFolder(datapath, transform=preprocessing)
    return DataLoader(image_dataset, batch_size=batch_size, shuffle=True)

# Custom model class with forward method
class CustomFaceNet(torch.nn.Module):
    """InceptionResnetV1 backbone followed by a linear classification head.

    Args:
        num_classes: Output size of the classification head (two by default:
            male, female).
        freeze_layers: How many of the backbone's leading parameter tensors
            (in ``parameters()`` order) get ``requires_grad = False``. Note
            that this counts parameter tensors, not whole layers. Values of
            zero or less freeze nothing.
    """

    def __init__(self, num_classes=2, freeze_layers=0):
        super().__init__()
        self.model = InceptionResnetV1(pretrained='vggface2')

        # Optionally exclude the first `freeze_layers` backbone parameter
        # tensors from gradient updates.
        if freeze_layers > 0:
            backbone_params = list(self.model.parameters())
            for frozen in backbone_params[:freeze_layers]:
                frozen.requires_grad = False

        # Classification head applied to the backbone's 512-wide output
        self.fc = torch.nn.Linear(512, num_classes)

    def forward(self, x):
        """Run the backbone on ``x`` and return the head's class scores."""
        embedding = self.model(x)
        logits = self.fc(embedding)
        return logits

# Fine-tune FaceNet
def finetune_facenet(data_loader, num_epochs=5, freeze_layers=0):
    """Fine-tune a ``CustomFaceNet`` classifier and save its weights.

    Trains with cross-entropy loss and Adam (lr=1e-4) over the parameters
    that still require gradients; the learning rate is multiplied by 0.1
    every 2 epochs. After training, the state dict is written to
    ``model_freeze_{freeze_layers}.pth`` in the current directory.

    Args:
        data_loader: Iterable yielding ``(images, labels)`` batches.
        num_epochs: Number of passes over ``data_loader``.
        freeze_layers: Forwarded to ``CustomFaceNet`` to freeze that many
            leading backbone parameter tensors; also used in the file name.

    Returns:
        The trained model (left in training mode, on ``device``).
    """
    # Pass freeze_layers through to the model. Without this every setting
    # trains an identical, fully unfrozen network and only the checkpoint
    # file name differs.
    model = CustomFaceNet(freeze_layers=freeze_layers).to(device)

    # Use CrossEntropyLoss for multi-class classification
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.0001)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.1)

    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        num_batches = 0
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            num_batches += 1

        # Step the scheduler
        scheduler.step()

        # Report the mean batch loss for the epoch rather than only the last
        # batch; an empty loader yields NaN instead of a NameError.
        mean_loss = total_loss / num_batches if num_batches else float("nan")
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {mean_loss:.4f}")

    # Save the model
    torch.save(model.state_dict(), f'model_freeze_{freeze_layers}.pth')
    return model

# Test the model and validate predictions
def test_model(model, test_images_folder):
    """Classify every image file in a folder and collect the mistakes.

    The ground-truth label is derived from the file name: names starting
    with ``'Female'`` are ``'F'``, everything else is ``'M'``. A predicted
    class index of 1 is reported as ``'M'`` and any other index as ``'F'``.
    NOTE(review): this assumes the training dataset assigned index 1 to the
    male class; confirm against the training folder names.

    Args:
        model: Classifier called on a ``(1, 3, 160, 160)`` tensor; it is put
            into eval mode first.
        test_images_folder: Directory containing the test images. Entries
            that are not regular files (e.g. sub-directories) are skipped.

    Returns:
        A tuple ``(results, false_results)``: ``results`` maps each file name
        to its predicted label, and ``false_results`` is a list of dicts with
        keys ``'image'``, ``'predicted'`` and ``'actual'`` for every
        misclassified file.
    """
    model.eval()
    results = {}
    false_results = []  # To store incorrect predictions

    # The preprocessing pipeline is identical for every image, so build it once.
    transform = transforms.Compose([
        transforms.Resize((160, 160)),
        transforms.ToTensor(),
    ])

    # os.listdir returns entries in arbitrary order; sort so the results
    # (and the JSON written from them) are reproducible between runs.
    for img_name in sorted(os.listdir(test_images_folder)):
        img_path = os.path.join(test_images_folder, img_name)
        if not os.path.isfile(img_path):
            continue  # sub-directories cannot be opened as images

        # Close the underlying file as soon as the pixels have been copied.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        image = transform(image).unsqueeze(0).to(device)

        # Ground truth label based on the file name
        true_label = 'F' if img_name.startswith('Female') else 'M'

        with torch.no_grad():
            output = model(image)
            predicted = torch.argmax(output, dim=1).item()
            predicted_label = 'M' if predicted == 1 else 'F'

        # Record results
        results[img_name] = predicted_label

        # Check if the prediction was correct
        if predicted_label != true_label:
            false_results.append({
                'image': img_name,
                'predicted': predicted_label,
                'actual': true_label
            })

    return results, false_results

# Main function
def main():
    """Train and evaluate one model per freeze setting and save predictions.

    For each freeze setting in (0, 5, 10, 15) a model is fine-tuned on the
    training folder and evaluated on the test folder. The per-image
    predictions of all settings are written to ``Basic_task_FaceNet.json``.

    Returns:
        A dict mapping each freeze setting to the list of misclassified
        images reported by ``test_model``.
    """
    data_folder = '/content/drive/MyDrive/CUHK/STAT 6207 Deep Learning/train_data'
    test_images_folder = '/content/drive/MyDrive/CUHK/STAT 6207 Deep Learning/test_data'

    data_loader = load_data(data_folder)

    # Per-setting outputs, keyed by the number of frozen parameter tensors
    results = {}
    all_false_results = {}
    for freeze_layers in (0, 5, 10, 15):
        trained = finetune_facenet(
            data_loader, num_epochs=5, freeze_layers=freeze_layers
        )
        # Evaluate the freshly trained model and file both outputs at once
        results[freeze_layers], all_false_results[freeze_layers] = test_model(
            trained, test_images_folder
        )

    # Persist only the predictions; the misclassifications stay in memory
    with open('Basic_task_FaceNet.json', 'w') as json_file:
        json.dump(results, json_file)

    print("False predictions have been stored for further analysis.")
    return all_false_results

# Run the experiment only when this file is executed directly; the returned
# misclassification records are bound to a module-level name for inspection.
if __name__ == "__main__":
    false_predictions = main()