# Implementation of the Ensemble Method

In [None]:
import os

# Switch the working directory to the parent folder. The dataset and model
# paths configured later in this notebook are relative paths.
# NOTE(review): presumably the notebook lives one level below the project root — confirm.
os.chdir("..")

In [None]:
import cv2
import sys
import random
import numpy as np
import concurrent.futures
import matplotlib.pyplot as plt
import urllib.request as urlreq

from typing import Counter
from sklearn.svm import SVC
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from kymatio.numpy import Scattering2D
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA

import scipy.io as scio
from face_frontalization import frontalize
from face_frontalization import camera_calibration as calib

%load_ext autoreload
%autoreload 2

In [None]:
# --- Experiment configuration -------------------------------------------------

# Seed passed to random.seed() before every shuffle / sampling step.
random_seed = 8
# Only the per-person splits whose training labels contain this name are kept.
testing_person = "Adrien"

# Passed as the `J` argument of Scattering2D.
J = 3
# Number of k-means clusters used for the bag-of-visual-words histogram.
k_histogram = 100
# People with fewer readable images than this are left out of the dataset.
min_image_num = 10
# Together with training_image_num, this sets how often the list of other people
# is repeated when "Unknown" samples are drawn:
# 2 * (training_set_size - training_image_num) times.
training_set_size = 10
# Number of a person's own images placed in their training split.
training_image_num = 5
# Upper bound on the misclassified images plotted, and the subplot column count.
num_misclassified_to_show = 5
# Size handed to cv2.resize for cropped faces and used as the Scattering2D shape.
face_image_target_size = (64, 64)

# Dataset root; each sub-directory is treated as one person.
base_folder = "demo/dataset-easy"
# Local path and download URL of the Haar cascade used for face detection.
haarcascade = "model_checkpoints/haarcascade.xml"
haarcascade_url = "https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/haarcascade_frontalface_alt2.xml"
# Local path and download URL of the LBF facial-landmark model.
LBFmodel = "model_checkpoints/lbfmodel.yaml"
LBFmodel_url = "https://github.com/kurnianggoro/GSOC2017/raw/master/data/lbfmodel.yaml"

# Name and path handed to frontalize.ThreeD_Model.
frontalize_model_name = "model_dlib"
frontalize_model_path = "model_checkpoints/model3Ddlib.mat"

# Variable name inside the .mat file, and the path of that file, for the eye mask.
eye_mask_mat = "eyemask"
eye_mask_mat_path = "model_checkpoints/eyemask.mat"

In [None]:
# Fetch each pretrained model file unless a local copy is already present.
for filename, url in zip([haarcascade, LBFmodel], [haarcascade_url, LBFmodel_url]):
    if os.path.exists(filename):
        print("File exists")
    else:
        # urlretrieve does not create missing parent folders, so make sure the
        # destination directory exists before downloading into it.
        target_dir = os.path.dirname(filename)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        urlreq.urlretrieve(url, filename)
        print("File downloaded")

In [None]:
# Load every readable image under base_folder, grouped by person
# (each sub-directory name is used as the person's label).
data = {}

# os.listdir returns entries in arbitrary order. Sorting both listings makes the
# seeded shuffles and samples further down reproducible across machines.
for person in sorted(os.listdir(base_folder)):
    person_dir = os.path.join(base_folder, person)
    if not os.path.isdir(person_dir):
        continue

    images = []
    for img_name in sorted(os.listdir(person_dir)):
        img_path = os.path.join(person_dir, img_name)
        img = cv2.imread(img_path)
        # cv2.imread yields None for files it cannot decode; skip those.
        if img is not None:
            images.append(img)

    # Keep only people with enough images to build a train/test split.
    if len(images) >= min_image_num:
        data[person] = images

In [None]:
# Build one train/test split per person. Each split holds the person's own
# images (labelled with their name) plus randomly drawn images of everybody
# else (labelled "Unknown").
x_train, y_train = [], []
x_test, y_test = [], []

people_names = list(data.keys())
# How many times the list of other people is repeated when drawing "Unknown" samples.
repeat_count = 2 * (training_set_size - training_image_num)

for person, images in data.items():
    # Shuffle the person's images in place with a fixed seed.
    random.seed(random_seed)
    random.shuffle(images)

    # The first training_image_num images go to training, the rest to testing.
    train_images = list(images[:training_image_num])
    test_images = list(images[training_image_num:])
    train_labels = [person] * training_image_num
    test_labels = [person] * (len(images) - training_image_num)

    # Re-seed so the "Unknown" draws are reproducible as well.
    random.seed(random_seed)
    other_people = [name for name in people_names if name != person] * repeat_count

    for position, other_person in enumerate(other_people):
        chosen_image = random.sample(data[other_person], 1)[0]
        # Even positions feed the training split, odd positions the test split.
        if position % 2 == 0:
            target_images, target_labels = train_images, train_labels
        else:
            target_images, target_labels = test_images, test_labels
        target_images.append(chosen_image)
        target_labels.append("Unknown")

    x_train.append(train_images)
    y_train.append(train_labels)
    x_test.append(test_images)
    y_test.append(test_labels)

In [None]:
# Drop every per-person split whose training labels do not mention testing_person.
i_to_remove = [i for i, inner in enumerate(y_train) if testing_person not in inner]

# Delete from the highest index downwards: removing an element shifts everything
# after it, so deleting in ascending order would hit the wrong entries (or run
# past the end of the lists).
for i in reversed(i_to_remove):
    del x_train[i]
    del x_test[i]
    del y_train[i]
    del y_test[i]

In [None]:
# Wavelet scattering transform sized for the normalized face crops.
scattering = Scattering2D(J=J, shape=face_image_target_size)

# Haar-cascade face detector and LBF landmark detector, both loaded from disk.
face_detector = cv2.CascadeClassifier(haarcascade)
landmark_detector = cv2.face.createFacemarkLBF()
landmark_detector.loadModel(LBFmodel)

# SIFT configured with a zero contrast threshold and the largest possible edge
# threshold, plus a brute-force descriptor matcher.
sift = cv2.SIFT_create(contrastThreshold=0, edgeThreshold=sys.maxsize)
bf = cv2.BFMatcher()

In [None]:
def extract_sift_features(image_list):
    """Run SIFT on every image and collect the results.

    Args:
        image_list: Iterable of images accepted by ``sift.detectAndCompute``.

    Returns:
        A ``(keypoints_list, descriptors_list)`` pair of lists, each holding one
        entry per input image, in input order.
    """
    detections = [sift.detectAndCompute(image, None) for image in image_list]

    keypoints_list = [keypoints for keypoints, _ in detections]
    descriptors_list = [descriptors for _, descriptors in detections]
    return keypoints_list, descriptors_list


def create_bow_histogram(descriptors, kmeans):
    """Build the bag-of-visual-words histogram for one image.

    Args:
        descriptors: Descriptor array for the image, or ``None`` / empty when no
            descriptors are available.
        kmeans: Fitted clustering model exposing ``predict``, ``n_clusters`` and
            ``cluster_centers_``.

    Returns:
        A 1-D array with one count per cluster. It is all zeros when there are
        no descriptors.
    """
    has_descriptors = descriptors is not None and len(descriptors) > 0
    if not has_descriptors:
        return np.zeros(kmeans.cluster_centers_.shape[0])

    # Count how many descriptors were assigned to each visual word.
    assigned_words = kmeans.predict(descriptors)
    return np.bincount(assigned_words, minlength=kmeans.n_clusters)


def train_bovw_svm(descriptors_list, label_list):
    """Fit the visual vocabulary (k-means) and a linear SVM on BoVW histograms.

    Args:
        descriptors_list: One descriptor array per training image. An entry may
            be ``None`` or empty when no descriptors were found for that image.
        label_list: One label per training image, aligned with
            ``descriptors_list``.

    Returns:
        A ``(svm, kmeans)`` pair: the fitted classifier and the fitted
        vocabulary.

    Raises:
        ValueError: If none of the images produced any descriptor.
    """
    # Images without descriptors cannot contribute to the vocabulary, and
    # np.vstack would fail on their None entries. They still get an all-zero
    # histogram below.
    usable_descriptors = [
        descriptors
        for descriptors in descriptors_list
        if descriptors is not None and len(descriptors) > 0
    ]
    if not usable_descriptors:
        raise ValueError("No SIFT descriptors available to build the vocabulary.")
    all_descriptors = np.vstack(usable_descriptors)

    # KMeans cannot fit more clusters than samples, so cap the vocabulary size.
    # It is seeded like every other random step in this notebook so that runs
    # are reproducible.
    n_clusters = min(k_histogram, len(all_descriptors))
    kmeans = KMeans(n_clusters=n_clusters, random_state=random_seed)
    kmeans.fit(all_descriptors)

    histograms = [
        create_bow_histogram(descriptors, kmeans) for descriptors in descriptors_list
    ]

    svm = SVC(kernel="linear")
    svm.fit(histograms, label_list)

    return svm, kmeans


def predict_with_svm(svm, kmeans, new_image):
    """Classify one image with the bag-of-visual-words SVM.

    Args:
        svm: Fitted classifier exposing ``predict``.
        kmeans: Fitted vocabulary used to build the histogram.
        new_image: Image to classify.

    Returns:
        The predicted label, or ``None`` when SIFT yields no descriptors for the
        image.
    """
    _, image_descriptors = sift.detectAndCompute(new_image, None)

    found_descriptors = image_descriptors is not None and len(image_descriptors) > 0
    if not found_descriptors:
        return None

    histogram = create_bow_histogram(image_descriptors, kmeans)
    return svm.predict([histogram])[0]

In [None]:
def normalize_face(img):
    """Frontalize the largest face found in a BGR image.

    Args:
        img: BGR image (it is converted with ``cv2.COLOR_BGR2GRAY`` for
            detection).

    Returns:
        The ``(frontal_raw, frontal_sym)`` pair produced by
        ``frontalize.frontalize``.

    Raises:
        RuntimeError: If no face or no landmarks can be detected.
    """
    model3D = frontalize.ThreeD_Model(frontalize_model_path, frontalize_model_name)

    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    detections = face_detector.detectMultiScale(
        grayscale, scaleFactor=1.1, minNeighbors=5
    )
    if len(detections) == 0:
        raise RuntimeError("No faces detected.")

    # Keep only the detection with the largest area (width * height).
    largest_rect = max(detections, key=lambda rect: rect[2] * rect[3])
    fitted, landmarks = landmark_detector.fit(grayscale, np.array([largest_rect]))
    if not fitted or len(landmarks) == 0:
        raise RuntimeError("Could not detect landmarks.")

    # OpenCV returns landmarks as a list, where each element is an array of
    # shape (1, 68, 2); take the points of the single face that was passed in.
    face_points = landmarks[0][0]
    proj_matrix, _, _, _ = calib.estimate_camera(model3D, face_points)

    eyemask = np.asarray(scio.loadmat(eye_mask_mat_path)[eye_mask_mat])
    frontal_raw, frontal_sym = frontalize.frontalize(
        img, proj_matrix, model3D.ref_U, eyemask
    )
    return frontal_raw, frontal_sym


def obtain_only_face(i, frontal_view, margin=5):
    """Mask, crop and resize the largest face detected in an image.

    Everything outside the convex hull of the facial landmarks is blacked out.
    The image is then cropped to the landmark bounding box (plus ``margin``),
    converted to grayscale and resized to ``face_image_target_size``.

    Args:
        i: Index of the image; only used in error messages.
        frontal_view: 3-channel image to process (it is converted with
            ``cv2.COLOR_BGR2GRAY`` at the end).
        margin: Number of pixels added around the landmark bounding box.

    Returns:
        The grayscale face crop resized to ``face_image_target_size``.

    Raises:
        RuntimeError: If no face or no landmarks are detected, or if the
            resulting crop would be empty.
    """
    faces = face_detector.detectMultiScale(
        frontal_view, scaleFactor=1.1, minNeighbors=5
    )
    if len(faces) == 0:
        raise RuntimeError(f"No faces detected (after frontalization) {i}.")

    # Keep only the detection with the largest area (width * height).
    main_face = np.array([max(faces, key=lambda rect: rect[2] * rect[3])])
    retval, landmarks = landmark_detector.fit(frontal_view, main_face)
    # Check the fit result, as normalize_face does, instead of failing later
    # with an unrelated indexing error.
    if not retval or len(landmarks) == 0:
        raise RuntimeError(f"Could not detect landmarks (after frontalization) {i}.")

    lmarks = np.asarray(landmarks[0][0])
    hull = cv2.convexHull(np.array(lmarks, dtype=np.int32))

    min_x, min_y = lmarks[:, 0].min(), lmarks[:, 1].min()
    max_x, max_y = lmarks[:, 0].max(), lmarks[:, 1].max()

    # Clamp the crop window to the image. A negative start index would be
    # interpreted by slicing as "counted from the end" and give an empty or
    # wrong crop.
    height, width = frontal_view.shape[:2]
    top = max(int(min_y) - margin, 0)
    bottom = min(int(max_y) + margin, height)
    left = max(int(min_x) - margin, 0)
    right = min(int(max_x) + margin, width)
    if top >= bottom or left >= right:
        raise RuntimeError(f"Face landmarks fall outside the image {i}.")

    mask = np.zeros((height, width), dtype=np.uint8)
    cv2.fillPoly(mask, [hull], 255)

    masked_face = frontal_view.copy()
    if masked_face.dtype != np.uint8:
        masked_face = np.uint8(np.clip(masked_face, 0, 255))

    # Black out everything outside the landmark hull, then crop.
    masked_face[mask == 0] = 0
    masked_face = masked_face[top:bottom, left:right]

    masked_face = cv2.cvtColor(masked_face, cv2.COLOR_BGR2GRAY)
    resized_face = cv2.resize(masked_face, face_image_target_size)
    return resized_face


def normalize_list(image_list, useSym):
    """Produce cropped faces for both the raw and the frontalized images.

    Args:
        image_list: Sequence of images to process.
        useSym: When truthy, use the second output of ``normalize_face``
            (``frontal_sym``); otherwise use the first (``frontal_raw``).

    Returns:
        A ``(cropped_input, frontalized_cropped_input)`` pair of lists: face
        crops of the original images and of their frontalized versions.
    """
    view_index = 1 if useSym else 0
    frontalized_input = [normalize_face(image)[view_index] for image in image_list]

    cropped_input = [
        obtain_only_face(position, image) for position, image in enumerate(image_list)
    ]
    frontalized_cropped_input = [
        obtain_only_face(position, view)
        for position, view in enumerate(frontalized_input)
    ]
    return cropped_input, frontalized_cropped_input

In [None]:
def calculate_metrics(true_labels, predicted_labels):
    """Compute weighted precision, recall and F1 for one set of predictions.

    Only the labels present in ``true_labels`` are scored, and undefined scores
    are reported as 0.

    Args:
        true_labels: Ground-truth labels.
        predicted_labels: Predicted labels, aligned with ``true_labels``.

    Returns:
        A ``(precision, recall, f1)`` tuple.
    """
    # All three scorers are called with exactly the same options.
    shared_options = {
        "average": "weighted",
        "labels": np.unique(true_labels),
        "zero_division": 0,
    }
    scorers = (precision_score, recall_score, f1_score)
    return tuple(
        scorer(true_labels, predicted_labels, **shared_options) for scorer in scorers
    )


def track_misclassifications(test_images, true_labels, predicted_labels):
    """Collect the test samples whose prediction differs from the true label.

    Args:
        test_images: Indexable collection of test images.
        true_labels: Ground-truth labels, aligned with ``test_images``.
        predicted_labels: Predicted labels, aligned with ``test_images``.

    Returns:
        Three lists of equal length: the misclassified images, their true
        labels and their predicted labels.
    """
    actual = np.array(true_labels)
    wrong_positions = np.where(predicted_labels != actual)[0]

    wrong_images = [test_images[pos] for pos in wrong_positions]
    wrong_true = [actual[pos] for pos in wrong_positions]
    wrong_pred = [predicted_labels[pos] for pos in wrong_positions]
    return wrong_images, wrong_true, wrong_pred


def visualize_misclassifications(
    title, misclassified_images, misclassified_true_labels, misclassified_pred_labels
):
    """Plot a seeded random sample of misclassified images with their labels.

    At most ``num_misclassified_to_show`` images are drawn, side by side in a
    single row.

    Args:
        title: Figure title.
        misclassified_images: Images to sample from. Both single-channel
            (grayscale) and BGR images are supported.
        misclassified_true_labels: True label of each image.
        misclassified_pred_labels: Predicted label of each image.
    """
    random.seed(random_seed)
    misclassified_indices_sample = random.sample(
        range(len(misclassified_images)),
        min(num_misclassified_to_show, len(misclassified_images)),
    )

    plt.figure(figsize=(15, 4))

    for idx, misclassified_idx in enumerate(misclassified_indices_sample):
        image = np.asarray(misclassified_images[misclassified_idx])
        true_label = misclassified_true_labels[misclassified_idx]
        predicted_label = misclassified_pred_labels[misclassified_idx]

        plt.subplot(1, num_misclassified_to_show, idx + 1)
        # The face crops collected by eval_ensemble are single-channel, and
        # cv2.cvtColor with COLOR_BGR2RGB rejects single-channel input. Show
        # those with a gray colormap and convert only real colour images.
        if image.ndim == 2 or image.shape[-1] == 1:
            plt.imshow(np.squeeze(image), cmap="gray")
        else:
            plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        plt.title(f"True: {true_label}\nPred: {predicted_label}")
        plt.axis("off")

    plt.suptitle(title, fontweight="bold")
    plt.tight_layout()
    plt.show()


def print_metrics(title, all_precision, all_recall, all_f1):
    """Print the mean precision, recall and F1 score under a title line.

    Args:
        title: Heading printed before the metrics.
        all_precision: Precision values to average.
        all_recall: Recall values to average.
        all_f1: F1 values to average.
    """
    mean_precision, mean_recall, mean_f1 = (
        np.mean(scores) for scores in (all_precision, all_recall, all_f1)
    )

    print(title)
    print(
        f"Average Precision: {mean_precision}",
        f"Average Recall: {mean_recall}",
        f"Average F1 Score: {mean_f1}",
        sep="\n",
    )

In [None]:
def eval_fisher(
    training_set_person_i, training_label_set_person_i, testing_set_person_i
):
    """Classify test faces with PCA + LDA features and a 1-nearest-neighbour vote.

    Args:
        training_set_person_i: Training face images (array-like, same size).
        training_label_set_person_i: Labels of the training images.
        testing_set_person_i: Test face images.

    Returns:
        The predicted label for every test image.
    """
    flat_train = np.array([face.flatten() for face in training_set_person_i])
    flat_test = np.array([face.flatten() for face in testing_set_person_i])

    # Reduce dimensionality first: avoids the singularity issue in LDA and
    # helps generalization.
    reducer = PCA(n_components=min(5, flat_train.shape[0] - 1))
    reduced_train = reducer.fit_transform(flat_train)
    reduced_test = reducer.transform(flat_test)

    projector = LDA()
    projector.fit(reduced_train, training_label_set_person_i)
    fisher_train = projector.transform(reduced_train)
    fisher_test = projector.transform(reduced_test)

    nearest = KNeighborsClassifier(n_neighbors=1)
    nearest.fit(fisher_train, training_label_set_person_i)
    return nearest.predict(fisher_test)


def eval_wavelet_scattering(
    training_set_person_i, training_label_set_person_i, testing_set_person_i
):
    """Classify test faces with scattering coefficients, PCA and a linear SVM.

    Args:
        training_set_person_i: Training face images accepted by ``scattering``.
        training_label_set_person_i: Labels of the training images.
        testing_set_person_i: Test face images.

    Returns:
        The predicted label for every test image.
    """
    train_scattered = np.array(
        [scattering(img.astype(np.float32)) for img in training_set_person_i]
    )
    test_scattered = np.array(
        [scattering(img.astype(np.float32)) for img in testing_set_person_i]
    )

    # Flatten the scattering coefficients into one feature vector per image.
    train_features = train_scattered.reshape(len(train_scattered), -1)
    test_features = test_scattered.reshape(len(test_scattered), -1)

    # PCA cannot extract more components than there are samples or features.
    # Cap the count so small training sets do not raise; it is still 5 whenever
    # the data allows it.
    pca = PCA(n_components=min(5, *train_features.shape))
    train_pca = pca.fit_transform(train_features)
    test_pca = pca.transform(test_features)

    svm = SVC(kernel="linear")
    svm.fit(train_pca, training_label_set_person_i)
    test_predictions = svm.predict(test_pca)
    return test_predictions


def eval_bovw_kpsift_kpsift(
    training_set_person_i, training_label_set_person_i, testing_set_person_i
):
    """Classify test faces with a bag-of-visual-words SVM built on SIFT features.

    Args:
        training_set_person_i: Training face images.
        training_label_set_person_i: Labels of the training images.
        testing_set_person_i: Test face images.

    Returns:
        A list with one prediction per test image, as returned by
        ``predict_with_svm``.
    """
    train_descriptors = extract_sift_features(training_set_person_i)[1]
    classifier, vocabulary = train_bovw_svm(
        train_descriptors, training_label_set_person_i
    )

    predictions = []
    for test_image in testing_set_person_i:
        predictions.append(predict_with_svm(classifier, vocabulary, test_image))
    return predictions

In [None]:
def eval_ensemble(useSym):
    """Evaluate the majority-vote ensemble on every prepared split.

    For each split, the three classifiers (bag of visual words, Fisherfaces and
    wavelet scattering) are run in a thread pool. Their per-image predictions
    are combined by taking the most common label. A split that raises any
    exception is reported and skipped.

    Args:
        useSym: Forwarded to ``normalize_list`` to choose which frontalized
            view is used.

    Returns:
        Two tuples: ``(all_precision, all_recall, all_f1)`` with one value per
        evaluated split, and ``(misclassified_images,
        misclassified_true_labels, misclassified_pred_labels)`` accumulated over
        all splits.
    """
    precision_scores, recall_scores, f1_scores = [], [], []
    wrong_images, wrong_true_labels, wrong_pred_labels = [], [], []

    for i in range(len(y_test)):
        try:
            train_labels = y_train[i]
            train_faces, train_frontal_faces = normalize_list(x_train[i], useSym)

            test_labels = y_test[i]
            test_faces, test_frontal_faces = normalize_list(x_test[i], useSym)

            # Run the three base classifiers concurrently. BoVW works on the
            # plain crops, the other two on the frontalized crops.
            with concurrent.futures.ThreadPoolExecutor() as pool:
                bovw_job = pool.submit(
                    eval_bovw_kpsift_kpsift, train_faces, train_labels, test_faces
                )
                fisher_job = pool.submit(
                    eval_fisher, train_frontal_faces, train_labels, test_frontal_faces
                )
                wavelet_job = pool.submit(
                    eval_wavelet_scattering,
                    train_frontal_faces,
                    train_labels,
                    test_frontal_faces,
                )

                bovw_votes = bovw_job.result()
                fisher_votes = fisher_job.result()
                wavelet_votes = wavelet_job.result()

            # One row per classifier, one column per test image.
            votes = np.vstack([fisher_votes, bovw_votes, wavelet_votes])

            # Majority vote over each column.
            test_predictions = np.array(
                [Counter(column).most_common(1)[0][0] for column in votes.T]
            )

            precision, recall, f1 = calculate_metrics(test_labels, test_predictions)
            precision_scores.append(precision)
            recall_scores.append(recall)
            f1_scores.append(f1)

            batch_images, batch_true, batch_pred = track_misclassifications(
                test_faces, test_labels, test_predictions
            )
            wrong_images.extend(batch_images)
            wrong_true_labels.extend(batch_true)
            wrong_pred_labels.extend(batch_pred)
        except Exception as e:
            # Best effort: report the failure and move on to the next split.
            print(
                f"An error occurred at index {i} for one of the images: {e} Skipping {y_train[i][0]}'s dataset."
            )

    metrics = (precision_scores, recall_scores, f1_scores)
    misclassified = (wrong_images, wrong_true_labels, wrong_pred_labels)
    return metrics, misclassified

In [None]:
# Run the ensemble evaluation. useSym=False selects the first output of
# normalize_face (frontal_raw) rather than the symmetric view.
metrics_ensemble, misclassified_ensemble = eval_ensemble(useSym=False)

In [None]:
# Print the mean precision, recall and F1 over all evaluated splits.
print_metrics("Using Ensemble", *metrics_ensemble)

In [None]:
# Plot a seeded random sample of the misclassified test faces.
visualize_misclassifications("Using Ensemble", *misclassified_ensemble)