In [None]:
import torch
import numpy as np
import torchvision
from torchvision import datasets 
import torchvision.transforms  as T
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader , random_split
from torch import nn 
import torch.nn.functional as F
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu'  ) 
torch.manual_seed(13)

In [None]:
path= './DATASET/'
min_batch = 512
TRAIN_SIZE = 50000
VAL_SIZE = 5000
TEST_SIZE = 5000

In [None]:
print(f"Usando dispositivo: {device}")

In [None]:
transform_cifar10_train = T.Compose([
    T.RandomHorizontalFlip(p=0.3),
    T.ToTensor(),
    T.Normalize([0.491, 0.482, 0.447], [0.247, 0.243, 0.262])
])
transform_cifar10_test = T.Compose([
                T.ToTensor(),
                T.Normalize([0.491, 0.482, 0.447], [0.247, 0.243, 0.262])
            ])

In [None]:
cifar10_train = datasets.CIFAR10(path, train=True, download=False,transform=transform_cifar10_train)


test_dataset = datasets.CIFAR10(path, train=False, download=False, transform=transform_cifar10_test)
val_dataset, test_dataset = random_split(test_dataset, [VAL_SIZE, TEST_SIZE])

train_loader = DataLoader(cifar10_train, batch_size=min_batch, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=min_batch, shuffle=True, num_workers=4, pin_memory=True)

In [None]:
def plot_cifar10_grid():
    classes = train_loader.dataset.classes
    total_samples = 8
    plt.figure(figsize=(15,15))
    for label, sample in enumerate(classes):
        class_idxs = np.flatnonzero(label == np.array(train_loader.dataset.targets))
        sample_idxs = np.random.choice(class_idxs, total_samples, replace = False)
        for i, idx in enumerate(sample_idxs):
            plt_idx = i*len(classes) + label + 1
            plt.subplot(total_samples, len(classes), plt_idx)
            plt.imshow(train_loader.dataset.data[idx])
            plt.axis('off')
            
            if i == 0: plt.title(sample)
    plt.show()

plot_cifar10_grid() 

# ResNet

In [None]:
def conlayer_K3P1(canal_in , canal_out , stride):
    return nn.Conv2d(canal_in , canal_out , stride=stride , kernel_size=3 , padding=1)

In [None]:
class residual_block(nn.Module):
    '''

    '''
    def __init__(self, in_channel, out_channel, stride=1, change_size = True):
        super().__init__()
        self.conv1 = conlayer_K3P1(in_channel, out_channel, stride)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.conv2 = conlayer_K3P1(out_channel, out_channel, 1)
        self.bn2 = nn.BatchNorm2d(out_channel)
        #for changing activation map sizes
        self.change_size = change_size
        if change_size:
            self.residual = nn.Sequential(nn.Conv2d(in_channel, 
                                                    out_channel, 
                                                    kernel_size=1,
                                                    stride=stride),
                                         nn.BatchNorm2d(out_channel)
                                         )      
    def forward(self, x):
        identity = x if not self.change_size else self.residual(x)
        y = F.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))
        y += identity
        return F.relu(y)

In [None]:
class ResNet56(nn.Module):
    def __init__(self, n=9, num_classes=10):
        super().__init__()
        self.conv1 = conlayer_K3P1(3, 16, stride = 1)
        self.bn1 = nn.BatchNorm2d(16)
        self.block1 = self.create_block(n=9, in_channel=16, 
                                        out_channel=16, stride=1, 
                                        change_size=False)
        self.block2 = self.create_block(n=9, in_channel=16, 
                                        out_channel=32, stride=2)
        self.block3 = self.create_block(n=9, in_channel=32, 
                                        out_channel=64, stride=2)
        self.fc = nn.Linear(64, num_classes)

    def create_block(self, n, in_channel, out_channel, stride, change_size=True):
        block = [residual_block(in_channel, out_channel, stride, change_size=change_size)]
        for i in range(n-1):
            block.append(residual_block(out_channel, out_channel, stride=1, change_size=False))
        return nn.Sequential(*block)   
        
    def forward(self, x):
        y = F.relu(self.bn1(self.conv1(x)))
        y = self.block3(self.block2(self.block1(y)))
        y = F.adaptive_avg_pool2d(y, 1)
        return self.fc(y.view(y.size(0), -1))      
       

In [None]:
model = ResNet56()
optimizer_resnet56 = torch.optim.SGD(model.parameters() , lr= 0.1 , momentum=0.95 , weight_decay=1e-4)

# Funcion de Accuracy



In [None]:
def accuracy(model, loader):
    correct = 0  # Contador para el número de predicciones correctas
    total = 0    # Contador para el número total de muestras
    cost = 0     # Acumulador para la pérdida total
    model.eval()  # Configura el modelo en modo de evaluación (desactiva dropout, etc.)
    model = model.to(device=device)  # Mueve el modelo al dispositivo (CPU o GPU)
    with torch.no_grad():  # Desactiva el cálculo de gradientes para ahorrar memoria y tiempo
        for x, y in loader:  # Itera sobre cada lote en el cargador de datos
            x = x.to(device=device, dtype=torch.float32)  # Mueve los datos de entrada al dispositivo
            y = y.to(device=device, dtype=torch.long)     # Mueve las etiquetas al dispositivo
            scores = model(x)  # Calcula las predicciones del modelo (salidas en bruto)
            cost += (F.cross_entropy(scores, y)).item()  # Acumula la pérdida de entropía cruzada
            _, pred = scores.max(dim=1)  # Obtiene las predicciones (clase con mayor puntuación)
            correct += (pred == y).sum()  # Suma el número de predicciones correctas
            total += pred.size(0)  # Acumula el número total de muestras en el lote
        # Calcula la pérdida promedio y la precisión total
        return cost / len(loader), float(correct) / total

In [None]:
def find_lr(model, optimiser, start_val=1e-6, end_val=1, beta=0.99, loader=train_loader):
    """
    Encuentra la tasa de aprendizaje óptima utilizando la prueba de rango de tasa de aprendizaje.

    Args:
        model: El modelo de aprendizaje automático que se está entrenando.
        optimiser: El optimizador utilizado para actualizar los pesos del modelo.
        start_val: La tasa de aprendizaje inicial (por defecto 1e-6).
        end_val: La tasa de aprendizaje final (por defecto 1).
        beta: Factor de suavizado para la media móvil exponencial de la pérdida (por defecto 0.99).
        loader: El cargador de datos que proporciona los lotes de entrenamiento (por defecto train_loader).

    Returns:
        log_lrs: Lista de tasas de aprendizaje utilizadas en cada iteración.
        losses: Lista de pérdidas promedio suavizadas calculadas en cada lote.
        accuracies: Lista de precisiones calculadas por lote.
        optimal_lr: Tasa de aprendizaje óptima seleccionada automáticamente.
    """
    n = len(loader) - 1
    factor = (end_val / start_val) ** (1 / n)
    lr = start_val
    optimiser.param_groups[0]['lr'] = lr

    avg_loss, loss = 0.0, 0.0
    lowest_loss = 0.0
    optimal_lr = start_val  # Inicializamos con el valor inicial
    losses = []
    log_lrs = []
    accuracies = []

    model = model.to(device=device)

    for i, (x, y) in enumerate(loader, start=1):
        x = x.to(device=device, dtype=torch.float32)
        y = y.to(device=device, dtype=torch.long)
        optimiser.zero_grad()
        scores = model(x)
        cost = F.cross_entropy(input=scores, target=y)

        loss = beta * loss + (1 - beta) * cost.item()
        avg_loss = loss / (1 - beta ** i)

        acc_ = (torch.argmax(scores, dim=1) == y).sum().item() / scores.size(0)

        # Actualizamos lowest_loss y guardamos el lr correspondiente
        if avg_loss < lowest_loss or i == 1:
            lowest_loss = avg_loss
            optimal_lr = lr  # Guardamos el lr donde la pérdida es mínima

        # Detenemos si la pérdida crece demasiado
        if i > 1 and avg_loss > 4 * lowest_loss:
            print(f'Se detuvo en el lote {i}, cost: {cost.item():.4f}')
            return log_lrs, losses, accuracies, optimal_lr

        accuracies.append(acc_)
        losses.append(avg_loss)
        log_lrs.append(lr)

        cost.backward()
        optimiser.step()

        print(f'cost: {cost.item():.4f}, lr: {lr:.4f}, acc: {acc_:.4f}')
        lr *= factor
        optimiser.param_groups[0]['lr'] = lr

    # Si no se detiene antes, devolvemos el lr óptimo basado en la pérdida mínima
    return log_lrs, losses, accuracies, optimal_lr

In [None]:
def train(model, optimiser, scheduler=None, epochs=100):
    val_loss_history = []  # Lista para almacenar la pérdida de validación por época
    train_loss_history = []  # Lista para almacenar la pérdida de entrenamiento por época
    val_acc_history = []  # Lista para almacenar la precisión de validación por época
    train_acc_history = []  # Lista para almacenar la precisión de entrenamiento por época
    lrs = []  # Lista para almacenar las tasas de aprendizaje por época

    for epoch in range(epochs):
        model.train()  # Configura el modelo en modo de entrenamiento
        train_correct_num = 0  # Contador de predicciones correctas
        train_total = 0  # Contador del total de muestras
        train_cost_acum = 0  # Acumulador de la pérdida

        # Bucle de entrenamiento
        for mb, (x, y) in enumerate(train_loader, start=1):
            x = x.to(device, dtype=torch.float32)
            y = y.to(device, dtype=torch.long)
            scores = model(x)
            cost = F.cross_entropy(input=scores, target=y)

            optimiser.zero_grad()
            cost.backward()
            optimiser.step()
            if scheduler:
                scheduler.step()

            # Acumula métricas de entrenamiento
            train_correct_num += (torch.argmax(scores, dim=1) == y).sum()
            train_total += scores.size(0)
            train_cost_acum += cost.item()

        # Calcula métricas promedio de entrenamiento por época
        train_acc = float(train_correct_num) / train_total
        train_cost = train_cost_acum / len(train_loader)

        # Evalúa en el conjunto de validación (una vez por época)
        val_cost, val_acc = accuracy(model, val_loader)

        # Almacena las métricas (una vez por época)
        train_loss_history.append(train_cost)
        val_loss_history.append(val_cost)
        train_acc_history.append(train_acc)
        val_acc_history.append(val_acc)
        lrs.append(optimiser.param_groups[0]["lr"])

        # Imprime un resumen de la época
        print(f'Epoch:{epoch}, train cost: {train_cost:.6f}, val cost: {val_cost:.6f},'
              f' train acc: {train_acc:.4f}, val acc: {val_acc:.4f}, total: {train_total},'
              f' lr: {optimiser.param_groups[0]["lr"]:.6f}')

    return train_loss_history, val_loss_history, train_acc_history, val_acc_history, lrs

In [None]:
# Configuración del modelo y optimizador
model_resnet56 = ResNet56()
model_resnet56 = model_resnet56.to(device)
optimiser_resnet56 = torch.optim.SGD(model_resnet56.parameters(),
                                     lr=0.1, momentum=0.95,
                                     weight_decay=1e-4)

# Ejecutar find_lr para obtener el lr óptimo
log_lrs, losses, accuracies, optimal_lr = find_lr(model_resnet56, optimiser_resnet56, 
                                                  start_val=1e-6, end_val=1)

print(f"Tasa de aprendizaje óptima encontrada: {optimal_lr:.6f}")

# Configurar el scheduler con el lr óptimo como max_lr
epochs = 10
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimiser_resnet56, 
                                                max_lr=optimal_lr,  # Usamos el valor óptimo
                                                steps_per_epoch=len(train_loader), 
                                                epochs=epochs, 
                                                pct_start=0.43, 
                                                div_factor=10, 
                                                final_div_factor=1000, 
                                                three_phase=True, 
                                                verbose=False)

# Entrenar el modelo
train_loss_history, val_loss_history, train_acc_history, val_acc_history, lrs = train(
                                model_resnet56, 
                                optimiser_resnet56,
                                scheduler=scheduler,
                                epochs=epochs)

In [None]:
def predecir(model, img, lbl):
    # Asegurarse de que la imagen ya está normalizada (asumimos que test_dataset lo hizo)
    # Agregar dimensión de batch
    img_batch = img.unsqueeze(0).to('cuda')
    
    # Generar predicción
    logits = model(img_batch)
    y_pred = logits.argmax(1).item()

    # Mostrar imagen original (transponer de CHW a HWC para matplotlib)
    img_for_display = img.cpu().numpy().transpose((1, 2, 0))  # De [3, 32, 32] a [32, 32, 3]
    plt.imshow(img_for_display)  # Sin cmap para RGB
    plt.title(f'Categoría predicha: {y_pred}, categoría real: {lbl}')
    plt.show()



In [None]:
# Tomar una imagen del set de prueba
img, lbl = test_dataset[torch.randint(len(test_dataset), size=(1,)).item()]

# Verificar la forma de la imagen antes de predecir
print(f"Forma de la imagen original: {img.shape}")

# Y generar la predicción
predecir(model_resnet56, img, lbl)