# INTRODUCTION

Competition: https://codalab.lisn.upsaclay.fr/competitions/16970#learn_the_details-overview

To promote and advance the use of synthetic data for face recognition, we organize the second edition of the Face Recognition Challenge in the Era of Synthetic Data (FRCSyn). This challenge intends to explore the application of synthetic data to the field of face recognition in order to find solutions to the current limitations in the technology, for example, in terms of privacy concerns associated with real data, bias in demographic groups (e.g., ethnicity and gender), and lack of performance in challenging conditions such as large age gaps between enrolment and testing, pose variations, occlusions, etc.

This challenge intends to provide an in-depth analysis of the following research questions:

- What are the limits of face recognition technology trained only with synthetic data?
- Can the use of synthetic data be beneficial to reduce the current limitations in face recognition technology?

This is a novel and important research line, given the recent discontinuation of several face recognition datasets over privacy concerns. Furthermore, state-of-the-art face recognition technology has several limitations in terms of bias in demographic groups (e.g., ethnicity and gender), and lack of performance in challenging conditions such as large age gaps between enrolment and testing, pose variations, occlusions, etc.

Here, we address **Task 2: synthetic data for overall performance improvement** (e.g., according to age, pose, expression, occlusion, demographic groups, etc).

In [1]:
!pip install "keras<3.0.0" "tensorflow<2.16" "tf-models-official<2.16" mediapipe-model-maker
!pip install numpy==1.23.5
!pip install tensorflow-addons

In [2]:
# Workaround: re-expose QhullError on scipy.spatial for libraries that expect it there
from scipy.spatial.qhull import QhullError
from scipy import spatial
spatial.QhullError = QhullError


In [3]:
import glob
import math
import os
import pickle
from collections import defaultdict
from itertools import combinations
from typing import Optional, Tuple, Callable

import imgaug as ia
import imgaug.augmenters as iaa
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import tensorflow_addons as tfa
from joblib import Parallel, delayed
from PIL import Image
from sklearn.manifold import TSNE
from sklearn.metrics import accuracy_score
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier

# Compatibility fix for np.bool
np.bool = np.bool_

# 0. Params

In [4]:
PATH_SYNC: str = '/kaggle/input/faces-webface/faces_webface'
MAX_SUBJECTS: int = len(os.listdir(PATH_SYNC)) # number of unique subjects, counted as entries in the dataset directory
MAX_IMAGES_PER_SUBJECT: int = 50 # maximum number of images per subject
INPUT_SHAPE: Tuple[int, int, int] = (112, 112, 3) # height, width, and RGB channels
PATH_MASKED_IDX_TRAIN_TEST: str = "/kaggle/working/01_idx_train_test.pkl" # precomputed training and testing indices

# 1. Create model

## MobileFaceNet

### Model explanation

MobileFaceNet is a lightweight and efficient deep learning model designed specifically for face recognition tasks, particularly in resource-constrained environments such as mobile devices or edge devices. It aims to provide high accuracy while keeping the model size and computational requirements low.

- **Architecture**:

MobileFaceNet is based on the MobileNetV2 architecture, which utilizes depthwise separable convolutions and inverted residuals to reduce the number of parameters and computational cost.
It consists of several layers of depthwise separable convolutions and bottleneck blocks, followed by a global depthwise convolution (used in place of global average pooling) and a final projection layer.

- **Feature Extraction**:

MobileFaceNet extracts discriminative features from face images using its convolutional layers.
The depthwise separable convolutions help capture spatial information efficiently while reducing the number of parameters.
A global depthwise convolution (see *linear_GD_conv_block*) then aggregates spatial information into a compact feature representation.

- **Embedding Generation**:

The output of the convolutional layers is passed through fully connected layers to generate a fixed-size embedding vector.
This embedding vector encodes essential facial characteristics in a compact and discriminative manner.

- **Loss Function**:

MobileFaceNet typically uses a softmax-based loss function such as ArcFace or CosFace during training.
These loss functions aim to maximize inter-class variations and minimize intra-class variations in the embedding space, leading to better face recognition performance.

- **Model Optimization**:

MobileFaceNet is designed to be efficient both in terms of model size and computational complexity.
Depthwise separable convolutions and narrow bottleneck layers keep the number of parameters and the inference latency low.

- **Deployment**:

Due to its lightweight nature, MobileFaceNet is suitable for deployment on mobile devices, edge devices, and other resource-constrained environments.
It can be integrated into face recognition applications for tasks such as face authentication, access control, and identity verification.

### Implementation

**Convolution Blocks** (*conv_block* and *separable_conv_block*): Defines reusable convolutional blocks with batch normalization and Swish activation. These blocks use standard and separable convolutions, which reduce model parameters and improve efficiency.

In [5]:
def conv_block(inputs, filters, kernel_size, strides):
    x = tf.keras.layers.Conv2D(filters, kernel_size, padding="valid", strides=strides,
                               kernel_regularizer=tf.keras.regularizers.l2(1e-4)) (inputs)
    x = tf.keras.layers.BatchNormalization(axis=-1) (x)
    x = tf.keras.layers.Activation("swish") (x)
    return x

In [6]:
def separable_conv_block(inputs, filters, kernel_size, strides):
    # Use the function arguments instead of hard-coded values
    x = tf.keras.layers.SeparableConv2D(filters=filters, kernel_size=kernel_size, strides=strides, padding="same",
                                        kernel_regularizer=tf.keras.regularizers.l2(1e-4)) (inputs)
    x = tf.keras.layers.BatchNormalization(axis=-1) (x)
    x = tf.keras.layers.Activation("swish") (x)
    return x
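
To make the parameter savings of separable convolutions concrete, the following is a minimal sketch that counts the weights of a standard and a depthwise separable convolution. The helper names are hypothetical and not part of the notebook; the formulas assume a depth multiplier of 1 and one bias per output channel.

```python
def conv2d_params(kernel_size: int, in_channels: int, out_channels: int) -> int:
    """Weights plus biases of a standard 2D convolution."""
    return kernel_size * kernel_size * in_channels * out_channels + out_channels


def separable_conv2d_params(kernel_size: int, in_channels: int, out_channels: int) -> int:
    """Depthwise kernel + 1x1 pointwise kernel + biases (depth multiplier of 1)."""
    depthwise = kernel_size * kernel_size * in_channels
    pointwise = in_channels * out_channels
    return depthwise + pointwise + out_channels


# 3x3 convolution mapping 64 channels to 64 channels, as in separable_conv_block
standard = conv2d_params(3, 64, 64)
separable = separable_conv2d_params(3, 64, 64)
print(standard, separable, round(standard / separable, 1))  # 36928 4736 7.8
```

For this layer size, the separable variant needs roughly one eighth of the parameters of the standard convolution.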

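**Bottleneck Layer** (*bottleneck*): Implements the bottleneck structure from MobileNetV2 as a sequence of expansion (1x1 convolution), depthwise convolution, and linear projection (1x1 convolution without activation). If r is set to True, it also adds a residual connection from the block input, which requires the input and output shapes to match.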

In [7]:
def bottleneck(inputs, filters, kernel, t, s, r=False):
    tchannel = tf.keras.backend.int_shape(inputs)[-1] * t
    x1 = conv_block(inputs, tchannel, (1, 1), (1, 1))
    x1 = tf.keras.layers.DepthwiseConv2D(kernel_size=kernel, strides=s, padding="same", depth_multiplier=1,
                                         kernel_regularizer=tf.keras.regularizers.l2(1e-4)) (x1)
    x1 = tf.keras.layers.BatchNormalization(axis=-1) (x1)
    x1 = tf.keras.layers.Activation("swish") (x1)
    x2 = tf.keras.layers.Conv2D(filters, kernel_size=1, strides=1, padding="same",
                                kernel_regularizer=tf.keras.regularizers.l2(1e-4)) (x1)
    x2 = tf.keras.layers.BatchNormalization(axis=-1) (x2)
    if r:
        x2 = tf.keras.layers.add([x2, inputs])
    return x2

**Inverted Residual Block** (*inverted_residual_block*): A loop of bottleneck layers with variable depth, strides, and residual connections for flexible feature extraction.

In [8]:
def inverted_residual_block(inputs, filters, kernel, t, strides, n):
    x = bottleneck(inputs, filters, kernel, t, strides)
    for i in range(1, n):
        x = bottleneck(x, filters, kernel, t, 1, True)
    return x

**Linear Global Depthwise Convolution Block** (*linear_GD_conv_block*): Applies depthwise convolution with batch normalization, used later in the network to capture spatially pooled features.

In [9]:
def linear_GD_conv_block(inputs, kernel_size, strides):
    
    x = tf.keras.layers.DepthwiseConv2D(kernel_size=kernel_size, strides=strides, padding="valid", depth_multiplier=1,
                                        kernel_regularizer=tf.keras.regularizers.l2(1e-4)) (inputs)
    x = tf.keras.layers.BatchNormalization(axis=-1) (x)
    return x

**MobileFaceNet Model Definition** (*mobile_face_net*): Combines all the defined layers into the final MobileFaceNet architecture. Layers stack according to a depthwise convolutional design, ending with a compact output of 128 channels, ideal for generating a compact face embedding.

In [10]:
def mobile_face_net(inputs):
    x = conv_block(inputs, 64, 3, 2)
    x = separable_conv_block(x, 64, 3, 1)
    x = inverted_residual_block(x, 64, 3, t=2, strides=2, n=5)
    x = inverted_residual_block(x, 128, 3, t=4, strides=2, n=1)
    x = inverted_residual_block(x, 128, 3, t=2, strides=1, n=6)
    x = inverted_residual_block(x, 128, 3, t=4, strides=2, n=1)
    x = inverted_residual_block(x, 128, 3, t=2, strides=1, n=2)
    x = conv_block(x, 512, 1, 1)
    x = linear_GD_conv_block(x, 7, 1)
    x = conv_block(x, 128, 1, 1)
    return x
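
The choice of a 7x7 kernel in the global depthwise convolution follows from the spatial size of the feature map at that point. As a rough check, the sketch below traces the spatial size through the stages of *mobile_face_net* for a 112x112 input. The helper `out_size` and the `stages` table are illustrative assumptions that mirror the padding and stride settings in the code above; they are not part of the notebook.

```python
import math


def out_size(size: int, kernel: int, stride: int, padding: str) -> int:
    """Spatial output size of a convolution, following the Keras padding rules."""
    if padding == "same":
        return math.ceil(size / stride)
    return (size - kernel) // stride + 1  # "valid" padding


# (name, kernel, stride, padding) for each stage of mobile_face_net;
# each inverted residual block is summarized by its strided depthwise convolution
stages = [
    ("conv_block 3x3, stride 2", 3, 2, "valid"),
    ("separable_conv_block", 3, 1, "same"),
    ("inverted_residual_block, stride 2", 3, 2, "same"),
    ("inverted_residual_block, stride 2", 3, 2, "same"),
    ("inverted_residual_block, stride 1", 3, 1, "same"),
    ("inverted_residual_block, stride 2", 3, 2, "same"),
    ("inverted_residual_block, stride 1", 3, 1, "same"),
    ("conv_block 1x1", 1, 1, "valid"),
    ("linear_GD_conv_block 7x7", 7, 1, "valid"),
    ("conv_block 1x1", 1, 1, "valid"),
]

size = 112
sizes = [size]
for name, kernel, stride, padding in stages:
    size = out_size(size, kernel, stride, padding)
    sizes.append(size)
    print(f"{name:36s} -> {size}x{size}")
```

Under these assumptions the feature map is 7x7 when it reaches *linear_GD_conv_block*, so the 7x7 "valid" depthwise convolution reduces it to 1x1, and the final output has shape (1, 1, 128).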

## ArcFace

ArcFace is implemented here as a custom TensorFlow layer that improves the separability of class features. It L2-normalizes the embeddings and the class weight vectors so that their dot product is the cosine of the angle between them, adds an angular margin m to the angle of the true class, scales the logits by s, and applies softmax. Penalizing the true-class angle in this way pushes embeddings of the same identity closer together and those of different identities further apart, which can improve performance in multi-class classification problems.

In [11]:
class ArcFace(tf.keras.layers.Layer):
    def __init__(self, n_classes=10, s=64.0, m=0.5, regularizer=None, **kwargs):
        super(ArcFace, self).__init__(**kwargs)
        self.n_classes = n_classes
        self.s = s
        self.m = m
        self.regularizer = tf.keras.regularizers.get(regularizer)

    def build(self, input_shape):
        super(ArcFace, self).build(input_shape[0])
        self.W = self.add_weight(name='W',
                                shape=(input_shape[0][-1], self.n_classes),
                                initializer='glorot_uniform',
                                trainable=True,
                                regularizer=self.regularizer)

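    def call(self, inputs):
        # x: embeddings, y: one-hot labels
        x, y = inputs
        # Normalize embeddings and class weights so that x @ W is cos(theta)
        x = tf.nn.l2_normalize(x, axis=1)
        W = tf.nn.l2_normalize(self.W, axis=0)
        logits = x @ W
        # Clip before acos to avoid NaNs at the boundaries of [-1, 1]
        theta = tf.acos(tf.keras.backend.clip(logits,
                                              -1.0 + tf.keras.backend.epsilon(),
                                              1.0 - tf.keras.backend.epsilon()))
        # Add the angular margin m only to the true-class logit
        target_logits = tf.cos(theta + self.m)
        logits = logits * (1 - y) + target_logits * y
        # Scale by s and convert to class probabilities
        logits *= self.s
        out = tf.nn.softmax(logits)
        return out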

    def compute_output_shape(self, input_shape):
        return (None, self.n_classes)

    def get_config(self):
        config = super(ArcFace, self).get_config()
        config.update({
            'n_classes': self.n_classes,
            's': self.s,
            'm': self.m,
            'regularizer': tf.keras.regularizers.serialize(self.regularizer)
        })
        return config
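
To illustrate what the margin does numerically, here is a minimal NumPy sketch of the same logit computation, without the final softmax. The function `arcface_logits` and the toy inputs are assumptions made for illustration; they are not the layer used for training.

```python
import numpy as np


def arcface_logits(embeddings, weights, one_hot, s=64.0, m=0.5):
    """Scaled cosine logits with an angular margin m added to the true class."""
    x = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    w = weights / np.linalg.norm(weights, axis=0, keepdims=True)
    cos_theta = np.clip(x @ w, -1.0 + 1e-7, 1.0 - 1e-7)
    theta = np.arccos(cos_theta)
    target = np.cos(theta + m)  # penalized cosine for the true class
    return s * (cos_theta * (1.0 - one_hot) + target * one_hot)


# Two toy embeddings that are perfectly aligned with their own class weights
embeddings = np.array([[1.0, 0.0], [0.0, 1.0]])
weights = np.eye(2)   # columns are the class weight vectors
one_hot = np.eye(2)   # sample 0 belongs to class 0, sample 1 to class 1

with_margin = arcface_logits(embeddings, weights, one_hot, s=64.0, m=0.5)
without_margin = arcface_logits(embeddings, weights, one_hot, s=64.0, m=0.0)
print(np.round(with_margin, 2))
print(np.round(without_margin, 2))
```

Even for a perfectly aligned embedding, the true-class logit drops from about s to about s * cos(m), so the network has to keep shrinking the angle to the correct class weight to reduce the loss. The logits of the other classes are left unchanged.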

# 2. Train model

This section gathers all image file paths, splits them into training (70%) and testing (30%) sets, and saves the resulting indices so that later cells reuse the same split. With this dataset preparation complete, the model is ready for training, and the predefined splits allow model performance to be evaluated consistently.

In [12]:
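# Expected layout: PATH_SYNC/<subject_id>/<image>.png; sorting keeps the saved indices meaningful across runs
file_images = np.asarray(sorted(glob.glob(os.path.join(PATH_SYNC, "*", "*.png"))))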

idx_train, idx_test = train_test_split(range(file_images.shape[0]), test_size=0.3)

with open(PATH_MASKED_IDX_TRAIN_TEST, "wb") as f:
    pickle.dump([idx_train, idx_test], f)

In [13]:
with open(PATH_MASKED_IDX_TRAIN_TEST, "rb") as f:
    idx_train, idx_test = pickle.load(f)
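
The split above is only reproducible as long as the pickle file is kept, because train_test_split is called without a fixed seed. As an illustration of the idea, the following standard-library sketch produces a seeded 70/30 split and round-trips it through pickle. The function `split_indices` and the `_demo` variable names are hypothetical, and the in-memory buffer stands in for the file on disk.

```python
import io
import pickle
import random


def split_indices(n_items: int, test_size: float = 0.3, seed: int = 0):
    """Shuffle range(n_items) with a fixed seed and cut it into train/test lists."""
    indices = list(range(n_items))
    random.Random(seed).shuffle(indices)
    n_test = round(n_items * test_size)
    return indices[n_test:], indices[:n_test]


idx_train_demo, idx_test_demo = split_indices(10, test_size=0.3, seed=42)

# Round-trip through pickle, as the notebook does with a file on disk
buffer = io.BytesIO()
pickle.dump([idx_train_demo, idx_test_demo], buffer)
buffer.seek(0)
restored_train, restored_test = pickle.load(buffer)
print(len(restored_train), len(restored_test))  # 7 3
```

With a fixed seed, the same split can be regenerated even if the saved file is lost.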

The ArcFaceDataGenerator class is a custom data generator for TensorFlow, designed to load and preprocess face image data in batches for model training. Key functionalities include:

- Initialization: Configures batch size, shuffling, and optional data augmentation. Augmentations include grayscale conversion, salt-and-pepper noise, solarization, Gaussian blur, gamma contrast, and color quantization.

- Batch Loading: The `__getitem__` method retrieves a batch of image files, loads each image, and converts it to a float array. It also builds one-hot encoded labels from the name of each image's parent directory, which is expected to be the integer subject ID.

- Data Augmentation: The __augment method applies augmentation if enabled, using a random combination of transformations for each batch to improve model generalization.

- Epoch Handling: on_epoch_end shuffles the data indices at each epoch if shuffle is set to True.

In [14]:
class ArcFaceDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, 
                 file_images: np.ndarray,
                 batch_size: Optional[int] = 8,
                 shuffle: Optional[bool] = True,
                 augment: Optional[bool] = True):
        self.file_images = file_images
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indexes = np.arange(self.file_images.shape[0])
        self.augment = augment
        self.seq = iaa.SomeOf((1),
                              [iaa.Identity(),
                               iaa.Grayscale(alpha=1.),
                               iaa.SaltAndPepper(p=0.1),
                               iaa.Solarize(p=0.8),
                               iaa.GaussianBlur(sigma=(0.25, 0.5)),
                               iaa.GammaContrast(gamma=(0.5, 1.5)),
                               iaa.UniformColorQuantizationToNBits(nb_bits=(3, 8)),
                              ])
        
    def __len__(self):
        return int(np.ceil(len(self.indexes) / self.batch_size))
    
    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indexes)
    
    def __augment(self, images: np.ndarray):
        images = list((images).astype(np.uint8))
        images_aug = self.seq(images=images)
        return np.asarray(images_aug, dtype=np.float32)
    
    def __getitem__(self, idx: int):
        file_images = self.file_images[self.indexes[idx * self.batch_size : (idx + 1) * self.batch_size]]
        X = []
        y = np.zeros((len(file_images), MAX_SUBJECTS), dtype=np.float32)
        for i, file_image in enumerate(file_images):
            X_i = np.asarray(Image.open(file_image), dtype=np.float32)
            X += [X_i]
            y[i, int(file_image.split('/')[-2])] = 1.
        X = np.stack(X)
        if self.augment:
            X = self.__augment(X)
        
        return [X / 255., y], y
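
The label logic inside the generator can be isolated into a small example. The sketch below derives a one-hot label from a file path in the same way, assuming that the parent directory name is an integer subject ID. The function `one_hot_from_path` and the example path are hypothetical.

```python
import numpy as np


def one_hot_from_path(path: str, n_classes: int) -> np.ndarray:
    """Build a one-hot label from the parent directory name of an image path."""
    subject_id = int(path.split("/")[-2])  # e.g. ".../3/0001.png" -> 3
    label = np.zeros(n_classes, dtype=np.float32)
    label[subject_id] = 1.0
    return label


label = one_hot_from_path("/data/faces/3/0001.png", n_classes=5)
print(label)  # [0. 0. 0. 1. 0.]
```

This only works if the directory names are integers in the range 0 to n_classes - 1; any other naming scheme would need an explicit mapping from directory name to class index.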

In [15]:
train_data_generator = ArcFaceDataGenerator(file_images=file_images[idx_train],
                                            batch_size=64,
                                            shuffle=True,
                                            augment=True)
test_data_generator = ArcFaceDataGenerator(file_images=file_images[idx_test],
                                           batch_size=64,
                                           shuffle=True,
                                           augment=False)

This cell (currently disabled) builds the backbone network: an input layer, the MobileFaceNet model, a flattening layer, and a fully connected layer. MobileFaceNet extracts the facial features, and the subsequent layers turn them into the final embedding used for face recognition.

- Input Layer: Accepts images with a shape defined by INPUT_SHAPE.
- MobileFaceNet Backbone: Processes the input to extract compact facial features.
- Flattening Layer: Converts the extracted features into a one-dimensional array.
- Fully Connected Layer: Outputs a 512-dimensional feature vector, suitable for further classification or embedding in face recognition.

In [16]:
'''
inputs = tf.keras.Input(shape=INPUT_SHAPE, dtype=tf.float32)
x = mobile_face_net(inputs)
x = tf.keras.layers.Flatten() (x)
outputs = tf.keras.layers.Dense(512, 
                          kernel_initializer='he_normal',
                          kernel_regularizer=tf.keras.regularizers.l2(1e-4)) (x)
backbone_model = tf.keras.models.Model(inputs=inputs, outputs=outputs)
#backbone_model.load_weights("./03_selfsupervised_backbone_model_v0.1.h5")
backbone_model.summary()
'''

This code loads a pretrained backbone model from a saved file, so that training can start from the weights of a previous session. This saves training time and can improve performance.

In [17]:
backbone_model = tf.keras.models.load_model("/kaggle/input/arcface/tensorflow2/arcface_20epochs/1/04_model_arcface_v0.3_20.h5")
# backbone_model.summary()

This code sets up a neural network for face recognition using the ArcFace layer for enhanced classification.

- **Inputs**:
    - Image Input: The primary input layer, inputs, takes images with dimensions defined by INPUT_SHAPE.
    - Label Input: An additional input, labels, takes a one-hot encoded label vector representing the subject identity, with a length of MAX_SUBJECTS (total number of classes).
    
- **Feature Extraction**:

    - Backbone Model: The pretrained backbone_model processes the inputs to produce a feature embedding vector.
    
- **ArcFace Layer**:

    - The ArcFace layer is applied to the embedding and labels, enhancing inter-class separation with the parameters s=16 (scaling factor) and m=0.3 (angular margin). This layer produces the final classification output, helping the model differentiate identities effectively.

- **Model Assembly**:

    - Model Definition: Combines both inputs (inputs and labels) to generate the final output with ArcFace-enhanced embeddings, creating a complete model ready for training.

In [18]:
'''inputs = tf.keras.Input(shape=INPUT_SHAPE, dtype=tf.float32)
labels = tf.keras.layers.Input(shape=(MAX_SUBJECTS, ), dtype=tf.float32)
x = backbone_model(inputs)
outputs = ArcFace(MAX_SUBJECTS, 
                  regularizer=tf.keras.regularizers.l2(1e-4),
                  s=16., m=0.3) ([x, labels])
model = tf.keras.models.Model(inputs=[inputs, labels], 
                              outputs=outputs)
model.summary()'''

This code compiles the model for training by configuring the optimizer, loss function, and evaluation metrics. It uses the Stochastic Gradient Descent (SGD) optimizer with a learning rate of 0.1 and a momentum of 0.5 to enhance convergence speed and stability. The loss function is set to categorical crossentropy, which is suitable for multi-class classification tasks, measuring the dissimilarity between predicted and true class probabilities. Additionally, accuracy is included as a metric to evaluate the model's performance during training and validation.

In [19]:
'''model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=1e-1, momentum=0.5),
              loss="categorical_crossentropy",
              metrics=["accuracy"])'''

This code defines CosineAnnealingScheduler, a custom Keras callback that adjusts the learning rate during training with a cosine annealing strategy.

The CosineAnnealingScheduler class inherits from tf.keras.callbacks.Callback and initializes parameters for learning rate scheduling, including hold (the number of epochs to maintain a constant learning rate), T_max (the number of epochs for a full cycle of learning rate adjustment), eta_max (the maximum learning rate), eta_min (the minimum learning rate), and verbose for logging. During each epoch, specifically in the on_epoch_begin method, the learning rate is updated according to a cosine function once the hold period is exceeded, smoothly varying the learning rate between eta_max and eta_min. This approach can enhance training efficiency and convergence. The on_epoch_end method also logs the current learning rate after each epoch, allowing for monitoring during training.

In [20]:
class CosineAnnealingScheduler(tf.keras.callbacks.Callback):
    def __init__(self, hold, T_max, eta_max, eta_min=0, verbose=0):
        super(CosineAnnealingScheduler, self).__init__()
        self.hold = hold
        self.T_max = T_max
        self.eta_max = eta_max
        self.eta_min = eta_min
        self.verbose = verbose

    def on_epoch_begin(self, epoch, logs=None):
        if not hasattr(self.model.optimizer, 'lr'):
            raise ValueError('Optimizer must have a "lr" attribute.')
        if epoch >= self.hold:
            lr = self.eta_min + (self.eta_max - self.eta_min) * (1 + math.cos(math.pi * epoch / self.T_max)) / 2
            tf.keras.backend.set_value(self.model.optimizer.lr, lr)
            if self.verbose > 0:
                print('\nEpoch %05d: CosineAnnealingScheduler setting learning '
                      'rate to %s.' % (epoch + 1, lr))

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        logs['lr'] = tf.keras.backend.get_value(self.model.optimizer.lr)
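
To see which learning rates this schedule produces, the formula from on_epoch_begin can be evaluated on its own. The following sketch reproduces it as a plain function; `cosine_annealing_lr` is a hypothetical helper, and the parameter values are the ones passed to the callback in the training cell (hold=10, T_max=5, eta_max=1e-1, eta_min=1e-3).

```python
import math


def cosine_annealing_lr(epoch, hold, T_max, eta_max, eta_min=0.0):
    """Learning rate for a 0-based epoch, or None while the hold period is active."""
    if epoch < hold:
        return None  # the callback leaves the optimizer's learning rate untouched
    return eta_min + (eta_max - eta_min) * (1 + math.cos(math.pi * epoch / T_max)) / 2


for epoch in range(8, 15):
    print(epoch, cosine_annealing_lr(epoch, hold=10, T_max=5, eta_max=1e-1, eta_min=1e-3))
```

Note that the formula uses the absolute epoch number rather than the number of epochs since the end of the hold period. With these values the schedule starts at eta_max at epoch 10 only because 10 is a multiple of 2 * T_max; other combinations of hold and T_max would start the decay at a different point of the cosine cycle.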

This code snippet sets up the callbacks used during training: the cosine annealing scheduler defined above and a checkpoint callback that saves the full model at the end of every epoch.

In [21]:
'''checkpoint_path = 'cp-{epoch:04d}.h5'

callbacks = [
    CosineAnnealingScheduler(hold=10, T_max=5, eta_max=1e-1, eta_min=1e-3, verbose=1),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                       save_weights_only=False,
                                       verbose=1)
]'''

In [22]:
'''model.fit(train_data_generator,
          validation_data=test_data_generator,
          epochs=15,
          verbose=1,
          callbacks=callbacks)'''

In [23]:
'''backbone_model.save("04_model_arcface_v0.3_15_03.h5")'''

In [24]:
'''backbone_model.save_weights("04_model_arcface_weights_v0.3_15_03.h5")'''

## Approach 1: Locality Sensitive Hashing

This overall approach enables efficient neighbor search in high-dimensional spaces by leveraging the LSH technique, making it suitable for applications in face recognition and other similarity-based tasks.

### Create dataset: distance to the closest neighbors

This code snippet focuses on creating a dataset for finding the distance to the closest neighbors using Locality Sensitive Hashing (LSH) and cosine distance metrics.

The code begins by loading a preprocessed dataset from a pickle file named "05_embeds.pkl". Judging by how it is used below, this dataset holds one embedding per image together with the corresponding user and image identifiers.
It also loads the train and test indices from the file referenced by PATH_MASKED_IDX_TRAIN_TEST, which define the training and testing splits.

In [25]:
'''with open("/kaggle/input/frcsyn-models/05_embeds.pkl", "rb") as f:
    data = pickle.load(f)'''

In [26]:
'''with open(PATH_MASKED_IDX_TRAIN_TEST, "rb") as f:
    idx_train, idx_test = pickle.load(f)'''

**Generating Random Vectors**:

The generate_random_vectors function creates random vectors that are used in the LSH algorithm to hash input vectors into binary codes, aiding in efficient similarity searches.

**Training the LSH Model**:

The train_lsh function builds a Locality Sensitive Hashing model from the input vectors. The parameter is named X_tfidf, a leftover from text-retrieval use; in this notebook the vectors are face embeddings. The function generates random vectors, computes a binary code for each input vector from the signs of its projections, and organizes these codes into a hash table. This table maps binary codes to the original data indices for quick access.

**Searching Nearby Bins**:

The search_nearby_bins function allows searching for nearby bins in the LSH hash table based on a query vector. It generates variations of the query's binary representation by flipping a specified number of bits (search_radius) and retrieves candidate indices from the hash table.

**Retrieving Nearest Neighbors**:

The get_nearest_neighbors function finds the nearest neighbors of a query vector. It uses the trained LSH model to identify candidate indices and calculates the cosine distances between the query vector and the candidates. The results are returned in a DataFrame, sorted by distance, containing the indices and their corresponding distances.
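
The cost of searching nearby bins grows quickly with the search radius. As a rough illustration, the standard-library sketch below enumerates every bit pattern within a given Hamming distance of a query code. The generator `bins_within_radius` and the 16-bit query are hypothetical; 16 bits matches the number of random vectors used in the commented-out train_lsh call further down.

```python
from itertools import combinations


def bins_within_radius(bits, max_radius):
    """Yield every bit pattern that differs from `bits` in at most max_radius positions."""
    n = len(bits)
    for radius in range(max_radius + 1):
        for positions in combinations(range(n), radius):
            flipped = list(bits)
            for p in positions:
                flipped[p] = 1 - flipped[p]
            yield tuple(flipped)


query_bits = (1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 0, 1, 0)  # 16 hypothetical hash bits
neighbors = list(bins_within_radius(query_bits, max_radius=3))
print(len(neighbors))  # 697 = C(16,0) + C(16,1) + C(16,2) + C(16,3)
```

With 16 bits and a maximum radius of 3, each query therefore probes 697 of the 65,536 possible bins.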

In [27]:
# This function generates random vectors used in Locality Sensitive Hashing (LSH)

def generate_random_vectors(dim, n_vectors):
    return np.random.randn(dim, n_vectors)

# This function builds a Locality Sensitive Hashing (LSH) model from the input vectors
# (the X_tfidf name is a leftover from text retrieval; here the vectors are face embeddings)
def train_lsh(X_tfidf, n_vectors, seed=None):    
    if seed is not None:
        np.random.seed(seed)
    dim = X_tfidf.shape[1]
    random_vectors = generate_random_vectors(dim, n_vectors)
    bin_indices_bits = X_tfidf.dot(random_vectors) >= 0
    powers_of_two = 1 << np.arange(n_vectors - 1, -1, step=-1)
    bin_indices = bin_indices_bits.dot(powers_of_two)
    table = defaultdict(list)
    for idx, bin_index in enumerate(bin_indices):
        table[bin_index].append(idx)
    model = {'table': table,
             'random_vectors': random_vectors,
             'bin_indices': bin_indices,
             'bin_indices_bits': bin_indices_bits}
    return model

# This function searches nearby bins in the LSH hash table based on the query vector
def search_nearby_bins(query_bin_bits, table, search_radius=3, candidate_set=None):
    if candidate_set is None:
        candidate_set = set()
    n_vectors = query_bin_bits.shape[0]
    powers_of_two = 1 << np.arange(n_vectors - 1, -1, step=-1)
    for different_bits in combinations(range(n_vectors), search_radius):
        index = list(different_bits)
        alternate_bits = query_bin_bits.copy()
        alternate_bits[index] = np.logical_not(alternate_bits[index])
        nearby_bin = alternate_bits.dot(powers_of_two)
        if nearby_bin in table:
            candidate_set.update(table[nearby_bin])
    return candidate_set

# This function retrieves the nearest neighbors of a query vector using the trained LSH model
def get_nearest_neighbors(X_tfidf, query_vector, model, max_search_radius=3):
    table = model['table']
    random_vectors = model['random_vectors']
    bin_index_bits = np.ravel(query_vector.dot(random_vectors) >= 0)
    candidate_set = set()
    for search_radius in range(max_search_radius + 1):
        candidate_set = search_nearby_bins(bin_index_bits, table, search_radius, candidate_set)
    candidate_list = list(candidate_set)
    candidates = X_tfidf[candidate_list]
    distance = pairwise_distances(candidates, query_vector, metric='cosine').flatten()
    distance_col = 'distance'
    nearest_neighbors = pd.DataFrame({
        'id': candidate_list, distance_col: distance
    }).sort_values(distance_col).reset_index(drop=True)
    return nearest_neighbors
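
The hashing step relies on the sign of each random projection, which depends only on the direction of a vector and not on its length. The following NumPy sketch demonstrates this on toy data. The function `hash_bins` repeats the bin computation from train_lsh in isolation, and the dimensions and seed are arbitrary assumptions.

```python
import numpy as np

rng = np.random.default_rng(0)
dim, n_vectors = 8, 4
random_vectors = rng.standard_normal((dim, n_vectors))


def hash_bins(X, random_vectors):
    """Map each row of X to an integer bin from the signs of its random projections."""
    bits = X @ random_vectors >= 0
    powers_of_two = 1 << np.arange(random_vectors.shape[1] - 1, -1, -1)
    return bits @ powers_of_two


v = rng.standard_normal(dim)
X = np.stack([v, 3.0 * v, -v])  # same direction, scaled copy, opposite direction
bins = hash_bins(X, random_vectors)
print(bins)
```

The vector and its scaled copy fall into the same bin, while the opposite vector gets the complementary bit pattern. This is why sign-based random projections pair naturally with the cosine distance used in get_nearest_neighbors.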

This code snippet uses the trained Locality Sensitive Hashing (LSH) model to find the nearest neighbors of a specific embedding from the dataset. It also fits a K-Nearest Neighbors (KNN) classifier on the training embeddings and measures its accuracy on the test embeddings.
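
For intuition about the KNN step, the sketch below implements a majority-vote classifier with cosine distance in plain NumPy. It is an illustrative stand-in, not the scikit-learn KNeighborsClassifier used in the notebook, and the function `cosine_knn_predict` and the toy data are assumptions.

```python
from collections import Counter

import numpy as np


def cosine_knn_predict(train_X, train_y, query, k=3):
    """Predict the label of `query` by majority vote among its k nearest neighbors."""
    train_n = train_X / np.linalg.norm(train_X, axis=1, keepdims=True)
    query_n = query / np.linalg.norm(query)
    distances = 1.0 - train_n @ query_n  # cosine distance to every training vector
    nearest = np.argsort(distances)[:k]
    votes = Counter(train_y[i] for i in nearest)
    return votes.most_common(1)[0][0]


train_X = np.array([[1.0, 0.0], [0.9, 0.1], [0.8, 0.2], [0.0, 1.0], [0.1, 0.9]])
train_y = ["a", "a", "a", "b", "b"]
prediction = cosine_knn_predict(train_X, train_y, np.array([0.2, 1.0]))
print(prediction)  # b
```

The query points mostly along the second axis, so two of its three nearest neighbors belong to class "b".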

In [28]:
# model = train_lsh(np.stack(data["embeds"].values), 16, seed=143)

In [29]:
pd.set_option("display.max_rows", 100)

In [30]:
'''item_id = idx_test[0]
query_vector = data["embeds"].values[item_id].reshape(1, -1)
nearest_neighbors = get_nearest_neighbors(np.stack(data["embeds"].values), 
                                          query_vector, 
                                          model, 
                                          max_search_radius=3)
print('query: ', data[["user", "image"]].values[item_id])
pd.DataFrame([(data["user"].values[nearest_neighbors["id"].values[i]],
               data["image"].values[nearest_neighbors["id"].values[i]],
               nearest_neighbors["distance"].values[i]) for i in range(min(100, len(nearest_neighbors)))],
             columns=["user", "image", "distance"])'''

In [31]:
'''
neigh = KNeighborsClassifier(n_neighbors=10,
                             metric="cosine",
                             n_jobs=-1)
neigh.fit(np.stack(data["embeds"].values[idx_train]), data["user"].values[idx_train])
'''


In [32]:
'''y_pred = neigh.predict(np.stack(data["embeds"].values[idx_test]))'''


In [33]:
'''accuracy_score(data["user"].values[idx_test], y_pred)
# 0.9586143790849673'''


In [34]:
'''
def search(query):
    query_user, query_image, query_vector = query
    query_vector = query_vector.reshape(1, -1)
    nearest_neighbors = get_nearest_neighbors(np.stack(data["embeds"].values), 
                                              query_vector, 
                                              model, 
                                              max_search_radius=3)
    result = pd.DataFrame([(data["user"].values[nearest_neighbors["id"].values[j]],
                            data["image"].values[nearest_neighbors["id"].values[j]],
                            nearest_neighbors["distance"].values[j]) for j in range(1, min(100, len(nearest_neighbors)))],
                          columns=["user", "image", "distance"])
    result["query_user"] = query_user
    result["query_image"] = query_image
    return result
'''


In [35]:
'''
results_train = pd.concat([search(data.values[idx]) 
                           for idx in np.random.choice(idx_train, size=10_000, replace=False)], 
                           ignore_index=True)
'''


In [36]:
'''
results_test = pd.concat([search(data.values[idx]) 
                          for idx in np.random.choice(idx_test, size=10_000, replace=False)], 
                          ignore_index=True)
'''


In [37]:
'''with open("06_datasets_v0.1.pkl", "wb") as f:
    pickle.dump([results_train, results_test], f)'''


## Approach 2: Identification model

This approach uses a Siamese neural network: the same backbone model, with shared weights, extracts features from each of the two input images. The two feature vectors are then concatenated and passed through dense layers that make a binary decision (same user or different users).
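The forward pass of such a network can be sketched in plain NumPy. This is only an illustration of the data flow under simplifying assumptions: the single-matrix `backbone`, the layer sizes, and the ReLU activation are placeholders (the notebook uses a pretrained ArcFace backbone and a GELU dense layer), and no training is performed.

```python
import numpy as np

rng = np.random.default_rng(0)
IN_DIM, EMB_DIM, HIDDEN = 32, 16, 8      # illustrative sizes only

# A single set of backbone weights is used for both inputs: this is the weight sharing.
W_backbone = rng.normal(scale=0.1, size=(IN_DIM, EMB_DIM))
W_hidden = rng.normal(scale=0.1, size=(2 * EMB_DIM, HIDDEN))
b_hidden = np.zeros(HIDDEN)
W_out = rng.normal(scale=0.1, size=(HIDDEN, 1))
b_out = np.zeros(1)

def backbone(x):
    """Toy feature extractor standing in for the real embedding model."""
    return np.tanh(x @ W_backbone)

def siamese_forward(x_1, x_2):
    """Embed both inputs with the same weights, concatenate, then classify."""
    features = np.concatenate([backbone(x_1), backbone(x_2)], axis=1)
    hidden = np.maximum(features @ W_hidden + b_hidden, 0.0)   # ReLU placeholder
    logits = hidden @ W_out + b_out
    return 1.0 / (1.0 + np.exp(-logits))                        # sigmoid probability

x_1 = rng.normal(size=(4, IN_DIM))
x_2 = rng.normal(size=(4, IN_DIM))
probs = siamese_forward(x_1, x_2)        # shape (4, 1), one probability per pair
```

One design consequence worth noting: because the features are concatenated, the head is order-sensitive, so `siamese_forward(x_1, x_2)` and `siamese_forward(x_2, x_1)` are not guaranteed to agree.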

In [38]:
'''with open("/kaggle/input/06-datasets-real/06_datasets_v0.1.pkl", "rb") as f:
    results_train, results_test = pickle.load(f)'''


This generator class yields batches of image pairs for training the identification model, which decides whether two images belong to the same user. It supports shuffling between epochs, optional image augmentation, and loading the images from disk one batch at a time.

In [39]:
class IdentificationDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, 
                 comparison: np.ndarray,
                 batch_size: Optional[int] = 8,
                 shuffle: Optional[bool] = True,
                 augment: Optional[bool] = True):
        self.comparison = comparison
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indexes = np.arange(self.comparison.shape[0])
        self.augment = augment
        self.seq = iaa.SomeOf((1),
                              [iaa.Identity(),
                               iaa.Grayscale(alpha=1.),
                               iaa.SaltAndPepper(p=0.1),
                               iaa.Solarize(p=0.8),
                               iaa.GaussianBlur(sigma=(0.25, 0.5)),
                               iaa.GammaContrast(gamma=(0.5, 1.5)),
                               iaa.UniformColorQuantizationToNBits(nb_bits=(3, 8))
                              ])
        self.on_epoch_end()
        
    def __len__(self):
        return int(np.ceil(len(self.indexes) / self.batch_size))
    
    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indexes)
    
    def __augment(self, images: np.ndarray):
        images = list((images).astype(np.uint8))
        images_aug = self.seq(images=images)
        return np.asarray(images_aug, dtype=np.float32)
    
    def __getitem__(self, idx: int):
        comparison = self.comparison[self.indexes[idx * self.batch_size : (idx + 1) * self.batch_size]]
        X_1 = []
        X_2 = []
        y = np.zeros(len(comparison), dtype=np.float32)
        for i, [user_1, file_1, _, user_2, file_2] in enumerate(comparison):
            X_1 += [np.asarray(Image.open(PATH_SYNC + "/" + user_1 + "/" + file_1), dtype=np.float32)]
            X_2 += [np.asarray(Image.open(PATH_SYNC + "/" + user_2 + "/" + file_2), dtype=np.float32)]
            y[i] = int(user_1 == user_2)
        X_1 = np.stack(X_1)
        X_2 = np.stack(X_2)
        if self.augment:
            X_1 = self.__augment(X_1)
            X_2 = self.__augment(X_2)
        
        return [X_1 / 255., X_2 / 255.], y
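The batching arithmetic behind `__len__` and `__getitem__` above can be checked in isolation. The helper below is a hypothetical stand-alone sketch (`batch_index_slices` is not part of the notebook) that reproduces the same ceiling division and index slicing without loading any images.

```python
import math
import numpy as np

def batch_index_slices(n_items, batch_size, shuffle=False, seed=None):
    """Return the index array that each batch would read, in order."""
    indexes = np.arange(n_items)
    if shuffle:
        # Mirrors the per-epoch shuffle of the generator.
        np.random.default_rng(seed).shuffle(indexes)
    n_batches = math.ceil(n_items / batch_size)   # same as __len__
    return [indexes[i * batch_size:(i + 1) * batch_size] for i in range(n_batches)]

batches = batch_index_slices(n_items=10, batch_size=4)
batch_sizes = [len(b) for b in batches]
```

With 10 items and a batch size of 4, the last batch holds only the 2 remaining items, so downstream code should not assume a fixed batch dimension.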

In [40]:
'''train_data_generator = IdentificationDataGenerator(comparison=results_train,
                                                   batch_size=32,
                                                   shuffle=True,
                                                   augment=True)
test_data_generator = IdentificationDataGenerator(comparison=results_test,
                                                  batch_size=32,
                                                  shuffle=True,
                                                  augment=False)'''


In [41]:
'''backbone_model = tf.keras.models.load_model("/kaggle/input/arcface/tensorflow2/arcface_30epochs/1/04_model_arcface_v0.3_30.h5")
backbone_model.trainable = False
backbone_model.summary()'''


In [42]:
'''inputs_1 = tf.keras.Input(shape=INPUT_SHAPE, dtype=tf.float32)
inputs_2 = tf.keras.Input(shape=INPUT_SHAPE, dtype=tf.float32)
x_1 = backbone_model(inputs_1)
x_2 = backbone_model(inputs_2)
x = tf.keras.layers.concatenate([x_1, x_2])
x = tf.keras.layers.Dense(128, activation="gelu") (x)
outputs = tf.keras.layers.Dense(1, activation="sigmoid") (x)
model = tf.keras.models.Model(inputs=[inputs_1, inputs_2], outputs=outputs)
model.summary()'''


In [43]:
#model = tf.keras.models.load_model("/kaggle/input/similarity_model/tensorflow2/similarity_model_3_epochs/1/07_similarity_model.h5")

In [44]:
'''model.compile(optimizer=tf.keras.optimizers.Adam(1e-3),
              loss="binary_crossentropy",
              metrics=["accuracy"])'''


In [45]:
'''checkpoint_path = 'cp-{epoch:04d}.h5'
callbacks = [
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                       save_weights_only=False,
                                       verbose=1)]'''

In [46]:
'''model.fit(train_data_generator,
          validation_data=test_data_generator,
          epochs=3,
          verbose=1,
          callbacks=callbacks)'''


In [47]:
'''model.save("07_similarity_model.h5")'''


# FINAL PREDICTION ON DATASET

## Create Dataset

In [48]:
'''
from numpy import random
import os 
import pandas as pd

image_paths = []

# Iterate over the numbered subfolders
for folder_num in range(1, MAX_SUBJECTS):
    folder_path = os.path.join(PATH_SYNC, str(folder_num))
    if os.path.exists(folder_path):
        images = [os.path.join(folder_path, img) for img in os.listdir(folder_path) if img.endswith('.png')]
        image_paths.extend(images)

from numpy import random
def get_pairs(images, image_paths):
    if len(images) < 4:
        remplacement = True
    else:
        remplacement = False
    positive_pairs = random.choice(images, 4, remplacement)
    #user_id = os.path.basename(os.path.normpath(os.path.dirname(positive_pairs[0])))
    negative_pairs = random.choice([img_path for img_path in image_paths if img_path not in images], 3, False)
    pairs_pos = [(positive_pairs[0], positive_pairs[1:], 1)]
    pairs_neg = [(positive_pairs[0], negative_pairs, 0)]
    return pairs_pos, pairs_neg

pairs_list = []

for folder_num in range(1, MAX_SUBJECTS):
    folder_path = os.path.join(PATH_SYNC, str(folder_num))
    if os.path.exists(folder_path):
        images = [os.path.join(folder_path, img) for img in os.listdir(folder_path) if img.endswith('.png')]
        pairs_pos, pairs_neg = get_pairs(images, image_paths)
        pairs_list.extend(pairs_pos + pairs_neg)

df = pd.DataFrame([(item[0], img, item[2]) for item in pairs_list for img in item[1]], columns=['user_path', 'pair_path', 'positive_pair'])
df.to_csv('faces-webface_pairs.csv', index=False)
'''
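The pair-construction logic of the cell above (one anchor image per subject, three positive partners, three negative partners) can be sketched on an in-memory dictionary instead of the file system. The function `sample_pairs`, the toy `images_by_subject` mapping, and the file names are assumptions for illustration only.

```python
import numpy as np

def sample_pairs(images_by_subject, subject, rng, n_pos=3, n_neg=3):
    """Build (anchor, other, label) rows for one subject."""
    own = images_by_subject[subject]
    # Sample with replacement only when the subject has too few images.
    replace = len(own) < n_pos + 1
    chosen = rng.choice(own, size=n_pos + 1, replace=replace)
    anchor, positives = chosen[0], chosen[1:]
    # Negatives come from every other subject's images.
    others = [img for s, imgs in images_by_subject.items() if s != subject for img in imgs]
    negatives = rng.choice(others, size=n_neg, replace=False)
    rows = [(str(anchor), str(p), 1) for p in positives]
    rows += [(str(anchor), str(n), 0) for n in negatives]
    return rows

rng = np.random.default_rng(0)
images_by_subject = {                     # hypothetical toy data
    "1": ["1/a.png", "1/b.png", "1/c.png", "1/d.png"],
    "2": ["2/a.png", "2/b.png"],
    "3": ["3/a.png", "3/b.png", "3/c.png"],
}
pairs = [row for s in images_by_subject for row in sample_pairs(images_by_subject, s, rng)]
```

Note that when sampling falls back to replacement, an anchor can be paired with itself, which produces a trivially easy positive pair; filtering such rows out may be worth considering.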

# Predict

In [49]:
from sklearn import metrics

In [50]:
df = pd.read_csv("/kaggle/input/faces-webface-pairs/faces-webface_pairs.csv")
results = np.array(df.values.tolist())

In [51]:
class IdentificationDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, 
                 comparison: np.ndarray,
                 batch_size: Optional[int] = 8):
        self.comparison = comparison
        self.batch_size = batch_size
        self.indexes = np.arange(self.comparison.shape[0])
        self.on_epoch_end()
        
    def __len__(self):
        return int(np.ceil(len(self.indexes) / self.batch_size))
    
    
    
    def __getitem__(self, idx: int):
        comparison = self.comparison[self.indexes[idx * self.batch_size : (idx + 1) * self.batch_size]]
        X_1 = []
        X_2 = []
        y = np.zeros(len(comparison), dtype=np.float32)
        for i, [path_user_1, path_user_2, pair_positive] in enumerate(comparison):
            X_1 += [np.asarray(Image.open(path_user_1), dtype=np.float32)]
            X_2 += [np.asarray(Image.open(path_user_2), dtype=np.float32)]
            # `results` is a string array (mixed path/label columns), so cast the label explicitly.
            y[i] = float(pair_positive)
        X_1 = np.stack(X_1)
        X_2 = np.stack(X_2)
        
        return [X_1 / 255., X_2 / 255.], y

In [52]:
data_generator = IdentificationDataGenerator(comparison=results,
                                             batch_size=32)

In [53]:
all_labels = []
for idx in range(len(data_generator)):
    _, labels = data_generator[idx]
    all_labels.extend(labels.tolist())
all_labels = np.array(all_labels)

## Predict Similarity model

In [54]:
similarity_model = tf.keras.models.load_model("/kaggle/input/similarity_model/tensorflow2/similarity_model_3_epochs/1/07_similarity_model.h5")

In [55]:
predictions = similarity_model.predict(data_generator)



In [56]:
predictions[predictions>=0.5]=1

In [57]:
predictions[predictions<0.5]=0

In [58]:
metrics.accuracy_score(all_labels, predictions)

0.6290953236842936

## Predict over embeddings

In [59]:
arcface_model = tf.keras.models.load_model("/kaggle/input/arcface/tensorflow2/arcface_30epochs/1/04_model_arcface_v0.3_30.h5")

In [60]:
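class PairwiseLayer(tf.keras.layers.Layer):
    """Cosine similarity between two embeddings, min-max scaled over the batch."""
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, control):
        # tf.keras.losses.cosine_similarity returns the negated similarity,
        # so multiplying by -1 recovers the similarity (higher = more alike).
        similarity = tf.expand_dims(tf.keras.losses.cosine_similarity(anchor, control) * -1., axis=1)
        # Min-max scaling uses the current batch, so scores depend on the batch contents.
        return tf.divide(tf.subtract(similarity, tf.reduce_min(similarity)),
                         tf.subtract(tf.reduce_max(similarity), tf.reduce_min(similarity)))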
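The layer above scales its scores with the minimum and maximum of whatever batch it receives, so the same pair can get a different score depending on its batch neighbors. The NumPy sketch below illustrates this with hand-picked 2-D vectors; the helper names and the toy data are assumptions, not part of the notebook.

```python
import numpy as np

def cosine_similarity(a, b):
    """Row-wise cosine similarity between two arrays of vectors."""
    a = a / np.linalg.norm(a, axis=1, keepdims=True)
    b = b / np.linalg.norm(b, axis=1, keepdims=True)
    return np.sum(a * b, axis=1)

def min_max_scale(scores):
    """Rescale scores to [0, 1] using the min and max of this batch only."""
    return (scores - scores.min()) / (scores.max() - scores.min())

anchor = np.array([[1.0, 0.0]] * 4)
control = np.array([[1.0, 0.0], [0.0, 1.0], [-1.0, 0.0], [1.0, 1.0]])
sim = cosine_similarity(anchor, control)      # identical, orthogonal, opposite, 45 degrees

scaled_full = min_max_scale(sim)              # all four pairs in one batch
scaled_first_two = min_max_scale(sim[:2])     # only the first two pairs in the batch
```

The orthogonal pair scores 0.5 in the full batch but 0.0 when batched with only the identical pair. A batch-independent alternative, if one is wanted, would be a fixed rescaling such as `(sim + 1) / 2`.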

In [61]:
inputs_1 = tf.keras.Input(shape=INPUT_SHAPE, dtype=tf.float32)
inputs_2 = tf.keras.Input(shape=INPUT_SHAPE, dtype=tf.float32)
x_1 = arcface_model(inputs_1)
x_2 = arcface_model(inputs_2)
outputs = PairwiseLayer() (x_1, x_2)
model = tf.keras.models.Model(inputs=[inputs_1, inputs_2], outputs=outputs)
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 112, 112, 3)]        0         []                            
                                                                                                  
 input_2 (InputLayer)        [(None, 112, 112, 3)]        0         []                            
                                                                                                  
 model (Functional)          (None, 512)                  1095104   ['input_1[0][0]',             
                                                                     'input_2[0][0]']             
                                                                                                  
 pairwise_layer (PairwiseLa  (None, 1)                    0         ['model[0][0]',           

In [62]:
predictions = model.predict(data_generator)



In [63]:
predictions_copy = predictions.copy()

In [64]:
predictions_copy[predictions_copy>=0.5]=1

In [65]:
predictions_copy[predictions_copy<0.5]=0

In [66]:
metrics.accuracy_score(all_labels, predictions_copy)

0.9014284362879577

In [67]:
predictions_copy_2 = predictions.copy()

In [68]:
predictions_copy_2[predictions_copy_2>=0.42]=1

In [69]:
predictions_copy_2[predictions_copy_2<0.42]=0

In [70]:
metrics.accuracy_score(all_labels, predictions_copy_2)

0.9124491533440545
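The two cut-offs tried above (0.5 and 0.42) suggest a manual search for the decision threshold. A small sweep makes that search explicit. The sketch below uses made-up labels and scores, and the helpers `accuracy_at_threshold` and `sweep_thresholds` are hypothetical names, so it shows the procedure rather than reproducing the reported numbers.

```python
import numpy as np

def accuracy_at_threshold(labels, scores, threshold):
    """Accuracy when scores at or above the threshold are predicted as positive."""
    predicted = (scores >= threshold).astype(labels.dtype)
    return float(np.mean(predicted == labels))

def sweep_thresholds(labels, scores, thresholds):
    """Return the first threshold with the highest accuracy, and that accuracy."""
    accuracies = np.array([accuracy_at_threshold(labels, scores, t) for t in thresholds])
    best = int(np.argmax(accuracies))
    return float(thresholds[best]), float(accuracies[best])

# Toy data: 1 = same user, 0 = different users.
labels = np.array([1.0, 1.0, 1.0, 0.0, 0.0, 0.0])
scores = np.array([0.9, 0.6, 0.45, 0.40, 0.2, 0.1])
thresholds = np.linspace(0.0, 1.0, 101)
best_threshold, best_accuracy = sweep_thresholds(labels, scores, thresholds)
```

A threshold tuned and evaluated on the same pairs gives an optimistic accuracy estimate, so the sweep is best run on pairs that are held out from the final evaluation.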