**IMPORTANT NOTE**: Please edit the paths according to your needs.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Testing dataset directory which includes the labels.txt file in it
test_data_path = "/content/drive/MyDrive/test"

# Training dataset directory path
# ***IMPORTANT***: The one with all the people's names as the sub-directories
training_data_path = "/content/drive/MyDrive/Release"

# Saved models directory path
saved_models_dir = "/content/drive/MyDrive/saved_models"

# Location of face detection DNN pre-trained model files.
# These names must exist: preprocess_dataset() reads them as globals whenever
# face_detection_method == 'dnn'. Defining the strings does not touch the
# files, so this is safe even if only the cascade detector is used.
dnn_prototxt_path = "/content/drive/MyDrive/pre_trained_files/deploy.prototxt"
dnn_model_path = "/content/drive/MyDrive/pre_trained_files/res10_300x300_ssd_iter_140000.caffemodel"


# Utility Functions

Imports

In [None]:
import os
import cv2
import pathlib
import random
import shutil
import numpy as np
from PIL import Image

from collections import defaultdict

from typing import Tuple, Optional, List

from google.colab.patches import cv2_imshow

!pip3 install pillow-heif
from pillow_heif import register_heif_opener

import tensorflow as tf
from tensorflow.keras import layers, models, optimizers

from keras.preprocessing.image import ImageDataGenerator

from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import distance_metrics
from sklearn.preprocessing import LabelEncoder

!pip3 install joblib
import joblib

Data augmentation

In [None]:
def augment_image(
    image: np.ndarray,
    num_augs: int = 5
) -> List[np.ndarray]:
    """Return the original image followed by randomly augmented copies.

    Args:
        image: Image array. A leading batch axis is added before it is passed
            to the generator (presumably a height x width x channels array as
            loaded by OpenCV -- TODO confirm against callers).
        num_augs: Number of augmented images to generate. Zero or a negative
            value yields only the original image.

    Returns:
        A list whose first element is ``image`` itself, followed by
        ``num_augs`` augmented images cast to ``uint8``.
    """
    # Initialize parameter ranges for augmentation
    datagen = ImageDataGenerator(
        rotation_range=10,
        width_shift_range=0.2,
        height_shift_range=0.2,
        shear_range=0.2,
        zoom_range=0.2,
        horizontal_flip=True,
        fill_mode='nearest'
    )

    # Store the augmented images, including the original image
    aug_images = [image]

    batch = image.reshape((1,) + image.shape)

    # datagen.flow() yields batches endlessly, so it has to be bounded.
    # zip() stops as soon as range() is exhausted, which also covers
    # num_augs <= 0 (the previous "i == num_augs - 1" check never fired
    # in that case and looped forever).
    for _, aug_batch in zip(range(num_augs), datagen.flow(batch, batch_size=1)):
        aug_images.append(aug_batch[0].astype('uint8'))

    return aug_images


Face detection and cropping


In [None]:
def get_face_from_image_fc(
    input_image: np.ndarray,
    face_cascade: Optional[cv2.CascadeClassifier] = None
) -> np.ndarray:
    """Crop an image to the first face found by a Haar cascade detector.

    Args:
        input_image: Image to search; it is converted with COLOR_BGR2GRAY, so
            it is treated as a BGR image.
        face_cascade: Detector to use. When omitted, OpenCV's bundled
            ``haarcascade_frontalface_default.xml`` is loaded.

    Returns:
        The region of ``input_image`` covering the first detected face, or
        ``input_image`` unchanged when nothing is detected.
    """
    # Fall back to the stock frontal-face cascade shipped with OpenCV
    if face_cascade is None:
        cascade_file = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        face_cascade = cv2.CascadeClassifier(cascade_file)

    # Detection runs on an equalized gray-scale copy; the crop is taken from
    # the untouched input image
    equalized = cv2.equalizeHist(cv2.cvtColor(input_image, cv2.COLOR_BGR2GRAY))
    detections = face_cascade.detectMultiScale(
        equalized,
        scaleFactor=1.03,
        minNeighbors=5,
        minSize=(30, 30)
    )

    face_count = len(detections)
    if face_count == 0:
        print("No faces detected in the image. Returning original image.")
        return input_image
    if face_count > 1:
        print('Multiple faces detected. Returning first face detected.')

    # Bounding box of the first detection: left, top, width, height
    left, top, width, height = detections[0]
    return input_image[top:top + height, left:left + width]


def _crop_most_confident_face_dnn(net, image, confidence_threshold):
    """Run one detection pass and return the crop of the best face, or None."""
    # Prepare the image for face detection
    blob = cv2.dnn.blobFromImage(
        image=cv2.resize(image, (300, 300)),
        scalefactor=1.0,
        size=(300, 300),
        mean=(104.0, 177.0, 123.0)
    )
    net.setInput(blob)

    # Run the forward pass to get face detections
    detections = net.forward()
    if detections.shape[2] == 0:
        return None

    # Index of the highest-confidence detection (first one on ties)
    best = int(np.argmax(detections[0, 0, :, 2]))
    if detections[0, 0, best, 2] < confidence_threshold:
        return None

    # The SSD face detector reports the box as normalized corner coordinates
    # (x_min, y_min, x_max, y_max), not (x, y, width, height), so scale them
    # to pixels and slice corner-to-corner.
    height, width = image.shape[:2]
    box = detections[0, 0, best, 3:7] * np.array([width, height, width, height])
    x_min, y_min, x_max, y_max = box.astype("int")

    # Detections may extend past the image border; clip so the slice is valid
    x_min, y_min = max(0, x_min), max(0, y_min)
    x_max, y_max = min(width, x_max), min(height, y_max)
    if x_max <= x_min or y_max <= y_min:
        return None

    return image[y_min:y_max, x_min:x_max]


def get_face_from_image_dnn(
    input_image: np.ndarray,
    prototxt_path: str,
    model_path: str,
    confidence_threshold: float = 0.15,
    relative_change_threshold: float = 0.3
) -> np.ndarray:
    """Detects and crops an image of a face using a dnn model.

    The image is cropped to its most confident detection repeatedly until a
    crop changes the image size by no more than ``relative_change_threshold``.

    Args:
        input_image: Image to search for a face.
        prototxt_path: Path to the Caffe ``.prototxt`` network definition.
        model_path: Path to the Caffe ``.caffemodel`` weights.
        confidence_threshold: Minimum confidence for a detection to count.
        relative_change_threshold: Largest relative size change
            (``|crop.size - image.size| / image.size``) at which the crop is
            accepted as final.

    Returns:
        The cropped face. If a pass finds no usable detection, the image that
        was given to that pass is returned (the original input on the first
        pass).
    """
    # Load the pre-trained dnn model once and reuse it for every pass
    net = cv2.dnn.readNetFromCaffe(prototxt_path, model_path)

    current = input_image
    while True:
        cropped_image = _crop_most_confident_face_dnn(
            net, current, confidence_threshold
        )
        if cropped_image is None:
            # No faces detected
            print("No faces detected in the image. Returning original image.")
            return current

        # Return image based on relative change (size) of the image size
        relative_change_size = abs(
            (cropped_image.size - current.size) / current.size
        )
        if relative_change_size <= relative_change_threshold:
            return cropped_image

        # Otherwise refine on the crop. Each rejected crop is strictly
        # smaller than its source, so this loop terminates.
        current = cropped_image


Split dataset for training and validation

In [None]:
def get_data_and_label(input_dir: str) -> Tuple[np.ndarray, List[str]]:
    """Load every image in a directory as a flattened vector with its label.

    Args:
        input_dir: Directory containing image files named ``<label>_<n>``.

    Returns:
        A tuple ``(images, labels)``. ``images`` holds one flattened
        224x224-resized image per row; ``labels`` holds the matching label
        for each row. Entries that are not files or cannot be read as images
        are skipped with a message.
    """
    images = []
    labels = []
    # Sorted so that the row order is reproducible between runs
    for img in sorted(pathlib.Path(input_dir).iterdir()):
        if not img.is_file():
            continue

        # cv2.imread returns None instead of raising on unreadable files
        image = cv2.imread(str(img))
        if image is None:
            print(f"Could not read image={img}, skipping.")
            continue

        image = cv2.resize(image, (224, 224))
        images.append(image.flatten())

        # The label is everything before the LAST underscore, matching
        # group_files_by_name(); splitting on the first underscore would cut
        # labels that themselves contain underscores.
        labels.append(img.stem.rsplit('_', 1)[0])

    images_arr = np.array(images)

    return images_arr, labels


def group_files_by_name(file_names: List[str]) -> dict:
    """Group file names by the part preceding their trailing ``_<suffix>``.

    Args:
        file_names: File names or paths (anything ``str()`` can convert).

    Returns:
        A dict mapping each base name (the path up to, but excluding, the
        last underscore of the file name) to the list of original entries
        sharing it. A file name without an underscore forms its own group,
        keyed by the whole string.
    """
    grouped_files = {}
    for file_name in file_names:
        text = str(file_name)

        # Only look for the separator inside the file name itself, so an
        # underscore in a directory component is never mistaken for it.
        head, tail = os.path.split(text)
        base, separator, _ = tail.rpartition('_')
        base_name = os.path.join(head, base) if separator else text

        grouped_files.setdefault(base_name, []).append(file_name)

    return grouped_files


def split_images_train_and_val(
    source_dir: str,
    output_dir: str,
    split_ratio: float = 0.8
) -> None:
    """Copy the ``.png`` images of a directory into train/validation folders.

    Images are grouped with ``group_files_by_name`` and each group is split
    randomly, so every group contributes to both sets according to
    ``split_ratio``.

    Args:
        source_dir: Directory holding the ``.png`` images (``str`` or
            ``pathlib.Path``).
        output_dir: Directory to create; receives ``train`` and
            ``validation`` sub-directories (``str`` or ``pathlib.Path``).
            If it already exists nothing is done.
        split_ratio: Fraction of each group that goes to the training set.

    Raises:
        FileNotFoundError: If ``source_dir`` is not an existing directory.
    """
    # Accept plain strings as well as Path objects
    source_path = pathlib.Path(source_dir)
    output_path = pathlib.Path(output_dir)

    # Ensure the source directory exists
    if not source_path.is_dir():
        raise FileNotFoundError(f"Source directory '{source_dir}' not found.")

    # An existing output directory is taken to mean the split was already
    # made; leave it untouched so the sets are not reshuffled.
    if output_path.is_dir():
        print(f"{output_dir} output directory exist already, doing nothing.")
        return

    # Create train and validation directories within the output directory
    output_path.mkdir(exist_ok=False)
    train_path = output_path.joinpath('train')
    val_path = output_path.joinpath('validation')
    train_path.mkdir(exist_ok=True)
    val_path.mkdir(exist_ok=True)

    # Get a list of all image files (should all be .png format)
    image_files = list(source_path.glob('*.png'))

    # Create a dict to group all the images based on label
    grouped_image_files = group_files_by_name(image_files)

    # For each class, split the images into training and validation
    for files in grouped_image_files.values():
        # Randomly split the images
        num_train = int(len(files) * split_ratio)
        random.shuffle(files)

        # Copy (the source files are kept) to the corresponding directories
        for img_file in files[:num_train]:
            shutil.copy(str(img_file), str(train_path / img_file.name))
        for img_file in files[num_train:]:
            shutil.copy(str(img_file), str(val_path / img_file.name))


Preprocess the dataset

In [None]:
def preprocess_dataset(
    input_dir_path: str,
    output_dir_path: str,
    face_detection_method: str = 'fc'
) -> None:
    """Augment, face-crop and resize every image of a directory.

    Each input image yields the original plus its augmented variants; every
    variant is cropped to the detected face, resized to 224x224 and written
    to the output directory as ``<stem><idx><suffix>``.

    Args:
        input_dir_path: Directory with the images to process.
        output_dir_path: Directory to write the results to (created if
            missing).
        face_detection_method: ``'fc'`` for the Haar cascade detector or
            ``'dnn'`` for the Caffe DNN detector. For ``'dnn'`` the model
            locations are read from the module-level names
            ``dnn_prototxt_path`` and ``dnn_model_path``.

    Raises:
        ValueError: If ``face_detection_method`` is not a supported value.
    """
    # Check if the provided face detection method is valid
    valid_fd_methods = ["dnn", "fc"]
    if face_detection_method not in valid_fd_methods:
        raise ValueError(f"Invalid face_detection_method. Choose from {valid_fd_methods}")
    print(f"Using: {face_detection_method} as the face_detection_method.")

    # Output directory path
    output_dir = pathlib.Path(output_dir_path)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Number of augmented variants per image (the original is saved too)
    num_augs = 5

    # Build the cascade once instead of once per augmented image
    face_cascade = None
    if face_detection_method == 'fc':
        face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        )

    # Iterate through all of images in the input directory
    for img in pathlib.Path(input_dir_path).iterdir():
        if not img.is_file():
            continue

        # Outputs are saved as <stem><idx><suffix>, so those are the names
        # that tell whether this image was already processed (the bare input
        # name is never written and could never match).
        output_file = output_dir.joinpath(img.name)
        already_done = any(
            output_dir.joinpath(f"{output_file.stem}{idx}{output_file.suffix}").is_file()
            for idx in range(num_augs + 1)
        )
        if already_done:
            print(f"File={output_file} already exist, skipping.")
            continue
        print(f"input image: {img}")

        # cv2.imread returns None instead of raising on unreadable files
        image = cv2.imread(str(img))
        if image is None:
            print(f"Could not read image={img}, skipping.")
            continue

        # Apply data augmentation
        augmented_images = augment_image(np.array(image), num_augs=num_augs)

        for idx, a_img in enumerate(augmented_images):
            # Detect face and crop image
            if face_detection_method == 'dnn':
                # Use dnn model
                cropped_image = get_face_from_image_dnn(
                    input_image=a_img,
                    prototxt_path=dnn_prototxt_path,
                    model_path=dnn_model_path
                )
            else:
                # Use face_cascade
                cropped_image = get_face_from_image_fc(
                    input_image=a_img,
                    face_cascade=face_cascade
                )

            # Normalize the cropped image
            image_norm = cv2.resize(cropped_image, (224, 224))

            # Save images
            new_filename = f"{output_file.stem}{idx}{output_file.suffix}"
            new_file = output_dir.joinpath(new_filename)
            cv2.imwrite(str(new_file), image_norm)


Rename and normalize the test files to work with our labelling system

In [None]:
def convert_to_png(input_path, output_path):
    """Re-save the image at ``input_path`` as a PNG at ``output_path``.

    Best effort: any failure is reported on stdout rather than raised.
    """
    try:
        with Image.open(input_path) as source_image:
            source_image.save(output_path, 'PNG')
    except Exception as error:
        print(f"Error converting {input_path}: {error}")
    else:
        print(f"Converted {input_path} to {output_path}")


def batch_convert_to_png(input_dir, output_dir):
    """Convert every non-``.txt`` file of ``input_dir`` to PNG in ``output_dir``.

    Args:
        input_dir: Directory whose files are converted (sub-directories are
            ignored).
        output_dir: Directory receiving ``<stem>.png`` files; created if
            missing. May be the same directory as ``input_dir``.
    """
    # Create the output directory if it doesn't exist
    output_path = pathlib.Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Snapshot the listing first: when input and output are the same
    # directory, conversion adds files to the directory being listed.
    input_path = pathlib.Path(input_dir)
    for file_path in sorted(input_path.iterdir()):
        # Check if the file is a file but not the labels.txt file
        if not file_path.is_file() or file_path.suffix == '.txt':
            continue

        output_file_path = output_path.joinpath(file_path.stem + ".png")

        # A PNG that would be written onto itself needs no conversion
        if output_file_path.resolve() == file_path.resolve():
            continue

        convert_to_png(file_path, output_file_path)


def generate_new_filenames(input_text):
    """Create a mapping of old names to new names.

    Args:
        input_text: Contents of the labels file: one ``<filename>\\t<name>``
            pair per line. Blank lines are ignored and surrounding
            whitespace (including Windows line endings) is stripped.

    Returns:
        A dict mapping each original filename to ``<name>_<n>.png``, where
        ``n`` counts from 0 separately for every name.

    Raises:
        ValueError: If a non-blank line is not exactly two tab-separated
            fields.
    """
    name_mapping = {}
    counters = defaultdict(int)

    for line_number, line in enumerate(input_text.splitlines(), start=1):
        # Tolerate empty lines, e.g. a trailing newline or an empty file
        if not line.strip():
            continue

        fields = line.split('\t')
        if len(fields) != 2:
            raise ValueError(
                f"Line {line_number} is not '<filename>\\t<name>': {line!r}"
            )
        filename, name = (field.strip() for field in fields)

        name_mapping[filename] = f"{name}_{counters[name]}.png"
        counters[name] += 1

    return name_mapping


def normalize_test_files(
    labels_file_path: str,
    image_dir: str,
    output_dir: str
) -> None:
    """Renames and copies the files from the testing dataset to our labelling system.

    Args:
        labels_file_path: Path of the tab-separated labels file.
        image_dir: Directory holding the test images (``str`` or
            ``pathlib.Path``). PNG copies of the images are created in it.
        output_dir: Directory receiving the renamed ``<name>_<n>.png`` files
            (``str`` or ``pathlib.Path``); created if missing.
    """
    # Accept plain strings as well as Path objects
    image_path = pathlib.Path(image_dir)
    output_path = pathlib.Path(output_dir)

    batch_convert_to_png(image_path, image_path)

    with open(labels_file_path, 'r') as file:
        input_text = file.read()
    name_mapping = generate_new_filenames(input_text)

    output_path.mkdir(exist_ok=True)

    for old_name, new_name in name_mapping.items():
        # The new name always ends in .png, so copy the converted PNG made
        # above. Fall back to the original file only if no PNG exists
        # (e.g. the conversion failed).
        converted_file_path = image_path.joinpath(pathlib.Path(old_name).stem + ".png")
        if converted_file_path.is_file():
            old_file_path = converted_file_path
        else:
            old_file_path = image_path.joinpath(old_name)
        if not old_file_path.exists():
            continue

        # Copy to the new location under the new name
        shutil.copy(old_file_path, output_path.joinpath(new_name))

Normalizing the file types and labelling

In [None]:
def flattenData(
    ipath,
    opath,
    nameOverride=None,
    indexInit=0
) -> None:
    """
    Takes all files under ipath, or the file at ipath, and saves each one as
    name_i.png directly under opath.

    By default name is the name of the directory that directly contains the
    input file and i is an index starting at indexInit that restarts for
    every directory. When nameOverride is given, it is used as the name for
    every file and i keeps counting across directories so that no output
    overwrites another.

    ipath and opath may be strings or path objects. If opath lies inside
    ipath it is skipped, so re-running does not flatten earlier output.
    """
    register_heif_opener()
    ipath = os.fspath(ipath)
    opath = os.fspath(opath)
    output_abs = os.path.abspath(opath)

    # Collect (directory, files) pairs up front
    pending = []
    if os.path.isdir(ipath):
        for root, dirs, files in os.walk(ipath):
            # Prune the output directory and visit the rest in a stable order
            dirs[:] = sorted(
                d for d in dirs
                if os.path.abspath(os.path.join(root, d)) != output_abs
            )
            if os.path.abspath(root) == output_abs:
                continue
            pending.append((root, sorted(files)))
    else:
        # ipath is a single file: treat it as the only file of its directory
        parent, file_name = os.path.split(ipath)
        pending.append((parent, [file_name]))

    index = indexInit
    for root, files in pending:
        if nameOverride:
            person = nameOverride
        else:
            # Name after the containing directory; restart its index
            person = os.path.basename(os.path.abspath(root))
            index = indexInit
        for f in files:
            with Image.open(os.path.join(root, f)) as im:
                im.save(os.path.join(opath, '_'.join([person, str(index)]) + '.png'))
            index += 1

Training and validating classifiers

In [None]:
def train_classifier(X_train, X_test, y_train, y_test, clf, clf_name, param_grid):
    """Train and test a classifier.

    A scaler -> PCA -> classifier pipeline is tuned with a grid search, then
    refitted with the best parameters, evaluated and saved.

    Args:
        X_train: Training samples.
        X_test: Validation samples.
        y_train: Training labels.
        y_test: Validation labels.
        clf: Classifier instance; it receives the best parameters found and
            is the estimator fitted inside the saved pipeline.
        clf_name: Short name used for the saved file (``<clf_name>_clf.pkl``).
        param_grid: Dict of pipeline parameters to search (classifier keys
            prefixed with ``clf__``). It is not modified.
    """
    pca = PCA()
    scaler = StandardScaler()
    pipe = Pipeline(steps=[("scaler", scaler), ("dim_reduction", pca), ("clf", clf)])

    # Search the PCA size as well; work on a copy so the caller's grid is
    # left untouched
    search_grid = dict(param_grid)
    search_grid["dim_reduction__n_components"] = [5, 10, 25, 50]

    # Find best params
    search = GridSearchCV(pipe, search_grid, scoring='f1_weighted')
    search.fit(X_train, y_train)
    print("CV score=%0.3f with parameters:" % search.best_score_)
    print(search.best_params_)

    # Use the best parameters for the classifier
    best_params = search.best_params_
    pca.set_params(n_components=best_params["dim_reduction__n_components"])
    clf_prefix = "clf__"
    clf_best_params = {
        key[len(clf_prefix):]: value
        for key, value in best_params.items()
        if key.startswith(clf_prefix)
    }
    clf.set_params(**clf_best_params)
    clf_pipe = Pipeline(steps=[("scaler", scaler), ("dim_reduction", pca), ("clf", clf)])

    # Train
    clf_pipe.fit(X_train, y_train)

    # Make predictions
    train_predictions = clf_pipe.predict(X_train)
    test_predictions = clf_pipe.predict(X_test)

    # Evaluate the accuracy
    train_accuracy = accuracy_score(y_train, train_predictions)
    validation_accuracy = accuracy_score(y_test, test_predictions)
    print(f"Train Accuracy: {train_accuracy:.2f}")
    print(f"Validation Accuracy: {validation_accuracy:.2f}")

    # Save the classifier (WARNING: This will override previously saved models)
    models_dir = pathlib.Path(saved_models_dir)
    models_dir.mkdir(parents=True, exist_ok=True)
    model_save_path = models_dir.joinpath(clf_name + "_clf.pkl")
    joblib.dump(clf_pipe, model_save_path)


# Preprocessing and Training

Establish paths for datasets

In [None]:
# Fraction of every class that goes to the training set (rest: validation)
split_ratio = 0.8

# Flattened images: normalized file type and <name>_<index> labelling
source_dir = pathlib.Path(training_data_path) / 'Flattened'

# Root of the split (and later preprocessed) dataset for this ratio
preprocessed_dir = pathlib.Path(training_data_path) / f"Preprocessed_Split_{split_ratio}"

# Raw split sets
training_path = preprocessed_dir / "train"
validation_path = preprocessed_dir / "validation"

# Preprocessed counterparts of the split sets
output_train_path = preprocessed_dir / (training_path.stem + "_preprocessed")
output_val_path = preprocessed_dir / (validation_path.stem + "_preprocessed")

# Make sure there is a directory to save trained models into
pathlib.Path(saved_models_dir).mkdir(exist_ok=True)

Normalize raw dataset file type and labelling system

In [None]:
# The flattened output directory must exist before images are saved into it
source_dir.mkdir(exist_ok=True)

# Convert every raw training image to <name>_<index>.png under source_dir
flattenData(ipath=training_data_path, opath=source_dir)

Create training and validation sets

In [None]:
# Create the training / validation split from the flattened images
split_images_train_and_val(source_dir=source_dir, output_dir=preprocessed_dir, split_ratio=split_ratio)

# Preprocess both sets: training first, then validation
for _raw_dir, _processed_dir in (
    (training_path, output_train_path),
    (validation_path, output_val_path),
):
    preprocess_dataset(input_dir_path=_raw_dir, output_dir_path=_processed_dir)

# NOTE: should manually clean up any poor quality images

In [None]:
# Load the preprocessed images as flattened vectors together with their labels
X_train, y_train = get_data_and_label(input_dir=output_train_path)  # training set
X_val, y_val = get_data_and_label(input_dir=output_val_path)  # validation set

Train, validate, and tune hyperparameters

In [None]:
# Training the classifiers: declare every model with its search grid first,
# then run them all through the same train/validate/save routine.

rf_grid = {
    "clf__n_estimators": [50, 75, 125],
    "clf__max_depth": [4, 8, 12],
}
rf_clf = RandomForestClassifier(random_state=22)

knn_grid = {
    "clf__n_neighbors": [3, 5, 7],
}
knn_clf = KNeighborsClassifier()

svm_grid = {
    "clf__C": [1, 2, 4],
    "clf__gamma": [1/8, 1/16, 1/32],
}
svm_clf = SVC(kernel='rbf')

# (heading printed, classifier, name used for the saved file, grid)
_experiments = (
    ("Random Forest:", rf_clf, 'rf', rf_grid),
    ("KNN:", knn_clf, 'knn', knn_grid),
    ("SVM:", svm_clf, 'svm', svm_grid),
)
for _heading, _estimator, _short_name, _grid in _experiments:
    print(_heading)
    train_classifier(X_train, X_val, y_train, y_val, _estimator, _short_name, _grid)
    print("---")

# Testing

In [None]:
# Directory of the testing dataset; labels.txt is expected inside it
test_dir_path = pathlib.Path(test_data_path)

# Where the normalized and flattened testing dataset is written
output_test_path = test_dir_path / "Flattened"

In [None]:
# Normalize the file types and labelling
normalize_test_files(
    labels_file_path=test_dir_path.joinpath('labels.txt'),
    image_dir=test_dir_path,
    output_dir=output_test_path
)

# get_face_from_image_dnn() cannot run without the Caffe model files. Reuse
# the notebook-level settings when they are defined; otherwise fall back to
# the default Drive locations.
test_prototxt_path = globals().get(
    "dnn_prototxt_path",
    "/content/drive/MyDrive/pre_trained_files/deploy.prototxt"
)
test_model_path = globals().get(
    "dnn_model_path",
    "/content/drive/MyDrive/pre_trained_files/res10_300x300_ssd_iter_140000.caffemodel"
)

# Face detection and cropping
for image in output_test_path.iterdir():
    # cv2.imread returns None instead of raising on unreadable files
    img = cv2.imread(str(image))
    if img is None:
        print(f"Could not read image={image}, skipping.")
        continue

    cropped_image = get_face_from_image_dnn(
        input_image=img,
        prototxt_path=test_prototxt_path,
        model_path=test_model_path
    )

    # Normalize the cropped image
    image_norm = cv2.resize(cropped_image, (224, 224))

    # Save images (overwrites the file in place)
    cv2.imwrite(str(image), image_norm)


In [None]:
# Load the cropped test images as flattened vectors together with their labels
X_test, y_test = get_data_and_label(input_dir=output_test_path)

In [None]:
# Load the specific trained model you want to test. The file is looked up in
# saved_models_dir, the directory train_classifier() saves into, so changing
# that setting is enough to keep training and testing in sync.
trained_clf_path = pathlib.Path(saved_models_dir).joinpath("svm_clf.pkl")
trained_clf = str(trained_clf_path)

# NOTE: joblib files are pickles; only load models you created yourself.
clf = joblib.load(trained_clf)
test_predictions = clf.predict(X_test)
test_accuracy = accuracy_score(y_test, test_predictions)

print(f"Test Accuracy for {trained_clf_path.stem}: {test_accuracy:.2f}")