# Módulo 2 - Preprocesado

## Librerías

In [None]:
import torch
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, random_split
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

import kagglehub

from sklearn.metrics import roc_auc_score, confusion_matrix

import seaborn as sns
###### from sklearn.metrics import roc_auc_score
import time
import numpy as np


## Data Loading

In [None]:
# Download the latest version of the dataset; the directory used later as the
# image root is a sub-folder of the download location.
images_path = "{}/{}".format(
    kagglehub.dataset_download("arafatsahinafridi/multi-class-driver-behavior-image-dataset"),
    "Multi-Class Driver Behavior Image Dataset",
)

print("Path to dataset files:", images_path)

## Transformaciones

### Cambio de tamaño de imágenes

In [3]:
# Available target sizes, keyed by the architecture they are meant for.
resize_options = dict(ResNet=(224, 224))

# Edit this line to switch the resizing option.
resize_selection = resize_options["ResNet"]

size_transformation = transforms.Resize(size=resize_selection)

### Normalización de valores de pixeles

In [4]:
# Each option is a [mean, std] pair with one value per colour channel.
normalization_options = {
    "TorchDefault": [[0.485, 0.456, 0.406], [0.229, 0.224, 0.225]],
}

# Edit this line to switch the normalization option.
normalization_option = normalization_options["TorchDefault"]
norm_option_mean, norm_option_std = normalization_option[0], normalization_option[1]

norm_transformation = transforms.Normalize(
    mean=norm_option_mean,
    std=norm_option_std,
)

### Rotación aleatoria

In [5]:
# Random rotation with the angle drawn from the 0-180 degree range.
# NOTE(review): this transform is defined here but is not part of the "Initial"
# pipeline composed further down.
randRotation_transformation = transforms.RandomRotation(degrees=(0,180))

### Volteo horizontal aleatorio

In [6]:
# Horizontal flip (mirror), applied with probability 0.5. Despite the variable
# name this is a flip, not a rotation.
# NOTE(review): not part of the "Initial" pipeline composed further down.
randHorizontalRotation_transformation = transforms.RandomHorizontalFlip(p=0.5)

### Transformación a Tensor

In [7]:
# Converts each loaded image into a tensor so it can be normalized and batched.
toTensor_transformation = transforms.ToTensor()

### Concatenación de transformaciones

In [8]:
# Named transformation pipelines; each value is the ordered list of transforms
# applied to every image.
pipeline_settings = {
    "Initial": [size_transformation, toTensor_transformation, norm_transformation]
}

# Edit this line to switch the transformation pipeline.
pipeline_selection = pipeline_settings["Initial"]

# Build the pipeline from the selected entry so that changing the selection
# above actually changes the transforms that get applied.
transform_pipeline = transforms.Compose(pipeline_selection)

### Carga de imágenes y aplicación de las transformaciones

In [None]:
# Build the dataset from the downloaded folder; every image goes through
# transform_pipeline when it is read.
dataset = datasets.ImageFolder(
    root=images_path,
    transform=transform_pipeline,
)
# Show the class names discovered in the folder.
print(dataset.classes)

## Train-Val-Test Split (70%, 20%, 10%)

In [10]:
# Fixed seed so the train/val/test partition is identical on every run
# (Restart Kernel -> Run All yields the same split).
split_seed = 42

dataset_len = len(dataset)
train_len = int(0.7 * dataset_len)
val_len = int(0.2 * dataset_len)
# The test split takes the remainder so the three lengths always add up to
# dataset_len, regardless of rounding in the two lines above.
test_len = dataset_len - train_len - val_len

train_set, val_set, test_set = random_split(
    dataset,
    [train_len, val_len, test_len],
    generator=torch.Generator().manual_seed(split_seed),
)


batch_size = 32
# Only the training loader shuffles; validation and test keep a fixed order.
train_loader = DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True
)
val_loader = DataLoader(
    val_set,
    batch_size=batch_size,
    shuffle=False
)
test_loader = DataLoader(
    test_set,
    batch_size=batch_size,
    shuffle=False
)

## Función de pérdida y algoritmo de optimización

## Modelos

In [11]:
# Registry of the models to train: maps a model name to a dict holding its
# "model", "optimizer" and "criterion" (entries are added by the cells below).
modelos = {}

### ResNet18

In [None]:
resnet_model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
# Replace the pretrained output layer with one sized for this dataset's classes,
# mirroring what is done for VGG19. Without this the network keeps its original
# output width and can predict indices that do not correspond to any class here.
# This must run before the optimizer is created so the new layer is optimized.
resnet_model.fc = nn.Linear(resnet_model.fc.in_features, len(dataset.classes))

In [13]:
# Register ResNet18 together with the optimizer and loss used to train it.
modelos["Resnet18"] = dict(
    model=resnet_model,
    optimizer=optim.Adam(resnet_model.parameters(), lr=1e-4),
    criterion=nn.CrossEntropyLoss(),
)

### VGG19

In [None]:
vgg19_model = models.vgg19(pretrained=True)
# Swap the last classifier layer for one that outputs a logit per dataset class.
# Input width is read from the layer being replaced and the class count from
# the dataset, instead of hard-coding both numbers.
vgg19_model.classifier[6] = nn.Linear(
    vgg19_model.classifier[6].in_features,
    len(dataset.classes),
)

In [15]:
# Register VGG19 together with the optimizer and loss used to train it.
modelos["VGG19"] = dict(
    model=vgg19_model,
    optimizer=optim.Adam(vgg19_model.parameters(), lr=1e-4),
    criterion=nn.CrossEntropyLoss(),
)

### Modelo custom 1

### Modelo custom 2

## Configuración de tracking de métricas

In [16]:
def save_model(model_name, model):
    """Write the model's ``state_dict()`` to ``./<model_name>.pth`` and print the path.

    Args:
        model_name: Name used to build the output file name.
        model: Object exposing ``state_dict()``; that result is what gets saved.
    """
    destination = "./{}.pth".format(model_name)
    weights = model.state_dict()
    torch.save(weights, destination)
    print("Model {} saved to: {}".format(model_name, destination))

def train_model(model, optimizer, device, criterion, model_name, num_epochs=3):
    """Train ``model`` and validate it after every epoch, then save its weights.

    Batches come from the notebook-level ``train_loader`` and ``val_loader``;
    the weights are saved through ``save_model`` once all epochs are done.

    Args:
        model: Network to train; it is moved to ``device`` before training.
        optimizer: Optimizer already bound to ``model``'s parameters.
        device: Device the model and every batch are sent to.
        criterion: Loss called as ``criterion(outputs, labels)``.
        model_name: Name passed to ``save_model`` for the output file.
        num_epochs: Number of passes over ``train_loader`` (default 3).

    Returns:
        Tuple ``(train_losses, val_losses, train_accuracies, val_accuracies,
        model)``. Each list has one entry per epoch: losses are averaged per
        batch and accuracies are fractions in ``[0, 1]``.
    """
    # Tracking history
    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    # Keep the weights on the same device the batches are sent to; otherwise
    # the forward pass fails as soon as ``device`` is not where the model lives.
    model.to(device)

    # Training loop
    print("Starting training...")
    for epoch in range(num_epochs):
        print(f"\n Epoch {epoch + 1}/{num_epochs}")

        # Training
        model.train()
        running_loss = 0.0
        correct_train = total_train = 0
        print("Entrenando con imágenes dadas...")
        for counter, (images, labels) in enumerate(train_loader, start=1):
            print(f"Imagen #{counter}")
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct_train += (predicted == labels).sum().item()
            total_train += labels.size(0)

        print("Calculando métricas...")
        # Guard the divisions so an empty loader reports 0 instead of raising.
        train_acc = correct_train / total_train if total_train else 0.0
        avg_train_loss = running_loss / len(train_loader) if len(train_loader) else 0.0
        train_losses.append(avg_train_loss)
        train_accuracies.append(train_acc)

        # Validation
        print("Iniciando validación...")
        model.eval()
        correct = total = 0
        val_loss = 0.0
        with torch.no_grad():
            # The counter advances once per batch, so the progress line changes.
            for counter, (images, labels) in enumerate(val_loader, start=1):
                print(f"Imagen #{counter}")
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)

        val_acc = correct / total if total else 0.0
        avg_val_loss = val_loss / len(val_loader) if len(val_loader) else 0.0
        val_losses.append(avg_val_loss)
        val_accuracies.append(val_acc)

        print(f" Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc*100:.2f}%")
        print(f" Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc*100:.2f}%")
        print("\n")
    save_model(model_name, model)
    return train_losses, val_losses, train_accuracies, val_accuracies, model


# Final test evaluation
def test_model(model, device, model_name):
    """Evaluate ``model`` on the notebook-level ``test_loader`` and report results.

    Plots the confusion matrix through ``plot_confusion_matrix`` and prints the
    accuracy together with the average forward-pass time per batch and per
    sample.

    Args:
        model: Trained network; it is moved to ``device`` and set to eval mode.
        device: Device the model and every batch are sent to.
        model_name: Name shown in the confusion-matrix title.
    """
    correct = total = 0
    all_preds = []
    all_labels = []
    all_probs = []
    inference_times = []

    model.to(device)
    model.eval()
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            # Only the forward pass is timed.
            start_time = time.time()
            outputs = model(images)
            end_time = time.time()

            probs = torch.softmax(outputs, dim=1)
            _, predicted = torch.max(outputs, 1)

            correct += (predicted == labels).sum().item()
            total += labels.size(0)

            inference_times.append(end_time - start_time)
            # Bring every tensor back to the CPU before converting to NumPy;
            # labels were moved to ``device`` above, just like the predictions.
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    class_names = [
        "other_actvities",
        "safe_driving",
        "talking_phone",
        "texting_phone",
        "turning"
    ]
    plot_confusion_matrix(all_labels, all_preds, class_names, model_name)

    test_acc = correct / total * 100
    print(f"\n Final Test Accuracy: {test_acc:.2f}%")

    avg_infer_time = np.mean(inference_times)
    print(f"⏱Average Inference Time per Batch: {avg_infer_time:.4f}s")
    # Divide by the number of samples actually processed so a smaller final
    # batch does not skew the per-sample figure.
    print(f"Average Inference Time per Sample: {sum(inference_times) / total:.6f}s")

    # TODO: multi-class AUC is not computed yet; all_probs is collected for it.
    # auc= roc_auc_score(all_labels, all_probs, multi_class="ovr")
    # print(f"AUC (multi-class OVR): {auc:.4f}")
    


def plot_confusion_matrix(all_labels, all_preds, class_names, model_name):
    """Draw the confusion matrix of ``all_preds`` against ``all_labels`` as a heatmap.

    Args:
        all_labels: True labels.
        all_preds: Predicted labels, aligned with ``all_labels``.
        class_names: Names used as tick labels on both axes.
        model_name: Name included in the figure title.
    """
    counts = confusion_matrix(all_labels, all_preds)

    # One annotated integer cell per (true, predicted) pair.
    heatmap_options = dict(
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.figure(figsize=(10, 8))
    sns.heatmap(counts, **heatmap_options)

    plt.title(f'Confusion Matrix on Test Set for {model_name}')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    for rotate_ticks in (plt.xticks, plt.yticks):
        rotate_ticks(rotation=45)
    plt.tight_layout()
    plt.show()
  


# Plot results
def plot_results(train_accuracies, val_accuracies, train_losses, val_losses, num_epochs=10):
    """Plot accuracy and loss per epoch for the training and validation histories.

    The x-axis of every curve is derived from the length of that history, so the
    figure is correct for any number of recorded epochs.

    Args:
        train_accuracies: Training accuracy, one value per epoch.
        val_accuracies: Validation accuracy, one value per epoch.
        train_losses: Training loss, one value per epoch.
        val_losses: Validation loss, one value per epoch.
        num_epochs: Kept for backward compatibility and no longer used. Building
            the x-axis from it made plotting fail whenever it differed from the
            number of recorded epochs.
    """
    def _epoch_axis(history):
        # Epochs are numbered from 1 and there is one point per recorded value.
        return range(1, len(history) + 1)

    plt.figure(figsize=(14, 5))

    # Accuracy Plot
    plt.subplot(1, 2, 1)
    plt.plot(_epoch_axis(train_accuracies), train_accuracies, 'o-', label='Train Accuracy')
    plt.plot(_epoch_axis(val_accuracies), val_accuracies, 'o-', label='Val Accuracy')
    plt.title('Accuracy vs Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()

    # Loss Plot
    plt.subplot(1, 2, 2)
    plt.plot(_epoch_axis(train_losses), train_losses, 'o-', label='Train Loss')
    plt.plot(_epoch_axis(val_losses), val_losses, 'o-', label='Val Loss')
    plt.title('Loss vs Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.tight_layout()
    plt.show()


## Entrenamiento

In [None]:
# All models in this notebook are trained and evaluated on the CPU.
device = "cpu"

for model_name, model_options in modelos.items():
    model = model_options["model"]
    optimizer = model_options["optimizer"]
    criterion = model_options["criterion"]
    train_losses, val_losses, train_accuracies, val_accuracies, model = train_model(
        model,
        optimizer,
        device,
        criterion,
        model_name
    )
    # Call test_model without assigning its result back to the same name:
    # rebinding ``test_model`` would replace the function and make the next
    # iteration fail when it tries to call it.
    test_model(model, device, model_name)

    # Use the number of epochs actually recorded so the x-axis always matches
    # the length of the histories.
    plot_results(
        train_accuracies,
        val_accuracies,
        train_losses,
        val_losses,
        num_epochs=len(train_losses),
    )


## Validación

## Testing

## Conclusiones generales