# **Código Tarea 2 - Carlos Andre Nuñez Duran**

# Universidad de O'Higgins

## Escuela de Ingeniería
## COM4402: Introducción a Inteligencia Artificial

### **Tarea 2: Clasificación de Dígitos Manuscritos con Redes Neuronales**

### Estudiante: Ingrese su nombre y apellido

El objetivo de esta tarea es utilizar redes neuronales en un problema de clasificación de dígitos. Se utilizará el conjunto de datos Optical Recognition of Handwritten Digits Data Set. Este conjunto tiene 64 características, con 10 clases y 5620 muestras en total. La base de datos estará disponible en U-Campus.

Las redes a ser entrenadas tienen la siguiente estructura: capa de entrada de dimensionalidad 64 (correspondiente a los datos de entrada), capas ocultas (una o dos) y capa de salida con 10 neuronas y función de activación softmax. La función de loss (pérdida) es entropía cruzada. El optimizador que se
debe usar es Adam. La función softmax está implícita al usar la función de pérdida CrossEntropyLoss de PyTorch (**no se debe agregar softmax a la salida de la red**).

Se usará PyTorch para entrenar y validar la red neuronal que implementa el clasificador de dígitos. Se analizará los efectos de cambiar el tamaño de la red (número de capas ocultas y de neuronas en estas
capas) y la función de activación.

El siguiente código base debe ser usado para realizar las actividades pedidas.

## Observación: Antes de ejecutar su código, active el uso de GPU en Google Colab para acelerar el proceso de entrenamiento.

### Para esto: vaya a "Entorno de Ejecución" en el menú superior, haga click en "Cambiar tipo de entorno de ejecución", y seleccionar/verificar "GPU" en "Acelerador de Hardware"

In [1]:
import pandas as pd
import torch
import torch.nn as nn
import numpy as np
import time
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import torch
from sklearn.metrics import confusion_matrix

## Subir datasets de dígitos (train)

In [None]:
from google.colab import drive                                                  # conectar con google drive
drive.mount('/content/drive')

!wget https://raw.githubusercontent.com/Felipe1401/Mineria/main/dataset_digits/1_digits_train.txt  # importo 1_digits_train.txt
!wget https://raw.githubusercontent.com/Felipe1401/Mineria/main/dataset_digits/1_digits_test.txt   # importo 1_digits_test.txt


## Leer dataset de dígitos

In [None]:
column_names = ["feat" + str(i) for i in range(64)]
column_names.append("class")

In [None]:
df_train_val = pd.read_csv('1_digits_train.txt', names = column_names)
df_train_val

In [None]:
df_test = pd.read_csv('1_digits_test.txt', names = column_names)
df_test

In [None]:
df_train, df_val = train_test_split(df_train_val, test_size = 0.3, random_state = 10)

In [None]:
scaler = StandardScaler().fit(df_train.iloc[:,0:64])
df_train.iloc[:,0:64] = scaler.transform(df_train.iloc[:,0:64])
df_val.iloc[:,0:64] = scaler.transform(df_val.iloc[:,0:64])
df_test.iloc[:,0:64] = scaler.transform(df_test.iloc[:,0:64])

In [None]:
df_train

## Crear modelo

In [None]:
model = nn.Sequential(
          nn.Linear(64, 10),
          nn.ReLU(),
          nn.Linear(10,10)
        )

In [None]:
device = torch.device('cuda')

model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

## Crear datasets y dataloaders para pytorch (train)

In [None]:
# Crear datasets
feats_train = df_train.to_numpy()[:,0:64].astype(np.float32)
labels_train = df_train.to_numpy()[:,64].astype(int)
dataset_train = [ {"features":feats_train[i,:], "labels":labels_train[i]} for i in range(feats_train.shape[0]) ]

feats_val = df_val.to_numpy()[:,0:64].astype(np.float32)
labels_val = df_val.to_numpy()[:,64].astype(int)
dataset_val = [ {"features":feats_val[i,:], "labels":labels_val[i]} for i in range(feats_val.shape[0]) ]

feats_test = df_test.to_numpy()[:,0:64].astype(np.float32)
labels_test = df_test.to_numpy()[:,64].astype(int)
dataset_test = [ {"features":feats_test[i,:], "labels":labels_test[i]} for i in range(feats_test.shape[0]) ]

In [None]:
# Crear dataloaders
dataloader_train = torch.utils.data.DataLoader(dataset_train, batch_size=128, shuffle=True, num_workers=0)
dataloader_val = torch.utils.data.DataLoader(dataset_val, batch_size=128, shuffle=True, num_workers=0)
dataloader_test = torch.utils.data.DataLoader(dataset_test, batch_size=128, shuffle=True, num_workers=0)

## Entrenamiento

In [None]:
start = time.time()

# loop over the dataset multiple times
for epoch in range(250):
  model.train()
  # Train on the current epoch
  for i, data in enumerate(dataloader_train, 0):
    # Process the current batch
    inputs = data["features"].to(device)
    labels = data["labels"].to(device)
    # zero the parameter gradients
    optimizer.zero_grad()
    # forward + backward + optimize
    outputs = model(inputs)
    loss = criterion(outputs, labels)
    loss.backward() # backpropagation
    optimizer.step()

    # Por completar: calcule la pérdida de validación y acurracy en el batch actual

  model.eval()
  with torch.no_grad():
    # Por completar: calcule la pérdida de validación y acurracy en la época actual
    pass

  # Por hacer: imprima la pérdida de entrenamiento/validación y acurracy en la época actual
  print('epoch %d' % (epoch))

end = time.time()
print('Finished Training, total time %f seconds' % (end - start))

PARTE 2

In [None]:
def train_model(model, dataloader_train, dataloader_val, epochs=1000, patience=20):
    """
    Función para entrenar el modelo con early stopping basado en el loss de validación.

    Parámetros:
    - model: Modelo de PyTorch que se entrenará.
    - dataloader_train: Dataloader para el conjunto de entrenamiento.
    - dataloader_val: Dataloader para el conjunto de validación.
    - epochs: Número máximo de épocas para entrenar el modelo.
    - patience: Número de épocas a esperar sin mejora en el loss de validación antes de detener el entrenamiento.

    Retorna:
    - train_loss_history: Lista con el historial de loss de entrenamiento a lo largo de las épocas.
    - val_loss_history: Lista con el historial de loss de validación a lo largo de las épocas.
    """

    # Establecemos el dispositivo de cálculo (GPU si está disponible, de lo contrario CPU)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Definimos la función de pérdida y el optimizador
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    # Inicializamos el mejor loss de validación con un valor infinitamente grande
    best_val_loss = float('inf')
    # Listas para almacenar los historiales de loss de entrenamiento y validación
    train_loss_history = []
    val_loss_history = []
    # Contador para la paciencia del early stopping
    patience_counter = 0

    # Bucle principal de entrenamiento
    for epoch in range(epochs):
        model.train()  # Pone el modelo en modo de entrenamiento
        train_loss = 0.0
        for data in dataloader_train:
            # Cargamos los datos y etiquetas al dispositivo adecuado
            inputs, labels = data["features"].to(device), data["labels"].to(device)
            # Reiniciamos los gradientes del optimizador
            optimizer.zero_grad()
            # Realizamos una pasada hacia adelante
            outputs = model(inputs)
            # Calculamos el loss
            loss = criterion(outputs, labels)
            # Backpropagation
            loss.backward()
            # Actualizamos los pesos del modelo
            optimizer.step()
            train_loss += loss.item() * inputs.size(0)
        # Calculamos el loss promedio de entrenamiento para esta época
        train_loss = train_loss / len(dataloader_train.dataset)
        train_loss_history.append(train_loss)

        # Evaluamos el modelo en el conjunto de validación
        model.eval()  # Pone el modelo en modo de evaluación
        val_loss = 0.0
        with torch.no_grad():
            for data in dataloader_val:
                inputs, labels = data["features"].to(device), data["labels"].to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)
        # Calculamos el loss promedio de validación para esta época
        val_loss = val_loss / len(dataloader_val.dataset)
        val_loss_history.append(val_loss)

        # Comprobamos si el loss de validación ha mejorado
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0  # Reiniciamos el contador de paciencia
        else:
            patience_counter += 1
            # Si no ha habido mejora en el número de épocas definido por "patience", detenemos el entrenamiento
            if patience_counter >= patience:
                break

    return train_loss_history, val_loss_history


In [None]:
def plot_train_val_loss(train_loss_history, val_loss_history):
    """
    Función para graficar el loss de entrenamiento y validación a lo largo de las épocas.

    Parámetros:
    - train_loss_history: Lista con el historial de loss de entrenamiento.
    - val_loss_history: Lista con el historial de loss de validación.
    """

    plt.plot(train_loss_history, label="Train Loss")
    plt.plot(val_loss_history, label="Validation Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.title("Train vs. Validation Loss")
    plt.legend()
    plt.show()



In [None]:
# Definimos la función compute_confusion_matrix
def compute_confusion_matrix(model, dataloader):
    """
    Calcula y devuelve la matriz de confusión para el modelo y dataloader proporcionados.
    """
    model.eval()
    all_preds = []
    all_labels = []
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    with torch.no_grad():
        for data in dataloader:
            inputs, labels = data["features"].to(device), data["labels"].to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    conf_matrix = confusion_matrix(all_labels, all_preds)

    # Normalizar la matriz de confusión
    conf_matrix = conf_matrix.astype('float') / conf_matrix.sum(axis=1)[:, np.newaxis]

    return conf_matrix

def plot_confusion_matrix(conf_matrix):
    """
    Función para graficar la matriz de confusión.

    Parámetros:
    - conf_matrix: Matriz de confusión a graficar.
    """

    fig, ax = plt.subplots(figsize=(8, 6))

    # Usamos un formato personalizado para mostrar sólo dos decimales
    sns.heatmap(conf_matrix, annot=True, cmap='Blues', fmt=".2f", cbar=False)

    ax.set_xlabel('Predicted labels')
    ax.set_ylabel('True labels')
    ax.set_title('Confusion Matrix')
    ax.xaxis.set_ticklabels(range(10))
    ax.yaxis.set_ticklabels(range(10))
    plt.show()


In [None]:
def evaluate_model(model, dataloader):
    """
    Evalúa el modelo en un conjunto de datos y calcula la matriz de confusión y el accuracy.

    Parámetros:
    - model: Modelo de PyTorch a evaluar.
    - dataloader: Dataloader del conjunto de datos en el que se evaluará el modelo.

    Retorna:
    - conf_matrix: Matriz de confusión.
    - acc: Accuracy del modelo en el conjunto de datos.
    """

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    total = 0
    correct = 0
    conf_matrix = np.zeros((10, 10))

    with torch.no_grad():
        for data in dataloader:
            inputs, labels = data["features"].to(device), data["labels"].to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            for t, p in zip(labels.view(-1), predicted.view(-1)):
                conf_matrix[t.long(), p.long()] += 1

    acc = correct / total
    return conf_matrix, acc



In [None]:
# Definición de Modelos y Entrenamiento

# (a) 10 neuronas en la capa oculta, usando función de activación ReLU y 1000 épocas como máximo.
model_a = nn.Sequential(
    nn.Linear(64, 10),
    nn.ReLU(),
    nn.Linear(10, 10)
)
train_loss_history_a, val_loss_history_a = train_model(model_a, dataloader_train, dataloader_val, epochs=1000)
plot_train_val_loss(train_loss_history_a, val_loss_history_a)

# (b) 40 neuronas en la capa oculta y función de activación ReLU, y 1000 épocas como máximo.
model_b = nn.Sequential(
    nn.Linear(64, 40),
    nn.ReLU(),
    nn.Linear(40, 10)
)
train_loss_history_b, val_loss_history_b = train_model(model_b, dataloader_train, dataloader_val, epochs=1000)
plot_train_val_loss(train_loss_history_b, val_loss_history_b)

# (c) 10 neuronas en la capa oculta y función de activación Tanh, y 1000 épocas como máximo.
model_c = nn.Sequential(
    nn.Linear(64, 10),
    nn.Tanh(),
    nn.Linear(10, 10)
)
train_loss_history_c, val_loss_history_c = train_model(model_c, dataloader_train, dataloader_val, epochs=1000)
plot_train_val_loss(train_loss_history_c, val_loss_history_c)

# (d) 40 neuronas en la capa oculta y función de activación Tanh, y 1000 épocas como máximo.
model_d = nn.Sequential(
    nn.Linear(64, 40),
    nn.Tanh(),
    nn.Linear(40, 10)
)
train_loss_history_d, val_loss_history_d = train_model(model_d, dataloader_train, dataloader_val, epochs=1000)
plot_train_val_loss(train_loss_history_d, val_loss_history_d)

# (e) 2 capas ocultas con 10 y 10 neuronas cada una y función de activación ReLU, y 1000 épocas como máximo.
model_e = nn.Sequential(
    nn.Linear(64, 10),
    nn.ReLU(),
    nn.Linear(10, 10),
    nn.ReLU(),
    nn.Linear(10, 10)
)
train_loss_history_e, val_loss_history_e = train_model(model_e, dataloader_train, dataloader_val, epochs=1000)
plot_train_val_loss(train_loss_history_e, val_loss_history_e)

# (f) 2 capas ocultas con 40 y 40 neuronas cada una y función de activación ReLU, y 1000 épocas como máximo.
model_f = nn.Sequential(
    nn.Linear(64, 40),
    nn.ReLU(),
    nn.Linear(40, 40),
    nn.ReLU(),
    nn.Linear(40, 10)
)
train_loss_history_f, val_loss_history_f = train_model(model_f, dataloader_train, dataloader_val, epochs=1000)
plot_train_val_loss(train_loss_history_f, val_loss_history_f)


In [None]:

def plot_confusion_matrix(conf_matrix, dataset_type):
    """
    Función para graficar la matriz de confusión.

    Parámetros:
    - conf_matrix: Matriz de confusión a graficar.
    - dataset_type: 'test' o 'val' para especificar el tipo de conjunto en el título.
    """

    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(conf_matrix, annot=True, cmap='Blues', fmt=".2f", cbar=False)

    ax.set_xlabel('Predicted labels')
    ax.set_ylabel('True labels')
    ax.set_title(f'Confusion Matrix for {dataset_type} set')
    ax.xaxis.set_ticklabels(range(10))
    ax.yaxis.set_ticklabels(range(10))
    plt.show()

def evaluate_and_plot_case(model, dataloader_test, dataloader_val, case_label):
    # Test
    plot_train_val_loss(eval(f'train_loss_history_{case_label}'), eval(f'val_loss_history_{case_label}'))
    conf_matrix_test = compute_confusion_matrix(model, dataloader_test)
    accuracy_test = (np.diag(conf_matrix_test).sum() / conf_matrix_test.sum()) * 100
    print(f"Caso ({case_label}) - Accuracy en Test: {accuracy_test:.2f}%")
    plot_confusion_matrix(conf_matrix_test, "test")

    # Validación
    conf_matrix_val = compute_confusion_matrix(model, dataloader_val)
    accuracy_val = (np.diag(conf_matrix_val).sum() / conf_matrix_val.sum()) * 100
    print(f"Caso ({case_label}) - Accuracy en Validación: {accuracy_val:.2f}%")
    plot_confusion_matrix(conf_matrix_val, "validation")

#Llamo la función para cada caso:

evaluate_and_plot_case(model_a, dataloader_test, dataloader_val, 'a')
evaluate_and_plot_case(model_b, dataloader_test, dataloader_val, 'b')
evaluate_and_plot_case(model_c, dataloader_test, dataloader_val, 'c')
evaluate_and_plot_case(model_d, dataloader_test, dataloader_val, 'd')
evaluate_and_plot_case(model_e, dataloader_test, dataloader_val, 'e')
evaluate_and_plot_case(model_f, dataloader_test, dataloader_val, 'f')


In [None]:
# Evaluamos el accuracy de validación para cada modelo para buscar el mejor

def validation_accuracy(model, dataloader_val):
    """
    Función para calcular el accuracy de validación de un modelo.
    """
    conf_matrix, acc = evaluate_model(model, dataloader_val)
    return acc

# Calculamos el accuracy de validación para cada modelo
acc_a = validation_accuracy(model_a, dataloader_val)
acc_b = validation_accuracy(model_b, dataloader_val)
acc_c = validation_accuracy(model_c, dataloader_val)
acc_d = validation_accuracy(model_d, dataloader_val)
acc_e = validation_accuracy(model_e, dataloader_val)
acc_f = validation_accuracy(model_f, dataloader_val)

# Mostramos los accuracies
print(f"Accuracy de validación para modelo (a): {acc_a * 100:.2f}%")
print(f"Accuracy de validación para modelo (b): {acc_b * 100:.2f}%")
print(f"Accuracy de validación para modelo (c): {acc_c * 100:.2f}%")
print(f"Accuracy de validación para modelo (d): {acc_d * 100:.2f}%")
print(f"Accuracy de validación para modelo (e): {acc_e * 100:.2f}%")
print(f"Accuracy de validación para modelo (f): {acc_f * 100:.2f}%")


In [None]:
# Identificamos y mostramos el modelo con el mayor accuracy de validación
models = [model_a, model_b, model_c, model_d, model_e, model_f]
accuracies = [acc_a, acc_b, acc_c, acc_d, acc_e, acc_f]
best_index = accuracies.index(max(accuracies))
best_model = models[best_index]

print(f"El mejor modelo es el modelo ({chr(97 + best_index)}) con un accuracy de validación de {accuracies[best_index] * 100:.2f}%")


In [None]:
# 2. Calcular la matriz de confusión normalizada para el conjunto de prueba.
best_conf_matrix_test = compute_confusion_matrix(best_model, dataloader_test)

# 3. Calcular el accuracy normalizado usando el conjunto de prueba.
best_accuracy_test = (np.diag(best_conf_matrix_test).sum() / best_conf_matrix_test.sum()) * 100

# Mostramos la matriz de confusión y el accuracy para el mejor modelo
print(f"Mejor modelo ({chr(97 + best_index)}) - Accuracy en conjunto de prueba: {best_accuracy_test:.2f}%")
plot_confusion_matrix(best_conf_matrix_test, "test")


# **El siguiente código es solo para hacer un analisis de los resultados**

In [None]:
def create_nn(input_size, output_size, neurons, activation):
    """
    Crea una red neuronal basada en la configuración especificada.

    Parámetros:
    - input_size: Número de entradas de la red.
    - output_size: Número de salidas de la red.
    - neurons: Lista con el número de neuronas en cada capa oculta.
    - activation: Función de activación a usar.

    Retorna:
    - model: Modelo de red neuronal.
    """

    layers = []
    input_dim = input_size

    # Agregamos las capas ocultas
    for n in neurons:
        layers.append(nn.Linear(input_dim, n))
        layers.append(activation)
        input_dim = n

    # Capa de salida
    layers.append(nn.Linear(input_dim, output_size))

    model = nn.Sequential(*layers)

    return model

# Definimos una lista de configuraciones basada en los requerimientos
configurations = [
    {"neurons": [10], "activation": nn.ReLU()},
    {"neurons": [40], "activation": nn.ReLU()},
    {"neurons": [10], "activation": nn.Tanh()},
    {"neurons": [40], "activation": nn.Tanh()},
    {"neurons": [10, 10], "activation": nn.ReLU()},
    {"neurons": [40, 40], "activation": nn.ReLU()}
]

# Diccionario para almacenar los resultados
results = {}

# Entrenamos y evaluamos cada configuración
for i, config in enumerate(configurations):
    model = create_nn(input_size=64, output_size=10, neurons=config["neurons"], activation=config["activation"])
    start_time = time.time()
    train_loss, val_loss = train_model(model, dataloader_train, dataloader_val)
    end_time = time.time()

    conf_matrix_val, acc_val = evaluate_model(model, dataloader_val)
    conf_matrix_test, acc_test = evaluate_model(model, dataloader_test)

    results[i] = {
        "Configuration": config,
        "Training Time": end_time - start_time,
        "Train Loss History": train_loss,
        "Validation Loss History": val_loss,
        "Validation Confusion Matrix": conf_matrix_val,
        "Validation Accuracy": acc_val,
        "Test Confusion Matrix": conf_matrix_test,
        "Test Accuracy": acc_test
    }

# Imprimimos los resultados de manera estructurada
for i, result in results.items():
    print(f"Configuration {i + 1}:")
    print("Neurons:", result["Configuration"]["neurons"])
    print("Activation:", type(result["Configuration"]["activation"]).__name__)
    print("Training Time:", result["Training Time"])
    print("Validation Accuracy:", result["Validation Accuracy"])
    print("Test Accuracy:", result["Test Accuracy"])
    print("Last 5 Train Losses:", result["Train Loss History"][-5:])
    print("Last 5 Validation Losses:", result["Validation Loss History"][-5:])
    print("\n")

# **El siguiente codigo es solo para graficar las matrices de forma comoda para el informe**

In [None]:
def plot_two_confusion_matrices(cm1, cm2, labels, filename):
    fig, axs = plt.subplots(1, 2, figsize=(12, 6))

    sns.heatmap(cm1, annot=True, ax=axs[0], cmap='Blues', fmt=".2f", cbar=False)
    axs[0].set_xlabel('Predicted labels')
    axs[0].set_ylabel('True labels')
    axs[0].set_title('Validation Confusion Matrix')
    axs[0].xaxis.set_ticklabels(labels)
    axs[0].yaxis.set_ticklabels(labels)

    sns.heatmap(cm2, annot=True, ax=axs[1], cmap='Blues', fmt=".2f", cbar=False)
    axs[1].set_xlabel('Predicted labels')
    axs[1].set_ylabel('True labels')
    axs[1].set_title('Test Confusion Matrix')
    axs[1].xaxis.set_ticklabels(labels)
    axs[1].yaxis.set_ticklabels(labels)

    plt.tight_layout()
    plt.show()
    plt.savefig(filename)
    plt.close()

def evaluate_and_save_confusion_matrices(model, dataloader_test, dataloader_val, case_label):
    # Compute confusion matrices
    conf_matrix_val = compute_confusion_matrix(model, dataloader_val)
    conf_matrix_test = compute_confusion_matrix(model, dataloader_test)

    # Plot and save to single image
    plot_two_confusion_matrices(conf_matrix_val, conf_matrix_test, range(10), f'combined_matrix_{case_label}.png')



evaluate_and_save_confusion_matrices(model_a, dataloader_test, dataloader_val, 'a')
evaluate_and_save_confusion_matrices(model_b, dataloader_test, dataloader_val, 'b')
evaluate_and_save_confusion_matrices(model_c, dataloader_test, dataloader_val, 'c')
evaluate_and_save_confusion_matrices(model_d, dataloader_test, dataloader_val, 'd')
evaluate_and_save_confusion_matrices(model_e, dataloader_test, dataloader_val, 'e')
evaluate_and_save_confusion_matrices(model_f, dataloader_test, dataloader_val, 'f')
