In [640]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, mean_squared_error, log_loss

import numpy as np
import matplotlib.pyplot as plt

# sigmoid function
# we don't use the logistic itself since it is slightly more costly
from scipy.special import expit 

import pandas as pd

In [765]:
class MLP():
    def __init__(self, dimensions=[], momentum=0.9, classification=True, 
                 binary=True, learning_rate=1, use_bias=True, testing=False):
        """A MLP class that implements any amount of fully-connect hidden layers.
        """
        # Check input
        if not dimensions:
            raise ValueError("Must pass input dimensions!")
        if len(dimensions) < 2:
            raise ValueError("At least 2 layers needed")
        
        if testing: # If testing, use always the same seed
            np.random.seed(42)
            
        self.classification = classification
        self.use_bias = use_bias
        self.momentum = momentum
        self.lr = learning_rate
        self.velocities = []
        self.weights = []
        self.grads = []
        
        self.initialize_weights(dimensions)
    
    def initialize_weights(self, dimensions):
        """ Initialize weights - we suposse our bias is built into the weights
        """
        # Add weights and gradients
        for idx, (input_dim, output_dim) in enumerate(dimensions):
            if self.use_bias:
                input_dim += 1
                if idx != len(dimensions) - 1:
                    output_dim += 1
                    
            weight = np.random.uniform(low=-0.1, high=0.100001, 
                                        size=(input_dim, output_dim))
            self.weights.append(weight)
            self.grads.append(np.zeros_like(weight))
            self.velocities.append(np.zeros_like(weight))

    def extend_with_bias(self, matrix):
        # Add bias term to a matrix
        new = np.ones((matrix.shape[0], 
                            matrix.shape[1]+1))
        new[:, :-1] = matrix
        return new
    
    def forward_pass(self, x_train):
        """Performs a forward pass of a dataset through our net
        """
        current = x_train
        outputs = []
        for layer in self.weights:
            output = expit(current.dot(layer))
            outputs.append(output)
            current = output
        
        return outputs

    def backward_pass(self, loss, outputs, input_layers):
        """Performs a backward pass and calculates gradients through our net
        """
        backward_outputs = outputs[::-1]
        idxs = range(len(self.weights), 0,-1)
        last_layer = True
        
        # go through backwards
        for idx, output, prev_layer in zip(idxs, backward_outputs, input_layers[::-1]):
            if last_layer:
                layer_error = loss
                last_layer = False
            else:
                layer_error = np.dot(previous_delta, self.weights[idx].T)
            layer_delta = layer_error * output*(1-output)
            grad = prev_layer.T.dot(layer_delta)

            self.grads[idx-1] = grad
            previous_delta = layer_delta
    
    def fit(self, original_x, original_y, n_iter=100, testing=False,
           verbose=False, mini_batch=32):
        """Trains our model learnable weights with a dataset
        """
        
        # Check if the labels are 0 or 1 exclusively!
        if self.classification and self.binary:
            invalid_label = (~np.isin(np.unique(original_y), 
                              np.array([0, 1]))).sum()
            if invalid_label:
                raise ValueError("Invalid label in Y!")
            
        
        if self.use_bias:
            x_train = self.extend_with_bias(original_x)
        else:
            x_train = original_x
            
        y_train = original_y.reshape(len(original_y), 1)
           
        for _ in range(n_iter):   
            for start_idx in range(0, len(x_train), mini_batch):
                x_batch = x_train[start_idx:start_idx+mini_batch]
                y_batch = y_train[start_idx:start_idx+mini_batch]
                
                # Forward pass the mini batch
                outputs = self.forward_pass(x_batch)
                input_layers = [x_batch] + outputs[:-1]

                # Back propagate the loss
                loss = (y_batch - outputs[-1]) / len(x_batch)
                #loss = log_loss(y_batch, outputs[-1]) / len(x_batch)
                self.backward_pass(loss, outputs, input_layers)

                # Update 
                for idx, (grad, last_v) in enumerate(zip(self.grads, self.velocities)):
                    # Update weights according to momentum
                    new_v = self.momentum * last_v + self.lr*grad
                    self.weights[idx] += new_v
                    # Update the new velocites and set the gradients to 0
                    self.velocities[idx] = new_v
                    self.grads[idx] = np.zeros_like(self.weights[idx])

    def predict(self, x_test):
        if self.use_bias:
            new_x_test = self.extend_with_bias(x_test)
        else:
            new_x_test = x_test
        output = self.forward_pass(new_x_test)[-1]
        
        if self.classification:
            return (output > 0.5).astype(int)
        else:
            return output
    

# winequality-red.csv 

In [722]:
TARGET = "category"
df = pd.read_csv("winequality-red.csv")
df.drop(columns=["Unnamed: 0"], inplace=True) # drop index
print("Categorias:", df[TARGET].unique())
df.head()

Categorias: ['Mid' 'Good' 'Bad']


Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,category
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,Mid
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,Mid
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,Mid
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,Mid
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,Mid


Mapear os valores categóricos do nosso target para numéricos

In [639]:
df["category"] = df["category"].map({"Bad":0, "Mid":1, "Good":2})

In [623]:
x = df.drop(columns=[TARGET])
y = df[TARGET]

In [680]:
def run_experiment(x, y, test_percent, hidden_layers, cicles, lr, momentum,
                  classification):
    x_train, x_test, y_train, y_test = train_test_split(x, y, stratify=y, 
                                                       test_size=test_percent)
    dimensions = [(x.shape[1], 32)]
    if hidden_layers > 1:
        dimensions += [(32, 16), (16, 1)]
    else:
        dimensions += [(32, 1)]
    mlp = MLP(dimensions=dimensions, momentum=momentum, learning_rate=lr, 
             classification=classification)
    mlp.fit(x_train.values, y_train.values, n_iter=cicles)
    preds = mlp.predict(x_test.values)
    

In [633]:
run_experiment(x, y, 0.3, 1, cicles=2, lr=0.1, momentum=0.9, classification=True)

TypeError: unsupported operand type(s) for -: 'str' and 'float'

In [723]:
x = []
y = []

for i in [0, 1]:
    for j in [0, 1]:
        x.append([i, j])
        y.append(i != j)

x = np.array(x)
y = np.array(y)
y = y.astype(int)

In [730]:
dimensions = [(2,5), (5, 5), (5,1)]
dimensions = [(2,4), (4,1)]
mlp = MLP(dimensions=dimensions, learning_rate=1,
         use_bias=True)

mlp.fit(x, y, n_iter=10000, verbose=False)

In [731]:
preds = mlp.predict(x).reshape(-1)
#preds = mlp.predict(x).reshape(1, -1)
#pd.DataFrame(np.vstack((preds, y)).T, columns=["MLP predito", "Resposta"])
#preds[:4]

In [732]:
preds

array([0, 1, 1, 0])

In [711]:
mlp.classification = False
mlp.predict(x)

array([[0.25],
       [0.25],
       [0.25],
       [0.25]])

In [743]:
last = mlp.forward_pass(mlp.extend_with_bias(x))[-1]
last.shape

(4, 1)

In [745]:
last

array([[0.00721391],
       [0.99269207],
       [0.99297411],
       [0.00905072]])

In [746]:
last.sum(axis=1)

array([0.00721391, 0.99269207, 0.99297411, 0.00905072])

In [749]:
last.reshape(-1) / last.sum(axis=1)

array([1., 1., 1., 1.])

In [769]:
last = np.random.rand(4, 3)
print(last.shape)       
last

(4, 3)


array([[0.07194636, 0.04604516, 0.35302427],
       [0.9693172 , 0.31867515, 0.40988413],
       [0.63407706, 0.02411184, 0.89776358],
       [0.51944302, 0.46681091, 0.86072666]])

In [768]:
#last / last.sum(axis=1).reshape(-1, 1)

In [770]:
np.exp(last).sum(axis=1)

array([3.54508509, 5.51809159, 5.36379477, 5.64086938])

In [675]:
log_loss(y, preds, )

17.26978799617044

In [678]:
expit([1, 2, 3]) / expit([1, 2, 3]).sum()

array([0.28507647, 0.34346703, 0.37145651])