# In [None]:
import os
import time
import numpy as np
import pandas as pd
import tensorflow as tf
from keras.callbacks import TensorBoard
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras import backend as K
from tensorflow.keras.metrics import Precision, Recall, AUC
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Experiment configuration.
# Identification of the run (used to build model names and log paths).
MODEL_NAME = "MegaClassifier_a"
VERSION_BASE = "v8"

# Reproducibility and training-loop settings.
SEED = 42
EPOCHS = 10
BATCH_SIZE = 32

# Input geometry: (height, width) fed to the generators, plus 3 channels
# for the network's input shape.
IMAGE_SIZE = (224, 224)
IMAGE_SHAPE = (*IMAGE_SIZE, 3)

# Load the dataset index: a semicolon-separated CSV whose `file_name` column
# is relative to the directory that contains the CSV itself.
DATASET_CSV = os.path.abspath("./data/processed/onlyDetectionsForTrain/onlyDetectionsForTrain.csv")
DATASET_PATH = os.path.dirname(DATASET_CSV)

dataset = pd.read_csv(DATASET_CSV, sep=";")
# Turn every relative file name into a path rooted at the dataset directory.
dataset["file_name"] = [
    os.path.join(DATASET_PATH, relative_name)
    for relative_name in dataset["file_name"]
]
# Labels are handled as strings from here on.
dataset["binary_label"] = dataset["binary_label"].astype(str)

# Split into train, validation and test according to the `subset` column.
train_df = dataset.loc[dataset["subset"] == "train"]
validation_df = dataset.loc[dataset["subset"] == "validation"]
test_df = dataset.loc[dataset["subset"] == "test"]

# "Balanced" class weights computed from the training labels only, stored as
# {position of the class in np.unique's sorted order: weight}.
class_weights = compute_class_weight(
    class_weight="balanced",
    classes=np.unique(train_df["binary_label"]),
    y=train_df["binary_label"],
)
class_weight_dict = dict(enumerate(class_weights))

# Focal loss factory (defaults: alpha=0.25, gamma=1.0)
def focal_loss(alpha=0.25, gamma=1.0):
    """Build a binary focal loss function usable as a Keras ``loss``.

    For every prediction the loss term is
    ``-alpha * (1 - p_t) ** gamma * log(p_t)``, where ``p_t`` is the predicted
    probability of the true class (``y_pred`` where ``y_true == 1``, otherwise
    ``1 - y_pred``).

    Note that ``alpha`` is applied as the same scale factor to both classes;
    it is not the class-dependent ``alpha_t`` weighting.

    Args:
        alpha: Constant multiplier applied to every loss term.
        gamma: Focusing exponent; larger values down-weight well-classified
            examples more strongly.

    Returns:
        A function ``loss(y_true, y_pred)`` that returns the focal loss
        averaged over the last axis only (one value per sample when the
        predictions have a trailing output axis).
    """
    def loss(y_true, y_pred):
        # Keep probabilities away from 0 and 1 so log() stays finite.
        epsilon = K.epsilon()
        y_pred = K.clip(y_pred, epsilon, 1.0 - epsilon)
        # Probability the model assigned to the true class.
        pt = tf.where(K.equal(y_true, 1), y_pred, 1 - y_pred)
        focal_terms = -alpha * K.pow(1. - pt, gamma) * K.log(pt)
        # Reduce over the output axis only. Collapsing the whole batch to a
        # scalar here would make Keras multiply that scalar by the
        # class/sample weights, so the weights could no longer emphasise
        # individual samples; Keras performs the batch reduction itself.
        return K.mean(focal_terms, axis=-1)

    return loss

# Training-time augmentation: random brightness changes and horizontal flips
# (both settings were kept from earlier versions of the experiment), together
# with the MobileNetV2 input preprocessing.
train_datagen = ImageDataGenerator(
    horizontal_flip=True,
    brightness_range=(0.8, 1.2),
    preprocessing_function=tf.keras.applications.mobilenet_v2.preprocess_input,
)

# Generator without augmentation: only the MobileNetV2 input preprocessing.
datagen = ImageDataGenerator(
    preprocessing_function=tf.keras.applications.mobilenet_v2.preprocess_input,
)

# Image iterators, one per dataset split.
# Arguments shared by all three splits.
_FLOW_ARGS = dict(
    x_col="file_name",
    y_col="binary_label",
    target_size=IMAGE_SIZE,
    batch_size=BATCH_SIZE,
    class_mode="binary",
)

# Training data is augmented and reshuffled (seeded for reproducibility).
train_images = train_datagen.flow_from_dataframe(
    dataframe=train_df,
    shuffle=True,
    seed=SEED,
    **_FLOW_ARGS,
)

# Evaluation splits are read in dataframe order: shuffling does not help
# validation/test, and a fixed order keeps runs reproducible and keeps
# predictions aligned with the iterator's `.classes` / `.filenames`.
validation_images = datagen.flow_from_dataframe(
    dataframe=validation_df,
    shuffle=False,
    **_FLOW_ARGS,
)

test_images = datagen.flow_from_dataframe(
    dataframe=test_df,
    shuffle=False,
    **_FLOW_ARGS,
)

# Fine-tuning search grid.
# Number of trailing backbone layers to unfreeze in each experiment.
fine_tune_layers_list = [
    10,
    20,
    30,
]
# Learning rates tried for every unfreezing depth.
learning_rates = [
    0.0001,
    0.00005,
]

def train_and_evaluate(fine_tune_layers, learning_rate, head_epochs=5):
    """Train and evaluate one fine-tuning configuration of the classifier.

    The run has two phases. First, only the classification head is trained
    for ``head_epochs`` epochs on top of a frozen, ImageNet-initialised
    MobileNetV2. Then the last ``fine_tune_layers`` layers of the backbone are
    unfrozen and training continues for ``EPOCHS`` epochs. Both phases use
    Adam with the same ``learning_rate``, the focal loss and the class
    weights computed at module level.

    Side effects: writes TensorBoard logs, the fine-tuning history CSV and a
    one-row results CSV under
    ``./logs/{MODEL_NAME}/{VERSION_BASE}/FineTune_<layers>_LR<lr>/``.

    Args:
        fine_tune_layers: Number of trailing backbone layers to unfreeze for
            the second phase. Values of 0 or less keep the whole backbone
            frozen.
        learning_rate: Adam learning rate used in both phases.
        head_epochs: Number of epochs for the head-only phase.

    Returns:
        A one-row ``pandas.DataFrame`` holding the test metrics (column names
        prefixed with ``test_``), ``fine_tune_layers``, ``learning_rate`` and
        ``training_time`` (seconds spent in the fine-tuning phase only).
    """
    fine_tune_label = f"FineTune_{fine_tune_layers}_LR{learning_rate}"
    log_dir = f"./logs/{MODEL_NAME}/{VERSION_BASE}/{fine_tune_label}"
    os.makedirs(log_dir, exist_ok=True)
    print(f"\n🔹 Entrenando con Fine-Tuning ({fine_tune_layers} capas descongeladas, LR={learning_rate}) 🔹")

    # This function is called once per grid configuration; release the global
    # Keras state left behind by earlier models so memory does not keep
    # growing across runs.
    K.clear_session()

    # Load MobileNetV2 without its classifier and freeze it entirely.
    base_model = tf.keras.applications.MobileNetV2(
        weights="imagenet",
        include_top=False,
        input_shape=IMAGE_SHAPE,
    )
    base_model.trainable = False

    # Add the binary classification head.
    model = tf.keras.Sequential([
        base_model,
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dense(1, activation="sigmoid"),
    ], name=f"{MODEL_NAME}_{VERSION_BASE}_{fine_tune_label}")

    def compile_model():
        # A fresh optimizer and fresh metric objects on every call, so the
        # second phase does not inherit state from the first.
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            loss=focal_loss(alpha=0.25, gamma=1.0),
            metrics=["accuracy", Precision(name="precision"), Recall(name="recall"), AUC(name="auc")],
        )

    # Phase 1: train only the classification head.
    compile_model()
    model.fit(
        train_images,
        epochs=head_epochs,
        validation_data=validation_images,
        class_weight=class_weight_dict,
    )

    # Phase 2: unfreeze the last `fine_tune_layers` backbone layers.
    # The number of layers to keep frozen is computed explicitly because the
    # slice `layers[:-0]` is empty, which would unfreeze the entire backbone
    # when 0 layers are requested.
    base_model.trainable = True
    frozen_count = max(len(base_model.layers) - fine_tune_layers, 0)
    for layer in base_model.layers[:frozen_count]:
        layer.trainable = False

    # Recompile so the new trainable flags take effect (the learning rate is
    # the same one used for the head phase).
    compile_model()

    # Fine-tuning run; only this phase is timed.
    start_time = time.time()
    history = model.fit(
        train_images,
        epochs=EPOCHS,
        validation_data=validation_images,
        class_weight=class_weight_dict,
        callbacks=[
            # Use the tf.keras callback so it matches the tf.keras model.
            tf.keras.callbacks.TensorBoard(log_dir=log_dir),
        ]
    )
    training_time = time.time() - start_time

    # Evaluate on the test split. `return_dict=True` pairs every value with
    # its metric name directly instead of relying on `metrics_names` matching
    # the returned list position by position.
    test_metrics = model.evaluate(test_images, return_dict=True)

    # Persist the per-epoch history of the fine-tuning phase.
    history_df = pd.DataFrame(history.history)
    history_df.to_csv(f"{log_dir}/history_{VERSION_BASE}_{fine_tune_label}.csv", index=False)

    # Persist the test metrics together with the configuration that produced them.
    evaluation_results = {("test_" + name): value for name, value in test_metrics.items()}
    evaluation_results["fine_tune_layers"] = fine_tune_layers
    evaluation_results["learning_rate"] = learning_rate
    evaluation_results["training_time"] = training_time

    results_df = pd.DataFrame([evaluation_results])
    results_df.to_csv(f"{log_dir}/results_{VERSION_BASE}_{fine_tune_label}.csv", index=False)

    return results_df

# Run one training/evaluation cycle for every (unfrozen layers, learning rate)
# pair, iterating learning rates fastest.
configurations = [
    (layer_count, rate)
    for layer_count in fine_tune_layers_list
    for rate in learning_rates
]

all_results = []
for layer_count, rate in configurations:
    all_results.append(train_and_evaluate(layer_count, rate))

# Stack the one-row result tables into a single comparison file.
final_results = pd.concat(all_results, ignore_index=True)
final_results.to_csv(f"./logs/{MODEL_NAME}/{VERSION_BASE}/fine_tuning_results.csv", index=False)

print("\n✅ ¡Fine-Tuning completado!")