# Imports

In [22]:
#! pip install -r requirements.txt


In [23]:
import os
import cv2
import random
import numpy as np
import uuid
import matplotlib.pyplot as plt
import datetime
import tarfile
import builtins

In [24]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Layer, Dense, Conv2D, MaxPooling2D, Flatten, Lambda, Dropout
from tensorflow.keras.metrics import Precision, Recall
import tensorflow.keras.backend as keras
from keras.optimizers import RMSprop

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)
print(f'Your gpu: {gpus}')



Your gpu: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


## Flags

In [25]:
collect_images_flag = False
train_flag = False  
save_flag = True

# Directory Configs

In [26]:
project_path = os.path.curdir

data_dir = os.path.join(project_path, 'data')
os.makedirs(data_dir, exist_ok=True)

dataset_dir = os.path.join(project_path, 'dataset')
os.makedirs(dataset_dir, exist_ok=True)

checkpoint_dir = os.path.join(project_path,'checkpoints')
os.makedirs(checkpoint_dir, exist_ok=True)

save_model_dir = os.path.join(project_path, 'save_model')
os.makedirs(save_model_dir, exist_ok=True)

db_dir = os.path.join(project_path, 'db')
os.makedirs(project_path, exist_ok=True)

lfw_dir = os.path.join(dataset_dir, 'lfw')


# Dataset download

In [27]:
if not os.path.exists(os.path.join(dataset_dir,'lfw.tgz')):
    !wget http://vis-www.cs.umass.edu/lfw/lfw.tgz -P {dataset_dir}
else:
    print('The file lfw.tgz is already downloaded.')


"wget" no se reconoce como un comando interno o externo,
programa o archivo por lotes ejecutable.


# Checking and Extracting 

In [28]:
# Verificar si el directorio "data" ya contiene archivos
if os.path.exists(lfw_dir) and len(os.listdir(lfw_dir)) > 0:
    print(f'The "lfw_dir" directory already contains {len(os.listdir(lfw_dir))} files:')
else:
    # Si el directorio "lfw_dir" está vacío, descomprimir el archivo lfw.tgz en una carpeta "lwf"
    lfw_tgz_path = os.path.join(dataset_dir, 'lfw.tgz')
    
    if os.path.exists(lfw_tgz_path):
    # Descomprimir el archivo lfw.tgz en el directorio lwf dentro de "dataset"
        with tarfile.open(lfw_tgz_path, 'r:gz') as tar:
            tar.extractall(path=dataset_dir)
            print('File extracted in the "lwf" directory inside "dataset".')
    else:
        print(f"File {lfw_tgz_path} not found.")



File .\dataset\lfw.tgz not found.


In [29]:
import os
import shutil

# Verificar si el directorio de destino está vacío
if not os.listdir(data_dir) and collect_images_flag:
    # Iterar sobre las carpetas en el directorio de origen
    for folder_name in os.listdir(lfw_dir):
        folder_path = os.path.join(lfw_dir, folder_name)
        
        # Verificar si es una carpeta
        if os.path.isdir(folder_path):
            # Contar las imágenes dentro de la carpeta
            num_images = len([f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))])
            
            # Si la carpeta tiene más de 5 imágenes, copiarla al directorio de destino
            if num_images > 5:
                dest_folder_path = os.path.join(data_dir, folder_name)
                shutil.copytree(folder_path, dest_folder_path)
                # print(f"Copiada la carpeta: {folder_name} con {num_images} imágenes.")
                
else:
    print(f"El directorio 'data' no está vacío {len(os.listdir(data_dir))} files:').")


El directorio 'data' no está vacío 0 files:').


# Collect Images 


In [30]:
import cv2
import os
import uuid

# Cargar el clasificador Haar para la detección de rostros
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

def collect_images(images_path=None):
    cap = cv2.VideoCapture(0)  # Abrir la cámara

    # Establecer una resolución alta para la cámara (ajustar según el dispositivo)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)  # Ancho de resolución
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)  # Altura de resolución

    count_images = 0  # Contador de imágenes guardadas
    saved_images = []  # Lista para almacenar las imágenes guardadas

    # Definir padding (margen) alrededor del rostro
    padding = 25  # Margen adicional para evitar cortes bruscos en el rostro

    # Bucle para capturar imágenes
    while cap.isOpened():
        ret, frame = cap.read()  # Leer cada frame de la cámara
        if not ret:
            print("No se pudo acceder a la cámara.")
            break

        # Convertir el frame a escala de grises para mejorar la detección
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Detectar rostros en el frame
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)  # Ajustar parámetros para detección

        # Mostrar rectángulos alrededor de los rostros detectados en el frame
        display_frame = frame.copy()  # Copiar el frame original solo para mostrar
        for (x, y, w, h) in faces:
            # Añadir padding alrededor del rostro
            x1 = max(0, x - padding)
            y1 = max(0, y - padding)
            x2 = min(frame.shape[1], x + w + padding)
            y2 = min(frame.shape[0], y + h + padding)
            
            # Dibujar un rectángulo con padding alrededor del rostro
            cv2.rectangle(display_frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

        # Mostrar la imagen con la detección de rostros (solo para visualización)
        cv2.imshow('Image Collection - Press "p" to capture', display_frame)
        key = cv2.waitKey(1) & 0xFF

        # Guardar el rostro si se presiona 'p'
        if key == ord('p'):
            if len(faces) > 0:  # Verificar si se detectó al menos un rostro
                for (x, y, w, h) in faces:
                    # Recortar el rostro con padding
                    x1 = max(0, x - padding)
                    y1 = max(0, y - padding)
                    x2 = min(frame.shape[1], x + w + padding)
                    y2 = min(frame.shape[0], y + h + padding)

                    # Recortar la imagen del rostro
                    face_img = frame[y1:y2, x1:x2]
                    
                    # Guardar la imagen solo si se proporciona una ruta
                    if images_path is not None:
                        imgname = os.path.join(images_path, f'{uuid.uuid1()}.jpg')  # Generar un nombre único
                        cv2.imwrite(imgname, face_img, [cv2.IMWRITE_JPEG_QUALITY, 95])  # Guardar con calidad alta
                        count_images += 1
                        saved_images.append(face_img)  # Añadir la imagen guardada a la lista
                        print(f'Imagen guardada: {imgname}')
                    else:
                        # Si no se proporciona una ruta, solo añadir la imagen a la lista
                        saved_images.append(face_img)  # Añadir la imagen a la lista sin guardarla
                        count_images += 1
                        print('Imagen capturada (no guardada).')

            else:
                print("No se detectó ningún rostro para guardar.")

        # Salir si se presiona 'q'
        elif key == ord('q'):
            break

    # Liberar la cámara y cerrar todas las ventanas
    cap.release()  
    cv2.destroyAllWindows()  
    
    # Devolver tanto el contador de imágenes como la lista de imágenes guardadas
    return count_images, saved_images

In [31]:
import os
import ipywidgets as widgets
from IPython.display import display, clear_output

def setup_image_collection(collect_images_flag):
    # Verificar si el flag está en False, si es así, no hacer nada
    if not collect_images_flag:
        print("Collecting images is not enabled.")
        return  # Terminar la función sin hacer nada

    # Function that is triggered when the "Yes" or "No" button is pressed
    def on_decision_button_clicked(button):
        clear_output()  # Clear the current output (hide previous buttons)
        if button.description == "Yes":
            # If "Yes" is chosen, show the text box for folder name and the "Accept" button
            display(folder_name_label, folder_name_widget, accept_button)
        else:
            # If "No" is chosen, end the flow
            print("No folder will be created.")

    # Function that is triggered when the "Accept" button is pressed
    def on_accept_button_clicked(button):
        folder_selected = data_dir  # Here you can select a default path
        folder_name = folder_name_widget.value
        
        if folder_name:
            new_folder_path = os.path.join(folder_selected, folder_name)
            os.makedirs(new_folder_path, exist_ok=True)
            print(f"Folder created: {new_folder_path}")
            if collect_images_flag:
                collect_images(new_folder_path)  # Call your processing function
        else:
            print("No folder name was entered.")

    # Widgets
    decision_label = widgets.Label("Do you want to create a new folder for the images?")
    yes_button = widgets.Button(description="Yes", button_style='success')
    no_button = widgets.Button(description="No", button_style='danger')

    folder_name_label = widgets.Label("Enter the name of the new folder:")
    folder_name_widget = widgets.Text(placeholder="Enter the folder name")
    accept_button = widgets.Button(description="Accept", button_style='info')

    # Link buttons to their functions
    yes_button.on_click(on_decision_button_clicked)
    no_button.on_click(on_decision_button_clicked)
    accept_button.on_click(on_accept_button_clicked)

    # Display the "Yes" and "No" buttons since collect_images_flag is True
    display(decision_label, yes_button, no_button)

# Ejemplo de cómo llamar la función
#collect_images_flag = True 
setup_image_collection(collect_images_flag)



Collecting images is not enabled.


# Data Augmentation

In [32]:
def data_aug(img, num_variations=1):
    data = []
    for i in range(num_variations):
        img_aug = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1, 2))
        img_aug = tf.image.stateless_random_contrast(img_aug, lower=0.6, upper=1, seed=(1, 3))
        img_aug = tf.image.stateless_random_flip_left_right(img_aug, seed=(np.random.randint(100), np.random.randint(100)))
        img_aug = tf.image.stateless_random_jpeg_quality(img_aug, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100), np.random.randint(100)))
        img_aug = tf.image.stateless_random_saturation(img_aug, lower=0.9, upper=1, seed=(np.random.randint(100), np.random.randint(100)))
        data.append(img_aug)
    
    return data

def log_processed_person(log_file, person_name):
    try:
        # Verificar si el archivo existe antes de abrirlo
        if not os.path.exists(log_file):
            with open(log_file, 'w') as f:
                pass  # Crea el archivo si no existe
        
        with builtins.open(log_file, 'a') as f:
            f.write(person_name + '\n')
    except OSError as e:
        print(f"Error al escribir en el archivo de log: {e}")

def augment_in_directory(directory, num_variations):
    if os.path.exists(directory):
        for file_name in os.listdir(directory):
            img_path = os.path.join(directory, file_name)

            # Verifica que sea un archivo de imagen
            if os.path.isfile(img_path) and file_name.lower().endswith(('.jpg', '.png', '.jpeg')):
                # Lee la imagen
                img = cv2.imread(img_path)

                if img is None:
                    print(f"Error al cargar la imagen: {img_path}")
                    continue

                # Aplica aumentación
                augmented_images = data_aug(tf.convert_to_tensor(img), num_variations)

                # Guarda las imágenes aumentadas
                for image in augmented_images:
                    # Convertir tensor a numpy
                    image_np = image.numpy()

                    # Asegúrate de que la imagen esté en formato uint8
                    if image_np.dtype != np.uint8:
                        image_np = (image_np * 255).astype(np.uint8)  # Convertir a uint8

                    # Genera un nombre único para la imagen nueva
                    new_file_name = '{}.jpg'.format(uuid.uuid1())

                    # Guarda la imagen en formato jpg
                    cv2.imwrite(os.path.join(directory, new_file_name), image_np)

def augment_images_in_directories(images_path, log_file='augmentation_log.txt', num_variations=1):
    # Leer los nombres de las personas ya procesadas del archivo de log
    processed_people = set()
    try:
        # Crear el archivo de log si no existe
        if not os.path.exists(log_file):
            with builtins.open(log_file, 'w') as f:
                pass  # Crea el archivo si no existe
        
        with builtins.open(log_file, 'r') as f:
            processed_people = {line.strip() for line in f.readlines()}
    except OSError as e:
        print(f"Error al leer el archivo de log: {e}")
        # Si no se puede leer, comenzamos con un conjunto vacío

    for person_name in os.listdir(images_path):
        person_dir = os.path.join(images_path, person_name)

        # Verifica si es un directorio y si ya se procesó esta persona
        if os.path.isdir(person_dir) and person_name not in processed_people:
            print(f"Aplicando aumentación a: {person_name}")

            # Aumenta las imágenes en el directorio
            augment_in_directory(person_dir, num_variations)

            # Registrar el nombre de la persona en el archivo de log
            log_processed_person(log_file, person_name)

        else:
            print(f"Ya se aplicó aumentación a: {person_name}, omitiendo...")

    print("Aumentación completada.")


In [33]:
augment_images_in_directories(data_dir)

Aumentación completada.


# Preparing Datasets for Training

In [34]:
import os
import tensorflow as tf
import random

person_folders = [os.path.join(data_dir, folder) for folder in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, folder))]
persont_count = 200#len(person_folders)
pair_p_person = 22
test_data_index = round(persont_count*.7)

# Preprocesar las imágenes
def preprocess_image(image_path):
    byte_img = tf.io.read_file(image_path)
    img = tf.io.decode_jpeg(byte_img, channels=3)  # Asegura que la imagen tenga 3 canales (RGB)
    img = tf.image.resize(img, (105, 105))
    img = img / 255.0  # Normaliza los valores de los píxeles a [0, 1]
    return img

def process_pairs(pairs):
    images_x = []
    images_y = []
    labels = []
    
    for (x, y), label in pairs:
        images_x.append(x)
        images_y.append(y)
        labels.append(label)
    
    return images_x, images_y, labels

def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    value_uint8 = tf.cast(value * 255.0, tf.uint8)  # Escalar de float32 a uint8
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[tf.io.encode_jpeg(value_uint8).numpy()]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def create_example(image1, image2, label):
    feature = {
        'image1': _bytes_feature(image1),
        'image2': _bytes_feature(image2),
        'label': _int64_feature(label)
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))



def create_or_load_dataset(data_dir, save_path):
    

    # Definimos los datasets
    positive_pairs = []
    negative_pairs = []

    test_positives = []
    test_negatives = []

   
    # Generar pares positivos y negativos
    for i in range(persont_count):
        # Listar las imágenes en la carpeta de la persona i
        images_i = tf.data.Dataset.list_files(os.path.join(person_folders[i], '*.jpg')).as_numpy_iterator()
        images_i = list(images_i)

        if(i >= test_data_index):
            for j in range(int(pair_p_person/2)):
                #print(j , len(images_i))

                test_positives.append((images_i[j], images_i[j+1], 1))  # Etiqueta 1 para pares positivos


                # Crear pares negativos con otras personas
                index_person_k = random.randint(1, len(person_folders)-1)
                person_k = tf.data.Dataset.list_files(os.path.join(person_folders[(i+index_person_k)%len(person_folders)], '*.jpg')).as_numpy_iterator()
                person_k = list(person_k)
                image_k = random.randint(0, len(person_k)-1)
                test_negatives.append((images_i[j], person_k[image_k], 0)) # Etiqueta 0 para pares negativos
            continue
        


        for j in range(int(pair_p_person/2)):
            # Crear pares positivos
            #print(j , len(images_i))
            positive_pairs.append((images_i[j], images_i[j+1], 1))  # Etiqueta 1 para pares positivos


            # Crear pares negativos con otras personas
            index_person_k = random.randint(1, len(person_folders)-1)
            person_k = tf.data.Dataset.list_files(os.path.join(person_folders[(i+index_person_k)%len(person_folders)], '*.jpg')).as_numpy_iterator()
            person_k = list(person_k)
            image_k = random.randint(0, len(person_k)-1)
            negative_pairs.append((images_i[j], person_k[image_k], 0)) # Etiqueta 0 para pares negativos



    # Preprocesar y almacenar las imágenes como pares
    processed_positive_pairs = [((preprocess_image(x), preprocess_image(y)), label) for x, y, label in positive_pairs]
    processed_negative_pairs = [((preprocess_image(x), preprocess_image(y)), label) for x, y, label in negative_pairs]

    processed_positive_pairs_test = [((preprocess_image(x), preprocess_image(y)), label) for x, y, label in test_positives] 
    processed_negative_pairs_test = [((preprocess_image(x), preprocess_image(y)), label) for x, y, label in test_negatives]  



    dataset = processed_negative_pairs + processed_positive_pairs
    random.shuffle(dataset)  # Mezclar el dataset antes de guardarlo
    test_set = processed_positive_pairs_test + processed_negative_pairs_test
    random.shuffle(test_set)

    dataset = dataset + test_set


    images_x, images_y, labels = process_pairs(dataset)

    # Crear dataset de TensorFlow desde slices de imágenes y etiquetas
    return tf.data.Dataset.from_tensor_slices(((images_x, images_y), labels))



save_path = 'dataset.tfrecord'
if collect_images_flag:
    dataset = create_or_load_dataset(data_dir,save_path)


In [35]:
# Obtener la cardinalidad del dataset
if  collect_images_flag:
    dataset_size = 0 # dataset.cardinality().numpy()
    for _ in dataset:
        dataset_size += 1
    print(dataset_size)

    # Verificar si el dataset es finito o infinito
    if dataset_size == tf.data.experimental.INFINITE_CARDINALITY:
        print("El dataset es infinito.")
    elif dataset_size == tf.data.experimental.UNKNOWN_CARDINALITY:
        print("El tamaño del dataset es desconocido.")
    else:
        # Toma el 70% del conjunto de datos para entrenamiento
        train_size = round(test_data_index*pair_p_person)
        train_data = dataset.take(train_size)

        print(f"Tamaño del conjunto de entrenamiento: {train_size}")


### Test dataset

In [36]:
if collect_images_flag:
    test_data = dataset.skip(train_size)
    test_data = test_data.take(dataset_size - train_size)


# Embedding Model Definition

### Input Layer

- **Input(shape=(105, 105, 3))**: Esta es la capa de entrada del modelo. Recibe imágenes de tamaño **105x105** con **3 canales** (RGB).

---

### Primera Capa Convolucional + MaxPooling

- **Conv2D(64, (10, 10), activation='relu')**: Esta es una **capa convolucional** que aplica **64 filtros** a la imagen de entrada, cada uno de tamaño **10x10**. La función de activación utilizada es **ReLU**, que introduce no linealidad en la red.
  - **Entrada**: Imagen de tamaño **105x105x3** (alto x ancho x canales).
  - **Salida**: Un conjunto de características (**feature maps**) de tamaño reducido pero con más **canales (profundidad)**, lo que permite que la red aprenda características más complejas.

- **MaxPooling2D(pool_size=(2, 2), padding='same')**: Esta es una **capa de reducción de dimensión espacial** que actúa sobre las características extraídas por las capas convolucionales anteriores. Su función principal es **reducir el tamaño** de las representaciones espaciales (ancho y alto) para disminuir la cantidad de parámetros, lo que ayuda a evitar el sobreajuste y mejora la eficiencia computacional.
  - **Función**: Toma el valor máximo en regiones de **2x2 píxeles** de las características (feature maps) obtenidas de la convolución. Esto permite reducir la resolución espacial mientras se conservan las características más importantes.
  - **pool_size=(2, 2)**: La ventana es de **2x2**, reduciendo el tamaño de la imagen en un factor de 2.
  - **padding='same'**: Mantiene las dimensiones similares a las de la entrada añadiendo relleno cuando es necesario.
  - **Propósito**: Ayuda a la red a enfocarse en **patrones más globales** en las capas más profundas.

---

### Segunda Capa Convolucional + MaxPooling

- **Conv2D(128, (7, 7), activation='relu')**: Aplica **128 filtros** de tamaño **7x7** a las características que vienen de la primera capa de pooling. La activación es nuevamente **ReLU**.
  
- **MaxPooling2D(pool_size=(2, 2), padding='same')**: Reduce el tamaño espacial aplicando pooling de **2x2**.

---

### Tercera Capa Convolucional + MaxPooling

- **Conv2D(128, (4, 4), activation='relu')**: Aplica **128 filtros** de tamaño **4x4** sobre las características.
  
- **MaxPooling2D(pool_size=(2, 2), padding='same')**: Reduce nuevamente el tamaño espacial aplicando pooling de **2x2**.

---

### Cuarta Capa Convolucional + Aplanamiento (Flatten)

- **Conv2D(256, (4, 4), activation='relu')**: Aplica **256 filtros** de tamaño **4x4**. En las capas más profundas, se extraen características más complejas como bordes, texturas y patrones.
  
- **Flatten()**: Aplana el tensor tridimensional (resultado de la convolución) en un **vector unidimensional** para ser procesado por una capa densa completamente conectada.

---

### Capa Densa Completamente Conectada

- **Dense(4096)**: Es una **capa completamente conectada** con **4096 unidades**.
  
  - Esta capa actúa como la **representación final** de la imagen de entrada, produciendo un **vector de tamaño 4096**, que será el **embedding** que representa la imagen.


In [37]:
# Crea un modelo de red convolucional para obtener una representación o embedding de una imagen de entrada de tamaño 105x105 con 3 canales de color (RGB)

def create_base_network():
    '''Base network to be shared (eq. to feature extraction).
    '''
    input = Input(shape= (105,105,3))
    x = Flatten()(input)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.1)(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.1)(x)
    x = Dense(128, activation='relu')(x)
    x = Lambda(lambda  x: keras.l2_normalize(x,axis=1))(x)
    x = Lambda(lambda  x: keras.l2_normalize(x,axis=1))(x)
    return Model(input, x)


def make_embedding(): 
    inp = Input(shape=(105,105,3), name='input_image')
    
    # First block
    c1 = Conv2D(64, (10,10), activation='relu', input_shape = (105, 105, 3))(inp)
    m1 = MaxPooling2D()(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D()(c2)
    
    # Third block 
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D()(c3)
    
    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    m4  = MaxPooling2D()(c4)
    f1 = Flatten()(m4)
    d1 = Dense(4096)(f1)
    x = Lambda(lambda  x: keras.l2_normalize(x,axis=1))(d1)
    
    
    return Model(inputs=[inp], outputs=x, name='embedding')

# Se define una red neuronal convolucional (CNN) para extraer embeddings (representaciones de características) de imágenes.


## Distancia L1


In [38]:
# Se definen las funciones para calcular la distancia L1 (la suma de las diferencias absolutas) entre dos vectores.

    
def euclidean_distance(vects):
    x, y = vects
    sum_square = keras.sum(keras.square(x - y), axis=1, keepdims=True)
    return keras.sqrt(keras.maximum(sum_square, keras.epsilon()))


def eucl_dist_output_shape(shapes):
    shape1, shape2 = shapes
    return (shape1[0], 1)


# Siamese Model

Aquí tienes el contenido en formato Markdown, estructurado de manera clara y ordenada:

```markdown
# Definición del modelo siamés

## Función `make_siamese_model()`

La función `make_siamese_model()` define y devuelve un modelo siamés, que se utiliza para comparar dos imágenes y determinar si son similares.

### Explicación del Código

1. **Definición de las Entradas**:
   ```python
   input_sample_image = Input(shape=(105, 105, 3), name='sample_image')
   input_validation_image = Input(shape=(105, 105, 3), name='validation_image')
   ```
   - Se crean tensores de entrada para las imágenes, cada una con un tamaño de **105x105 píxeles** y **3 canales** (RGB).

2. **Modelo de Embeddings**:
   ```python
   embedding_model = make_embedding()
   ```
   - Se llama a la función `make_embedding()` que define el modelo de **embeddings**, responsable de extraer características importantes de las imágenes.

3. **Codificación de la Imagen de Muestra**:
   ```python
   encoded_s = embedding_model(input_sample_image)
   ```
   - La imagen de muestra se pasa por el modelo de embeddings, obteniendo un vector de características que representa dicha imagen.

4. **Codificación de la Imagen de Validación**:
   ```python
   encoded_v = embedding_model(input_validation_image)
   ```
   - La imagen de validación se codifica para obtener su correspondiente vector de características.

5. **Cálculo de la Distancia**:
   ```python
   distance = Lambda(euclidean_distance, output_shape=eucl_dist_output_shape)([encoded_s, encoded_v])
   ```
   - Se calcula la **distancia L1** entre los dos embeddings. Esta medida indica la similitud, donde valores más bajos indican mayor similitud.

6. **Definición del Modelo Completo**:
   ```python
   model = Model(inputs=[input_sample_image, input_validation_image], outputs=distance, name='siamese_network')
   ```
   - Se crea el modelo completo, especificando las entradas y la salida. Se le asigna el nombre `'siamese_network'`.

7. **Retorno del Modelo**:
   ```python
   return model
   ```
   - Finalmente, se devuelve el modelo siamés creado.

---

## Explicación de las Funciones y Activaciones Utilizadas

### 1. `Input`
- **Descripción**: Crea un tensor de entrada para el modelo.
- **Uso**: Se usa para definir las dimensiones de las imágenes de entrada.

### 2. `make_embedding()`
- **Descripción**: Define un modelo que extrae características relevantes de las imágenes.
- **Uso**: Permite obtener representaciones compactas (embeddings) que encapsulan la información más relevante de las imágenes.

### 3. `L1Dist`
- **Descripción**: Calcula la distancia L1 (también conocida como distancia Manhattan) entre dos vectores.
- **Uso**: Mide la similitud entre los embeddings generados para las dos imágenes. Una distancia más baja indica que las imágenes son más similares.

### 4. `Dense`
- **Descripción**: Crea una capa completamente conectada.
- **Uso**: Se utiliza para combinar las salidas de las capas anteriores en una salida única. En este caso, se utiliza para generar la probabilidad de que las imágenes sean de la misma clase.

### 5. Funciones de Activación `ReLU`
- **Descripción**: La función de activación ReLU (Rectified Linear Unit) devuelve 0 si la entrada es menor que 0; de lo contrario, devuelve la entrada.
- **Uso**: Se utiliza en las capas convolucionales para introducir no linealidad y permitir que la red aprenda características complejas.

---

## Resumen
El modelo siamés construido en esta función compara dos imágenes utilizando un modelo de embeddings para extraer características, calcula la distancia entre estas características y finalmente clasifica si las imágenes son similares o no utilizando una capa densa con activación sigmoide. Las activaciones ReLU se utilizan en las capas convolucionales para facilitar el aprendizaje de características no lineales.
```

Este contenido está diseñado para ser claro y fácil de leer, proporcionando una comprensión completa de la función `make_siamese_model()` y las activaciones utilizadas en el modelo.

In [39]:
def make_siamese_model(): 
    
    input_image = Input(name='input_img', shape=(105,105,3))
    
    # Validation image in the network 
    validation_image = Input(name='validation_img', shape=(105,105,3))
    embedding = make_embedding() #create_base_network()
    # Combine siamese distance components
    encoded_s = embedding(input_image)
    encoded_v = embedding(validation_image)
    
    distance = Lambda(euclidean_distance, output_shape=eucl_dist_output_shape)([encoded_s, encoded_v])
    
    return Model(inputs=[input_image, validation_image], outputs=distance, name='SiameseNetwork')
    

In [40]:
siamese_model = make_siamese_model()
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         10648896    ['input_img[0][0]',              
                                                                  'validation_img[0][

### ***Note :***
> ***I could have chosen a simpler approach using Keras' compile function, but I decided to implement the training process manually.
This allowed me to understand in detail each step involved in training the model. Although I haven't tested the following code,
it should work fine. Here's the streamlined approach:***

```python

siamese_model = make_siamese_model()
siamese_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[Precision(), Recall()])

history = siamese_model.fit(train_data, 
                            validation_data=test_data,
                            epochs=5,
                            batch_size=16)

plt.plot(history.history['precision'], label='Precision')
plt.plot(history.history['recall'], label='Recall')
plt.title('Precision and Recall of Siamese Model')
plt.xlabel('Epochs')
plt.ylabel('Metrics')
plt.legend()
plt.show()
```

# Funciones de Pérdida en Aprendizaje Automático

## ¿Qué son las Funciones de Pérdida?

Las funciones de pérdida son componentes esenciales en el entrenamiento de redes neuronales. Cuantifican el error entre las predicciones del modelo y las salidas reales, permitiendo al modelo optimizarse para mejorar su rendimiento.

### Propósitos de las Funciones de Pérdida

1. **Cuantificación del Error**: Mide la diferencia entre las predicciones del modelo y los valores reales.
2. **Guía de Optimización**: Ayuda a actualizar los pesos de la red durante el entrenamiento, minimizando el error.
3. **Evaluación del Modelo**: Permite comparar el rendimiento de diferentes modelos.

---

## Funciones de Pérdida Comunes en Redes Siamesas

Las redes siamesas están diseñadas para aprender representaciones y medir la similitud entre pares de entradas, por lo que las funciones de pérdida utilizadas en este contexto deben reflejar este objetivo. A continuación se presentan las más comunes:

### 1. **Contrastive Loss**

- **Uso**: Evalúa la distancia entre dos entradas. Incentiva que las muestras similares estén más cerca en el espacio de embeddings y las disímiles estén más lejos.

- **Fórmula**:
  \[
  \text{Loss} = (1 - y) \cdot \frac{1}{2} D^2 + y \cdot \frac{1}{2} \max(0, m - D)^2
  \]
  donde:
  - \( D \) es la distancia entre los embeddings,
  - \( y \) es la etiqueta (1 para similar, 0 para disímil),
  - \( m \) es el margen.

- **Funcionamiento**:
  - Si \( y = 1 \) (pares similares), la pérdida penaliza si la distancia entre los embeddings es grande.
  - Si \( y = 0 \) (pares disímiles), la pérdida penaliza si los embeddings están demasiado cerca, dentro del margen \( m \).

---

### 2. **Triplet Loss**

- **Uso**: Compara tripletas de entradas: un ancla (A), una muestra positiva (P) y una negativa (N). Busca que el ancla esté más cerca del positivo que del negativo por un margen dado.

- **Fórmula**:
  \[
  \text{Loss} = \max(0, D(a, p) - D(a, n) + \alpha)
  \]
  donde:
  - \( a \) es el ancla,
  - \( p \) es la muestra positiva,
  - \( n \) es la muestra negativa,
  - \( \alpha \) es un margen.

- **Funcionamiento**:
  - Penaliza si la distancia entre el ancla y la muestra positiva \( D(a, p) \) es mayor o igual que la distancia entre el ancla y la muestra negativa \( D(a, n) \) más un margen \( \alpha \).
  - Asegura que las muestras positivas estén más cerca del ancla que las negativas en el espacio de embeddings.

---

### 3. **Binary Crossentropy**

- **Uso**: Común en tareas de clasificación binaria. Se puede aplicar en redes siamesas para predecir si dos entradas son similares (1) o no (0).

- **Fórmula**:
  \[
  \text{Loss} = -\frac{1}{N} \sum_{i=1}^{N} [y_i \cdot \log(p_i) + (1 - y_i) \cdot \log(1 - p_i)]
  \]
  donde:
  - \( y_i \) es la etiqueta (1 o 0),
  - \( p_i \) es la probabilidad predicha por el modelo,
  - \( N \) es el número de ejemplos.

- **Funcionamiento**:
  - Mide la divergencia entre la probabilidad predicha \( p_i \) y la etiqueta real \( y_i \).
  - Penaliza las predicciones incorrectas: si el modelo asigna una probabilidad baja a una muestra etiquetada como similar (1), la pérdida será alta, y viceversa.

---

### 4. **Hinge Loss**

- **Uso**: Se utiliza en Máquinas de Vectores de Soporte (SVM), pero también es aplicable a redes siamesas. Busca que la diferencia entre las salidas de pares similares y disímiles esté por encima de un margen.

- **Fórmula**:
  \[
  \text{Loss} = \max(0, m - (y \cdot D))
  \]
  donde:
  - \( y \) es la etiqueta (1 o -1),
  - \( D \) es la distancia entre los embeddings,
  - \( m \) es un margen.

- **Funcionamiento**:
  - Penaliza si los pares disímiles están demasiado cerca (dentro del margen).
  - Similar a **Contrastive Loss**, pero no requiere etiquetas binarias (1 o 0) y es más común en clasificación marginada.

---

### 5. **Focal Loss**

- **Uso**: Ampliación de la **Binary Crossentropy** para manejar clases desbalanceadas, centrándose en los ejemplos más difíciles de clasificar.

- **Fórmula**:
  \[
  \text{Loss} = - \alpha (1 - p_t)^\gamma \log(p_t)
  \]
  donde:
  - \( p_t \) es la probabilidad predicha para la clase correcta,
  - \( \alpha \) y \( \gamma \) son hiperparámetros para ajustar el enfoque en ejemplos difíciles.

- **Funcionamiento**:
  - Otorga mayor peso a ejemplos mal clasificados, reduciendo la pérdida en ejemplos bien clasificados.
  - Útil cuando hay desbalance en las clases.

---

## Comparación de las Funciones de Pérdida

| **Función de Pérdida**   | **Ventajas**                               | **Desventajas**                              |
|--------------------------|--------------------------------------------|----------------------------------------------|
| **Contrastive Loss**      | Favorece la cercanía de muestras similares y la lejanía de las disímiles. | Requiere pares etiquetados.                  |
| **Triplet Loss**          | Fomenta un espacio de embeddings mejor estructurado. | Necesita generar tripletas, lo que puede ser complejo. |
| **Binary Crossentropy**   | Fácil de implementar, adecuada para clasificación binaria. | No captura bien la relación entre pares.     |
| **Hinge Loss**            | Adecuada para problemas marginados.         | Menos intuitiva en redes siamesas.           |
| **Focal Loss**            | Excelente para problemas de clases desbalanceadas. | Más compleja de ajustar (hiperparámetros).   |

---

## Mejor Función de Pérdida para Redes Siamesas que Comparan Rostros

Para una red siamesa que clasifica si dos rostros son el mismo, las funciones **Contrastive Loss** y **Triplet Loss** son las más recomendadas:

1. **Contrastive Loss** es ideal cuando se tienen pares de datos etiquetados y se busca que el modelo aprenda a diferenciar entre pares similares y disímiles de manera directa.
  
2. **Triplet Loss** es efectiva cuando se dispone de un gran conjunto de imágenes y es posible generar tripletas. Permite un mejor ajuste del espacio de embeddings, pero es más costosa en términos de preparación de datos.

**Conclusión**: Si tienes buenos datos emparejados, **Contrastive Loss** es la mejor opción. Si puedes generar tripletas eficientemente, **Triplet Loss** puede proporcionar mejores resultados, pero con mayor complejidad.


In [41]:
def contrastive_loss(y_true, y_pred):
    
    margin = 1
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)
    sqaure_pred = keras.square(y_pred)
    margin_square = keras.square(keras.maximum(margin - y_pred, 0))
    return keras.mean(y_true * sqaure_pred + (1 - y_true) * margin_square)
# Initialize Adam optimizer with a learning rate of 0.0001
adam_optimizer = tf.keras.optimizers.Adam(0.001)

checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
model_checkpoint = tf.train.Checkpoint(opt=adam_optimizer, siamese_model=siamese_model)


In [42]:

# Extraer una imagen del dataset
if collect_images_flag:
    for (image_x, image_y), label in dataset.take(1):  # Tomar un ejemplo del dataset
        # image_x es la primera imagen del par, y image_y es la segunda
        # Normalmente, las imágenes están en formato tensor, por lo que debemos convertirlas a formato numpy
        image_x = image_x.numpy()
        image_y = image_y.numpy()
        
        # Graficar la primera imagen
        plt.figure(figsize=(10,5))

        plt.subplot(1, 2, 1)
        plt.imshow((image_x*255).astype("uint8"))  # Convertir los valores a formato correcto
        plt.title(f'Imagen X - Etiqueta: {label.numpy()}')

        # Graficar la segunda imagen
        plt.subplot(1, 2, 2)
        plt.imshow((image_y*255).astype("uint8"))
        plt.title(f'Imagen Y - Etiqueta: {label.numpy()}')

        plt.show()
        print(label)

In [43]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [44]:
def compute_accuracy(y_true, y_pred):
    '''Compute classification accuracy with a fixed threshold on distances.
    '''
    pred = y_pred.ravel() < 0.5
    return np.mean(pred == y_true)


def accuracy(y_true, y_pred):
    '''Compute classification accuracy with a fixed threshold on distances.
    '''
    return keras.mean(keras.equal(y_true, keras.cast(y_pred < 0.5, y_true.dtype)))

In [45]:
model = make_siamese_model()
rms = RMSprop()
model.compile(loss=contrastive_loss, optimizer=adam_optimizer, metrics=[accuracy])
if(train_flag):
    #train_model(train_data,30)

    # Suponiendo que tienes train_data y labels como tensores
    train_data_batched = train_data.batch(16)
    test_data_batched = test_data.batch(16)
    # Luego puedes entrenar el modelo directamente con el dataset
    history = model.fit(train_data_batched, epochs=50, validation_data=test_data_batched)

    

In [46]:
if collect_images_flag:
    history_dict = history.history
    loss_values = history_dict['loss']
    val_loss_values = history_dict['val_loss']
    epochs = range(1, (len(history.history['val_accuracy']) + 1))
    plt.plot(epochs, loss_values, 'bo', label='Training loss')
    plt.plot(epochs, val_loss_values, 'b', label='Validation loss')
    plt.title('Training and validation loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

In [47]:
# ACCURACY Learning Curves
if collect_images_flag:
    history_dict = history.history
    loss_values = history_dict['accuracy']
    val_loss_values = history_dict['val_accuracy']
    epochs = range(1, (len(history.history['accuracy']) + 1))
    plt.plot(epochs, loss_values, 'bo', label='Training Acc')
    plt.plot(epochs, val_loss_values, 'b', label='Validation Acc')
    plt.title('Training and validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

In [48]:
def load_siamese_model(save_model_dir, date_str=None):
    custom_objects = {
        'L1Dist': euclidean_distance, 
        'Contrastive_loss': contrastive_loss  
    }

    if date_str:
        # Si se proporciona una fecha, intenta cargar el modelo correspondiente
        model_filename = f"siamese_model_{date_str}.h5"
        model_filepath = os.path.join(save_model_dir, model_filename)
        if os.path.exists(model_filepath):
            print(f"Cargando el modelo desde: {model_filepath}")
            return tf.keras.models.load_model(model_filepath, custom_objects=custom_objects)
        else:
            raise FileNotFoundError(f"No se encontró el modelo para la fecha proporcionada: {date_str}")
    else:
        # Si no se proporciona una fecha, carga el modelo más reciente
        model_files = [f for f in os.listdir(save_model_dir) if f.startswith("siamese_model_") and f.endswith(".h5")]
        if not model_files:
            raise FileNotFoundError("No se encontraron modelos en el directorio especificado.")
        
        # Ordenar los archivos por fecha y hora en el nombre del archivo
        model_files.sort(key=lambda x: datetime.datetime.strptime(x.split("_", 2)[2].split(".")[0], "%Y_%m_%d_%H_%M"), reverse=True)
        latest_model_filename = model_files[0]
        latest_model_filepath = os.path.join(save_model_dir, latest_model_filename)
        print(f"Cargando el modelo más reciente desde: {latest_model_filepath}")
        return tf.keras.models.load_model(latest_model_filepath) # custom_objects=custom_objects)

In [49]:
model_date = '2024_08_08_17_19'
model_filename = f"siamese_model_{model_date}.h5"
# Guardar el modelo en el subdirectorio

if save_flag:
    siamese_model.save(os.path.join(save_model_dir,model_filename))



In [50]:
# Para cargar un modelo específico por fecha
# model = load_siamese_model(save_model_path, "2024_08_01_12_00")

# Para cargar el modelo más reciente
try:
    siamese_model = load_siamese_model(save_model_dir)
except Exception as e:
    print(f'Error loading model: {e}')

Cargando el modelo más reciente desde: .\save_model\siamese_model_2024_08_08_17_19.h5
Error loading model: Exception encountered when calling layer "lambda_1" (type Lambda).

unknown opcode

Call arguments received by layer "lambda_1" (type Lambda):
  • inputs=['tf.Tensor(shape=(None, 4096), dtype=float32)', 'tf.Tensor(shape=(None, 4096), dtype=float32)']
  • mask=None
  • training=None


In [51]:
# View model summary
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         10648896    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [52]:
# 70/ 70
def preprocess_image_form_img(image):
    # Si la imagen es un array de NumPy, convertirla a un tensor de TensorFlow
    if isinstance(image, np.ndarray):
        image = tf.convert_to_tensor(image)
    
    # Verificar si la imagen es un tensor (debe ser de tipo tf.Tensor)
    if not tf.is_tensor(image):
        raise ValueError(f"Se esperaba un tensor o array de imagen, pero se recibió: {type(image)}")
    
    # Asegurar que la imagen tiene 3 canales (RGB) si no los tiene
    if image.shape[-1] != 3:
        raise ValueError("La imagen debe tener 3 canales (RGB).")
    
    # Redimensionar la imagen
    img = tf.image.resize(image, (105, 105))
    
    # Normalizar los valores de los píxeles a [0, 1]
    img = img / 255.0
    
    return img



def verify(model, input_image, verific_dir, detection_threshold, verification_threshold):
    results = []
    
    # Preprocesar la imagen de entrada para que tenga el formato adecuado para el modelo
    input_img = preprocess_image_form_img(input_image)
    
    # Iterar sobre cada imagen en el directorio de verificación
    for image in os.listdir(verific_dir):
        
        validation_img = preprocess_image(os.path.join(verific_dir, image))
        
        result = 1-model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        print(result)
        results.append(result)
    
    # Threshold de detección: contar cuántas predicciones son mayores que el umbral de detección
    detection = np.sum(np.array(results) > detection_threshold)
    
    # Threshold de verificación: calcular la proporción de predicciones positivas sobre el total de muestras positivas
    verification = detection / len(os.listdir(os.path.join(verific_dir))) 
    
    # Verificar si la proporción de predicciones positivas es mayor que el umbral de verificación
    verified = verification > verification_threshold
    
    # Devolver los resultados de las predicciones y el resultado de la verificación
    return results, verified


In [53]:
import os

def detect_and_verify_face(siamese_model, db_dir, detection_threshold=0.7, verification_threshold=0.7):
    count, input_images = collect_images()  # Assuming this function is already defined

    # Check if any image was captured
    if count > 0:
        input_image = input_images[0]  # Take the first captured image
        person_found = False  # Flag to check if a match is found
        person_results = []
        # Iterate over each person in the database directory
        for person_folder in os.listdir(db_dir):
            person_dir = os.path.join(db_dir, person_folder)

            # Ensure it's a directory
            if not os.path.isdir(person_dir):
                continue

            results, person_verified = verify(siamese_model, input_image, person_dir, detection_threshold, verification_threshold)
            person_results.append((results, person_verified))
                
        return max(person_results, key=lambda x: x[0])
    else:
        print("No image was captured.")
        return None


In [54]:
detect_and_verify_face(siamese_model,db_dir)

No image was captured.
