In [11]:
import numpy as np
import os
import random
import multiprocessing as mp
import cv2
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Conv2D, Activation, Input
from tensorflow.keras.layers import BatchNormalization, MaxPooling2D
from tensorflow.keras.layers import Lambda, Flatten, Dense
from tensorflow.keras.regularizers import l2
from tensorflow.keras import backend as K
from sklearn.utils import shuffle
import numpy.random as rng
from tqdm.notebook import tnrange

In [2]:
# Para facilitar la ejecución del codigo, se descargan directamente los conjuntos de datos y se descomprimen los .zip.

!wget https://github.com/brendenlake/omniglot/raw/master/python/images_evaluation.zip
!wget https://github.com/brendenlake/omniglot/raw/master/python/images_background.zip

!unzip -qq images_background.zip
!unzip -qq images_evaluation.zip

--2023-09-19 22:25:28--  https://github.com/brendenlake/omniglot/raw/master/python/images_evaluation.zip
Resolving github.com (github.com)... 140.82.112.3
Connecting to github.com (github.com)|140.82.112.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/brendenlake/omniglot/master/python/images_evaluation.zip [following]
--2023-09-19 22:25:28--  https://raw.githubusercontent.com/brendenlake/omniglot/master/python/images_evaluation.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.109.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6462886 (6.2M) [application/zip]
Saving to: ‘images_evaluation.zip.1’


2023-09-19 22:25:29 (49.2 MB/s) - ‘images_evaluation.zip.1’ saved [6462886/6462886]

--2023-09-19 22:25:29--  https://github.com/brendenlake/omn

In [3]:
def lee_alfabetos(ruta_al_directorio_del_alfabeto, nombre_del_alfabeto):
    """
    Lee y devuelve todos los caracteres de un alfabeto específico que se encuentra en un directorio dado.

    Parámetros:
    - ruta_al_directorio_del_alfabeto (str): Ruta al directorio que contiene los caracteres del alfabeto.
    - nombre_del_alfabeto (str): Nombre del alfabeto a procesar.

    Retorno:
    - np.array(datax): Una lista de imágenes de los caracteres leídos.
    - np.array(datay): Una lista de etiquetas correspondientes a cada imagen.
    """

    # Listas para almacenar imágenes y sus correspondientes etiquetas
    datax = []
    datay = []

    # Listar todos los caracteres dentro del directorio del alfabeto.
    caracteres = os.listdir(ruta_al_directorio_del_alfabeto)

    for caracter in caracteres:
        # Listar todas las imágenes asociadas a un caracter específico.
        imagenes = os.listdir(ruta_al_directorio_del_alfabeto + caracter + '/')

        for im in imagenes:
            # Leer cada imagen en escala de grises.
            image = cv2.imread(ruta_al_directorio_del_alfabeto + caracter + '/' + im, cv2.IMREAD_GRAYSCALE)

            # Añadir la imagen leída y su etiqueta correspondiente a las listas.
            datax.append(image)
            datay.append(nombre_del_alfabeto + '_' + caracter)

    # Convertir listas a arrays de numpy y devolver
    return np.array(datax), np.array(datay)

def preproces_omniglot(directorio_principal):
    """
    Preprocesa y devuelve todas las imágenes y etiquetas de los alfabetos contenidos en el directorio principal.

    Parámetros:
    - directorio_principal (str): Ruta al directorio principal que contiene todos los alfabetos.

    Retorno:
    - datax: Todas las imágenes de los caracteres de todos los alfabetos.
    - datay: Las etiquetas correspondientes a cada imagen.
    """

    # Inicializar variables donde se almacenarán las imágenes y etiquetas
    datax = None
    datay = None

    # Crear un pool de procesos para paralelizar la lectura de los alfabetos.
    # Esto aprovecha todos los núcleos del CPU para leer múltiples alfabetos simultáneamente.
    pool = mp.Pool(mp.cpu_count())

    # Leer todos los alfabetos del directorio principal de forma paralela.
    # Para cada subdirectorio (alfabeto) en el directorio principal, aplica la función lee_alfabetos
    resultados = [pool.apply(lee_alfabetos,
                             args=(
                                 directorio_principal + '/' + directorio + '/', directorio,
                             )) for directorio in os.listdir(directorio_principal)]

    # Cerrar el pool de procesos una vez que todos han terminado.
    pool.close()

    # Procesar y combinar los resultados para crear una lista única de imágenes y etiquetas
    for resultado in resultados:
        if datax is None:
            datax = resultado[0]
            datay = resultado[1]
        else:
            # Añadir las imágenes y etiquetas de cada alfabeto a las listas generales.
            datax = np.vstack([datax, resultado[0]])
            datay = np.concatenate([datay, resultado[1]])

    return datax, datay



In [4]:
# Procesa las imágenes y etiquetas del directorio "images_background" y las asigna a las variables trainx y trainy.
trainx, trainy = preproces_omniglot('images_background')

# Procesa las imágenes y etiquetas del directorio "images_evaluation" y las asigna a las variables testx y testy.
testx, testy = preproces_omniglot('images_evaluation')

# Toma las primeras 3000 imágenes y etiquetas del conjunto de test para usarlas como conjunto de validación.
valx = testx[0:3000]
valy = testy[0:3000]

# Actualiza el conjunto de test eliminando las primeras 3000 imágenes y etiquetas que ya están en el conjunto de validación.
testx = testx[3000:,:,:]
testy= testy[3000:]


# Reinicializa trainy y asigna etiquetas numéricas para las imágenes.
# Cada grupo de 20 imágenes recibirá el mismo número, del 0 al 963.
trainy = []
for i in range(964):
    trainy.extend([i] * 20)

# Reinicializa valy y asigna etiquetas numéricas para las imágenes de validación.
# Cada grupo de 20 imágenes recibirá el mismo número, del 0 al 149.
valy = []
for i in range(150):
    valy.extend([i] * 20)

# Reinicializa testy y asigna etiquetas numéricas para las imágenes de test.
# Cada grupo de 20 imágenes recibirá el mismo número, del 0 al 508.
testy = []
for i in range(509):
    testy.extend([i] * 20)


In [7]:
# Imprime las formas de los arrays para conocer sus dimensiones.
# Esto es útil para verificar que los datos se hayan dividido correctamente.
trainx.shape, len(trainy), valx.shape, len(valy), testx.shape, len(testy)

((19280, 105, 105), 19280, (3000, 105, 105), 3000, (10180, 105, 105), 10180)

In [8]:
def initialize_weights(shape, dtype=None):
    """
    Inicializa los pesos de una capa CNN siguiendo las recomendaciones del paper:
    http://www.cs.utoronto.ca/~gkoch/files/msc-thesis.pdf
    Los pesos se inicializan con una media de 0.0 y una desviación estándar de 0.01.

    Parámetros:
    - shape (tuple): Forma del tensor de pesos a inicializar.
    - dtype (opcional): Tipo de datos para los valores inicializados.

    Retorno:
    - np.array: Tensor de pesos inicializado.
    """
    return np.random.normal(loc = 0.0, scale = 1e-2, size = shape)

def initialize_bias(shape, dtype=None):
    """
    Inicializa el sesgo (bias) de una capa CNN siguiendo las recomendaciones del paper:
    http://www.cs.utoronto.ca/~gkoch/files/msc-thesis.pdf
    El sesgo se inicializa con una media de 0.5 y una desviación estándar de 0.01.

    Parámetros:
    - shape (tuple): Forma del tensor de sesgo a inicializar.
    - dtype (opcional): Tipo de datos para los valores inicializados.

    Retorno:
    - np.array: Tensor de sesgo inicializado.
    """
    return np.random.normal(loc = 0.5, scale = 1e-2, size = shape)


In [9]:
def get_siamese_model(input_shape):
    """
    Construye y devuelve un modelo siamés basado en la arquitectura proporcionada en:
    http://www.cs.utoronto.ca/~gkoch/files/msc-thesis.pdf

    Parámetros:
    - input_shape (tuple): Forma del tensor de entrada para cada una de las dos imágenes.

    Retorno:
    - siamese_net (Model): Modelo siamés para comparar la similitud entre dos imágenes.
    """

    # Definir los tensores para las dos imágenes de entrada.
    left_input = Input(input_shape)
    right_input = Input(input_shape)

    # Construcción de la Red Neuronal Convolucional.
    model = Sequential()
    model.add(Conv2D(64, (10,10), activation='relu', input_shape=input_shape,
                     kernel_initializer=initialize_weights, kernel_regularizer=l2(2e-4)))
    model.add(MaxPooling2D())
    model.add(Conv2D(128, (7,7), activation='relu',
                     kernel_initializer=initialize_weights,
                     bias_initializer=initialize_bias, kernel_regularizer=l2(2e-4)))
    model.add(MaxPooling2D())
    model.add(Conv2D(128, (4,4), activation='relu', kernel_initializer=initialize_weights,
                     bias_initializer=initialize_bias, kernel_regularizer=l2(2e-4)))
    model.add(MaxPooling2D())
    model.add(Conv2D(256, (4,4), activation='relu', kernel_initializer=initialize_weights,
                     bias_initializer=initialize_bias, kernel_regularizer=l2(2e-4)))
    model.add(Flatten())
    model.add(Dense(4096, activation='sigmoid',
                    kernel_regularizer=l2(1e-3),
                    kernel_initializer=initialize_weights,bias_initializer=initialize_bias))

    # Generar las codificaciones (vectores de características) para las dos imágenes.
    encoded_l = model(left_input)
    encoded_r = model(right_input)

    # Añadir una capa personalizada para calcular la diferencia absoluta entre las codificaciones.
    L1_layer = Lambda(lambda tensors:K.abs(tensors[0] - tensors[1]))
    L1_distance = L1_layer([encoded_l, encoded_r])

    # Añadir una capa densa con una unidad sigmoidal para generar la puntuación de similitud.
    prediction = Dense(1, activation='sigmoid', bias_initializer=initialize_bias)(L1_distance)

    # Conectar las entradas con las salidas.
    siamese_net = Model(inputs=[left_input, right_input], outputs=prediction)

    # Devolver el modelo siamés.
    return siamese_net


In [12]:
# Construir el modelo siamés con una forma de entrada de (105, 105, 1), lo que indica
# que las imágenes de entrada serán de 105x105 píxeles con un solo canal (escala de grises).
model = get_siamese_model((105, 105, 1))

# Mostrar un resumen del modelo, incluyendo la arquitectura, número de parámetros, etc.
model.summary()

# Definir el optimizador Adam con una tasa de aprendizaje de 0.00006.
optimizer = Adam(learning_rate = 0.00006)

# Compilar el modelo siamés.
# Se utilizará la "binary_crossentropy" como función de pérdida ya que la tarea es de clasificación binaria
# (dos imágenes son similares o no).
model.compile(loss="binary_crossentropy", optimizer=optimizer)


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_3 (InputLayer)        [(None, 105, 105, 1)]        0         []                            
                                                                                                  
 input_4 (InputLayer)        [(None, 105, 105, 1)]        0         []                            
                                                                                                  
 sequential_1 (Sequential)   (None, 4096)                 3894764   ['input_3[0][0]',             
                                                          8          'input_4[0][0]']             
                                                                                                  
 lambda_1 (Lambda)           (None, 4096)                 0         ['sequential_1[0][0]',    

In [13]:
def crea_lote(tamaño_lote, s="val"):
    """
    Crea un lote de pares de imágenes con clases mixtas.

    Parámetros:
    - tamaño_lote (int): Número de pares de imágenes en el lote.
    - s (str): Indica el conjunto de datos a utilizar ("train" o "val").

    Retorno:
    - pares (list): Lista con dos arrays, cada uno contiene imágenes del lote.
    - etiquetas (np.array): Vector de etiquetas binarias indicando si las imágenes del par pertenecen a la misma clase.
    """

    # Seleccionar el conjunto de datos adecuado basado en el argumento 's'.
    if s == "train":
        X = trainx
        categorias = trainy
    elif s == "val":
        X = valx
        categorias = valy
    else:
        print("Error, valor del segundo argumento no valido")
        return

    n_imagenes, w, h = X.shape

    # Inicializar 2 arrays vacíos para el lote de imágenes de entrada.
    pares = [np.zeros((tamaño_lote, h, w, 1)) for i in range(2)]

    # Inicializar vector para las etiquetas.
    etiquetas = np.zeros((tamaño_lote,))

    for i in range(tamaño_lote):
        # Elegir una categoría aleatoria.
        categoria = rng.choice(categorias[-1]+1)

        # Elegir dos índices aleatorios dentro de esa categoría.
        idx_1 = rng.randint(0, 20)
        idx_2 = rng.randint(0, 20)

        if rng.rand() < 0.5:
            # La mitad del tiempo, elige una categoría diferente para la segunda imagen.
            categoria_2 = random.choice([num for num in range(categorias[-1] + 1) if num != categoria])
            etiquetas[i] = 0
        else:
            # La otra mitad del tiempo, mantiene la misma categoría para la segunda imagen.
            categoria_2 = categoria
            etiquetas[i] = 1

        # Asignar las imágenes seleccionadas a los arrays de pares.
        pares[0][i,:,:,:] = X[categoria * 20 + idx_1].reshape(w, h, 1)
        pares[1][i,:,:,:] = X[categoria_2 * 20 + idx_2].reshape(w, h, 1)

    return pares, etiquetas


In [14]:
def tarea_fewShot(N, k=1):
    """
    Crea pares de una imagen de prueba y un conjunto de soporte para probar el aprendizaje one-shot
    de N vías con k muestras.

    Parámetros:
    - N (int): Número de clases distintas en la tarea.
    - k (int): Número de muestras por clase en la tarea.

    Retorno:
    - pares (list): Lista con dos arrays, el primero contiene imágenes de prueba y el segundo contiene el conjunto de soporte.
    - etiquetas (np.array): Etiquetas binarias que indican si la imagen de prueba y la imagen de soporte son de la misma clase.
    - indices_verdaderos (np.array): Índices de las ubicaciones donde se colocaron las imágenes de la categoría correcta.
    """

    # Seleccionar el conjunto de datos de prueba.
    X = testx
    categorias = testy

    # Obtener las dimensiones de las imágenes.
    n_imagenes, w, h = X.shape

    # Elegir k muestras aleatoriamente para cada una de las N categorías.
    indices = rng.choice(20, size=(N, k), replace=True)

    # Elegir N categorías aleatoriamente sin repetición.
    categorias = rng.choice(categorias[-1] + 1, size=(N,), replace=False)

    # Elegir la categoría verdadera y una imagen aleatoria de esa categoría.
    categoria_verdadera = categorias[0]
    ex1 = rng.choice(20, replace=False)
    imagen_test = np.asarray([X[categoria_verdadera*20 + ex1, :, :]]*N*k).reshape(N*k, w, h, 1)

    # Crear el conjunto de soporte con las imágenes elegidas.
    soporte = np.zeros((N*k, w, h, 1))
    for i, categoria in enumerate(categorias):
        soporte[i*k: i*k + k] = X[categoria*20 + indices[i]].reshape(k, w, h, 1)

    # Inicializar las etiquetas, siendo 1 para las imágenes de la categoría verdadera y 0 para las demás.
    etiquetas = np.zeros((N*k,))
    etiquetas[:k] = 1

    # Barajar los índices y utilizarlos para mezclar los otros arrays.
    shuffled_indices = np.arange(N*k)
    np.random.shuffle(shuffled_indices)

    etiquetas = etiquetas[shuffled_indices]
    imagen_test = imagen_test[shuffled_indices]
    soporte = soporte[shuffled_indices]

    # Los índices verdaderos son las ubicaciones donde colocamos las imágenes de la categoría correcta (es decir, las que tienen target=1).
    indices_verdaderos = np.where(etiquetas == 1)[0]

    pares = [imagen_test, soporte]

    return pares, etiquetas, indices_verdaderos


In [19]:
# Definir tamaño del lote, número de iteraciones y número de épocas
tamaño_lote = 32
n_iter = 2000
n_epocas = 25

# Decorador para compilar una función de TensorFlow y mejorar su ejecución
@tf.function
def train_step(inputs, etiquetas):
    """Realizar un paso de entrenamiento.

    Parámetros:
    - inputs (list): Imágenes de entrada.
    - etiquetas (np.array): Etiquetas verdaderas.

    Retorno:
    - loss (Tensor): Valor de la pérdida calculada.
    """
    with tf.GradientTape() as tape:
        # Pase hacia adelante
        predicciones = model(inputs, training=True)
        # Calcular el valor de la pérdida
        loss = loss_fn(etiquetas, predicciones)

    # Calcular los gradientes
    gradientes = tape.gradient(loss, model.trainable_variables)
    # Actualizar los pesos del modelo
    optimizer.apply_gradients(zip(gradientes, model.trainable_variables))
    # Actualizar las métricas
    train_acc_metric.update_state(etiquetas, predicciones)
    # Retornar el valor de la pérdida
    return loss

# Definir la función de pérdida
loss_fn = tf.keras.losses.BinaryCrossentropy()

# Definir la métrica de precisión
train_acc_metric = tf.keras.metrics.BinaryAccuracy()

# Bucle de entrenamiento por época
for epoca in range(n_epocas):
    print("\nInicio de la época %d" % (epoca,))

    loss_total = 0
    for step in tnrange(n_iter):
        # Crear un lote de datos de entrenamiento
        inputs, etiquetas = crea_lote(tamaño_lote, s='train')
        valor_loss = train_step(inputs, etiquetas)
        loss_total += valor_loss

        # Verificar cada 500 episodios
        if (step + 1) % 500 == 0:
            # Obtener la precisión actual de entrenamiento
            acc_actual = float(train_acc_metric.result())

            # Almacenar la pérdida promedio de los últimos 500 episodios
            loss_media_500 = loss_total / (step + 1)

            # Calcular la precisión de validación
            val_inputs, val_etiquetas = crea_lote(300, s="val")
            val_predic = model(val_inputs, training=False)
            val_predic_sd = np.squeeze((val_predic > 0.5).numpy().astype("int32"))
            val_acc = np.mean(val_predic_sd == val_etiquetas)

            # Imprimir valores (para depuración)
            print("Después del episodio %d: Precisión de entrenamiento: %.4f, precisión de validación: %.4f, Pérdida promedio: %.4f"
                  % ((step + 1), acc_actual, val_acc, loss_media_500))

    # Al final de la época, calcular e imprimir las estadísticas globales de la época
    loss_media_epoca = loss_total / n_iter
    acc_entrenamiento_epoca = float(train_acc_metric.result())

    print(
        "Pérdida de entrenamiento para la época %d: %.4f - Precisión: %.4f"
        % (epoca, loss_media_epoca, acc_entrenamiento_epoca)
    )

    # Resetear la pérdida total y la métrica de precisión al FINAL de la época
    loss_total = 0
    train_acc_metric.reset_states()

    # Actualizar la tasa de aprendizaje, reduciéndola en un 1%
    lr_actual = optimizer.learning_rate.numpy()
    nuevo_lr = lr_actual * 0.99
    optimizer.learning_rate.assign(nuevo_lr)



In [18]:
# Definición de las variables de prueba y configuración de la evaluación
n_test = 1000  # Número de pruebas a realizar
N_way = 5      # Número de categorías distintas en cada prueba (N-way one-shot learning)
s_shot = 1    # Número de imágenes por categoría que se proporcionará (1 para one-shot, 5 para five-shot, etc.)

# Descomentar las siguientes líneas si se desea cargar un modelo pre-entrenado
# from tensorflow.keras.models import load_model
# nombre_modelo = f'Omniglot_siamesa2Overfit.h5'
# model = load_model(nombre_modelo)  # Carga el modelo con el nombre definido

# Inicialización de la variable para contar las pruebas correctas
n_correct = 0

print("Evaluating model on {} random {} way {}-shot learning tasks ...".format(n_test, N_way, s_shot))

# Realizar pruebas N_way one-shot learning
for i in tnrange(n_test):
    # Obtener un conjunto de prueba y conjunto de soporte con sus etiquetas
    inputs, etiquetas, indices_verdaderos = tarea_fewShot(N_way, s_shot)

    # Predecir las probabilidades de similitud entre la imagen de prueba y las imágenes del conjunto de soporte
    probs = model(inputs)

    # Comprobar si la predicción del modelo es correcta comparando el índice de la probabilidad máxima con los índices verdaderos
    if np.argmax(probs) in indices_verdaderos:
        n_correct += 1

# Calcular y mostrar la precisión promedio del modelo en las tareas N_way one-shot learning
test_acc = (n_correct / n_test) * 100
print("Got an average of {}% accuracy for {} way {}-shot learning".format(test_acc, N_way, s_shot))
