# Modularização | Python

Autor: Felipe Oliveira

------------

## Objetivo
Este notebook objetiva a criação de módulos (classes) que auxiliem na automação e otimização do pipeline de modelagem

# Pacotes

## Instalação

In [0]:
dbutils.library.installPyPI("shap")
dbutils.library.installPyPI("lightgbm")
dbutils.library.installPyPI("xgboost")
dbutils.library.installPyPI("hyperopt")
dbutils.library.installPyPI("mlflow")
# dbutils.library.restartPython()

In [0]:
import mlflow
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from hyperopt import hp, rand, tpe, STATUS_OK, Trials, space_eval, fmin, SparkTrials
from hyperopt.pyll.stochastic import sample

from sklearn import metrics
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
from sklearn.model_selection import cross_val_score, train_test_split, RepeatedStratifiedKFold

#AdaBoost
from sklearn.tree import DecisionTreeClassifier

#LightGBM
from lightgbm import LGBMClassifier

#XgBoost
from xgboost import XGBClassifier

#Classes

## Otimizador Bayesiano

In [0]:
classificadores = {
    "adaboost": {"obj": AdaBoostClassifier(), "gradiente": False, "base_estimator": DecisionTreeClassifier(), "espaco_busca": {
        'base_estimator__max_depth': hp.choice("base_estimator__max_depth", [2, 4]),
        'base_estimator__min_samples_leaf': hp.choice("base_estimator__min_samples_leaf", [25, 100, 300]),
        'learning_rate': hp.loguniform("learning_rate", np.log(0.01), np.log(0.3)),
        'algorithm': hp.choice("algorithm", ['SAMME', 'SAMME.R'])
    }},
    "random_forest": {"obj": RandomForestClassifier(), "n_jobs": -1, "gradiente": False, "espaco_busca": {
        'bootstrap': hp.choice("bootstrap", [True, False]),
        'max_depth': hp.choice("max_depth", [4, 6, 8]),
        'min_samples_leaf': hp.choice("min_samples_leaf", [25, 100, 300]),
        'min_samples_split': hp.choice("min_samples_split", [8, 10, 12]),
    }},
    "light_gbm": {"obj": LGBMClassifier(n_jobs=-1), "gradiente": True, "espaco_busca": {
        "learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.3)),
        "num_leaves": hp.choice("num_leaves", [63, 127]),
        "max_depth": hp.choice("max_depth", [4, 6, 8]),
        "feature_fraction": hp.quniform("feature_fraction", .7, .9, 0.1),
        "bagging_fraction": hp.quniform("bagging_fraction", .7, .9, 0.1),
        "min_child_samples": hp.choice('min_child_samples', [25, 100, 300]),
        "lambda_l1": hp.choice('lambda_l1', [0, .1, 1, 10]),
        "lambda_l2": hp.choice('lambda_l2', [0, .1, 1, 10]),
    }},
    "xgboost":  {"obj": XGBClassifier(n_jobs=-1), "gradiente": True, "espaco_busca": {
        "learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.3)),
        "max_depth": hp.choice("max_depth", [4, 6, 8]),
        "num_leaves": hp.choice("num_leaves", [63, 127]),
        "colsample_bytree": hp.quniform("colsample_bytree", .5, .9, 0.1),
        "subsample": hp.quniform("subsample", .5, .9, 0.1),
        "min_child_weight": hp.choice('min_child_weight', [10, 25, 100]),
    }}
}

metricas = {
    "auc": metrics.roc_auc_score,
}


class OtimizadorBayesiano(object):
    """Classe que automatiza a otimização bayesiana de hiperparâmetros de modelos de machine learning

    Parâmetros
    ----------
    classificador : str
        Nome do classificador a ser utilizado. Suportados: adaboost, random_forest, light_gbm, xgboost
    n_estimadores : int
        Número de estimadores a serem utilizados.
    metrica : str
        Nome da métrica a ser utilizada.
    minimizar : bool
        Se True, otimização será minimizada.
    espaco_busca : dict
        Dicionário com os hiperparâmetros a serem otimizados.
    max_iteracoes : int
        Número máximo de iterações para otimização.
    max_avaliacoes : int
        Número máximo de avaliações para otimização.
    random_state : int
        Número da semente para o random_state.

    """

    def __init__(self, classificador: str, n_estimadores: int, metrica: str = "auc", minimizar: bool = False, espaco_busca: dict = None, max_iteracoes: int = 5, max_avaliacoes: int = 100, random_state: int = 1109):
        self.__classificador = classificador
        self.__n_estimadores = n_estimadores
        self.__espaco_busca = espaco_busca if espaco_busca else classificadores[
            classificador]["espaco_busca"]
        self.__max_iteracoes = max_iteracoes
        self.__max_avaliacoes = max_avaliacoes
        self.__np_random_state = np.random.default_rng(random_state)
        self.__random_state = random_state
        self.__metrica = metrica
        self.__minimizar = minimizar
        self.__modelo = None

    @property
    def classificador(self):
        return self.__classificador

    @classificador.setter
    def classificador(self, value):
        self.__classificador = value

    @property
    def n_estimadores(self):
        return self.__n_estimadores

    @property
    def espaco_busca(self):
        return self.__espaco_busca

    @espaco_busca.setter
    def espaco_busca(self, value):
        self.__espaco_busca = value

    @property
    def max_iteracoes(self):
        return self.__max_iteracoes

    @property
    def max_avaliacoes(self):
        return self.__max_avaliacoes

    @property
    def random_state(self):
        return self.__random_state
    
    @property
    def np_random_state(self):
        return self.__np_random_state
      
    @property
    def metrica(self):
        return self.__metrica

    @property
    def minimizar(self):
        return self.__minimizar

    @property
    def modelo(self):
        return self.__modelo

    @modelo.setter
    def modelo(self, value):
        self.__modelo = value

    def otimiza_hiperparams(self, X, y, tam_teste=.3, n_divisoes=3, n_repeticoes=2):
        """Função que recebe um conjunto de dados de treino (X), os respectivos rótulos (y) e um tamanho para o conjunto de teste e otimiza os hiperparâmetros de um classificador"""
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=tam_teste, shuffle=True, random_state=self.random_state)
        
        v_cruzada = RepeatedStratifiedKFold(n_splits=n_divisoes, n_repeats=n_repeticoes, random_state=self.random_state)
        
        def objective(hiperparams):
            """Função objetivo que recebe um dicionário de hiperparâmetros e retorna o valor da métrica de avaliação (loss) para o classificador avaliado"""
            classificador = classificadores[self.classificador]

            params = self.seta_params(classificador)

            modelo = classificador["obj"]
            modelo.set_params(**params)
            modelo.set_params(**hiperparams)

            if classificador["gradiente"]:
                modelo.fit(X=X_train, y=y_train,
                           eval_set=[(X_test, y_test)],
                           eval_metric=self.metrica,
                           early_stopping_rounds=self.max_iteracoes,
                           verbose=True)

                print(classificador["obj"])
                score = self.lgbm_score(modelo) if self.classificador == "light_gbm" else (modelo.evals_result()[
                    'validation_0'][self.metrica][modelo.best_iteration] * (1 if self.minimizar else -1))
            else:
                modelo.fit(X=X_train, y=y_train)
                y_pred_proba = modelo.predict_proba(X_train)[:, 1]

                func_score = {metricas[self.metrica]}
                score = cross_val_score(modelo, X_train, y_train, cv=v_cruzada, scoring="roc_auc", n_jobs=-1).mean()  * (1 if self.minimizar else -1)

            return {'loss': score, 'status': STATUS_OK, 'model': modelo}

        tentativas = SparkTrials(parallelism=32)

        with mlflow.start_run():
            melhores_params = fmin(fn=objective, space=self.espaco_busca, trials=tentativas,
                                   algo=tpe.suggest, max_evals=self.max_avaliacoes, verbose=1,
                                   rstate=self.np_random_state, return_argmin=False)

            classificador = classificadores[self.classificador]
            modelo_final = classificador["obj"]
            params = self.seta_params(classificador)

            modelo_final.set_params(**params)
            modelo_final.set_params(**melhores_params)
            self.modelo = modelo_final

            self.fit(X_train, y_train, X_test, y_test)

            return melhores_params, tentativas

    def fit(self, X_train, y_train, X_test, y_test):
        """Função que treina um classificador com os dados de treino (X_train, y_train) e avalia o classificador com os dados de teste (X_test, y_test)"""
        classificador = classificadores[self.classificador]

        if classificador["gradiente"]:
            self.modelo.fit(X=X_train, y=y_train, eval_set=[
                            (X_test, y_test)], eval_metric=self.metrica)
        else:
            self.modelo.fit(X=X_train, y=y_train)

        return self.modelo

    def lgbm_score(self, modelo):
        """Função que retorna o valor da métrica de avaliação (loss) para o classificador LGBM"""
        return modelo.best_score_["valid_0"][self.metrica] * (1 if self.minimizar else -1)

    def seta_params(self, classificador):
        """Função que seta um dicionário de hiperparâmetros para o classificador a ser treinado"""
        params = {
            "n_estimators": self.n_estimadores,
        }

        extra_params = {
            "n_jobs": classificador.get("n_jobs"),
            "base_estimator": classificador.get("base_estimator")
        }

        for key, value in extra_params.items():
            if value:
                params.update({key: value})

        return params

## Visualizador

In [0]:
class Visualizador(object):
    """Classe auxiliar para a visualização de dados durante a modelagem

    Parâmetros:
    base: DataFrame do pandas
        Base de dados a ser visualizada
    base_cat: DataFrame do pandas
        Base de dados categorizada a ser visualizada (opcional e específica para alguns casos de uso)
     """

    def __init__(self, base: pd.DataFrame, base_cat: pd.DataFrame = None):
        self.__base = base
        self.__base_cat = base_cat
        self.__tab_KS_pivot = None
        self.__tab_decis = None
        self.__prefixos = set()
        

    @property
    def base(self):
      return self.__base

    @property
    def base_cat(self):
      return self.__base_cat

    @property
    def tab_KS_pivot(self):
      return self.__tab_KS_pivot

    @tab_KS_pivot.setter
    def tab_KS_pivot(self, value):
      self.__tab_KS_pivot = value
      
    @property
    def tab_decis(self):
      return self.__tab_decis

    @tab_KS_pivot.setter
    def tab_decis(self, value):
      self.__tab_decis = value

    @property
    def prefixos(self):
       return self.__prefixos

    def __adiciona_prefixo__(self, prefixos: str or list):
        """ Função que adiciona prefixos ao conjunto de prefixos"""
        self.__prefixos.add(prefixos) if type(
            prefixos) == str else self.__prefixos.update(prefixos)
    
    
    def __cria_quantis__(self, base: pd.DataFrame, modelo: str, n_quantis: int, nome_var: str, col_safra: str = None, safras: list = []):
        """Função que cria 'n_quantis' numa base a partir de um determinado modelo e uma variável target (nome_var) """
        base = cria_fxs(base, base, modelo, n_quantis, nome_var)
        base = base[base[col_safra].isin(safras)] if col_safra and len(safras) > 0 else base
        
        globals()[f"tabfxs{modelo}"] = pd.crosstab(base[nome_var],columns=modelo,values=base[target],aggfunc=np.mean)
        
        return globals()[f"tabfxs{modelo}"]
      
    def tabela_qs(self, modelos: list or str, safras: list = [], col_safra: str = "", target: str ="inad30"):
      """ Função que recebe uma lista com safras específicas, o nome da coluna que representa as safras no df e retorna uma tabela com os modelos e suas respectivas ordenações de inad30 por decil"""
      tabs = []
      
      self.__adiciona_prefixo__(modelos)
      base_temp = self.base.copy()
      
      if isinstance(modelos, str):
        col_qs = self.__cria_quantis__(base_temp, modelos, 10, f"qs_{modelos}")
        
        return col_qs * 100
      
      for modelo in modelos:
        nome_var_qs =  f"qs_{modelo}"
        
        col_qs = self.__cria_quantis__(base_temp, modelo, 10, nome_var_qs)
        tabs.append(col_qs)

      tab_qs = tabs[0].join(tabs[1:], rsuffix='r') * 100 
      self.__tab_decis = tab_qs
      
      return tab_qs
    
    def __popula_conjunto__(self, opcoes: list):
      conj_modelos = {}
      
      for opcao in opcoes:
        if opcao == "aberto":
          conj_modelos[opcao] = [modelo for modelo in modelos if opcao in modelo and "continuo" not in modelo]
        else :
          conj_modelos[opcao] = [modelo for modelo in modelos if opcao in modelo]
      return conj_modelos
          
      
    def ks_por_safra(self, modelos: list, safras: list, agrupar: bool = False, kind="line", title="KS do modelo por safra", ylabel="KS", figsize=(8,6), grid: bool = False, tab_KS_pivot=None):
      """ Função que recebe uma lista com safras específicas e retorna uma lista com três gráficos (restrito, aberto , aberto_continuo) com o KS de cada modelo por safra"""
      safras.sort()
      xlim_min = safras[0]
      
      if not agrupar:
        grafico = self.tab_KS_pivot[modelos].plot(kind=kind, title=f"{title}", figsize=figsize, marker='.', markersize=16)
        
        plt.ylabel("KS")
        plt.legend([*conjunto, col_target])
        plt.xlabel('Safra')
        plt.grid(grid)
        return grafico
      
      opcoes = ["restrito", "aberto", "aberto_continuo"]

      conj_modelos = self.__popula_conjunto__(opcoes)
      
      for conjunto in conj_modelos:
        if tab_KS_pivot:
          globals()[f"ax_{opcao}"] = tab_KS_pivot[conjunto].plot(kind=kind, title=f"{title} ({opcao.capitalize()})", figsize=figsize, marker='.', markersize=16)
        else:
          globals()[f"ax_{opcao}"] = self.tab_KS_pivot[conjunto].plot(kind=kind, title=f"{title} ({opcao.capitalize()})", figsize=figsize, marker='.', markersize=14)
        plt.ylabel("KS")
        plt.legend(conjunto)
        
    
    def tabela_ks_por_safra(self, target, modelos: list, safras: list):
        """ Função que recebe uma lista com nomes de modelos, coluna target e safras específicas e retorna uma tabela pivot com os modelos e seus respectivos KS's por safra"""
        lsafra=[]
        lscore=[]
        lKS=[]
        lN=[]
        ltx=[]

        for modelo in [*modelos]:
            for safra in safras:
                temp_calc=calc_perf(self.base[self.base.Safra_main==safra], modelo, target)
                lsafra=lsafra+[safra]
                lscore=lscore+[modelo]
                lKS=lKS+[np.round(temp_calc[0],4)]
                lN=lN+temp_calc[1]
                ltx=ltx+temp_calc[2]

        tab_KS = pd.DataFrame(
            {
            'modelo':lscore,
            'Safra':lsafra,    
            'KS':lKS
            }
        )
        tab_KS = tab_KS.drop_duplicates()

        tab_KS_pivot=tab_KS.pivot(index='Safra',columns='modelo',values='KS')
        self.__tab_KS_pivot = tab_KS_pivot

        return tab_KS_pivot
    
    
    def __cria_grafico__(self, opcao, modelos: list, kind, title, figsize, tab_decis: pd.DataFrame = None):
      subtitulo = "" 
      if isinstance(opcao, str):
        subtitulo = opcao.capitalize()
        
      if tab_decis is not None:
          globals()[f"ax_{opcao}"] = self.tab_decis[modelos].plot(kind=kind, title=f"{title} ({subtitulo})", figsize=figsize, marker='.', markersize=14)
      else:
          globals()[f"ax_{opcao}"] = tab_decis[modelos].plot(kind=kind, title=f"{title} ({subtitulo})", figsize=figsize, marker='.', markersize=16)
          
          
    def ordenacao_inad_decil(self, modelos: list, tab_decis: pd.DataFrame = None, agrupar: bool = False, kind="line", title="Ordenação inad por decil", figsize=(8,6), grid: bool = False):
      
      conj_modelos = {}
  
      if agrupar:
          opcoes = ["restrito", "aberto", "aberto_continuo"]
          conj_modelos = self.__popula_conjunto__(opcoes)
      else:
          self.__cria_grafico__(0, modelos, tab_decis=tab_decis, kind=kind,title=title, figsize=figsize)
        
        
      for indice, conjunto in enumerate(conj_modelos):
        if tab_decis is not None:
          globals()[f"ax_{indice}"] = self.tab_decis[conjunto].plot(kind=kind, title=f"{title} ({opcao.capitalize()})", figsize=figsize, marker='.', markersize=14)
        else:
          globals()[f"ax_{indice}"] = tab_decis[conjunto].plot(kind=kind, title=f"{title} ({opcao.capitalize()})", figsize=figsize, marker='.', markersize=16)
          
      plt.ylabel("%inad30")
      plt.legend(title="modelo")
      plt.xlabel('Decil')
      plt.grid(grid)