# Rede Neural Artificial


## Imports necessários 

In [14]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import classification_report

## Funções de Ativação

In [2]:
# Funções de ativação e derivadas
def sigmoid(z):
    return 1 / (1 + np.exp(-z))

def sigmoid_derivative(a):
    sigmoid_a = sigmoid(a)
    return sigmoid_a * (1 - sigmoid_a)

def relu(z):
    return np.maximum(0, z)

def relu_derivative(a):
    return np.where(a > 0, 1, 0)

def softmax(z):
    z = z - np.max(z, axis=-1, keepdims=True)
    exps = np.exp(z)
    return exps / np.sum(exps, axis=-1, keepdims=True)

def softmax_derivative(z):
    # TODO: Explicar pq utiliza return 1. Tentar implementar a derivada do softmax
    return 1

def identity(z):
    return z

def identity_derivative(z):
    return np.ones_like(z)

activation_funcs = {
    'sigmoid': (sigmoid, sigmoid_derivative),
    'relu':    (relu, relu_derivative),
    'softmax': (softmax, softmax_derivative),
    'identity': (identity, identity_derivative)
}


## Funções de Custo

In [3]:
# Funções de custo e derivadas
def mse(y_true, y_pred):
    return np.mean((y_true - y_pred)**2)

def mse_derivative(y_true, y_pred):
    return 2 * (y_pred - y_true)

def binary_cross_entropy(y_true, y_pred, eps=1e-15):
    y_pred = np.clip(y_pred, eps, 1 - eps)
    return -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))

def binary_cross_entropy_derivative(y_true, y_pred, eps=1e-15):
    y_pred = np.clip(y_pred, eps, 1 - eps)
    return (y_pred - y_true) / (y_pred * (1 - y_pred))

def categorical_cross_entropy(y_true, y_pred):
    return -np.sum(y_true * np.log(y_pred)) / y_pred.shape[0] # dividir por predictions.shape[0] tem o intuito de normalizar o valor da perda pelo número de amostras, tornando a perda independente do tamanho do batch

def categorical_cross_entropy_derivative(y_true, y_pred, eps=1e-12):
    y_pred = np.clip(y_pred, eps, 1 - eps)
    return y_pred - y_true

cost_funcs = {
    'mse':            (mse, mse_derivative),
    'binary_cross_entropy':  (binary_cross_entropy, binary_cross_entropy_derivative),
    'categorical_cross_entropy': (categorical_cross_entropy, categorical_cross_entropy_derivative)
}


## Inicialização da Rede e Feedfoward

In [None]:
def initialize_layers(layers, c_inputs):
  """
  Inicializa as camadas da rede neural com pesos aleatórios.

  Parâmetros
  ----------
  layers : list of dict
      Lista onde cada dicionário contém:
      - 'neurons': número de neurônios na camada
      - 'activation_function': nome da função de ativação usada na camada
  c_inputs : int
      Número de entradas da rede (atributos do dataset).

  Retorna
  -------
  initialized_layers : list
      Lista de camadas com pesos e função de ativação configurados.
  """

  initialized_layers = []

  for i, layer in enumerate(layers):
    # se for a primeira camada, o número de entradas é igual ao número de atributos
    if i == 0:
      input_size = c_inputs
    else:
      input_size = layers[i - 1]['neurons']
    # o número de saídas é igual ao número de neurônios da camada atual
    output_size = layer['neurons']
    
    # Inicializa os pesos com valores aleatórios pequenos e mais um para o viés
    weight_matrix = np.random.uniform(low=-0.33, high=0.33,size=(output_size,input_size + 1))
  
    initialized_layers.append({
        'weights': weight_matrix,
        'activation_func': layer['activation_function']
    })
  return initialized_layers

def feed_forward(layers, inputs):
  """
  Executa o algoritmo de propagação direta (feedforward) em uma rede neural.

  Parâmetros
  ----------
  layers : list
      Lista de camadas já inicializadas com pesos e funções de ativação.
  inputs : np.ndarray
      Vetor de entrada com os atributos de uma observação.

  Retorna
  -------
  activation : np.ndarray
      Saída final da rede após a última camada (predição).
  layers : list
      Lista atualizada das camadas com entradas e saídas armazenadas para uso posterior no backpropagation.
  """
    
  activation = inputs

  # Itera sobre cada camada da rede neural
  for layer in layers:

    # Concatena para o input + viés da matriz de pesos
    activation = np.concatenate(([1], activation))
    
    # salva input na layer
    layer['input'] = activation

    # Calcula a saída da camada atual: z = w' * a + b, onde w' é a transposta da matriz de pesos, a é o input da camada e b é o viés que foi concatenado
    z = np.dot(activation, layer['weights'].T)
    
    # Extrai e calcula a função de ativação do dicionário activation_funcs
    activation_func, _ = activation_funcs[layer['activation_func']]
    
    activation = activation_func(z)
  
    # salva a operacao na layer
    layer['output'] = z

  return activation, layers

## Backpropagation

In [None]:
def backpropagation(layers, cost_derivation, learningRate):
    """
    Executa o algoritmo de backpropagation em uma rede neural artificial.

    Parâmetros
    ----------
    layers : list
        Lista de dicionários representando as camadas da rede após o feedfoward. Cada camada deve conter:
        - "activation_func": nome da função de ativação
        - "output": saída da camada após a ativação
        - "input": entrada recebida pela camada
        - "weights": matriz de pesos da camada
    cost_derivation : np.ndarray
        Derivada da função de custo em relação à saída da rede.
    learningRate : float
        Taxa de aprendizado usada para atualização dos pesos.

    Retorna
    -------
    newWeights : list
        Lista contendo os novos pesos atualizados para cada camada.
    """
    newWeights = []
    
    error = None
    nextLayerWeights = None

    # Percorre as camadas da rede em ordem reversa (do output para o input)
    for layer in reversed(layers):
        # Obtém e calcula a derivada da função de ativação da camada atual
        _, activation_derivation = activation_funcs[layer["activation_func"]]

        derivation = activation_derivation(layer["output"])

        if error is None:
            # Primeira iteração: erro é a derivada do custo vezes a derivada da ativação
            error = cost_derivation * derivation
        else:
            # Para camadas intermediárias: propaga o erro da camada seguinte
            propagated_error = np.dot(error, nextLayerWeights[:, 1:]) # Ignora o viés

            error = propagated_error * derivation

        # Calcula e aplica o gradiente
        gradient = np.outer(error, layer["input"]) * learningRate

        nextLayerWeights = layer["weights"]

        newLayerWeights = nextLayerWeights - gradient

        newWeights.insert(0, newLayerWeights)
    
    return newWeights

## Função de Treino

In [6]:
def train(ann, epochs, x, y, learning_rate, cost_func_name):
    """
    Executa o treinamento de uma rede neural artificial (RNA).

    Parâmetros:
    ann : list
        Estrutura da rede neural, lista de dict {weights, activation_func}.
    epochs : int
        Número de épocas de treinamento.
    x : pandas.DataFrame
        Conjunto de dados de entrada.
    y : list or np.ndarray
        Conjunto de saídas desejadas.
    learning_rate : float
        Taxa de aprendizado.
    cost_func_name : string
        Nome da função de custo desejada.

    Retorna:
    stages : list
        Lista contendo os estados da RNA ao final de cada época.
    """

    stages = []  # Armazena o estado da rede após cada época

    # Loop sobre cada época
    for _ in range(epochs):
        # Loop sobre cada observação no conjunto de dados
        for observation_id in range(len(x)):

            observation = np.array(x.iloc[observation_id])

            # Executa a propagação direta
            y_pred, ann = feed_forward(ann, observation)

            y_true = np.array(y[observation_id])
            
            _, cost_derivation_func = cost_funcs[cost_func_name]

            # Calcula da saída
            cost_derivation = cost_derivation_func(y_true=y_true, y_pred=y_pred)

            # Executa a retropropagação para atualizar os pesos
            newWeights = backpropagation(
                layers=ann,
                cost_derivation=cost_derivation,
                learningRate=learning_rate
            )

            for i, layer in enumerate(ann):
                layer["weights"] = newWeights[i]

        # Salva o estado da rede ao final da época
        stages.append(ann)

    return stages[-1]


## Funções de Preparação de Dados

In [7]:
# TODO: trocar dataset https://www.kaggle.com/datasets/adityakadiwal/water-potability

def prepareDataBinaryClassification():
  # print(data.head())
  DATA_PATH = 'prepareData/penguins_binary_classification.csv'

  data = pd.read_csv(DATA_PATH)

  # Realizando o label encoding da coluna 'species'
  data['species'] = data['species'].map({'Adelie': 0, 'Gentoo': 1})

  # Realizando o label encoding da coluna 'island'
  data['island'] = data['island'].map({'Torgersen': 0, 'Biscoe': 1, 'Dream': 2})

  numerical_cols = ['bill_length_mm', 'bill_depth_mm', 'flipper_length_mm', 'body_mass_g', 'year']

  # Inicializando o MinMaxScaler
  scaler = MinMaxScaler(feature_range=(0, 1))

  # Aplicando a normalização às colunas numéricas
  data[numerical_cols] = scaler.fit_transform(data[numerical_cols])

  X = data

  y = data.pop('species').values

  # Divisão dos dados em conjuntos de treino e teste na proporção de 80% para treino e 20% para teste
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

  return X_train, X_test, y_train, y_test


def prepareDataMultipleClassification():
  data = pd.read_csv('prepareData/mobile_price_multiple_classification.csv')

  # print(data.head())
  
  numerical_cols = ['battery_power', 'blue', 'clock_speed', 'dual_sim', 'fc', 'four_g', 'int_memory', 'm_dep', 'mobile_wt', 'n_cores', 'pc', 'px_height', 'px_width', 'ram', 'sc_h', 'sc_w', 'talk_time', 'three_g', 'touch_screen',  'wifi']

  # Inicializando o MinMaxScaler
  scaler = MinMaxScaler(feature_range=(0, 1))

  # Aplicando a normalização às colunas numéricas
  data[numerical_cols] = scaler.fit_transform(data[numerical_cols])
  
  X = data.drop(columns=['price_range'])

  y = pd.get_dummies(data["price_range"], prefix="price", sparse=False)

  # Divisão dos dados em conjuntos de treino e teste na proporção de 80% para treino e 20% para teste
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
  
  return X_train, X_test, y_train, y_test

def prepareDataRegression():
  DATA_PATH = 'prepareData/house_price_regression_dataset.csv'

  data = pd.read_csv(DATA_PATH)

  # print(data.head())

  numerical_cols = ['Square_Footage', 'Num_Bedrooms', 'Num_Bathrooms', 'Year_Built', 'Lot_Size', 'Garage_Size', 'Neighborhood_Quality', 'House_Price']

  min_max_house_price = (data['House_Price'].min(), data['House_Price'].max())

  # Inicializando o MinMaxScaler
  scaler = MinMaxScaler(feature_range=(0, 1))

  # Aplicando a normalização às colunas numéricas
  data[numerical_cols] = scaler.fit_transform(data[numerical_cols])

  X = data

  y = data.pop('House_Price').values

  
  # Divisão dos dados em conjuntos de treino e teste na proporção de 80% para treino e 20% para teste
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

  return X_train, X_test, y_train, y_test, min_max_house_price


# Montagem da Rede de Classificação Multiclasse

In [10]:
input_size = 20

layers = [
    {'neurons': 10, 'activation_function': 'relu'},
    {'neurons': 10, 'activation_function': 'relu'},
    {'neurons': 4, 'activation_function': 'softmax'}
]

ann_layer = initialize_layers(layers, c_inputs=input_size)

x_train, x_test, y_train, y_test = prepareDataMultipleClassification()

In [11]:
ann = train(
    ann=ann_layer,
    epochs=10,
    x=x_train,
    y=np.array(y_train),
    learning_rate=0.002,
    cost_func_name='categorical_cross_entropy'
)

In [16]:
def evaluate(ann, x, y):
    predictions = []
    for observation_id in range(len(x)):
        input = np.array(x.iloc[observation_id])
        prediction, _ = feed_forward(ann, input)
        one_hot_prediction = np.zeros_like(prediction)
        one_hot_prediction[np.argmax(prediction)] = 1
        predictions.append(one_hot_prediction)
    
    print("\n Resultados da Classificação:")
    print(classification_report(y, predictions))
    
evaluate(ann, x_test, y_test)


 Resultados da Classificação:
              precision    recall  f1-score   support

           0       0.80      0.99      0.88       114
           1       0.97      0.34      0.50       103
           2       0.50      0.49      0.49        80
           3       0.71      0.99      0.83       103

   micro avg       0.72      0.72      0.72       400
   macro avg       0.74      0.70      0.68       400
weighted avg       0.76      0.72      0.69       400
 samples avg       0.72      0.72      0.72       400



# Montagem da Rede de Classificação Binaria

# Montagem da Rede de Regressão