In [2]:
import os
import glob
import random
import cv2
import dlib
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision import datasets

In [3]:
# -----------------------------
# 1. Setup dlib: Face Detector & Landmark Predictor
# -----------------------------
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat") 

In [4]:
# -----------------------------
# 2. Define Helper Functions
# -----------------------------
def compute_frontal_score(landmarks):
    """
    Compute the frontal score of a face using eye and nose landmarks.
    A higher score (closer to 1) indicates a more frontal (symmetric) face.
    """
    # Extract left (36-41) and right (42-47) eye landmarks
    left_eye_points = [(landmarks.part(n).x, landmarks.part(n).y) for n in range(36, 42)]
    right_eye_points = [(landmarks.part(n).x, landmarks.part(n).y) for n in range(42, 48)]
    left_eye_center = np.mean(left_eye_points, axis=0).astype(int)
    right_eye_center = np.mean(right_eye_points, axis=0).astype(int)
    eyes_midpoint = (
        (left_eye_center[0] + right_eye_center[0]) // 2,
        (left_eye_center[1] + right_eye_center[1]) // 2
    )
    # Nose tip (landmark 30)
    nose_tip = (landmarks.part(30).x, landmarks.part(30).y)

    # Calculate distance D: perpendicular distance from nose tip to eye line
    line_vector = np.array(right_eye_center) - np.array(left_eye_center)
    line_length = np.linalg.norm(line_vector)
    if line_length == 0:
        return 0
    perpendicular_vector = np.array([-line_vector[1], line_vector[0]])
    D = abs(np.dot((np.array(nose_tip) - np.array(left_eye_center)),
                   perpendicular_vector / line_length))
    # Project the nose tip onto the eye line and compute d (distance from eyes’ midpoint)
    projection_factor = np.dot(np.array(nose_tip) - np.array(left_eye_center), line_vector) / (line_length ** 2)
    nose_projection = np.array(left_eye_center) + projection_factor * line_vector
    d = np.linalg.norm(nose_projection - np.array(eyes_midpoint))
    # Frontal score: higher value indicates a more frontal face
    frontal_score = 1 / (1 + (d / D) ** 2) if D != 0 else 0
    return frontal_score

In [5]:
def split_face(image, landmarks):
    """
    Splits the given face image into left and right halves based on landmarks.
    Returns the two shifted (centered) halves.
    """
    # Compute eye centers
    left_eye_points = [(landmarks.part(n).x, landmarks.part(n).y) for n in range(36, 42)]
    right_eye_points = [(landmarks.part(n).x, landmarks.part(n).y) for n in range(42, 48)]
    left_eye_center = np.mean(left_eye_points, axis=0).astype(int)
    right_eye_center = np.mean(right_eye_points, axis=0).astype(int)
    eyes_midpoint = (
        (left_eye_center[0] + right_eye_center[0]) // 2,
        (left_eye_center[1] + right_eye_center[1]) // 2
    )
    # Nose tip (landmark 30)
    nose_tip = (landmarks.part(30).x, landmarks.part(30).y)

    # Define the eye connecting line
    eye_line = np.array(right_eye_center) - np.array(left_eye_center)
    eye_line_length = np.linalg.norm(eye_line)
    if eye_line_length == 0:
        return None, None

    # Project the nose tip onto the eye line
    projection_factor = np.dot(np.array(nose_tip) - np.array(left_eye_center), eye_line) / (eye_line_length ** 2)
    nose_projection = np.array(left_eye_center) + projection_factor * eye_line

    # Determine the splitting point as the midpoint between the eyes_midpoint and the nose projection
    split_point = (nose_projection + np.array(eyes_midpoint)) / 2
    n = eye_line / eye_line_length  # normalized direction along the eye line

    height, width, _ = image.shape
    # Create a grid of (x,y) coordinates for all pixels in the image
    xx, yy = np.meshgrid(np.arange(width), np.arange(height))
    coords = np.stack([xx, yy], axis=-1)  # shape: (height, width, 2)
    diff = coords - split_point
    # Dot product to decide side: pixels with negative dot belong to the left half
    dot = diff[..., 0] * n[0] + diff[..., 1] * n[1]
    left_mask = (dot < 0).astype(np.uint8) * 255
    right_mask = (dot >= 0).astype(np.uint8) * 255

    # Apply masks to obtain left/right images
    left_image = cv2.bitwise_and(image, image, mask=left_mask)
    right_image = cv2.bitwise_and(image, image, mask=right_mask)

    # (Optional) Recentering each half
    left_nonzero = cv2.findNonZero(left_mask)
    right_nonzero = cv2.findNonZero(right_mask)

    if left_nonzero is not None:
        left_avg = np.mean(left_nonzero, axis=0).astype(int)[0]
        diff_left = width // 2 - left_avg[0]
    else:
        diff_left = 0

    if right_nonzero is not None:
        right_avg = np.mean(right_nonzero, axis=0).astype(int)[0]
        diff_right = right_avg[0] - width // 2
    else:
        diff_right = 0

    M_left = np.float32([[1, 0, diff_left], [0, 1, 0]])
    M_right = np.float32([[1, 0, -diff_right], [0, 1, 0]])

    left_image_shifted = cv2.warpAffine(left_image, M_left, (width, height))
    right_image_shifted = cv2.warpAffine(right_image, M_right, (width, height))

    return left_image_shifted, right_image_shifted

In [6]:
# -----------------------------
# 3. Process the LFW Dataset and Save Processed Images
# -----------------------------
# Fraction of symmetric images to split (e.g., p = 0.3 means 30% of symmetric images will be split)
p = 0.3

# Set the dataset directory (each subfolder is a person with that person's images)
dataset_dir = "./dataset"  # Update this to your LFW dataset folder path

all_images = []
all_labels = []

# Loop over each person's folder
for label in os.listdir(dataset_dir):
    person_dir = os.path.join(dataset_dir, label)
    if not os.path.isdir(person_dir):
        continue

    # Get image files (assuming .jpg and .png)
    image_files = glob.glob(os.path.join(person_dir, "*.jpg")) + glob.glob(os.path.join(person_dir, "*.png"))
    for image_file in image_files:
        image = cv2.imread(image_file)
        if image is None:
            continue

        # Convert image to grayscale for landmark detection
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        faces = detector(gray)
        if len(faces) == 0:
            continue  # Skip if no face is detected

        face = faces[0]  # Assume the first detected face is the target
        landmarks = predictor(gray, face)
        frontal_score = compute_frontal_score(landmarks)

        # For symmetric faces (frontal score >= 0.5), optionally split them into two halves
        if frontal_score >= 0.5:
            if random.random() < p:
                left_img, right_img = split_face(image, landmarks)
                if left_img is not None and right_img is not None:
                    all_images.append(left_img)
                    all_labels.append(label)
                    all_images.append(right_img)
                    all_labels.append(label)
                else:
                    all_images.append(image)
                    all_labels.append(label)
            else:
                all_images.append(image)
                all_labels.append(label)
        else:
            # For asymmetric faces, keep the original image
            all_images.append(image)
            all_labels.append(label)

print("Total processed images:", len(all_images))

Total processed images: 17063


In [7]:
# Save processed images to disk so that you don't need to re-run the preprocessing each time
output_dir = "./processed_dataset"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Create subdirectories for each label
unique_labels = set(all_labels)
for label in unique_labels:
    label_dir = os.path.join(output_dir, label)
    if not os.path.exists(label_dir):
        os.makedirs(label_dir)

# Save each image (using a unique filename)
for idx, (img, label) in enumerate(zip(all_images, all_labels)):
    filename = f"{label}_{idx}.jpg"
    filepath = os.path.join(output_dir, label, filename)
    cv2.imwrite(filepath, img)

print("Processed images have been saved to:", output_dir)


Processed images have been saved to: ./processed_dataset


In [8]:
# -----------------------------
# 4. Load Processed Images Using PyTorch's ImageFolder
# -----------------------------
# Define transforms: resize to 224x224, convert to tensor, and normalize as expected by MobileNetV2
data_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),  # Converts to [0,1] and shape (C, H, W)
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# Use ImageFolder to load the processed dataset (it expects the folder structure: processed_dataset/class_name/images)
dataset = datasets.ImageFolder(root=output_dir, transform=data_transforms)
print("Total images in processed dataset:", len(dataset))


Total images in processed dataset: 29779


In [9]:
# -----------------------------
# 5. Split the Dataset and Create DataLoaders
# -----------------------------
# For example, use an 80/20 split for training and testing
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# -----------------------------
# 6. Build and Train the MobileNetV2 Model Using CUDA
# -----------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Load a pretrained MobileNetV2 model and replace its classifier head
model = models.mobilenet_v2(pretrained=True)
num_features = model.last_channel  # typically 1280
num_classes = len(dataset.classes)
model.classifier[1] = nn.Linear(num_features, num_classes)
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    running_corrects = 0
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        running_corrects += torch.sum(preds == labels.data)
    epoch_loss = running_loss / len(train_dataset)
    epoch_acc = running_corrects.double() / len(train_dataset)
    print(f"Epoch {epoch+1}/{num_epochs}: Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")


Using device: cuda




Epoch 1/10: Loss: 8.2574 Acc: 0.0222
Epoch 2/10: Loss: 7.7772 Acc: 0.0243
Epoch 3/10: Loss: 7.5643 Acc: 0.0262
Epoch 4/10: Loss: 7.3484 Acc: 0.0319
Epoch 5/10: Loss: 7.0944 Acc: 0.0377
Epoch 6/10: Loss: 6.9320 Acc: 0.0434
Epoch 7/10: Loss: 6.5875 Acc: 0.0529
Epoch 8/10: Loss: 6.2540 Acc: 0.0629
Epoch 9/10: Loss: 5.8814 Acc: 0.0751
Epoch 10/10: Loss: 5.5637 Acc: 0.0823


In [None]:
# -----------------------------
# 7. Evaluate the Model on the Test Set
# -----------------------------
model.eval()
test_loss = 0.0
test_corrects = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        test_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        test_corrects += torch.sum(preds == labels.data)
test_loss = test_loss / len(test_dataset)
test_acc = test_corrects.double() / len(test_dataset)
print(f"Test Loss: {test_loss:.4f} Acc: {test_acc:.4f}")

Test Loss: 6.6848 Acc: 0.0759
