In [33]:
import os
import cv2
import math
import numpy as np
import pandas as pd
from tqdm import tqdm
from retinaface import RetinaFace
from PIL import Image


def align_face(image, landmarks, target_size=(224, 224)):
    """
    Align face using eye landmarks and crop to the bounding box.
    Args:
        image (numpy array): Input image.
        landmarks (dict): Detected landmarks with 'left_eye' and 'right_eye'.
        target_size (tuple): Desired output size for the aligned face.
    Returns:
        numpy array: Aligned and resized face.
    """
    try:
        left_eye = np.array(landmarks["left_eye"])
        right_eye = np.array(landmarks["right_eye"])

        # Compute angle between eyes
        delta_y = right_eye[1] - left_eye[1]
        delta_x = right_eye[0] - left_eye[0]
        angle = np.degrees(np.arctan2(delta_y, delta_x))

        # Compute center between eyes
        eye_center = (left_eye + right_eye) / 2
        rotation_matrix = cv2.getRotationMatrix2D(tuple(eye_center), angle, scale=1)

        # Rotate the entire image
        rotated_img = cv2.warpAffine(image, rotation_matrix, (image.shape[1], image.shape[0]))

        return rotated_img
    except Exception as e:
        print(f"Error aligning face: {e}")
        return None


def detect_and_save_faces(file_path, output_dir, names, target_size=(224, 224)):
    """
    Detect faces, align using landmarks, and save them in folders named after people.
    Args:
        file_path (str): Path to the image.
        output_dir (str): Path to save processed images.
        names (list): List of names corresponding to detected faces.
    """
    try:
        # Read image
        img = cv2.imread(file_path)
        if img is None:
            print(f"Unable to read {file_path}. Skipping...")
            return

        # Detect faces with RetinaFace
        faces = RetinaFace.detect_faces(file_path)

        if not faces:
            print(f"No faces detected in {file_path}. Skipping...")
            return

        # Sort detected faces by their x-coordinates
        sorted_faces = sorted(faces.items(), key=lambda item: item[1]["facial_area"][0])

        # Ensure face count matches the number of names
        if len(sorted_faces) != len(names):
            print(f"Mismatch in face count and names for {file_path}. Skipping...")
            return

        # Process and save each face
        for name, (key, face_data) in zip(names, sorted_faces):
            landmarks = face_data["landmarks"]
            aligned_face = align_face(img, landmarks, target_size)

            if aligned_face is None:
                continue

            # Crop the face bounding box
            x1, y1, x2, y2 = face_data["facial_area"]
            cropped_face = aligned_face[y1:y2, x1:x2]

            # Resize to target size
            resized_face = cv2.resize(cropped_face, target_size)

            # Save aligned face
            person_dir = os.path.join(output_dir, name)
            os.makedirs(person_dir, exist_ok=True)
            output_file_path = os.path.join(person_dir, os.path.basename(file_path))
            Image.fromarray(cv2.cvtColor(resized_face, cv2.COLOR_BGR2RGB)).save(output_file_path)

    except Exception as e:
        print(f"Error processing {file_path}: {e}")


def preprocess_faces(dataset_dir, output_dir, csv_path, batch_size=16):
    """
    Preprocess dataset images by detecting, aligning, and saving faces.
    Args:
        dataset_dir (str): Path to the dataset.
        output_dir (str): Path to save processed images.
        csv_path (str): Path to the CSV file with image-to-names mapping.
        batch_size (int): Number of images to process per batch.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Load names mapping from CSV
    names_data = pd.read_csv(csv_path)
    names_mapping = {
        str(row["image"]): row["label_name"].split(";")
        for _, row in names_data.iterrows()
    }

    # Collect image paths
    image_files = [os.path.join(root, file)
                   for root, _, files in os.walk(dataset_dir)
                   for file in files if file.endswith(".jpg")]

    # Process images in batches
    for i in tqdm(range(0, len(image_files), batch_size), desc="Processing batches"):
        batch_files = image_files[i:i + batch_size]
        for file_path in batch_files:
            # Extract image name without leading zeros
            img_name = os.path.basename(file_path).split('.')[0].lstrip("0")
            if img_name not in names_mapping:
                continue  # Skip if not in CSV

            names = names_mapping[img_name]
            detect_and_save_faces(file_path, output_dir, names)


# Example usage
dataset_dir = "../data/images/cleaned_images"
output_dir = "../data/faces5_train"
csv_path = "../data/labels/clean_data.csv"

preprocess_faces(dataset_dir, output_dir, csv_path, batch_size=16)


Processing batches:   0%|          | 0/46 [00:00<?, ?it/s]

Mismatch in face count and names for ../data/images/cleaned_images/0195.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0601.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0703.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0387.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0710.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0235.jpg. Skipping...
No faces detected in ../data/images/cleaned_images/0151.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0281.jpg. Skipping...


Processing batches:   2%|▏         | 1/46 [00:15<11:19, 15.10s/it]

No faces detected in ../data/images/cleaned_images/0155.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0290.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0190.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0626.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0367.jpg. Skipping...
No faces detected in ../data/images/cleaned_images/0042.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0616.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0600.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0545.jpg. Skipping...


Processing batches:   4%|▍         | 2/46 [00:30<11:07, 15.16s/it]

Mismatch in face count and names for ../data/images/cleaned_images/0389.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0424.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0477.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0261.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0399.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0475.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0623.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0338.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0738.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0468.jpg. Skipping...


Processing batches:   7%|▋         | 3/46 [00:45<10:56, 15.26s/it]

Mismatch in face count and names for ../data/images/cleaned_images/0342.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0345.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0364.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0751.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0102.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0380.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0147.jpg. Skipping...


Processing batches:   9%|▊         | 4/46 [01:00<10:41, 15.28s/it]

Mismatch in face count and names for ../data/images/cleaned_images/0238.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0415.jpg. Skipping...
No faces detected in ../data/images/cleaned_images/0069.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0724.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0303.jpg. Skipping...
No faces detected in ../data/images/cleaned_images/0829.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0663.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0263.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0609.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0288.jpg. Skipping...
Mismatch in face count and names for ../data/images/cleaned_images/0718.jpg. Skipping...
Mismatch in face count and names for ../data/images/c

Processing batches:  11%|█         | 5/46 [01:18<10:44, 15.72s/it]


KeyboardInterrupt: 

In [31]:
def load_images(image_folder, label_map=None):
    images = []
    image_labels = []

    for filename in os.listdir(image_folder):
        img_path = os.path.join(image_folder, filename)
        img = cv2.imread(img_path)

        if img is not None:
            images.append((filename, img))
            if label_map:
                image_labels.append(label_map.get(filename, []))  # Default to empty list for test images

    return images, image_labels if label_map else None

In [32]:
import pandas as pd
# Example usage
data_folder = "../data/images/cleaned_images"
output_folder = "../data/faces5_train"
label_csv_path = "../data/labels/clean_data.csv"
batch_size = 50

# Load label data
label_data = pd.read_csv(label_csv_path)
label_data['label_name'] = label_data['label_name'].apply(eval)  # Convert string to list
label_map = dict(zip(label_data['image'].astype(str).str.zfill(4) + ".jpg", label_data['label_name']))

# Load training images and labels
images, labels = load_images(data_folder, label_map=label_map)


extract_and_save_faces(images, labels, output_folder, batch_size)


Processing 0032.jpg...
Detections: {}
Processing 0195.jpg...
Detections: {'face_1': {'score': np.float64(0.9996499419212341), 'facial_area': [np.int64(282), np.int64(142), np.int64(407), np.int64(311)], 'landmarks': {'right_eye': [np.float32(310.58563), np.float32(210.92842)], 'left_eye': [np.float32(370.72183), np.float32(210.68129)], 'nose': [np.float32(338.374), np.float32(243.73334)], 'mouth_right': [np.float32(317.30133), np.float32(270.83136)], 'mouth_left': [np.float32(365.88617), np.float32(271.02014)]}}, 'face_2': {'score': np.float64(0.9977530837059021), 'facial_area': [np.int64(169), np.int64(241), np.int64(186), np.int64(259)], 'landmarks': {'right_eye': [np.float32(172.85423), np.float32(249.6075)], 'left_eye': [np.float32(180.10976), np.float32(249.28572)], 'nose': [np.float32(175.98828), np.float32(254.03227)], 'mouth_right': [np.float32(174.8612), np.float32(256.62875)], 'mouth_left': [np.float32(179.8528), np.float32(256.29584)]}}}
Processing 0569.jpg...
Detections: {'

KeyboardInterrupt: 