In [1]:
from numpy import vstack
from pandas import read_csv
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from torch import Tensor
from torch.nn import Linear
from torch.nn import ReLU
from torch.nn import Sigmoid
from torch.nn import Module
from torch.optim import SGD
from torch.nn import BCELoss
from torch.nn.init import kaiming_uniform_
from torch.nn.init import xavier_uniform_

# Definición de nuestro dataset
class CSVDataset(Dataset):
    # Cargamos el dataset
    def __init__(self, path):
        # Cargamos el archivo CSV como un dataframe
        df = read_csv(path, header=None)
        # Guardamos los valores de entrada y los de salida
        self.X = df.values[:, :-1]
        self.y = df.values[:, -1]
        # Nos aseguramos de que la data de entrada sea flotante
        self.X = self.X.astype('float32')
        # Etiquetamos el objetivo de codificación y asegúrese de que los valores sean flotantes
        self.y = LabelEncoder().fit_transform(self.y)
        self.y = self.y.astype('float32')
        self.y = self.y.reshape((len(self.y), 1))

    # Sacamos el número de filas en el dataset
    def __len__(self):
        return len(self.X)

    # Obtenemos una fila en un índice
    def __getitem__(self, idx):
        return [self.X[idx], self.y[idx]]

    # Obtenemos los índices para entrenar y probar filas
    def get_splits(self, n_test=0.33):
        # Determinamos los tamaños
        test_size = round(n_test * len(self.X))
        train_size = len(self.X) - test_size
        # Calculamos los splits
        return random_split(self, [train_size, test_size])

# Preparamos el dataset
def prepare_data(path):
    # Cargamos el dataset
    dataset = CSVDataset(path)
    # Calculamos los splits
    train, test = dataset.get_splits()
    # Preparamos cargadores de datos
    train_dl = DataLoader(train, batch_size=32, shuffle=True)
    test_dl = DataLoader(test, batch_size=1024, shuffle=False)
    return train_dl, test_dl

# Definición del modelo
class MLP(Module):
    # Definimos los elementos del modelo
    def __init__(self, n_inputs):
        super(MLP, self).__init__()
        # Entrada a la primera capa oculta
        self.hidden1 = Linear(n_inputs, 10)
        kaiming_uniform_(self.hidden1.weight, nonlinearity='relu')
        self.act1 = ReLU()
        # Segunda capa oculta
        self.hidden2 = Linear(10, 8)
        kaiming_uniform_(self.hidden2.weight, nonlinearity='relu')
        self.act2 = ReLU()
        # Tercera capa oculta y salida
        self.hidden3 = Linear(8, 1)
        xavier_uniform_(self.hidden3.weight)
        self.act3 = Sigmoid()

    # Entrada de alimentación hacia delante
    def forward(self, X):
        # Entrada a la primera capa oculta
        X = self.hidden1(X)
        X = self.act1(X)
        # Segunda capa oculta
        X = self.hidden2(X)
        X = self.act2(X)
        # Tercera capa oculta y salida
        X = self.hidden3(X)
        X = self.act3(X)
        return X

# Entrenamos el modelo
def train_model(train_dl, model):
    # Definimos la optimización
    criterion = BCELoss()
    optimizer = SGD(model.parameters(), lr=0.01, momentum=0.9)
    # Pasamos por las diferentes epocas o repeticiones
    for epoch in range(100):
        # Enumeramos los mini lotes
        for i, (inputs, targets) in enumerate(train_dl):
            # Borramos los gradientes
            optimizer.zero_grad()
            # Calculamos la salida del modelo
            yhat = model(inputs)
            # Calculamos la pérdida
            loss = criterion(yhat, targets)
            loss.backward()
            # Actualizamos los pesos del modelo
            optimizer.step()

# Evaluación del modelo
def evaluate_model(test_dl, model):
    predictions, actuals = list(), list()
    for i, (inputs, targets) in enumerate(test_dl):
        # Evaluamos el modelo con el dataset de prueba
        yhat = model(inputs)
        # Regresamos un numpy array
        yhat = yhat.detach().numpy()
        actual = targets.numpy()
        actual = actual.reshape((len(actual), 1))
        # Redondeamos a valores de clases
        yhat = yhat.round()
        # Guardamos los valores
        predictions.append(yhat)
        actuals.append(actual)
    predictions, actuals = vstack(predictions), vstack(actuals)
    # Calculamos la precisión
    acc = accuracy_score(actuals, predictions)
    return acc

In [2]:
# Hacemos una predicción de clase para una fila de datos
def predict(row, model):
    # Convertimos la fila en datos
    row = Tensor([row])
    # Hacemos la predicción
    yhat = model(row)
    # Devolvemos un array de numpy
    yhat = yhat.detach().numpy()
    return yhat

In [4]:
path = 'https://raw.githubusercontent.com/jbrownlee/Datasets/master/ionosphere.csv'
train_dl, test_dl = prepare_data(path)
print(len(train_dl.dataset), len(test_dl.dataset))
# Definimos la red neuronal
model = MLP(34)
# Entrenamos el modelo
train_model(train_dl, model)
# Evaluamos el modelo
acc = evaluate_model(test_dl, model)
print('Precisión: %.3f' % acc)
# Hacemos una predicción
row = [1,0,0.99539,-0.05889,0.85243,0.02306,0.83398,-0.37708,1,0.03760,0.85243,-0.17755,0.59755,-0.44945,0.60536,-0.38223,0.84356,-0.38542,0.58212,-0.32192,0.56971,-0.29674,0.36946,-0.47357,0.56811,-0.51171,0.41078,-0.46168,0.21266,-0.34090,0.42267,-0.54487,0.18641,-0.45300]
yhat = predict(row, model)
print('Predecido: %.3f (class=%d)' % (yhat, yhat.round()))

235 116
Precisión: 0.940
Predecido: 0.998 (class=1)
