# Pacotes

In [0]:
# %pip install scikit-learn==1.4.1.post1
# !pip install --upgrade optbinning


In [0]:
#Pacotes de instalação

!pip install --upgrade optbinning
!pip install --upgrade tqdm
!pip install mlflow==2.11.2
!pip install shap
!pip install optuna
!pip install optuna-integration


[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Looking in indexes: https://svc_nexus_data_pipeline%40picpay.com:****@nexus-prod.limbo.work/repository/picpay-pypi-hosted/simple, https://pypi.org/simple/
Collecting optbinning
  Downloading optbinning-0.21.0-py3-none-any.whl (214 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 214.8/214.8 kB 7.0 MB/s eta 0:00:00
Collecting scikit-learn>=1.6.0
  Downloading scikit_learn-1.7.2-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (9.7 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 9.7/9.7 MB 51.2 MB/s eta 0:00:00
Collecting ortools<9.12,>=9.4
  Downloading ortools-9.11.4210-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (28.1 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 28.1/28.1 MB 64.7 MB/s eta 0:00:00
Collecting ropwr>=1.0.0
  Downloading ropwr-1.1.0-py3-none-any.whl (17 kB)
Collecting pandas
  Downloading pandas-2.3.3-cp310-cp310-manylinux_2_24_x

In [0]:
dbutils.library.restartPython()

In [0]:
from pyspark.sql import functions as f
from tqdm import tqdm
import math
from pyspark.sql.types import IntegerType, DoubleType, FloatType, StringType, StructField, StructType, BooleanType, NumericType
import numpy as np
from scipy.stats import ks_2samp, chi2_contingency
from pyspark.sql.window import Window
from pyspark.sql.functions import DataFrame
import pandas as pd
import lightgbm as lgb
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from sklearn.linear_model import LogisticRegression
from pyspark.ml.feature import Bucketizer
from datetime import datetime
# from pyspark.sql.functions import col, sum as spark_sum
from optbinning import BinningProcess
import re
import mlflow
import mlflow.sklearn
from mlflow.models import ModelSignature, infer_signature
import shap
import optuna
import plotly.express as px
import plotly.graph_objects as go
import json

pd.set_option('display.max_rows', 600)
pd.set_option('display.max_columns', 600)




#Funções

##Performance

In [0]:
# Calculando o Gini com inversão do score - função auxiliar
def calculate_auc(group, score, target):
        # Invertendo o score
        # Calculando o AUC
        auc = roc_auc_score(group[target], group[score])
        # Corrigindo o AUC se for menor que 0.5
        if auc < 0.5:
            auc = 1 - auc
        return auc

In [0]:
def get_ks2_metric(data_spark, score, target, segment):

    """
    Calcula a métrica KS2 (Kolmogorov-Smirnov) para um conjunto de dados Spark segmentado.

    Args:
        data_spark (DataFrame): DataFrame Spark contendo os dados de entrada.
        score (str): Nome da coluna que contém as pontuações a serem avaliadas.
        target (str): Nome da coluna que contém o target (deve ser binário, com valores 0 e 1).
        segment (list): Lista de colunas que serão usadas para segmentar os dados.

    Returns:
        DataFrame: DataFrame Spark contendo a métrica KS2 calculada para cada segmento.

    Raises:
        ValueError: Se qualquer coluna especificada em `score`, `target` ou `segment` não estiver presente no DataFrame de entrada (`data_spark`).
        ValueError: Se a coluna target (`target`) contiver valores diferentes de 0 e 1.

    Example:
        >>> df_ks2 = get_ks2_metric(data_spark=df_spark, score='score_column', target='target_column', segment=['segment_column'])
    """
    
    #validando se a lista esta presente na tabela de input pu no objeto do modelo
    if score not in data_spark.columns:
        raise ValueError(f"A feature {score} definido no parâmetro score não esta presente na tabela de input, parâmetro data_spark.")

    if target not in data_spark.columns:
        raise ValueError(f"A feature {target} definido no parâmetro target não esta presente na tabela de input, parâmetro data_spark.")

    target_value  = data_spark.select(target).distinct().rdd.flatMap(lambda row: row).collect()
    result_bool = all(value in [1, 0] for value in target_value)

    if result_bool == False:
        raise ValueError(f"A variavel {target} especificada no parâmetro target possui os seguintes valores {target_value}, logo o processo binning foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")

    #validando se a lista esta presente na tabela de input pu no objeto do modelo
    list_not_in = [column for column in segment if column not in data_spark.columns]

    if list_not_in != []:
        raise ValueError(f"A lista de features {list_not_in} do parâmetro segment não esta presente na tabela de input, parâmetro data_spark.")

    data_pandas = data_spark.toPandas()
    result = (data_pandas[data_pandas[score] >= 0]
              .groupby(segment)
              .apply(lambda group: ks_2samp(group[score][group[target] == 1], group[score][group[target] == 0])[0])
              .round(4)
              .reset_index(name=f'ks2_{score.lower()}')
              .sort_values(segment))
    return(spark.createDataFrame(result))

In [0]:

def get_auc_metric(data_spark, score, target, segment):
    
    """
    Calcula a métrica AUC (Area Under the Curve) para um conjunto de dados Spark segmentado.

    Args:
        data_spark (DataFrame): DataFrame Spark contendo os dados de entrada.
        score (str): Nome da coluna que contém as pontuações a serem avaliadas.
        target (str): Nome da coluna que contém o target (deve ser binário, com valores 0 e 1).
        segment (list): Lista de colunas que serão usadas para segmentar os dados.

    Returns:
        DataFrame: DataFrame Spark contendo a métrica AUC calculada para cada segmento.

    Raises:
        ValueError: Se qualquer coluna especificada em `score`, `target` ou `segment` não estiver presente no DataFrame de entrada (`data_spark`).
        ValueError: Se a coluna target (`target`) contiver valores diferentes de 0 e 1.

    Example:
        >>> df_auc = get_auc_metric(data_spark=df_spark, score='score_column', target='target_column', segment=['segment_column'])
    """

    #validando se a lista esta presente na tabela de input pu no objeto do modelo
    if score not in data_spark.columns:
        raise ValueError(f"A feature {score} não esta presente na tabela de input, parâmetro data_spark.")

    if target not in data_spark.columns:
        raise ValueError(f"A feature {target} não esta presente na tabela de input, parâmetro data_spark.")

    #validando se o target é binario

    target_value  = data_spark.select(target).distinct().rdd.flatMap(lambda row: row).collect()
    result_bool = all(value in [1, 0] for value in target_value)

    if result_bool == False:
        raise ValueError(f"A variavel {target} especificada no parâmetro target possui os seguintes valores {target_value}, logo o processo binning foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")

    data_pandas = data_spark.toPandas()
    data_pandas["score_invert"] = 1000-data_pandas[score]

    result = (
        data_pandas[data_pandas[score] >= 0]
        .groupby(segment)
        .apply(lambda group: calculate_auc(group = group, target = target, score = 'score_invert'))
        .round(4)
        .reset_index(name=f'auc_{score.lower()}')
        .sort_values(segment)
    )
    return spark.createDataFrame(result)

In [0]:
def get_gini_metric(data_spark, score, target, segment):

    """
    Calcula a métrica Gini para um conjunto de dados Spark segmentado.

    Args:
        data_spark (DataFrame): DataFrame Spark contendo os dados de entrada.
        score (str): Nome da coluna que contém as pontuações a serem avaliadas.
        target (str): Nome da coluna que contém o target (deve ser binário, com valores 0 e 1).
        segment (list): Lista de colunas que serão usadas para segmentar os dados.

    Returns:
        DataFrame: DataFrame Spark contendo a métrica Gini calculada para cada segmento.

    Raises:
        ValueError: Se qualquer coluna especificada em `score`, `target` ou `segment` não estiver presente no DataFrame de entrada (`data_spark`).
        ValueError: Se a coluna target (`target`) contiver valores diferentes de 0 e 1.

    Example:
        >>> df_gini = get_gini_metric(data_spark=df_spark, score='score_column', target='target_column', segment=['segment_column'])
    """

    #validando se a lista esta presente na tabela de input pu no objeto do modelo
    if score not in data_spark.columns:
        raise ValueError(f"A feature {score} definido no parâmetro score não esta presente na tabela de input, parâmetro data_spark.")

    if target not in data_spark.columns:
        raise ValueError(f"A feature {target} definido no parâmetro target não esta presente na tabela de input, parâmetro data_spark.")

    target_value  = data_spark.select(target).distinct().rdd.flatMap(lambda row: row).collect()
    result_bool = all(value in [1, 0] for value in target_value)

    if result_bool == False:
        raise ValueError(f"A variavel {target} especificada no parâmetro target possui os seguintes valores {target_value}, logo o processo binning foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")

    #validando se a lista esta presente na tabela de input pu no objeto do modelo
    list_not_in = [column for column in segment if column not in data_spark.columns]

    if list_not_in != []:
        raise ValueError(f"A lista de features {list_not_in} do parâmetro segment não esta presente na tabela de input, parâmetro data_spark.")

    data_pandas = data_spark.toPandas()
    data_pandas["score_invert"] = 1000-data_pandas[score]

    result = (
        data_pandas[data_pandas[score] >= 0]
        .groupby(segment)
        .apply(lambda group: 2 * calculate_auc(group = group, score = "score_invert", target = target) - 1)
        .round(4)
        .reset_index(name=f'gini_{score.lower()}')
        .sort_values(segment)
    )
    return spark.createDataFrame(result)

In [0]:
def get_metric_perfomance_multiple(data_spark, list_score, target, list_segment, metric = "ks2"):

    """
    Calcula as principais métricas de performance (KS2, Gini, AUC) para um conjunto de dados Spark segmentado.

    Args:
        data_spark (DataFrame): DataFrame Spark contendo os dados de entrada.
        list_score (list): Lista de colunas que contêm as pontuações a serem avaliadas.
        target (str): Nome da coluna que contém o target (deve ser binário, com valores 0 e 1).
        list_segment (list): Lista de colunas que serão usadas para segmentar os dados.
        metric (str, opcional): Métrica a ser calculada. As opções possíveis são 'ks2', 'gini', 'auc'. O valor default é
        'ks2'.

    Returns:
        DataFrame: DataFrame Spark contendo as métricas calculadas para cada segmento.

    Raises:
        ValueError: Se a métrica especificada em `metric` não for uma das opções válidas.
        ValueError: Se qualquer coluna especificada em `score`, `target` ou `segment` não estiver presente no DataFrame de entrada (`data_spark`).
        ValueError: Se a coluna target (`target`) contiver valores diferentes de 0 e 1.

    Example:
        >>> df_metrics = get_metric_perfomance_multiple(data_spark=df_spark, list_score=['score1', 'score2'], target='target_column', list_segment=['segment_column'], metric='ks2')
    """

    list_possible_metrics = ['ks2', 'gini', 'auc']
    metric_result_valid = [column for column in list_possible_metrics if column in metric.lower()]

    if metric_result_valid == []:
        raise ValueError(f"A Métrica {metric} especificada no parâmetro metric não esta presente nas possibilidades: {list_possible_metrics}. Escolha uma dessas informações para retorno da performance.")

    metric = metric.lower()

    if metric == "ks2":
        score_ini = list_score[0]
        print(f'Calculando KS2 da Feature {score_ini}')
        result_ini = get_ks2_metric(data_spark = data_spark, score = score_ini, target = target, segment = list_segment)
        if len(list_score) > 1:
            for i in list_score[1:]:
                print(f'Calculando KS2 da Feature {i}')
                result = get_ks2_metric(data_spark = data_spark, score = i, target = target, segment = list_segment)
                
                result_ini = result_ini.join(result, on = list_segment, how = 'outer')

    if metric == "auc":
        score_ini = list_score[0]
        print(f'Calculando AUC da Feature {score_ini}')
        result_ini = get_auc_metric(data_spark = data_spark, score = score_ini, target = target, segment = list_segment)
        
        if len(list_score) > 1:
            for i in list_score[1:]:
                print(f'Calculando AUC da Feature {i}')
                result = get_auc_metric(data_spark = data_spark, score = i, target = target, segment = list_segment)
                
                result_ini = result_ini.join(result, on = list_segment, how = 'outer')

    if metric == "gini":
        score_ini = list_score[0]
        print(f'Calculando GINI da Feature {score_ini}')
        result_ini = get_gini_metric(data_spark = data_spark, score = score_ini, target = target, segment = list_segment)
        if len(list_score) > 1:
            for i in list_score[1:]:
                print(f'Calculando GINI da Feature {i}')
                result = get_gini_metric(data_spark = data_spark, score = i, target = target, segment = list_segment)
                result_ini = result_ini.join(result, on = list_segment, how = 'outer')
    return(result_ini)

In [0]:
# def get_psi_metric(data_spark_ref, data_spark_test, feature, reference_test):
    
#     """
#     Calcula a métrica PSI (Population Stability Index) para um conjunto de dados Spark de referência e teste.

#     Args:
#         data_spark_ref (DataFrame): DataFrame Spark contendo os dados de referência.
#         data_spark_test (DataFrame): DataFrame Spark contendo os dados de teste.
#         feature (str): Nome da coluna que contém a feature a ser avaliada.
#         reference_test (str): Nome da coluna que contém a referência para os dados de teste.

#     Returns:
#         DataFrame: DataFrame Spark contendo a métrica PSI calculada para cada grupo de referência.

#     Raises:
#         ValueError: Se qualquer coluna especificada em `feature` ou `reference_test` não estiver presente no DataFrame de entrada (`data_spark_ref` ou `data_spark_test`).

#     Example:
#         >>> df_psi = get_psi_metric(data_spark_ref=df_ref, data_spark_test=df_test, feature='feature_column', reference_test='reference_column')
#     """


#     #validando se a lista esta presente na tabela de input pu no objeto do modelo
#     if feature not in data_spark_ref.columns:
#         raise ValueError(f"A feature {feature} definido no parâmetro feature não esta presente na tabela de input, parâmetro data_spark_ref.")

#     #validando se a lista esta presente na tabela de input pu no objeto do modelo
#     if feature not in data_spark_test.columns:
#         raise ValueError(f"A feature {feature} para o cálculo não esta presente na tabela de input, parâmetro data_spark_test.")

#     #validando se a lista esta presente na tabela de input pu no objeto do modelo
#     if reference_test not in data_spark_test.columns:
#         raise ValueError(f"A feature {reference_test} para o cálculo não esta presente na tabela de input, parâmetro data_spark_test.")

#     #Calculando Volumetria da referencia
#     df_table_ref = data_spark_ref
#     size_df_table_ref = df_table_ref.groupBy(f.col(feature)).agg(f.count("*").alias("Volume_Ref"))
#     total_volume = size_df_table_ref.select(f.sum(f.col("Volume_Ref"))).collect()[0][0]
#     size_df_table_ref = size_df_table_ref.withColumn("prop_ref", f.col("Volume_Ref")/total_volume)

#     #Calculando a Volumetria do teste
#     df_table_test = data_spark_test
#     size_df_table_test = df_table_test.groupBy(feature, reference_test).agg(f.count("*").alias("Volume_Test"))
#     total_volume_teste = size_df_table_test.groupBy(reference_test).agg(f.sum(f.col("Volume_Test")).alias("Volume_Test_Grupo"))
#     size_df_table_test = size_df_table_test.join(total_volume_teste, how = "left", on = [reference_test])
#     size_df_table_test = size_df_table_test.withColumn("prop_test", f.col("Volume_Test")/f.col("Volume_Test_Grupo"))

#     #Calculando PSI
#     Tabela = size_df_table_test.join(size_df_table_ref, how = "left", on = feature)
#     Tabela = Tabela.withColumn("PSI", (f.col("prop_ref") - f.col("prop_test"))*f.log(f.col("prop_ref")/f.col("prop_test")))
#     PSI_Geral = Tabela.groupBy(reference_test).agg(f.round(f.sum(f.col("PSI")), 4).alias(f"PSI_{feature}"))

#     return(PSI_Geral)
    

In [0]:
# def get_psi_metric_multiple_features(data_spark_ref, data_spark_test, list_features, reference_test):
    
#     """
#     Calcula a métrica PSI (Population Stability Index) para múltiplas features em conjuntos de dados Spark de referência e teste.

#     Args:
#         data_spark_ref (DataFrame): DataFrame Spark contendo os dados de referência.
#         data_spark_test (DataFrame): DataFrame Spark contendo os dados de teste.
#         list_features (list): Lista de colunas que contêm as features a serem avaliadas.
#         reference_test (str): Nome da coluna que contém a referência para os dados de teste.

#     Returns:
#         DataFrame: DataFrame Spark contendo a métrica PSI calculada para cada feature em cada grupo de referência.

#     Raises:
#         ValueError: Se qualquer coluna especificada em `feature` ou `reference_test` não estiver presente no DataFrame
#         de entrada (`data_spark_ref` ou `data_spark_test`).

#     Example:
#         >>> df_psi = get_psi_metric_multiple_features(data_spark_ref=df_ref, data_spark_test=df_test, list_features=['feature1', 'feature2'], reference_test='reference_column')
#     """

#     feature_ini = list_features[0]
#     result_ini = get_psi_metric(data_spark_ref = data_spark_ref, data_spark_test = data_spark_test, feature = feature_ini, reference_test = reference_test)
    
#     if len(list_features) > 1:
#         for i in list_features[1:]:
#             result = get_psi_metric(data_spark_ref = data_spark_ref, data_spark_test = data_spark_test, feature = i, reference_test = reference_test)
#             result_ini = result_ini.join(result, on = reference_test, how = 'outer')
#     return(result_ini.orderBy(reference_test))

In [0]:
def get_psi_metric_multiple_features(data_spark_ref, data_spark_test, list_features, reference_test):
    """
    Calcula a métrica PSI (Population Stability Index) para múltiplas features em conjuntos de dados Spark de referência e teste.

    Args:
        data_spark_ref (DataFrame): Dados de referência.
        data_spark_test (DataFrame): Dados de teste.
        list_features (list): Lista de features para calcular PSI.
        reference_test (str): Nome da coluna com o grupo de teste.
        
    Returns:
        DataFrame: Tabela com features e valores de PSI para cada grupo.
    """
    
    # Verificação de colunas ausentes
    missing_cols = [feat for feat in list_features if feat not in data_spark_ref.columns or feat not in data_spark_test.columns]
    if missing_cols:
        raise ValueError(f"As seguintes colunas estão ausentes em um dos datasets: {missing_cols}")
    if reference_test not in data_spark_test.columns:
        raise ValueError(f"A coluna '{reference_test}' está ausente em data_spark_test.")

    # Criando expressão stack
    stack_args = ", ".join([f"'{col}', `{col}`" for col in list_features])
    stack_expr = f"stack({len(list_features)}, {stack_args}) as (feature, value)"

    # ---- Referência ----
    stacked_ref = data_spark_ref.selectExpr(stack_expr)
    stacked_ref = stacked_ref.groupBy("feature", "value").agg(f.count("*").alias("Volume_Ref"))
    total_ref = stacked_ref.groupBy("feature").agg(f.sum("Volume_Ref").alias("Total_Ref"))
    stacked_ref = stacked_ref.join(total_ref, on="feature", how="left")
    stacked_ref = stacked_ref.withColumn("prop_ref", f.col("Volume_Ref") / f.col("Total_Ref"))

    # ---- Teste ----
    stacked_test = data_spark_test.selectExpr(reference_test, *[f"`{col}`" for col in list_features])
    stacked_test = stacked_test.selectExpr(reference_test, stack_expr)
    stacked_test = stacked_test.groupBy("feature", "value", reference_test).agg(f.count("*").alias("Volume_Test"))
    total_test = stacked_test.groupBy("feature", reference_test).agg(f.sum("Volume_Test").alias("Total_Test"))
    stacked_test = stacked_test.join(total_test, on=["feature", reference_test], how="left")
    stacked_test = stacked_test.withColumn("prop_test", f.col("Volume_Test") / f.col("Total_Test"))

    # ---- PSI ----
    psi_table = stacked_test.join(stacked_ref, on=["feature", "value"], how="left")
    psi_table = psi_table.withColumn(
        "PSI", 
        (f.col("prop_ref") - f.col("prop_test")) * f.log(f.col("prop_ref") / f.col("prop_test"))
    )

    # ---- Resultado final: PSI por feature e por grupo ----
    psi_final = (
        psi_table.groupBy("feature")
        .pivot(reference_test)
        .agg(f.round(f.sum("PSI"), 4))
        .orderBy("feature")
    )

    return psi_final


In [0]:
def get_iv_metric_multiple_features(data_spark, list_features, target, segment):
    """
    Calcula o Information Value (IV) para múltiplas features, pivotando os segmentos para colunas.

    Args:
        data_spark (DataFrame): Dados contendo as features categorizadas, target e segmento.
        list_features (list): Lista de features categóricas ou binned para calcular IV.
        target (str): Nome da coluna target (esperado binário: 0 e 1).
        segment (str): Nome da coluna de segmentação para gerar colunas no output.

    Returns:
        DataFrame: Tabela com IV calculado para cada feature e cada segmento nas colunas.

    Raises:
        ValueError: Se qualquer coluna especificada não estiver presente no DataFrame.
    """

    # ---- Validação ----
    missing_cols = [feat for feat in list_features if feat not in data_spark.columns]
    if missing_cols:
        raise ValueError(f"As seguintes colunas estão ausentes no dataframe: {missing_cols}")
    if target not in data_spark.columns:
        raise ValueError(f"A coluna target '{target}' não está presente no dataframe.")
    if segment not in data_spark.columns:
        raise ValueError(f"A coluna segment '{segment}' não está presente no dataframe.")

    # ---- Criando expressão stack ----
    stack_args = ", ".join([f"'{col}', `{col}`" for col in list_features])
    stack_expr = f"stack({len(list_features)}, {stack_args}) as (feature, value)"

    # ---- Preparação dos dados ----
    df_stacked = data_spark.selectExpr(segment, stack_expr, target)

    # ---- Contagem de Good e Bad por grupo ----
    grouped = (
        df_stacked.groupBy(segment, "feature", "value")
        .agg(
            f.sum(f.when(f.col(target) == 0, 1).otherwise(0)).alias("good"),
            f.sum(f.when(f.col(target) == 1, 1).otherwise(0)).alias("bad")
        )
    )

    # ---- Totais de Good e Bad por segmento e feature ----
    totals = grouped.groupBy(segment, "feature").agg(
        f.sum("good").alias("total_good"),
        f.sum("bad").alias("total_bad")
    )

    # ---- Calculando proporções ----
    final = grouped.join(totals, on=[segment, "feature"], how="left")
    final = final.withColumn(
        "dist_good", f.when(f.col("total_good") == 0, 0).otherwise(f.col("good") / f.col("total_good"))
    ).withColumn(
        "dist_bad", f.when(f.col("total_bad") == 0, 0).otherwise(f.col("bad") / f.col("total_bad"))
    )

    # ---- Calculando WOE ----
    final = final.withColumn(
        "WOE",
        f.when((f.col("dist_good") == 0) | (f.col("dist_bad") == 0), 0)
         .otherwise(f.log(f.col("dist_good") / f.col("dist_bad")))
    )

    # ---- Calculando IV ----
    final = final.withColumn(
        "IV_component",
        (f.col("dist_good") - f.col("dist_bad")) * f.col("WOE")
    )

    # ---- Agregando IV final por feature e segmento ----
    iv_agg = (
        final.groupBy(segment, "feature")
        .agg(f.round(f.sum("IV_component"), 6).alias("IV"))
    )

    # ---- Fazendo pivot para que os segmentos fiquem nas colunas ----
    iv_pivot = (
        iv_agg.groupBy("feature")
        .pivot(segment)
        .agg(f.first("IV"))
        .orderBy("feature")
    )

    return iv_pivot


In [0]:
def gains_table(data_spark, score, bin_score, target):

    """
    Gera uma tabela de ganhos (gains table) para um conjunto de dados Spark, utilizando uma pontuação e um binning score.

    Args:
        data_spark (DataFrame): DataFrame Spark contendo os dados de entrada.
        score (str): Nome da coluna que contém as pontuações a serem avaliadas.
        bin_score (str): Nome da coluna que contém os binning scores.
        target (str): Nome da coluna que contém o target (deve ser binário, com valores 0 e 1).

    Returns:
        DataFrame: DataFrame Spark contendo a tabela de ganhos.

    Raises:
        ValueError: Se qualquer coluna especificada em `score`, `bin_score` ou `target` não estiver presente no DataFrame de entrada (`data_spark`).
        ValueError: Se a coluna target (`target`) contiver valores diferentes de 0 e 1.

    Example:
        >>> df_gains = gains_table(data_spark=df_spark, score='score_column', bin_score='bin_score_column', target='target_column')
    """

    #validando se a lista esta presente na tabela de input pu no objeto do modelo
    if score not in data_spark.columns:
            raise ValueError(f"A feature {score} definido no parâmetro score não esta presente na tabela de input, parâmetro data_spark.")

    #validando se a lista esta presente na tabela de input pu no objeto do modelo
    if bin_score not in data_spark.columns:
        raise ValueError(f"A feature {bin_score} para o cálculo não esta presente na tabela de input, parâmetro data_spark.")

    if target not in data_spark.columns:
        raise ValueError(f"A variavel {target} especificada no parâmetro target não esta presente no tabela de input, especificado no parâmetro data_spark.")
      
       #validando se o target é binario
    target_value  = data_spark.select(target).distinct().rdd.flatMap(lambda row: row).collect()
    result_bool = all(value in [1, 0] for value in target_value)

    if result_bool == False:
            raise ValueError(f"A variavel {target} especificada no parâmetro target possui os seguintes valores {target_value}, logo o processo binning foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")


    #funcao para gerar o o gains_table
    from pyspark.sql.functions import col, sum as spark_sum, round as spark_round

    df_table = data_spark

    #criando soma de bons
    df_table = df_table.withColumn("target2", 1 - f.col(target))
    #Tabela descritivas
    table_gain = (
        df_table.groupBy(bin_score)
        .agg(f.min(score).alias("minimo"),
             f.max(score).alias("maximo"),
             f.count("*").alias("volume"),
             f.sum('target2').alias("bons"),
             f.sum(target).alias("maus"),
             f.round(f.mean(target), 4).alias("badrate")
             )
        .orderBy(f.col(bin_score))
        )
    #criando informações acumuladas
    # Definindo a janela de agregação
    windowSpec = Window.orderBy(f.col(bin_score)).rowsBetween(Window.unboundedPreceding, 0)

    # Realizando a soma cumulativa
    table_gain = table_gain.withColumn("acm", spark_sum(f.col("volume")).over(windowSpec))
    table_gain = table_gain.withColumn("acm_bons", spark_sum(f.col("bons")).over(windowSpec))
    table_gain = table_gain.withColumn("acm_maus", spark_sum(f.col("maus")).over(windowSpec))


    table_gain = table_gain.withColumn("badrate_acm",  f.round(f.col("acm_maus")/f.col("acm"), 4))

    total_volume = table_gain.select(spark_sum(f.col("volume"))).collect()[0][0]
    total_bons = table_gain.select(spark_sum(f.col("bons"))).collect()[0][0]
    total_maus = table_gain.select(spark_sum(f.col("maus"))).collect()[0][0]

    table_gain = table_gain.withColumn("perc_acm_volume", f.round(f.col("acm")/total_volume, 4))
    table_gain = table_gain.withColumn("bons_acm_volume", f.round(f.col("acm_bons")/total_bons, 4))
    table_gain = table_gain.withColumn("maus_acm_volume", f.round(f.col("acm_maus")/total_maus, 4))

    table_gain = table_gain.orderBy(f.col(bin_score))

    return(table_gain)

##Feature Selection

In [0]:
# def CrammerVCorrelation(data_spark, target, nsample = 100000):

#     """
#     Calcula a correlação de Cramer's V para todas as variáveis em um DataFrame Spark, excluindo a variável target e outra para com todas as variáveis com a variável target.

#     Args:
#         data_spark (DataFrame): DataFrame Spark contendo os dados de entrada.
#         target (str): Nome da coluna que contém o target.
#         nsample (int, opcional): Tamanho máximo da amostra para o cálculo da correlação. O valor default é 100000.

#     Returns:
#         tuple: Dois DataFrames Pandas:
#             - DataFrame de correlações de Cramer's V entre todas as variáveis, excluindo a variável target.
#             - DataFrame de correlações de Cramer's V filtrado para a variável target e ordenado por valor.

#     Raises:
#         ValueError: Se o tamanho da amostra for menor ou igual a zero.
        
#     Example:
#         >>> df_corr_matrix, df_target_corr = CrammerVCorrelation(data_spark=df_spark, target='target_column')
#     """

#     #Transformando em pandas
#     data_pandas = data_spark.toPandas()
#     # Avaliando tamanho da amostra
#     sizedata = len(data_pandas)
#     # Validando o tamanho da amostra
#     if sizedata > nsample:
#         print(f"O dataframe definido no parâmetro data_spark tem um tamanho de {sizedata}. Foi selecionada uma amostra com tamanho {nsample} para calculo do V-Crammer")

#         data_pandas = data_pandas.sample(frac=nsample/sizedata, random_state=42)
    

#     def cramers_V(var1, var2) :
#         crosstab =np.array(pd.crosstab(var1, var2, rownames=None, colnames=None)) # Cross table building
#         stat = chi2_contingency(crosstab)[0] # Keeping of the test statistic of the Chi2 test
#         obs = np.sum(crosstab) # Number of observations
#         mini = min(crosstab.shape)-1 # Take the minimum value between the columns and the rows of the cross table
#         return np.sqrt(stat/(obs*mini))

#     rows= []
#     for var1 in data_pandas:
#         col = []
#         for var2 in data_pandas :
#             cramers =cramers_V(data_pandas[var1], data_pandas[var2]) # Cramer's V test
#             col.append(round(cramers,4)) # Keeping of the rounded value of the Cramer's V  
#         rows.append(col)
  
#     cramers_results = np.array(rows)
#     pd_cramers_results = pd.DataFrame(cramers_results, columns = data_pandas.columns, index =data_pandas.columns)

#     return pd_cramers_results.drop(columns = target, index= target), pd_cramers_results.filter(regex = "^" + target).drop(index=target).sort_values(target)

In [0]:
# #Retorna uma lista de variaveis correlacionadas
# def createCorrelatedFeaturesList(corrMatrix, threshold = 0.7):

#     """
#     Retorna uma lista de variáveis correlacionadas com base em uma matriz de correlação fornecida e um limite de correlação.

#     Args:
#         corrMatrix (DataFrame): Matriz de correlação das variáveis.
#         threshold (float, opcional): Limite de correlação para considerar variáveis como correlacionadas. O valor default é 0.7.

#     Returns:
#         list: Lista de variáveis correlacionadas.
#     """

#     #Obtaining the correlation matrix of the dataframe (without the target)                        
#     colCorr = []
#     #Iterating through the columns of the correlation matrix dataframe
#     for column in corrMatrix.columns:
#         #Iterating through the values (row wise) of the correlation matrix dataframe
#         for idx, row in corrMatrix.iterrows():                                            
#             if(row[column] >= threshold) and (idx != column):
#                 #Adding the features that are not already in the list of correlated features
#                 if (idx not in colCorr):
#                     colCorr.append(idx)
#                 if (column not in colCorr):
#                     colCorr.append(column)
#     print(colCorr, '\n')
#     return colCorr

# #função que deleta a feature mais recente 
# def deleteFeatures(data_pd, corrMatrix, corrWithTarget, colCorr, target):                                 
#     for idx, row in corrWithTarget.iterrows():
#         # print(idx, '\n')
#         if (idx in colCorr):
#             print(f"Coluna {idx} Deletada")
#             data_pd = data_pd.drop(idx, axis =1)
#             corrWithTarget = corrWithTarget.drop(index = idx).sort_values(target)
#             corrMatrix = corrMatrix.drop(columns = idx, index= idx)
#             break
#     return data_pd, corrMatrix, corrWithTarget, idx


In [0]:
# #Method to run automatically eliminate multicollinearity
# def autoEliminateMulticollinearity(data_spark, list_keys, list_features, target, threshold = 0.9, nsample = 100000):
    
#     """
#     Deleta a feature menos relavante com a variável resposta da lista de variáveis correlacionadas entre si no DataFrame.

#     Args:
#         data_pd (DataFrame): DataFrame contendo os dados.
#         corrMatrix (DataFrame): Matriz de correlação das variáveis.
#         corrWithTarget (DataFrame): Correlação das variáveis com o target.
#         colCorr (list): Lista de variáveis correlacionadas.
#         target (str): Nome da coluna target.

#     Returns:
#         tuple: DataFrame atualizado, matriz de correlação atualizada, correlação com o target atualizada e a variável deletada.

#     Example:
#         >>> data_spark_updated, features_removed = autoEliminateMulticollinearity(
#         >>>     data_spark=df_spark,
#         >>>     list_keys=['key1', 'key2'],
#         >>>     list_features=['feature1', 'feature2', 'feature3'],
#         >>>     target='target_column',
#         >>>     threshold=0.9,
#         >>>     nsample=100000
#         >>> )
        
#     """
#     #######Validacoes#######
#     #validando se a lista esta presente na tabela de input
#     if list_features != None:
#         list_not_in = [column for column in list_features if column not in data_spark.columns]
#         if list_not_in != []:
#             raise ValueError(f"A lista de features {list_not_in} do parâmetro list_features para categorização não esta presente na tabela de input, especificado no parâmetro data_spark.")

#     if target not in data_spark.columns:
#         raise ValueError(f"A variavel {target} especificada no parâmetro target não esta presente no tabela de input, especificado no parâmetro data_spark.")
      
#     #validando se o target é binario
#     target_value  = data_spark.select(target).distinct().rdd.flatMap(lambda row: row).collect()
#     result_bool = all(value in [1, 0] for value in target_value)

#     if result_bool == False:
#         raise ValueError(f"A variavel {target} especificada no parâmetro target possui os seguintes valores {target_value}, logo o processo binning foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")

#     if threshold < 0 or threshold > 1:
#         raise ValueError("O valor do threshold tem que ser entre [0, 1].")

#     print("Fazendo Correlação de Crammer")
#     table_crammer, table_crammer_resp  = CrammerVCorrelation(data_spark = data_spark.selectExpr(*list_features, target), target = target, nsample = nsample)

#     #listando features altamente correlacionadas
#     ListColCorr = createCorrelatedFeaturesList(corrMatrix = table_crammer, threshold = threshold)


#     table_crammer_input, table_crammer_resp_input  = table_crammer, table_crammer_resp


#     print(f"Features com correlação maior que {threshold}:{ListColCorr}")

#     data_pandas = data_spark.selectExpr(*list_features, target).toPandas()
#     colCorrcand = ListColCorr

#     feature_remove = []

#     while colCorrcand != []:
#         #seleciona um dataframe que remove da lista de features correlacionadas a com menor correlação com a variavel resposta
#         data_pandas, table_crammer_input, table_crammer_resp_input, f_remove  = deleteFeatures(data_pd = data_pandas, corrMatrix = table_crammer_input, corrWithTarget = table_crammer_resp_input, colCorr = colCorrcand, target = target)

#         feature_remove.append(f_remove)

#         #Obtaining the list of correlated features
#         colCorrcand = createCorrelatedFeaturesList(table_crammer_input, threshold= threshold)
    
#     feature_columns = data_pandas.drop(columns=[target]).columns 
#     return data_spark.selectExpr(*list_keys, *feature_columns), feature_columns, feature_remove, table_crammer, table_crammer_resp_input

In [0]:
import pandas as pd
import numpy as np
from scipy.stats import chi2_contingency

def cramers_V(var1, var2):
    confusion_matrix = pd.crosstab(var1, var2)
    chi2 = chi2_contingency(confusion_matrix)[0]
    n = confusion_matrix.sum().sum()
    phi2 = chi2 / n
    r, k = confusion_matrix.shape
    phi2corr = max(0, phi2 - ((k - 1)*(r - 1)) / (n - 1))    
    rcorr = r - ((r - 1)**2) / (n - 1)
    kcorr = k - ((k - 1)**2) / (n - 1)
    return np.sqrt(phi2corr / min((kcorr - 1), (rcorr - 1)))

def calc_iv(df, feature, target, bins=10):
    """
    Calcula o Information Value para uma feature.
    A feature pode estar já em WOE ou ser categórica.
    """
    tmp = df[[feature, target]].copy()
    tmp[feature] = tmp[feature].fillna("MISSING")
    if tmp[feature].dtype.kind in "bifc":  # Numérica
        tmp['bin'] = pd.qcut(tmp[feature], q=bins, duplicates='drop')
    else:
        tmp['bin'] = tmp[feature]

    grouped = tmp.groupby('bin')[target].agg(['count', 'sum'])
    grouped['non_event'] = grouped['count'] - grouped['sum']
    grouped['event_rate'] = grouped['sum'] / grouped['sum'].sum()
    grouped['non_event_rate'] = grouped['non_event'] / grouped['non_event'].sum()
    grouped['woe'] = np.log((grouped['event_rate'] + 1e-9) / (grouped['non_event_rate'] + 1e-9))
    grouped['iv'] = (grouped['event_rate'] - grouped['non_event_rate']) * grouped['woe']
    return grouped['iv'].sum()


def buildHybridCorrelationMatrix(df_pd, list_features_num, list_features_cat, method_num='spearman'):
    all_features = list_features_num + list_features_cat
    matrix = pd.DataFrame(index=all_features, columns=all_features)

    # Correlação entre numéricas
    for i in list_features_num:
        for j in list_features_num:
            matrix.loc[i, j] = abs(df_pd[i].corr(df_pd[j], method=method_num))

    # Correlação entre categóricas
    for i in list_features_cat:
        for j in list_features_cat:
            matrix.loc[i, j] = cramers_V(df_pd[i], df_pd[j])

    # Correlação entre tipos diferentes é indefinida
    for i in list_features_cat:
        for j in list_features_num:
            matrix.loc[i, j] = np.nan
            matrix.loc[j, i] = np.nan

    return matrix.astype(float)

def createCorrelatedFeaturesList(corrMatrix, threshold=0.7):
    colCorr = []
    for column in corrMatrix.columns:
        for idx, row in corrMatrix.iterrows():
            if pd.notnull(row[column]) and (row[column] >= threshold) and (idx != column):
                if idx not in colCorr:
                    colCorr.append(idx)
                if column not in colCorr:
                    colCorr.append(column)
    return colCorr

def deleteFeatures(data_pd, corrMatrix, corrWithTarget, colCorr, target):
    for idx, row in corrWithTarget.iterrows():
        if idx in colCorr:
            data_pd = data_pd.drop(idx, axis=1)
            corrWithTarget = corrWithTarget.drop(index=idx).sort_values(target)
            corrMatrix = corrMatrix.drop(columns=idx, index=idx)
            break
    return data_pd, corrMatrix, corrWithTarget, idx

def autoEliminateMulticollinearityHybrid(data_spark, list_keys, list_features_num, list_features_cat, target, threshold=0.9, nsample=100000, method_num='spearman', target_metric ='iv'):
    # Validações
    all_features = list_features_num + list_features_cat

    if any(col not in data_spark.columns for col in all_features):
        raise ValueError("Alguma coluna em list_features não está presente na tabela de input.")

    if target not in data_spark.columns:
        raise ValueError("A variável target não está presente na tabela de input.")

    # Valida target binário
    target_value = data_spark.select(target).distinct().rdd.flatMap(lambda row: row).collect()
    if not all(value in [0, 1] for value in target_value):
        raise ValueError(f"Target deve ser binário. Valores encontrados: {target_value}")

    if threshold < 0 or threshold > 1:
        raise ValueError("Threshold deve estar entre 0 e 1.")

    # Amostragem
    df_pd = data_spark.select(*all_features, target).toPandas()
    if len(df_pd) > nsample:
        df_pd = df_pd.sample(n=nsample, random_state=42)

    # Matriz híbrida
    corrMatrix = buildHybridCorrelationMatrix(df_pd, list_features_num, list_features_cat, method_num=method_num)

    # Correlação ou IV com o target
    corr_target = {}
    for col in all_features:
        if target_metric == 'iv':
            corr_target[col] = calc_iv(df_pd, col, target)
        else:
            if col in list_features_cat:
                corr_target[col] = cramers_V(df_pd[col], df_pd[target])
            else:
                corr_target[col] = abs(df_pd[col].corr(df_pd[target], method=method_num))


    df_target_corr = pd.DataFrame.from_dict(corr_target, orient='index', columns=[target]).sort_values(by=target, ascending=True)

    df_target_corr_sup = df_target_corr
    corrMatrix_sup = corrMatrix

    colCorrcand = createCorrelatedFeaturesList(corrMatrix, threshold=threshold)
    feature_remove = []

    while colCorrcand != []:
        df_pd, corrMatrix_sup, df_target_corr_sup, f_remove = deleteFeatures(df_pd, corrMatrix_sup, df_target_corr_sup, colCorrcand, target)
        feature_remove.append(f_remove)
        colCorrcand = createCorrelatedFeaturesList(corrMatrix_sup, threshold=threshold)

    selected_features = df_pd.drop(columns=[target]).columns.tolist()
    df_final = data_spark.select(*list_keys, *selected_features)

    return df_final, selected_features, feature_remove, corrMatrix, df_target_corr


In [0]:
def fs_moda(df: DataFrame, colunas: list, threshold: float = 0.9):
    """
    Calcula a moda e sua porcentagem para várias colunas de forma eficiente.

    Parâmetros:
        df (DataFrame): DataFrame de entrada
        colunas (list): Lista de colunas categóricas ou discretas
        threshold (float): Valor de corte da porcentagem da moda

    Retorna:
        Tuple:
            - DataFrame com colunas ['variavel', 'moda', 'porcentagem']
            - Lista de variáveis abaixo do threshold
            - Lista de variáveis acima ou igual ao threshold
    """
    if not colunas:
        empty_df = df.sparkSession.createDataFrame([], schema="variavel string, moda string, porcentagem double")
        return empty_df, [], []

    # Transforma em formato long
    df_long = df.select([f.col(c).cast("string").alias(c) for c in colunas]) \
                .select(f.explode(f.array([
                    f.struct(f.lit(c).alias("variavel"), f.col(c).alias("valor"))
                    for c in colunas
                ])).alias("kv")) \
                .select("kv.variavel", "kv.valor")

    total_por_coluna = df_long.groupBy("variavel").agg(f.count("*").alias("total"))

    moda_df = (
        df_long.groupBy("variavel", "valor")
               .agg(f.count("*").alias("frequencia"))
               .join(total_por_coluna, on="variavel", how="left")
               .withColumn("porcentagem", f.round(f.col("frequencia") / f.col("total"), 4))
               .withColumn("rank", f.row_number().over(Window.partitionBy("variavel").orderBy(f.desc("frequencia"))))
               .filter(f.col("rank") == 1)
               .select(
                   f.col("variavel"),
                   f.col("valor").alias("moda"),
                   f.col("porcentagem")
               )
    )

    abaixo = (
        moda_df.filter(f.col("porcentagem") < threshold)
               .select("variavel")
               .rdd.flatMap(lambda x: x)
               .collect()
    )

    acima = (
        moda_df.filter(f.col("porcentagem") >= threshold)
               .select("variavel")
               .rdd.flatMap(lambda x: x)
               .collect()
    )

    return moda_df, abaixo, acima


In [0]:
def features_binning_process(data_spark, list_keys, target, list_features_num = None, list_features_cat = None, list_exception_code = None, dev = None, max_nsample = 100000, metric_bin = "woe", is_features_cat_default = False, n_bins = None):

       """
       Realiza o binning em features numéricas e categóricas em um DataFrame Spark para problemas de classificação binária. Olhar exemplos para entender outputs.
       
       Args:
           data_spark (DataFrame): O DataFrame Spark contendo os dados de entrada.
           list_keys (list): Uma lista de colunas a serem usadas como chaves de identificação.
           target (str): O nome da coluna target, que deve ser binária (0 e 1).
           list_features_num (list, opcional): Uma lista de colunas de features numéricas a serem categorizadas. 
           Default é None.
           list_features_cat (list, opcional): Uma lista de colunas de features categóricas a serem categorizadas.
           Default é None.
           list_exception_code (list, opcional): Uma lista de códigos de exceção a serem considerados durante o
           binning. Default é None.
           dev (str, opcional): O nome da coluna usada para filtrar o conjunto de desenvolvimento. Esta coluna deve ser
           do tipo booleano (True/False). Default é None.
           max_nsample (int, opcional): O número máximo de amostras a serem usadas no processo de binning. Default é
           100000.
           metric_bin (str, opcional): A métrica a ser usada para o binning. Valores possíveis são 'woe','event_rate',
           'indices' e 'bins'. Default é "woe".
           is_features_cat_default (bool, opcional): Uma flag indicando se as features categóricas devem ser tratadas
           com configurações Default, ou seja não realizara agrupamentos. Default é False. caso selecionado True deverá ser inserido no output mas um objeto como no exemplo 2. Caso seja True, obrigatóriamente é necessário passar uma lista em list_features_cat
       
       Returns:
           tuple: 
               - DataFrame Spark com as features categorizadas.
               - Objeto do processo de binning para features numéricas (se aplicável).
               - Objeto do processo de binning para features categóricas (se aplicável).
       
       Raises:
           ValueError: Se nenhuma lista de features for fornecida.
           ValueError: Se as features especificadas não estiverem presentes no DataFrame de entrada.
           ValueError: Se a coluna target não for binária.
           ValueError: Se as features numéricas não forem do tipo NumericType.
           ValueError: Se as features categóricas não forem do tipo StringType.
           ValueError: Se a métrica especificada não for válida.
           ValueError: Se a coluna de desenvolvimento não for booleana.
       
       Example:
           >>> data_spark_cat_bins, binning_process, binning_process_cat = features_binning_process(
           >>>     data_spark=df_spark,
           >>>     list_keys=['key1', 'key2'],
           >>>     target='target_column',
           >>>     list_features_num=['num_feature1', 'num_feature2'],
           >>>     list_features_cat=['cat_feature1', 'cat_feature2'],
           >>>     list_exception_code=[999, -1],
           >>>     dev='dev_column',
           >>>     max_nsample=100000,
           >>>     metric_bin='woe',
           >>>     is_features_cat_default=True)
       

           >>> data_spark_cat_bins, binning_process = features_binning_process(
           >>>     data_spark=df_spark,
           >>>     list_keys=['key1', 'key2'],
           >>>     target='target_column',
           >>>     list_features_num=['num_feature1', 'num_feature2'],
           >>>     list_features_cat=['cat_feature1', 'cat_feature2'],
           >>>     list_exception_code=[999, -1],
           >>>     dev='dev_column',
           >>>     max_nsample=100000,
           >>>     metric_bin='woe',
           >>>     is_features_cat_default=False)
       """

       # Validação se existe uma lista de features:
       if list_features_num == None and list_features_cat == None:
              raise ValueError("É necessário passar uma lista de features no parâmetro list_features_num ou/e list_features_cat para categorização.")

       #validando se a lista esta presente na tabela de input
       if list_features_num != None:
              list_not_in = [column for column in list_features_num if column not in data_spark.columns]
              if list_not_in != []:
                     raise ValueError(f"A lista de features {list_not_in} do parâmetro list_features_num para categorização não esta presente na tabela de input, especificado no parâmetro data_spark.")
 
       if list_features_cat != None:
              list_not_in_cat = [column for column in list_features_cat if column not in data_spark.columns]
              if list_not_in_cat != []:
                     raise ValueError(f"A lista de features {list_not_in_cat} do parâmetro list_features_cat para categorização não esta presente na tabela de input, especificado no parâmetro data_spark.")

       if target not in data_spark.columns:
             raise ValueError(f"A variavel {target} especificada no parâmetro target não esta presente no tabela de input, especificado no parâmetro data_spark.")
      

       if dev != None:
              if dev not in data_spark.columns:
                     raise ValueError(f"A variavel {dev} especificada no parâmetro dev não esta presente no tabela de input, especificado no parâmetro data_spark.")      


       if dev != None:
              type_bool_columns = [col.name for col in data_spark.selectExpr(dev).schema.fields if isinstance(col.dataType, BooleanType)]

              if type_bool_columns != [dev]:
                     raise ValueError(f"A feature {dev} definida no parâmetro dev precisa ser do tipo booleano (True e False)")
      
       #validando se o target é binario

       if dev != None:
              target_value  = data_spark.filter(f.col(dev) == True).select(target).distinct().rdd.flatMap(lambda row: row).collect()
              result_bool = all(value in [1, 0] for value in target_value)

       if dev == None:
              target_value  = data_spark.select(target).distinct().rdd.flatMap(lambda row: row).collect()
              result_bool = all(value in [1, 0] for value in target_value)


       if result_bool == False:
              raise ValueError(f"A variavel {target} especificada no parâmetro target possui os seguintes valores {target_value}, logo o processo binning foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")

       # #Verificando se as colunas listadas como numericas são numericas no dataset
       if list_features_num != None:
              type_num_columns = [col.name for col in data_spark.selectExpr(*list_features_num).schema.fields  if isinstance(col.dataType, NumericType)]
              #comparando lista numerica vs. lista dataset numerica
              type_num_list_not_in = [column for column in list_features_num if column not in type_num_columns]

              if type_num_list_not_in != []:
                     raise ValueError(f"A lista de features do parâmetro list_features_num precisam ser do tipo NumericType. As features {type_num_list_not_in} estão presentes na lista mas não são do tipo  NumericType na tabela de input, especificado no parâmetro data_spark.")

       #Verificando se as colunas listadas como string são strings no dataset
       if list_features_cat != None:
              type_string_columns = [col.name for col in data_spark.selectExpr(*list_features_cat).schema.fields  if isinstance(col.dataType, StringType)]
              #comparando lista string vs. lista dataset string
              type_string_list_not_in = [column for column in list_features_cat if column not in type_string_columns]
              
              if type_string_list_not_in != []:
                     raise ValueError(f"A lista de features do parâmetro list_features_cat precisam ser do tipo StringType. As features {type_string_list_not_in} estão presentes na lista mas não são do tipo  StringType na tabela de input, especificado no parâmetro data_spark.")

       #Validando se metrica indicada é valida

       list_possible_metrics = ['woe', 'event_rate', 'indices', 'bins']
       metric_result_valid = [column for column in list_possible_metrics if column in metric_bin.lower()]


       if metric_result_valid == []:
              raise ValueError(f"A Métrica {metric_bin} especificada no parâmetro metric não esta presente nas possibilidades: {list_possible_metrics}. Escolha uma dessas informações para retorno da categorização.")

       metric_bin = metric_bin.lower()

       #Processo Binning
       #Verificando se o data set possui uma marcacao dev

       #remover duplicatas
       data_spark_temp = data_spark.dropDuplicates(list_keys)
       data_pandas, data_pandas_temp = data_spark_temp.toPandas(), data_spark_temp.toPandas()

       if data_spark_temp.count() < data_spark.count():
              print(f"O dataframe de input tem linhas duplicatas quando verificado as chaves {list_keys}, logo seu output final sera uma dataframe pyspark com {data_spark_temp.count()} linhas")

       data_pandas_temp['dev_opbinning'] = True

       dev_opbinning = 'dev_opbinning'

       if dev is None:
              print("Não foi especificada uma coluna para treinar o processo de categorização, logo será utilizado `100%` da sua base para criar o modelo binning.")
              dev = 'dev_opbinning'
       
       data_pandas_temp = data_pandas_temp[data_pandas_temp[dev] == True]

       #criando lista de flags de exceção
       if list_exception_code != None:
              list_exception = {f"special_{i+1}": value for i, value in enumerate(list_exception_code)}
       else:
              list_exception = None

       # Avaliando tamanho da amostra
       sizedata = len(data_pandas_temp)

       print('Base Filtrando Dev:',len(data_pandas_temp))

       # Validando o tamanho da amostra
       if sizedata > max_nsample:
              print(f"O dataframe definido no parâmetro data_spark tem um tamanho de {sizedata}. Foi selecionada uma amostra com tamanho {max_nsample} para o processo de binning")
              data_pandas_temp = data_pandas_temp.sample(frac=max_nsample/sizedata, random_state=42)

       print('Base Filtrando Final:', len(data_pandas_temp))


       #Criando data set treino
       # data_pandas_train_features, data_pandas_train_target = data_pandas_temp[(list_features_num or []) + (list_features_cat or [])], data_pandas_temp[target]

       data_pandas_train_features = data_pandas_temp[(list_features_num or []) + (list_features_cat or [])].copy(); data_pandas_train_target = data_pandas_temp[target].copy()

       #Criando processo binning
       if is_features_cat_default is False:
              binning_process = BinningProcess(variable_names = (list_features_num or []) + (list_features_cat or []), categorical_variables = (list_features_cat or []), special_codes = list_exception, max_n_bins = n_bins)

              binning_process.fit(data_pandas_train_features.filter(items = (list_features_num or []) + (list_features_cat or [])), data_pandas_train_target)

              pd_woe_binning_process = binning_process.transform(data_pandas.filter(items = (list_features_num or []) + (list_features_cat or [])), metric = metric_bin)

              base_join_cat_bins = (
                     data_pandas[list_keys]
                    .merge(data_pandas_temp.filter(items = list_keys + [dev_opbinning]), 
                           how = 'left', on = list_keys) 
                    .merge(pd_woe_binning_process, 
                           left_index = True, 
                           right_index = True, 
                           how = 'left'))
              

       if is_features_cat_default is True:

              if list_features_num != None:

                     binning_process = BinningProcess(variable_names = list_features_num, special_codes = list_exception, max_n_bins = n_bins)

                     binning_process.fit(data_pandas_train_features[list_features_num],    data_pandas_train_target)

                     pd_woe_binning_process = binning_process.transform(data_pandas.filter(items =      list_features_num), metric = metric_bin)

              binning_process_cat = BinningProcess(variable_names = list_features_cat, categorical_variables = list_features_cat, min_prebin_size = 0.000000001, special_codes = list_exception, max_n_bins = n_bins)

              binning_process_cat.fit(data_pandas_train_features[list_features_cat], data_pandas_train_target)
              
              pd_woe_binning_process_cat = binning_process_cat.transform(data_pandas.filter(items = list_features_cat), metric = metric_bin)


              if list_features_num != None:
                     base_join_cat_bins = (
                            data_pandas[list_keys]
                            .merge(data_pandas_temp[list_keys + [dev_opbinning]],
                                   how = 'left', on = list_keys)
                            .merge(pd_woe_binning_process, 
                                   left_index = True, 
                                   right_index = True, 
                                   how = 'left')
                            .merge(pd_woe_binning_process_cat, 
                                   left_index = True, 
                                   right_index = True, 
                                   how = 'left'))
              else:
                     base_join_cat_bins = (
                            data_pandas[list_keys]
                            .merge(data_pandas_temp[list_keys + [dev_opbinning]], 
                                  how = 'left', on = list_keys)
                            .merge(pd_woe_binning_process_cat, 
                                  left_index = True, 
                                  right_index = True, 
                                  how = 'left'))
              
                     
       data_spark_cat_bins = spark.createDataFrame(base_join_cat_bins).fillna(False, subset= [dev_opbinning])

       try:
              binning_process  # Tenta acessar o objeto
       except NameError:
              binning_process = None  # Se não existir, define como None

       try:
              binning_process_cat  # Tenta acessar o objeto
       except NameError:
              binning_process_cat = None  # Se não existir, define como None
              
       if binning_process is None:
              return data_spark_cat_bins, binning_process_cat
       if binning_process_cat is None:      
              return data_spark_cat_bins, binning_process
       else:
              return data_spark_cat_bins, binning_process, binning_process_cat

In [0]:
def apply_binning_process(data_spark, model_object, list_keys, list_features, metric_bin = "woe"):

    """
    Aplica um processo de binning (categorização) em um DataFrame Spark com base em um objeto de modelo fornecido.

    Args:
        data_spark (DataFrame): DataFrame Spark contendo os dados de entrada que serão categorizados.
        model_object (Model Object): Objeto de modelo que contém as informações de binning para as variáveis (saídas de
        features_binning_process).
        list_keys (list): Lista de chaves (colunas) que serão mantidas no DataFrame final.
        list_features (list): Lista de variáveis (colunas) que serão categorizadas.
        metric_bin (str, opcional): Métrica a ser usada para o processo de binning. As opções possíveis são 'woe',
        'event_rate', 'indices', e 'bins'. O valor default é 'woe'.

    Returns:
        DataFrame: DataFrame Spark com as variáveis categorizadas.

    Raises:
        ValueError: Se qualquer coluna especificada em `list_features` não estiver presente no DataFrame de entrada
                    (`data_spark`) ou no objeto de modelo (`model_object`).
        ValueError: Se a métrica especificada em `metric_bin` não for uma das opções válidas.

    Example:
        >>> df_spark_cat_bins = apply_binning_process(data_spark=df_spark, model_object=model, list_keys=['key1', 'key2'], list_features=['feature1', 'feature2'])
    """

    #features no arquivo objeto do modelo
    features_cat = spark.createDataFrame(model_object.summary()).select("name").distinct().rdd.flatMap(lambda row: row).collect()

    #validando se a lista esta presente na tabela de input pu no objeto do modelo
    list_not_in = [column for column in list_features if column not in data_spark.columns]
    list_not_in_model = [column for column in list_features if column not in features_cat]

    if list_not_in != []:
        raise ValueError(f"A lista de features {list_not_in} para categorização não esta presente na tabela de input, parametro data_spark.")
    
    if list_not_in_model != []:
        raise ValueError(f"A lista de features {list_not_in_model} para categorização não esta presente no artefato do modelo, parametro model_object.")

    #Validando se metrica indicada é valida

    list_possible_metrics = ['woe', 'event_rate', 'indices', 'bins']
    metric_result_valid = [column for column in list_possible_metrics if column in metric_bin.lower()]
    metric_bin = metric_bin.lower()

    if metric_result_valid == []:
        raise ValueError(f"A Métrica {metric_bin} especificada no parâmetro metric não esta presente nas possibilidades: {list_possible_metrics}. Escolha uma dessas informações para retorno da categorização.")

    #transformando data set em pandas
    data_pandas = data_spark.toPandas()

    #Calculando os woe por feature listada
    for i in list_features:
        print("Categorizando Feature:", i)
        # Obter o binned variable específico
        optb = model_object.get_binned_variable(i)

        # Aplicar a transformação de binning para a coluna específica na nova base
        data_pandas[i] = optb.transform(data_pandas[i], metric = metric_bin)

    # Data set final escorado
    data_spark_cat_bins = spark.createDataFrame(data_pandas.filter(items = list_keys + list_features))

    return(data_spark_cat_bins)

In [0]:
def fs_iv(model_object, threshold = 0.01):
    """
    Filtra e seleciona features com base no Information Value (IV) de um modelo feito pelo process binning.

    Args:
        model_object : Objeto do modelo treinado que suporta o método. Geralmente, é um modelo de binning (Saida de
        features_binning_process).

        threshold (float, optional): Limite mínimo de IV para considerar uma feature como relevante. Features com
        abaixo desse valor serão removidas. Default é 0.01.

    Returns:
        tuple[DataFrame, list[str], list[str]]:
            - DataFrame: Tabela contendo o resumo das features, número de bins e IV arredondado.
            - list[str]: Lista de features selecionadas (IV >= threshold).
            - list[str]: Lista de features removidas (IV < threshold).
    
    Raises:
        ValueError: Se o `threshold` for negativo ou inválido.
    
    Example:
        >>> table_summary, selected_features, removed_features = fs_iv(model, threshold=0.05)
        >>> print("Features selecionadas:", selected_features)
        >>> print("Features removidas:", removed_features)
    """

    #Selecinonando Tabela
    table_describe_num = spark.createDataFrame(model_object.summary())
    table_describe_num = (table_describe_num
                          .selectExpr("name as Features", "n_bins as qtd_bins", "iv as information_value")
                          .withColumn("information_value", f.round(f.col("information_value"), 4)))
    
    list_features_selection = (table_describe_num
                               .filter(f.col("information_value") >= threshold)
                               .select("Features")
                               .distinct()
                               .rdd.flatMap(lambda row: row)
                               .collect())

    not_list_features_selection = (table_describe_num
                                   .filter(f.col("information_value") < threshold)
                                   .select("Features")
                                   .distinct()
                                   .rdd.flatMap(lambda row: row)
                                   .collect())
    
    print(f'Features Removidas: {len(not_list_features_selection)}')

    return table_describe_num, list_features_selection, not_list_features_selection

In [0]:
def describe_binning(model_object, list_features):

    """
    Gera um descritivo de binning para cada uma das variáveis especificadas.

    Args:
        model_object: Objeto do modelo treinado que suporta o método binning. Deve ser um modelo que contenha as variáveis binned e suporte `get_binned_variable()`.
        list_features (list[str]): Lista de nomes das features para as quais o descritivo de binning será gerado.

    Returns:
        dict[str, DataFrame]: Uma lista em que cada posição dessa lista é uma tabela resumo de cada feature listada
        contendo as tabelas de binning associadas.

    Raises:
        ValueError: Se qualquer feature em `list_features` não estiver presente no artefato do modelo (`model_object`).

    Example:
        >>> model_object = treinado_modelo_com_binning  # Objeto com método summary e binning
        >>> list_features = ["feature_1", "feature_2"]
        >>> binning_results = describe_binning(model_object, list_features)
        >>> for feature, table in binning_results.items():
        >>>     print(f"Binning para {feature}:")
        >>>     table.show()
    
    Workflow:
        1. Identifica as variáveis disponíveis no artefato do modelo (model_object.summary()).
        2. Valida se todas as features na lista `list_features` estão presentes no modelo.
        3. Para cada feature válida:
           - Obtém o objeto de binning usando `model_object.get_binned_variable(feature)`.
           - Gera a tabela de binning com o método `binning_table.build()`.
        4. Retorna um dicionário com as tabelas de binning por feature.

    Notes:
        - O método `model_object.summary()` deve retornar uma estrutura que inclua o nome das variáveis.
        - A função `get_binned_variable()` deve estar disponível no `model_object` e retornar uma estrutura de binning.
    """

    #features no arquivo objeto do modelo
    features_cat = spark.createDataFrame(model_object.summary()).select("name").distinct().rdd.flatMap(lambda row: row).collect()

    #validando se a lista esta presente na tabela de input pu no objeto do modelo
    list_not_in_model = [column for column in list_features if column not in features_cat]
    
    if list_not_in_model != []:
        raise ValueError(f"A lista de features {list_not_in_model} para categorização não esta presente no artefato do modelo, parametro model_object.")

    #Descritivo Binning
    table_final =  {}
    for i in list_features:
        optb = model_object.get_binned_variable(i)
        print("Variavel:", i)
        binning_table_result = optb.binning_table.build()
        table_final[i] = binning_table_result
    return(table_final)

## Tratamentos

In [0]:
def process_categoric_cuts(data_spark, feature, reference = None, cuts = 10):

    """
    Processa uma variável categórica contínua em faixas (buckets) com base em cortes definidos por quantis.

    Args:
        data_spark (DataFrame): DataFrame PySpark contendo os dados de entrada.
        feature (str): Nome da coluna no DataFrame que será categorizada em faixas.
        reference (str, optional): Nome de uma coluna boolean no DataFrame usada como referência para calcular os 
        quantis.
            Se None, considera toda a base. Default é None.
        cuts (int, optional): Número de faixas (buckets) desejadas. Deve ser maior que 2. Default é 10.

    Returns:
        DataFrame: Um DataFrame com a coluna original categorizada em uma nova coluna de faixas.

    Raises:
        ValueError: Caso:
            - A coluna `feature` não esteja presente no DataFrame.
            - A coluna `reference` (se especificada) não esteja presente no DataFrame.
            - A coluna `reference` (se especificada) não seja do tipo booleano.
            - O valor de `cuts` seja menor ou igual a 2.
        ValueError: Caso não haja quantis suficientes para formar buckets.

    Example:
        >>> df_result = process_categoric_cuts(df, feature="value", reference="is_valid", cuts=3)
        >>> df_result.show()
    
    Workflow:
        1. Valida se `feature` e, se especificado, `reference` estão no DataFrame.
        2. Garante que `reference` é do tipo booleano, se presente.
        3. Filtra valores nulos e negativos na coluna `feature`.
        4. Calcula os quantis da coluna `feature` com base no número de cortes especificado.
        5. Ajusta os quantis para garantir o suporte a valores extremos.
        6. Aplica o `Bucketizer` para categorizar a coluna em faixas.
        7. Trata valores nulos ou negativos na coluna categorizada.

    Notes:
        - A coluna categorizada será adicionada ao DataFrame com o nome `faixa_<feature>`.
        - Valores nulos ou negativos serão tratados separadamente, e negativos são convertidos para string temporariamente.
        - A precisão dos quantis é ajustada com um erro absoluto máximo de 0.001.

    Columns:
        - Entrada: `data_spark` deve conter as colunas especificadas em `feature` e (opcional) `reference`.
        - Saída: Uma nova coluna `faixa_<feature>` será adicionada ao DataFrame, representando os buckets.
    """

    
    if feature not in data_spark.columns:
            raise ValueError(f"A feature {feature} definido no parâmetro feature não esta presente na tabela de input, parâmetro data_spark.")

    if reference != None and reference not in data_spark.columns:
            raise ValueError(f"A feature {reference} definido no parâmetro feature não esta presente na tabela de input, parâmetro data_spark.")
    
    if reference != None:
        type_bool_columns = [col.name for col in data_spark.selectExpr(reference).schema.fields if isinstance(col.dataType, BooleanType)]
        
        if type_bool_columns != [reference]:
            raise ValueError(f"A feature {reference} definida no parâmetro feature precisa ser do tipo booleano (True e False)")

    if cuts <= 2:
        raise ValueError("O valor do cuts tem que ser maior que 2.")
    
    df_table = data_spark.filter(f.col(feature) > 0).filter(f.col(feature).isNotNull())

    df_table_ref = df_table

    if reference != None:
        df_table_ref = df_table_ref.filter(f.col(reference) == True)


    #Criando as probabilidades baseado na quantidade de quebras
    probabilities = [i/cuts for i in range(cuts+1)]  # Probabilidades de 0 a 1 em incrementos de 0.1
    
    #Ajustando a feature para o tipo inteiro
    df_table_ref = df_table_ref.withColumn(feature, f.col(feature).cast("integer"))

    # Obtendo os quantis
    quantiles = df_table_ref.approxQuantile(feature, probabilities, 0.001)
    
    #ajustando os máximio e minimos para aplicar em toda a base se gerar missings
    quantiles = sorted(set(quantiles))
    quantiles[-1] = float('inf')
    quantiles[0] = float('-inf')
    
    # verificando se a quantidade de quebras é mínima para categorizar
    if len(quantiles)==2:
        return df_table
    
    feature_low = f'faixa_{feature.lower()}'

    #gerando o modelo buccketizer para aplicar
    bucketizer = Bucketizer(splits=quantiles, inputCol=feature, outputCol= feature_low)
    df_table_with_bins = bucketizer.transform(data_spark)

    n_cuts  = len(df_table_with_bins.select(feature_low).distinct().rdd.flatMap(lambda row: row).collect())

    df_table_with_bins = df_table_with_bins.withColumn(feature_low, (n_cuts-1)-f.col(feature_low))

    #ajustando valores como missing ou negativos

    # Trate valores nulos e negativos antes de aplicar o Bucketizer
    df_table_with_bins = (df_table_with_bins
                          .withColumn(feature_low,
                                      f.when(f.col(feature).isNull(), None)
                                      .when(f.col(feature) < 0, f.col(feature).cast("string"))
                                      .otherwise(f.col(feature_low)) ) # Temporariamente colocar None para valores que passarão pelo Bucketizer
                          )
    return(df_table_with_bins)

In [0]:
def process_categoric_percentiles(data_spark, feature, reference = None, percentiles_acm = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]):
    
    """
    Processa percentis categóricos para uma feature contínua em um DataFrame Spark.
    
    Esta função aplica um modelo de bucketização (via `Bucketizer`) para transformar uma variável contínua em faixas    categóricas com base nos percentis definidos. Ela também suporta filtragem condicional com base em uma feature de  referência booleana.
    
    Parameters:
        data_spark (DataFrame): 
            DataFrame Spark de entrada contendo as colunas para processamento.
        feature (str): 
            Nome da feature contínua que será categorizada com base nos percentis.
        reference (str, optional): 
            Nome de uma coluna booleana usada como referência para determinar os percentis.
            Apenas valores `True` nesta coluna serão considerados para calcular os percentis.
            Default: None.
        percentiles_acm (list of float, optional): 
            Lista de valores de percentis cumulativos para determinar os cortes.
            Valores devem estar no intervalo [0, 1] e ordenados de forma crescente.
            Default: [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0].
    
    Returns:
        DataFrame:
            DataFrame Spark contendo a coluna categorizada adicional, chamada `faixa_<nome_da_feature>`, 
            representando os intervalos definidos pelos percentis.
            - Valores nulos ou negativos na coluna original são tratados separadamente.
    
    Raises:
        ValueError: 
            Caso a coluna especificada em `feature` não exista em `data_spark`.
        ValueError: 
            Caso a coluna especificada em `reference` (se fornecida) não exista em `data_spark`.
        ValueError: 
            Caso `reference` seja fornecida, mas não seja uma coluna booleana.
        ValueError: 
            Caso o número de percentis em `percentiles_acm` seja menor ou igual a 2.
    
    Example:
        >>> # Chamando a função
        >>> df_result = process_categoric_percentiles(
        ...     data_spark=df,
        ...     feature="feature",
        ...     reference="reference",
        ...     percentiles_acm=[0.0, 0.5, 1.0]
        ... )
    """

    if feature not in data_spark.columns:
            raise ValueError(f"A feature {feature} definido no parâmetro feature não esta presente na tabela de input, parâmetro data_spark.")

    if reference != None and reference not in data_spark.columns:
            raise ValueError(f"A feature {reference} definido no parâmetro feature não esta presente na tabela de input, parâmetro data_spark.")
    
    if reference != None:
        type_bool_columns = [col.name for col in data_spark.selectExpr(reference).schema.fields if isinstance(col.dataType, BooleanType)]
        
        if type_bool_columns != [reference]:
            raise ValueError(f"A feature {reference} definida no parâmetro feature precisa ser do tipo booleano (True e False)")

    if len(percentiles_acm) <= 2:
        raise ValueError("O valores do percentis tem que ser maior que 2")
    
    df_table = data_spark.filter(f.col(feature) > 0).filter(f.col(feature).isNotNull())

    df_table_ref = df_table

    if reference != None:
        df_table_ref = df_table_ref.filter(f.col(reference) == True)

    
    #Ajustando a feature para o tipo inteiro
    df_table_ref = df_table_ref.withColumn(feature, f.col(feature).cast("integer"))

    # Obtendo os quantis
    quantiles = df_table_ref.approxQuantile(feature, percentiles_acm, 0.001)
    
    #ajustando os máximio e minimos para aplicar em toda a base se gerar missings
    quantiles = sorted(set(quantiles))
    quantiles[-1] = float('inf')
    quantiles[0] = float('-inf')
    
    # verificando se a quantidade de quebras é mínima para categorizar
    if len(quantiles)==2:
        return df_table
    
    feature_low = f'faixa_{feature.lower()}'

    #gerando o modelo buccketizer para aplicar
    bucketizer = Bucketizer(splits=quantiles, inputCol=feature, outputCol= feature_low)
    df_table_with_bins = bucketizer.transform(data_spark)

    n_cuts  = len(df_table_with_bins.select(feature_low).distinct().rdd.flatMap(lambda row: row).collect())
    df_table_with_bins = df_table_with_bins.withColumn(feature_low, (n_cuts-1)-f.col(feature_low))

    #ajustando valores como missing ou negativos

    # Trate valores nulos e negativos antes de aplicar o Bucketizer
    df_table_with_bins = (df_table_with_bins
                          .withColumn(feature_low,
                                      f.when(f.col(feature).isNull(), None)
                                      .when(f.col(feature) < 0, f.col(feature).cast("string"))
                                      .otherwise(f.col(feature_low)) ) # Temporariamente colocar None para valores que passarão pelo Bucketizer
                          )
    return(df_table_with_bins)

In [0]:
def process_categoric_cuts_multiple_features(data_spark, list_features, cuts = 10, reference = None):

    """
    Processa cortes categóricos em múltiplas features de um DataFrame Spark.

    Args:
        data_spark (DataFrame): DataFrame Spark contendo os dados de entrada.
        list_features (list): Lista de colunas que contêm as features a serem processadas.
        cuts (int, opcional): Número de cortes categóricos a serem aplicados. O valor default é 10.
        reference (str, opcional): Nome da coluna de referência para os cortes.

    Returns:
        DataFrame: DataFrame Spark com as features categorizadas.

    Example:
        >>> df_processed = process_categoric_cuts_multiple_features(data_spark=df_spark, list_features=['feature1', 'feature2'], cuts=10, reference='reference_column')
    """
    
    data_spark_temp = data_spark

    for i in list_features:
        data_spark_temp = process_categoric_cuts(data_spark = data_spark_temp, feature = i, reference = reference, cuts = cuts)

    return(data_spark_temp)

In [0]:
def process_categoric_percentiles_multiple_features(data_spark, list_features, cuts = 10, reference = None):

    """
    Processa percentis categóricos em múltiplas features de um DataFrame Spark.

    Args:
        data_spark (DataFrame): DataFrame Spark contendo os dados de entrada.
        list_features (list): Lista de colunas que contêm as features a serem processadas.
        cuts (int, opcional): Número de cortes categóricos a serem aplicados. O valor default é 10.
        reference (str, opcional): Nome da coluna de referência para os cortes.

    Returns:
        DataFrame: DataFrame Spark com as features categorizadas.

    Example:
        >>> df_processed = process_categoric_percentiles_multiple_features(data_spark=df_spark, list_features=['feature1', 'feature2'], cuts=10, reference='reference_column')
    """
    
    data_spark_temp = data_spark

    for i in list_features:
        data_spark_temp = process_categoric_percentiles(data_spark = data_spark_temp, feature = i, reference = reference, percentiles_acm=[0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0])

    return(data_spark_temp)

## Modelagem

In [0]:
def model_glm_multiples_combination(data_spark, list_scores, target, dev = None):

    """
    Treina múltiplos modelos GLM com diferentes combinações de features em um DataFrame Spark.

    Args:
        data_spark (DataFrame): DataFrame Spark contendo os dados de entrada.
        list_scores (list): Lista de listas, onde cada sublista contém os nomes das features a serem utilizadas em um
        modelo.
        target (str): Nome da coluna que contém o target.
        dev (str, opcional): Nome da coluna boolean para filtrar a base de desenvolvimento. O valor default é None.

    Returns:
        dict: Dicionário contendo os modelos treinados, com os nomes das features concatenadas como chaves.

    Raises:
        ValueError: Se alguma feature especificada em `list_scores` não estiver presente no DataFrame de entrada
        `data_spark`.
        ValueError: Se a coluna `dev` não for do tipo booleano (True e False).

    Example:
        >>> models = model_glm_multiples_combination(data_spark=df_spark, list_scores=[['feature1', 'feature2'], ['feature3', 'feature4']], target='target_column', dev='dev_column')
    """

    #validando 
    #verificando nomes unicos da lista
    list_features_blend = set(item for sublist in list_scores for item in sublist)

    #validando se a lista esta presente na tabela de input pu no objeto do modelo
    list_not_in = [column for column in list_features_blend if column not in data_spark.columns]

    if list_not_in != []:
        raise ValueError(f"As features {list_not_in} para aplicação não esta presente na tabela de input, parâmetro data_spark.")

    # if dev != None:
    #     type_bool_columns = [col.name for col in data_spark.selectExpr(dev).schema.fields if isinstance(col.dataType, BooleanType)]
        
    #     if type_bool_columns != [dev]:
    #         raise ValueError(f"A feature {dev} definida no parâmetro dev precisa ser do tipo booleano (True e False)")

    #Transformando em data pandas
    df_pandas = data_spark.toPandas()

    #Verificando se existe a base esta em desenvolvimento e validação
    if dev != None:
         df_pandas = df_pandas[df_pandas[dev] == True]

    # Treinando o modelo em treino e validação
    df_dev, df_val = train_test_split(df_pandas, test_size=0.2, random_state=1234)

    lista_model = {}

    y_train = df_dev[target]
    
    for i in list_scores:
        modelo_glm = LogisticRegression(random_state=7)
        modelo_glm.fit(df_dev.filter(items = i), y_train)
        features = list(modelo_glm.feature_names_in_)
        model_feature_X = 'model_glm_' + '_'.join(features)
        print("nome do modelo utilizando as features", i,":", model_feature_X)
        lista_model[model_feature_X] = modelo_glm
    return lista_model

In [0]:
def predict_model_glm_multiples_combination(data_spark, list_models):

    """
    Gera previsões para múltiplos modelos GLM com diferentes combinações de features em um DataFrame Spark.

    Args:
        data_spark (DataFrame): DataFrame Spark contendo os dados de entrada.
        list_models (dict): Dicionário contendo os modelos treinados, com os nomes das features concatenadas como chaves.

    Returns:
        DataFrame: DataFrame Spark contendo as previsões de probabilidade para cada modelo.

    Raises:
        ValueError: Se alguma feature especificada em `list_models` não estiver presente no DataFrame de entrada `data_spark`.
        
    Example:
        >>> df_predictions = predict_model_glm_multiples_combination(data_spark=df_spark, list_models=models)
    """

    names_models = list(list_models.keys())
    #validando
    lists_features_models = list()
    # for i in np.arange(0, (len(list_models)), 1):
    for i in names_models:
            lists_features_models.append(list(list_models[i].feature_names_in_))

    #verificando nomes unicos da lista
    list_features_blend = set(item for sublist in lists_features_models for item in sublist)

    #validando se a lista esta presente na tabela de input pu no objeto do modelo
    list_not_in = [column for column in list_features_blend if column not in data_spark.columns]

    if list_not_in != []:
        raise ValueError(f"As features {list_not_in} para aplicação não esta presente na tabela de input, parâmetro data_spark.")


    #tranformando em data panas
    data_pandas = data_spark.toPandas()

    #Criando um looping de aplicação 
    # for i in np.arange(0, (len(list_models)), 1):
    for i in names_models:
        features = list(list_models[i].feature_names_in_)
        model_feature_X = 'prob_model_glm_' + '_'.join(features)
        data_pandas[f'{model_feature_X}'] = list_models[i].predict_proba(data_pandas[list(list_models[i].feature_names_in_)])[:, 1]
        
    return(spark.createDataFrame(data_pandas))

In [0]:

# Função para calcular KS e Gini
def calculate_metrics(y_true, y_pred):
    """
    Calcula as métricas KS, Gini e AUC com base nas previsões e valores reais.

    Args:
        y_true (array-like): Valores reais do target (0 ou 1).
        y_pred (array-like): Probabilidades previstas pelo modelo.

    Returns:
        dict: Um dicionário contendo as métricas calculadas:
            - 'ks': Valor do KS.
            - 'gini': Valor do Gini.
            - 'auc': Área sob a curva ROC.
            - 'bad_decil10': Taxa de default no primeiro decil.


    Raises:
        ValueError: Se os arrays `y_true` e `y_pred` tiverem tamanhos diferentes.
        ValueError: Se `y_true` contiver valores diferentes de 0 e 1.

    Example:
        >>> metrics = calculate_metrics(y_true=[0, 1, 1, 0], y_pred=[0.1, 0.9, 0.8, 0.2])
        >>> print(metrics)
        {'ks': 0.75, 'gini': 0.8, 'auc': 0.9}
    """


    # Gini: 2 * AUC - 1
    auc = roc_auc_score(y_true, y_pred)
    # Corrigindo o AUC se for menor que 0.5
    if auc < 0.5:
        auc = 1 - auc

    gini = 2 * auc - 1
    
    # KS
    sorted_indices = np.argsort(y_pred)
    cum_good = (y_true[sorted_indices] == 1).cumsum()
    cum_bad = (y_true[sorted_indices] == 0).cumsum()
    ks = max(abs(cum_good / cum_good[-1] - cum_bad / cum_bad[-1]))

    # # Taxa de default no primeiro decil
    # decil_size = len(y_true) // 10
    # decil_size_30 = len(y_true) // 30
    # decil_size_40 = len(y_true) // 40
    # Tamanhos dos percentis desejados
    size_10 = int(len(y_true) * 0.10)
    size_30 = int(len(y_true) * 0.30)
    size_40 = int(len(y_true) * 0.40)

    # Seleciona os índices dos menores scores preditos (menor risco, se score for default probability)
    sorted_indices = np.argsort(y_pred)

    indices_10 = sorted_indices[:size_10]
    indices_30 = sorted_indices[:size_30]
    indices_40 = sorted_indices[:size_40]

    # top_decil_indices = np.argsort(y_pred)[:decil_size]  # Pegamos os 10% com menor probabilidade default
    # top_decil_indices_30 = np.argsort(y_pred)[:decil_size_30]  # Pegamos os 10% com menor probabilidade default
    # top_decil_indices_40 = np.argsort(y_pred)[:decil_size_40]  # Pegamos os 10% com menor probabilidade default

    # Taxa de default nos grupos
    bad_rate_10 = round(np.mean(y_true[indices_10]), 4)
    bad_rate_30 = round(np.mean(y_true[indices_30]), 4)
    bad_rate_40 = round(np.mean(y_true[indices_40]), 4)

    # bad_decil10 = round(np.mean(y_true[top_decil_indices]), 4)  # Média dos valores reais (proporção de defaults)
    # bad_decil30 = round(np.mean(y_true[top_decil_indices_30]), 4)  # Média dos valores reais (proporção de defaults)
    # bad_decil40 = round(np.mean(y_true[top_decil_indices_40]), 4)  # Média dos valores reais (proporção de defaults)

    return ks, gini, auc, bad_rate_10, bad_rate_30, bad_rate_40



# # Função para realizar a busca de hiperparâmetros via Optuna
# def objective_express(trial, X_train, X_valid, y_train, y_valid, iteration, results):

#     """
#     Define o objetivo para a otimização do Optuna usando LightGBM.

#     Args:
#         trial (optuna.trial.Trial): Objeto que controla as iterações do Optuna.
#         X_train (pandas.DataFrame): Dados de treinamento.
#         X_valid (pandas.DataFrame): Dados de validação.
#         y_train (pandas.Series): Target para os dados de treinamento.
#         y_valid (pandas.Series): Target para os dados de validação.
#         iteration (int): Número atual da iteração de treinamento.
#         results (dict): Dicionário para armazenar os resultados das métricas.

#     Returns:
#         float: Valor da métrica de avaliação escolhida (ex.: KS ou AUC).

#     Raises:
#         ValueError: Se `X_train` e `X_valid` tiverem colunas inconsistentes.
#         ValueError: Se o tamanho de `X_train` não coincidir com `y_train`.

#     Example:
#         >>> study.optimize(lambda trial: objective_express(trial, X_train, X_valid, y_train, y_valid, iteration=1, results={}), n_trials=10)
#     """
    
#     # Lista para armazenar os resultados
#     params = {
#         "objective": "binary",
#         "metric": "auc",
#         "boosting_type": "gbdt",
#         "learning_rate": trial.suggest_float("learning_rate", 0.000001, 0.1, log = True),
#         "num_leaves": trial.suggest_int("num_leaves", 20, 150),
#         "max_depth": trial.suggest_int("max_depth", 3, 15),
#         "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
#         "min_child_weight": trial.suggest_float("min_child_weight", 1e-3, 1e-1, log = True),
#         "subsample": trial.suggest_float("subsample", 0.4, 1.0),
#         "colsample_bytree": trial.suggest_float("colsample_bytree", 0.4, 1.0),
#         "reg_alpha": trial.suggest_float("reg_alpha", 1e-3, 10.0, log = True),
#         "reg_lambda": trial.suggest_float("reg_lambda", 1e-3, 10.0, log = True),
#         "verbose": -1,
#         "seed": 42,
#     }


#     # Criação dos datasets de treino e validação
#     train_data = lgb.Dataset(X_train, label=y_train)
#     valid_data = lgb.Dataset(X_valid, label=y_valid, reference=train_data)

#     # Treinamento com early stopping
#     model = lgb.train(
#         params,
#         train_data,
#         num_boost_round=1000,
#         valid_sets=[valid_data],
#         callbacks=[lgb.early_stopping(stopping_rounds=10)],
#     )

#     #Avaliando quantidade de features dentro de um range de acm pelo menos de 98%
#     # Calcular a importância das features
#     feature_importance = model.feature_importance(importance_type='gain')
#     features = model.feature_name()
        
#     importance_df = pd.DataFrame({
#         'feature': features,
#         'importance': feature_importance
#     })
    
#     importance_df = importance_df.sort_values(by='importance', ascending=False)
    
#     total_gain = importance_df['importance'].sum()
#     importance_df['cumulative_gain'] = importance_df['importance'].cumsum() / total_gain

#     # Selecionar as features com ganho acumulado até 95%
#     selected_features_max = importance_df[importance_df['cumulative_gain'] <= 0.98]['feature'].tolist()
#     qtd_feautures = len(selected_features_max)

#     # Previsões para treino e validação/teste
#     y_pred_train = model.predict(X_train, num_iteration=model.best_iteration)
#     y_pred_valid = model.predict(X_valid, num_iteration=model.best_iteration)

#     # Calcular métricas para treino
#     ks_train, gini_train, auc_train, bad_decil10_train, bad_decil30_train, bad_decil40_train = calculate_metrics(y_train.values, y_pred_train)

#     # Calcular métricas para validação/teste
#     ks_valid, gini_valid, auc_valid, bad_decil10_valid, bad_decil30_valid, bad_decil40_valid = calculate_metrics(y_valid.values, y_pred_valid)

#     # Armazenar os resultados
#     results.append({
#         "iteration": iteration,
#         "params": params,
#         "features_relevance": qtd_feautures,
#         "ks2_train": ks_train,
#         "gini_train": gini_train,
#         "auc_train": auc_train,
#         "bad_decil10_train": bad_decil10_train,
#         "ks2_valid": ks_valid,
#         "gini_valid": gini_valid,
#         "auc_valid": auc_valid,
#         "bad_decil10_valid": bad_decil10_valid,
#     })

#     return auc_valid  # Maximizar o AUC na validação/teste

In [0]:

# # Função para realizar a busca de hiperparâmetros via Optuna
# def objective_express_V2(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration, results):

#     """
#     Define o objetivo para a otimização do Optuna usando LightGBM.

#     Args:
#         trial (optuna.trial.Trial): Objeto que controla as iterações do Optuna.
#         X_train (pandas.DataFrame): Dados de treinamento.
#         X_valid (pandas.DataFrame): Dados de validação.
#         y_train (pandas.Series): Target para os dados de treinamento.
#         y_valid (pandas.Series): Target para os dados de validação.
#         iteration (int): Número atual da iteração de treinamento.
#         results (dict): Dicionário para armazenar os resultados das métricas.

#     Returns:
#         float: Valor da métrica de avaliação escolhida (ex.: KS ou AUC).

#     Raises:
#         ValueError: Se `X_train` e `X_valid` tiverem colunas inconsistentes.
#         ValueError: Se o tamanho de `X_train` não coincidir com `y_train`.

#     Example:
#         >>> study.optimize(lambda trial: objective_express(trial, X_train, X_valid, y_train, y_valid, iteration=1, results={}), n_trials=10)
#     """

#     is_unbalance_list = trial.suggest_categorical('is_unbalance', [True, False])

    
#     # Lista para armazenar os resultados
#     params = {
#         "objective": "binary",
#         "metric": "auc",
#         "boosting_type": trial.suggest_categorical('boosting_type', ['gbdt', 'dart']),
#         'is_unbalance': is_unbalance_list,
#         "learning_rate": trial.suggest_float("learning_rate", 0.0001, 0.3, log = True),
#         "num_leaves": trial.suggest_int("num_leaves", 2, 20),
#         "max_depth": trial.suggest_int("max_depth", 2, 10),
#         "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
#         # "min_child_weight": trial.suggest_float("min_child_weight", 1e-3, 1e-1, log = True),
#         "subsample": trial.suggest_float("subsample", 0.3, 1.0),
#         "colsample_bytree": trial.suggest_float("colsample_bytree", 0.3, 1.0),
#         "reg_alpha": trial.suggest_float("reg_alpha", 1e-3, 10.0, log = True),
#         "reg_lambda": trial.suggest_float("reg_lambda", 1e-3, 10.0, log = True),
#         "verbose": -1,
#         "seed": 42,
#     }


#     # Criação dos datasets de treino e validação
#     train_data = lgb.Dataset(X_train, label=y_train)
#     valid_data = lgb.Dataset(X_valid, label=y_valid, reference=train_data)
#     oot_data = lgb.Dataset(X_oot, label=y_oot, reference=train_data)

#     # Treinamento com early stopping
#     model = lgb.train(
#         params,
#         train_data,
#         num_boost_round=1000,
#         valid_sets=[valid_data],
#         callbacks=[lgb.early_stopping(stopping_rounds=20)],
#     )

#     #Avaliando quantidade de features dentro de um range de acm pelo menos de 98%
#     # Calcular a importância das features
#     feature_importance = model.feature_importance(importance_type='gain')
#     features = model.feature_name()
        
#     importance_df = pd.DataFrame({
#         'feature': features,
#         'importance': feature_importance
#     })
    
#     importance_df = importance_df.sort_values(by='importance', ascending=False)
    
#     total_gain = importance_df['importance'].sum()
#     importance_df['cumulative_gain'] = importance_df['importance'].cumsum() / total_gain

#     # Selecionar as features com ganho acumulado até 95%
#     selected_features_max = importance_df[importance_df['cumulative_gain'] <= 0.98]['feature'].tolist()
#     qtd_feautures = len(selected_features_max)

#     # Previsões para treino e validação/teste
#     y_pred_train = model.predict(X_train, num_iteration=model.best_iteration)
#     y_pred_valid = model.predict(X_valid, num_iteration=model.best_iteration)
#     y_pred_oot = model.predict(X_oot, num_iteration=model.best_iteration)

#     # Calcular métricas para treino
#     ks_train, gini_train, auc_train, bad_decil10_train, bad_decil30_train, bad_decil40_train = calculate_metrics(y_train.values, y_pred_train)

#     # Calcular métricas para validação/teste
#     ks_valid, gini_valid, auc_valid, bad_decil10_valid, bad_decil30_valid, bad_decil40_valid = calculate_metrics(y_valid.values, y_pred_valid)

#     # Calcular métricas para OOT
#     ks_oot, gini_oot, auc_oot, bad_decil10_oot, bad_decil30_oot, bad_decil40_oot = calculate_metrics(y_oot.values, y_pred_oot)


#     shift_oot  = ((gini_oot-gini_train)/gini_train)*100

#     # Armazenar os resultados
#     results.append({
#         "iteration": iteration,
#         "params": params,
#         "features_relevance": qtd_feautures,
#         "ks2_train": ks_train,
#         "gini_train": gini_train,
#         "auc_train": auc_train,
#         "bad_decil10_train": bad_decil10_train,
#         "bad_decil30_train": bad_decil30_train,
#         "bad_decil40_train": bad_decil40_train,        
#         "ks2_valid": ks_valid,
#         "gini_valid": gini_valid,
#         "auc_valid": auc_valid,
#         "bad_decil10_valid": bad_decil10_valid,
#         "bad_decil30_valid": bad_decil30_valid,
#         "bad_decil40_valid": bad_decil40_valid,
#         "ks2_oot": ks_oot,
#         "gini_oot": gini_oot,
#         "auc_oot": auc_oot,
#         "bad_decil10_oot": bad_decil10_oot,
#         "bad_decil30_oot": bad_decil30_oot,
#         "bad_decil40_oot": bad_decil40_oot,
#         'shift_oot': shift_oot,
#     })

#     # return auc_valid  # Maximizar o AUC na validação/teste
#     return np.abs(shift_oot)

## objective_express original com lgbm

In [0]:

# Função para realizar a busca de hiperparâmetros via Optuna - Antigo V3
def objective_express_lgbm(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration, results):

    """
    Define o objetivo para a otimização do Optuna usando LightGBM.

    Args:
        trial (optuna.trial.Trial): Objeto que controla as iterações do Optuna.
        X_train (pandas.DataFrame): Dados de treinamento.
        X_valid (pandas.DataFrame): Dados de validação.
        y_train (pandas.Series): Target para os dados de treinamento.
        y_valid (pandas.Series): Target para os dados de validação.
        iteration (int): Número atual da iteração de treinamento.
        results (dict): Dicionário para armazenar os resultados das métricas.

    Returns:
        float: Valor da métrica de avaliação escolhida (ex.: KS ou AUC).

    Raises:
        ValueError: Se `X_train` e `X_valid` tiverem colunas inconsistentes.
        ValueError: Se o tamanho de `X_train` não coincidir com `y_train`.

    Example:
        >>> study.optimize(lambda trial: objective_express(trial, X_train, X_valid, y_train, y_valid, iteration=1, results={}), n_trials=10)
    """

    is_unbalance_list = trial.suggest_categorical('is_unbalance', [True, False])

    
    # Lista para armazenar os resultados
    params = {
        "objective": "binary",
        "metric": "auc",
        "boosting_type": trial.suggest_categorical('boosting_type', ['gbdt', 'dart']),
        'is_unbalance': is_unbalance_list,
        "learning_rate": trial.suggest_float("learning_rate", 0.0001, 0.3, log = True),
        "num_leaves": trial.suggest_int("num_leaves", 2, 20),
        "max_depth": trial.suggest_int("max_depth", 2, 10),
        "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
        # "min_child_weight": trial.suggest_float("min_child_weight", 1e-3, 1e-1, log = True),
        "subsample": trial.suggest_float("subsample", 0.3, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.3, 1.0),
        "reg_alpha": trial.suggest_float("reg_alpha", 1e-3, 10.0, log = True),
        "reg_lambda": trial.suggest_float("reg_lambda", 1e-3, 10.0, log = True),
        "verbose": -1,
        "seed": 42,
    }


    # Criação dos datasets de treino e validação
    train_data = lgb.Dataset(X_train, label=y_train)
    valid_data = lgb.Dataset(X_valid, label=y_valid, reference=train_data)
    oot_data = lgb.Dataset(X_oot, label=y_oot, reference=train_data)

    # Treinamento com early stopping
    model = lgb.train(
        params,
        train_data,
        num_boost_round=1000,
        valid_sets=[valid_data],
        callbacks=[lgb.early_stopping(stopping_rounds=20)],
    )

    #Avaliando quantidade de features dentro de um range de acm pelo menos de 98%
    # Calcular a importância das features
    feature_importance = model.feature_importance(importance_type='gain')
    features = model.feature_name()
        
    importance_df = pd.DataFrame({
        'feature': features,
        'importance': feature_importance
    })
    
    importance_df = importance_df.sort_values(by='importance', ascending=False)
    
    total_gain = importance_df['importance'].sum()
    importance_df['cumulative_gain'] = importance_df['importance'].cumsum() / total_gain

    # Selecionar as features com ganho acumulado até 95%
    selected_features_max = importance_df[importance_df['cumulative_gain'] <= 0.98]['feature'].tolist()
    qtd_feautures = len(selected_features_max)

    # Previsões para treino e validação/teste
    y_pred_train = model.predict(X_train, num_iteration=model.best_iteration)
    y_pred_valid = model.predict(X_valid, num_iteration=model.best_iteration)
    y_pred_oot = model.predict(X_oot, num_iteration=model.best_iteration)

    # Calcular métricas para treino
    ks_train, gini_train, auc_train, bad_decil10_train, bad_decil30_train, bad_decil40_train = calculate_metrics(y_train.values, y_pred_train)

    # Calcular métricas para validação/teste
    ks_valid, gini_valid, auc_valid, bad_decil10_valid, bad_decil30_valid, bad_decil40_valid = calculate_metrics(y_valid.values, y_pred_valid)

    # Calcular métricas para OOT
    ks_oot, gini_oot, auc_oot, bad_decil10_oot, bad_decil30_oot, bad_decil40_oot = calculate_metrics(y_oot.values, y_pred_oot)


    shift_oot  = ((gini_oot-gini_train)/gini_train)*100

    # Armazenar os resultados
    results.append({
        "iteration": iteration,
        "params": params,
        'best_iteration': model.best_iteration,
        "features_relevance": qtd_feautures,
        "ks2_train": ks_train,
        "gini_train": gini_train,
        "auc_train": auc_train,
        "bad_decil10_train": bad_decil10_train,
        "bad_decil30_train": bad_decil30_train,
        "bad_decil40_train": bad_decil40_train,        
        "ks2_valid": ks_valid,
        "gini_valid": gini_valid,
        "auc_valid": auc_valid,
        "bad_decil10_valid": bad_decil10_valid,
        "bad_decil30_valid": bad_decil30_valid,
        "bad_decil40_valid": bad_decil40_valid,
        "ks2_oot": ks_oot,
        "gini_oot": gini_oot,
        "auc_oot": auc_oot,
        "bad_decil10_oot": bad_decil10_oot,
        "bad_decil30_oot": bad_decil30_oot,
        "bad_decil40_oot": bad_decil40_oot,
        'shift_oot': shift_oot,
    })

    return auc_valid  # Maximizar o AUC na validação/teste
    # return np.abs(shift_oot)

## objective_express com xgBoost

In [0]:
!pip install xgboost

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Looking in indexes: https://svc_nexus_data_pipeline%40picpay.com:****@nexus-prod.limbo.work/repository/picpay-pypi-hosted/simple, https://pypi.org/simple/
Collecting xgboost
  Downloading xgboost-3.1.1-py3-none-manylinux_2_28_x86_64.whl (115.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 115.9/115.9 MB 20.0 MB/s eta 0:00:00
Collecting nvidia-nccl-cu12
  Downloading nvidia_nccl_cu12-2.28.7-py3-none-manylinux_2_18_x86_64.whl (296.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 296.8/296.8 MB 5.0 MB/s eta 0:00:00
Installing collected packages: nvidia-nccl-cu12, xgboost
Successfully installed nvidia-nccl-cu12-2.28.7 xgboost-3.1.1
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import optuna
import xgboost as xgb
import pandas as pd

def objective_express_xgb(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration, results):
    # Sugestão de hiperparâmetros pelo Optuna
    params = {
        "objective": "binary:logistic",
        "eval_metric": "auc",
        "booster": trial.suggest_categorical('booster', ['gbtree', 'dart']),
        "eta": trial.suggest_float("learning_rate", 0.0001, 0.3, log=True),
        "max_depth": trial.suggest_int("max_depth", 2, 10),
        "min_child_weight": trial.suggest_float("min_child_weight", 0.1, 10.0, log=True),
        "subsample": trial.suggest_float("subsample", 0.3, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.3, 1.0),
        "lambda": trial.suggest_float("reg_lambda", 1e-3, 10.0, log=True),
        "alpha": trial.suggest_float("reg_alpha", 1e-3, 10.0, log=True),
        "scale_pos_weight": trial.suggest_categorical('is_unbalance', [1.0, (len(y_train)-sum(y_train))/sum(y_train)]),
        "verbosity": 0,
        "seed": 42
    }

    # Criação dos DMatrix
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dvalid = xgb.DMatrix(X_valid, label=y_valid)
    doot = xgb.DMatrix(X_oot, label=y_oot)

    # Treinamento com early stopping
    evals = [(dtrain, 'train'), (dvalid, 'valid')]
    model = xgb.train(
        params,
        dtrain,
        num_boost_round=1000,
        evals=evals,
        early_stopping_rounds=20
    )

    # Importância das features (gain)
    importance_dict = model.get_score(importance_type='gain')
    importance_df = pd.DataFrame({
        'feature': list(importance_dict.keys()),
        'importance': list(importance_dict.values())
    }).sort_values(by='importance', ascending=False)

    total_gain = importance_df['importance'].sum()
    importance_df['cumulative_gain'] = importance_df['importance'].cumsum() / total_gain
    selected_features_max = importance_df[importance_df['cumulative_gain'] <= 0.98]['feature'].tolist()
    qtd_features = len(selected_features_max)

    # Previsões
    y_pred_train = model.predict(dtrain)
    y_pred_valid = model.predict(dvalid)
    y_pred_oot = model.predict(doot)

    # Função já existente
    ks_train, gini_train, auc_train, bad_decil10_train, bad_decil30_train, bad_decil40_train = calculate_metrics(y_train.values, y_pred_train)
    ks_valid, gini_valid, auc_valid, bad_decil10_valid, bad_decil30_valid, bad_decil40_valid = calculate_metrics(y_valid.values, y_pred_valid)
    ks_oot, gini_oot, auc_oot, bad_decil10_oot, bad_decil30_oot, bad_decil40_oot = calculate_metrics(y_oot.values, y_pred_oot)

    shift_oot = ((gini_oot - gini_train) / gini_train) * 100

    # Registro dos resultados
    results.append({
        "iteration": iteration,
        "params": params,
        'best_iteration': model.num_boosted_rounds(),
        "features_relevance": qtd_features,
        "ks2_train": ks_train,
        "gini_train": gini_train,
        "auc_train": auc_train,
        "bad_decil10_train": bad_decil10_train,
        "bad_decil30_train": bad_decil30_train,
        "bad_decil40_train": bad_decil40_train,
        "ks2_valid": ks_valid,
        "gini_valid": gini_valid,
        "auc_valid": auc_valid,
        "bad_decil10_valid": bad_decil10_valid,
        "bad_decil30_valid": bad_decil30_valid,
        "bad_decil40_valid": bad_decil40_valid,
        "ks2_oot": ks_oot,
        "gini_oot": gini_oot,
        "auc_oot": auc_oot,
        "bad_decil10_oot": bad_decil10_oot,
        "bad_decil30_oot": bad_decil30_oot,
        "bad_decil40_oot": bad_decil40_oot,
        'shift_oot': shift_oot,
    })

    return auc_valid


In [0]:
import plotly.express as px
import plotly.graph_objects as go

plot_best_model_express(results_df, metric="ks2",  label_x = 'train', label_y = 'valid')

In [0]:
display(results_df)

iteration,params,best_iteration,features_relevance,ks2_train,gini_train,auc_train,bad_decil10_train,bad_decil30_train,bad_decil40_train,ks2_valid,gini_valid,auc_valid,bad_decil10_valid,bad_decil30_valid,bad_decil40_valid,ks2_oot,gini_oot,auc_oot,bad_decil10_oot,bad_decil30_oot,bad_decil40_oot,shift_oot
1,"List(0.0013476948052914012, gbtree, 0.38218877775917537, 0.007593736435184247, auc, 0.018567737872063735, 5, 0.8212589495379132, binary:logistic, 1.0, 42, 0.5400014404693172, 0)",41,29,0.4447568478509199,0.5941388557884704,0.7970694278942352,0.0065,0.0154,0.0208,0.4477132460238384,0.5826442616152763,0.7913221308076381,0.0105,0.0179,0.0237,0.3078080772475922,0.4049616034595342,0.7024808017297671,0.0301,0.0527,0.0656,-31.84057909794211
2,"List(1.5001007867493956, gbtree, 0.5059112445014149, 0.0026522874535484763, auc, 0.006021746428497064, 6, 0.20898291606169625, binary:logistic, 1.0, 42, 0.8591584099700258, 0)",42,27,0.4776810025262223,0.6229381109865786,0.8114690554932893,0.0056,0.0131,0.0173,0.4547920213897875,0.5900594163724346,0.7950297081862173,0.0105,0.0165,0.0227,0.3196672422675689,0.4117861795769915,0.7058930897884957,0.0304,0.0515,0.0635,-33.89613312872689
3,"List(6.768905405420063, gbtree, 0.6419377344638948, 0.0026967329900819143, auc, 0.012529662792892828, 8, 4.253273953564923, binary:logistic, 8.596018735362998, 42, 0.646970876415464, 0)",42,25,0.5490174457504333,0.6967124017232691,0.8483562008616345,0.0012,0.0051,0.0088,0.4689978317778117,0.5968305231138982,0.7984152615569491,0.0084,0.0177,0.0234,0.3096959909288256,0.3981569393989885,0.6990784696994943,0.03,0.0561,0.0686,-42.85203788332526
4,"List(0.009546060351193025, gbtree, 0.8049240727416493, 0.09612787425724241, auc, 1.6603386643883074, 4, 0.3992206882176105, binary:logistic, 1.0, 42, 0.38962724411311905, 0)",129,32,0.5213837925182601,0.6760451166931105,0.8380225583465553,0.0014,0.0083,0.0135,0.474054921725874,0.6149674713405024,0.8074837356702512,0.0035,0.016,0.0197,0.3188218999646076,0.4187640624527522,0.7093820312263761,0.0261,0.0483,0.0605,-38.05678761483468
5,"List(0.0021832066857251503, dart, 0.4119352915552552, 0.0017030826344147285, auc, 2.1087911460130244, 6, 5.75513538466941, binary:logistic, 1.0, 42, 0.5926758196567055, 0)",51,27,0.4567714777095092,0.6080128047559235,0.8040064023779617,0.0061,0.0135,0.0188,0.4469661801550008,0.5853385460153959,0.792669273007698,0.0119,0.0179,0.0221,0.302257093448978,0.4024586718771685,0.7012293359385843,0.0292,0.0521,0.065,-33.80753353727002
6,"List(0.003506698758684396, gbtree, 0.8852581011896616, 0.028509541065392375, auc, 0.01607905471471701, 7, 7.511817234759516, binary:logistic, 8.596018735362998, 42, 0.5441035289236018, 0)",217,32,0.6453274684832899,0.7964881455080433,0.8982440727540216,0.0,0.0005,0.0013,0.4691877276653272,0.6110152398773905,0.8055076199386952,0.0063,0.016,0.0213,0.2778532267777632,0.3679100449380339,0.683955022469017,0.0331,0.06,0.0714,-53.80847197627016
7,"List(0.026947584032355597, dart, 0.9115062455857392, 0.0898748057915318, auc, 1.6604910110561295, 6, 3.565918035639329, binary:logistic, 1.0, 42, 0.710065145363248, 0)",130,32,0.620284707369787,0.782616276676366,0.891308138338183,0.0005,0.0026,0.0051,0.4765929215442344,0.6155354012010048,0.8077677006005024,0.0063,0.0151,0.0194,0.2366896605397944,0.3519161468559271,0.6759580734279635,0.031,0.066,0.0817,-55.0333723762489
8,"List(0.005594401253968181, dart, 0.671885575120243, 0.005686216232768655, auc, 0.013130650963318577, 7, 1.2702271596853445, binary:logistic, 1.0, 42, 0.7879471543117682, 0)",947,32,0.650443329669625,0.8112578028936648,0.9056289014468324,0.0002,0.0012,0.0032,0.4734260547278349,0.6224046785233075,0.8112023392616537,0.0028,0.0144,0.0194,0.325940715848915,0.4179405711369259,0.708970285568463,0.0275,0.0495,0.0598,-48.48239737772885
9,"List(0.03399333875367799, dart, 0.914038053723657, 0.003285994235295857, auc, 0.0057183820497200884, 3, 0.3357304162168704, binary:logistic, 8.596018735362998, 42, 0.44613525035603385, 0)",35,18,0.4018829154659579,0.5340954338819142,0.7670477169409571,0.008,0.0212,0.0273,0.4101095989574635,0.5445902976251829,0.7722951488125914,0.0091,0.0209,0.0256,0.2955750896919423,0.3554641604739386,0.6777320802369693,0.047,0.0657,0.079,-33.445572097414725
10,"List(0.02368657123432723, gbtree, 0.6033940265246992, 0.0035349570265780297, auc, 0.002741012209924299, 5, 0.3614749296078761, binary:logistic, 1.0, 42, 0.4509271364577573, 0)",122,30,0.4659617389265795,0.6104823067915364,0.8052411533957682,0.0056,0.0134,0.0187,0.457136290116172,0.591630997933517,0.7958154989667585,0.0091,0.0156,0.0218,0.3299307253872672,0.4150110392517667,0.7075055196258834,0.0317,0.0512,0.0632,-32.01915360448241


## objective_express com RandomForest

In [0]:
import optuna
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

def objective_express_RdF(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration, results):
    """
    Função objetivo do Optuna para otimização de hiperparâmetros usando RandomForestClassifier (scikit-learn).
    Mantém regras de negócio originais.
    """

    # Sugestão de hiperparâmetros via Optuna
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 100, 1000),
        "max_depth": trial.suggest_int("max_depth", 2, 20),
        "min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
        "min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 20),
        "max_features": trial.suggest_categorical("max_features", ["sqrt", "log2", None]),
        "bootstrap": trial.suggest_categorical("bootstrap", [True, False]),
        "class_weight": trial.suggest_categorical("is_unbalance", [None, "balanced"]),
        "random_state": 42,
        "n_jobs": -1
    }

    # Treinamento
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Importância das features
    importance_df = pd.DataFrame({
        "feature": X_train.columns,
        "importance": model.feature_importances_
    }).sort_values(by="importance", ascending=False)

    total_gain = importance_df['importance'].sum()
    importance_df['cumulative_gain'] = importance_df['importance'].cumsum() / total_gain
    selected_features_max = importance_df[importance_df['cumulative_gain'] <= 0.98]['feature'].tolist()
    qtd_features = len(selected_features_max)

    # Previsões (probabilidade de classe positiva)
    y_pred_train = model.predict_proba(X_train)[:, 1]
    y_pred_valid = model.predict_proba(X_valid)[:, 1]
    y_pred_oot = model.predict_proba(X_oot)[:, 1]

    # Métricas de negócio
    ks_train, gini_train, auc_train, bad_decil10_train, bad_decil30_train, bad_decil40_train = calculate_metrics(y_train.values, y_pred_train)
    ks_valid, gini_valid, auc_valid, bad_decil10_valid, bad_decil30_valid, bad_decil40_valid = calculate_metrics(y_valid.values, y_pred_valid)
    ks_oot, gini_oot, auc_oot, bad_decil10_oot, bad_decil30_oot, bad_decil40_oot = calculate_metrics(y_oot.values, y_pred_oot)

    shift_oot = ((gini_oot - gini_train) / gini_train) * 100

    # Registro dos resultados
    results.append({
        "iteration": iteration,
        "params": params,
        "features_relevance": qtd_features,
        "ks2_train": ks_train,
        "gini_train": gini_train,
        "auc_train": auc_train,
        "bad_decil10_train": bad_decil10_train,
        "bad_decil30_train": bad_decil30_train,
        "bad_decil40_train": bad_decil40_train,
        "ks2_valid": ks_valid,
        "gini_valid": gini_valid,
        "auc_valid": auc_valid,
        "bad_decil10_valid": bad_decil10_valid,
        "bad_decil30_valid": bad_decil30_valid,
        "bad_decil40_valid": bad_decil40_valid,
        "ks2_oot": ks_oot,
        "gini_oot": gini_oot,
        "auc_oot": auc_oot,
        "bad_decil10_oot": bad_decil10_oot,
        "bad_decil30_oot": bad_decil30_oot,
        "bad_decil40_oot": bad_decil40_oot,
        'shift_oot': shift_oot,
    })

    return auc_valid


In [0]:
import plotly.express as px
import plotly.graph_objects as go

plot_best_model_express(results_df, metric="ks2",  label_x = 'train', label_y = 'valid')

In [0]:
display(results_df)

iteration,params,features_relevance,ks2_train,gini_train,auc_train,bad_decil10_train,bad_decil30_train,bad_decil40_train,ks2_valid,gini_valid,auc_valid,bad_decil10_valid,bad_decil30_valid,bad_decil40_valid,ks2_oot,gini_oot,auc_oot,bad_decil10_oot,bad_decil30_oot,bad_decil40_oot,shift_oot
1,"List(true, null, 20, sqrt, 16, 2, 352, -1, 42)",30,0.7319434076168678,0.8695055253896196,0.9347527626948098,0.0,0.0,0.0,0.4601011689333735,0.5980653639989708,0.7990326819994854,0.0056,0.0156,0.022,0.320733703960109,0.409768598502382,0.704884299251191,0.0285,0.0557,0.0658,-52.87337612733772
2,"List(false, balanced, 15, log2, 12, 10, 698, -1, 42)",30,0.8825410015738354,0.954369646262522,0.977184823131261,0.0,0.0,0.0,0.466879317532197,0.6003015499158992,0.8001507749579496,0.007,0.0139,0.0214,0.2927772127975534,0.3751088308772754,0.6875544154386377,0.0314,0.062,0.0712,-60.695645304074056
3,"List(false, balanced, 7, sqrt, 6, 15, 103, -1, 42)",27,0.4922705850073059,0.6433387121930916,0.8216693560965458,0.0012,0.0088,0.0128,0.4570304662477349,0.5792318390171334,0.7896159195085667,0.007,0.0165,0.0225,0.3118420739797918,0.3964873843211205,0.6982436921605603,0.033,0.0572,0.0683,-38.37035191469733
4,"List(true, balanced, 18, null, 7, 13, 918, -1, 42)",30,0.9303272290340896,0.969513188155502,0.984756594077751,0.0,0.0,0.0,0.4700419739676015,0.5931679406483374,0.7965839703241687,0.0063,0.0174,0.0209,0.1619100385707533,0.2470534717042056,0.6235267358521028,0.0338,0.0992,0.1162,-74.5177812202612
5,"List(true, null, 14, log2, 20, 7, 302, -1, 42)",30,0.6666706659405661,0.818663469761002,0.909331734880501,0.0,0.0001,0.0003,0.462952777735641,0.5984748813788809,0.7992374406894405,0.0077,0.0151,0.0211,0.3239773495634272,0.4170589298604756,0.7085294649302378,0.0289,0.0535,0.0634,-49.05612070583405
6,"List(false, null, 10, null, 11, 9, 901, -1, 42)",28,0.5315774976187139,0.697369485119929,0.8486847425599645,0.0007,0.008,0.0116,0.3923207255046601,0.495803776119309,0.7479018880596545,0.0349,0.0279,0.0316,0.2620032292471809,0.3145825334971071,0.6572912667485535,0.0655,0.0818,0.0895,-54.89012063052813
7,"List(true, null, 9, log2, 9, 20, 536, -1, 42)",30,0.569035305688166,0.72676829810009,0.863384149050045,0.0005,0.0033,0.0068,0.4615892141379203,0.593477268058265,0.7967386340291325,0.0077,0.0158,0.0207,0.3319921511747352,0.4234092132600238,0.7117046066300119,0.0291,0.0497,0.0616,-41.74082518914272
8,"List(true, balanced, 11, sqrt, 12, 6, 143, -1, 42)",30,0.6453761196993282,0.7885173489254484,0.8942586744627242,0.0,0.0002,0.0005,0.4613088177547052,0.58966191723464,0.79483095861732,0.007,0.0163,0.0223,0.2914727365174392,0.3825069158732679,0.691253457936634,0.0331,0.0626,0.0721,-51.49036144930369
9,"List(false, balanced, 18, null, 20, 15, 367, -1, 42)",29,0.6890313361306388,0.8358180147888437,0.9179090073944218,0.0,0.0,0.0,0.3062767765123017,0.3770271592489338,0.6885135796244669,0.0474,0.0493,0.05,0.0887777534799914,0.0846135193159709,0.5423067596579855,0.1527,0.1527,0.1522,-89.87656190476497
10,"List(false, balanced, 5, sqrt, 2, 7, 680, -1, 42)",24,0.4379523945416836,0.5843404287194129,0.7921702143597065,0.0054,0.0159,0.0208,0.4405404586855295,0.5613713348363913,0.7806856674181957,0.0084,0.02,0.0262,0.3105140035768053,0.3942772102701712,0.6971386051350856,0.0349,0.0562,0.0705,-32.5261113398857


## objective_express com CatBoost


In [0]:
!pip install catboost

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Looking in indexes: https://svc_nexus_data_pipeline%40picpay.com:****@nexus-prod.limbo.work/repository/picpay-pypi-hosted/simple, https://pypi.org/simple/
Collecting catboost
  Downloading catboost-1.2.8-cp310-cp310-manylinux2014_x86_64.whl (99.2 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 99.2/99.2 MB 21.3 MB/s eta 0:00:00
Collecting graphviz
  Downloading graphviz-0.21-py3-none-any.whl (47 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 47.3/47.3 kB 10.5 MB/s eta 0:00:00
Installing collected packages: graphviz, catboost
Successfully installed catboost-1.2.8 graphviz-0.21
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import optuna
import pandas as pd
from catboost import CatBoostClassifier

def objective_express_catb(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration, results):
    """
    Função objetivo do Optuna para otimização de hiperparâmetros usando CatBoostClassifier.
    Mantém regras de negócio originais.
    """

    # Sugestão de hiperparâmetros pelo Optuna
    params = {
        # "iterations": trial.suggest_int("iterations", 200, 2000),
        "learning_rate": trial.suggest_float("learning_rate", 0.0001, 0.3, log=True),
        "depth": trial.suggest_int("depth", 2, 10),
        "l2_leaf_reg": trial.suggest_float("l2_leaf_reg", 1e-3, 10.0, log=True),
        "bagging_temperature": trial.suggest_float("bagging_temperature", 0.0, 10.0),
        "border_count": trial.suggest_int("border_count", 32, 255),
        "scale_pos_weight": trial.suggest_categorical('is_unbalance', [1.0, (len(y_train)-sum(y_train))/sum(y_train)]),
        "eval_metric": "AUC",
        "loss_function": "Logloss",
        "verbose": False,
        "random_seed": 42
    }

    # Criação do modelo
    model = CatBoostClassifier(**params, iterations=1000)


    # Treinamento com conjuntos de validação para early stopping
    model.fit(
        X_train, y_train,
        eval_set=(X_valid, y_valid),
        early_stopping_rounds=50,
        use_best_model=True
    )

    # Importância das features
    importance_df = pd.DataFrame({
        "feature": X_train.columns,
        "importance": model.get_feature_importance(type='PredictionValuesChange')
    }).sort_values(by="importance", ascending=False)

    total_gain = importance_df['importance'].sum()
    importance_df['cumulative_gain'] = importance_df['importance'].cumsum() / total_gain
    selected_features_max = importance_df[importance_df['cumulative_gain'] <= 0.98]['feature'].tolist()
    qtd_features = len(selected_features_max)

    # Previsões (probabilidade classe positiva)
    y_pred_train = model.predict_proba(X_train)[:, 1]
    y_pred_valid = model.predict_proba(X_valid)[:, 1]
    y_pred_oot = model.predict_proba(X_oot)[:, 1]
    # [:, 1]

    # Métricas de negócio (mantém função existente)
    ks_train, gini_train, auc_train, bad_decil10_train, bad_decil30_train, bad_decil40_train = calculate_metrics(y_train.values, y_pred_train)
    ks_valid, gini_valid, auc_valid, bad_decil10_valid, bad_decil30_valid, bad_decil40_valid = calculate_metrics(y_valid.values, y_pred_valid)
    ks_oot, gini_oot, auc_oot, bad_decil10_oot, bad_decil30_oot, bad_decil40_oot = calculate_metrics(y_oot.values, y_pred_oot)

    shift_oot = ((gini_oot - gini_train) / gini_train) * 100

    # Registro dos resultados
    results.append({
        "iteration": iteration,
        "params": params,
        'best_iteration': model.get_best_iteration(),
        "features_relevance": qtd_features,
        "ks2_train": ks_train,
        "gini_train": gini_train,
        "auc_train": auc_train,
        "bad_decil10_train": bad_decil10_train,
        "bad_decil30_train": bad_decil30_train,
        "bad_decil40_train": bad_decil40_train,
        "ks2_valid": ks_valid,
        "gini_valid": gini_valid,
        "auc_valid": auc_valid,
        "bad_decil10_valid": bad_decil10_valid,
        "bad_decil30_valid": bad_decil30_valid,
        "bad_decil40_valid": bad_decil40_valid,
        "ks2_oot": ks_oot,
        "gini_oot": gini_oot,
        "auc_oot": auc_oot,
        "bad_decil10_oot": bad_decil10_oot,
        "bad_decil30_oot": bad_decil30_oot,
        "bad_decil40_oot": bad_decil40_oot,
        'shift_oot': shift_oot,
    })

    return auc_valid


In [0]:
display(results_df)

iteration,params,best_iteration,features_relevance,ks2_train,gini_train,auc_train,bad_decil10_train,bad_decil30_train,bad_decil40_train,ks2_valid,gini_valid,auc_valid,bad_decil10_valid,bad_decil30_valid,bad_decil40_valid,ks2_oot,gini_oot,auc_oot,bad_decil10_oot,bad_decil30_oot,bad_decil40_oot,shift_oot
1,"List(0.6422841953296543, 35, 5, AUC, 1984, 0.5014425841096116, 0.006157872015824569, Logloss, 42, 1.0, false)",1983,31,0.4921886539524732,0.6441327022976151,0.8220663511488076,0.0054,0.0126,0.0172,0.4755306764421766,0.6204373784353379,0.8102186892176689,0.0049,0.0146,0.019,0.3294388878824641,0.4282941902547541,0.7141470951273771,0.0273,0.0495,0.0599,-33.5083921795877
2,"List(3.8476298650636522, 147, 2, AUC, 1176, 0.0031520147961594806, 0.22290413855785526, Logloss, 42, 8.596018735362998, false)",187,28,0.4851115535148653,0.633835242425957,0.8169176212129785,0.0031,0.0113,0.0163,0.4784034043831593,0.6184186759542536,0.8092093379771268,0.0056,0.0149,0.0192,0.31086638942121,0.3970499910188176,0.6985249955094088,0.0564,0.0607,0.0682,-37.35753955568351
3,"List(1.7140738494440522, 114, 5, AUC, 777, 0.025326547192515963, 0.01749784433159412, Logloss, 42, 1.0, false)",748,31,0.4999212933915752,0.6512261215468713,0.8256130607734357,0.0049,0.0117,0.0167,0.4751426283242101,0.6203570522085851,0.8101785261042925,0.0042,0.0149,0.019,0.3235554999053897,0.4209291390763112,0.7104645695381556,0.0293,0.0514,0.0618,-35.363597197779896
4,"List(1.5273372249339878, 108, 7, AUC, 1801, 0.006873025012483629, 0.0011440231268892722, Logloss, 42, 1.0, false)",1797,31,0.4628821548784721,0.6155881025947445,0.8077940512973723,0.0049,0.0132,0.0187,0.4526316826773136,0.5929178688301482,0.7964589344150741,0.0049,0.0167,0.0216,0.3147575135563303,0.3987468616791174,0.6993734308395587,0.0383,0.0598,0.0673,-35.22505389588053
5,"List(5.972729362051474, 34, 4, AUC, 1999, 0.004610219874245724, 0.024634921646486698, Logloss, 42, 8.596018735362998, false)",801,30,0.502984903689681,0.6570697079801227,0.8285348539900613,0.0026,0.0081,0.0136,0.4803107794014364,0.6198546404242804,0.8099273202121402,0.0049,0.0139,0.0197,0.3172910766050308,0.4132624102589359,0.706631205129468,0.0314,0.0512,0.0626,-37.10524085346547
6,"List(2.1865663965760165, 54, 9, AUC, 399, 0.03784685446329705, 0.007404596547430789, Logloss, 42, 8.596018735362998, false)",375,32,0.5835847319023678,0.7387475836488515,0.8693737918244258,0.0,0.0018,0.0039,0.4568083801348127,0.5957436687832203,0.7978718343916101,0.0077,0.0184,0.022,0.2888951570993948,0.3735821063601108,0.6867910531800554,0.05,0.0674,0.0744,-49.43034473088911
7,"List(1.5697680172641881, 101, 10, AUC, 1650, 0.3139740293193272, 0.011713671124899006, Logloss, 42, 8.596018735362998, false)",349,32,0.6747361676553757,0.8247935947026865,0.9123967973513432,0.0,0.0001,0.0007,0.4616696615884831,0.5964889768500135,0.7982444884250067,0.0091,0.0177,0.0218,0.2805688315198022,0.3628143325021828,0.6814071662510914,0.0499,0.0711,0.078,-56.01149974582834
8,"List(9.724315603790863, 206, 2, AUC, 1084, 3.084164318614105, 1.605137209826623E-4, Logloss, 42, 8.596018735362998, false)",273,2,0.3953454361036556,0.506706211252222,0.753353105626111,0.0134,0.0267,0.0314,0.4089759764390605,0.5250897211460124,0.7625448605730062,0.0146,0.0246,0.0256,0.2768329814091226,0.3280397824537329,0.6640198912268664,0.0573,0.0717,0.0813,-35.26035892809586
9,"List(7.509618329935769, 40, 5, AUC, 561, 0.046238852097586325, 4.2032708784603125E-4, Logloss, 42, 8.596018735362998, false)",559,19,0.409818792959646,0.5456106814106167,0.7728053407053084,0.0084,0.0239,0.027,0.426598687667607,0.5582186434082823,0.7791093217041412,0.007,0.0219,0.0239,0.3040175528861952,0.3785125878844011,0.6892562939422006,0.049,0.0627,0.0761,-30.625883843439755
10,"List(6.012595730533411, 140, 8, AUC, 1901, 0.0018447254338055071, 0.004792584264774349, Logloss, 42, 8.596018735362998, false)",802,32,0.5553056343495144,0.7137368969919955,0.8568684484959977,0.0002,0.0033,0.007,0.4650465591158149,0.5997299376859175,0.7998649688429588,0.0091,0.0172,0.0214,0.2906082076332867,0.375181049647747,0.6875905248238735,0.0522,0.0671,0.0737,-47.434264470713686


In [0]:
import plotly.express as px
import plotly.graph_objects as go

plot_best_model_express(results_df, metric="ks2",  label_x = 'train', label_y = 'valid')

## objective_express com MLPClassifier (rede neural simples)

In [0]:
import optuna
import pandas as pd
from sklearn.neural_network import MLPClassifier

def objective_express_mlp(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration, results):
    """
    Função objetivo do Optuna para otimização de hiperparâmetros usando MLPClassifier (rede neural).
    Mantém métricas e regras originais.
    """

    # Espaço de busca de hiperparâmetros
    params = {
        "hidden_layer_sizes": tuple([
            trial.suggest_int(f"layer_{i}_neurons", 10, 200) 
            for i in range(trial.suggest_int("n_layers", 1, 3))
        ]),
        "activation": trial.suggest_categorical("activation", ["relu", "tanh", "logistic"]),
        "solver": trial.suggest_categorical("solver", ["adam", "sgd"]),
        "alpha": trial.suggest_float("alpha", 1e-5, 1e-1, log=True),
        "learning_rate_init": trial.suggest_float("learning_rate_init", 1e-4, 1e-1, log=True),
        "max_iter": 500,
        "random_state": 42
    }

    # Treinamento
    model = MLPClassifier(**params)
    model.fit(X_train, y_train)

    # Importância das features (usando coeficientes absolutos da primeira camada como proxy)
    if hasattr(model, "coefs_"):
        importances = pd.Series(abs(model.coefs_[0]).sum(axis=1), index=X_train.columns)
    else:
        importances = pd.Series([0]*X_train.shape[1], index=X_train.columns)
    
    importance_df = pd.DataFrame({
        "feature": importances.index,
        "importance": importances.values
    }).sort_values(by="importance", ascending=False)

    total_gain = importance_df['importance'].sum()
    importance_df['cumulative_gain'] = importance_df['importance'].cumsum() / total_gain
    selected_features_max = importance_df[importance_df['cumulative_gain'] <= 0.98]['feature'].tolist()
    qtd_features = len(selected_features_max)

    # Predições
    y_pred_train = model.predict_proba(X_train)[:, 1]
    y_pred_valid = model.predict_proba(X_valid)[:, 1]
    y_pred_oot = model.predict_proba(X_oot)[:, 1]

    # Métricas
    ks_train, gini_train, auc_train, bad_decil10_train, bad_decil30_train, bad_decil40_train = calculate_metrics(y_train.values, y_pred_train)
    ks_valid, gini_valid, auc_valid, bad_decil10_valid, bad_decil30_valid, bad_decil40_valid = calculate_metrics(y_valid.values, y_pred_valid)
    ks_oot, gini_oot, auc_oot, bad_decil10_oot, bad_decil30_oot, bad_decil40_oot = calculate_metrics(y_oot.values, y_pred_oot)

    shift_oot = ((gini_oot - gini_train) / gini_train) * 100

    # Registro
    results.append({
        "iteration": iteration,
        "params": params,
        "features_relevance": qtd_features,
        "ks2_train": ks_train,
        "gini_train": gini_train,
        "auc_train": auc_train,
        "bad_decil10_train": bad_decil10_train,
        "bad_decil30_train": bad_decil30_train,
        "bad_decil40_train": bad_decil40_train,
        "ks2_valid": ks_valid,
        "gini_valid": gini_valid,
        "auc_valid": auc_valid,
        "bad_decil10_valid": bad_decil10_valid,
        "bad_decil30_valid": bad_decil30_valid,
        "bad_decil40_valid": bad_decil40_valid,
        "ks2_oot": ks_oot,
        "gini_oot": gini_oot,
        "auc_oot": auc_oot,
        "bad_decil10_oot": bad_decil10_oot,
        "bad_decil30_oot": bad_decil30_oot,
        "bad_decil40_oot": bad_decil40_oot,
        'shift_oot': shift_oot,
    })

    return auc_valid


In [0]:
display(results_df)

iteration,params,features_relevance,ks2_train,gini_train,auc_train,bad_decil10_train,bad_decil30_train,bad_decil40_train,ks2_valid,gini_valid,auc_valid,bad_decil10_valid,bad_decil30_valid,bad_decil40_valid,ks2_oot,gini_oot,auc_oot,bad_decil10_oot,bad_decil30_oot,bad_decil40_oot,shift_oot
1,"List(tanh, 1.673469756655084E-5, List(42, 19, 106), 0.0010624238067451503, 500, 42, sgd)",33,0.2791066676893512,0.3799392018306131,0.6899696009153066,0.0239,0.052,0.0543,0.2753730069254737,0.3617567793231342,0.6808783896615671,0.0293,0.0556,0.0582,0.2073266662141997,0.238751447061573,0.6193757235307865,0.0854,0.1085,0.1069,-37.16061782747686
2,"List(tanh, 0.01713839939010223, List(151, 12), 0.01455440405766502, 500, 42, sgd)",33,0.2814375135383797,0.3898532656598432,0.6949266328299216,0.0258,0.051,0.0532,0.2713871108898766,0.3738616966678199,0.6869308483339099,0.0209,0.0502,0.0553,0.1988628448487358,0.223975913962992,0.611987956981496,0.0911,0.116,0.1188,-42.54866287091292
3,"List(logistic, 0.002373466283144853, List(148, 165), 0.005919590596693202, 500, 42, adam)",30,0.3038334402275719,0.4274956728455683,0.7137478364227842,0.0183,0.0378,0.0482,0.3015159444630434,0.4204241682866412,0.7102120841433206,0.0202,0.0388,0.0492,0.2218104053741683,0.2692885468658517,0.6346442734329258,0.0731,0.104,0.1029,-37.00788944286427
4,"List(tanh, 0.043367127858688376, List(141, 25, 163), 0.008807455745044204, 500, 42, adam)",28,0.2707440791506832,0.3495046181065058,0.6747523090532529,0.0305,0.0609,0.059,0.2710416761548027,0.3551993786968772,0.6775996893484386,0.0286,0.0567,0.0572,0.2135975424703464,0.2520492689791849,0.6260246344895924,0.0886,0.1044,0.1032,-27.883851622705308
5,"List(tanh, 2.4000651665609507E-4, List(88, 30), 0.01272144263919466, 500, 42, adam)",31,0.2829061642678468,0.3894459594495401,0.69472297972477,0.0223,0.045,0.053,0.2818504541920782,0.3990686845168798,0.6995343422584399,0.0188,0.0414,0.0486,0.2272692633256509,0.2528982633234868,0.6264491316617434,0.0777,0.1079,0.1059,-35.06203949812593
6,"List(relu, 7.529244104062126E-4, List(107, 108), 2.936111778402719E-4, 500, 42, sgd)",32,0.2545603916900223,0.3020204928466019,0.651010246423301,0.0401,0.0675,0.0707,0.2259312022913215,0.262776544209121,0.6313882721045605,0.0502,0.0637,0.0703,0.0937564089104959,0.0066700797755112,0.5033350398877556,0.2193,0.1978,0.1773,-97.7915141742057
7,"List(relu, 7.800389490208145E-5, List(107), 0.0028065149644923492, 500, 42, adam)",31,0.3762001789333445,0.4857069215778979,0.742853460788949,0.0209,0.0357,0.0436,0.3720832271327931,0.4849739086246558,0.7424869543123279,0.0181,0.0325,0.0396,0.1618270365825779,0.1302801567690099,0.565140078384505,0.209,0.1553,0.1424,-73.17720811023783
8,"List(tanh, 0.06064849343590846, List(139, 144), 0.0133672169906271, 500, 42, adam)",26,0.2421885969847484,0.3172436979584778,0.6586218489792389,0.0452,0.0621,0.0621,0.2392747308765119,0.3184766035216233,0.6592383017608117,0.0418,0.06,0.0593,0.2004327844585692,0.226072824109359,0.6130364120546795,0.0862,0.1063,0.109,-28.73843497469621
9,"List(logistic, 6.203898974110009E-5, List(103, 155), 0.026007708091610578, 500, 42, adam)",31,0.2900646374033946,0.4017611184705814,0.7008805592352907,0.0223,0.0491,0.0515,0.2941201787951013,0.3922156535468051,0.6961078267734025,0.0237,0.049,0.0523,0.2248569294450782,0.2742100983412037,0.6371050491706018,0.0732,0.0996,0.0991,-31.747975168661704
10,"List(logistic, 0.0012887212280691528, List(134, 74, 54), 0.0207572686248065, 500, 42, sgd)",32,0.2857585838170007,0.3398218660265863,0.6699109330132932,0.0314,0.0597,0.0688,0.2782445629984267,0.3371663538578029,0.6685831769289015,0.0244,0.0556,0.0657,0.2107172022277443,0.26527300046766,0.63263650023383,0.0861,0.0951,0.1007,-21.937630568214804


In [0]:
import plotly.express as px
import plotly.graph_objects as go

plot_best_model_express(results_df, metric="ks2",  label_x = 'train', label_y = 'valid')

## Codigo para rodar modelos

In [0]:

import numpy as np
import pandas as pd
import optuna
from pyspark.sql.types import NumericType, BooleanType

def main(data_spark, data_spark_oot, list_features, target, dev, metric, n_trials):

    """
    Orquestra a preparação de dados, a seleção do modelo e a otimização
    de hiperparâmetros usando Optuna para um modelo de classificação.

    Args:
        data_spark (pyspark.sql.DataFrame): Dados de desenvolvimento (Treino/Validação).
        data_spark_oot (pyspark.sql.DataFrame): Dados Out-of-Time (OOT) para avaliação.
        list_features (list): Lista de colunas (features) a serem utilizadas no modelo.
        target (str): Nome da coluna target (binária).
        dev (str, optional): Nome da coluna booleana que indica o conjunto de desenvolvimento.
        metric (str): Métrica de avaliação a ser maximizada ('ks2', 'gini', 'auc').
        n_trials (int): Número de tentativas de otimização a serem executadas pelo Optuna.

    Returns:
        pandas.DataFrame: Um DataFrame contendo os resultados de cada trial de otimização,
                          incluindo os parâmetros testados e as métricas de desempenho.

    Raises:
        ValueError: Em caso de inconsistências nos dados, features, target ou métricas inválidas.

    Example:
        >>> results_df = main(
    #     data_spark=data_spark_cat_dev_blend,
    #     data_spark_oot=data_spark_cat_oot_blend,
    #     list_features=features_model,
    #     target="ever30reneg_mob3",
    #     dev="dev",
    #     metric="ks2",
    #     n_trials=50)
    
    """

    def menu():
        print("Escolha uma opção:")
        print("1. Modelo LightGBM")
        print("2. Modelo XGBoost")
        print("3. Modelo Random Forest")
        print("4. Modelo CatBoost")
        print("5. Modelo MLPclassifier")

    menu()
    modelo = int(input("Escolha uma opção: "))

    # Função para limpar labels e features (Pandas)
    def clean_labels_and_features(X_df, y_series):
        mask_labels = y_series.isin([0, 1]) & ~y_series.isna()
        X_clean = X_df[mask_labels].replace([np.inf, -np.inf], np.nan).dropna()
        y_clean = y_series[mask_labels].loc[X_clean.index]
        return X_clean, y_clean

    # -----------------------------
    # Validações com Spark DataFrame
    # -----------------------------
    list_not_in = [column for column in list_features if column not in data_spark.columns]
    list_not_in_oot = [column for column in list_features if column not in data_spark_oot.columns]
    if list_not_in:
        raise ValueError(f"Features {list_not_in} não estão no data_spark.")
    if list_not_in_oot:
        raise ValueError(f"Features {list_not_in_oot} não estão no data_spark_oot.")
    if target not in data_spark.columns:
        raise ValueError(f"Target {target} não está no data_spark.")
    if target not in data_spark_oot.columns:
        raise ValueError(f"Target {target} não está no data_spark_oot.")
    if dev is not None and dev not in data_spark.columns:
        raise ValueError(f"Dev {dev} não está no data_spark.")

    # target binário — ignorando nulos
    target_value = [row[target] for row in data_spark.select(target).distinct().collect()]
    target_value = [v for v in target_value if v is not None]
    if not all(value in [0, 1] for value in target_value):
        raise ValueError(f"Target {target} possui valores {target_value} — esperado só 0 e 1.")

    target_value_oot = [row[target] for row in data_spark_oot.select(target).distinct().collect()]
    target_value_oot = [v for v in target_value_oot if v is not None]
    if not all(value in [0, 1] for value in target_value_oot):
        raise ValueError(f"Target {target} possui valores {target_value_oot} — esperado só 0 e 1.")

    # validar tipo numérico das features
    type_num_columns = [f.name for f in data_spark.selectExpr(*list_features).schema.fields if isinstance(f.dataType, NumericType)]
    type_num_columns_oot = [f.name for f in data_spark_oot.selectExpr(*list_features).schema.fields if isinstance(f.dataType, NumericType)]
    type_num_list_not_in = [column for column in list_features if column not in type_num_columns]
    type_num_list_not_in_oot = [column for column in list_features if column not in type_num_columns_oot]
    if type_num_list_not_in:
        raise ValueError(f"Features {type_num_list_not_in} não são NumericType no data_spark.")
    if type_num_list_not_in_oot:
        raise ValueError(f"Features {type_num_list_not_in_oot} não são NumericType no data_spark_oot.")

    # métricas válidas
    list_possible_metrics = ['ks2', 'gini', 'auc']
    if metric.lower() not in list_possible_metrics:
        raise ValueError(f"Métrica {metric} inválida. Escolha {list_possible_metrics}.")
    metric = metric.lower()

    if n_trials <= 0:
        raise ValueError("n_trials deve ser > 0.")

    if dev is not None:
        type_bool_columns = [f.name for f in data_spark.selectExpr(dev).schema.fields if isinstance(f.dataType, BooleanType)]
        if type_bool_columns != [dev]:
            raise ValueError(f"A coluna {dev} precisa ser booleana.")

    # -----------------------------
    # Conversão para Pandas — removendo linhas com target nulo
    # -----------------------------
    data_pandas = data_spark.filter(f"{target} IS NOT NULL").toPandas()
    data_pandas_oot = data_spark_oot.filter(f"{target} IS NOT NULL").toPandas()

    X = data_pandas[list_features]
    y = data_pandas[target]
    X_oot = data_pandas_oot[list_features]
    y_oot = data_pandas_oot[target]

    from sklearn.model_selection import train_test_split
    X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, random_state=42)

    X_train, y_train = clean_labels_and_features(X_train, y_train)
    X_valid, y_valid = clean_labels_and_features(X_valid, y_valid)
    X_oot, y_oot = clean_labels_and_features(X_oot, y_oot)

    # -----------------------------
    # Otimização com Optuna
    # -----------------------------
    results = []
    study = optuna.create_study(direction="maximize")

    if modelo == 1:
        nome_modelo = 'LightGBMClassifier'
        for i in range(1, n_trials + 1):
            study.optimize(
                lambda trial: objective_express(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration=i, results=results),
                n_trials=1,
                show_progress_bar=False
            )
    elif modelo == 2:
        nome_modelo = 'XGBoostClassifier'
        for i in range(1, n_trials + 1):
            study.optimize(
                lambda trial: objective_express_xgb(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration=i, results=results),
                n_trials=1,
                show_progress_bar=False
            )
    elif modelo == 3:
        nome_modelo = 'RandomForestClassifier'
        for i in range(1, n_trials + 1):
            study.optimize(
                lambda trial: objective_express_RdF(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration=i, results=results),
                n_trials=1,
                show_progress_bar=False
            )
    elif modelo == 4:
        nome_modelo = 'CatBoostClassifier'
        for i in range(1, n_trials + 1):
            study.optimize(
                lambda trial: objective_express_catb(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration=i, results=results),
                n_trials=1,
                show_progress_bar=False
            )
    elif modelo == 5:
        nome_modelo = 'MLPclassifier'
        for i in range(1, n_trials + 1):
            study.optimize(
                lambda trial: objective_express_mlp(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration=i, results=results),
                n_trials=1,
                show_progress_bar=False
            )

    best_params = study.best_params
    print("Melhores parâmetros:", best_params)

    results_df = pd.DataFrame(results)
    results_df["model_champion"] = nome_modelo
    return results_df




In [0]:
features_model = ['flag_low_risk', 'pc_rto_daybalgt50_12m_mov', 'income_range', 'risco_final_flag_12m', 
    'pc_rt_amtotindebtprodcsjamtotindebtprodcc_r00_scr', 'am_min_bal_01m_mov', 'risco_final_flag_3m', 'ct_tot_regdev_r00_reg', 
    'pc_rt_amtotindebtamtottotcrdlim_r00_scr', 'ct_tot_payp2binsur_06m_trx', 'ct_max_pay00to03hr_12m_trx', 'lb_schmcarddflt_r00_reg', 
    'ct_tot_fi_r00_scr', 'am_min_bal_12m_mov', 'pc_rt_amtotcrdlimnetinc_r00_scr', 'am_tot_veryrecdueprodcc_r00_scr', 
    'pc_rt_amtotindebtprodcsjamtotindebt_r00_scr', 'am_min_payp2b_12m_trx', 'am_min_bal_06m_mov', 
    'pc_rt_amtotindebtprodccramtotindebtprodcc_r00_scr', 'months_since_last_restriction', 'lb_zipcode_r00_geo', 
    'pc_rto_amtotactcsinflsumactcsinoutfl_03m_mov', 'months_with_restriction_last_24m', 'vl_agecon_r00_reg', 
    'am_avg_payp2ppixext_12m_trx', 'ct_avg2_hrbwfstlstacsappwkend_06m_dla', 'ct_tot_pay00to03hr_03m_trx', 
    'months_since_first_restriction', 'pc_rto_amtotactcsinflsumactcsinoutfl_12m_mov', 'am_avg_payp2bpixintextbal_12m_trx', 
    'lb_max_lbmaxdaydebt_03m_scr', 'am_tot_payp2bbillbal_12m_trx', 'am_avg_cdbinvestwthdrwl_12m_trx']

path_root = "s3://picpay-credit-laboratory/modeling/modeling_ds_credit/third_generation_models/05.hard_transaction/02.Model_Transactions_Hard/"

join_blend = spark.read.parquet(path_root + 'tb_publicos_modelagem_full_fe_vn_final_v4_premodel_blend_atualizado')

data_spark_cat_dev_blend = join_blend.filter(f.col("fl_sample") == 1).filter(f.col("dev") == True)
data_spark_cat_oot_blend = join_blend.filter(f.col("ref_concessao") <= "2024-08").filter(f.col("ref_concessao") >= "2024-07").filter(f.col("dev") == False)

data_pandas = data_spark_cat_dev_blend.toPandas()
data_pandas_oot = data_spark_cat_oot_blend.toPandas()

results_df = main(
    data_spark=data_spark_cat_dev_blend,
    data_spark_oot=data_spark_cat_oot_blend,
    list_features=features_model,
    target="ever30reneg_mob3",
    dev="dev",
    metric="ks2",
    n_trials=50
)


Escolha uma opção:
1. Modelo LightGBM
2. Modelo XGBoost
3. Modelo Random Forest
4. Modelo CatBoost
5. Modelo MLPclassifier


Escolha uma opção:  4

[I 2025-11-10 18:54:02,390] A new study created in memory with name: no-name-c3dead35-9bc8-405e-b9f3-ffd150b984d8
[I 2025-11-10 18:54:11,492] Trial 0 finished with value: 0.7892448397955877 and parameters: {'learning_rate': 0.013288863982834069, 'depth': 2, 'l2_leaf_reg': 1.307705144308288, 'bagging_temperature': 1.905249276270421, 'border_count': 235, 'is_unbalance': 8.608877721943049}. Best is trial 0 with value: 0.7892448397955877.
[I 2025-11-10 18:54:22,887] Trial 1 finished with value: 0.797371242374358 and parameters: {'learning_rate': 0.009823682148978295, 'depth': 5, 'l2_leaf_reg': 3.418716918455993, 'bagging_temperature': 7.078315749267303, 'border_count': 203, 'is_unbalance': 1.0}. Best is trial 1 with value: 0.797371242374358.
[I 2025-11-10 18:54:33,782] Trial 2 finished with value: 0.7969580920661519 and parameters: {'learning_rate': 0.009318535041222245, 'depth': 5, 'l2_leaf_reg': 0.003355191424017271, 'bagging_temperature': 7.986545619543326, 'border_count': 105, 'is_unba

Melhores parâmetros: {'learning_rate': 0.017528505658689502, 'depth': 7, 'l2_leaf_reg': 0.5567566962570129, 'bagging_temperature': 5.541377978761911, 'border_count': 163, 'is_unbalance': 1.0}


In [0]:
path_root = "s3://picpay-credit-laboratory/modeling/modeling_ds_credit/third_generation_models/05.hard_transaction/02.Model_Transactions_Hard/"
results_df_spark = spark.createDataFrame(results_df)
# results_df_spark.write.mode('overwrite').parquet(path_root + 'projeto_machine_learning/result_df_xgboost')
# results_df_spark.write.mode('overwrite').parquet(path_root + 'projeto_machine_learning/result_df_random_forest')
# results_df_spark.write.mode('overwrite').parquet(path_root + 'projeto_machine_learning/result_df_catboost')
# results_df_spark.write.mode('overwrite').parquet(path_root + 'projeto_machine_learning/result_df_mlp')

In [0]:
import plotly.express as px
import plotly.graph_objects as go
import pandas 

path_root = "s3://picpay-credit-laboratory/modeling/modeling_ds_credit/third_generation_models/05.hard_transaction/02.Model_Transactions_Hard/"
result_df_xgboost = spark.read.parquet(path_root + 'projeto_machine_learning/result_df_xgboost')
result_df_random_forest = spark.read.parquet(path_root + 'projeto_machine_learning/result_df_random_forest')
result_df_catboost = spark.read.parquet(path_root + 'projeto_machine_learning/result_df_catboost')
result_df_mlp = spark.read.parquet(path_root + 'projeto_machine_learning/result_df_mlp')


In [0]:
display(result_df_mlp)

iteration,params,features_relevance,ks2_train,gini_train,auc_train,bad_decil10_train,bad_decil30_train,bad_decil40_train,ks2_valid,gini_valid,auc_valid,bad_decil10_valid,bad_decil30_valid,bad_decil40_valid,ks2_oot,gini_oot,auc_oot,bad_decil10_oot,bad_decil30_oot,bad_decil40_oot,shift_oot,model_champion
44,"List(logistic, 4.122644773608433E-4, List(84), 1.51997889441261E-4, 500, 42, adam)",32,0.455233393995353,0.5914143007044836,0.7957071503522418,0.0091,0.0203,0.0252,0.4087710346371794,0.5377449555624452,0.7688724777812226,0.0132,0.0246,0.0307,0.2658780459970969,0.3564610876752532,0.6782305438376266,0.0438,0.0694,0.0799,-39.727347267280784,MLPclassifier
45,"List(logistic, 8.964761298368503E-4, List(57), 2.1325526066506872E-4, 500, 42, adam)",32,0.4516797545420612,0.5933433561775168,0.7966716780887584,0.0101,0.02,0.0241,0.4129507268468282,0.543831759503264,0.771915879751632,0.0132,0.0251,0.027,0.2513194611080064,0.3476801505494391,0.6738400752747196,0.0476,0.068,0.0813,-41.40321165989091,MLPclassifier
30,"List(logistic, 2.5720537991260294E-4, List(65), 1.0695861360637855E-4, 500, 42, adam)",32,0.4554612784029517,0.597452766670298,0.798726383335149,0.0106,0.0192,0.0242,0.4064203009008078,0.5478550375480116,0.7739275187740058,0.0098,0.0207,0.0249,0.2670340543770845,0.3601471366159252,0.6800735683079626,0.0456,0.0629,0.0753,-39.71956333500904,MLPclassifier
31,"List(logistic, 2.6993617883102575E-4, List(92), 0.0028449513986108245, 500, 42, adam)",32,0.3436446860606537,0.4736401871457671,0.7368200935728836,0.0166,0.0366,0.0423,0.3181572839697548,0.4428592035687835,0.7214296017843917,0.0167,0.0388,0.0424,0.2443638020894766,0.3047438297685831,0.6523719148842916,0.0598,0.088,0.0898,-35.65921177317764,MLPclassifier
33,"List(logistic, 7.708355023172051E-4, List(65), 2.859576460154476E-4, 500, 42, adam)",32,0.4358087485467011,0.5719335641293446,0.7859667820646723,0.0098,0.0225,0.0285,0.3938574338173237,0.5113437408498249,0.7556718704249125,0.0091,0.0288,0.0336,0.2493539838732626,0.3423497163023139,0.671174858151157,0.0546,0.0704,0.0816,-40.14169865629875,MLPclassifier
34,"List(logistic, 0.0035749068307531114, List(115), 1.0055054531429562E-4, 500, 42, adam)",32,0.4559361566473131,0.5996814001934971,0.7998407000967486,0.0085,0.0182,0.0233,0.4101375325100329,0.5479454628050777,0.7739727314025389,0.0105,0.0209,0.0279,0.2573565996105091,0.3506657904348529,0.6753328952174265,0.0459,0.0695,0.0806,-41.52465120283791,MLPclassifier
41,"List(logistic, 0.00699278194532281, List(142), 3.216920034061814E-4, 500, 42, adam)",32,0.473010163716016,0.6231025335336384,0.8115512667668192,0.0066,0.0173,0.0226,0.3901541965931207,0.5134180316574519,0.756709015828726,0.0105,0.0267,0.0323,0.2629264325735037,0.3403326397596227,0.6701663198798113,0.048,0.0706,0.0809,-45.38095715490303,MLPclassifier
42,"List(logistic, 0.002132002655064362, List(45), 1.0067792578007182E-4, 500, 42, adam)",32,0.4417862494262706,0.5763541124969009,0.7881770562484505,0.0134,0.0228,0.0266,0.4061602206376272,0.5358906995470127,0.7679453497735064,0.0126,0.0239,0.0305,0.2770120997616008,0.3667330488908342,0.6833665244454171,0.0534,0.0653,0.0757,-36.370186151346985,MLPclassifier
22,"List(logistic, 4.889215099119325E-5, List(28), 1.009639526472691E-4, 500, 42, adam)",31,0.4328435723559783,0.5715297593234776,0.7857648796617388,0.0131,0.0217,0.0258,0.4029407585387279,0.5347246981518801,0.76736234907594,0.0146,0.0249,0.0279,0.2396165352604102,0.3305608693235955,0.6652804346617978,0.0577,0.0724,0.0838,-42.16208973704503,MLPclassifier
23,"List(logistic, 5.56785629295E-5, List(34), 1.2901224187613746E-4, 500, 42, adam)",32,0.4372566003432553,0.5715701083067892,0.7857850541533946,0.0136,0.0224,0.0283,0.4052264722093043,0.5367547451730137,0.7683773725865068,0.0139,0.0221,0.0281,0.2545546986538068,0.3462105253220189,0.6731052626610095,0.0526,0.0692,0.0806,-39.42816107937698,MLPclassifier


In [0]:

result_df_xgboost = result_df_xgboost.toPandas()
plot_best_model_express(result_df_xgboost, metric="ks2",  label_x = 'train', label_y = 'valid')

In [0]:

result_df_random_forest = result_df_random_forest.toPandas()
plot_best_model_express(result_df_random_forest, metric="ks2",  label_x = 'train', label_y = 'valid')

In [0]:

result_df_catboost = result_df_catboost.toPandas()
plot_best_model_express(result_df_catboost, metric="ks2",  label_x = 'train', label_y = 'valid')

In [0]:

result_df_mlp = result_df_mlp.toPandas()
plot_best_model_express(result_df_mlp, metric="ks2",  label_x = 'train', label_y = 'valid')

In [0]:
import pyspark.sql.functions as f
display(result_df_mlp.where(f.col("iteration") == 17))


iteration,params,features_relevance,ks2_train,gini_train,auc_train,bad_decil10_train,bad_decil30_train,bad_decil40_train,ks2_valid,gini_valid,auc_valid,bad_decil10_valid,bad_decil30_valid,bad_decil40_valid,ks2_oot,gini_oot,auc_oot,bad_decil10_oot,bad_decil30_oot,bad_decil40_oot,shift_oot,model_champion
17,"List(logistic, 1.748443066868105E-5, List(10, 131), 8.119841390123543E-4, 500, 42, adam)",29,0.4119980426229566,0.5355564943403128,0.7677782471701564,0.0148,0.0271,0.0328,0.3858148607451041,0.5068151578566631,0.7534075789283315,0.0126,0.0277,0.0343,0.2450852978731565,0.3088880238679627,0.6544440119339814,0.0636,0.0948,0.0962,-42.323914072138265,MLPclassifier


In [0]:

# # Função para realizar a busca de hiperparâmetros via Optuna
# def objective_express_V3(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration, results):

#     """
#     Define o objetivo para a otimização do Optuna usando LightGBM.

#     Args:
#         trial (optuna.trial.Trial): Objeto que controla as iterações do Optuna.
#         X_train (pandas.DataFrame): Dados de treinamento.
#         X_valid (pandas.DataFrame): Dados de validação.
#         y_train (pandas.Series): Target para os dados de treinamento.
#         y_valid (pandas.Series): Target para os dados de validação.
#         iteration (int): Número atual da iteração de treinamento.
#         results (dict): Dicionário para armazenar os resultados das métricas.

#     Returns:
#         float: Valor da métrica de avaliação escolhida (ex.: KS ou AUC).

#     Raises:
#         ValueError: Se `X_train` e `X_valid` tiverem colunas inconsistentes.
#         ValueError: Se o tamanho de `X_train` não coincidir com `y_train`.

#     Example:
#         >>> study.optimize(lambda trial: objective_express(trial, X_train, X_valid, y_train, y_valid, iteration=1, results={}), n_trials=10)
#     """

#     is_unbalance_list = trial.suggest_categorical('is_unbalance', [True, False])

    
#     # Lista para armazenar os resultados
#     params = {
#         "objective": "binary",
#         "metric": "binary_logloss",
#         "boosting_type": trial.suggest_categorical('boosting_type', ['gbdt', 'dart']),
#         'is_unbalance': is_unbalance_list,
#         "learning_rate": trial.suggest_float("learning_rate", 0.0001, 0.3, log = True),
#         "num_leaves": trial.suggest_int("num_leaves", 2, 20),
#         "max_depth": trial.suggest_int("max_depth", 2, 5),
#         "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
#         # "min_child_weight": trial.suggest_float("min_child_weight", 1e-3, 1e-1, log = True),
#         "subsample": trial.suggest_float("subsample", 0.6, 1.0),
#         "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
#         "reg_alpha": trial.suggest_float("reg_alpha", 1e-3, 10.0, log = True),
#         "reg_lambda": trial.suggest_float("reg_lambda", 1e-3, 10.0, log = True),
#         "verbose": -1,
#         "seed": 42,
#     }


#     # Criação dos datasets de treino e validação
#     train_data = lgb.Dataset(X_train, label=y_train)
#     valid_data = lgb.Dataset(X_valid, label=y_valid, reference=train_data)
#     oot_data = lgb.Dataset(X_oot, label=y_oot, reference=train_data)

#     # Treinamento com early stopping
#     model = lgb.train(
#         params,
#         train_data,
#         num_boost_round=1000,
#         valid_sets=[valid_data],
#         callbacks=[lgb.early_stopping(stopping_rounds=10)],
#     )

#     #Avaliando quantidade de features dentro de um range de acm pelo menos de 98%
#     # Calcular a importância das features
#     feature_importance = model.feature_importance(importance_type='gain')
#     features = model.feature_name()
        
#     importance_df = pd.DataFrame({
#         'feature': features,
#         'importance': feature_importance
#     })
    
#     importance_df = importance_df.sort_values(by='importance', ascending=False)
    
#     total_gain = importance_df['importance'].sum()
#     importance_df['cumulative_gain'] = importance_df['importance'].cumsum() / total_gain

#     # Selecionar as features com ganho acumulado até 95%
#     selected_features_max = importance_df[importance_df['cumulative_gain'] <= 0.98]['feature'].tolist()
#     qtd_feautures = len(selected_features_max)

#     # Previsões para treino e validação/teste
#     y_pred_train = model.predict(X_train, num_iteration=model.best_iteration)
#     y_pred_valid = model.predict(X_valid, num_iteration=model.best_iteration)
#     y_pred_oot = model.predict(X_oot, num_iteration=model.best_iteration)

#     # Calcular métricas para treino
#     ks_train, gini_train, auc_train, bad_decil10_train = calculate_metrics(y_train.values, y_pred_train)

#     # Calcular métricas para validação/teste
#     ks_valid, gini_valid, auc_valid, bad_decil10_valid = calculate_metrics(y_valid.values, y_pred_valid)

#     # Calcular métricas para OOT
#     ks_oot, gini_oot, auc_oot, bad_decil10_oot = calculate_metrics(y_oot.values, y_pred_oot)

#     # Armazenar os resultados
#     results.append({
#         "iteration": iteration,
#         "params": params,
#         "features_relevance": qtd_feautures,
#         "ks2_train": ks_train,
#         "gini_train": gini_train,
#         "auc_train": auc_train,
#         "bad_decil10_train": bad_decil10_train,
#         "ks2_valid": ks_valid,
#         "gini_valid": gini_valid,
#         "auc_valid": auc_valid,
#         "bad_decil10_valid": bad_decil10_valid,
#         "ks2_oot": ks_oot,
#         "gini_oot": gini_oot,
#         "auc_oot": auc_oot,
#         "bad_decil10_oot": bad_decil10_oot,
#     })

#     return auc_valid  # Maximizar o AUC na validação/teste

In [0]:
# def plot_best_model_exp(metrics_result, metric="ks2"):
#     fig = px.scatter(
#         metrics_result,
#         x=f"{metric}_train",
#         y=f"{metric}_valid",
#         color="features_relevance",  # Cor de cada ponto de acordo com a iteração
#         hover_data=["iteration", "bad_decil10_train", "bad_decil10_valid"],  # Exibe o número da iteração ao passar o mouse
#         labels={f"{metric}_train": f"{metric.upper()} Train",
#                 f"{metric}_valid": f"{metric.upper()} Valid/Test",
#                 "features_relevance": "Feature Relevance",
#                 "bad_decil10_train": "BadRate Top10% Train",
#                 "bad_decil10_valid": "BadRate Top10% Valid/Test"},

#         title=f"{metric.upper()} Train vs. {metric.upper()} Valid"
#     )

#     # Ajustando o layout do gráfico
#     fig.update_traces(marker=dict(size=12, line=dict(width=2, color='DarkSlateGrey')))
#     fig.update_layout(title=f"{metric.upper()} Train vs. {metric.upper()} Valid", title_x=0.5)

#     fig.show()


In [0]:
def plot_best_model_express(metrics_result, metric="ks2",  label_x = 'train', label_y = 'valid'):
    fig = px.scatter(
        metrics_result,
        x=f"{metric}_{label_x}",
        y=f"{metric}_{label_y}",
        color="features_relevance",  # Cor de cada ponto de acordo com a iteração
        hover_data=[f"{metric}_oot", "iteration", "bad_decil10_train", "bad_decil10_valid", 'bad_decil10_oot', "bad_decil30_train", "bad_decil30_valid", 'bad_decil30_oot'],  # Exibe o número da iteração ao passar o mouse
        labels={f"{metric}_{label_x}": f"{metric.upper()} {label_x}",
                f"{metric}_{label_y}": f"{metric.upper()} {label_y}/Test",
                f"{metric}_oot": f"{metric.upper()} OOT",
                "features_relevance": "Feature Relevance",
                f"bad_decil10_{label_x}": f"BadRate Top10% {label_x}",
                f"bad_decil10_{label_y}": f"BadRate Top10% {label_y}/Test",
                f"bad_decil10_oot": f"BadRate Top10% OOT",
                f"bad_decil30_{label_x}": f"BadRate Top30% {label_x}",
                f"bad_decil30_{label_y}": f"BadRate Top30% {label_y}/Test",
                f"bad_decil30_oot": f"BadRate Top30% OOT",
                },

        title=f"{metric.upper()} {label_x} vs. {metric.upper()} {label_y}"
    )

    # Ajustando o layout do gráfico
    fig.update_traces(marker=dict(size=12, line=dict(width=2, color='DarkSlateGrey')))
    fig.update_layout(title=f"{metric.upper()} {label_x} vs. {metric.upper()} {label_y}", title_x=0.5)

    fig.show()


In [0]:

# def model_lgbm_express_v2(data_spark, data_spark_oot, list_features, target, dev = None, n_trials = 20, metric = "ks2", max_nsample = 100000):
    
#     """
#     Treina e otimiza um modelo LightGBM utilizando Optuna para busca de hiperparâmetros, com suporte a métricas de avaliação como KS2, Gini e AUC.

#     Parameters
#     ----------
#     data_spark : pyspark.sql.DataFrame
#         DataFrame Spark contendo os dados de entrada para o treinamento do modelo.
    
#     list_features : list of str
#         Lista com os nomes das colunas que serão utilizadas como features no modelo.
    
#     target : str
#         Nome da coluna alvo (variável dependente). Deve ser binária com valores 0 e 1.
    
#     dev : str, optional
#         Nome da coluna de desenvolvimento (booleano), indicando as linhas que serão utilizadas no treinamento. 
#         Se não for especificada, toda a base será usada. Por padrão, `None`.
    
#     n_trials : int, optional
#         Número de iterações para a otimização dos hiperparâmetros. Deve ser maior que 0. Por padrão, 20.
    
#     metric : str, optional
#         Métrica utilizada para avaliar o modelo. As opções disponíveis são:
#         - `'ks2'` (Kolmogorov-Smirnov)
#         - `'gini'`
#         - `'auc'` (Área sob a curva ROC)
#         Por padrão, `'ks2'`.
    
#     max_nsample : int, optional
#         Número máximo de amostras utilizadas para o treinamento. Caso o dataset tenha mais registros, será feita uma amostragem aleatória.
#         Por padrão, 100000.

#     Returns
#     -------
#     tuple
#         - **best_params**: dict
#             Dicionário com os melhores hiperparâmetros encontrados pela otimização.
#         - **results_df**: pandas.DataFrame
#             DataFrame contendo os resultados detalhados de cada iteração da busca por hiperparâmetros.

#     Raises
#     ------
#     ValueError
#         - Se `list_features` contiver colunas que não estão presentes no `data_spark`.
#         - Se o `target` não for encontrado no `data_spark`.
#         - Se `target` não for binário com valores 0 e 1.
#         - Se `list_features` contiver colunas que não sejam numéricas.
#         - Se a métrica especificada em `metric` não estiver entre as opções permitidas.
#         - Se `n_trials` for menor ou igual a 0.
#         - Se a coluna `dev` (caso especificada) não for do tipo booleano.

#     Notes
#     -----
#     - O processo realiza uma validação dos dados e assegura que os tipos das colunas estejam corretos antes de iniciar o treinamento.
#     - Os resultados são apresentados por meio de um gráfico da métrica de avaliação especificada.
#     - A otimização usa uma amostragem do dataset se o número de registros exceder o valor de `max_nsample`.

#     Examples
#     --------
#     >>> best_params, results_df = model_lgbm_express(
#     ...     data_spark=data_spark,
#     ...     list_features=["feature1", "feature2", "feature3"],
#     ...     target="target",
#     ...     dev="is_dev",
#     ...     n_trials=10,
#     ...     metric="gini",
#     ...     max_nsample=50000
#     ... )
#     >>> print(best_params)
#     >>> print(results_df.head())
#     """
    

#     #######Validacoes#######
#     #validando se a lista esta presente na tabela de input
#     list_not_in = [column for column in list_features if column not in data_spark.columns]

#     list_not_in_oot = [column for column in list_features if column not in data_spark_oot.columns]

#     if list_not_in != []:
#         raise ValueError(f"A lista de features {list_not_in} do parâmetro list_features para o processo de modelagem não esta presente na tabela de input, especificado no parâmetro data_spark.")

#     if list_not_in_oot != []:
#         raise ValueError(f"A lista de features {list_not_in_oot} do parâmetro list_features para o processo de modelagem não esta presente na tabela de input, especificado no parâmetro data_spark.")

#     if target not in data_spark.columns:
#         raise ValueError(f"A variavel {target} especificada no parâmetro target não esta presente no tabela de input, especificado no parâmetro data_spark.")

#     if target not in data_spark_oot.columns:
#         raise ValueError(f"A variavel {target} especificada no parâmetro target não esta presente no tabela de input, especificado no parâmetro data_spark.")


#     if dev != None:
#         if dev not in data_spark.columns:
#             raise ValueError(f"A variavel {dev} especificada no parâmetro dev não esta presente no tabela de input, especificado no parâmetro data_spark.")


#     #validando se o target é binario
#     target_value  = data_spark.select(target).distinct().rdd.flatMap(lambda row: row).collect()
#     result_bool = all(value in [1, 0] for value in target_value)

#     #validando se o target é binario
#     target_value_oot  = data_spark_oot.select(target).distinct().rdd.flatMap(lambda row: row).collect()
#     result_bool_oot = all(value in [1, 0] for value in target_value_oot)

#     if result_bool == False:
#         raise ValueError(f"A variavel {target} especificada no parâmetro target possui os seguintes valores {target_value}, logo o processo binning foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")

#     if result_bool_oot == False:
#         raise ValueError(f"A variavel {target} especificada no parâmetro target possui os seguintes valores {target_value_oot}, logo o processo binning foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")


#     #Verificando se as colunas listadas como numericas são numericas no dataset
#     type_num_columns = [col.name for col in data_spark.selectExpr(*list_features).schema.fields  if isinstance(col.dataType, NumericType)]

#     type_num_columns_oot = [col.name for col in data_spark_oot.selectExpr(*list_features).schema.fields  if isinstance(col.dataType, NumericType)]


#     #comparando lista numerica vs. lista dataset numerica
#     type_num_list_not_in = [column for column in list_features if column not in type_num_columns]

#     type_num_list_not_in_oot = [column for column in list_features if column not in type_num_columns_oot]

#     if type_num_list_not_in != []:
#         raise ValueError(f"A lista de features do parâmetro list_features precisam ser do tipo NumericType. As features {type_num_list_not_in} estão presentes na lista mas não são do tipo  NumericType na tabela de input, especificado no parâmetro data_spark.")

#     if type_num_list_not_in_oot != []:
#         raise ValueError(f"A lista de features do parâmetro list_features precisam ser do tipo NumericType. As features {type_num_list_not_in_oot} estão presentes na lista mas não são do tipo  NumericType na tabela de input, especificado no parâmetro data_spark.")

#     list_possible_metrics = ['ks2', 'gini', 'auc']
#     # Verifica se a métrica é válida, ignorando case sensitivity
#     metric_result_valid = [m for m in list_possible_metrics if m == metric.lower()]

#     # Levanta um erro se nenhuma correspondência for encontrada
#     if not metric_result_valid: 
#         raise ValueError(f"A métrica '{metric}' especificada no parâmetro 'metric' não está presente nas possibilidades: {list_possible_metrics}. "f"Escolha uma das opções disponíveis.")

#     metric = metric.lower()

#     if n_trials <= 0:
#         raise ValueError("O valor do n_trials tem que ser maior do que 0.")

#     if dev != None:
#         type_bool_columns = [col.name for col in data_spark.selectExpr(dev).schema.fields if isinstance(col.dataType, BooleanType)]

#         if type_bool_columns != [dev]:
#             raise ValueError(f"A feature {dev} definida no parâmetro dev precisa ser do tipo booleano (True e False)")


#     #marcacao dos resultados
#     results = []

#     #transformando data set em pandas
#     data_pandas = data_spark.toPandas()
#     data_pandas_oot = data_spark_oot.toPandas()

#     # dev_lgbm_exp = 'dev_lgbm_exp'
#     data_pandas['dev_lgbm_exp'] = True


#     if dev is None:
#         print("Não foi especificada uma coluna para treinar o processo de categorização, logo será utilizado `100%` da sua base para criar o modelo modelagem.")
 
    
#     data_pandas = data_pandas[data_pandas[dev] == True]

#     # Avaliando tamanho da amostra
#     sizedata = len(data_pandas)
#     # Validando o tamanho da amostra
#     if sizedata > max_nsample:
#         print(f"O dataframe definido no parâmetro data_spark tem um tamanho de {sizedata}. Foi selecionada uma amostra com tamanho {max_nsample} para o processo de modelagem")
        
#         data_pandas = data_pandas.sample(frac=max_nsample/sizedata, random_state=42)


#     X_train, X_valid, y_train, y_valid = train_test_split(data_pandas[list_features], data_pandas[target], test_size=0.2, random_state=42)

#     X_oot = data_pandas_oot[list_features]
#     y_oot = data_pandas_oot[target]

#     # Otimização via Optuna com índice da iteração
#     study = optuna.create_study(direction="minimize")

#     for i in range(1, (n_trials + 1)):
#         study.optimize(lambda trial, iteration=i: objective_express_V2(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration, results), n_trials=1, show_progress_bar=False)
#         pass

#     # Melhores parâmetros
#     best_params = study.best_params
#     # print("Melhores parâmetros:", best_params)

#     # Converter os resultados em um DataFrame para análise
#     results_df = pd.DataFrame(results)
#     # print(results_df)

#     plot_best_model_exp(metrics_result = results_df, metric = metric)

#     return best_params, results_df


In [0]:
#antigo v3
def model_lgbm_express(data_spark, data_spark_oot, list_features, target, dev = None, n_trials = 20, metric = "ks2", max_nsample = 100000):
    
    """
    Treina e otimiza um modelo LightGBM utilizando Optuna para busca de hiperparâmetros, com suporte a métricas de avaliação como KS2, Gini e AUC.

    Parameters
    ----------
    data_spark : pyspark.sql.DataFrame
        DataFrame Spark contendo os dados de entrada para o treinamento do modelo.
    
    list_features : list of str
        Lista com os nomes das colunas que serão utilizadas como features no modelo.
    
    target : str
        Nome da coluna alvo (variável dependente). Deve ser binária com valores 0 e 1.
    
    dev : str, optional
        Nome da coluna de desenvolvimento (booleano), indicando as linhas que serão utilizadas no treinamento. 
        Se não for especificada, toda a base será usada. Por padrão, `None`.
    
    n_trials : int, optional
        Número de iterações para a otimização dos hiperparâmetros. Deve ser maior que 0. Por padrão, 20.
    
    metric : str, optional
        Métrica utilizada para avaliar o modelo. As opções disponíveis são:
        - `'ks2'` (Kolmogorov-Smirnov)
        - `'gini'`
        - `'auc'` (Área sob a curva ROC)
        Por padrão, `'ks2'`.
    
    max_nsample : int, optional
        Número máximo de amostras utilizadas para o treinamento. Caso o dataset tenha mais registros, será feita uma amostragem aleatória.
        Por padrão, 100000.

    Returns
    -------
    tuple
        - **best_params**: dict
            Dicionário com os melhores hiperparâmetros encontrados pela otimização.
        - **results_df**: pandas.DataFrame
            DataFrame contendo os resultados detalhados de cada iteração da busca por hiperparâmetros.

    Raises
    ------
    ValueError
        - Se `list_features` contiver colunas que não estão presentes no `data_spark`.
        - Se o `target` não for encontrado no `data_spark`.
        - Se `target` não for binário com valores 0 e 1.
        - Se `list_features` contiver colunas que não sejam numéricas.
        - Se a métrica especificada em `metric` não estiver entre as opções permitidas.
        - Se `n_trials` for menor ou igual a 0.
        - Se a coluna `dev` (caso especificada) não for do tipo booleano.

    Notes
    -----
    - O processo realiza uma validação dos dados e assegura que os tipos das colunas estejam corretos antes de iniciar o treinamento.
    - Os resultados são apresentados por meio de um gráfico da métrica de avaliação especificada.
    - A otimização usa uma amostragem do dataset se o número de registros exceder o valor de `max_nsample`.

    Examples
    --------
    >>> best_params, results_df = model_lgbm_express(
    ...     data_spark=data_spark,
    ...     list_features=["feature1", "feature2", "feature3"],
    ...     target="target",
    ...     dev="is_dev",
    ...     n_trials=10,
    ...     metric="gini",
    ...     max_nsample=50000
    ... )
    >>> print(best_params)
    >>> print(results_df.head())
    """
    

    #######Validacoes#######
    #validando se a lista esta presente na tabela de input
    list_not_in = [column for column in list_features if column not in data_spark.columns]

    list_not_in_oot = [column for column in list_features if column not in data_spark_oot.columns]

    if list_not_in != []:
        raise ValueError(f"A lista de features {list_not_in} do parâmetro list_features para o processo de modelagem não esta presente na tabela de input, especificado no parâmetro data_spark.")

    if list_not_in_oot != []:
        raise ValueError(f"A lista de features {list_not_in_oot} do parâmetro list_features para o processo de modelagem não esta presente na tabela de input, especificado no parâmetro data_spark.")

    if target not in data_spark.columns:
        raise ValueError(f"A variavel {target} especificada no parâmetro target não esta presente no tabela de input, especificado no parâmetro data_spark.")

    if target not in data_spark_oot.columns:
        raise ValueError(f"A variavel {target} especificada no parâmetro target não esta presente no tabela de input, especificado no parâmetro data_spark.")


    if dev != None:
        if dev not in data_spark.columns:
            raise ValueError(f"A variavel {dev} especificada no parâmetro dev não esta presente no tabela de input, especificado no parâmetro data_spark.")


    #validando se o target é binario
    target_value  = data_spark.select(target).distinct().rdd.flatMap(lambda row: row).collect()
    result_bool = all(value in [1, 0] for value in target_value)

    #validando se o target é binario
    target_value_oot  = data_spark_oot.select(target).distinct().rdd.flatMap(lambda row: row).collect()
    result_bool_oot = all(value in [1, 0] for value in target_value_oot)

    if result_bool == False:
        raise ValueError(f"A variavel {target} especificada no parâmetro target possui os seguintes valores {target_value}, logo o processo binning foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")

    if result_bool_oot == False:
        raise ValueError(f"A variavel {target} especificada no parâmetro target possui os seguintes valores {target_value_oot}, logo o processo binning foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")


    #Verificando se as colunas listadas como numericas são numericas no dataset
    type_num_columns = [col.name for col in data_spark.selectExpr(*list_features).schema.fields  if isinstance(col.dataType, NumericType)]

    type_num_columns_oot = [col.name for col in data_spark_oot.selectExpr(*list_features).schema.fields  if isinstance(col.dataType, NumericType)]


    #comparando lista numerica vs. lista dataset numerica
    type_num_list_not_in = [column for column in list_features if column not in type_num_columns]

    type_num_list_not_in_oot = [column for column in list_features if column not in type_num_columns_oot]

    if type_num_list_not_in != []:
        raise ValueError(f"A lista de features do parâmetro list_features precisam ser do tipo NumericType. As features {type_num_list_not_in} estão presentes na lista mas não são do tipo  NumericType na tabela de input, especificado no parâmetro data_spark.")

    if type_num_list_not_in_oot != []:
        raise ValueError(f"A lista de features do parâmetro list_features precisam ser do tipo NumericType. As features {type_num_list_not_in_oot} estão presentes na lista mas não são do tipo  NumericType na tabela de input, especificado no parâmetro data_spark.")

    list_possible_metrics = ['ks2', 'gini', 'auc']
    # Verifica se a métrica é válida, ignorando case sensitivity
    metric_result_valid = [m for m in list_possible_metrics if m == metric.lower()]

    # Levanta um erro se nenhuma correspondência for encontrada
    if not metric_result_valid: 
        raise ValueError(f"A métrica '{metric}' especificada no parâmetro 'metric' não está presente nas possibilidades: {list_possible_metrics}. "f"Escolha uma das opções disponíveis.")

    metric = metric.lower()

    if n_trials <= 0:
        raise ValueError("O valor do n_trials tem que ser maior do que 0.")

    if dev != None:
        type_bool_columns = [col.name for col in data_spark.selectExpr(dev).schema.fields if isinstance(col.dataType, BooleanType)]

        if type_bool_columns != [dev]:
            raise ValueError(f"A feature {dev} definida no parâmetro dev precisa ser do tipo booleano (True e False)")


    #marcacao dos resultados
    results = []

    #transformando data set em pandas
    data_pandas = data_spark.toPandas()
    data_pandas_oot = data_spark_oot.toPandas()

    # dev_lgbm_exp = 'dev_lgbm_exp'
    data_pandas['dev_lgbm_exp'] = True


    if dev is None:
        print("Não foi especificada uma coluna para treinar o processo de categorização, logo será utilizado `100%` da sua base para criar o modelo modelagem.")
 
    
    data_pandas = data_pandas[data_pandas[dev] == True]

    # Avaliando tamanho da amostra
    sizedata = len(data_pandas)
    # Validando o tamanho da amostra
    if sizedata > max_nsample:
        print(f"O dataframe definido no parâmetro data_spark tem um tamanho de {sizedata}. Foi selecionada uma amostra com tamanho {max_nsample} para o processo de modelagem")
        
        data_pandas = data_pandas.sample(frac=max_nsample/sizedata, random_state=42)


    X_train, X_valid, y_train, y_valid = train_test_split(data_pandas[list_features], data_pandas[target], test_size=0.2, random_state=42)

    X_oot = data_pandas_oot[list_features]
    y_oot = data_pandas_oot[target]

    # Otimização via Optuna com índice da iteração
    study = optuna.create_study(direction="maximize")

    for i in range(1, (n_trials + 1)):
        study.optimize(lambda trial, iteration=i: objective_express(trial, X_train, X_valid, X_oot, y_train, y_valid, y_oot, iteration, results), n_trials=1, show_progress_bar=False)
        pass

    # Melhores parâmetros
    best_params = study.best_params
    # print("Melhores parâmetros:", best_params)

    # Converter os resultados em um DataFrame para análise
    results_df = pd.DataFrame(results)
    # print(results_df)

    plot_best_model_express(metrics_result = results_df, metric = metric)

    return best_params, results_df


In [0]:

# def model_lgbm_express(data_spark, list_features, target, dev = None, n_trials = 20, metric = "ks2", max_nsample = 100000):
    
#     """
#     Treina e otimiza um modelo LightGBM utilizando Optuna para busca de hiperparâmetros, com suporte a métricas de avaliação como KS2, Gini e AUC.

#     Parameters
#     ----------
#     data_spark : pyspark.sql.DataFrame
#         DataFrame Spark contendo os dados de entrada para o treinamento do modelo.
    
#     list_features : list of str
#         Lista com os nomes das colunas que serão utilizadas como features no modelo.
    
#     target : str
#         Nome da coluna alvo (variável dependente). Deve ser binária com valores 0 e 1.
    
#     dev : str, optional
#         Nome da coluna de desenvolvimento (booleano), indicando as linhas que serão utilizadas no treinamento. 
#         Se não for especificada, toda a base será usada. Por padrão, `None`.
    
#     n_trials : int, optional
#         Número de iterações para a otimização dos hiperparâmetros. Deve ser maior que 0. Por padrão, 20.
    
#     metric : str, optional
#         Métrica utilizada para avaliar o modelo. As opções disponíveis são:
#         - `'ks2'` (Kolmogorov-Smirnov)
#         - `'gini'`
#         - `'auc'` (Área sob a curva ROC)
#         Por padrão, `'ks2'`.
    
#     max_nsample : int, optional
#         Número máximo de amostras utilizadas para o treinamento. Caso o dataset tenha mais registros, será feita uma amostragem aleatória.
#         Por padrão, 100000.

#     Returns
#     -------
#     tuple
#         - **best_params**: dict
#             Dicionário com os melhores hiperparâmetros encontrados pela otimização.
#         - **results_df**: pandas.DataFrame
#             DataFrame contendo os resultados detalhados de cada iteração da busca por hiperparâmetros.

#     Raises
#     ------
#     ValueError
#         - Se `list_features` contiver colunas que não estão presentes no `data_spark`.
#         - Se o `target` não for encontrado no `data_spark`.
#         - Se `target` não for binário com valores 0 e 1.
#         - Se `list_features` contiver colunas que não sejam numéricas.
#         - Se a métrica especificada em `metric` não estiver entre as opções permitidas.
#         - Se `n_trials` for menor ou igual a 0.
#         - Se a coluna `dev` (caso especificada) não for do tipo booleano.

#     Notes
#     -----
#     - O processo realiza uma validação dos dados e assegura que os tipos das colunas estejam corretos antes de iniciar o treinamento.
#     - Os resultados são apresentados por meio de um gráfico da métrica de avaliação especificada.
#     - A otimização usa uma amostragem do dataset se o número de registros exceder o valor de `max_nsample`.

#     Examples
#     --------
#     >>> best_params, results_df = model_lgbm_express(
#     ...     data_spark=data_spark,
#     ...     list_features=["feature1", "feature2", "feature3"],
#     ...     target="target",
#     ...     dev="is_dev",
#     ...     n_trials=10,
#     ...     metric="gini",
#     ...     max_nsample=50000
#     ... )
#     >>> print(best_params)
#     >>> print(results_df.head())
#     """
    

#     #######Validacoes#######
#     #validando se a lista esta presente na tabela de input
#     list_not_in = [column for column in list_features if column not in data_spark.columns]

#     if list_not_in != []:
#         raise ValueError(f"A lista de features {list_not_in} do parâmetro list_features para o processo de modelagem não esta presente na tabela de input, especificado no parâmetro data_spark.")

#     if target not in data_spark.columns:
#         raise ValueError(f"A variavel {target} especificada no parâmetro target não esta presente no tabela de input, especificado no parâmetro data_spark.")

#     if dev != None:
#         if dev not in data_spark.columns:
#             raise ValueError(f"A variavel {dev} especificada no parâmetro dev não esta presente no tabela de input, especificado no parâmetro data_spark.")


#     #validando se o target é binario
#     target_value  = data_spark.select(target).distinct().rdd.flatMap(lambda row: row).collect()
#     result_bool = all(value in [1, 0] for value in target_value)

#     if result_bool == False:
#         raise ValueError(f"A variavel {target} especificada no parâmetro target possui os seguintes valores {target_value}, logo o processo binning foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")


#     #Verificando se as colunas listadas como numericas são numericas no dataset
#     type_num_columns = [col.name for col in data_spark.selectExpr(*list_features).schema.fields  if isinstance(col.dataType, NumericType)]
#     #comparando lista numerica vs. lista dataset numerica
#     type_num_list_not_in = [column for column in list_features if column not in type_num_columns]

#     if type_num_list_not_in != []:
#         raise ValueError(f"A lista de features do parâmetro list_features precisam ser do tipo NumericType. As features {type_num_list_not_in} estão presentes na lista mas não são do tipo  NumericType na tabela de input, especificado no parâmetro data_spark.")

#     list_possible_metrics = ['ks2', 'gini', 'auc']
#     # Verifica se a métrica é válida, ignorando case sensitivity
#     metric_result_valid = [m for m in list_possible_metrics if m == metric.lower()]

#     # Levanta um erro se nenhuma correspondência for encontrada
#     if not metric_result_valid: 
#         raise ValueError(f"A métrica '{metric}' especificada no parâmetro 'metric' não está presente nas possibilidades: {list_possible_metrics}. "f"Escolha uma das opções disponíveis.")

#     metric = metric.lower()

#     if n_trials <= 0:
#         raise ValueError("O valor do n_trials tem que ser maior do que 0.")

#     if dev != None:
#         type_bool_columns = [col.name for col in data_spark.selectExpr(dev).schema.fields if isinstance(col.dataType, BooleanType)]

#         if type_bool_columns != [dev]:
#             raise ValueError(f"A feature {dev} definida no parâmetro dev precisa ser do tipo booleano (True e False)")


#     #marcacao dos resultados
#     results = []

#     #transformando data set em pandas
#     data_pandas = data_spark.toPandas()

#     # dev_lgbm_exp = 'dev_lgbm_exp'
#     data_pandas['dev_lgbm_exp'] = True


#     if dev is None:
#         print("Não foi especificada uma coluna para treinar o processo de categorização, logo será utilizado `100%` da sua base para criar o modelo modelagem.")
 
    
#     data_pandas = data_pandas[data_pandas[dev] == True]

#     # Avaliando tamanho da amostra
#     sizedata = len(data_pandas)
#     # Validando o tamanho da amostra
#     if sizedata > max_nsample:
#         print(f"O dataframe definido no parâmetro data_spark tem um tamanho de {sizedata}. Foi selecionada uma amostra com tamanho {max_nsample} para o processo de modelagem")
        
#         data_pandas = data_pandas.sample(frac=max_nsample/sizedata, random_state=42)


#     X_train, X_valid, y_train, y_valid = train_test_split(data_pandas[list_features], data_pandas[target], test_size=0.2, random_state=42)

#     # Otimização via Optuna com índice da iteração
#     study = optuna.create_study(direction="maximize")

#     for i in range(1, (n_trials + 1)):
#         study.optimize(lambda trial, iteration=i: objective_express(trial, X_train, X_valid, y_train, y_valid, iteration, results), n_trials=1, show_progress_bar=False)
#         pass

#     # Melhores parâmetros
#     best_params = study.best_params
#     # print("Melhores parâmetros:", best_params)

#     # Converter os resultados em um DataFrame para análise
#     results_df = pd.DataFrame(results)
#     # print(results_df)

#     plot_best_model_exp(metrics_result = results_df, metric = metric)

#     return best_params, results_df


In [0]:
# ETAPA RFE

# Função para realizar a seleção de features
def feature_selection_lgb(params, data_pandas, data_pandas_oot, list_features, target, threshold = 0.95, min_diff=3):
    """
    Realiza a seleção iterativa de features utilizando o LightGBM com base na importância acumulada e métricas de performance.

    Parameters
    ----------
    params : dict
        Dicionário com os hiperparâmetros para o modelo LightGBM.
    
    data_pandas : pandas.DataFrame
        DataFrame contendo os dados de entrada. Todas as colunas, exceto a coluna alvo, serão consideradas como features.
    
    target : str
        Nome da coluna alvo (variável dependente). Deve ser binária com valores 0 e 1.
    
    threshold : float, optional
        Limite de ganho acumulado para a seleção de features. Features com ganho acumulado abaixo deste limite serão mantidas.
        Por padrão, 0.95 (95% do ganho acumulado).
    
    min_diff : int, optional
        Número mínimo de features a serem removidas em cada iteração para continuar o processo. 
        Caso a diferença seja menor que este valor, a seleção é interrompida. Por padrão, 3.

    Returns
    -------
    list of dict
        Histórico das iterações, onde cada elemento é um dicionário contendo:
        - 'iteration': número da iteração.
        - 'lengh_features_model': quantidade de features no modelo na iteração.
        - 'ks2_train': valor do KS (Kolmogorov-Smirnov) para o conjunto de treino.
        - 'gini_train': valor do Gini para o conjunto de treino.
        - 'auc_train': valor da AUC para o conjunto de treino.
        - 'bad_decil10_train': valor da badrate dos 10% melhores scores para o conjunto de treino.
        - 'ks2_valid': valor do KS para o conjunto de validação.
        - 'gini_valid': valor do Gini para o conjunto de validação.
        - 'auc_valid': valor da AUC para o conjunto de validação.
        - 'bad_decil10_val': valor da badrate dos 10% melhores scores para o conjunto de validação.
        - 'selected_features': lista de features selecionadas para a próxima iteração.
        - 'removed_features': lista de features removidas na iteração.

    Raises
    ------
    ValueError
        - Se `target` não estiver presente no `data_pandas`.
        - Se a coluna alvo não for binária com valores 0 e 1.

    Notes
    -----
    - O processo utiliza o LightGBM para calcular a importância das features com base no ganho acumulado (importance_type='gain').
    - A seleção continua até que a diferença no número de features entre iterações seja menor que `min_diff`.
    - A validação do modelo é realizada em cada iteração com um split de 80% treino e 20% validação.
    - As métricas KS, Gini e AUC são calculadas para os conjuntos de treino e validação.

    Examples
    --------
    >>> params = {
    ...     'objective': 'binary',
    ...     'metric': 'auc',
    ...     'boosting_type': 'gbdt',
    ...     'learning_rate': 0.01,
    ...     'num_leaves': 31,
    ...     'seed': 42
    ... }
    >>> history = feature_selection_lgb(
    ...     params=params,
    ...     data_pandas=data_pandas,
    ...     target="target_column",
    ...     threshold=0.95,
    ...     min_diff=5
    ... )
    >>> print(history[-1]['selected_features'])
    """

    selected_features = data_pandas[list_features].columns.tolist()  # Começar com todas as features
    feature_history = []
    
    while True:
        X = data_pandas[selected_features]
        y = data_pandas[target]

        X_oot = data_pandas_oot[selected_features]
        y_oot = data_pandas_oot[target]
        
        
        # Dividir os dados em treino e validação
        X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, random_state=42)
        
        train_data = lgb.Dataset(X_train, label=y_train)
        valid_data = lgb.Dataset(X_valid, label=y_valid)
        oot_data = lgb.Dataset(X_oot, label=y_oot)

        
        model = lgb.train(params, train_data, num_boost_round=1000, valid_sets=[valid_data],
                          callbacks=[lgb.early_stopping(stopping_rounds=10)])
        
        # Calcular a importância das features
        feature_importance = model.feature_importance(importance_type='gain')
        # features = model.feature_name()

        importance_df = pd.DataFrame({
            'feature': selected_features,
            'importance': feature_importance
        })
        
        importance_df = importance_df.sort_values(by='importance', ascending=False)
        
        total_gain = importance_df['importance'].sum()
        importance_df['cumulative_gain'] = importance_df['importance'].cumsum() / total_gain

        # display(importance_df)
        
        # Selecionar as features com ganho acumulado até 95%
        selected_features_next = importance_df[importance_df['cumulative_gain'] <= threshold]['feature'].tolist()
        
        # Identificar as features removidas
        removed_features = list(set(selected_features) - set(selected_features_next))
        
        # Previsões do modelo
        y_pred_train = model.predict(X_train, num_iteration=model.best_iteration)
        y_pred_valid = model.predict(X_valid, num_iteration=model.best_iteration)
        y_pred_oot = model.predict(X_oot, num_iteration=model.best_iteration)

        
        # Calcular métricas para treino
        ks_train, gini_train, auc_train, bad_decil10_train, bad_decil30_train, bad_decil40_train = calculate_metrics(y_train.values, y_pred_train)

        # Calcular métricas para validação/teste
        ks_valid, gini_valid, auc_valid, bad_decil10_val, bad_decil30_val, bad_decil40_val = calculate_metrics(y_valid.values, y_pred_valid)

        # Calcular métricas para validação/teste
        ks_oot, gini_oot, auc_oot, bad_decil10_oot, bad_decil30_oot, bad_decil40_oot = calculate_metrics(y_oot.values, y_pred_oot)
        
        # Armazenar os resultados da interação
        feature_history.append({
            'iteration': len(feature_history) + 1,
            'best_iteration': model.best_iteration,
            'lengh_features_model': len(selected_features),
            'ks2_train': ks_train,
            'gini_train': gini_train,
            'auc_train': auc_train,
            'bad_decil10_train': bad_decil10_train,
            'bad_decil30_train': bad_decil30_train,
            'bad_decil40_train': bad_decil40_train,
            'ks2_valid': ks_valid,
            'gini_valid': gini_valid,
            'auc_valid': auc_valid,
            'bad_decil10_val': bad_decil10_val,
            'bad_decil30_val': bad_decil30_val,
            'bad_decil40_val': bad_decil40_val,
            'ks2_oot': ks_oot,
            'gini_oot': gini_oot,
            'auc_oot': auc_oot,
            'bad_decil10_oot': bad_decil10_oot,
            'bad_decil30_oot': bad_decil30_oot,
            'bad_decil40_oot': bad_decil40_oot,
            'selected_features': selected_features_next,
            'removed_features': removed_features
        })
        
        # Verificar se a diferença de features entre iterações é menor que 3
        if len(selected_features) - len(selected_features_next) < min_diff:
            break
        
        selected_features = selected_features_next

    for result in feature_history:
        print(f"Iteração {result['iteration']}:")
        print(f"  Quantidade de Features do Modelo nessa iteração : {result['lengh_features_model']} features")
        print(f"  KS (Treino): {result['ks2_train']:.4f}")
        print(f"  Gini (Treino): {result['gini_train']:.4f}")
        print(f"  AUC (Treino): {result['auc_train']:.4f}")
        print(f"  BadRate10Decil (Treino): {result['bad_decil10_train']:.4f}")
        print(f"  BadRate30Decil (Treino): {result['bad_decil30_train']:.4f}")
        print(f"  BadRate40Decil (Treino): {result['bad_decil40_train']:.4f}")
        print(f"  KS (Validação): {result['ks2_valid']:.4f}")
        print(f"  Gini (Validação): {result['gini_valid']:.4f}")
        print(f"  AUC (Validação): {result['auc_valid']:.4f}")
        print(f"  BadRate10Decil (Validação): {result['bad_decil10_val']:.4f}")
        print(f"  BadRate30Decil (Validação): {result['bad_decil30_val']:.4f}")
        print(f"  BadRate40Decil (Validação): {result['bad_decil40_val']:.4f}")
        print(f"  KS (OOT): {result['ks2_oot']:.4f}")
        print(f"  Gini (OOT): {result['gini_oot']:.4f}")
        print(f"  AUC (OOT): {result['auc_oot']:.4f}")
        print(f"  BadRate10Decil (OOT): {result['bad_decil10_oot']:.4f}")
        print(f"  BadRate30Decil (OOT): {result['bad_decil30_oot']:.4f}")
        print(f"  BadRate40Decil (OOT): {result['bad_decil40_oot']:.4f}")
        print(f"  Selecionadas Prox. Iteração: {len(result['selected_features'])} features")
        print(f"  Removidas Prox. Iteração: {len(result['removed_features'])} features")
        print(f"  Nome das Features Selecionadas Prox. Iteração: {result['selected_features']}")
        print(f"  Nome das Features Removidas Prox. Iteração: {result['removed_features']}")
        print("-" * 50)

    
    return feature_history



In [0]:

def plot_interactive_metric(feature_history, metric = "ks2"):
    """
    Plota um gráfico interativo de KS por iteração usando Plotly,
    exibindo a quantidade de features ao posicionar o mouse nos pontos.

    Args:
        feature_history (list): Lista de dicionários contendo os resultados de cada iteração.
    """
    # Extrair dados
    iterations = [result['iteration'] for result in feature_history]
    ks_train = [result[f'{metric}_train'] * 100 for result in feature_history]  # Converter para porcentagem
    ks_valid = [result[f'{metric}_valid'] * 100 for result in feature_history]  # Converter para porcentagem
    ks_oot = [result[f'{metric}_oot'] * 100 for result in feature_history]  # Converter para porcentagem


    num_features = [(result['lengh_features_model']) for result in feature_history]

    bad_decil10_train = [result['bad_decil10_train'] * 100 for result in feature_history]  # Converter para porcentagem
    bad_decil30_train = [result['bad_decil30_train'] * 100 for result in feature_history]  # Converter para porcentagem
    bad_decil40_train = [result['bad_decil40_train'] * 100 for result in feature_history]  # Converter para porcentagem


    bad_decil10_val = [result['bad_decil10_val'] * 100 for result in feature_history]  # Converter para porcentagem
    bad_decil30_val = [result['bad_decil30_val'] * 100 for result in feature_history]  # Converter para porcentagem
    bad_decil40_val = [result['bad_decil40_val'] * 100 for result in feature_history]  # Converter para porcentagem


    bad_decil10_oot = [result['bad_decil10_oot'] * 100 for result in feature_history]  # Converter para porcentagem
    bad_decil30_oot = [result['bad_decil30_oot'] * 100 for result in feature_history]  # Converter para porcentagem
    bad_decil40_oot = [result['bad_decil40_oot'] * 100 for result in feature_history]  # Converter para porcentagem


    # Criar o gráfico
    fig = go.Figure()

    # Adicionar linha KS Treino
    fig.add_trace(go.Scatter(
        x=iterations,
        y=ks_train,
        mode='lines+markers',
        name=f'{metric.upper()} Treino',
        marker=dict(size=10, color='blue'),
        line=dict(width=2, color='blue'),
        hoverinfo='text',
        text=[f'Iteração: {i}<br>{metric} Treino: {ks:.0f}%<br>Features: {n}<br>BadTopTrain10%: {top10:.2f}%<br>BadTopTrain30%: {top30:.2f}%<br>BadTopTrain40%: {top40:.2f}%' for i, ks, n, top10, top30, top40 in zip(iterations, ks_train, num_features, bad_decil10_train, bad_decil30_train, bad_decil40_train)]
    ))

    # Adicionar linha KS Validação
    fig.add_trace(go.Scatter(
        x=iterations,
        y=ks_valid,
        mode='lines+markers',
        name=f'{metric.upper()} Validação',
        marker=dict(size=10, color='green'),
        line=dict(width=2, color='green'),
        hoverinfo='text',
        text=[f'Iteração: {i}<br>{metric} Validação: {ks:.0f}%<br>Features: {n}<br>BadTopVal10%: {top10:.2f}%<br>BadTopVal30%: {top30:.2f}%<br>BadTopVal40%: {top40:.2f}%' for i, ks, n, top10, top30, top40 in zip(iterations, ks_train, num_features, bad_decil10_val, bad_decil30_val, bad_decil40_val)]
    ))

    # Adicionar linha KS OOT
    fig.add_trace(go.Scatter(
        x=iterations,
        y=ks_oot,
        mode='lines+markers',
        name=f'{metric.upper()} OOT',
        marker=dict(size=10, color='red'),
        line=dict(width=2, color='red'),
        hoverinfo='text',
        text=[f'Iteração: {i}<br>{metric} OOT: {ks:.0f}%<br>Features: {n}<br>BadTopOOT10%: {top10:.2f}%<br>BadTopOOT30%: {top30:.2f}%<br>BadTopOOT40%: {top40:.2f}%' for i, ks, n, top10, top30, top40 in zip(iterations, ks_train, num_features, bad_decil10_oot, bad_decil30_oot, bad_decil40_oot)]
        # text=[f'Iteração: {i}<br>{metric} OOT: {ks:.0f}%<br>Features: {n}<br>BadTopVal10%: {top10:.2f}%' for i, ks, n, top10 in zip(iterations, ks_oot, num_features, bad_decil10_oot)]
    ))


    # Configurar layout do gráfico
    fig.update_layout(
        title=dict(
            text=f'Evolução do {metric.upper()} por Iteração',
            x=0.5,  # Centraliza o título no eixo X
            xanchor='center',
            font=dict(size=20, color='black')
        ),
        xaxis=dict(
            title='Interações',
            tickmode='linear',
            showgrid=False,  # Remove a grade no eixo X
            zeroline=False   # Remove a linha zero
        ),
        yaxis=dict(
            title=f'{metric.upper()} (%)',
            tickformat=',.0f',  # Formata os valores do eixo Y como números inteiros
            showgrid=False,  # Remove a grade no eixo Y
            zeroline=False   # Remove a linha zero
        ),
        template='plotly_white',
        plot_bgcolor='rgba(0,0,0,0)',  # Fundo transparente
        paper_bgcolor='white',  # Fundo do papel branco
        legend=dict(
            orientation="h",  # Legenda na horizontal
            yanchor="bottom",
            y=1.02,  # Posiciona a legenda acima do gráfico
            xanchor="center",
            x=0.5,  # Centraliza a legenda
            font=dict(size=12)
        ),
        hovermode='x unified',
        font=dict(size=14)
    )

    # Exibir gráfico
    fig.show()


In [0]:
def fs_rfe_lgbm(data_spark, data_spark_oot, list_features, target, params, dev = None, threshold = 0.98, min_diff = 3, metric = "ks2", max_nsample = 100000):
    
    """
    Realiza a seleção iterativa de features utilizando o LightGBM em um conjunto de dados Spark,
    com base em métricas de importância acumulada e desempenho do modelo.

    Parameters
    ----------
    data_spark : pyspark.sql.DataFrame
        DataFrame do Spark contendo os dados de entrada.
    
    list_features : list of str
        Lista das colunas que serão utilizadas como features para o modelo.
    
    target : str
        Nome da coluna alvo (variável dependente). Deve ser binária, contendo apenas os valores 0 e 1.
    
    params : dict
        Dicionário contendo os hiperparâmetros para o modelo LightGBM.
    
    dev : str, optional
        Nome da coluna utilizada para indicar os dados de treino/validação. Deve ser do tipo booleano. 
        Se não especificado, todo o conjunto será usado. Por padrão, None.
    
    threshold : float, optional
        Limite do ganho acumulado para a seleção de features. Features com ganho acumulado abaixo deste valor serão mantidas.
        Valor padrão é 0.98 (98% do ganho acumulado).
    
    min_diff : int, optional
        Número mínimo de features a serem removidas em cada iteração. 
        O processo para quando a diferença é menor que este valor. Por padrão, 3.
    
    metric : str, optional
        Métrica de avaliação utilizada para o processo interativo. Opções disponíveis: 'ks2', 'gini', 'auc'. 
        Valor padrão é 'ks2'.

    max_nsample : int, optional
        Número máximo de amostras a serem utilizadas para o processo de modelagem. 
        Se o conjunto de dados exceder este limite, será realizada uma amostragem. Por padrão, 100000.

    Returns
    -------
    list of dict
        Histórico das iterações de seleção de features, contendo para cada iteração:
        - 'iteration': número da iteração.
        - 'lengh_features_model': quantidade de features no modelo na iteração.
        - 'ks2_train': KS (Kolmogorov-Smirnov) no conjunto de treino.
        - 'gini_train': Gini no conjunto de treino.
        - 'auc_train': AUC no conjunto de treino.
        - 'ks2_valid': KS no conjunto de validação.
        - 'gini_valid': Gini no conjunto de validação.
        - 'auc_valid': AUC no conjunto de validação.
        - 'selected_features': lista de features selecionadas para a próxima iteração.
        - 'removed_features': lista de features removidas na iteração.

    Raises
    ------
    ValueError
        - Se `list_features` ou `target` não estiverem presentes em `data_spark`.
        - Se `target` não for binária com valores 0 e 1.
        - Se alguma feature em `list_features` não for numérica.
        - Se `metric` não for uma das opções válidas ('ks2', 'gini', 'auc').
        - Se `threshold` estiver fora do intervalo (0, 1].
        - Se `min_diff` for menor que 2.
        - Se `dev` for especificada e não for do tipo booleano.

    Notes
    -----
    - O conjunto de dados Spark é convertido para Pandas antes do processamento.
    - Caso o número de amostras no conjunto de dados exceda `max_nsample`, é realizada uma amostragem com uma fração proporcional.
    - O processo utiliza a função `feature_selection_lgb` para realizar a seleção iterativa de features e o LightGBM para calcular a importância das features.
    - A métrica selecionada é utilizada para avaliar as iterações de seleção de features.

    Examples
    --------
    >>> params = {
    ...     'objective': 'binary',
    ...     'metric': 'auc',
    ...     'boosting_type': 'gbdt',
    ...     'learning_rate': 0.01,
    ...     'num_leaves': 31,
    ...     'seed': 42
    ... }
    >>> history = fs_rfe_lgbm(
    ...     data_spark=data_spark,
    ...     list_features=['feature1', 'feature2', 'feature3'],
    ...     target='target_column',
    ...     params=params,
    ...     dev='dev_column',
    ...     threshold=0.95,
    ...     min_diff=3,
    ...     metric='gini',
    ...     max_nsample=50000
    ... )
    >>> print(history[-1]['selected_features'])
    """

    #######Validacoes#######
    #validando se a lista esta presente na tabela de input
    list_not_in = [column for column in list_features if column not in data_spark.columns]
    
    list_not_in_oot = [column for column in list_features if column not in data_spark_oot.columns]


    if list_not_in != []:
        raise ValueError(f"A lista de features {list_not_in} do parâmetro list_features para o processo de modelagem não esta presente na tabela de input, especificado no parâmetro data_spark.")

    if list_not_in_oot != []:
        raise ValueError(f"A lista de features {list_not_in_oot} do parâmetro list_features para o processo de modelagem não esta presente na tabela de input, especificado no parâmetro data_spark_oot.")

    if target not in data_spark.columns:
        raise ValueError(f"A variavel {target} especificada no parâmetro target não esta presente no tabela de input, especificado no parâmetro data_spark.")

    if target not in data_spark_oot.columns:
        raise ValueError(f"A variavel {target} especificada no parâmetro target não esta presente no tabela de input, especificado no parâmetro data_spark_oot.")



    #validando se o target é binario
    target_value  = data_spark.select(target).distinct().rdd.flatMap(lambda row: row).collect()
    result_bool = all(value in [1, 0] for value in target_value)

    target_value_oot  = data_spark_oot.select(target).distinct().rdd.flatMap(lambda row: row).collect()
    result_bool_oot = all(value in [1, 0] for value in target_value_oot)

    if result_bool == False:
        raise ValueError(f"A variavel {target} especificada no parâmetro target presente no parametro data_spark_oot possui os seguintes valores {target_value}, logo o processo de modelagem foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")

    if result_bool_oot == False:
        raise ValueError(f"A variavel {target} especificada no parâmetro target presente no parametro data_spark_oot possui os seguintes valores {target_value}, logo o processo modelagem foi desenvolvido para problemas de classificação assumindo somente valores (0,1).")


    #Verificando se as colunas listadas como numericas são numericas no dataset
    type_num_columns = [col.name for col in data_spark.selectExpr(*list_features).schema.fields  if isinstance(col.dataType, NumericType)]

    type_num_columns_oot = [col.name for col in data_spark_oot.selectExpr(*list_features).schema.fields  if isinstance(col.dataType, NumericType)]


    #comparando lista numerica vs. lista dataset numerica
    type_num_list_not_in = [column for column in list_features if column not in type_num_columns]

    type_num_list_not_in_oot = [column for column in list_features if column not in type_num_columns_oot]

    if type_num_list_not_in != []:
        raise ValueError(f"A lista de features do parâmetro list_features precisam ser do tipo NumericType. As features {type_num_list_not_in} estão presentes na lista mas não são do tipo  NumericType na tabela de input, especificado no parâmetro data_spark.")

    if type_num_list_not_in_oot != []:
        raise ValueError(f"A lista de features do parâmetro list_features precisam ser do tipo NumericType. As features {type_num_list_not_in_oot} estão presentes na lista mas não são do tipo  NumericType na tabela de input, especificado no parâmetro data_spark_oot.")

    list_possible_metrics = ['ks2', 'gini', 'auc']
    # Verifica se a métrica é válida, ignorando case sensitivity
    metric_result_valid = [m for m in list_possible_metrics if m == metric.lower()]

    # Levanta um erro se nenhuma correspondência for encontrada
    if not metric_result_valid: 
        raise ValueError(f"A métrica '{metric}' especificada no parâmetro 'metric' não está presente nas possibilidades: {list_possible_metrics}. "f"Escolha uma das opções disponíveis.")

    metric = metric.lower()

    if threshold <= 0 or threshold > 1:
        raise ValueError("O valor do threshold tem que ser maior do que 0 e menor do que 1.")

    if dev != None:
        type_bool_columns = [col.name for col in data_spark.selectExpr(dev).schema.fields if isinstance(col.dataType, BooleanType)]

        if type_bool_columns != [dev]:
            raise ValueError(f"A feature {dev} definida no parâmetro dev precisa ser do tipo booleano (True e False)")

    if min_diff < 2:
        raise ValueError("O valor do min_diff tem que ser maior do que 1")

    
    data_pandas = data_spark.toPandas()
    data_pandas_oot = data_spark_oot.toPandas()


    data_pandas['dev_lgbm_exp'] = True

    if dev is None:
        print("Não foi especificada uma coluna para treinar o modelo, logo será utilizado `100%` da sua base o desenvolvimento.")
        dev = 'dev_lgbm_exp'

    
    data_pandas = data_pandas[data_pandas[dev] == True]

    # Avaliando tamanho da amostra
    sizedata = len(data_pandas)
    # Validando o tamanho da amostra
    if sizedata > max_nsample:
        print(f"O dataframe definido no parâmetro data_spark tem um tamanho de {sizedata}. Foi selecionada uma amostra com tamanho {max_nsample} para o processo de modelagem")
        
        data_pandas = data_pandas.sample(frac=max_nsample/sizedata, random_state=42)


    feature_history = feature_selection_lgb(data_pandas = data_pandas, data_pandas_oot = data_pandas_oot, list_features= list_features, params = params, target = target, threshold = threshold, min_diff = min_diff)

    plot_interactive_metric(feature_history, metric = metric)

    return feature_history

##Validação Modelos

In [0]:
def graph_feature_importance(run_model, n_features = None, metric = "gain"):
   
    """
    Gera um gráfico de barras com a importância das features de um modelo LightGBM treinado.

    Args:
        run_model (str): Caminho do modelo registrado no MLflow.
        n_features (int, opcional): Número de features a serem exibidas no gráfico. Exibe todas se for None.
        metric (str, opcional): Tipo de importância das features ('gain' ou 'split'). O valor default é 'gain'.

    Returns:
        DataFrame: DataFrame contendo as importâncias das features em porcentagem.

    Example:
        >>> df_importance = graph_feature_importance(run_model='1834hdajdafh1h1383141h41l4ndfoa1'', n_features=10,
        metric='gain')
    """

    logged_model = f'runs:/{run_model}/model'

    lgb_model = mlflow.lightgbm.load_model(logged_model)

    # Obter a importância das features usando 'gain' ou 'split'
    feature_importances = lgb_model.feature_importance(importance_type=metric)  # gain ou 'split'
    features = lgb_model.feature_name()

    if n_features is None:
       n_features = len(features)
    
    # Converter as importâncias para porcentagem
    total_importance = sum(feature_importances)
    feature_importances_percent = [(imp / total_importance).round(4) for imp in feature_importances]
    # Criar DataFrame com as importâncias em porcentagem
    importance_df = pd.DataFrame({
                'Feature': features,
                f'{metric}': feature_importances,
                'Importance (%)': feature_importances_percent
    })
    importance_df = importance_df.sort_values(by='Importance (%)', ascending=False)
    
    importance_df['gain_acm'] = importance_df['Importance (%)'].cumsum().round(4)
    # Plotar o gráfico de barras com as importâncias em porcentagem
    plt.figure(figsize=(10, 8))
    plt.barh(importance_df['Feature'][:n_features], importance_df['Importance (%)'][:n_features], color='#238662')
    plt.xlabel("Feature Importance (%)", fontsize=14)
    plt.ylabel("Feature", fontsize=14)
    plt.title(f'Top {n_features} Most Important Features (Percentage)', fontsize=16)
    plt.gca().invert_yaxis()  # Inverter eixo y para mostrar as features mais importantes no topo
    plt.show()
    return importance_df

In [0]:
def graph_shap_value(data_spark , run_model):

    """
    Gera um gráfico SHAP dos valores SHAP calculados para um modelo LightGBM treinado.

    Args:
        data_spark (DataFrame): DataFrame Spark contendo os dados de entrada.
        run_model (str): Caminho do modelo registrado no MLflow.

    Example:
        >>> graph_shap_value(data_spark=df_spark, run_model='1834hdajdafh1h1383141h41l4ndfoa1')
    """

    logged_model = f'runs:/{run_model}/model'
    
    #transformando pandas
    data_pandas = data_spark.toPandas()
    #carregando modelo
    model_shap = mlflow.lightgbm.load_model(logged_model)
    #listando variaveis do modelo
    features = model_shap.feature_name()
    #selecionando base para calculo
    X_DEV = data_pandas.loc[:, features]
    #calculo shap
    shap_values = shap.TreeExplainer(model_shap).shap_values(X_DEV)
    #plot shap
    shap.summary_plot(shap_values, 
                  X_DEV, 
                  show = True,
                  plot_type='violin',
                  # plot_type='bar',
                  plot_size = [15,7],
                  max_display=X_DEV.shape[1])

##Salvar Artefatos

In [0]:
def save_artefact_mlflow(data_spark, model_object, path_save_artefact, artefact_name):

    """
    Salva um artefato de modelo no MLflow (GLM e Binning, Woe Process).

    Args:
        data_spark (DataFrame): DataFrame Spark contendo os dados de entrada.
        model_object (object): Objeto do modelo a ser salvo.
        path_save_artefact (str): Caminho para salvar o artefato.
        artefact_name (str): Nome do artefato.

    Example:
        >>> save_artefact_mlflow(data_spark=df_spark, model_object=model, path_save_artefact='my_experiment', artefact_name='my_model')

    Returns:
            print("Seu objeto foi salvo como:", f"{experiment_id}/runs/{run_id}") em que run_id é o código de
            identicação do seu modelo.
    """

    mlflow.set_experiment(f'{path_save_artefact}/{artefact_name}')

    with mlflow.start_run() as run:
        signature = infer_signature(data_spark)  
        mlflow.sklearn.log_model(model_object, 'model', signature=signature)

        run_id = run.info.run_id
        experiment_id = run.info.experiment_id
        
    print("Seu objeto foi salvo como:", f"{experiment_id}/runs/{run_id}")


def load_artefact_mlflow(run_id):

    """
    Carrega um artefato de modelo do MLflow (GLM e Binning, Woe Process).

    Args:
        run_id (str): ID da execução do MLflow.

    Returns:
        object: Objeto do modelo carregado.

    Example:
        >>> model = load_artefact_mlflow(run_id='12345')
    """
    logged_model = f"runs:/{run_id}/model"

    # Load model
    return mlflow.sklearn.load_model(logged_model)