In [2]:
import numpy as np
from numpy import float64
from numpy.typing import NDArray, ArrayLike
import matplotlib.pyplot as plt
import pandas as pd
import scipy as sci
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow import Tensor

In [None]:
class Rede_Neural:
    def __init__(self, qtd_neuronios_camada, seed) -> None:
        self.camadas: int = len(qtd_neuronios_camada)
        self.neuronios_camada: list[int] = qtd_neuronios_camada
        self.pesos = [self.pesos_iniciais(seed)]
        self.seed = seed

    def ativacao(self, z: Tensor) -> Tensor:
        """
        Aplica a função de ativação definida em uma lista ou np.array. Atualmente é tanh, pq ReLU e Leaku ReLU é linear demais pro que queremos.
        """
        return 5 * tf.tanh(tf.divide(z, 5))

    def derivada_ativacao(self, z: Tensor) -> Tensor:
        """
        Aplica a derivada da função de ativação definida em uma lista ou np.array. Atualmente é leaky ReLU, com 0.01 se z < 0
        """
        return 1 / tf.cosh(tf.divide(z,5)) ** 2

    def pesos_iniciais(self, seed: int) -> list[Tensor]:
        """
        Usa a distribuição de Xavier para criar pesos iniciais usando a seed dada para o rng.\n
        W(camada) ~ N(0, sqrt(2 / neuronios[camada]))
        """
        pesos_saida = []
        tf.random.set_seed(seed)
        
        for i in range(self.camadas-1):
            n_in = self.neuronios_camada[i]
            n_out = self.neuronios_camada[i+1]

            w = tf.Variable(tf.keras.initializers.GlorotNormal(seed=seed)((n_out, n_in + 1)))

            pesos_saida.append(w)

        return pesos_saida

    def forward_pass(self, entrada: ArrayLike, pesos: list[Tensor]) -> list[dict[str, Tensor]]:
        """
        Aplica a rotina de multiplicar pelos pesos e aplicar função de ativação para todas as camadas, retornando os valores pré-ativação e pós-ativação de cada camada.

        Parameters
        ----------
        entrada: ArrayLike
            É a lista de valores de entrada da rede, pode ser uma lista mesmo ou qualquer ArrayLike, já que é transformada em Tensor dentro da função.

        pesos: list[Tensor]
            São os pesos que se quer usar na rede. Usar self.pesos[-1] para última época

        Returns
        -------
        valores_camadas: list[dict]
            Para uma camada qualquer ``i``, os valores de ``dicionário valores_camadas[i]`` são:

            - ``"z"``: Tensor or None\n
                Valores não ativados da camada
            - ``"a"``: Tensor\n
                Valores ativados da camada
        """
        valores_camadas = []
        a = tf.convert_to_tensor(entrada, dtype=tf.float32)
        valores_camadas.append({"z": None, "a": a})

        for W in pesos:
            a_bias = tf.concat([a, tf.ones([1])], axis=0) # adiciona bias
            z = tf.linalg.matvec(W, a_bias)
            a = self.ativacao(z)
            valores_camadas.append({"z": z, "a": a})
        
        return valores_camadas
    
    def previsao(self, entrada: ArrayLike, epoca: int) -> NDArray:
        """Retorna os valores previstos pela rede, os valores ativados da última camada, para a época desejada. Usar -1 para última época"""
        camadas  = self.forward_pass(entrada, self.pesos[epoca])
        previsao = camadas[-1]["a"]
        return previsao.numpy()

    def perda(self, previsao: Tensor, real: Tensor) -> Tensor:
        """
        Calcula a função de perda da rede, atualmente usando 0.5 MSE mas poderia ser com penalidades e
        para função de valor máximo talvez seja melhor usar uma função como softmax
        """
        residuos = previsao - real
        mse = tf.reduce_mean(residuos ** 2)
        perda = 0.5 * mse
        return perda
    
    # def derivada_perda(self, previsao: Tensor, real: Tensor) -> Tensor:
    #     """
    #     Calcula a derivada da função de perda, lembrar que a derivada de abs(x) = sign(x)
    #     """
    #     residuos = previsao - real
    #     return residuos
    
    def calcular_gradiente(self, entrada: Tensor, real: Tensor, epoca: int) -> list[Tensor]:
        """Calcula o gradiente dos pesos pelo tensorflow, retornando uma lista de Tensores correspondente ao gradiente de cada camada."""
        pesos = self.pesos[epoca]

        with tf.GradientTape() as tape:
            previsao = self.forward_pass(entrada, pesos)[-1]["a"]
            perda = self.perda(previsao, real)

        gradiente = tape.gradient(perda, pesos)
        return gradiente

    def treinar_uma_epoca(self, entradas: list[Tensor], saidas: list[Tensor], tamanho_lote: int, taxa_aprendizado: float) -> float:
        """Treina a rede por uma época usando Adam e mini-batches"""
        # embaralha
        n = len(entradas)
        indices = np.arange(n)
        np.random.shuffle(indices)

        # pega os pesos atuais e inicializa Adam
        pesos = self.pesos[-1]
        optimizer = tf.keras.optimizers.Adam(learning_rate=taxa_aprendizado)

        perdas = []

        for inicio in range(0, n, tamanho_lote):
            fim = inicio + tamanho_lote
            batch_idx = indices[inicio:fim]

            # monta o batch
            x_batch = tf.stack([entradas[i] for i in batch_idx])
            y_batch = tf.stack([saidas[i] for i in batch_idx])

            with tf.GradientTape() as tape:
                # forward em todo o batch
                previsoes = [self.forward_pass(x, pesos)[-1]["a"] for x in x_batch]
                previsoes = tf.stack(previsoes)

                perda = self.perda(previsoes, y_batch)

            # gradiente médio do batch
            gradientes = tape.gradient(perda, pesos)
            optimizer.apply_gradients(zip(gradientes, pesos))

            perdas.append(perda.numpy())

        # salva pesos da época
        self.pesos.append([tf.identity(w) for w in pesos])

        return float(np.mean(perdas))

    def treinar(self, taxa_aprendizado: float, tamanho_lotes: int, entradas_treino: list[ArrayLike], saidas_treino: list[ArrayLike], quantidade_epocas: int) -> None:
        """Chama self.treinar_uma_epoca ``quantidade_epocas`` vezes"""
        np.random.seed(self.seed)
        for epoca in range(quantidade_epocas):
            self.treinar_uma_epoca(taxa_aprendizado, tamanho_lotes, entradas_treino, saidas_treino)

In [None]:
re