In [None]:
# Print the version of the `python` executable found on the shell PATH.
!python --version

In [None]:
# Pin Keras to 2.15.0 before the `keras.*` imports further down are executed.
%pip install keras==2.15.0

In [None]:
!pip show keras

In [None]:
import tarfile

# Unpack the LFW archive into the working directory.
# NOTE(review): `extractall` trusts the member paths stored in the archive; only
# use it on archives from a trusted source.
try:
    with tarfile.open('/kaggle/input/lfw-dataset/lfw-funneled.tgz', 'r') as tar:
        tar.extractall(path='/kaggle/working/Dataset/Raw')
except tarfile.TarError as e:
    # Keep the best-effort behaviour (no re-raise) but report *why* it failed,
    # otherwise later cells fail on a missing dataset with no hint of the cause.
    print(f"Failed to extract the LFW archive: {e!r}")

In [None]:
import math
import os
import random
import re
import shutil
import sys
import time
from collections import Counter
from typing import Any, Callable

import albumentations as A
import cv2
import imgaug
import keras
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tqdm
from keras import backend as K
from keras import layers, losses
from keras.applications import MobileNet
from keras.layers import (Activation, Conv2D, Dense, Dropout, GlobalAveragePooling2D, Flatten, Lambda,
                          MaxPooling2D)
from keras.models import Sequential, Model
from keras.utils import to_categorical
from PIL import Image
from rich import print
from sklearn.calibration import LabelEncoder
from sklearn.model_selection import train_test_split


In [None]:
# !cp -r /kaggle/input/lfw-deepfunneled-with-mates /kaggle/working/lfw-deepfunneled-with-mates

In [None]:
# !cp -r /kaggle/input/mates-only-v2 /kaggle/working/mates-only-v2

In [None]:
# Seed value reused for `random`/`imgaug` seeding and as `random_state` for the dataset export.
SHUFFLE_NUMBER = 42

In [None]:
def identity(x):
    """Return the argument untouched.

    Used in this notebook as a no-op replacement for ``tqdm.tqdm`` and
    ``print`` when verbose output is switched off.
    """
    return x

def swap(t):
    """Return ``(t[1], t[0])``: the first two items of ``t`` in reverse order."""
    second, first = t[1], t[0]
    return second, first

In [None]:
# Seed Python's `random` module and imgaug with the shared notebook seed
# (presumably so the augmentation pipeline is repeatable — confirm which RNG
# the installed albumentations version actually draws from).
random.seed(SHUFFLE_NUMBER)
imgaug.seed(SHUFFLE_NUMBER)

In [None]:
# Default augmentation pipeline used by `augment_data` when the caller does not
# supply one. Transforms are applied in the listed order; `p` is the probability
# of each transform firing, and the final Downscale always runs (p=1).
default_transform = A.Compose(
    [
#         A.RandomCrop(width=200, height=200),
        A.RandomCropFromBorders (crop_left=0.1, crop_right=0.1, crop_top=0.1, crop_bottom=0.1),
        A.HorizontalFlip(p=0.5),
        # A.RandomScale(scale_limit=(-0.5, 2.0), p=0.5),
        A.ShiftScaleRotate(p=0.5, shift_limit=0.1, scale_limit=0.2, rotate_limit=15),
        A.RandomBrightnessContrast(p=0.2),
        A.Blur(blur_limit=3, p=0.2),
        A.Downscale(scale_min=0.35, scale_max=0.40, p=1)
    ]
)


def augment_data(
    img: np.ndarray,
    augmentation_count: int = 10,
    augmentation_pipeline=None,
    desired_shape: tuple[int, int] = (250, 250),
) -> list[np.ndarray]:
    """Produce ``augmentation_count`` augmented variants of ``img``.

    Args:
        img: Source image array, passed to the pipeline as ``image=img``.
        augmentation_count: Number of augmented images to generate.
        augmentation_pipeline: Callable invoked as ``pipeline(image=img)`` and
            returning a mapping with an ``"image"`` entry. Defaults to the
            module-level ``default_transform``.
        desired_shape: Output size as ``(height, width)`` — the same convention
            ``LazyData`` uses for its ``desired_shape``.

    Returns:
        A list of ``augmentation_count`` arrays, each resized to
        ``desired_shape``.
    """
    if augmentation_pipeline is None:
        augmentation_pipeline = default_transform

    # PIL's ``resize`` expects (width, height). Passing ``desired_shape``
    # unchanged transposed non-square targets, so the augmented images no longer
    # matched the images produced by the loader. Square shapes are unaffected.
    pil_size = (desired_shape[1], desired_shape[0])

    images = []
    for _ in range(augmentation_count):
        aug_image = augmentation_pipeline(image=img)["image"]
        resized = Image.fromarray(aug_image).resize(pil_size)
        images.append(np.array(resized))
    return images


In [None]:
class LazyData:
    """Load the contents of a file on first use and cache the result.

    Calling the instance returns the loaded data. Supported ``load_strategy``
    values:

    * ``None`` — return the raw bytes of the file.
    * ``"image"`` — decode a JPEG and, if ``desired_shape`` was given, resize it.
    * ``"image_face"`` — decode a JPEG and, if ``desired_shape`` was given, cut
      a ``desired_shape`` window centred on the first detected face.
    * any callable — called with the raw bytes; its return value is cached.

    ``desired_shape`` (keyword argument) is interpreted as ``(height, width)``.
    """

    # Haar cascade shared by all instances; loaded once when the class is defined.
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    def __init__(
        self,
        filename: str,
        load_strategy: str | Callable[[str | bytes], Any] | None = None,
        *args,
        **kwargs,
    ):
        self._filename = filename
        self._data = None  # filled in lazily by _build_data()
        self._load_strategy = load_strategy
        self._args = args
        self._kwargs = kwargs

    def __call__(self):
        """Return the loaded data, loading it on the first call."""
        if self._data is None:
            self._build_data()
        return self._data

    @staticmethod
    def _fit_center(center, span, limit):
        """Shift ``center`` so a window of width ``span`` stays inside ``[0, limit]``.

        Raises:
            Exception: if the window overflows on both sides at once.
        """
        half = span // 2
        low, high = center - half, center + half
        if low < 0 and high > limit:
            raise Exception("AbsurdError: Desired shape greater than image")
        if low < 0:
            return center - low
        if high > limit:
            return center - (high - limit)
        return center

    def __load_strategy_image(self, buf) -> np.ndarray:
        """Decode the JPEG at ``buf`` and optionally resize it to ``desired_shape``."""
        img = mpimg.imread(buf, format="jpg")
        desired_shape = self._kwargs.get("desired_shape")
        if desired_shape is not None:
            # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
            # PIL wants (width, height), hence the swap — consistent with the
            # face loader below.
            pilimg = Image.fromarray(img).resize(swap(desired_shape), Image.LANCZOS)
            img = np.array(pilimg)
        return img

    def __load_strategy_image_face(self, buf) -> np.ndarray:
        """Decode the JPEG at ``buf`` and crop a face-centred ``desired_shape`` window.

        The image is returned unchanged when no ``desired_shape`` was given or
        when the requested window is larger than the image in either dimension.
        If no face is detected the whole image is treated as the face box.
        """
        img = mpimg.imread(buf, format="jpg")
        desired_shape = self._kwargs.get("desired_shape")
        if desired_shape is None:
            return img

        img_h, img_w = img.shape[0], img.shape[1]
        nh, nw = desired_shape[0], desired_shape[1]
        if nh > img_h or nw > img_w:
            # The requested window cannot fit inside the image.
            return img

        faces = LazyData.face_cascade.detectMultiScale(
            img.copy(), scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)
        )
        if len(faces) > 0:
            x, y, w, h = faces[0]
        else:
            x, y, w, h = 0, 0, img_w, img_h

        # Centre of the face box; the window is centred here and then nudged
        # back inside the image if it is wider/taller than the face box.
        cx, cy = (x + x + w) // 2, (y + y + h) // 2
        if nw > w:
            cx = LazyData._fit_center(cx, nw, img_w)
        if nh > h:
            cy = LazyData._fit_center(cy, nh, img_h)

        face_crop = img[(cy - nh // 2):(cy + nh // 2), (cx - nw // 2):(cx + nw // 2)]

        # Integer halving can leave the crop one pixel short for odd sizes, so
        # resize the CROP to the exact target. (Previously the uncropped image
        # was resized here and the crop was silently discarded.)
        pilimg = Image.fromarray(face_crop).resize(swap(desired_shape), Image.LANCZOS)
        return np.array(pilimg)

    def _build_data(self):
        """Populate ``self._data`` according to the configured load strategy."""
        if self._load_strategy == "image":
            self._data = self.__load_strategy_image(self._filename)
        elif self._load_strategy == "image_face":
            self._data = self.__load_strategy_image_face(self._filename)
        else:
            # Only the raw-bytes strategies need a file handle of their own.
            with open(self._filename, "rb") as f:
                buf = f.read()
            if self._load_strategy is None:
                self._data = buf
            else:
                self._data = self._load_strategy(buf)

In [None]:
def dataset(
    path: str,
    min_faces: int | None = 20,
    max_faces: int | None = None,
    hard_limit: bool = False,
    shuffle: bool = True,
    random_state: int | None = None,
    verbose: bool | None = True,
    desired_shape: tuple[int,int] = (250,250)
):
    """Index the ``<name>_<number>.jpg`` files under ``path`` as lazy samples.

    Args:
        path: Root directory containing one sub-directory per identity.
        min_faces: Identities whose directory holds fewer entries are skipped.
            ``None`` disables the check.
        max_faces: Upper bound on entries per identity. ``None`` disables it.
        hard_limit: With ``max_faces`` set, ``True`` drops identities above the
            bound entirely; ``False`` keeps only the first ``max_faces`` files
            encountered for them.
        shuffle: Shuffle the samples with ``np.random.shuffle``.
        random_state: If given, seeds NumPy's global RNG before anything else.
        verbose: Wrap the directory walk in a ``tqdm`` progress bar.
        desired_shape: Forwarded to ``LazyData`` as ``desired_shape``.

    Returns:
        Object array of shape ``(n_samples, 2)``; column 0 holds ``LazyData``
        loaders, column 1 the identity label parsed from the file name.
    """
    if random_state is not None:
        np.random.seed(random_state)

    # Sets: these are consulted once per file in the walk below.
    excluded_dirs = set()
    capped_dirs = set()
    capped_counts = {}
    for direc in os.listdir(path):
        direc_path = os.path.join(path, direc)
        if not os.path.isdir(direc_path):
            continue
        n_entries = len(os.listdir(direc_path))
        if min_faces is not None and n_entries < min_faces:
            excluded_dirs.add(direc)
        if max_faces is not None and n_entries > max_faces:
            if hard_limit:
                excluded_dirs.add(direc)
            else:
                capped_dirs.add(direc)

    ds = []
    tracker = tqdm.tqdm if verbose else identity
    pattern = re.compile(r"(.*)_(?:\d+)\.jpg")

    for root, dirs, files in tracker(os.walk(path)):
        for file in files:
            if not file.endswith(".jpg"):
                continue
            match = pattern.match(file)
            if not match:
                continue
            # The identity label is everything before the trailing "_<digits>.jpg".
            target = match.group(1)

            if target in excluded_dirs:
                continue

            if max_faces is not None and target in capped_dirs:
                capped_counts[target] = capped_counts.get(target, 0) + 1
                if capped_counts[target] > max_faces:
                    continue

            ds.append(
                [
                    LazyData(
                        os.path.abspath(os.path.join(root, file)),
                        load_strategy="image_face",
                        desired_shape=desired_shape
                    ),
                    target,
                ]
            )

    if not ds:
        # Keep the documented 2-D layout so callers can still slice columns.
        return np.empty((0, 2), dtype=object)

    if shuffle:
        np.random.shuffle(ds)

    return np.array(ds)


def fetch_lfw_people(
    path_to_dataset: str = "Dataset/Raw",
    min_faces: int | None = 20,
    max_faces: int | None = None,
    hard_limit: bool = False,
    shuffle: bool = True,
    random_state: int | None = None,
    verbose: bool = True,
    desired_shape: tuple[int,int] = (250, 250)
):
    """Index the dataset via ``dataset`` and split it into samples and labels.

    All arguments are forwarded to ``dataset`` unchanged.

    Returns:
        ``(X, Y)`` — column 0 and column 1 of the array returned by ``dataset``.
    """
    options = dict(
        min_faces=min_faces,
        max_faces=max_faces,
        hard_limit=hard_limit,
        shuffle=shuffle,
        random_state=random_state,
        verbose=verbose,
        desired_shape=desired_shape,
    )
    samples = dataset(path_to_dataset, **options)
    return samples[:, 0], samples[:, 1]


In [None]:
def visualize_image(axes_array, images_array, labels_array, figure):
    """Draw one image per axes with a colorbar and its label as the title.

    ``axes_array`` is flattened first; there must be at least as many images
    and labels as axes. Extra images/labels are ignored.
    """
    flat_axes = axes_array.flatten()
    n_axes = len(flat_axes)

    assert n_axes <= len(images_array)
    assert n_axes <= len(labels_array)

    for position in range(n_axes):
        current_ax = flat_axes[position]
        drawn = current_ax.imshow(images_array[position], cmap="gray")
        figure.colorbar(drawn, ax=current_ax)
        current_ax.set_title(labels_array[position])
        current_ax.axis("off")


In [None]:


# TODO: Documentation


def dump_test_files(x_test, y_test, path_prefix: str, verbose=True):
    """Write the test images as JPEGs grouped by label and zip the result.

    Images go to ``<path_prefix>/<label>/<label>_<NNNN>.jpg`` (1-based counter
    per label). The tree is archived to ``<path_prefix>.zip`` and the directory
    is removed afterwards. Any existing directory or archive at those paths is
    deleted first.
    """
    archive_file = path_prefix + ".zip"

    # Start from a clean slate.
    if os.path.exists(path_prefix):
        shutil.rmtree(path_prefix)
    if os.path.exists(archive_file):
        os.remove(archive_file)
    os.makedirs(path_prefix)

    freq_table = {}
    for index, image in enumerate(x_test):
        target = y_test[index]
        label_dir = os.path.join(path_prefix, str(target))
        if target not in freq_table:
            # First image for this label: create its folder.
            os.makedirs(label_dir)
        freq_table[target] = freq_table.get(target, 0) + 1

        # write image as jpeg
        jpeg_name = f"{target}_{freq_table[target]:04}.jpg"
        Image.fromarray(image).save(os.path.join(label_dir, jpeg_name))

    shutil.make_archive(path_prefix, "zip", path_prefix)
    if verbose:
        print("[bold green]Test files dumped successfully[/bold green]")

    # Only the archive is kept.
    shutil.rmtree(path_prefix)


def _resolve_augmentation_target(augmentation_upto, max_faces):
    """Turn ``augmentation_upto`` into the per-class image count to reach."""
    if augmentation_upto == 0:
        # 0 means "top every class up to max_faces".
        if max_faces is None:
            raise ValueError("max_faces must be int if augmentation_upto is 0")
        return max_faces
    if augmentation_upto > 0:
        return augmentation_upto
    raise ValueError("augmentation_upto must be None or a non-negative int")


def _pairs_to_object_array(pairs):
    """Pack ``[image, label]`` pairs into an ``(n, 2)`` object array."""
    packed = np.empty((len(pairs), 2), dtype=object)
    for row, (image, label) in enumerate(pairs):
        packed[row, 0] = image
        packed[row, 1] = label
    return packed


def _export_arrays(export_path, x_train, y_train, x_test, y_test, experimental_export, log):
    """Write the four arrays as one ``data.npz`` or as four pickled ``.npy`` files."""
    if experimental_export:
        np.savez_compressed(
            os.path.join(export_path, "data.npz"),
            x_train=x_train,
            y_train=y_train,
            x_test=x_test,
            y_test=y_test,
        )
    else:
        x_train.dump(os.path.join(export_path, "x_train.npy"))
        y_train.dump(os.path.join(export_path, "y_train.npy"))
        x_test.dump(os.path.join(export_path, "x_test.npy"))
        y_test.dump(os.path.join(export_path, "y_test.npy"))
    log("[bold green]Dataset exported successfully[/bold green]")


def export_dataset_objects(
    path_to_dataset: str = "Dataset/Raw",
    min_faces: int | None = 20,
    max_faces: int | None = None,
    hard_limit: bool = False,
    shuffle=True,
    random_state=None,
    test_size=0.2,
    verbose=True,
    augment=True,
    desired_shape: tuple[int, int] = (250, 250),
    augmentation_count: int = 10,
    augmentation_upto: int | None = None,
    augmentation_pipeline=None,
    experimental_export: bool = True,
    export: bool = True,
):
    """Load the dataset, split it, optionally augment the training part, and export.

    Args:
        path_to_dataset: Dataset root; exports are written to its parent
            directory (``os.path.dirname(path_to_dataset)``).
        min_faces, max_faces, hard_limit, shuffle, random_state, verbose,
        desired_shape: Forwarded to ``fetch_lfw_people``. ``shuffle`` also
            controls shuffling of the augmented training set, and
            ``random_state`` is reused for ``train_test_split``.
        test_size: Fraction passed to ``train_test_split`` (stratified by label).
        augment: When ``False`` the split is exported as is.
        augmentation_count: Variants generated per training image when
            ``augmentation_upto`` is ``None``. In that mode the exported
            training set holds only the augmented variants.
        augmentation_upto: ``None`` — augment every image; ``0`` — top every
            class up to ``max_faces`` images; ``n > 0`` — top every class up to
            ``n`` images. In the last two modes the original training images
            are kept alongside the augmented ones.
        augmentation_pipeline: Forwarded to ``augment_data``.
        experimental_export: ``True`` writes a single compressed ``data.npz``;
            ``False`` writes four pickled ``.npy`` files.
        export: Skip writing files when ``False``.

    Raises:
        ValueError: ``augmentation_upto`` is negative, or is ``0`` while
            ``max_faces`` is ``None``.

    Note:
        Pixel values are exported unscaled; no normalisation is applied here.
    """
    log = print if verbose else identity
    tracker = tqdm.tqdm if verbose else identity

    # Validate the augmentation request up front so a bad argument fails before
    # the (slow) image loading instead of after it.
    target_count = None
    if augment and augmentation_upto is not None:
        target_count = _resolve_augmentation_target(augmentation_upto, max_faces)

    X, Y = fetch_lfw_people(
        path_to_dataset=path_to_dataset,
        min_faces=min_faces,
        max_faces=max_faces,
        hard_limit=hard_limit,
        shuffle=shuffle,
        random_state=random_state,
        verbose=verbose,
        desired_shape=desired_shape,
    )
    log("[bold green]Dataset loaded successfully[/bold green]")

    export_path = os.path.dirname(path_to_dataset)

    # Materialise every lazy loader. np.asarray replaces
    # np.array(..., copy=False), which raises on NumPy >= 2 because building an
    # array from a Python list always needs a copy.
    images = np.asarray([lazy_image() for lazy_image in tracker(X)])
    log("[bold green]Binary data loaded successfully[/bold green]")

    x_train, x_test, y_train, y_test = train_test_split(
        images,
        Y,
        test_size=test_size,
        random_state=random_state,
        stratify=Y,
    )
    log("[bold green]Dataset split successfully[/bold green]")

    if not augment:
        if export:
            _export_arrays(
                export_path, x_train, y_train, x_test, y_test, experimental_export, log
            )
        return

    pairs = []
    if target_count is None:
        # Replace every training image by `augmentation_count` augmented variants.
        for i, x in tracker(enumerate(x_train)):
            variants = augment_data(
                x,
                desired_shape=desired_shape,
                augmentation_count=augmentation_count,
                augmentation_pipeline=augmentation_pipeline,
            )
            for variant in variants:
                pairs.append([variant, y_train[i]])
        log("[bold green]Augmentation done successfully[/bold green]")
    else:
        # Group training indices by label (first-seen order).
        class_indices = {}
        for i, label in enumerate(y_train):
            class_indices.setdefault(label, []).append(i)
        log("[bold green]Identified classes successfully[/bold green]")

        # Top each class up to target_count with augmentations of randomly
        # chosen members; classes already at or above the target get none.
        for label, indices in class_indices.items():
            for _ in range(target_count - len(indices)):
                chosen = np.random.choice(indices)
                variant = augment_data(
                    x_train[chosen],
                    desired_shape=desired_shape,
                    augmentation_count=1,
                    augmentation_pipeline=augmentation_pipeline,
                )[0]
                pairs.append([variant, label])
        log(
            f"[bold green]Augmented each class up to {target_count} images successfully[/bold green]"
        )

        # add the original image with the label
        for i, x in tracker(enumerate(x_train)):
            pairs.append([x, y_train[i]])
        log("[bold green]Original images added successfully[/bold green]")

    xy_data = _pairs_to_object_array(pairs)
    log(xy_data.shape)
    log("[bold green]Augmented data converted to numpy array[/bold green]")
    if shuffle:
        np.random.shuffle(xy_data)
        log("[bold green]Augmented data shuffled successfully[/bold green]")

    x_train_data = xy_data[:, 0]
    y_train_data = xy_data[:, 1]
    log("[bold green]Training data split into images and labels[/bold green]")

    if export:
        _export_arrays(
            export_path, x_train_data, y_train_data, x_test, y_test, experimental_export, log
        )


# Build the dataset export used by the rest of the notebook: identities with at
# least 20 entries, at most 60 files each (capped rather than dropped), a 70/30
# stratified split, and every training class topped up to 100 images. The output
# is written as data.npz next to the dataset directory.
if __name__ == "__main__":
    export_dataset_objects(
        path_to_dataset="/kaggle/working/Dataset/Raw/lfw_funneled",
        shuffle=True,
        random_state=SHUFFLE_NUMBER,
        min_faces=20,
        max_faces=60,
        hard_limit=False,
        augment=True,
        augmentation_upto=100,
        experimental_export=True,
        test_size=0.3,
        desired_shape=(250,250),
        export=True,
    )


In [None]:
# !ls /kaggle/working/lfw-deepfunneled-with-mates | grep --color=always "data.npz"

In [None]:
import numpy as np 
# Load the arrays exported above. The context manager closes the underlying
# archive once the four arrays have been read into memory.
# NOTE: allow_pickle=True can execute code embedded in the file; only load
# archives produced by this notebook.
with np.load("/kaggle/working/Dataset/Raw/data.npz", allow_pickle=True) as data:
    X = data['x_train']
    Y = data['y_train']
    x_test = data['x_test']
    y_test = data['y_test']

In [None]:
from collections import Counter

# Count the training samples per identity label and print one line per label.
label_counts = Counter(Y)

for label, count in label_counts.items():
    print(f"Label {label}: {count} images")


In [None]:
import numpy as np

# Distinct identity labels in the training set. `n_unique_identities` is reused
# later as the number of classes for one-hot encoding and the classifier head.
unique_identities = np.unique(Y)
n_unique_identities = len(unique_identities)

print(f"There are {len(unique_identities)} unique identities.")

In [None]:
from collections import Counter

# Recompute the per-label counts so this cell does not depend on the earlier one.
label_counts = Counter(Y)

# Find the identity with the least number of images
# (ties resolve to the first label encountered, as `min` keeps the first minimum).
min_label, min_count = min(label_counts.items(), key=lambda x: x[1])

print(f"Identity {min_label} has the least number of images: {min_count} images")


In [None]:
# Shape of the training array loaded from data.npz.
X.shape

In [None]:
# Shape of the test array loaded from data.npz.
x_test.shape

In [None]:
import matplotlib.pyplot as plt

# Function to plot images
def plot_images(images, title, n_images=5):
    """Show the first ``n_images`` entries of ``images`` side by side.

    Args:
        images: Indexable collection of images accepted by ``plt.imshow``.
        title: Text placed under every image as its x-label.
        n_images: Number of columns in the row (default 5). If ``images`` holds
            fewer entries, only those are drawn instead of raising IndexError.
    """
    shown = min(n_images, len(images))
    plt.figure(figsize=(10,10))
    for i in range(shown):
        plt.subplot(1,n_images,i+1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(images[i], cmap=plt.cm.binary)
        plt.xlabel(title)
    plt.show()

# Plot images from X
plot_images(X, "X")

# Plot images from x_test
plot_images(x_test, "x_test")


In [None]:
import cv2
import matplotlib.pyplot as plt

def detect_faces_in_image(input_image):
    """Return a copy of ``input_image`` with a box drawn around every detected face.

    Detection uses OpenCV's frontal-face Haar cascade; the input array itself
    is not modified.
    """
    detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    annotated = input_image.copy()
    detections = detector.detectMultiScale(annotated, scaleFactor=1.1,  minNeighbors=5, minSize=(30, 30))

    for left, top, box_w, box_h in detections:
        # 2-pixel outline in colour (0, 255, 0) from the top-left to the bottom-right corner.
        cv2.rectangle(annotated, (left, top), (left + box_w, top + box_h), (0, 255, 0), 2)

    return annotated
        

# Annotate the first five training and the first five test images.
img_faces_train = [detect_faces_in_image(image) for image in X[:5]]
img_faces_test = [detect_faces_in_image(image) for image in x_test[:5]]

# Top row: training images; bottom row: test images.
fig, axes = plt.subplots(2, 5, figsize=(40, 10))
axes = axes.flatten()
for i, ax in enumerate(axes):
    if i >= 5:
        ax.imshow(img_faces_test[i - 5])
    else:
        ax.imshow(img_faces_train[i], cmap="gray")
    ax.axis("off")
plt.tight_layout()
plt.show()


In [None]:
# Element dtype of the first test image.
print(x_test[0].dtype)


# crop the face

In [None]:
import cv2
import matplotlib.pyplot as plt

# Load the Haar cascade once. Building it inside the function re-read the XML
# model for every single image of the dataset.
_CROP_FACE_CASCADE = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')


def detect_and_crop_faces_in_image(input_image, output_size=(224, 224)):
    """Crop the first detected face out of ``input_image`` and resize it.

    Args:
        input_image: Image array accepted by ``detectMultiScale``; it is not
            modified.
        output_size: Size handed to ``cv2.resize`` (default ``(224, 224)``,
            matching the MobileNet input used later in the notebook).

    Returns:
        The face region — or the whole image when no face is found — resized to
        ``output_size``.
    """
    faces = _CROP_FACE_CASCADE.detectMultiScale(
        input_image, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)
    )

    # If a face is detected, crop it out; otherwise fall back to the full image.
    if len(faces) > 0:
        x, y, w, h = faces[0]
        cropped_face = input_image[y:y+h, x:x+w]
    else:
        cropped_face = input_image

    # cv2.resize returns a new array, so the caller's image stays untouched.
    return cv2.resize(cropped_face, output_size)

cropped_face_X = []
cropped_face_x_test = []

tracker = tqdm.tqdm

# Crop every training image, then every test image, each with its own progress bar.
for image in tracker(X):
    cropped_face_X.append(detect_and_crop_faces_in_image(image))

for image in tracker(x_test):
    cropped_face_x_test.append(detect_and_crop_faces_in_image(image))

In [None]:
# Top row: first five cropped training faces; bottom row: first five cropped test faces.
fig, axes = plt.subplots(2, 5, figsize=(40, 10))
axes = axes.flatten()
for i, ax in enumerate(axes):
    if i >= 5:
        ax.imshow(cropped_face_x_test[i - 5])
    else:
        ax.imshow(cropped_face_X[i], cmap="gray")
    ax.axis("off")
plt.tight_layout()
plt.show()

In [None]:
import tensorflow as tf
from keras import backend as K
import cv2
import numpy as np
import keras
from keras.models import Sequential
from keras import layers, losses
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Lambda, Activation
from keras.utils import to_categorical
from sklearn.calibration import LabelEncoder

unique_identities = np.unique(Y)

# Report whether TensorFlow sees a GPU. This only prints a message; it does not
# change device placement or fall back to the CPU explicitly.
if tf.test.gpu_device_name():
    print('Default GPU Device: {}'.format(tf.test.gpu_device_name()))
else:
    print("Please install GPU version of TF")

# def normalize_images(image_array):
#     # Normalize the images 
#     for i, image in enumerate(image_array):
#         image = ((image - np.min(image)) / (np.max(image) - np.min(image)))
#         image_array[i] = image
#     return image_array

def normalize_image(image):
    """Min-max scale ``image``: ``(image - min) / (max - min)`` over all elements."""
    lowest = tf.math.reduce_min(image)
    highest = tf.math.reduce_max(image)
    return (image - lowest) / (highest - lowest)

# # Apply the function to our final_X and final_x_test 
# final_X = normalize_images(cropped_face_X)
# final_x_test = normalize_images(cropped_face_x_test)

# Convert lists to Numpy arrays
# final_X = np.array(final_X)
# final_x_test = np.array(final_x_test)


# X_data = tf.data.Dataset.from_generator(iter(cropped_face_X))
# x_test_data = tf.data.Dataset.from_generator(iter(cropped_face_x_test))

# final_X = X_data.apply(normalize_image)
# final_x_test = x_test_data.apply(normalize_image)

# Initialize the label encoder
le = LabelEncoder()

# Fit the label encoder and transform the labels
Y_int = le.fit_transform(Y)
print(Y_int)

Y_one_hot = to_categorical(Y_int, num_classes=n_unique_identities)

dataset_size = int(len(cropped_face_X))
train_size = int(0.9 * dataset_size)

train_val_data = tf.data.Dataset.from_tensor_slices((cropped_face_X, Y_one_hot))
print("created tensor slice")

# Shuffle ONCE, with a fixed seed and a buffer covering the whole dataset, before
# splitting. With the default reshuffle_each_iteration=True the order changes on
# every pass, so take()/skip() would pick different samples each epoch and
# validation images would leak into training. Shuffling before the map keeps
# the buffer on the raw images rather than the normalised floats.
train_val_data = train_val_data.shuffle(
    dataset_size, seed=SHUFFLE_NUMBER, reshuffle_each_iteration=False
)
print("shuffled")
train_val_data = train_val_data.map(lambda x, y: (normalize_image(x), y))
print("added mapping")

# 90/10 split. Only the training part is reshuffled between epochs (this happens
# after the split, so it cannot mix in validation samples).
train_data = train_val_data.take(train_size).shuffle(1000).batch(32)
print("train data")
val_data = train_val_data.skip(train_size).batch(32)
print("val data")

In [None]:
# import keras_tuner as kt
# class MyHypermodel(kt.applications.HyperResNet):
#     def __init__(
#         self,
#         include_top=True,
#         input_shape=None,
#         input_tensor=None,
#         classes=None,
#         **kwargs,
#     ):
#         super().__init__(include_top, input_shape, input_tensor, **kwargs)

#     def build(self, hp):
#         model = super().build(hp)
# #         model = layers.Dense(n_unique_identities, activation='softmax')(model)
        
#         inp = model.layers[0].input
#         x = model.layers[-1].output
#         x = layers.Dense(n_unique_identities, activation='softmax', name='predictions')(x)
#         model = keras.Model(inputs=inp, outputs=x)

#         hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 5e-3, 1e-3, 5e-4, 1e-4, 5e-5, 1e-5])

#         model.compile(optimizer=tf.keras.optimizers.Adam(hp_learning_rate),
#                      loss=tf.keras.losses.categorical_crossentropy,
#                      metrics=['accuracy'])
#         return model
    
#     def fit(self, hp, model, *args, **kwargs):
#         history = model.fit(*args, **kwargs)
#         return {
#             "loss": history.history["loss"],
#             "accuracy": history.history["accuracy"],
#             "val_loss": history.history["val_loss"],
#             "val_accuracy": history.history["val_accuracy"]
#         }

In [None]:
# def get_hypermodel():
#     hypermodel = kt.applications.HyperResNet(
#         include_top=False, input_shape=(256, 256, 3), input_tensor=None, classes=n_unique_identities
#     )
    
#     return hypermodel

In [None]:
# def build_model(hp):
#     model = get_hypermodel()
#     model = layers.Dense(n_unique_identities, activation='softmax')(model)
    
#     hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 5e-3, 1e-3, 5e-4, 1e-4, 5e-5, 1e-5])
    
#     model.compile(optimizer=tf.keras.optimizers.Adam(hp_learning_rate),
#                  loss=tf.keras.losses.categorical_crossentropy,
#                  metrics=['accuracy'])
#     return model


In [None]:
# hp = kt.HyperParameters()
# hp.Choice('learning_rate', values=[1e-2, 5e-3, 1e-3, 5e-4, 1e-4, 5e-5, 1e-5])
# tuner = kt.Hyperband(
#                      MyHypermodel(include_top=False, input_shape=(256, 256, 3), input_tensor=None, classes=n_unique_identities),
#                      objective='val_accuracy',
# #                      hyperparameters=hp,
#                      max_epochs=50,
#                      factor=3,
#                      directory='my_dir',
#                      project_name='intro_to_kt5',
#                      overwrite=True)
# hypermodel = kt.applications.HyperResNet(
#     include_top=False, input_shape=(256, 256, 3), input_tensor=None, classes=None, **kwargs
# )

In [None]:
# stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)

In [None]:
# tuner.search(train_data, epochs=100, validation_data=val_data, callbacks=[stop_early])

# Get the optimal hyperparameters
# best_hps=tuner.get_best_hyperparameters(num_trials=1)[0]

In [None]:
# print(f"""
# The hyperparameter search is complete. The optimal learning rate for the optimizer
# is {best_hps.get('learning_rate')}.
# """)

In [None]:
# # Set the number of neurons in the dense layers
# neurons = 1024

# Define the AlexNet model
# model = Sequential()
# model.add(layers.experimental.preprocessing.Resizing(227, 227, interpolation="bilinear", input_shape=final_X.shape[1:]))
# model.add(layers.Conv2D(96, 11, strides=4, padding='same', input_shape=(227,227,3)))
# model.add(layers.Lambda(tf.nn.local_response_normalization))
# model.add(layers.Activation('relu'))
# model.add(layers.MaxPooling2D(3, strides=2))
# model.add(layers.Conv2D(256, 5, strides=2, padding='same'))
# model.add(layers.Lambda(tf.nn.local_response_normalization))
# model.add(layers.Activation('relu'))
# model.add(layers.MaxPooling2D(3, strides=2))
# model.add(layers.Conv2D(384, 3, strides=1, padding='same'))
# model.add(layers.Activation('relu'))
# model.add(layers.Conv2D(384, 3, strides=1, padding='same'))
# model.add(layers.Activation('relu'))
# model.add(layers.Conv2D(256, 3, strides=1, padding='same'))
# model.add(layers.Activation('relu'))
# model.add(layers.MaxPooling2D(3, strides=2))
# model.add(layers.Flatten())
# model.add(layers.Dense(4096, activation='relu'))
# model.add(layers.Dropout(0.5))
# model.add(layers.Dense(4096, activation='relu'))
# model.add(layers.Dropout(0.5))
# model.add(layers.Dense(n_unique_identities, activation='softmax'))

# ImageNet-pretrained MobileNet without its classification head, for 224x224 RGB
# input. The backbone is frozen so only the layers added on top are trained.
# NOTE(review): inputs are min-max scaled to [0, 1] earlier in the notebook —
# confirm this matches the preprocessing the pretrained weights expect.
base_model_mobilenet = MobileNet(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
base_model_mobilenet.trainable = False
# print(base_model_mobilenet.summary())
# Create a function to add a new top layer (classifier) to the base models
def add_new_top_layer(base_model, num_classes, hidden_units=1024, dropout_rate=0.5):
    """Attach a new softmax classification head to ``base_model``.

    The head is: global average pooling -> Dense(hidden_units, relu) ->
    Dropout(dropout_rate) -> Dense(num_classes, softmax).

    Args:
        base_model: Model exposing ``.input`` and ``.output``; its output is fed
            into the pooling layer.
        num_classes: Number of units in the final softmax layer.
        hidden_units: Width of the intermediate dense layer. Defaults to 1024,
            the value that used to be hard-coded.
        dropout_rate: Rate passed to the dropout layer placed before the output
            layer. Defaults to 0.5, the value that used to be hard-coded.

    Returns:
        A ``Model`` mapping ``base_model.input`` to the softmax predictions.
    """
    pooled = GlobalAveragePooling2D()(base_model.output)
    hidden = Dense(hidden_units, activation='relu')(pooled)
    hidden = Dropout(dropout_rate)(hidden)
    predictions = Dense(num_classes, activation='softmax')(hidden)
    return Model(inputs=base_model.input, outputs=predictions)

# Build the classifier: the MobileNet backbone plus a new head whose output
# layer has n_unique_identities units.
# model_resnet50 = add_new_top_layer(base_model_resnet50, num_classes)
model_mobilenet = add_new_top_layer(
    base_model=base_model_mobilenet,
    num_classes=n_unique_identities,
)
# print(model_mobilenet.summary())

# # model = Sequential()
# # model.add(Conv2D(96, 11, strides=4, padding='same', input_shape=(227,227,3)))
# # model.add(Lambda(tf.nn.local_response_normalization))
# # model.add(Activation('relu'))
# # model.add(MaxPooling2D(pool_size=3, strides=2))

# # model.add(Conv2D(256, 5, strides=4, padding='same'))
# # # model.add(Lambda(tf.nn.local_response_normalization))
# # model.add(Activation('relu'))
# # model.add(MaxPooling2D(pool_size=3, strides=2))
# # # Add more layers as per the AlexNet architecture...
# # model.add(Flatten())
# # model.add(Dense(neurons, activation='relu'))
# # model.add(Dense(neurons, activation='relu'))
# # model.add(Dense(19, activation='softmax'))  # num_classes should be the number of classes in dataset

# # # Compile the model
# # model.compile(optimizer='adam', loss=losses.categorical_crossentropy, metrics=['accuracy'])
# # rescale = tf.keras.layers.Rescaling(1./127.5, offset=-1)
# # base_model = tf.keras.applications.MobileNetV2(input_shape=(160,160,3),
# #                                                include_top=False,
# #                                                weights='imagenet')
# # base_model.trainable = False
# # # global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
# # prediction_layer = tf.keras.layers.Dense(19, activation='softmax')

# # inputs = tf.keras.Input(shape=(160, 160, 3))
# # # x = data_augmentation(inputs)
# # x = rescale(inputs)
# # x = base_model(x, training=False)
# # # x = global_average_layer(x)
# # x = tf.keras.layers.Flatten()(x)
# # x = tf.keras.layers.Dropout(0.5)(x)
# # outputs = prediction_layer(x)
# # model = tf.keras.Model(inputs, outputs)

# Compile with Adam at a small fixed learning rate, categorical cross-entropy
# as the loss, and accuracy as the reported metric.
base_learning_rate = 5e-5
model_mobilenet.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate),
    loss=tf.keras.losses.categorical_crossentropy,
    metrics=['accuracy'],
)

In [None]:
# Train the model for at most 25 epochs, validating on val_data after each one.
# Training stops early once val_loss has not improved for 5 consecutive epochs.
# `callbacks` is documented as a list of callback instances, so the callback is
# wrapped in a list rather than passed bare.
model_mobilenet.fit(
    train_data,
    epochs=25,
    validation_data=val_data,
    callbacks=[keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)],
)

# Next: evaluate the trained model on the test set

In [None]:
# Encode the test labels with the existing label encoder `le`. It is only
# applied here (transform), not re-fitted, so the test ids use the same mapping
# as every other use of `le`.
Y_int_test = le.transform(y_test)
# One-hot encode the integer ids over n_unique_identities classes.
Y_one_hot_test = to_categorical(
    Y_int_test,
    num_classes=n_unique_identities,
)

In [None]:
from sklearn.metrics import classification_report
import numpy as np
from keras.models import load_model
import os

# Inference pipeline over the test images: each element is passed through
# normalize_image and served in batches of 32. No labels are attached.
test_data = (
    tf.data.Dataset.from_tensor_slices((cropped_face_x_test,))
    .map(lambda x: (normalize_image(x)))
    .batch(32)
)

# models = {}
# for model_file in os.listdir("/kaggle/input/test-models/model"):
#     if model_file.endswith(".h5"):
#         models[model_file] = load_model("/kaggle/input/test-models/model/" + model_file)

# for model in models:
print(f"Model: {model_mobilenet}")
val_predictions = model_mobilenet.predict(test_data)

# Reduce each row of predicted class scores to the index of its largest entry.
val_predictions_classes = val_predictions.argmax(axis=1)
# Recover the integer class ids from the one-hot encoded test labels.
val_true = Y_one_hot_test.argmax(axis=1)

# Text report comparing true and predicted class ids on the test set.
report = classification_report(val_true, val_predictions_classes)
print(report)


In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Confusion matrix of true vs. predicted class ids, rendered with matplotlib.
c_mat = confusion_matrix(y_true=val_true, y_pred=val_predictions_classes)
disp = ConfusionMatrixDisplay(confusion_matrix=c_mat)
disp.plot()

plt.show()


In [None]:
# Persist the trained model as an .h5 file in the Kaggle working directory.
# NOTE(review): the filename encodes "10E" while the training cell requests
# epochs=25 - confirm the name still describes this run. The same filename is
# referenced by the tar/rm cells below.
model_mobilenet.save(
    filepath='/kaggle/working/base_model-mobilenet-62I-10E-00005LR-bitcrushed-35-40.h5'
)

In [None]:
# List the contents of /kaggle/working (e.g. to check that the model file exists).
!ls /kaggle/working

In [None]:
# Print the class names held by the label encoder `le` (its `classes_` attribute).
print("Class names:", le.classes_)


In [None]:

# print("Shape of labels:", Y_int.shape)


In [None]:
# # Convert lists to Numpy arrays
# final_X = np.array(cropped_face_X)
# final_x_test = np.array(cropped_face_x_test)

# print("Shape of training data:", final_X.shape)
# print("Shape of test data:", final_x_test.shape)


In [None]:
# Map every integer class id in range(n_unique_identities) back to its original
# label in a single call; the ids are passed as a list rather than one by one.
print(le.inverse_transform(list(range(n_unique_identities))))

In [None]:
# Show the installed keras package metadata (the notebook pins keras==2.15.0 at the top).
!pip show keras

In [None]:
# Move data.npz out of Dataset/Raw into /kaggle/working so it survives the
# removal of the Dataset directory in the final cell.
!mv /kaggle/working/Dataset/Raw/data.npz /kaggle/working/data.npz

In [None]:
# Bundle data.npz and the saved model into the gzip-compressed archive export.tar.gz.
# Paths are relative - presumably the current directory is /kaggle/working; confirm.
!tar -zcvf export.tar.gz data.npz base_model-mobilenet-62I-10E-00005LR-bitcrushed-35-40.h5

In [None]:
# Clean up: delete the Dataset directory and the standalone model file.
# The model path is relative to the current directory.
!rm -rf /kaggle/working/Dataset base_model-mobilenet-62I-10E-00005LR-bitcrushed-35-40.h5