In [None]:
import shutil
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import train_test_split

import logging

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import (Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, 
                                     BatchNormalization, GlobalAveragePooling2D, LeakyReLU)
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
import datetime

from keras.applications.vgg16 import preprocess_input as vgg16_preprocess
from keras.applications.resnet50 import preprocess_input as resnet_preprocess
from keras.applications.xception import preprocess_input as xception_preprocess

### Ensemble y Destilación
Se crea un ensemble (Voting Classifier) con los 4 modelos y se intenta destilarlo en un único modelo.

In [None]:
# Folder that holds the saved teacher models (.keras files).
folder = "mis_modelos"
# os.listdir returns entries in arbitrary order; sorting makes the positional
# pairing with `preprocesses` identical on every machine and every run.
model_files = sorted(f for f in os.listdir(folder) if f.endswith(".keras"))

# Load every model found in the folder.
models = [tf.keras.models.load_model(os.path.join(folder, f)) for f in model_files]
# One preprocessing function per model, matched BY POSITION with `model_files`.
# NOTE(review): confirm this order matches the alphabetical order of the file names.
preprocesses = [vgg16_preprocess, resnet_preprocess, xception_preprocess, lambda x: x/255.]
if len(models) != len(preprocesses):
    # A later zip() over both lists would silently drop the unmatched entries.
    raise ValueError(
        f"Found {len(models)} model file(s) in '{folder}' but "
        f"{len(preprocesses)} preprocessing functions: {model_files}"
    )
models

### Ensemble

In [None]:
class VotingTeacher(tf.keras.Model):
    """Soft-voting ensemble that averages the outputs of several Keras models."""

    def __init__(self, models, preprocess_fns=None):
        """
        Args:
            models: list of already-loaded Keras models.
            preprocess_fns: list of preprocessing functions, one per model and
                            in the same order. Each receives X and returns the
                            preprocessed X. If None, identity is used.

        Raises:
            ValueError: if `models` is empty or the two lists differ in length.
        """
        super().__init__()
        models = list(models)
        if not models:
            raise ValueError("VotingTeacher needs at least one model.")
        if preprocess_fns is None:
            preprocess_fns = [lambda x: x for _ in models]
        else:
            preprocess_fns = list(preprocess_fns)
        if len(preprocess_fns) != len(models):
            # zip() in call() would otherwise silently ignore the extra entries.
            raise ValueError(
                f"Got {len(models)} models but {len(preprocess_fns)} preprocessing functions."
            )
        self.models = models
        self.preprocess_fns = preprocess_fns

    def call(self, X, training=False):
        """Return the mean of the member outputs, shaped (N, 1).

        Args:
            X: input batch, handed to each member's preprocessing function.
            training: forwarded unchanged to every member model.
        """
        preds = []
        for m, fn in zip(self.models, self.preprocess_fns):
            X_prep = fn(X)  # model-specific preprocessing
            p = m(X_prep, training=training)
            # Force every member output to a float32 column of shape (N, 1)
            p = tf.cast(tf.reshape(p, (-1, 1)), tf.float32)
            preds.append(p)

        # Stack to (num_models, N, 1) and average over the model axis -> (N, 1)
        stacked = tf.stack(preds, axis=0)
        avg_preds = tf.reduce_mean(stacked, axis=0)
        return avg_preds

### Knowledge Distillation

In [None]:
class Distiller(Model):
    """Knowledge-distillation wrapper for binary classifiers.

    Trains `student` on a weighted sum of (a) its loss against the ground-truth
    labels and (b) a distillation loss against the temperature-softened outputs
    of `teacher`. Only the student's weights receive gradient updates.
    """

    def __init__(self, student, teacher):
        """
        Args:
            student: Keras model to be trained.
            teacher: Keras model whose outputs are imitated; it is always
                called with training=False.
        """
        super().__init__()
        self.teacher = teacher
        self.student = student

    def compile(
        self,
        optimizer,
        metrics,
        student_loss_fn,
        distillation_loss_fn,
        alpha=0.1,
        temperature=3,
        from_logits=False,
    ):
        """
        Configure the distiller for binary classification.

        Args:
            optimizer: Keras optimizer for the student weights
            metrics: Keras metrics for evaluation (e.g. BinaryAccuracy)
            student_loss_fn: Loss between student predictions and ground-truth;
                a Keras loss object, a callable, or a string alias
            distillation_loss_fn: Loss between soft student predictions and soft
                teacher predictions; same accepted forms as student_loss_fn
            alpha: weight to student_loss_fn and 1-alpha to distillation_loss_fn
            temperature: Temperature for softening probability distributions
            from_logits: set True when teacher and student emit raw logits;
                the default assumes both emit sigmoid probabilities
        """
        super().compile(optimizer=optimizer, metrics=metrics)
        # Resolve string aliases (e.g. 'binary_crossentropy') into callables;
        # loss objects and functions are returned unchanged.
        self.student_loss_fn = keras.losses.get(student_loss_fn)
        self.distillation_loss_fn = keras.losses.get(distillation_loss_fn)
        self.alpha = alpha
        self.temperature = temperature
        self.from_logits = from_logits

    def _soften(self, outputs):
        """Turn model outputs into temperature-softened [p, 1 - p] rows of shape (N, 2)."""
        outputs = tf.reshape(tf.cast(outputs, tf.float32), (-1, 1))
        if self.from_logits:
            logits = outputs
        else:
            # Temperature scaling is defined on logits, so invert the sigmoid
            # first; clipping keeps the logarithms finite at exactly 0 or 1.
            eps = 1e-7
            probs = tf.clip_by_value(outputs, eps, 1.0 - eps)
            logits = tf.math.log(probs) - tf.math.log1p(-probs)
        positive = tf.nn.sigmoid(logits / self.temperature)
        # A divergence such as KL needs a full distribution, not a single column.
        return tf.concat([positive, 1.0 - positive], axis=-1)

    def _student_loss(self, y, y_pred, sample_weight=None):
        """Scalar student loss, honouring optional per-sample weights."""
        if isinstance(self.student_loss_fn, keras.losses.Loss):
            return self.student_loss_fn(y, y_pred, sample_weight=sample_weight)
        # Plain loss functions return one value per sample and do not align
        # label/prediction ranks themselves, so do both by hand (binary case:
        # labels and predictions hold the same number of elements).
        y = tf.reshape(tf.cast(y, y_pred.dtype), tf.shape(y_pred))
        per_sample = self.student_loss_fn(y, y_pred)
        if sample_weight is not None:
            weights = tf.reshape(sample_weight, tf.shape(per_sample))
            per_sample = per_sample * tf.cast(weights, per_sample.dtype)
        return tf.reduce_mean(per_sample)

    def train_step(self, data):
        """Run one optimisation step on the student and return the metric dict."""
        # fit(class_weight=...) / fit(sample_weight=...) deliver a 3-tuple.
        x, y, sample_weight = keras.utils.unpack_x_y_sample_weight(data)

        # Forward pass of teacher (outside the tape: it is not trained)
        teacher_soft = self._soften(self.teacher(x, training=False))

        with tf.GradientTape() as tape:
            # Forward pass of student
            student_predictions = self.student(x, training=True)

            # Hard-label loss
            student_loss = self._student_loss(y, student_predictions, sample_weight)

            # Soft-label loss; reduce_mean is a no-op for losses that are
            # already scalar and averages per-sample values otherwise.
            distillation_loss = (
                tf.reduce_mean(
                    self.distillation_loss_fn(
                        teacher_soft, self._soften(student_predictions)
                    )
                )
                * (self.temperature ** 2)
            )

            # Total loss
            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss

        # Compute gradients for the student only
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update metrics
        self.compiled_metrics.update_state(y, student_predictions)

        # Return dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update(
            {"student_loss": student_loss, "distillation_loss": distillation_loss}
        )
        return results

    def test_step(self, data):
        """Evaluate the student alone on one batch and return the metric dict."""
        x, y, sample_weight = keras.utils.unpack_x_y_sample_weight(data)

        # Compute predictions
        y_prediction = self.student(x, training=False)

        # Calculate the loss
        student_loss = self._student_loss(y, y_prediction, sample_weight)

        # Update metrics
        self.compiled_metrics.update_state(y, y_prediction)

        # Return dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss})
        return results


#### Creamos el modelo Student

In [None]:
# Compact CNN student: three conv + max-pool stages (32, 64, 128 filters)
# followed by a small dense head with a single sigmoid output.
student = keras.Sequential(
    [
        Input(shape=(224, 224, 3)),  # 224x224 RGB images
        *(
            stage_layer
            for n_filters in (32, 64, 128)
            for stage_layer in (
                Conv2D(n_filters, (3, 3), activation="relu", padding="same"),
                MaxPooling2D((2, 2)),
            )
        ),
        Flatten(),
        Dense(64, activation="relu"),
        Dropout(0.5),
        Dense(1, activation="sigmoid"),
    ],
    name="student",
)


#### Cargamos datos

In [None]:
data_dir = "../data/processed/train"

def get_generators(data_dir, preprocess_fn, target_size=(224, 224), batch_size=128, validation_split=0.15):
    """Build the training and validation directory iterators.

    Args:
        data_dir: root folder with one sub-folder per class.
        preprocess_fn: function applied to every image by both generators.
        target_size: (height, width) the images are resized to.
        batch_size: number of images per batch.
        validation_split: fraction of the images reserved for validation.

    Returns:
        (train_generator, val_generator): the training iterator is shuffled and
        augmented; the validation iterator is neither.
    """
    train_datagen = ImageDataGenerator(
        preprocessing_function=preprocess_fn,
        rotation_range=60,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.12,
        brightness_range=[0.8, 1.2],
        shear_range=0.2,
        vertical_flip=True,
        horizontal_flip=True,
        validation_split=validation_split
    )

    # Validation images must not be randomly distorted, otherwise validation
    # metrics are noisy and not comparable between epochs. The same
    # validation_split is used so both generators partition the files alike.
    val_datagen = ImageDataGenerator(
        preprocessing_function=preprocess_fn,
        validation_split=validation_split
    )

    train_generator = train_datagen.flow_from_directory(
        data_dir,
        target_size=target_size,
        batch_size=batch_size,
        class_mode='binary',
        subset='training',
        shuffle=True
    )

    val_generator = val_datagen.flow_from_directory(
        data_dir,
        target_size=target_size,
        batch_size=batch_size,
        class_mode='binary',
        subset='validation',
        shuffle=False
    )

    return train_generator, val_generator

#### Instanciamos los generadores, los pesos de clase y el teacher

In [None]:
# Identity preprocessing: the generators yield the pixel values as loaded, and
# VotingTeacher applies each member's own preprocessing function internally.
train_generator, val_generator = get_generators(data_dir, lambda x: x)

labels = train_generator.classes

# Balanced weights: one value per class, ordered like np.unique(labels)
class_weight_values = compute_class_weight(
    class_weight="balanced",
    classes=np.unique(labels),
    y=labels
)

# Keras expects a {class_index: weight} dictionary
class_weights = dict(enumerate(class_weight_values))
print(class_weights)

teacher = VotingTeacher(models, preprocesses)

In [None]:
distiller = Distiller(student=student, teacher=teacher)
distiller.compile(
    optimizer=keras.optimizers.Adam(),
    # Pass a loss object rather than the string alias: the distiller invokes
    # student_loss_fn(y, y_pred) itself, and a plain str is not callable.
    student_loss_fn=keras.losses.BinaryCrossentropy(),
    metrics=['accuracy'],
    distillation_loss_fn=keras.losses.KLDivergence(),
    alpha=0.1,
    temperature=10,
)

# Distill teacher to student
distiller.fit(
    train_generator,
    validation_data=val_generator,
    epochs=20,
    class_weight=class_weights
)

# Evaluate the student on the validation split (no separate test set is loaded here)
distiller.evaluate(val_generator)