In [None]:
!pip install torchmetrics



In [None]:
import os
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from torch.amp import autocast, GradScaler
from tqdm import tqdm

import torch
import torchmetrics
import cv2 as cv
import torch.nn as nn

from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms, models
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, recall_score, f1_score,confusion_matrix
from sklearn.model_selection import StratifiedShuffleSplit
import numpy as np

In [None]:
# Fetch the PlantVillage dataset from Kaggle.
import os

import kagglehub

abdallahalidev_plantvillage_dataset_path = kagglehub.dataset_download('abdallahalidev/plantvillage-dataset')

print('Data source import complete.')
print(f"Dataset path: {abdallahalidev_plantvillage_dataset_path}")

# Summarise what is available under /kaggle/input: one line per directory that
# contains files, instead of one line per file (the walk visits every file of
# every mounted dataset, which floods the cell output).
for dirname, _, filenames in os.walk('/kaggle/input'):
    if filenames:
        print(f"{dirname}: {len(filenames)} files")

# Tratamiento de datos

In [None]:
# Species name as it appears in the dataset folder prefix -> Spanish display name.
species_es_map = {
    "Strawberry": "Fresa",
    "Grape": "Uva",
    "Potato": "Papa",
    "Blueberry": "Arándano",
    "Corn_(maize)": "Maíz",
    "Tomato": "Tomate",
    "Peach": "Durazno",
    "Pepper,_bell": "Pimiento",
    "Orange": "Naranja",
    "Cherry_(including_sour)": "Cereza",
    "Apple": "Manzana",
    "Raspberry": "Frambuesa",
    "Squash": "Calabaza",
    "Soybean": "Soja",
}

# Disease name as it appears in the dataset folder suffix -> Spanish display name.
# Keys are matched verbatim against folder names, so their spelling (including
# trailing underscores and embedded spaces) must not be "corrected" here.
disease_es_map = {
    "Black_rot": "Podredumbre negra",
    "Early_blight": "Tizón temprano",
    "Target_Spot": "Mancha diana",
    "Late_blight": "Tizón tardío",
    "Tomato_mosaic_virus": "Virus del mosaico",
    "Haunglongbing_(Citrus_greening)": "Huanglongbing (enverdecimiento de los cítricos)",
    "Leaf_Mold": "Moho de la hoja",
    "Leaf_blight_(Isariopsis_Leaf_Spot)": "Tizón de la hoja",
    "Powdery_mildew": "Oídio",
    "Cedar_apple_rust": "Roya del manzano y cedro",
    "Bacterial_spot": "Mancha bacteriana",
    "Common_rust_": "Roya común",
    "Esca_(Black_Measles)": "Esca",
    "Tomato_Yellow_Leaf_Curl_Virus": "Virus del rizado amarillo de la hoja",
    "Apple_scab": "Sarna",
    "Northern_Leaf_Blight": "Tizón foliar del norte",
    "Spider_mites Two-spotted_spider_mite": "Ácaros araña de dos manchas",
    "Septoria_leaf_spot": "Mancha foliar por septoria",
    "Cercospora_leaf_spot Gray_leaf_spot": "Mancha foliar por cercospora / mancha foliar gris",
    "Leaf_scorch": "Quemadura de la hoja",
}

In [None]:
# Dataset root. The active value is the Kaggle mount point; the commented-out
# relative path is the alternative layout for a local run.
# dataset_path = "plantvillage_dataset/plantvillage dataset/color"
dataset_path = "/kaggle/input/plantvillage-dataset/plantvillage dataset/color/"


def build_image_index(root_dir):
    """Scan ``root_dir`` and return one record (dict) per image file.

    Every sub-directory named ``<Species>___<Disease>`` contributes one record
    per regular file it contains. A disease part equal to ``healthy`` is
    recorded as ``Healthy=True`` with ``Disease=None``.

    Args:
        root_dir: Directory whose immediate sub-directories are class folders.

    Returns:
        List of dicts with keys ``Format``, ``Species``, ``Healthy``,
        ``Disease`` and ``FileName``.

    Raises:
        FileNotFoundError: If ``root_dir`` is not an existing directory. Failing
            here is clearer than producing an empty table that breaks later on
            the first column access.
    """
    if not os.path.isdir(root_dir):
        raise FileNotFoundError(f"Dataset directory not found: {root_dir}")

    records = []
    # os.listdir returns entries in arbitrary order; sorting makes the row order
    # (and therefore any seeded split computed from it) reproducible.
    for folder in sorted(os.listdir(root_dir)):
        folder_path = os.path.join(root_dir, folder)
        if not os.path.isdir(folder_path):
            continue
        species, separator, disease = folder.partition('___')
        if not separator:
            # Not a "<Species>___<Disease>" class folder; skip it instead of crashing.
            continue
        healthy = disease == 'healthy'
        disease = None if healthy else disease
        for file in sorted(os.listdir(folder_path)):
            if os.path.isfile(os.path.join(folder_path, file)):
                records.append({
                    'Format': 'color',
                    'Species': species,
                    'Healthy': healthy,
                    'Disease': disease,
                    'FileName': file
                })
    return records


# Walk the class folders and collect one record per image
data = build_image_index(dataset_path)

# Build the DataFrame
df = pd.DataFrame(data)

In [None]:
# Spanish species name (NaN for any species missing from species_es_map).
df['Especie'] = df['Species'].map(species_es_map)


def _enfermedad_es(row):
    """Spanish disease label for one row.

    Healthy rows become 'Sano'; otherwise the disease is translated through
    disease_es_map, falling back to the untranslated name when it has no entry.
    """
    if row['Healthy']:
        return 'Sano'
    return disease_es_map.get(row['Disease'], row['Disease'])


df['Enfermedad'] = df.apply(_enfermedad_es, axis=1)

# "Label" is a plain copy of "Enfermedad", kept for convenience.
df['Label'] = df['Enfermedad']

# Encode the labels as integer category codes; build the categorical once and
# reuse it for both the codes and the id -> label mapping.
_label_categorical = df['Label'].astype('category')
df['Label_id'] = _label_categorical.cat.codes
label_map = dict(enumerate(_label_categorical.cat.categories))

# Number of distinct classes
NUM_CLASSES = len(label_map)

In [None]:
# Report how many classes the label encoding produced.
print("Número de clases:", NUM_CLASSES)

In [None]:
def _disease_text(row):
    """Class label for one row: 'healthy', or the raw (English) disease name."""
    return "healthy" if row["Healthy"] else row["Disease"]


# Re-derive the class encoding from the raw disease names. This overwrites the
# Label_id / NUM_CLASSES / label_map values computed from the Spanish labels.
df["disease_text"] = df.apply(_disease_text, axis=1)
_disease_categorical = df["disease_text"].astype("category")
df["Label_id"] = _disease_categorical.cat.codes
NUM_CLASSES = df["Label_id"].nunique()
label_map = dict(enumerate(_disease_categorical.cat.categories))


In [None]:
# Display the assembled table (one row per image file plus the label columns).
df

In [None]:
# Seed used for the train/validation split
SEED = 42

# Hold out 15% of the rows for validation (85% train), stratified on Label_id
# so both parts keep the same class proportions.
train_df, valid_df = train_test_split(df, test_size=0.15, random_state=SEED, stratify=df['Label_id'])

In [None]:
# Number of rows held out for validation.
len(valid_df)

# Clase

In [None]:
class PlantVillageDataset(torch.utils.data.Dataset):
    """Map-style dataset that loads PlantVillage images listed in a DataFrame.

    Each row of ``df`` must provide ``Species``, ``Healthy``, ``Disease``,
    ``FileName`` and ``Label_id``. Images are read from
    ``<root_dir>/<Species>___<Disease or 'healthy'>/<FileName>``.

    Args:
        df: Table describing the images; its index is reset so that positional
            lookups run from 0 to len-1 (the caller's frame is not modified).
        root_dir: Directory that contains the class folders.
        format_type: Stored on the instance; not read by this class.
        transform: Optional callable applied to the RGB uint8 array.
    """

    def __init__(self, df, root_dir, format_type='color', transform=None):
        self.transform = transform
        self.format_type = format_type
        self.root_dir = root_dir
        self.df = df.reset_index(drop=True)

    def __len__(self):
        return self.df.shape[0]

    def _image_path(self, record):
        """Rebuild the on-disk path of the image described by ``record``."""
        condition = 'healthy' if record['Healthy'] else record['Disease']
        class_folder = f"{record['Species']}___{condition}"
        return os.path.join(self.root_dir, class_folder, record['FileName'])

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
        """
        record = self.df.iloc[idx]
        image_path = self._image_path(record)

        # cv.imread returns None (instead of raising) when the file is missing or unreadable.
        bgr = cv.imread(image_path, cv.IMREAD_COLOR)
        if bgr is None:
            raise FileNotFoundError(f"Image not found or unreadable: {image_path}")

        # OpenCV decodes to BGR; convert to RGB before handing it to the transform.
        image = cv.cvtColor(bgr, cv.COLOR_BGR2RGB)

        label = torch.tensor(record['Label_id'], dtype=torch.long)

        if self.transform:
            image = self.transform(image)

        return image, label

# Dividir el dataset para iterar rápido

In [None]:
# Train on a stratified fraction of the training split to iterate faster.
# (The flag was previously defined as SE_SMALL_TRAIN while the code below read
# USE_SMALL_TRAIN, which raised NameError on a fresh kernel.)
USE_SMALL_TRAIN = True
TRAIN_FRAC = 0.2  # fraction of the training split kept in the subset


def stratified_fraction_indices(y, frac=0.2, seed=1337):
    """Return positional indices of a stratified sample of ``y``.

    Args:
        y: 1-D array of class labels.
        frac: Passed as ``train_size`` to StratifiedShuffleSplit, i.e. the
            share of samples to keep.
        seed: Random state of the splitter.

    Returns:
        Array of positions (into ``y``) selected for the subset.
    """
    sss = StratifiedShuffleSplit(n_splits=1, train_size=frac, random_state=seed)
    # Only the labels drive the split; the feature matrix is a placeholder of the same length.
    (idx_small, _), = sss.split(np.zeros_like(y), y)
    return idx_small


base_train_df = train_df
if USE_SMALL_TRAIN:
    idx = stratified_fraction_indices(train_df["Label_id"].values, frac=TRAIN_FRAC, seed=1337)
    base_train_df = train_df.iloc[idx].reset_index(drop=True)

total_train = len(train_df)
subset_train = len(base_train_df)

print(f"Total imágenes en train: {total_train}")
print(f"Usando en subset: {subset_train} ({subset_train/total_train:.1%} del train)")

# Dataset

In [None]:
# Per-channel mean / std used for normalisation (the standard ImageNet values).
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std  = [0.229, 0.224, 0.225]

# Training pipeline: geometric and colour augmentation on the PIL image, then
# tensor conversion, random erasing on the tensor, and finally normalisation.
_train_steps = [
    transforms.ToPILImage(),
    transforms.RandomResizedCrop((224, 224), scale=(0.5, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.25, contrast=0.25, saturation=0.25, hue=0.03),
    transforms.ToTensor(),
    transforms.RandomErasing(p=0.25, scale=(0.02, 0.12), ratio=(0.3, 3.3)),
    transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
]
train_transform = transforms.Compose(_train_steps)

# Validation pipeline: deterministic resize + centre crop, no augmentation.
_val_steps = [
    transforms.ToPILImage(),
    transforms.Resize(256),
    transforms.CenterCrop((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
]
val_transform = transforms.Compose(_val_steps)


In [None]:
BATCH_SIZE = 128

# Datasets: the (possibly reduced) training subset and the full validation split.
train_dataset = PlantVillageDataset(base_train_df, dataset_path, transform=train_transform)
valid_dataset = PlantVillageDataset(valid_df, dataset_path, transform=val_transform)

# Settings shared by both loaders; only the training loader shuffles.
_loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=0, pin_memory=True)
train_loader = torch.utils.data.DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
valid_loader = torch.utils.data.DataLoader(valid_dataset, shuffle=False, **_loader_kwargs)

# Bundle consumed by train(): both loaders plus the input resolution.
data_dict = dict(train=train_loader, valid=valid_loader, image_width=224, image_height=224)


In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Usando dispositivo: {device}")

# Class weight

In [None]:


# Inverse-frequency class weights for the loss.
# Count with an explicit length of NUM_CLASSES so the weight vector always has
# one entry per model output, even if a class is absent from the training
# subset (value_counts() would silently drop it and CrossEntropyLoss would then
# reject the shorter weight vector).
_label_ids = base_train_df["Label_id"].to_numpy().astype(np.int64)
counts = np.bincount(_label_ids, minlength=NUM_CLASSES).astype(np.float32)

# Classes with no samples get weight 0 instead of a huge 1/epsilon value that
# would dominate the normalisation below.
_present = counts > 0
_inv_freq = np.zeros_like(counts)
_inv_freq[_present] = 1.0 / (counts[_present] + 1e-6)

class_weights = torch.tensor(_inv_freq, dtype=torch.float32)
# Rescale so the weights of the classes that are present average to 1.
class_weights = class_weights / class_weights.sum() * int(_present.sum())
class_weights = class_weights.to(device)

criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.1)


# Model

In [None]:
# ResNet-18 initialised with torchvision's default pretrained weights.
pretrained_weights = models.ResNet18_Weights.DEFAULT
model = models.resnet18(weights=pretrained_weights)

In [None]:
import cv2 as cv
# NOTE(review): per the OpenCV docs, 0 should make OpenCV run its functions
# sequentially (no internal threading) — confirm this is the intended effect.
cv.setNumThreads(0)   # Added because the author was hitting a bug without it (the bug is not described here)

In [None]:
# Loaders plus the input resolution, in the shape train() expects.
data_dict = dict(
    train=train_loader,
    valid=valid_loader,
    image_width=224,
    image_height=224,
)

# TensorBoard: one writer per split, logging under runs/plant_<split>.
writer = {
    split: SummaryWriter(log_dir=f"runs/plant_{split}")
    for split in ("train", "valid")
}

# Train (cambié la función porque me tiraba errores)

In [None]:
from torch.amp import autocast, GradScaler  # used instead of torch.cuda.amp
# NOTE(review): train() builds its own GradScaler, so this module-level
# instance looks unused in the visible cells — confirm before removing it.
scaler = GradScaler()

In [None]:


from tqdm.auto import tqdm
from typing import Dict, Any, Optional


def train(
    model: nn.Module,
    optimizer: torch.optim.Optimizer,
    criterion: nn.Module,
    metric,  # torchmetrics.Metric
    data: Dict[str, Any],
    epochs: int,
    tb_writer: Optional[Dict[str, Any]] = None,  # {"train": SummaryWriter, "valid": SummaryWriter}
    log_interval: int = 10,
    early_stop_patience: int = 3,
    grad_clip_norm: Optional[float] = None,
    use_amp: bool = True,
    best_ckpt_path: str = "mejor_modelo.pth",
) -> Dict[str, list]:
    """
    Train a classifier with per-epoch validation, optional CUDA mixed precision,
    early stopping on validation loss and optional TensorBoard logging.

    Args:
        model: Network to train. The working device is taken from its first parameter.
        optimizer: Optimizer already bound to the parameters that should be updated.
        criterion: Loss, called as ``criterion(logits, targets)``.
        metric: Object exposing ``reset()``, ``update(logits, targets)`` and
            ``compute()``; it is reset at the start of every train and validation phase.
        data: ``{"train": loader, "valid": loader, "image_width": int, "image_height": int}``.
            The loaders yield ``(inputs, targets)`` batches; width/height are only
            used to build the dummy input for the TensorBoard graph.
        epochs: Maximum number of epochs.
        tb_writer: Optional ``{"train": writer, "valid": writer}``. When given,
            the model graph and the per-epoch loss/accuracy are logged.
        log_interval: Refresh the progress-bar postfix every this many steps;
            a value <= 0 disables the refresh.
        early_stop_patience: Stop after this many consecutive epochs without a
            new best validation loss.
        grad_clip_norm: If set, clip the gradient norm to this value before each step.
        use_amp: Request mixed precision. It is only honoured when the model is on
            CUDA; on CPU both the forward pass and the backward pass run in full
            precision (autocast and the gradient scaler are switched together).
        best_ckpt_path: Where the ``state_dict`` with the best validation loss is saved.

    Returns:
        Dict with the lists ``train_loss``, ``train_acc``, ``valid_loss`` and
        ``valid_acc``, one entry per completed epoch.

    Notes:
        * The model is returned with its last-epoch weights; the best weights
          are only on disk at ``best_ckpt_path``.
        * Epoch losses are the mean of the per-batch losses.
        * ``model.train()`` puts every submodule in training mode, including
          BatchNorm layers whose parameters are frozen.
    """
    train_loader = data["train"]
    valid_loader = data["valid"]
    image_width, image_height = data["image_width"], data["image_height"]

    device = next(model.parameters()).device
    device_type = "cuda" if torch.cuda.is_available() and device.type == "cuda" else "cpu"

    # One switch for both autocast and the scaler, so they can never disagree
    # (previously autocast stayed on for CPU while the scaler was disabled).
    amp_enabled = use_amp and device_type == "cuda"
    scaler = GradScaler(enabled=amp_enabled)

    # TensorBoard graph (best effort: tracing can fail for some modules).
    if tb_writer:
        was_training = model.training
        # Trace in eval mode so the dummy forward pass cannot update BatchNorm
        # running statistics.
        model.eval()
        try:
            dummy = torch.zeros((1, 3, image_height, image_width), device=device)
            tb_writer["train"].add_graph(model, dummy)
        except Exception as exc:
            # Keep training, but say why the graph is missing instead of hiding it.
            print(f"⚠️ No se pudo registrar el grafo en TensorBoard: {exc}")
        finally:
            model.train(was_training)

    best_val_loss = float("inf")
    patience_counter = 0

    history = {
        "train_loss": [],
        "train_acc": [],
        "valid_loss": [],
        "valid_acc": [],
    }

    for epoch in range(1, epochs + 1):
        # -----------------------
        # Train
        # -----------------------
        model.train()
        metric.reset()
        running_loss = 0.0

        pbar = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch}/{epochs} [Train]")
        for step, (x, y) in pbar:
            x = x.to(device, non_blocking=True)
            y = y.to(device, non_blocking=True)

            optimizer.zero_grad(set_to_none=True)

            # With enabled=False the context manager is a no-op.
            with autocast(device_type=device_type, enabled=amp_enabled):
                logits = model(x)
                loss = criterion(logits, y)

            if scaler.is_enabled():
                scaler.scale(loss).backward()
                if grad_clip_norm is not None:
                    # Gradients must be unscaled before their norm is clipped.
                    scaler.unscale_(optimizer)
                    torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip_norm)
                scaler.step(optimizer)
                scaler.update()
            else:
                loss.backward()
                if grad_clip_norm is not None:
                    torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip_norm)
                optimizer.step()

            running_loss += loss.item()
            metric.update(logits, y)

            # Guard against log_interval <= 0 (would otherwise divide by zero).
            if log_interval > 0 and step % log_interval == 0:
                avg_loss = running_loss / (step + 1)
                running_acc = metric.compute().item()
                pbar.set_postfix(loss=f"{avg_loss:.4f}", acc=f"{running_acc:.4f}")

        epoch_train_loss = running_loss / max(1, len(train_loader))
        epoch_train_acc = metric.compute().item()
        history["train_loss"].append(epoch_train_loss)
        history["train_acc"].append(epoch_train_acc)

        # -----------------------
        # Valid
        # -----------------------
        model.eval()
        metric.reset()
        val_running_loss = 0.0

        with torch.inference_mode():
            for x, y in valid_loader:
                x = x.to(device, non_blocking=True)
                y = y.to(device, non_blocking=True)
                with autocast(device_type=device_type, enabled=amp_enabled):
                    logits = model(x)
                    loss = criterion(logits, y)

                val_running_loss += loss.item()
                metric.update(logits, y)

        epoch_val_loss = val_running_loss / max(1, len(valid_loader))
        epoch_val_acc = metric.compute().item()
        history["valid_loss"].append(epoch_val_loss)
        history["valid_acc"].append(epoch_val_acc)

        print(
            f"Epoch {epoch} | "
            f"Train Loss: {epoch_train_loss:.4f} Acc: {epoch_train_acc:.4f} | "
            f"Valid Loss: {epoch_val_loss:.4f} Acc: {epoch_val_acc:.4f}"
        )

        # TensorBoard
        if tb_writer:
            tb_writer["train"].add_scalar("loss", epoch_train_loss, epoch)
            tb_writer["train"].add_scalar("accuracy", epoch_train_acc, epoch)
            tb_writer["valid"].add_scalar("loss", epoch_val_loss, epoch)
            tb_writer["valid"].add_scalar("accuracy", epoch_val_acc, epoch)
            tb_writer["train"].flush()
            tb_writer["valid"].flush()

        # Early stopping + checkpoint
        if epoch_val_loss < best_val_loss:
            best_val_loss = epoch_val_loss
            patience_counter = 0
            torch.save(model.state_dict(), best_ckpt_path)
            print(f"✅ Mejor modelo guardado en '{best_ckpt_path}' (val_loss={best_val_loss:.4f})")
        else:
            patience_counter += 1
            if patience_counter >= early_stop_patience:
                print(f"⏹️ Early stopping en epoch {epoch}")
                break

    return history


# Técnica: entrenamiento en múltiples pasos

## Paso A

In [None]:
# Step A (linear probe): freeze every pretrained parameter...
model.requires_grad_(False)

# ...then swap in a fresh classification head sized for this dataset. The new
# layer is created with trainable parameters, so it is the only part that learns.
head_in_features = model.fc.in_features
model.fc = nn.Linear(head_in_features, NUM_CLASSES)
model = model.to(device)

# Optimizer over the head only
optimizer = torch.optim.AdamW(model.fc.parameters(), lr=1e-3, weight_decay=1e-4)

# Macro-averaged multiclass accuracy
metric = torchmetrics.classification.MulticlassAccuracy(
    average='macro', num_classes=NUM_CLASSES
).to(device)

history_s1 = train(
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    metric=metric,
    data=data_dict,
    epochs=10,
    tb_writer=writer,
    best_ckpt_path="best_s1.pth",
)

## Paso B

In [None]:
# Resume from the best checkpoint written by step A. Step A saves to
# "best_s1.pth"; the previous "/content/mejor_modelo.pth" path is not written by
# any cell in this notebook.
model.load_state_dict(torch.load("best_s1.pth", map_location=device))

In [None]:
# STEP 2: fine-tune the last residual stage (layer4) together with the
# classification head.
for p in model.parameters():
    p.requires_grad = False
# Keep the head trainable as well: it was fitted on the frozen layer4 features
# in step A, so it has to be able to follow them once layer4 starts changing.
# (Previously the blanket freeze above also froze model.fc.)
for trainable_part in (model.layer4, model.fc):
    for p in trainable_part.parameters():
        p.requires_grad = True
# No per-module BatchNorm .train() call here: train() calls model.train() at the
# start of every epoch, which overrides any mode set on individual modules, and
# the BatchNorm parameters of layer4 are already covered by the loop above.

optimizer = torch.optim.AdamW(
    filter(lambda p: p.requires_grad, model.parameters()),
    lr=3e-4, weight_decay=1e-4
)

history_s2 = train(
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    metric=metric,
    data=data_dict,
    epochs=10,  # 8–12
    tb_writer=writer,
    best_ckpt_path="best_s2.pth"
)


## Paso C

In [None]:
# Step C: start from the best step-B checkpoint and fine-tune the whole network.
best_s2_state = torch.load("best_s2.pth", map_location=device)
model.load_state_dict(best_s2_state)

# Unfreeze every parameter.
model.requires_grad_(True)

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)

history_s3 = train(
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    metric=metric,
    data=data_dict,
    epochs=12,  # 10–15
    tb_writer=writer,
    best_ckpt_path="best_s3.pth",
)


# Validación

In [None]:
@torch.no_grad()
def eval_metrics(model, valid_loader, device, label_map=None):
    """
    Evaluate ``model`` on ``valid_loader`` and summarise healthy-vs-diseased quality.

    Args:
        model: Classifier returning logits of shape (batch, classes).
        valid_loader: Iterable of ``(inputs, targets)`` batches.
        device: Device the batches are moved to before the forward pass.
        label_map: Mapping ``class id -> class name``. Required: the id whose
            name is "healthy" (case-insensitive) identifies the healthy class.

    Returns:
        ``(acc_global, recall_sano, f1_sano, recall_enf, f1_enf)``:
        multiclass accuracy, then recall/F1 of the healthy class (one-vs-rest),
        then recall/F1 of "any disease" (everything that is not healthy).

    Raises:
        ValueError: If ``label_map`` is missing, contains no "healthy" entry, or
            ``valid_loader`` yields no batches.
    """
    # Resolve the healthy class id first, so a bad label_map fails immediately
    # instead of after a full inference pass over the validation set.
    if label_map is None:
        raise ValueError("Pasá label_map para identificar la clase 'healthy' de forma segura.")
    healthy_id = next(
        (int(k) for k, v in label_map.items() if str(v).lower() == "healthy"),
        None,
    )
    if healthy_id is None:
        raise ValueError("No encontré 'healthy' en label_map. Revisá label_map.")

    model.eval()
    y_true, y_pred = [], []

    for x, y in valid_loader:
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)
        logits = model(x)
        pred = logits.argmax(dim=1)
        y_true.append(y.cpu().numpy())
        y_pred.append(pred.cpu().numpy())

    if not y_true:
        # np.concatenate([]) would raise a much less helpful error.
        raise ValueError("valid_loader no produjo ningún batch.")

    y_true = np.concatenate(y_true)
    y_pred = np.concatenate(y_pred)

    # 1) Global (multiclass) accuracy
    acc_global = accuracy_score(y_true, y_pred)

    # 2) Healthy class, one-vs-rest
    y_true_sano = (y_true == healthy_id).astype(int)
    y_pred_sano = (y_pred == healthy_id).astype(int)
    recall_sano = recall_score(y_true_sano, y_pred_sano, zero_division=0)
    f1_sano     = f1_score(y_true_sano, y_pred_sano, zero_division=0)

    # 3) All diseases combined (everything that is NOT healthy)
    y_true_enf = (y_true != healthy_id).astype(int)
    y_pred_enf = (y_pred != healthy_id).astype(int)
    recall_enf = recall_score(y_true_enf, y_pred_enf, zero_division=0)
    f1_enf     = f1_score(y_true_enf, y_pred_enf, zero_division=0)

    return acc_global, recall_sano, f1_sano, recall_enf, f1_enf

# --- Run the evaluation and print the results ---
acc, r_sano, f1_sano, r_enf, f1_enf = eval_metrics(model, valid_loader, device, label_map=label_map)

for caption, value in (
    ("Accuracy global", acc),
    ("Recall clase 'Sano'", r_sano),
    ("F1-score clase 'Sano'", f1_sano),
    ("Recall enfermedades (combinadas)", r_enf),
    ("F1-score enfermedades (combinadas)", f1_enf),
):
    print(f"{caption}: {value:.4f}")

In [None]:
@torch.no_grad()
def preds_targets(model, loader, device):
    """Run ``model`` over ``loader`` and collect targets and predictions.

    The model is switched to eval mode. Each batch is moved to ``device``, and
    the predicted class is the argmax over dimension 1 of the model output.

    Returns:
        ``(targets, predictions)`` as two concatenated NumPy arrays.
    """
    model.eval()
    collected_targets = []
    collected_preds = []
    for inputs, labels in loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        predicted = model(inputs).argmax(1)
        collected_targets.append(labels.cpu().numpy())
        collected_preds.append(predicted.cpu().numpy())
    targets = np.concatenate(collected_targets)
    predictions = np.concatenate(collected_preds)
    return targets, predictions

# Targets and predictions on the validation split, then their confusion matrix.
y_true, y_pred = preds_targets(model, valid_loader, device)
cm = confusion_matrix(y_true, y_pred)
print(f"Confusion matrix shape: {cm.shape}")

In [None]:
# Class names ordered by class id, taken from label_map (the same id -> name
# mapping that defines Label_id) so names and ids cannot drift apart.
class_names = [label_map[class_id] for class_id in sorted(label_map)]


In [None]:
from sklearn.metrics import classification_report

# Pass the label ids explicitly so every name in class_names is matched to its
# id even when a class does not occur in y_true / y_pred (otherwise the report
# fails on a target_names length mismatch). zero_division=0 reports 0 for such
# classes, consistent with the other metric calls in this notebook.
print(classification_report(
    y_true,
    y_pred,
    labels=list(range(len(class_names))),
    target_names=class_names,
    zero_division=0,
))