# Práctica 3 - Redes Neuronales Residuales 

### Natalia Martínez García, Lucía Vega Navarrete
### Grupo: AP.11.06

In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from keras import layers, metrics, optimizers, losses
from pathlib import Path
import os
import random

Primero importamos las librerías que vamos a utilizar a lo largo de la práctica.

In [2]:
seed=1234
os.environ['PYTHONHASHSEED']=str(seed)
tf.random.set_seed(seed)
np.random.seed(seed)
random.seed(seed)

Fijamos una semilla aleatoria para para poder reproducir los resultados.

### 1. Carga y preprocesado del dataset 

#### 1.1. Carga 

In [3]:
DATA_PATH = Path("HIGGS.csv.gz") # El archivo está en la misma carpeta que el notebook

# Cargar el dataset 
higgs = pd.read_csv(DATA_PATH, compression="gzip", header=None)

print("Tamaño total del dataset (ejemplos, columnas):", higgs.shape)

# Separar características (X) y etiquetas (y)
X = higgs.iloc[:, 1:].to_numpy(dtype=np.float32) # Columnas 1-28: features
y = higgs.iloc[:, 0].to_numpy(dtype=np.float32) # Columna 0: label

# División como dice el enunciado
N_TRAIN = 2000000
N_TEST = 500000 

# primeros 2 millones para entrenamiento
X_train = X[:N_TRAIN]
y_train = y[:N_TRAIN]

# ultimos 500 000 para test
X_test  = X[-N_TEST:]
y_test  = y[-N_TEST:]

# resto para extra (con esto entrenaremos el modelo con fine tuning)
X_extra = X[N_TRAIN:-N_TEST]
y_extra = y[N_TRAIN:-N_TEST]

print("División del dataset:")
print(f"Train: {X_train.shape}")
print(f"Test:  {X_test.shape}")
print(f"Extra: {X_extra.shape}")

Tamaño total del dataset (ejemplos, columnas): (11000000, 29)
División del dataset:
Train: (2000000, 28)
Test:  (500000, 28)
Extra: (8500000, 28)


Vamos a utilizar el **Higgs Boston Dataset**, que contiene 11 millones de ejemplos que tenemos que clasificar como señal (clase 1) o fondo (clase 0). Estas etiquetas corresponden a la primera columna del dataset. Cada ejemplo cuenta con 28 características numéricas cada uno, correspondientes al resto de columnas.

Dividimos el dataset en los 3 conjuntos que nos pide el enunciado: los 2 primeros millones de ejemplos para entrenamiento del modelo base, los últimos 500000 para test y los 8.5 millones restantes como conjunto extra para fine-tuning.

#### 1.2. Preprocesado

Vamos a aplicar la normalización Z-score a todas las características para ponerlas en el mismo rango. Para ello usamos la fórmula `z = x - media / desviación típica`.

Para normalizar los conjuntos, calculamos la media y desviación típica solo del conjunto de train, ya que cualquier información de test no puede influir en el entrenamiento. De lo contrario, estaríamos haciendo *data leakage*, y la red daría resultados artificialmente buenos, pero no reales. Con esa media y desviación típica normalizaremos también extra y test, para que el modelo vea datos en la misma escala en todas las fases.



In [4]:
# Normalizar usando solo TRAIN
# Media y desviación por columna (feature) 

"""
Normalizamos usando la media y desviación típica de train porque cualquier info de test no puede influir en el entrenamiento (sería como hacer trampas).
Si usamos la media y desviación del test para normalizar, meteríamos información del futuro dentro del entrenamiento y estaríamos haciendo data leakage.
Los conjuntos extra y test los normalizamos con la media y std de train porque vamos a entrenar la red con datos normalizados usando esos parámetros.
"""

mean_train = X_train.mean(axis=0) # axis = 0 para calcularlo por columnas (cada feature)
std_train  = X_train.std(axis=0)

# Evitar división por cero en caracteristicas que tengan desviacion tipica 0
std_train[std_train == 0] = 1.0

# z = (x - media) / desviacion tipica
X_train = (X_train - mean_train) / std_train
X_extra = (X_extra - mean_train) / std_train # Usamos las mismas media y desviacion que train
X_test  = (X_test  - mean_train) / std_train # Usamos las mismas media y desviacion que train

print(f"- Media train (ya normalizado): {X_train.mean():.5f}")
print(f"- Desviación típica train  (ya normalizado): {X_train.std():.5f}")

# Comprobación del desbalanceo de clases
# QUITAMOS ESTO ????????????????
def check_balance(name, y):
    n = len(y)
    n_signal = np.sum(y == 1) # Cuenta ejemplos de clase 1 (señal)
    n_back   = np.sum(y == 0) # Cuenta ejemplos de clase 0 (fondo)
    print(f"\n{name}:")
    print(f"Signal = {n_signal} ({100*n_signal/n:.2f}%)")
    print(f"Background = {n_back}   ({100*n_back/n:.2f}%)")

print("\nDISTRIBUCIÓN DE CLASES:")
check_balance("Train", y_train)
check_balance("Extra", y_extra)
check_balance("Test",  y_test)

- Media train (ya normalizado): -0.00000
- Desviación típica train  (ya normalizado): 1.00000

DISTRIBUCIÓN DE CLASES:

Train:
Signal = 1058818 (52.94%)
Background = 941182   (47.06%)

Extra:
Signal = 4505798 (53.01%)
Background = 3994202   (46.99%)

Test:
Signal = 264507 (52.90%)
Background = 235493   (47.10%)


Después de normalizar, el conjunto de entrenamiento tiene media 0 y desviación típica 1.

# QUITAMOS LO DE DESBALANCEO ?

### 2. Creación de red neuronal con capas residuales

In [5]:
class ResidualBlock(tf.keras.models.Model):
    # EL BLOQUE TIENE LA MISMA FORMA QUE EL DE LA PRESENTACIÓN DE TEORÍA
    def __init__(self, units=128, **kwargs):
        super().__init__(**kwargs)
        self.units = units # Num de neuronas en las capas dense
        self.dense1 = layers.Dense(units, activation='relu', name=f"{self.name}_dense1")
        self.dense2 = layers.Dense(units, activation=None, name=f"{self.name}_dense2")
        self.add = layers.Add(name=f"{self.name}_add") # Suma dos matrices
        self.activation = layers.Activation('relu')

    def call(self, x):
        """Forward pass del bloque"""
        x_skip = x # Guardamos la entrada original para el atajo
        out = self.dense1(x)
        out = self.dense2(out)
        # Capa que suma la entrada original con la salida out
        out = self.add([out, x_skip])
        out = self.activation(out)
        return out
    
    def get_config(self):
        """Necesario para poder guardar y cargar el modelo"""
        config = super().get_config()
        config.update({'units': self.units})
        return config

Nuestro bloque residual tiene dos capas densas de 128 neuronas cada una (la capa que pondremos antes del primer bloque transformará las 28 características originales a 128 dimensiones). La primera capa aplica ReLU, la segunda no tiene activación. Después de estas dos capas, sumamos la salida con la entrada original (esto es la conexión residual). Finalmente aplicamos ReLU a esa suma. Esta estructura la hemos tomado de la presentación de teoría.

La clase hereda de `keras.Model`y tiene tres métodos:
- `__init__` se ejecuta al crear el bloque y define todas las capas. Junto a las que hemos explicado arriba, incluimos una capa Add para hacer la suma. 
- `get_config` es necesario para que Keras pueda guardar y cargar el modelo (algo que vamos a usar más adelante). Devuelve la configuración del bloque.
- `call` es el forward pass, se ejecuta cada vez que pasamos datos por el bloque. Primero guardamos la entrada en `x_skip` para usarla después. Luego procesamos la entrada: pasa por `dense1` (con ReLU), después por `dense2` (sin activación), sumamos esta salida con la entrada original guardada en `x_skip`, y finalmente aplicamos ReLU. El resultado es la salida del bloque. 

In [6]:
class ResidualNetwork(tf.keras.Model):
    """Red neuronal entera 2 bloques residuales"""
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.capa_entrada = layers.Dense(128, activation="relu", name="capa_entrada") # convierte 28 features a 128 dimensiones
        self.res_block1 = ResidualBlock(units=128, name="res_block1")
        self.res_block2 = ResidualBlock(units=128, name="res_block2")
        self.capa_intermedia = layers.Dense(64, activation="relu", name="capa_intermedia")
        # 1 neurona con sigmoid para clasificación binaria
        # Sigmoid transforma la salida al rango 0-1 que se puede interpretar como probabilidad
        self.capa_salida = layers.Dense(1, activation="sigmoid", name="capa_salida")
    
    def call(self, inputs):
        """Forward pass de la red completa"""
        x = self.capa_entrada(inputs)
        x = self.res_block1(x)
        x = self.res_block2(x)
        x = self.capa_intermedia(x)
        outputs = self.capa_salida(x)
        return outputs
    
    def get_config(self):
        """Necesario para poder guardar y cargar el modelo"""
        return super().get_config()

La red tiene:
- una capa de entrada de 128 neuronas, que transforma las 28 características originales a 128 dimensiones con activación ReLU
- dos bloques residuales (res_block1 y res_block2) con la clase que hemos definido arriba
- una capa intermedia de 64 neuronas, reduce la dimensionalidad de 128 a 64 con ReLU para comprimir la información
- la capa de salida, que tiene 1 neurona para la clasificación binaria (1 si es señal, 0 si no lo es, y por tanto es fondo). La capa tiene activación sigmoid que produce un valor entre 0 y 1, que podemos interpretar como la probabilidad de que el ejemplo sea señal.

Igual que la clase anterior, `ResidualNetwork` hereda de `keras.Model`y define los mismos métodos:
- `__init__` define todas las capas de la red al crearla
- `call` ejecuta el forward pass: los datos de entrada pasan por la capa de entrada, luego los dos bloques residuales, la capa intermedia y la capa de salida Este método se ejecuta automáticamente cada vez que llamamos al modelo con datos
- `get_config` permite guardar el modelo y cargarlo después sin perder la arquitectura

In [7]:
# CONSTRUIR EL MODELO
modelo_base = ResidualNetwork(name="red_residual")
# Si usamos clases necesitamos pasar algún dato por el modelo para que construya todas las capas
# Para poder ver el summary
_ = modelo_base(X_train[:1]) 
modelo_base.summary()

### 3. Entrenamiento de la red

Implementamos el bucle de entrenamiento manualmente, sin usar `.fit()`, siguiendo el tutorial "Escribir un ciclo de entrenamiento desde cero" de TensorFlow, proporcionado en el enunciado de esta práctica. Esto nos da control total sobre cada paso del proceso. 

#### 3.1. Preparar los datasets

In [8]:
BATCH_SIZE = 4096

# Convierte arrays de numpy en un dataset de tensorflow
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
# shuffle: Mezcla aleatoriamente los datos
# batch: Divide los datos en batches
train_dataset = train_dataset.shuffle(buffer_size=100_000, reshuffle_each_iteration=True).batch(BATCH_SIZE)

# Dataset de test: solo en batches, sin shuffle
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))
test_dataset = test_dataset.batch(BATCH_SIZE)

Preparamos los datos siguiendo las indicaciones del tutorial. Dividimos los datos en batches de 4096 para poder tener un entrenamiento que proporcione buenos resultados pero que sea rápido.

#### 3.2. Definir loss, optimizador y métricas

In [9]:
optimizer = optimizers.Adam(learning_rate=1e-3) # Optimizador
loss_fn = losses.BinaryCrossentropy() # Función de loss (binaria, porque la salida es 0/1)

# Métricas para TRAIN
train_loss = metrics.Mean(name="train_loss")
train_accuracy = metrics.BinaryAccuracy(name="train_accuracy")

# Métricas para TEST
test_loss = metrics.Mean(name="test_loss") # ELIMINARLO ? NO LO PIDE Y NO LO USAMOS
test_accuracy = metrics.BinaryAccuracy(name="test_accuracy")
# Para calcular F1
test_precision = metrics.Precision(name="test_precision")
test_recall = metrics.Recall(name="test_recall")

De igual manera que en el tutorial, definimos las métricas que vamos a usar en training y en test, con la única diferencia de que ampliamos las métricas de test para incluir las que se piden en el enunciado.

Usamos el optimizador Adam con learning rate de 0.001, que es un valor estándar que funciona bien en la mayoría de problemas. Adam adapta automáticamente el learning rate para cada parámetro. 

Como función de loss usamos Binary Crossentropy, ya que nuestro problema es de clasificación binaria. Esta mide la diferencia entre las probabilidades predichas y las etiquetas reales. 

Definimos métricas separadas para train y test: Mean para la loss (calcula el promedio), BinaryAccuracy para la precisión general, y además Precision y Recall para test que nos permiten calcular a mano el F1-Score.


#### 3.3. Train step y test step

In [10]:
@tf.function # Decorador que dice el tutorial que compila la función para más eficiencia
def train_step(x, y):
    """Paso de entrenamiento: forward + loss + backpropagation + actualizar métricas train"""
    with tf.GradientTape() as tape:
        # Forward pass: obtener predicciones
        predictions = modelo_base(x, training=True)
        # Calcular loss comparando predicciones con etiquetas reales
        loss_value = loss_fn(y, predictions)
    grads = tape.gradient(loss_value, modelo_base.trainable_weights) # Calcular gradientes
    optimizer.apply_gradients(zip(grads, modelo_base.trainable_weights)) # Actualizar pesos

    # Actualizar métricas de entrenamiento
    train_loss.update_state(loss_value)
    train_accuracy.update_state(y, predictions)
    return loss_value

@tf.function
def test_step(x, y):
    """
    Paso de evaluación:
      - forward
      - actualizar métricas de test (loss, accuracy, precision, recall)
    Devuelve las predicciones para poder calcular balanced accuracy fuera
    """
    # Forward pass en modo evaluación (training=False)
    predictions = modelo_base(x, training=False)
    loss_value = loss_fn(y, predictions)
    # Actualizar métricas de test
    test_loss.update_state(loss_value)
    test_accuracy.update_state(y, predictions)
    test_precision.update_state(y, predictions)
    test_recall.update_state(y, predictions)

    return predictions

La función `train_step` ejecuta un paso de entrenamiento para un batch con **base_model**. El decorador `@tf.function` compila la función a un grafo estático de TensorFlow para que sea más eficiente. Dentro de `tf.GradientTape()` hacemos el forward pass llamando al modelo con `training=True` y calculamos el loss. Finalmente, `optimizer.apply_gradients()` actualiza los pesos usando esos gradientes. Al final actualizamos las métricas de entrenamiento con el loss y las predicciones del batch. Hemos omitido el bucle de validación que se incluye en el tutorial ya que no la utilizamos en nuestro problema, al dividir los datos solo en entrenamiento, test y extra.

La función `test_step` es similar pero más simple. Hacemos forward pass con `training=False` (modo evaluación), calculamos el loss, y actualizamos las métricas de test (accuracy, precision, recall). No calculamos gradientes ni actualizamos pesos. Devolvemos las predicciones porque luego necesitamos todas juntas para calcular balanced accuracy. Está basada en la función del tutorial utilizada para la validación, pero esta solo la vamos a ejecutar una vez después del entrenamiento en lugar de en acda epoch.

#### 3.4. Entrenamiento

In [11]:
import time

def entrenar_modelo(train_dataset, train_step, train_loss, train_accuracy, epochs=3):
    # LE PASAMOS PARAMETROS PORQUE PARA CADA MODELO DISTINTO HAY QUE DEFINIR UN TRAIN_STEP DISTINTO
    # ASI QUE LO DEFINIMOS FUERA Y SE LO PASAMOS COMO PARAMETRO
    for epoch in range(epochs): # Iterar por cada epoch
        print("\nStart of epoch %d" % (epoch,))
        start_time = time.time()

        # Iterar sobre los batches del dataset 
        for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
            loss_value = train_step(x_batch_train, y_batch_train) # paso de entrenamiento

            # Mostrar progreso cada 200 batches
            if step % 200 == 0:
                print(
                    "Training loss (for one batch) at step %d: %.4f"
                    % (step, float(loss_value))
                )
                print("Seen so far: %d samples" % ((step + 1) * BATCH_SIZE))

        # Mostrar las métricas al final de cada epoch 
        train_acc  = float(train_accuracy.result())
        train_loss_epoch = float(train_loss.result())
        print("\n--- Training metrics ---")
        print("Accuracy: %.4f" % train_acc)
        print("Loss: %.4f" % train_loss_epoch)

        # Resetear las métricas al final del epoch
        train_loss.reset_state()
        train_accuracy.reset_state()
        
        print("Time taken: %.2fs" % (time.time() - start_time))

Esta función ejecuta el bucle completo de entrenamiento igual que se hace en el tutorial. Para cada epoch, recorremos todos los batches del dataset. En cada batch llamamos a `train_step`, que hace forward pass, calcula loss, backpropagation y actualiza pesos. Cada 200 batches mostramos el progreso para monitorizar el entrenamiento (como se hace en el tutorial). Al final de cada epoch mostramos las métricas acumuladas (accuracy y loss promedio de todos los batches) y las reseteamos para el siguiente epoch con `.reset_state()`.

Entrenamos la red base durante 10 epochs:

In [12]:
entrenar_modelo(train_dataset, train_step, train_loss, train_accuracy, epochs=10)


Start of epoch 0
Training loss (for one batch) at step 0: 0.7083
Seen so far: 4096 samples
Training loss (for one batch) at step 200: 0.5655
Seen so far: 823296 samples
Training loss (for one batch) at step 400: 0.5309
Seen so far: 1642496 samples

--- Training metrics ---
Accuracy: 0.7004
Loss: 0.5679
Time taken: 7.39s

Start of epoch 1
Training loss (for one batch) at step 0: 0.5289
Seen so far: 4096 samples
Training loss (for one batch) at step 200: 0.5310
Seen so far: 823296 samples
Training loss (for one batch) at step 400: 0.5284
Seen so far: 1642496 samples

--- Training metrics ---
Accuracy: 0.7348
Loss: 0.5236
Time taken: 7.62s

Start of epoch 2
Training loss (for one batch) at step 0: 0.5009
Seen so far: 4096 samples
Training loss (for one batch) at step 200: 0.5144
Seen so far: 823296 samples
Training loss (for one batch) at step 400: 0.5047
Seen so far: 1642496 samples

--- Training metrics ---
Accuracy: 0.7440
Loss: 0.5091
Time taken: 7.28s

Start of epoch 3
Training loss

Vemos que al principio el loss baja rápido (de 0.70 a 0.57 solo en el primer epoch) porque el modelo está aprendiendo los patrones básicos desde cero. A partir del segundo, la mejora es más lenta y gradual: el loss sigue bajando poco a poco hasta 0.48 y la accuracy sube del 70% al 76%. Tampoco vemos saltos bruscos ni subidas en el loss, lo que nos indica que el entrenamiento va bien. 

Sin embargo, no podemos estar seguros de la calidad del entrenamiento hasta que evaluemos la red en el conjunto de test.

#### 3.5. Test

In [13]:
def evaluar_modelo(test_dataset, test_step, test_accuracy, test_precision, test_recall):
    """Evalúa el modelo en el conjunto de test y calcula todas las métricas"""
    # Listas para acumular predicciones y etiquetas reales y con eso balanced accuracy
    y_true_all = [] # Todas las etiquetas reales
    y_pred_all = [] # Todas las predicciones binarias
    
    # Evaluar en todos los batches
    for x_batch_test, y_batch_test in test_dataset:
        # test_step actualiza loss, accuracy, precision, recall
        test_predictions = test_step(x_batch_test, y_batch_test)
        
        # Guardar para balanced accuracy
        y_true_all.append(y_batch_test.numpy().astype(np.int32))
        # Convertir probabilidades a clases binarias (umbral=0.5)
        y_pred_all.append((test_predictions.numpy() >= 0.5).astype(np.int32))
    
    # Concatenar todos los batches en un solo array
    y_true_all = np.concatenate(y_true_all, axis=0).reshape(-1)
    y_pred_all = np.concatenate(y_pred_all, axis=0).reshape(-1)
    
    # Métricas de test a partir de los objetos de Keras
    test_acc = float(test_accuracy.result())
    precision = float(test_precision.result())
    recall = float(test_recall.result())
    f1_score = 2 * precision * recall / (precision + recall + 1e-8) # para que no haya división por 0
    
    # Balanced Accuracy: calcular accuracy para cada clase y hacer la media
    mask_0 = (y_true_all == 0) # pone a True todos los valores de la lista de etiquetas reales que valgan 0
    mask_1 = (y_true_all == 1) # pone a True todos los valores de la lista de etiquetas reales que valgan 1
    
    # Accuracy para cada clase por separado
    acc_class0 = np.mean(y_pred_all[mask_0] == y_true_all[mask_0]) # especificidad (verdaderos negativos)
    acc_class1 = np.mean(y_pred_all[mask_1] == y_true_all[mask_1]) # sensibilidad (verdaderos positivos)
    balanced_accuracy = (acc_class0 + acc_class1) / 2
    
    # Crear diccionario con resultados
    # Lo guardamos porque luego hay que compararlo con el que lleva fine tuning !!!
    resultados = {
        'accuracy': test_acc,
        'balanced_accuracy': balanced_accuracy,
        'f1_score': f1_score,
        'precision': precision,
        'recall': recall,
        'sensitivity': acc_class1,
        'specificity': acc_class0
    }
    return resultados

def mostrar_resultados_test(resultados):
    print("\n--- Test metrics ---")
    print("Accuracy: %.4f" % resultados['accuracy'])
    print("Precision: %.4f" % resultados['precision'])
    print("Recall: %.4f" % resultados['recall'])
    print("F1-Score: %.4f" % resultados['f1_score'])
    print("Balanced Accuracy:")
    print("\tAccuracy Class 0 (background): %.4f" % resultados['specificity'])
    print("\tAccuracy Class 1 (signal): %.4f" % resultados['sensitivity'])
    print("\tMedia: %.4f" % resultados['balanced_accuracy'])

La función `evaluar_modelo` procesa todo el conjunto de test en batches. Para cada batch ejecuta `test_step` y guarda las predicciones y etiquetas reales. Al final, calcula todas las métricas: accuracy, precision y recall vienen directamente de las métricas de Keras. El F1-Score lo calculamos a mano con su fórmula. Para balanced accuracy calculamos la accuracy de cada clase por separado: filtramos con máscaras booleanas los ejemplos de clase 0 y clase 1, calculamos cuántos predijimos correctamente en cada clase, y hacemos la media. Esto es importante en datasets desbalanceados porque da el mismo peso a ambas clases.

In [14]:
# Evaluar en test
resultados_test = evaluar_modelo(test_dataset,test_step,test_accuracy,test_precision,test_recall)
mostrar_resultados_test(resultados_test)


--- Test metrics ---
Accuracy: 0.7597
Precision: 0.7637
Recall: 0.7903
F1-Score: 0.7768
Balanced Accuracy:
	Accuracy Class 0 (background): 0.7254
	Accuracy Class 1 (signal): 0.7903
	Media: 0.7579


El modelo alcanza un 75.97% de accuracy en datos que nunca ha visto. Es ligeramente inferior al accuracy final alcanzado en el entrenamiento, pero esto es algo normal y no es señal de overfitting.

La precisión del 76% indica que cuando predice señal, acierta aproximadamente 3 de cada 4 veces. El recall (79%) muestra que detecta la gran mayoría de las señales reales. Todo esto se combina al final en un F1-score del 77%, ya que ambas métricas están muy equilibradas.

Mirando el balanced accuracy vemos que este es de un 75.79%, muy similar al accuracy normal. Sin embargo, mirando los accuracy de las clases por separado, vemos que el modelo detecta mejor las señales (79%) que los backgrounds (72.5%). Esto pasa porque en el dataset hay ligeramente más ejemplos de señal (53%), como hemos visto cuando analizamos el desbalanceo, y por eso el modelo aprende este patrón. En la práctica esto significa que el modelo tiende a clasificar más eventos como señal. Esto puede estar bien (si queremos asegurarnos de no perder posibles señales) o no (si queremos evitar falsos positivos).

In [15]:
# Guardar el modelo entrenado
MODEL_PATH = "residual_network_entrenada.keras"
modelo_base.save(MODEL_PATH)
print(f"\nModelo guardado en: {MODEL_PATH}")


Modelo guardado en: residual_network_entrenada.keras


### 4. Fine tuning

#### 4.1 Cargar el modelo base ya entrenado

Cargamos el modelo que entrenamos anteriormente desde el `.keras`. Como definimos clases personalizadas (`ResidualBlock` y `ResidualNetwork`), necesitamos indicarle a Keras cuáles son mediante el parámetro `custom_objects`. Sin esto, Keras no sabría cómo reconstruir nuestros bloques residuales y daría un error.

In [16]:
MODEL_PATH = "residual_network_entrenada.keras"
# custom_objects para que keras reconozca nuestras clases
modelo_finetuning = keras.models.load_model(
    MODEL_PATH,
    custom_objects={'ResidualBlock': ResidualBlock, 'ResidualNetwork': ResidualNetwork}
)

print("Modelo cargado desde:", MODEL_PATH)
modelo_finetuning.summary()

Modelo cargado desde: residual_network_entrenada.keras


Una vez cargado, tenemos el modelo con todos sus pesos ya entrenados, y ahora podemos hacer fine-tuning. Vemos que el summary es exactamente el mismo que la red que construimos al principio.

#### 4.2 Congelar los pesos

In [17]:
# Congelar todas las capas entrenables del modelo
for layer in modelo_finetuning.layers:
    layer.trainable = False # Desactivar entrenamiento para esta capa

# Comprobamos que están congeladas
print("Capas del modelo y su estado de entrenamiento:")
for layer in modelo_finetuning.layers:
    print(f"  {layer.name}: trainable={layer.trainable}")

modelo_finetuning.summary()

Capas del modelo y su estado de entrenamiento:
  capa_entrada: trainable=False
  res_block1: trainable=False
  res_block2: trainable=False
  capa_intermedia: trainable=False
  capa_salida: trainable=False


Ponemos `trainable=False` en todas las capas del modelo base para que sus pesos no se modifiquen durante el fine-tuning. Esto va a hacer que tengamos muchos menos parámetros que entrenar.

Como se ve en el summary, tenemos 0 parámetros entrenables después de congelar.

#### 4.3 Añadir módulos de adaptación de baja dimensión

In [18]:
class CapaModeloFineTuning(tf.keras.layers.Layer):
    """
    - W: pesos originales (din, dout) - CONGELADOS
    - A: matriz (din, r) - ENTRENABLE
    - B: matriz (r, dout) - ENTRENABLE
    """
    def __init__(self, capa_original, r=8, alfa=0.1, **kwargs):
        super().__init__(**kwargs)
        self.r = r
        self.alfa = alfa

        # Guardamos la activación de la capa original
        self.activation = capa_original.activation

        # Obtener dimensiones de la capa original
        self.din, self.dout = capa_original.kernel.shape

        # Pesos base (congelados)
        self.W = self.add_weight(
            name=f"{self.name}_W",
            shape=(self.din, self.dout),
            initializer=tf.constant_initializer(capa_original.kernel.numpy()), # Copiamos los pesos de la capa original
            trainable=False, # No se puede entrenar 
        )

        # bias
        self.b = self.add_weight(
            name=f"{self.name}_b",
            shape=(self.dout,),
            initializer=tf.constant_initializer(capa_original.bias.numpy()), # Copiamos el bias de la capa original
            trainable=False,
        )

        # matriz A: (din, r)
        self.A = self.add_weight(
            name=f"{self.name}_A",
            shape=(self.din, self.r),
            initializer="glorot_uniform", # Inicialización estándar
            trainable=True, # sí que se entrena
        )

        # matriz B: (r, dout)
        self.B = self.add_weight(
            name=f"{self.name}_B",
            shape=(self.r, self.dout),
            initializer="zeros", # Inicializada a cero
            trainable=True, # sí que se entrena
        )

    def call(self, x):
        # Calcular la matriz de ajuste
        A_por_B = tf.matmul(self.A, self.B)
        pesos_finales = self.W + self.alfa * A_por_B
        # salida = x por pesos_finales + bias
        out = tf.matmul(x, pesos_finales) + self.b
        # Ponemos este if porque en los bloques residuales tenemos algunas sin activación
        if self.activation is not None: 
            out = self.activation(out)
        return out

    def get_config(self):
        config = super().get_config()
        config.update({'r': self.r, 'alfa': self.alfa})
        return config


Partimos de una capa ya entrenada con pesos W. La idea es adaptar el modelo sin modificar sus pesos originales. En lugar de cambiar directamente los pesos W que ya están entrenados, añadimos un "ajuste" pequeño usando dos matrices A y B. 

La clase `CapaModeloFineTuning` que hemos definido recibe como entrada la capa original ya entrenada y dos hiperparámetros: r (cuanto más pequeño sea más pequeñas van a ser las matrices A y B) y alfa (controla cuánto peso van a tener las matrices en el resultado final).

**Inicialización (`__init__`):**

Primero extraemos información de la capa original: sus dimensiones (din para entrada, dout para salida) y su función de activación. Luego creamos cuatro conjuntos de pesos:

- **W y b (congelados)**: Copiamos los pesos y bias de la capa original y los marcamos como no entrenables (`trainable=False`). Ya están entrenados de antes.

- **A (entrenable)**: Matriz de dimensión din x r. Se inicializa con valores aleatorios usando glorot_uniform. Como r es pequeño, esta matriz "comprime" la información de entrada.

- **B (entrenable)**: Matriz de dimensión r x dout. Se inicializa a ceros para que al principio la adaptación no tenga efecto (la salida inicial es idéntica a la capa original). Esta matriz "descomprime" de vuelta a la dimensión de salida.

**Forward pass (`call`):**

Cuando pasan datos por la capa:
1. Multiplicamos las matrices A y B para obtener una matriz din x dout del mismo tamaño que W
2. Sumamos esta matriz a W, pero escalada por alfa: `W_final = W + α x (A x B)`
3. Aplicamos estos pesos finales a la entrada: `salida = x por W_final + b`
4. Si la capa original tenía activación, la aplicamos al resultado



In [19]:
class ResidualBlockFineTuning(tf.keras.models.Model):
    
    def __init__(self, original_block, r=4, alfa=0.1, **kwargs):
        super().__init__(**kwargs)
        self.r = r
        self.alfa = alfa
        
        # Cambiar las capas Dense del bloque original por la CapaModeloFineTuning
        # El resto es lo mismo que el bloque original
        self.dense1 = CapaModeloFineTuning(original_block.dense1, r=r, alfa=alfa, name=f"{self.name}_dense1_ft")
        self.dense2 = CapaModeloFineTuning(original_block.dense2, r=r, alfa=alfa, name=f"{self.name}_dense2_ft")
        self.add = original_block.add
        self.activation = original_block.activation

    def call(self, x):
        """Forward pass con conexión residual"""
        # ES IGUAL QUE EL DEL RESIDIAL BLOCK ORIGINAL
        x_skip = x
        out = self.dense1(x)
        out = self.dense2(out)
        out = self.add([out, x_skip])
        out = self.activation(out)
        return out
    
    def get_config(self):
        config = super().get_config()
        config.update({'r': self.r, 'alfa': self.alfa})
        return config

In [20]:
class ResidualNetworkFineTuning(tf.keras.Model):
    """Red residual completa con adaptación de baja dimensión"""
    
    def __init__(self, modelo_base, r=4, alfa=0.1, **kwargs):
        super().__init__(**kwargs)
        self.r = r
        self.alfa = alfa
        
        # Envolver todas las capas Dense con adaptación de baja dimensión
        # Cambiar las capas por la CapaModeloFineTuning (o el bloque)
        self.capa_entrada = CapaModeloFineTuning(modelo_base.capa_entrada, r=r, alfa=alfa,name="ft_capa_entrada")
        self.res_block1 = ResidualBlockFineTuning(modelo_base.res_block1, r=r, alfa=alfa, name="ft_res_block1")
        self.res_block2 = ResidualBlockFineTuning(modelo_base.res_block2, r=r, alfa=alfa, name="ft_res_block2")
        self.capa_intermedia = CapaModeloFineTuning(modelo_base.capa_intermedia, r=r, alfa=alfa, name="ft_capa_intermedia")
        self.capa_salida = CapaModeloFineTuning(modelo_base.capa_salida, r=r, alfa=alfa, name="ft_capa_salida")
    
    def call(self, inputs):
        x = self.capa_entrada(inputs)
        x = self.res_block1(x)
        x = self.res_block2(x)
        x = self.capa_intermedia(x)
        outputs = self.capa_salida(x)
        return outputs
    
    def get_config(self):
        config = super().get_config()
        config.update({'r': self.r, 'alfa': self.alfa})
        return config

Adaptamos las clases que implementaban el bloque residual y la red completa para poder aplicarles el módulo de adaptación de baja dimensión.

- `ResidualBlockFineTuning`: adapta un bloque residual existente para fine-tuning. Recibe el bloque original ya entrenado y los parámetros r y alfa. Reemplazamos las dos capas densas (`dense1` y `dense2`) por versiones con adaptación de baja dimensión usando `CapaModeloFineTuning`. El resto de la arquitectura se mantiene exactamente igual que el bloque original.

- `ResidualNetworkFineTuning`: reconstruye toda la arquitectura de la red original pero con adaptación de baja dimensión en todas las capas densas. Recibe el modelo base completo ya entrenado. Envolvemos cada capa/bloque del modelo base con la adaptación (las capas con `CapaModeloFineTuning` y los bloques residuales con`ResidualBlockFineTuning`, que a su vez envuelve sus capas densas internas). Todos usan los mismos valores de r y alfa.

In [21]:
r = 4 # Rango de las matrices A y B (menor = menos parámetros porque son mas pequeñas)
alfa = 0.1 # Factor de escala para el ajuste (controla cuánto influye)

modelo_ft = ResidualNetworkFineTuning(modelo_finetuning, r=r, alfa=alfa, name="red_residual_fine_tuning")
_ = modelo_ft(X_train[:1])
modelo_ft.summary()

Hemos elegido r = 4 (mucho menor que las dimensiones originales de las capas) y alfa = 0.1 (para que los ajustes sean sutiles y no cambien drásticamente el comportamiento del modelo original).

Ahora, del total de 83,829 parámetros:
- **78,081 no entrenables**: Son los pesos originales del modelo base (W y b de todas las capas), que están congelados y no se van a modificar durante el fine-tuning
- **5,748 entrenables**: Son las matrices A y B que hemos añadido a cada capa. Estas son las únicas que se entrenarán con los nuevos datos

### 4.4 Entrenar el modelo adaptado

Preparamos el conjunto extra en batches igual que hicimos con el conjunto de entrenamiento.

Lo importante aquí es que usamos un **learning rate más bajo** (5e-4 en lugar de 1e-3) porque el modelo ya está entrenado y solo queremos hacer ajustes pequeños. Si usáramos un learning rate alto, podríamos estropear el conocimiento ya aprendido en los pesos congelados.

Además, definimos métricas, `ft_train_step` y `ft_test_step` de la misma manera que antes, pero específicos para este modelo. De esta manera podemos comparar después los resultados del modelo base con el modelo con fine-tuning.

In [22]:
# Preparar dataset extra
extra_dataset = tf.data.Dataset.from_tensor_slices((X_extra, y_extra))
extra_dataset = extra_dataset.shuffle(buffer_size=100_000, reshuffle_each_iteration=True).batch(BATCH_SIZE)

In [23]:
# Definir nuevas métricas y optimizador para fine tuning

# Optimizador con learning rate MENOR porque queremos ajustes pequeños (la red ya está entrenada)
ft_optimizer = optimizers.Adam(learning_rate=5e-4)
loss_fn_ft = losses.BinaryCrossentropy()

# Crear nuevas métricas para el fine-tuning
ft_train_loss = metrics.Mean(name="ft_train_loss")
ft_train_accuracy = metrics.BinaryAccuracy(name="ft_train_accuracy")

ft_test_accuracy = metrics.BinaryAccuracy(name="ft_test_accuracy")
ft_test_precision = metrics.Precision(name="ft_test_precision")
ft_test_recall = metrics.Recall(name="ft_test_recall")

In [24]:
@tf.function
def ft_train_step(x, y):
    """Paso de entrenamiento para fine-tuning"""
    with tf.GradientTape() as tape:
        predictions = modelo_ft(x, training=True)
        loss_value = loss_fn_ft(y, predictions)
    
    # Solo calcula gradientes para variables entrenables (A y B)
    grads = tape.gradient(loss_value, modelo_ft.trainable_variables)
    ft_optimizer.apply_gradients(zip(grads, modelo_ft.trainable_variables))
    
    ft_train_loss.update_state(loss_value)
    ft_train_accuracy.update_state(y, predictions)
    return loss_value

@tf.function
def ft_test_step(x, y):
    """Paso de evaluación para fine-tuning"""
    predictions = modelo_ft(x, training=False)
    
    ft_test_accuracy.update_state(y, predictions)
    ft_test_precision.update_state(y, predictions)
    ft_test_recall.update_state(y, predictions)
    
    return predictions

Entrenamos durante 3 epochs con el conjunto extra porque estamos haciendo ajustes finos sobre un modelo ya entrenado, no entrenando desde cero.

In [25]:
entrenar_modelo(extra_dataset, ft_train_step, ft_train_loss, ft_train_accuracy, epochs=3)


Start of epoch 0
Training loss (for one batch) at step 0: 0.4936
Seen so far: 4096 samples
Training loss (for one batch) at step 200: 0.4923
Seen so far: 823296 samples
Training loss (for one batch) at step 400: 0.4761
Seen so far: 1642496 samples
Training loss (for one batch) at step 600: 0.4829
Seen so far: 2461696 samples
Training loss (for one batch) at step 800: 0.4842
Seen so far: 3280896 samples
Training loss (for one batch) at step 1000: 0.4853
Seen so far: 4100096 samples
Training loss (for one batch) at step 1200: 0.4760
Seen so far: 4919296 samples
Training loss (for one batch) at step 1400: 0.4775
Seen so far: 5738496 samples
Training loss (for one batch) at step 1600: 0.4733
Seen so far: 6557696 samples
Training loss (for one batch) at step 1800: 0.4769
Seen so far: 7376896 samples
Training loss (for one batch) at step 2000: 0.4740
Seen so far: 8196096 samples

--- Training metrics ---
Accuracy: 0.7612
Loss: 0.4832
Time taken: 37.44s

Start of epoch 1
Training loss (for o

El loss se mantiene bastante estable alrededor de 0.48, mejorando ligeramente de 0.4832 a 0.4817. El accuracy mejora muy poco (de 76.12% a 76.2%). Todo esto era esperado, ya que el modelo base ya funcionaba bien y solo estamos refinándolo con pequeños ajustes.


### 4.5. Evaluar en test y comparar los resultados

In [26]:
resultados_test_ft = evaluar_modelo(
    test_dataset, 
    ft_test_step, 
    ft_test_accuracy, 
    ft_test_precision, 
    ft_test_recall
)
mostrar_resultados_test(resultados_test_ft)


--- Test metrics ---
Accuracy: 0.7626
Precision: 0.7689
Recall: 0.7883
F1-Score: 0.7785
Balanced Accuracy:
	Accuracy Class 0 (background): 0.7339
	Accuracy Class 1 (signal): 0.7883
	Media: 0.7611


El modelo con fine-tuning alcanza un 76.26% de accuracy, ligeramente mejor que el modelo base. Las mejoras son pequeñas, pero consistentes en todas las métricas excepto recall, que baja de un 79% a 78.83%. Sin embargo, se compensa con una mejora en precision a 77.08%, de forma que el F1-score también sube al 77.85%. El balanced accuracy también pasa de 75.79% a 76.12% (el accuracy de la clase 1 baja ligeramente pero el de la clase 0 sube).

Lo importante es que conseguimos estas mejoras entrenando solo 5,748 parámetros. Esto es mucho más eficiente que reentrenar todo el modelo desde cero con los 8.5 millones de ejemplos extra, ya que requiere menos tiempo de entrenamiento y menos memoria pero mantiene todo el conocimiento ya adquirido en el entrenamiento inicial.

In [27]:
print("COMPARACIÓN DE RESULTADOS")
comparacion = pd.DataFrame({
    'Modelo Base': [
        resultados_test['accuracy'],
        resultados_test['balanced_accuracy'],
        resultados_test['f1_score'],
        resultados_test['precision'],
        resultados_test['recall']
    ],
    'Modelo Fine Tuning': [
        resultados_test_ft['accuracy'],
        resultados_test_ft['balanced_accuracy'],
        resultados_test_ft['f1_score'],
        resultados_test_ft['precision'],
        resultados_test_ft['recall']
    ]
}, index=['Accuracy', 'Balanced Accuracy', 'F1-Score', 'Precision', 'Recall'])

comparacion['Diferencia'] = comparacion['Modelo Fine Tuning'] - comparacion['Modelo Base']

display(comparacion.round(4))


COMPARACIÓN DE RESULTADOS


Unnamed: 0,Modelo Base,Modelo Fine Tuning,Diferencia
Accuracy,0.7597,0.7626,0.0029
Balanced Accuracy,0.7579,0.7611,0.0032
F1-Score,0.7768,0.7785,0.0016
Precision,0.7637,0.7689,0.0051
Recall,0.7903,0.7883,-0.0021


#### 4.6. Compactar el modelo

Una vez hecho el fine-tuning, podemos compactar el modelo fusionando la matriz de pesos originales W con las matrices A y B entrenadas. 


In [28]:
def compactar_capa_finetuning(capa_ft, capa_destino):
    """
    Copia a capa_destino los pesos compactados de una CapaModeloFineTuning.
    """
    # Obtener pesos base y matrices
    W_base = capa_ft.W.numpy() # din, dout
    b_base = capa_ft.b.numpy()
    A = capa_ft.A.numpy() # din, r
    B = capa_ft.B.numpy() # r, dout

    # calcula alfa x A x B
    # usamos @ para multiplicar las matrices porque hemos pasado las matrices a numpy
    ajuste = capa_ft.alfa * (A @ B) # din, dout
    pesos_compactados = W_base + ajuste

    # Copiar los pesos compactados a la capa destino
    capa_destino.set_weights([pesos_compactados, b_base])



Esta función toma una capa con adaptación de baja dimensión y fusiona todos sus pesos en una capa Dense normal. Lo calculando `W_final = W + alfa x (A x B)` para cada capa. Como A y B ya están entrenadas, podemos combinarlas con W en una única matriz de pesos. 

Primero extraemos todos los componentes de la capa con fine-tuning: los pesos base W (que estaban congelados), el bias b, y las matrices A y B (que acabamos de entrenar).

Luego calculamos el ajuste multiplicando las matrices A y B. Como A tiene dimensión din x r y B tiene dimensión r x dout, su producto A x B da como resultado una matriz de tamaño din x dout, el mismo que W. Multiplicamos este resultado por alfa (0.1 en nuestro caso) para controlar cuanto influirá ese ajuste.

Finalmente sumamos W_base + alfa x (A x B) para obtener los pesos compactados. Esta suma fusiona lo que había aprendido la red con el primer entrenamiento (W) con los ajustes específicos que ha hecho el fine-tuning (A x B). Usamos `set_weights()` para copiar estos pesos compactados y el bias a la capa destino, que es una capa Dense normal sin adaptación.

Como resultado, tenemos W + alfa x (A x B) en una sola matriz W_final.

In [29]:
# Crear una red residual normal
modelo_compacto = ResidualNetwork(name="red_residual_compacta")
_ = modelo_compacto(X_train[:1])

# Compactar cada capa del modelo fine tuning en el modelo normal
compactar_capa_finetuning(modelo_ft.capa_entrada, modelo_compacto.capa_entrada)
compactar_capa_finetuning(modelo_ft.res_block1.dense1, modelo_compacto.res_block1.dense1)
compactar_capa_finetuning(modelo_ft.res_block1.dense2, modelo_compacto.res_block1.dense2)
compactar_capa_finetuning(modelo_ft.res_block2.dense1, modelo_compacto.res_block2.dense1)
compactar_capa_finetuning(modelo_ft.res_block2.dense2, modelo_compacto.res_block2.dense2)
compactar_capa_finetuning(modelo_ft.capa_intermedia, modelo_compacto.capa_intermedia)
compactar_capa_finetuning(modelo_ft.capa_salida, modelo_compacto.capa_salida)

modelo_compacto.summary()

# COMPROBAMOS QUE HACEN LAS MISMAS PREDICCIONES
preds_ft = modelo_ft(X_test[:10000], training=False).numpy()
preds_compacto = modelo_compacto(X_test[:10000], training=False).numpy()

# Calcular la diferencia máxima entre predicciones
max_diff = np.max(np.abs(preds_ft - preds_compacto))
print("Máxima diferencia absoluta entre modelo_ft y modelo_compacto:", max_diff)

Máxima diferencia absoluta entre modelo_ft y modelo_compacto: 1.1920929e-07


Creamos un modelo con la arquitectura original (`ResidualNetwork`) y copiamos en él los pesos compactados de cada capa del modelo fine-tuning. El resultado es un modelo del mismo tamaño que el base, sin las matrices A y B. Ahora tiene exactamente la misma arquitectura y número de parámetros que el modelo base original (78,081), pero con pesos mejorados.

Para comprobar que la compactación funciona correctamente, comprobamos que la red con adaptación y la red compactada nos dan las mismas predicciones para el mismo conjunto de datos. Probamos con 10,000 ejemplos del conjunto de test y calculamos la diferencia máxima entre esas predicciones, y como se puede ver es prácticamente 0.