# 0. Importaciones

A continuación se importan diversas librerías útiles para el manejo de los datos y la visualización de los resultados.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import os
from sklearn.model_selection import train_test_split   #permite dividir conjuntos de datos en train y test
from pathlib import Path
import imageio.v2 as imageio
from IPython.display import Image, display
from tqdm import tqdm

In [None]:
sns.set() # Establece el estilo de las gráficas

Específicas de Pytorch se necesita:

**torch.nn:** para crear el modelo de red neuronal.

**torch.autograd:** se necesita el módulo Variable, que se encarga de manejar las operaciones de los tensores.

**torch.optim:**  se necesita el optimizador para entrenar la red neuronal y modificar sus pesos.


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torch.optim as optim

# 1. Ejemplo de la clasificación para el problema XOR

Para poder definir una Red Neuronal Artificial(_MLP_ por sus siglas en inglés) haremos uso de la librería Pytorch. Esta librería permite dar nuevas funcionalidades a una clase estándar de Python (tal y como ocurría con las dataclasses). En concreto nos interesa definir dos funciones para esta clase:

*   *init*: Se definen las capas de las que consta el modelo,
*   *forward*: Se especifica el proceso de la propagación hacia adelante, desde la entrada hasta la capa de salida.

Veamos un pequeño ejemplo con el Perceptron para el problema de la separación del XOR. En primer lugar definiremos y mostraremos los datos, para lo cuál definimos la siguiente función.

In [None]:
def mostrar(datos, eje_x, eje_y, nombre_clases=[]):
    """Muestra una gráfica de puntos utilizando para ello los datos de la
    características que se solicitan como parámetro.

    Nota: El parámetro hue nos sirve para cambiar el aspecto de la gráfica en función del
    # valor que toma en una columna en concreto.
    """
    if len(nombre_clases) == len(datos["target"].unique()):
      sns.scatterplot(data=datos, x=eje_x, y=eje_y, hue=nombre_clases[datos["target"]], palette="tab10")
    else:
      sns.scatterplot(data=datos, x=eje_x, y=eje_y, hue=datos["target"], palette="tab10")


In [None]:
xor = pd.DataFrame({
    "X": [0.,0., 1., 1.],
    "y": [0.,1.,0.,1.],
    "target": [0, 1, 1, 0]
})

mostrar(xor, "X", "y")
entrada = torch.tensor(xor[["X", "y"]].values, dtype=torch.float32) # .values nos da la representación de numpy
salida = torch.tensor(xor["target"].values, dtype=torch.long)

In [None]:
class Perceptron(nn.Module):
    def __init__(self):
        """
        En init suele añadirse el número de características que tiene el dataset,
        en este caso concreto, al ser el problema del xor, utilizamos 2 por defecto
        """
        super(Perceptron, self).__init__()
        self.capa1 = nn.Linear(2, 2) # Importante, el 2 inicial representa el nº de características de nuestro dataset. Esta capa se corresponde con la de la entrada.

    def forward(self, entrada):
        salida = self.capa1(entrada)
        probabilidades = F.softmax(salida, dim=1)
        return probabilidades

Veamos visualmente qué aspecto tiene la red.

In [None]:
from IPython.display import IFrame

IFrame(src=f"https://drive.google.com/file/d/1hOqOdnk6lQphQgHdJA6bksVWzQOgOeb5/preview", width=640, height=480)


Como puede observar en la función **init** de la clase **Perceptron** tenemos diferenciados dos atributos, *capa1* y *soft*. El primero de ellos permitirá realizar una transformación lineal de los datos de entrada. El segundo resulta de gran utilidad, ya que transforma la salida a valores que están en el intervalo $[0,1]$. De este modo, la suma de todos los valores será *exactamente* $1$. Esto es así debido a que estamos representando probabilidades. Luego, en cada índice tendremos valores que representan la probabilidad de que pertenezca a la $i-$ésima clase.

Tras esto, podemos definir el resto de elementos necesarios. El optimizador, la función de pérdida y nuestro bucle para realizar el entrenamiento del modelo. Recuerda que para la función de pérdida ($loss$), depende en gran medida del tipo de tarea de queremos que realice la red. Por ejemplo, para realizar una regresión podemos utilizar el error cuadrático medio ($MSE$), mientras que para la clasificación suele usarse la $CrossEntropyLoss$. Por el momento, familiarícese sólo con su uso, sin entrar en más detalle.

En cuanto al optimizador, se utilizará el Descenso de Gradiente Estocástico (SGD), con una tasa de aprendizaje de $0.01$.

In [None]:
def train(modelo, entrada, salida, epochs):
    for epoch in range(epochs):
        # 1. Reseteamos el gradiente
        optimizador.zero_grad()
        # 2. Obtenemos la salida del modelo. Propagación hacia adelante
        prediccion = modelo(entrada)
        # 3. Obtención del error
        error = f_perdida(prediccion, salida)
        # 4. Propagamos el error hacia atrás, actualizando los pesos
        error.backward()
        optimizador.step()
    return modelo

perceptron = Perceptron()
f_perdida = nn.CrossEntropyLoss()
optimizador = optim.SGD(perceptron.parameters(), lr=0.01) # En lr especificamos la tasa de aprendizaje (η).
epochs = 1000
train(perceptron, entrada, salida, epochs)

Si queremos obtener la predicción del modelo para un dato concreto, podemos hacer una llamada al modelo como si fuese una función. Sin embargo, verá que no es muy descriptiva.

In [None]:
y_hat = perceptron(torch.Tensor([[1., 1.]]))
print(y_hat)

Para devolver la clase 0 o 1, se usa la función argmax, que devuelve la posición del máximo. Dicho de otro modo, la clase **más probable**.

In [None]:
print(torch.argmax(y_hat))

¿Qué valor predice para el punto $(1, 1)$. ¿Sería correcto? Pruebe con algún punto más.

Ahora bien, recuerde que el Perceptron básico no nos permite clasificar clases que no sean linealmente separables. Es el caso del problema del XOR. Para visualizar qué está pasando internamente con la red, vamos a crear un par de funciones que ayuden a la visualización.

In [None]:
def plot_decision_boundary(model, X, y, epoch):
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(np.arange(x_min, x_max, 0.01),
                         np.arange(y_min, y_max, 0.01))

    with torch.no_grad():
        Z = model(torch.from_numpy(np.c_[xx.ravel(), yy.ravel()]).float()).detach().numpy()
        Z = torch.softmax(torch.from_numpy(Z), dim=1).numpy()[:, 1]
    Z = Z.reshape(xx.shape)

    contour = plt.contourf(xx, yy, Z, alpha=0.8, cmap=plt.cm.Spectral)
    plt.scatter(X[:, 0], X[:, 1], c=y, s=40, edgecolor='k', cmap=plt.cm.Spectral)
    cbar = plt.colorbar(contour)
    cbar.set_label('Probabilidad de la clase #1')

    plt.title(f"Epoch {epoch}")
    plt.savefig(f'./imgs/frontera_decision_{epoch}.png')
    plt.close()

Vamos a crear un _gif_, para lo cual, en primer lugar vamos a crear el directorio en el que vamos a crear las imágenes. Para ver las imágenes, puede hacer click en el icono de la carpeta y ahí verá las imágenes según se irán generando.

In [None]:
directorio = Path("imgs")
directorio.mkdir(exist_ok=True)

Y tras esto, vamos a modificar levemente el bucle del entrenamiento.

In [None]:
def train(modelo, entrada, salida, epochs):
    for epoch in range(epochs):
        # 1. Reseteamos el gradiente
        optimizador.zero_grad()
        # 2. Obtenemos la salida del modelo. Propagación hacia adelante
        prediccion = modelo(entrada)
        # 3. Obtención del error
        error = f_perdida(prediccion, salida)
        # 4. Propagamos el error hacia atrás, actualizando los pesos
        error.backward()
        optimizador.step()
        if epoch % 10 == 0: # Sólo vamos a guardar una imagen cada 10 épocas
            plot_decision_boundary(modelo, entrada, salida, epoch)
    return modelo

Entrenemos el modelo y obtengamos las imágenes.

In [None]:
perceptron = Perceptron()
f_perdida = nn.CrossEntropyLoss()
optimizador = optim.SGD(perceptron.parameters(), lr=0.01) # En lr especificamos la tasa de aprendizaje.
epochs = 1000
train(perceptron, entrada, salida, 1000)

In [None]:
imagenes = []
for epoch in tqdm(range(1000)):
    if epoch % 10 == 0:
        imagenes.append(imageio.imread(f'./imgs/frontera_decision_{epoch}.png'))

imageio.mimsave('./imgs/frontera_decision.gif', imagenes, fps=15, loop=0)

In [None]:
imagen = Image("imgs/frontera_decision.gif")
display(imagen)

¿Qué ocurre si en lugar de conectar la entrada con la salida, agregamos una capa adicional (denominada capa oculta), e incorporamos una función de activación como ReLU?

In [None]:
class MLP(nn.Module):
    def __init__(self):
        """
        En init suele añadirse el número de características que tiene el dataset,
        en este caso concreto, al ser el problema del xor, utilizamos 2 por defecto
        """
        super(MLP, self).__init__()

        self.capa1 = nn.Linear(2, 8) # Importante, el 2 inicial representa el nº de características de nuestro dataset
        self.activacion = nn.ReLU()
        self.capa2 = nn.Linear(8, 2)

    def forward(self, entrada):
        salida_capa1 = self.activacion(self.capa1(entrada))
        salida = self.capa2(salida_capa1)
        return salida

red = MLP()
f_perdida = nn.CrossEntropyLoss()
optimizador = optim.SGD(red.parameters(), lr=0.01, momentum=0.9) # Momentum es un parámetro para que converja más rápido.
epochs = 1000
train(red, entrada, salida, 1000)

imagenes = []
for epoch in tqdm(range(1000)):
    if epoch % 10 == 0:
        imagenes.append(imageio.imread(f'./imgs/frontera_decision_{epoch}.png'))

imageio.mimsave('./imgs/frontera_decision2.gif', imagenes, fps=15, loop=0)

In [None]:
imagen = Image("imgs/frontera_decision2.gif")
display(imagen)

Pruebe con otras funciones de activación en lugar de ReLU y observe qué ocurre.

In [None]:
class MLP(nn.Module):
    def __init__(self):
        """
        En init suele añadirse el número de características que tiene el dataset,
        en este caso concreto, al ser el problema del xor, utilizamos 2 por defecto
        """
        super(MLP, self).__init__()

        self.capa1 = nn.Linear(2, 8) # Importante, el 2 inicial representa el nº de características de nuestro dataset
        self.activacion = # Incluya otra función de activación Tanh, Sigmoid, ...
        self.capa2 = nn.Linear(8, 2)

    def forward(self, entrada):
        salida_capa1 = self.ReLU(self.capa1(entrada))
        salida = self.capa2(salida_capa1)
        return salida

red = MLP()
f_perdida = nn.CrossEntropyLoss()
optimizador = optim.SGD(red.parameters(), lr=0.01, momentum=0.9) # Momentum es un parámetro para que converja más rápido.
epochs = 1000
train(red, entrada, salida, 1000)

imagenes = []
for epoch in tqdm(range(1000)):
    if epoch % 10 == 0:
        imagenes.append(imageio.imread(f'./imgs/frontera_decision_{epoch}.png'))

imageio.mimsave('./imgs/frontera_decision3.gif', imagenes, fps=15, loop=0)

In [None]:
imagen = Image("imgs/frontera_decision3.gif")
display(imagen)

### Pregunta 1:
¿Tiene sentido cambiar el número de unidades en la capa de salida?
¿Y en la capa oculta?

# 2. Clasificación para la base de datos Iris

In [None]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

iris = load_iris(as_frame=True) # Vamos a cargarlo como un DataFrame de la librería Pandas
X = iris['data']
y = iris['target']
names = iris['target_names']
feature_names = iris['feature_names']

Ahora unamos las entradas y salidas en un único dataset llamado __iris__ y exploremos qué forma tiene el dataset.

In [None]:
iris = pd.concat([X, y], axis=1)

In [None]:
iris

¿Qué rango de valores tiene este dataset? Para ello, podemos usar la función _describe_ que nos da información muy útil que podríamos utilizar a posteriori.

In [None]:
iris.describe()

In [None]:
iris.columns

También es útil ver a nivel gráfico qué forma tienen los datos. Para ello, podemos utilizar la librería seaborn, la cual está orientada a hacer gráficas de forma sencilla usando DataFrames de Pandas. Veamos un pequeño ejemplo con las dos primeras características de Iris: la longitud y el ancho del sépalo.

In [None]:
mostrar(iris, "sepal length (cm)", "sepal width (cm)", names)

¿Cómo mostraríamos las dos últimas características, el alto y el ancho del pétalo?

También puede comprobar cómo se distribuyen los valores teniendo en cuenta cada par de características

In [None]:
sns.pairplot(iris, hue="target", palette="tab10")

## 2.1 Preprocesado
Cuando estamos trabajando con modelos de Aprendizaje Automático (_Machine Learning_) casi siempre es recomendable realizar algún tipo de preprocesamiento. Uno de los más habituales es realizar un escalado para que los datos estén en una distribución normal, es decir, con media ($\mu$) 0 y varianza ($\sigma$) 1. Para ello se sigue la siguiente fórmula:

$$ X = \frac{X - X_{media}}{X_{desviacion\_tipica}} $$

Es decir, a cada elemento del dataset se le resta la media del conjunto completo y tras esto, se divide por la desviación típica. Teniendo en cuenta que las aplicaciones aritméticas se aplican sobre el dataframe completo, hacemos la siguiente transformación.

In [None]:
iris_standard = iris.copy()
iris_standard[iris.columns[:-1]] = (iris_standard[iris.columns[:-1]] - iris_standard[iris.columns[:-1]].mean()) / iris_standard[iris.columns[:-1]].std()
iris_standard.describe()

Importante, como ha podido comprobar no hemos modificado la variable de salida. Sólo las variables de entrada, de ahí el uso de iris.columns[:-1]. Esto devuelve el nombre de todas las columnas menos el de la última.

Muestre los datos como en el ejemplo anterior. ¿Aprecia alguna diferencia?

Otro tipo de preprocesado alternativo y habitual es realizar un escalado para que todos los datos se encuentren en un intervalo $[0,1]$. Para realizar este preprocesado se realiza la siguiente operación.

$$ X = \frac{X - X_{min}}{X_{max} - X_{min}} $$

In [None]:
iris_01 = iris.copy()
iris_01[iris.columns[:-1]] = (iris_01[iris.columns[:-1]] - iris_01[iris.columns[:-1]].min()) /  \
                             (iris_01[iris.columns[:-1]].max() - iris_01[iris.columns[:-1]].min())
iris_01.describe()

Muestre los datos como en el ejemplo anterior. ¿Aprecia alguna diferencia?

Esta es una demostración de distintas formas de preprocesar los datos de entrada. Normalmente, no se trata con dataframes de pandas en todo momento, si no que se opta por trabajar con la matriz directamente, ya que resulta más cómodo.

Toda esta exploración inicial de los datos, suele formar parte de un proceso más amplío que se sigue en proyectos de este ámbito conocido como Análisis Exploratorio de Datos (*EDA* por sus siglas en inglés). Así, se pueden tener "pistas" que ayuden o guíen en la resolución del problema.

In [None]:
iris = iris.values
iris_scaled = iris.copy() # Obtenemos la matriz de numpy a partir del DataFrame

In [None]:
iris_scaled[:, :-1] = torch.Tensor((iris_scaled[:, :-1] - iris_scaled[:, :-1].max()) / (iris_scaled[:, :-1].max() - iris_scaled[:, :-1].min()))

In [None]:
print(iris_scaled[0:10]) # Imprimimos las 10 primeras filas

# 3. Entrenamiento del modelo

Una vez familiarizado con cómo funciona Pytorch, considere la siguiente definición de red.

In [None]:
class MLP(nn.Module):
    def __init__(self, Nin):
        super(MLP, self).__init__()
        self.layer1 = nn.Linear(Nin, 50)
        self.layer2 = nn.Linear(50, 50)
        self.layer3 = nn.Linear(50, 3)
        self.Relu = nn.ReLU()
        self.Soft = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.Relu(self.layer1(x))
        x = self.Relu(self.layer2(x))
        probabilidades = F.softmax(self.layer3(x))
        return probabilidades

Y su arquitectura visualmente es la siguiente, siendo $p$ el número de variables y $Cl$ el número de clases:

In [None]:
IFrame(src=f"https://drive.google.com/file/d/1i6RGJae5yhF6hlWb6Ws-W_NptN8VZKHj/preview", width=640, height=480)


Hasta ahora, hemos usado tanto un Perceptron como una Red Neuronal Multicapa. Sin embargo, no hemos hecho una división del dataset en conjuntos de entrenamiento y test, ¿verdad? Este proceso es vital para saber la capacidad de generalización de nuestro modelo. De nada sirve obtener un error muy bajo durante el entrenamiento si luego es incapaz de predecir de forma correcta a partir de nuevos datos de entrada. Veamos visualmente la arquitectura de la red.

In [None]:
Xtrain, Xtest, Ytrain, Ytest = train_test_split(
    torch.Tensor(iris_scaled[:,:-1]), torch.Tensor(iris_scaled[:,-1]), test_size=0.2, random_state=2)

In [None]:
model = MLP(Xtrain.shape[1])
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
loss_fn = nn.CrossEntropyLoss()


El proceso de entrenamiento consiste en presentar a la red un número dado de veces ($num_{epochs}$) los datos de entrenamiento para que esta vaya ajustando sus pesos.

Después de cada ciclo de entrenamiento se evalúa la salida predicha por el modelo, es decir, se calcula el error cometido entre la predicción y la clase real conocida, tanto con los datos de entrenamiento como con los datos de test. Para ello, volvamos a definir otra función train, pero que contemple los dos nuevos conjuntos. En su forma tradicional, el algoritmo de descenso de gradiente estocástico (SGD) toma el conjunto de datos en pequeños lotes (pudiendo ser una instancia cada vez). En esta ocasión, por simplicidad del código y dado que el conjunto de datos no es muy grande, pasaremos el conjunto de datos de entrenamiento al completo en cada iteración.


In [None]:
def train(modelo, X_train, y_train, X_test, y_test, epochs):
  lossTrain_list  = np.zeros((num_epochs,))
  accTrain_list = np.zeros((num_epochs,))
  lossVal_list  = np.zeros((num_epochs,))
  accVal_list = np.zeros((num_epochs,))

  for epoch in tqdm.trange(num_epochs):
      correct=0
      Y_pred = model(X_train)
      lossTrain = loss_fn(Y_pred, Y_train)
      lossTrain_list[epoch] = lossTrain.item()
      correct = (torch.argmax(Y_pred, dim=1) == Y_train).type(torch.FloatTensor)
      accTrain_list[epoch] = correct.mean() # media de predicciones correctas por epoch

      # Zero gradients
      optimizer.zero_grad()
      lossTrain.backward()
      optimizer.step()

      with torch.no_grad():
          Y_pred = model(X_test)
          lossVal = loss_fn(Y_pred, Y_test)
          lossVal_list[epoch] = lossVal.item()
          correct = (torch.argmax(Y_pred, dim=1) == Y_test).type(torch.FloatTensor)
          accVal_list[epoch] = correct.mean() # media de predicciones correctas por epoch
  return lossTrain_list, accTrain_list, lossVal_list, accVal_list

In [None]:
import tqdm
num_epochs  = 50
#convierte datos del tensor a variables del auto-grad
X_train = Variable(Xtrain).float()
Y_train = Variable(Ytrain).long()
X_test  = Variable(Xtest).float()
Y_test  = Variable(Ytest).long()

lossTrain_list, accTrain_list, lossVal_list, accVal_list = train(model, X_train, Y_train, X_test, Y_test, num_epochs)



# 4. Visualización de resultados



In [None]:
sns.lineplot(x=range(num_epochs), y=accVal_list, label="Validacion")
sns.lineplot(x=range(num_epochs), y=accTrain_list, label="Entrenamiento")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Train versus Validation: Accuracy")
plt.legend()
plt.show()

In [None]:
sns.lineplot(x=range(num_epochs), y=lossVal_list, label="Validacion")
sns.lineplot(x=range(num_epochs), y=lossTrain_list, label="Entrenamiento")
plt.xlabel("Epoch")
plt.ylabel("Cross Entropy Loss")
plt.title("Train versus Validation: Loss ")
plt.show()

¿Ha observado el comportamiento que esperaba? El optimizador SGD tiene el inconveniente de que es necesario hacer una buena selección de los parámetros para que funcione como esperaríamos. Uno de los optimizadores más usados es Adam, ya que se requiere un menor ajuste y tiene una tasa de aprendizaje adaptativa.

Repita la prueba pero usando Adam en lugar de SGD con una tasa de aprendizaje baja, como $0.001$.

## MATRIZ DE CONFUSIÓN

In [None]:
!pip install torchmetrics

In [None]:
from torchmetrics import ConfusionMatrix

total = 0
correct = 0


X_test  = Variable(Xtest).float()
Y_test  = Variable(Ytest).long()
Y_pred = model(X_test)
correct = (torch.argmax(Y_pred, dim=1) == Y_test).type(torch.FloatTensor)
confmat = ConfusionMatrix(task="multiclass",num_classes=3)

total = Y_test.size()

print(f"Porcentaje de patrones bien clasificados: {100 *correct.mean():.2f}%")
print("Matriz de Confusion")
print(confmat(Y_pred, Y_test))



# PRÁCTICAS
1. Entrena durante más iteraciones, (100, 150) y visualiza los resultados.
2. Compara los resultados con las siguientes opciones:
  * Cambiando el número de capas ocultas (1 o 2 capas)
  * Cambiando el número de unidades ocultas (25, 100 y 500)


Tenga en cuenta que puede añadir todas las celdas que estime oportunas para poder ejecutar las distintas configuraciones.