In [2]:
# Importação das bibliotecas
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Dropout
from tensorflow.keras.metrics import BinaryAccuracy
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.exceptions import UndefinedMetricWarning
import warnings
import os
import matplotlib.pyplot as plt
import random
import optuna
from imblearn.under_sampling import RandomUnderSampler
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UndefinedMetricWarning)

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Definição de algumas variáveis globais
divisao_cjt_teste = 0.2
caminho_graficos = r'C:\Users\alexa\OneDrive\Documentos\TCC MBA\Etapa final TCC\Ambiente de testes\Gráficos overfitting'
caminho_base = r'C:\Users\alexa\OneDrive\Documentos\TCC MBA\Etapa final TCC\Ambiente de testes\Dados.xlsx'
caminho_pasta_atual = r'C:\Users\alexa\OneDrive\Documentos\TCC MBA\Etapa final TCC\Ambiente de testes'
planilha_original = 'DadosOriginais'
planilha_oversample = 'DadosOS'
planilha_smote = 'DadosSmote'

# Definindo valor da seed das principais bibliotecas para replicação dos resultados
seed_value = 1
random.seed(seed_value)         # Python random
np.random.seed(seed_value)      # Numpy
tf.random.set_seed(seed_value)  # TensorFlow/Keras

In [4]:
# Definição da função para tratamento dos dados. Retorna os dados já divididos em treinamento e teste
def tratamento_dados(caminho_arquivo:str, nome_planilha:str, aplica_undersample:bool):
    
    # Importação dos dados para um DataFrame pandas
    dados = pd.read_excel(io=caminho_arquivo, sheet_name=nome_planilha)

    # Declarando a variável que receberá os valores dos dados que serão tratados
    dados_tratamento = None

    # Inicializando a variável dados_tratamento com um valor diferente de nulo conforme argumento passado na assinatura da função
    if aplica_undersample:

        # Aplicando a técnica de undersampling para diminuir a quantidade da categoria predominante (RJ = 0) e balancear os dados
        # Filtrando os dados e pegando apenas 1 registro de cada empresa, do ano 5, que contém de fato 0 e 1
        dados_undersampling = dados[dados['Ano'] == 5][['Name', 'RJ']]

        # Transformando em um array de 2 dimensões para ser utilizado com a classe RandomUnderSampler, e separando em x e y
        x_undersampling = dados_undersampling['Name'].values.reshape(-1, 1)
        y_undersampling = dados_undersampling['RJ']

        # Construindo o objeto para aplicar a reamostragem, com fator de 30% da classe menor em relação à maior
        rus = RandomUnderSampler(sampling_strategy=0.3, random_state=42)

        # Aplicando a reamostragem
        x_resampled, _ = rus.fit_resample(x_undersampling, y_undersampling)

        # Transformando o array x, que contém o nome das empresas mantidas após RUS em um array de uma dimensão
        empresas_remanescentes = x_resampled.flatten()

        # Filtrando os dados originais e mantendo apenas os nomes das empresas que saíram do resultado de RUS
        dados_tratamento = dados[dados['Name'].isin(empresas_remanescentes)]


    else:

        # Caso RUS não seja ativado na chamada da função, simplesmente ignora o processo
        dados_tratamento = dados
 

    # Inicializando uma variável de lista vazia para conter os dados organizados para a análise
    dados_organizados = []

    # Criando uma variável com os nomes (códigos) distintos de todas empresas
    empresas_unicas = dados_tratamento['Name'].unique()

    # Iterando sobre os nomes distintos das empresas e separando os dados dos indicadores com o nome de cada empresa em uma lista
    for empresa in empresas_unicas:
        dados_empresa = dados_tratamento[dados_tratamento['Name'] == empresa]
        dados_empresa = dados_empresa.iloc[:,0:-2].values
        dados_organizados.append(dados_empresa)
    
    # Transformando a lista em um array numpy
    dados_organizados = np.array(dados_organizados)

    # Atribuindo o array à variável X, que representa as variáveis independentes do modelo
    X = dados_organizados

    # Atribuindo à variável y, que represente a variável dependente, os valores da variável dependente RJ
    y = dados_tratamento[dados_tratamento['Ano'] == 5]['RJ']

    return train_test_split(X, y, test_size=divisao_cjt_teste, random_state=42)

In [6]:
# Define a função que constrói o modelo de rede neural, retornando-a ao fim
def return_model(input_shape, lstm:int, lstm2:str, dense2:str, dropout_rate:float, learning_rate:float, ativacao:str):

    # Define um modelo sequencial LSTM
    model = Sequential()

    # Adiciona uma camada lstm
    model.add(LSTM(lstm, input_shape=input_shape, activation=ativacao, return_sequences=True if lstm2 == 's' else False))

    # Adiciona dropout
    model.add(Dropout(dropout_rate))

    # Camadas adicionais condicionais
    if lstm2 == 's':
        model.add(LSTM(lstm, activation=ativacao, return_sequences=False))
    if dense2 == 's':
        model.add(Dropout(dropout_rate))
        model.add(Dense(32, activation=ativacao))

    # Adiciona a camada densa de saída
    model.add(Dense(1, activation='sigmoid'))

    # Define o otimizador
    optimizer = Adam(learning_rate=learning_rate)

    # Compila o modelo
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=[BinaryAccuracy()])

    return model

In [11]:
# Define a função que será chamada a cada estratégia
def corrida(alias:str):

    # Define o conjunto de variáveis que rege cada cenário para cada estratégia
    if alias == 'IN':
        pesos = False
        planilha = planilha_original
        undersample = False
    elif alias == 'P':
        pesos = True
        planilha = planilha_original
        undersample = False
    elif alias == 'OS+P':
        pesos = True
        planilha = planilha_oversample
        undersample = False
    elif alias == 'OS':
        pesos = False
        planilha = planilha_oversample
        undersample = False
    elif alias == 'US':
        pesos = False
        planilha = planilha_original
        undersample = True
    elif alias == 'US+P':
        pesos = True
        planilha = planilha_original
        undersample = True
    elif alias == 'US+OS+P':
        pesos = True
        planilha = planilha_oversample
        undersample = True
    elif alias == 'SMOTE':
        pesos = False
        planilha = planilha_smote
        undersample = False
    else:
        return Exception('Erro ao escolher o alias da corrida')
    
    # Atriubui valores às variáveis independentes (x) e dependentes (y) de treinamento e teste, utilizando função previamente definida
    X_train, X_test, y_train, y_test = tratamento_dados(caminho_base, planilha, undersample)

    # Separa as quantidades de empresas para teste e validação, utilizada a seguir para remodelar uma matriz
    qtd_empresas_teste = y_test.shape[0]
    qtd_empresas_treino = y_train.shape[0]

    # Separando o nome das empresas de treino em uma variável exclusiva, para posterior identificação caso necessário
    empresas_treino = X_train[:,:,0:1]

    # Remodela empresas treino para um array de uma dimensão
    empresas_treino = empresas_treino.reshape(qtd_empresas_treino, 5)

    empresas_treino = empresas_treino[:,0]

    # Separando os dados quantitativos (indicadores) e redefinindo a variável X_train com os mesmos
    X_train = X_train[:,:,1:5]

    # Definindo a variável n_features com base no número de indicadores atual do estudo
    _,_,n_features = X_train.shape

    # Realizando os mesmos procedimentos nas empresas de teste
    empresas_teste = X_test[:,:,0:1]
    empresas_teste = empresas_teste.reshape(qtd_empresas_teste, 5)
    empresas_teste = empresas_teste[:,0]
    X_test = X_test[:,:,1:5]

    # Inicializando uma lista vazia onde serão armazenados os scalers, os objetos que farão a normalização dos dados de teste e treino
    scalers = []

    # Inicializa arrays para guardar os dados normalizados
    X_train_normalized = np.empty(X_train.shape)
    X_test_normalized = np.empty(X_test.shape)


    # Iteração partindo de zero até n_features vezes
    for i in range(n_features):

        # Inicializa o objeto scaler a ser utilizado nesta iteração/indicador
        scaler = StandardScaler()

        '''
            Aplica o método fit_transform sobre o i-ésimo indicador - este método calcula média e desvpad,
            subtraindo a média dos valores e dividindo esse resultado pelo desvpad para normalizá-los.
        '''
         
        X_train_normalized[:, :, i] = scaler.fit_transform(X_train[:, :, i])

        # O método transform dessa vez é aplicado sobre a base de teste, utilizando o mesmo desvpad e média da base de treino
        X_test_normalized[:, :, i] = scaler.transform(X_test[:, :, i])
        
        # Armazena o objeto scaler para posteridade, caso se deseje desnormalizar os dados
        scalers.append(scaler)

    # Define a função objetiva que será utilizada pela Optuna na Random Search
    def objetivo(trial):

        # Define os limites com os quais o Optuna vai trabalhar para cada variável
        lstm = trial.suggest_int('lstm', 10, 100, step=10)
        lstm2 = trial.suggest_categorical('lstm2', ['s', 'n'])
        dense2 = trial.suggest_categorical('dense2', ['s', 'n'])
        ativacao = trial.suggest_categorical('ativacao', ['relu', 'tanh'])
        dropout_rate = trial.suggest_float('dropout_rate', 0.0, 0.2)
        learning_rate = trial.suggest_float('learning_rate', 0.0001, 0.01)

        # Extrai a janela temporal e número de indicadores do formato da matriz/array X_train_normalized
        _, anos, indicadores = X_train_normalized.shape

        # Define a variável que será utilizada no formato de entrada dos dados na arquitetura do modelo
        input_shape = (anos, indicadores)

        # Retorna o modelo que será utilizado por iteração de cada estratégia
        model = return_model(input_shape, lstm, lstm2, dense2, dropout_rate, learning_rate, ativacao)

        # Mais definição de limites para a Optuna
        batch_size = int(trial.suggest_categorical('batch_size', ['32', '64', '128', '192']))
        
        epochs = trial.suggest_int('epochs', 10, 310, step=20)

        # Se o argumento pesos for verdadeiro, cria o objeto class_weight_dict
        if pesos:

            classes = np.unique(y_train)
            class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_train)
            class_weight_dict = dict(zip(classes, class_weights))

        # Define o early stopping
        early_stopping = EarlyStopping(monitor='val_loss',
                                patience=5,
                                restore_best_weights=True)

        # Treina o modelo
        history = model.fit(X_train_normalized, y_train, epochs=epochs, batch_size=batch_size, 
                    validation_split=divisao_cjt_teste, verbose=0, callbacks=[early_stopping],
                    class_weight=class_weight_dict if pesos else None)  

        # Separa o histórico da curva de perda no treino e validação    
        train_losses = history.history['loss']
        val_losses = history.history['val_loss']

        '''
        Realiza as previsões no conjunto de teste, transformando o resultado para binário.
        Necessário para que tanto o Y predito quanto o real correspondam ao mesmo tipo de dado
        '''
        y_pred = model.predict(X_test_normalized)
        y_pred_binario = (y_pred > 0.5).astype(int)

        # Cálculo das métricas
        conf_matrix = confusion_matrix(y_test, y_pred_binario)
        accuracy = round(accuracy_score(y_test, y_pred_binario), 4)
        precision = round(precision_score(y_test, y_pred_binario), 4)
        recall = round(recall_score(y_test, y_pred_binario), 4)
        f1 = round(f1_score(y_test, y_pred_binario), 4)
        roc_auc = round(roc_auc_score(y_test, y_pred_binario), 4)

        # Construção do gráfico
        plt.figure(figsize=(10, 6))
        plt.plot(train_losses, label='Perda de Treino')
        plt.plot(val_losses, label='Perda de Validação')
        plt.title('Perda de Treino vs. Validação')
        plt.xlabel('Épocas')
        plt.ylabel('Perda')
        plt.ylim(bottom=0, top=1)
        plt.legend()

        x0, xmax = plt.xlim()
        y0, ymax = plt.ylim()

        data_width = xmax - x0
        data_height = ymax - y0
        
        plt.text(data_width * 0.1, data_height * 0.1, f"Acc {accuracy} | Prec {precision} | Recall {recall} | F1 {f1} | ROC AUC {roc_auc}", 
                fontsize=8, color='red')
        plt.savefig(os.path.join(caminho_graficos, f'modelo_{trial.number}_{alias}.png'))
        plt.close()

        # Salva o output em txt
        with open(os.path.join(caminho_pasta_atual, 'output.txt'), 'a') as f:
            print(f'Alias: {alias}, Trial: {trial.number}, acc: {accuracy}, prec: {precision}, recall: {recall}, f1: {f1}, roc_auc: {roc_auc}, matriz_confusao: {conf_matrix}, params: {trial.params}', file=f)
        return accuracy * recall * roc_auc
    
    # Cria um objeto "estudo" na Optuna e inicializa a otimização passando a função objetivo e o número de iterações
    study = optuna.create_study(direction='maximize')
    study.optimize(objetivo, n_trials=500)

In [None]:
# Chama a função corrida para cada estratégia proposta
for alias in ['IN', 'P', 'OS+P', 'OS', 'US', 'US+P', 'US+OS+P', 'SMOTE']:
    corrida(alias)