# CARGA DE BIBLIOTECAS

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from copy import deepcopy

import torch
from torch import nn  # Modelos neuronales y funciones de Loss

from torch import optim # (3er paso del algoritmo de retropropagación) Optimizadores ---> Gradiente descendiente, Adam, AdaDelta, etc
from torch.utils.data import Dataset, DataLoader

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  # Elige el dispositivo. Utilizará GPU si está disponible

# CONSTRUCCION DEL DATASET

In [None]:
class dataset(Dataset):
    '''
    Esta clase maneja la lectura de los datos y provee un mecanismo
    para alimentar los modelos con los patrones.
    '''

    #===================================================
    def __init__(self, filename):

        #------------------------------------
        # LECTURA DE LOS DATOS
        data = pd.read_csv(filename, header=None).to_numpy() # Levanta los datos en formato numpy

        #------------------------------------
        # ALEATORIZO LOS PATRONES (filas)
        idxs = np.arange(len(data))  # Genero un vector de índices
        np.random.shuffle(idxs)
        data = data[idxs,:]

        #------------------------------------
        # SEPARO LOS DATOS
        self.x = data[:,:-1].astype(np.float32)
        self.y = data[:,-1].astype(np.float32)  # La clase está en la última columna

    #===================================================
    def __len__(self):
        '''
        Devuelve el número de patrones en el dataset.
        '''
        return len(self.x)


    #===================================================
    def __getitem__(self, idx):
        '''
        Devuelve el/los patrones indicados.
        '''
        return self.x[idx,:], self.y[idx]

# CONSTRUCCION DEL MODELO

## PERCEPTRON MULTICAPA

In [None]:
class model(nn.Module):

    #====================================
    def __init__(self, n_features, n_inputs, n_outputs):
        '''
        Esta función inicializa/construye el perceptrón.
        n_features: features de cada patrón (2 para OR y XOR)
        n_outputs: cantidad de salidas esperadas.
        '''

        super().__init__()

        self.n_features = n_features
        self.n_inputs = n_inputs
        self.n_outputs = n_outputs

        self.layer1 = nn.Linear(self.n_features, self.n_inputs, bias=True)
        self.layer2 = nn.Linear(self.n_inputs, self.n_outputs, bias=True)

        self.tanh = nn.Tanh()

    #====================================
    def forward(self, x):
        '''
        Esta función realiza la pasada hacia adelante.
        '''

        # Calculo salida lineal de la capa 1
        y = self.layer1(x)

        # Aplico función no lineal
        y = self.tanh(y)

        # Calculo salida lineal de la capa 2
        y = self.layer2(y)

        # Aplico función no lineal
        y = self.tanh(y)

        return y

# LOOPS

## ENTRENAMIENTO

In [None]:
def train_step(model, data, loss_function, optimizer, device):
    '''
    Esta función se encarga de pasar todos los patrones
    a través del modelo neuronal y realizar el ajuste de los pesos.
    '''

    model.train()  # Activamos el cálcula de gradientes

    N_batches = len(data)  # Número de batches = N_patrones/N_patrones_x_batch

    error = 0

    #==============================================================
    for idx,(X,y) in enumerate(data):

        X = X.to(device)  # Se envían los datos a la GPU (si se dispone)
        y = y.to(device)  # Se envían los datos a la GPU (si se dispone)

        optimizer.zero_grad()  # Se limpia el caché del optimizador

        #-----------------------
        # Pasada hacia adelante
        # (Forward pass)
        #-----------------------
        y_pred = model(X)

        #---------------------------
        # Cálculo del error (Loss)
        #---------------------------
        if (data.batch_size == 1):
            loss = loss_function(y_pred.squeeze(), y.squeeze())  # El método "squeeze()" elimina todas las dimensiones con valor "1"
                                                                 # Ej. el vector [[1,2,3]] tiene dimensiones (1,3). Luego de aplicar
                                                                 # "squeeze()", el vector resultante [1,2,3] tiene dimensiones (3,)
        else:
            loss = loss_function(y_pred.squeeze(), y)

        error += loss.item()

        #-----------------------
        # Retropropagación
        # (Backward pass)
        #-----------------------
        loss.backward()
        optimizer.step()
    #==============================================================

    error /= N_batches

    return error, model

## VALIDACION / TEST

In [None]:
def predict_step(model, data, loss_function, device):
    '''
    Esta función se encarga de pasar todos los patrones
    hacia adelante a través del modelo para generar
    las predicciones.
    '''

    model.eval()  # Se desactiva el funcionamiento
                  # de algunos elementos especiales de PyTorch

    N_batches = len(data)  # Número de batches = N_patrones/N_patrones_x_batch

    error = 0

    Y = torch.tensor([])
    Yp = torch.tensor([])

    #==============================================================
    with torch.no_grad():  # Turn off gradients computation

        for idx,(X,y) in enumerate(data):

            Y = torch.hstack( (Y, y.flatten()) )  # En esta línea acumulamos las salidas deseadas
                                                  # en un único vector, de manera de tener ordenados
                                                  # los pares "salida deseada" | "salida predicha" para
                                                  # calcular medidas de desempeño al finalizar esta etapa.
                                                  # El método "flatten()" genera un vector "plano".

            X = X.to(device)  # Se envían los datos a la GPU (si se dispone)
            y = y.to(device)  # Se envían los datos a la GPU (si se dispone)

            #-----------------------
            # Pasada hacia adelante
            # (Forward pass)
            #-----------------------
            y_pred = model(X)

            Yp = torch.hstack( (Yp, y_pred.flatten().cpu()) )  # En esta línea acumulamos las salidas predichas
                                                               # del modelo en un único vector, de manera de tener
                                                               # ordenados los pares "salida deseada" | "salida predicha"
                                                               # para calcular medidas de desempeño al finalizar esta etapa.
                                                               # El método "flatten()" genera un vector "plano".
                                                               # El método "cpu()" retorna los datos a la CPU en caso de estar en la GPU.

            #---------------------------
            # Cálculo del error (Loss)
            #---------------------------
            loss = loss_function(y_pred.squeeze(), y.squeeze())  # El método "squeeze()" elimina todas las dimensiones con valor "1"
                                                                 # Ej. el vector [[1,2,3]] tiene dimensiones (1,3). Luego de aplicar
                                                                 # "squeeze()", el vector resultante [1,2,3] tiene dimensiones (3,)

            error += loss.item()
    #==============================================================

    error /= N_batches

    #------------------

    return error, Y, Yp

# EXPERIMENTO

In [None]:
#=================================
# Definimos los archivos de datos
#=================================
filename_train_data = 'XOR_trn.csv'
filename_validation_data = 'XOR_trn.csv'
filename_test_data = 'XOR_tst.csv'


#==========================================
# Inicializamos parámetros del experimento
#==========================================
LEARNING_RATE = 1E-3  # tasa de aprendizaje

MOMENTUM = 0.9    # Término de momento

MAX_EPOCAS = 50   # Defino el número máximo de épocas
                  # de entrenamiento.

PATIENCE = 10     # Defino el máximo número de épocas
                  # sin mejorar el error de validación
                  # para detener el entrenamiento.

BATCH_SIZE = 10  # Número de patrones en cada batch
                 # Se denomina 'batch' a un conjunto de patrones
                 # que se procesan juntos durante una pasada a
                 # través de la red neuronal  durante el aprendizaje.
                 # Ej. si el batch incluye 10 patrones, los mismos
                 # son usados para realizar los pasos hacia adelante,
                 # la retropropagación y el cálculo de la actualización
                 # de los pesos de acuerdo al error cometido con cada patrón.
                 # Sin embargo, al usar un batch se combinan las actualizaciones
                 # de pesos debidas a cada patrón y se realiza una única
                 # actualización "promedio".

#-----------------------------------------------------

acc = 0.  # Inicializo el accuracy inicial
epoca = 0  # Inicializo contador de épocas
MIN_ERROR = 1E6   # Inicializo la variable para
                  # registrar el mínimo error cometido.


#===========================================================
# Construimos los datasets para entrenamiento y validación
#===========================================================
trn = dataset(filename_train_data)
val = dataset(filename_validation_data)


#=============================================================
# Construimos los dataloaders para entrenamiento y validación
#=============================================================
train_data = DataLoader(trn, batch_size=BATCH_SIZE, shuffle=True)
validation_data = DataLoader(val, batch_size=BATCH_SIZE, shuffle=False)


#=============================================
# Inicializamos el modelo
#=============================================
modelo = model(n_features=2, n_inputs=2, n_outputs=1)
modelo.to(device)


#=============================================
# Definimos la función de costo a utilizar
#=============================================
loss_function = nn.MSELoss(reduction='mean').to(device)


#=============================================
# Definimos el optimizador a utilizar
# > 3er paso del algoritmo de retropropagación
#=============================================
optimizer = optim.SGD(modelo.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM)

## ENTRENAMIENTO DEL MODELO

In [None]:
error = []  # Inicializo estructura para almacenar
            # los errores en el tiempo
accuracy = []  # Inicializo estructura para almacenar
               # el accuracy en el tiempo

STOP = False
counter = 0

best_model = None
best_model_weights = None

#===============================================================
while (epoca < MAX_EPOCAS) and (not STOP):

    epoca += 1

    #----------------------
    # ENTRENAMIENTO
    #----------------------
    _,modelo = train_step(modelo, train_data, loss_function, optimizer, device)

    #----------------------
    # VALIDACION
    #----------------------
    e,Y,Yp = predict_step(modelo, validation_data, loss_function, device)

    acc = torch.sum(torch.sign(Yp) == torch.sign(Y))/ len(Y)

    #----------------------
    # ALMACENO MEDIDAS
    #----------------------
    error.append(e)
    accuracy.append(acc)

    #-----------------------------------------------
    # CRITERIO DE CORTE Y ALMACENAMIENTO DEL MODELO
    #-----------------------------------------------
    if (e < MIN_ERROR):
        MIN_ERROR = e
        counter = 0

        #·······················
        # Almaceno el modelo
        #·······················
        best_model = deepcopy(modelo)  # Genero una copia independiente
        best_model_weights = best_model.state_dict()

    else:
        counter += 1
        if counter > PATIENCE:
            STOP = True

    #--------------------------------------------
    # MUESTRO REPORTE POR PANTALLA (POR EPOCA)
    #--------------------------------------------
    if (epoca % 5) == 0:
        print(f'Epoca: {epoca} -- Error: {e:.4}\t--\tTasa acierto [val]: {acc:.4}\n')
#===============================================================

#--------------------------------------------
# MUESTRO REPORTE POR PANTALLA (FINAL)
#--------------------------------------------
print('='*79)
print(f'FINAL -- Epoca: {epoca} -- Error: {e:.4}\t--\tTasa acierto [val]: {acc:.4}')
print('='*79)

#-----------------------------
# GUARDO MEJOR MODELO A DISCO
#-----------------------------
torch.save(best_model,
           'best_model.pt',
           _use_new_zipfile_serialization=True)

#----------------------------------------------
# GUARDAMOS LOS PESOS DEL MEJOR MODELO A DISCO
#----------------------------------------------
torch.save(best_model.state_dict(),
           'best_model_state_dict.pt',
           _use_new_zipfile_serialization=True)

## GRAFICAMOS LAS SALIDAS

In [None]:
fig, ax = plt.subplots(1, 3, figsize=(30,6))

epocas = np.arange(epoca)

# ERROR
ax[0].plot(epocas, error, 'o-r', lw=2)
ax[0].grid(True)
ax[0].set_xlim(0,MAX_EPOCAS)
ax[0].set_xlabel('Epocas', fontsize=12)
ax[0].set_ylabel('MSE', fontsize=12)

# ACC
ax[1].plot(epocas, accuracy, 'o-b', lw=2)
ax[1].grid(True)
ax[1].set_xlim(0,MAX_EPOCAS)
ax[1].set_xlabel('Epocas', fontsize=12)
ax[1].set_ylabel('Acc', fontsize=12)

# CLASIFICACION
C = []
for i in range(len(Y)):

    if (torch.sign(Y[i]) == torch.sign(Yp[i])) and (torch.sign(Y[i]) == 1):
        C.append('blue')
    if (torch.sign(Y[i]) == torch.sign(Yp[i])) and (torch.sign(Y[i]) == -1):
        C.append('red')
    if (torch.sign(Y[i]) != torch.sign(Yp[i])) and (torch.sign(Y[i]) == 1):
        C.append('cyan')
    if (torch.sign(Y[i]) != torch.sign(Yp[i])) and (torch.sign(Y[i]) == -1):
        C.append('magenta')

ax[2].scatter(validation_data.dataset.x[:,0], validation_data.dataset.x[:,1], 20, C)
ax[2].set_xlim(-1.1,1.1)
ax[2].set_ylim(-1.1,1.1)
ax[2].set_xlabel('X1', fontsize=14)
ax[2].set_ylabel('X2', fontsize=14)
ax[2].grid(True)

# CONSTRUCCION DE LA FRONTERA DE DECISION
x = torch.tensor([-1.5, 1.5])
W1 = modelo.layer1.weight[0,:].flatten().detach().cpu()
B1 = modelo.layer1.bias[0].detach().cpu()
b1 = -B1/W1[1]
m1 = W1[0]/W1[1]

W2 = modelo.layer1.weight[1,:].flatten().detach().cpu()
B2 = modelo.layer1.bias[1].detach().cpu()
b2 = -B2/W2[1]
m2 = W2[0]/W2[1]

ax[2].plot(x, b1 - m1 * x, ':k', lw=2)
ax[2].plot(x, b2 - m2 * x, ':k', lw=2)
ax[2].set_xlim(-1.1,1.1)
ax[2].set_ylim(-1.1,1.1)
ax[2].grid(True)

---

## LECTURA DE DATOS DE EVALUACION

In [None]:
#=====================================
# LEVANTAMOS DE DISCO EL MEJOR MODELO
#=====================================

del modelo  # Eliminamos de memoria
            # para asegurarnos de usar
            # el modelo guardado en disco

#--------------------------------------
# Modelo completo (archivo binario)
#--------------------------------------
modelo = torch.load('best_model.pt')

#-----------------------
# A partir de los pesos
#-----------------------
#best_model = torch.load('best_model_state_dict.pt')
#modelo = MODELO(n_features=2, n_inputs=2, n_outputs=1)
#modelo.load_state_dict(best_model)
#modelo.to(device)

In [None]:
#=====================================
# CONSTRUIMOS EL DATASET PARA TEST
#=====================================
tst = dataset(filename_test_data)

test_data = DataLoader(tst, batch_size=BATCH_SIZE, shuffle=False)

#=====================================
# EVALUAMOS EL MODELO ENTRENADO
#=====================================
error,Y,Yp = predict_step(modelo, test_data, loss_function, device)

acc = torch.sum(torch.sign(Yp) == torch.sign(Y))/ len(Y)

print(f'\nTasa acierto [test]: {acc:.4}\n')

## GRAFICAMOS LAS SALIDAS

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(10,6))

# CLASIFICACION
C = []
for i in range(len(Y)):

    if (torch.sign(Y[i]) == torch.sign(Yp[i])) and (torch.sign(Y[i]) == 1):
        C.append('blue')
    if (torch.sign(Y[i]) == torch.sign(Yp[i])) and (torch.sign(Y[i]) == -1):
        C.append('red')
    if (torch.sign(Y[i]) != torch.sign(Yp[i])) and (torch.sign(Y[i]) == 1):
        C.append('cyan')
    if (torch.sign(Y[i]) != torch.sign(Yp[i])) and (torch.sign(Y[i]) == -1):
        C.append('magenta')

ax.scatter(test_data.dataset.x[:,0], test_data.dataset.x[:,1], 20, C)
ax.set_xlim(-1.1,1.1)
ax.set_ylim(-1.1,1.1)
ax.set_xlabel('X1', fontsize=14)
ax.set_ylabel('X2', fontsize=14)
ax.grid(True)

# CONSTRUCCION DE LA FRONTERA DE DECISION
x = torch.tensor([-1.5, 1.5])
W1 = modelo.layer1.weight[0,:].flatten().detach().cpu()
B1 = modelo.layer1.bias[0].detach().cpu()
b1 = -B1/W1[1]
m1 = W1[0]/W1[1]

W2 = modelo.layer1.weight[1,:].flatten().detach().cpu()
B2 = modelo.layer1.bias[1].detach().cpu()
b2 = -B2/W2[1]
m2 = W2[0]/W2[1]

ax.plot(x, b1 - m1 * x, ':k', lw=2)
ax.plot(x, b2 - m2 * x, ':k', lw=2)
ax.set_xlim(-1.1,1.1)
ax.set_ylim(-1.1,1.1)
ax.grid(True)