In [144]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import os

def rmse(y_true, y_pred):
    squared_errors = np.power(y_true - y_pred, 2)
    mean_squared_error = np.mean(squared_errors)
    return np.sqrt(mean_squared_error)
    
def r_squared(y_true, y_pred):
    sum_of_squared_residuals = np.sum(np.power(y_true - y_pred, 2))
    mean_y = np.mean(y_true)
    total_sum_of_squares = np.sum(np.power(y_true - mean_y, 2))
    if total_sum_of_squares == 0:
        return 1.0
    return 1 - (sum_of_squared_residuals / total_sum_of_squares)

    

def linear(x, derivative=False):
    if derivative:
        return 1.0
    return x

def ReLU(x, derivative=False):
    if derivative:
        return (x > 0).astype(np.float64)

    return np.maximum(0, x)

def logistical(x, derivative=False):
    x = np.clip(x, -500, 500)
    s = 1.0/(1.0 + np.exp(-x))
    if derivative:
        return s*(1-s)
    return s

def softmax(z):
    if not np.isfinite(z).all():
        raise ValueError("NaN/Inf in logits")
    z = z - np.max(z, axis=0, keepdims=True)
    exp = np.exp(z)
    return exp / np.sum(exp, axis=0, keepdims=True)




def mse(y_true, y_pred, derivative=False):
    if derivative:
        return 2 * (y_pred - y_true) / y_true.size
    return np.mean(np.power(y_true - y_pred, 2))

def binary_cross_entropy(y, y_hat, derivative=False, eps=1e-7):
    y_hat = np.clip(y_hat, eps, 1 - eps)
    if not derivative:
        return -np.mean(y * np.log(y_hat) + (1 - y) * np.log(1 - y_hat))
    
    # This is dL/dy_hat
    return (y_hat - y) / (y_hat * (1 - y_hat))

In [238]:
class NeuronLayer:

    # Create a NeuronLayer with:
    #     n_in input neurons
    #     n_out output neurons
    #     n_in * n_out connections     
    #   Obs: if l_type == "input" this is just a big identity matrix :D
    def __init__(
        self,
        n_in,                  # Previous Layer Size / Input size
        n_out,                 # Current Layer Size
        l_n,
    ):
        self.l_n = l_n
        self.l_size = n_out
        self.error = np.ndarray((n_out, 1))
        if l_n == 0:
            self.weights = None
            self.biases = None
            self.l_size = n_in
            self.Z = np.ndarray((n_in, 1))
            self.A = np.ndarray((n_in, 1))
            return
            
        self.Z = np.ndarray((n_out, 1))
        self.A = np.ndarray((n_out, 1))
        self.weights = np.random.normal(loc=0.0, scale=np.sqrt(2/n_in), size=(n_out, n_in))
        self.biases = np.zeros((n_out, 1))

    
    
    def get_weights(self):
        return self.weights

    def get_biases(self):
        return self.biases

    def set_weights(self, w):
        self.weights = w

    def set_biases(self, b):
        self.biases = b

    def update_weights(self, learning_rate, gradW):
        self.weights -= learning_rate*gradW

    def update_biases(self, learning_rate, gradB):
        self.biases -= learning_rate*gradB

    
    def propagate_foward(self, input, activation):
        self.Z = np.dot(self.weights, input) + self.biases
        self.A = activation(self.Z)
        return self.A

    
    def propagate_backward(self, next_layer, prev_layer, m, activation):
        self.error = np.dot(next_layer.get_weights().T, next_layer.error) * activation(self.Z, derivative=True)
        
        self.gradW = np.dot(self.error, prev_layer.A.T) / m
        self.gradB = np.sum(self.error, axis=1, keepdims=True) / m

        return (self.gradW, self.gradB)
        
        

class NeuralNetwork:

    # Creates an empty model. Params:
    #     layer_sizes: array with the size of each layer [input, hidden_layer_1, hidden_layer_2, ..., output]
    def __init__(
        self,
        layer_sizes,
        activation = ReLU,
        final_activation = linear,
        cost_function = mse,
        learning_rate = 0.0001,
        model_name = "default",
    ):
        self.model_name = model_name
        self.activation = activation
        self.final_activation = final_activation
        self.cost_function = cost_function
        self.learning_rate = learning_rate
        self.layer_sizes = layer_sizes
        self.n_of_layers = len(layer_sizes)
        self.layers = [0] * self.n_of_layers
        self.layers[0] = NeuronLayer(layer_sizes[0], layer_sizes[0], 0)
        
        for i in range(1, self.n_of_layers):
            self.layers[i] = NeuronLayer(layer_sizes[i-1], layer_sizes[i], i)


    def set_lr(self, learning_rate):
        self.learning_rate = learning_rate

        
    def get_layer_params(self, layer):
        return (self.layers[layer].get_weights(), self.layers[layer].get_biases())

        
    def print_layer_params(self, layer):
        if layer == 0:
            print(f"Layer 0 has {self.layers[0].l_size} inputs!")
            return
            
        w, b = self.get_layer_params(layer)
        print(f"Weights: ({w.shape[0]} rows & {w.shape[1]} columns) \n", w)
        print(f"Biases: ({b.shape[0]}) \n", b)

    
    def save_parameters(self):
        dir_path = f"model_params_{self.model_name}"
        os.makedirs(dir_path, exist_ok=True)
        for layer in self.layers:
            if layer.l_n == 0:
                continue

            pd.DataFrame(layer.get_weights()).to_csv(f"{dir_path}/{self.model_name}_w_{layer.l_n}", index=False, header=False)
            pd.DataFrame(layer.get_biases()).to_csv(f"{dir_path}/{self.model_name}b_{layer.l_n}", index=False, header=False)


    def load_parameters(self, dir_path):
        for layer in self.layers:
            if layer.l_n == 0:
                continue
    
            w = pd.read_csv(f"{dir_path}/{dir_path[13:]}_w_{layer.l_n}",
                            header=None).values
            b = pd.read_csv(f"{dir_path}/{dir_path[13:]}_b_{layer.l_n}",
                            header=None).values
    
            layer.set_weights(w)
            layer.set_biases(b)

    
    def foward(self, input):

        if input.shape[0] != self.layers[0].l_size:
            print("input wrong!")
            return None
            
        A_i = []
        for layer in self.layers:
            if layer.l_n == 0:
                A_i = input
                layer.A = A_i
                continue

            if layer.l_n == self.n_of_layers - 1:
                A_i = layer.propagate_foward(A_i, self.final_activation)
                return A_i
            
            A_i = layer.propagate_foward(A_i, self.activation)

    def backpropagation(self, y, m):
        y = np.asarray(y)
        prediction = self.layers[-1].A   
        
        for layer in reversed(self.layers):
            if layer.l_n == 0:
                continue
                
            prev_layer = self.layers[layer.l_n-1]
            
            if layer.l_n == self.n_of_layers-1:
                if self.final_activation is logistical and self.cost_function is binary_cross_entropy:
                    layer.error = prediction - y
                else:
                    layer.error = self.cost_function(y, prediction, derivative=True) * self.final_activation(layer.Z, derivative=True)
                    
                layer.gradW = np.dot(layer.error, prev_layer.A.T) / m
                layer.gradB = np.sum(layer.error, axis=1, keepdims=True) / m
                continue
                
            next_layer = self.layers[layer.l_n+1]
            gradW, gradB = layer.propagate_backward(next_layer, prev_layer, m, self.activation)

        for layer in self.layers:
            if layer.l_n == 0:
                continue
            
            layer.update_weights(self.learning_rate, layer.gradW)
            layer.update_biases(self.learning_rate, layer.gradB)


    def train(self, train_array, train_sol, epochs = 100):
        for epoch in range(epochs):
          for i in range(train_array.shape[0]):
              self.foward(train_array[i].reshape(-1, 1))
              self.backpropagation(train_sol[i].reshape(-1, 1), 1)
            

    def test(self, test_array):
        r = np.ndarray((test_array.shape[0], self.layer_sizes[-1]))
        for i in range(test_array.shape[0]):
            self.foward(test_array[i].reshape(-1, 1))
            r[i, :] = self.layers[-1].A[:, 0]

        return r

In [239]:
import os
import shutil
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
url = 'https://raw.githubusercontent.com/Krumpu/Homework_ICA/main/Data-Melbourne_F.csv'
df = pd.read_csv(url)
df = df.drop(columns=['BOD','month', 'day','year'])
df
df_y = df[['COD']].copy()
df_x = df.drop(columns=['COD'])

treino_x, teste_x, treino_y, teste_y = train_test_split(df_x,df_y,
                                                        test_size=0.2,
                                                        random_state=42,
                                                        shuffle=True)

assimetria = df.drop('COD', axis=1).skew()
colunas_assimetricas =  ['VV', 'SLP', 'PP', 'avg_inflow']

for colunas in colunas_assimetricas:
  treino_x[colunas] = np.log1p(treino_x[colunas])
  teste_x[colunas] = np.log1p(teste_x[colunas])

mediana = treino_y['COD'].median()


y_train_cls = (treino_y['COD'] > mediana).astype(int).values
y_test_cls = (teste_y['COD'] > mediana).astype(int).values
print(f"Mediana usada para corte: {mediana}")
print(f"Classes no Treino: {np.bincount(y_train_cls)}")


#Normalização
mean = treino_x.mean()
std = treino_x.std()

X_train_norm = ((treino_x - mean) / std).values
X_test_norm = ((teste_x - mean) / std).values

X_train_bias = np.c_[np.ones((len(X_train_norm), 1)), X_train_norm]
X_test_bias = np.c_[np.ones((len(X_test_norm), 1)), X_test_norm]

# FUNÇÕES DE AVALIAÇÃO
def calcular_acuracia(y_real, y_pred):
    return np.mean(y_real == y_pred)

def matriz_confusao_manual(y_real, y_pred):
    TP = np.sum((y_real == 1) & (y_pred == 1))
    TN = np.sum((y_real == 0) & (y_pred == 0))
    FP = np.sum((y_real == 0) & (y_pred == 1))
    FN = np.sum((y_real == 1) & (y_pred == 0))
    return np.array([[TN, FP], [FN, TP]])


Mediana usada para corte: 840.0
Classes no Treino: [560 545]


In [244]:
print("Treinando Rede Neural...")

nn = NeuralNetwork(
    [X_train_bias.shape[1], 32, 128, 64, 20, 1], 
    activation=ReLU,
    final_activation=logistical, 
    cost_function=binary_cross_entropy,
    learning_rate=0.001, 
    model_name="xxx_mk6"
)

nn.train(X_train_bias, y_train_cls, epochs=5000)
nn.save_parameters()


Treinando Rede Neural...


In [245]:
y_pred_nn = nn.test(X_test_bias)
y_pred_cls = (y_pred_nn.squeeze() >= 0.5).astype(int)

acc_log = calcular_acuracia(y_test_cls, y_pred_cls)
cm_log = matriz_confusao_manual(y_test_cls, y_pred_cls)

print(f"Acurácia da Rede Neural: {acc_log*100:.2f}%")
print("Matriz de Confusão (Rede Neural):\n", cm_log)

Acurácia da Rede Neural: 83.39%
Matriz de Confusão (Rede Neural):
 [[104  22]
 [ 24 127]]
