In [None]:
import os, time, datetime
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.applications import ResNet50 # type: ignore
from tensorflow.keras import layers, models # type: ignore
from tensorflow.keras.callbacks import EarlyStopping # type: ignore
from tensorflow.keras.metrics import AUC, F1Score # type: ignore
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import load_img, img_to_array # type: ignore
from tensorflow.keras.preprocessing.image import ImageDataGenerator # type: ignore
from tensorflow.keras import backend as K # type: ignore
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras.optimizers.schedules import ExponentialDecay # type: ignore
from tensorflow.keras.optimizers import Adam # type: ignore
from tensorflow.keras.regularizers import l2 # type: ignore
from tensorflow.keras.models import load_model  # type: ignore
from tensorflow.keras.callbacks import TensorBoard # type: ignore

In [2]:
class ImageDataset:
    def __init__(self, data_dir):
        self.data_dir = data_dir
        self.images = []
        self.labels = []

    def load_data(self):
        for label in ['plaga', 'sana']:
            label_dir = os.path.join(self.data_dir, label)
            for image_name in os.listdir(label_dir):
                img_path = os.path.join(label_dir, image_name)
                img_array = self.load_and_preprocess_image(img_path)
                if img_array is not None:
                    self.images.append(img_array)
                    self.labels.append(self.assign_label(label))

    def load_and_preprocess_image(self, img_path):
        try:
            img = load_img(img_path, target_size=(224, 224))  # Redimensionar
            img_array = img_to_array(img) / 255.0  # Normalizar
            return img_array
        except OSError as e:
            print(f"Error al cargar la imagen {img_path}: {e}")
            return None

    def assign_label(self, label):
        return 1 if label == 'plaga' else 0  # Etiquetas binarias

In [21]:
class ResNetModel:
    def __init__(self):
        """Usamos ResNet50 como base y agregamos capas de salida."""
        base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))


        for layer in base_model.layers[-20:]:
            layer.trainable = True#Congelar las capas de la base para evitar el sobreajuste
        
        
        self.model = models.Sequential([
            base_model,
            layers.GlobalAveragePooling2D(),  # Promedio global de las características
            layers.Dense(128, activation='relu', kernel_regularizer=l2(0.001)),
            layers.Dropout(0.2),
            layers.Dense(64, activation='relu', kernel_regularizer=l2(0.001)),
            layers.Dense(1, activation='sigmoid')  # Salida binaria (plaga o sana)
        ])
        
    

    def compile(self):
        """Compila el modelo."""
        
        lr_schedule = ExponentialDecay(
            initial_learning_rate=0.001, #Valor inicial del learning rate
            decay_steps=50000,  # Número de pasos antes de reducir el learning rate
            decay_rate=0.96,     # Factor de reducción en cada paso
            staircase=True       # Si se debe hacer el descenso en "escalones"
        )
        
        optimizer = Adam(learning_rate=lr_schedule) # Usamos Adam con un scheduler de decaimiento exponencial
        
        self.model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy', AUC(), F1Score()]) 

    def train(self, X_train, y_train, X_val, y_val, epochs=6, batch_size=32):
        """Entrena el modelo con data augmentation.""" 

        
        # Early stopping
        early_stopping = EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True
        )

        log_dir = os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
        tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

        print("Iniciando el entrenamiento del modelo...")
        history = self.model.fit(
            X_train,
            y_train,
            batch_size=batch_size,
            epochs=epochs,
            validation_data=(X_val, y_val),
            callbacks=[early_stopping, tensorboard_callback],
            verbose=1
        )
        
        return history


    def evaluate(self, X_val, y_val):
        """Evalúa el modelo con los datos de validación."""
        loss, accuracy, auc, f1 = self.model.evaluate(X_val, y_val)
        print(f'Pérdida: {loss}, Accuracy: {accuracy}, AUC: {auc}, F1-Score: {f1}')
        return loss, accuracy, auc, f1

    def save(self, model_name='modelo_entrenado_resnet.h5'):
        """Guarda el modelo entrenado."""
        self.model.save(model_name)

        
        


In [22]:
data_dir = 'data/'  # Cambia a la ruta de tus imágenes
dataset = ImageDataset(data_dir)
dataset.load_data()

# Convertir listas a arrays de NumPy
images = np.array(dataset.images)
labels = np.array(dataset.labels)


# Dividir las imágenes en entrenamiento (80%) y validación (20%)
X_train, X_val, y_train, y_val = train_test_split(images, labels, test_size=0.2, random_state=42)

# Agregar una dimensión adicional para la metrica de f1-score 
y_train  = np.expand_dims(y_train, axis=-1)
y_val = np.expand_dims(y_val, axis=-1) 

#print("Clases en train:", np.bincount(y_train))
#print("Clases en val:", np.bincount(y_val))

print(y_train.shape)  # → (2466,)
print(y_val.shape)    # → (617,)


(2466, 1)
(617, 1)


In [23]:
# Crear y entrenar el modelo
model = ResNetModel()
model.compile()
print("Compilando el modelo...")
history = model.train(X_train, y_train, X_val, y_val, epochs=100)

# Evaluar el modelo
loss, accuracy, auc, f1 = model.evaluate(X_val, y_val)

Compilando el modelo...
Iniciando el entrenamiento del modelo...
Epoch 1/100
[1m78/78[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m827s[0m 10s/step - accuracy: 0.9199 - auc_6: 0.9675 - f1_score: 0.6863 - loss: 0.5501 - val_accuracy: 0.4684 - val_auc_6: 0.5000 - val_f1_score: 0.0000e+00 - val_loss: 396.3120
Epoch 2/100
[1m78/78[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m760s[0m 10s/step - accuracy: 0.9236 - auc_6: 0.9687 - f1_score: 0.6537 - loss: 0.4983 - val_accuracy: 0.4684 - val_auc_6: 0.4602 - val_f1_score: 0.6866 - val_loss: 7.4578
Epoch 3/100
[1m78/78[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m747s[0m 10s/step - accuracy: 0.9239 - auc_6: 0.9686 - f1_score: 0.6659 - loss: 0.4295 - val_accuracy: 0.5316 - val_auc_6: 0.5000 - val_f1_score: 0.6942 - val_loss: 8.5540
Epoch 4/100
[1m78/78[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m727s[0m 9s/step - accuracy: 0.9500 - auc_6: 0.9870 - f1_score: 0.6802 - loss: 0.3031 - val_accuracy: 0.5316 - val_auc_6: 0.8884 - val_f1

In [None]:
# Obtener predicciones como probabilidades
y_pred_probs = model.model.predict(X_val)

# Convertir probabilidades a etiquetas binarias
y_pred_classes = (y_pred_probs >= 0.5).astype(int)

In [None]:
y_val = np.expand_dims(y_val, axis=-1)  # → (617, 1)
y_val_flat = y_val.flatten()


In [None]:
import seaborn as sns

cm = confusion_matrix(y_val_flat, y_pred_classes)

# Visualización
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Sano', 'Plaga'], yticklabels=['Sano', 'Plaga'])
plt.xlabel('Predicción')
plt.ylabel('Real')
plt.title('Matriz de Confusión')
plt.show()

# Reporte detallado
print(classification_report(y_val_flat, y_pred_classes, target_names=['Sano', 'Plaga']))