In [None]:
# Bibliotecas básicas
import os
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns

# PyTorch y modelos
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import timm  # Biblioteca para modelos preentrenados avanzados

# Scikit-learn para métricas y división de datos
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix


In [None]:
# Global hyperparameters and run configuration
SEED = 123
IMAGE_SIZE = (224, 224)
BATCH_SIZE = 16
EPOCHS = 30
LEARNING_RATE = 1e-4

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Usando dispositivo: {DEVICE}")

# Seed the NumPy and PyTorch global RNGs for reproducibility
np.random.seed(SEED)
torch.manual_seed(SEED)


In [None]:
# Path to the image directory; each immediate sub-directory is treated as one class
images_dir = '/ruta/a/tu/directorio/arcgis-survey-images-new'

# Discover the classes (sub-directory names) and give each a numeric label.
# Sorting makes the name -> index mapping independent of directory listing order.
class_names = sorted(
    d for d in os.listdir(images_dir)
    if os.path.isdir(os.path.join(images_dir, d))
)
class_to_idx = {class_name: idx for idx, class_name in enumerate(class_names)}
print(f"Clases encontradas: {class_names}")

# Collect one (filepath, label) pair per image file
image_paths = []
labels = []

for class_name in class_names:
    class_dir = os.path.join(images_dir, class_name)
    # os.listdir returns entries in arbitrary order. Sort them so that the row
    # order of data_df (and therefore any seeded split of it) is the same on
    # every machine and every run.
    for fname in sorted(os.listdir(class_dir)):
        # Keep only files with a known image extension (case-insensitive)
        if fname.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
            image_paths.append(os.path.join(class_dir, fname))
            labels.append(class_to_idx[class_name])

# Build the DataFrame that indexes the whole dataset
data_df = pd.DataFrame({
    'filepath': image_paths,
    'label': labels
})

# Show how many images each numeric label has
print("Distribución de clases:")
print(data_df['label'].value_counts())


In [None]:
# Hold out 20% of the rows for validation, stratified on the label column and
# seeded with SEED so the split is repeatable.
split_options = dict(
    test_size=0.2,
    stratify=data_df['label'],
    random_state=SEED,
)
train_df, val_df = train_test_split(data_df, **split_options)


In [None]:
# Importar OpenCV
import cv2

# Transformación personalizada
class CustomTransform:
    """Replace a PIL image with its Sobel gradient-magnitude map.

    The result is returned as a 3-channel PIL image in which all three
    channels hold the same 8-bit edge map, so it can be fed to the rest of a
    torchvision pipeline unchanged.

    Args:
        ksize: Aperture size passed to ``cv2.Sobel`` for both derivatives.
            Defaults to 5, the value this transform always used.
            NOTE(review): OpenCV restricts which sizes are valid; confirm
            against the ``cv2.Sobel`` documentation before changing it.
    """

    def __init__(self, ksize=5):
        self.ksize = ksize

    def __call__(self, image):
        """Apply the edge filter to ``image`` (a PIL image) and return a PIL image."""
        # Force 3-channel RGB first: the RGB->gray conversion below expects a
        # colour array, so grayscale or palette inputs would otherwise fail.
        image_np = np.array(image.convert('RGB'))

        # Collapse to a single luminance channel
        gray_image = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)

        # Horizontal and vertical derivatives, kept in float64 so negative
        # gradients are not clipped before the magnitude is taken
        sobelx = cv2.Sobel(gray_image, cv2.CV_64F, 1, 0, ksize=self.ksize)
        sobely = cv2.Sobel(gray_image, cv2.CV_64F, 0, 1, ksize=self.ksize)
        sobel = cv2.magnitude(sobelx, sobely)

        # Min-max rescale the magnitudes to the 0-255 range, then store as 8-bit
        sobel = cv2.normalize(sobel, None, 0, 255, cv2.NORM_MINMAX)
        sobel = sobel.astype(np.uint8)

        # Replicate the single channel into three so downstream code sees RGB
        sobel_rgb = cv2.cvtColor(sobel, cv2.COLOR_GRAY2RGB)

        return Image.fromarray(sobel_rgb)

# Preprocessing pipelines for training and validation
def _build_pipeline(augmentations=()):
    """Compose resize -> edge filter -> optional augmentations -> tensor -> normalize."""
    steps = [transforms.Resize(IMAGE_SIZE), CustomTransform()]
    steps.extend(augmentations)
    steps.append(transforms.ToTensor())
    # A single mean/std value is given although the tensor has three channels.
    # NOTE(review): presumably it is broadcast over all channels; confirm
    # against torchvision's Normalize.
    steps.append(transforms.Normalize([0.485], [0.229]))
    return transforms.Compose(steps)


data_transforms = {
    # Training adds random flips and rotations of up to 15 degrees
    'train': _build_pipeline([
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
    ]),
    # Validation applies no random augmentation
    'val': _build_pipeline(),
}


In [None]:
class LeafDataset(Dataset):
    """Dataset that loads images from disk using a DataFrame as its index.

    Args:
        df: DataFrame with a ``'filepath'`` column (path of each image) and a
            ``'label'`` column (its class label).
        transform: Optional callable applied to the loaded RGB PIL image; its
            return value is what ``__getitem__`` yields as the image.
    """

    def __init__(self, df, transform=None):
        # Keep plain arrays so items are addressed by position, not by the
        # DataFrame's own index
        self.filepaths = df['filepath'].values
        self.labels = df['label'].values
        self.transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.filepaths)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at position ``idx``."""
        # Open inside a context manager so the underlying file is closed as
        # soon as the pixels have been copied out by convert(); otherwise some
        # formats can keep the descriptor open until garbage collection.
        with Image.open(self.filepaths[idx]) as raw_image:
            image = raw_image.convert('RGB')
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label


In [None]:
# Wrap each split in a dataset with its matching preprocessing pipeline
train_dataset = LeafDataset(train_df, transform=data_transforms['train'])
val_dataset = LeafDataset(val_df, transform=data_transforms['val'])

# Both loaders share batch size and worker count; only the training loader shuffles
loader_options = dict(batch_size=BATCH_SIZE, num_workers=4)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)


In [None]:
# Create a pretrained Swin Transformer whose classifier is sized for our classes.
# Passing num_classes lets timm build the replacement head itself, instead of
# overwriting model.head with a bare nn.Linear, which depends on how the
# installed timm version structures that head.
# NOTE(review): recent timm releases reportedly make the Swin head a module
# that also pools the feature map, which a plain nn.Linear would skip;
# confirm against the installed version.
model = timm.create_model(
    'swin_base_patch4_window7_224',
    pretrained=True,
    num_classes=len(class_names),
)

# Move the model to the selected device
model = model.to(DEVICE)


In [None]:
# Freeze the lower part of the network: every parameter whose name contains
# 'layers' and whose second dotted component is an index below 2.
# NOTE(review): assumes such names look like 'layers.<idx>. ...'; confirm
# against model.named_parameters().
for param_name, weight in model.named_parameters():
    if 'layers' not in param_name:
        continue
    layer_idx = int(param_name.split('.')[1])
    if layer_idx < 2:
        weight.requires_grad = False


In [None]:
# Loss function: cross-entropy over the model outputs
criterion = nn.CrossEntropyLoss()

# AdamW optimizer; the weight decay acts as regularization
optimizer = optim.AdamW(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=1e-4,
)

# Cosine-annealing learning-rate schedule with T_max set to the epoch count
scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)


In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=25, early_stopping_patience=5):
    """Train ``model`` with a train/validation phase per epoch and early stopping.

    Batches are read from the notebook-level ``train_loader`` and
    ``val_loader`` and moved to ``DEVICE`` before the forward pass.

    Args:
        model: Network to train; it is updated in place.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Learning-rate scheduler stepped once per epoch, after the
            training phase.
        num_epochs: Maximum number of epochs to run.
        early_stopping_patience: Stop once validation accuracy has failed to
            improve for this many consecutive epochs.

    Returns:
        ``(model, history)``: the model with the weights of its best
        validation-accuracy epoch loaded, and a dict with per-epoch lists under
        the keys ``'train_loss'``, ``'train_acc'``, ``'val_loss'`` and
        ``'val_acc'``.
    """
    # Local import so this cell does not depend on any other cell being edited
    import copy

    # state_dict() returns references to the live parameter tensors, which the
    # optimizer keeps updating in place. Deep-copy so the snapshot really holds
    # the weights as they were at the time it was taken.
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

    epochs_no_improve = 0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        # Every epoch runs a training phase followed by a validation phase
        for phase in ['train', 'val']:
            is_training = phase == 'train'
            if is_training:
                model.train()  # training mode
                dataloader = train_loader
            else:
                model.eval()   # evaluation mode
                dataloader = val_loader

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in tqdm(dataloader):
                inputs = inputs.to(DEVICE)
                labels = labels.to(DEVICE)

                # Gradients only exist, and only need clearing, while training
                if is_training:
                    optimizer.zero_grad()

                # Track gradients only during the training phase
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backpropagate and update the weights only when training
                    if is_training:
                        loss.backward()
                        optimizer.step()

                # Accumulate as plain Python numbers; the loss is weighted by
                # batch size so the epoch figure is a per-sample average
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels).item()

            if is_training:
                scheduler.step()

            epoch_loss = running_loss / len(dataloader.dataset)
            epoch_acc = running_corrects / len(dataloader.dataset)

            history[f'{phase}_loss'].append(epoch_loss)
            history[f'{phase}_acc'].append(epoch_acc)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Snapshot the weights whenever validation accuracy improves
            if phase == 'val':
                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                    epochs_no_improve = 0
                else:
                    epochs_no_improve += 1

        # Early stopping: give up after too many epochs without improvement
        if epochs_no_improve >= early_stopping_patience:
            print("Early stopping")
            break

        print()

    print(f'Mejor precisión en validación: {best_acc:.4f}')
    # Restore the best weights seen during training
    model.load_state_dict(best_model_wts)
    return model, history


In [None]:
# Run the training loop for the configured number of epochs
model, history = train_model(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    num_epochs=EPOCHS,
)


In [None]:
def evaluate_model(model, dataloader):
    """Run ``model`` over ``dataloader`` and collect its predictions.

    The model is switched to evaluation mode and gradients are disabled.
    Inputs are moved to ``DEVICE``; labels are read as they come from the loader.

    Args:
        model: Network to evaluate.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        ``(probabilities, labels)``: a NumPy array with the softmax (over
        dim 1) of the model outputs for every batch, concatenated along
        axis 0, and a NumPy array with the labels in the same order.
    """
    model.eval()
    prob_batches = []
    label_values = []

    with torch.no_grad():
        for batch_inputs, batch_labels in tqdm(dataloader):
            logits = model(batch_inputs.to(DEVICE))
            batch_probs = torch.softmax(logits, dim=1)
            prob_batches.append(batch_probs.cpu().numpy())
            label_values.extend(batch_labels.numpy())

    return np.concatenate(prob_batches, axis=0), np.array(label_values)


In [None]:
# Class probabilities and labels for the validation set
preds, val_labels = evaluate_model(model, val_loader)

# Predicted class = index of the highest probability in each row
pred_classes = preds.argmax(axis=1)


In [None]:
# Classification report on the validation set, labelled with class names
print("\nReporte de clasificación del modelo:\n")
report_text = classification_report(
    val_labels,
    pred_classes,
    target_names=class_names,
)
print(report_text)


In [None]:
# Confusion matrix of validation labels versus predicted classes
conf_matrix = confusion_matrix(val_labels, pred_classes)

# Draw it as an annotated heatmap with class names on both axes
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='g',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_xlabel('Predicción')
ax.set_ylabel('Etiqueta Verdadera')
ax.set_title('Matriz de Confusión')
plt.show()


In [None]:
# Loss and accuracy curves, side by side
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

# One entry per panel: axes, train key, validation key, y label, title
panels = [
    (loss_ax, 'train_loss', 'val_loss', 'Pérdida', 'Pérdida durante el Entrenamiento'),
    (acc_ax, 'train_acc', 'val_acc', 'Precisión', 'Precisión durante el Entrenamiento'),
]

for panel_ax, train_key, val_key, y_label, panel_title in panels:
    panel_ax.plot(history[train_key], label='Entrenamiento')
    panel_ax.plot(history[val_key], label='Validación')
    panel_ax.legend()
    panel_ax.set_xlabel('Época')
    panel_ax.set_ylabel(y_label)
    panel_ax.set_title(panel_title)

fig.tight_layout()
plt.show()
