In [0]:
# Load the raw CSV. No row is treated as a header here, so every column
# arrives with a positional name and every row (including the title rows)
# is read as data.
df_clients = (
    spark.read
    .option("header", False)
    .csv("dbfs:/FileStore/shared_uploads/matheus_rpg90@hotmail.com/default_os_credit_card_clients-2.csv")
)

## Análise Exploratória dos Dados (EDA)
### Fazendo processamento dos dados 

In [0]:
# Display the DataFrame exactly as it was loaded
display(df_clients)

In [0]:
# The file carries the real column names in its SECOND row. Promote that row
# to the header and drop the two leading non-data rows.

from pyspark.sql import Row

# Fetch only the first two rows; collecting the whole dataset on the driver
# just to read one row is unnecessary.
rows_to_remove = df_clients.take(2)
if len(rows_to_remove) < 2:
    raise ValueError("Expected at least two leading rows (title row + header row) in the CSV")

# Row that holds the real column names
header_row = rows_to_remove[1]

# Remove exactly those two rows. exceptAll (EXCEPT ALL) keeps duplicated data
# rows, whereas subtract (EXCEPT DISTINCT) would silently de-duplicate the
# whole dataset as a side effect.
df_clean = df_clients.exceptAll(spark.createDataFrame(rows_to_remove, df_clients.schema))

# Rename the positional columns (_c0, _c1, ...) to the names found in the header row
for i, col_name in enumerate(header_row):
    df_clean = df_clean.withColumnRenamed(f"_c{i}", col_name)

df_clients = df_clean

# Show the real column names
print(df_clients.columns)


In [0]:
# Normalise the column names: lower-case, with "_" turned into a space
normalized_names = [name.lower().replace("_", " ") for name in df_clients.columns]
df_clients = df_clients.toDF(*normalized_names)
                               

In [0]:
# Mapping from the dataset's terse column names to self-explanatory ones.
# The three monthly families (status, bill, payment) share the same month
# suffixes, so they are generated instead of being spelled out one by one.
month_suffixes = ["sep", "aug", "jul", "jun", "may", "apr"]

rename_columns = {
    'id': 'client id',
    'limit bal': 'credit limit',
    'sex': 'gender',
    'education': 'education level',
    'marriage': 'marital status',
    'age': 'age',
    # The status columns are numbered 0, 2, 3, 4, 5, 6 in the source data
    **{f"pay {n}": f"payment status {m}" for n, m in zip([0, 2, 3, 4, 5, 6], month_suffixes)},
    **{f"bill amt{i}": f"bill amount {m}" for i, m in enumerate(month_suffixes, start=1)},
    **{f"pay amt{i}": f"payment amount {m}" for i, m in enumerate(month_suffixes, start=1)},
    'default payment next month': 'default next month',
}

# Apply the mapping in one pass; columns without an entry keep their name
df_clients = df_clients.toDF(*[rename_columns.get(name, name) for name in df_clients.columns])


In [0]:
# Display the DataFrame
display(df_clients)

In [0]:
# Check the data type of each column
df_clients.printSchema()

In [0]:
# Convert every column to its proper numeric type
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType

# Month suffixes shared by the three monthly column families
month_suffixes = ["sep", "aug", "jul", "jun", "may", "apr"]

# Target type for each column
fixed_types = {
    "client id": IntegerType(),
    "credit limit": DoubleType(),
    "gender": IntegerType(),
    "education level": IntegerType(),
    "marital status": IntegerType(),
    "age": IntegerType(),
    **{f"payment status {m}": IntegerType() for m in month_suffixes},
    **{f"bill amount {m}": DoubleType() for m in month_suffixes},
    **{f"payment amount {m}": DoubleType() for m in month_suffixes},
    "default next month": IntegerType(),
}

# Cast each listed column in place (the column keeps its name and position)
for target_column in fixed_types:
    df_clients = df_clients.withColumn(
        target_column,
        col(target_column).cast(fixed_types[target_column]),
    )


In [0]:
# Check the data types again
df_clients.printSchema()

In [0]:
# The "education level" and "marital status" columns contain inconsistent
# codes that have to be normalised.

from pyspark.sql.functions import when

# "education level": the invalid codes 0, 5 and 6 are folded into 4 ("others")
education = col("education level")
education_fixed = when(education.isin(0, 5, 6), 4).otherwise(education)

# "marital status": the invalid code 0 is folded into 3 ("others")
marital = col("marital status")
marital_fixed = when(marital == 0, 3).otherwise(marital)

df_clients = (
    df_clients
    .withColumn("education level", education_fixed)
    .withColumn("marital status", marital_fixed)
)



In [0]:
from pyspark.sql.functions import sum as _sum, isnan

# Per column, count the rows whose value is NULL or NaN
missing_counts = [
    _sum(when(col(name).isNull() | isnan(name), 1).otherwise(0)).alias(name)
    for name in df_clients.columns
]
df_clients.select(missing_counts).show()

In [0]:

# Group by every column and keep only the groups that occur more than once
duplicate_lines = df_clients.groupBy(df_clients.columns).count().filter("count > 1")

# Total number of surplus rows (each group contributes count - 1).
# SUM over zero groups yields NULL (None in Python), so fall back to 0 to
# report "0" rather than "None" when the data has no duplicates.
total_duplicates = duplicate_lines.selectExpr("sum(count - 1) as total").collect()[0]["total"] or 0
print(f"Total de linhas duplicadas: {total_duplicates}")


### Análise Estatística dos dados

In [0]:
# Check the class balance of the target variable "default next month"
df_clients.groupBy("default next month").count().show()


In [0]:
# Descriptive statistics for the variables
display(df_clients.describe())

In [0]:
# Correlation between each numeric variable and the target. For a
# classification problem like this one a strong correlation is not expected,
# but it is still worth checking.

from pyspark.ml.stat import Correlation

numeric_cols = [
    name
    for name, dtype in df_clients.dtypes
    if name != "default next month" and dtype in ("double", "int")
]

for feature in numeric_cols:
    corr = df_clients.stat.corr(feature, "default next month")
    print(f"Correlação entre {feature} e default next month: {corr:.4f}")



In [0]:
# Frequency of each categorical variable broken down by target class

categorical_cols = ["gender", "education level", "marital status"]

for category in categorical_cols:
    print(f"Frequência de {category} por classe 'default next month':")
    frequency = (
        df_clients
        .groupBy("default next month", category)
        .count()
        .orderBy(category, "default next month")
    )
    frequency.show()



### Treinamento dos modelos

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score
import numpy as np

# 1. Assemble the feature vector from every column except the target.
# NOTE(review): this keeps "client id" as a model input; it looks like an
# identifier rather than a predictor - confirm that this is intended.
feature_cols = [name for name in df_clients.columns if name != "default next month"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_vector = assembler.transform(df_clients).select("features", "default next month")

# 2. Train/test split (randomSplit is not stratified; compare the class
#    ratios of both parts after the split if that matters)
train, test = df_vector.randomSplit([0.8, 0.2], seed=42)

# 3. RANDOM FOREST
rf = RandomForestClassifier(labelCol="default next month", featuresCol="features", seed=42)
rf_model = rf.fit(train)
rf_preds = rf_model.transform(test)

# 4. EVALUATION - Random Forest
# Bring label, hard prediction and probability to the driver in ONE action.
# Separate collect() calls re-run the whole pipeline each time and give no
# guarantee that the rows of the resulting arrays line up.
rf_rows = rf_preds.select("default next month", "prediction", "probability").collect()
y_true_rf = np.array([row["default next month"] for row in rf_rows])
y_pred_rf = np.array([row["prediction"] for row in rf_rows])
# Probability of the positive class (index 1 of the probability vector)
y_score_rf = np.array([float(row["probability"][1]) for row in rf_rows])

# Scan thresholds 0.00, 0.01, ..., 1.00 and keep the one with the best F1.
# NOTE(review): the threshold is selected on the same test set on which the
# metrics below are reported, so those scores are optimistic - consider a
# separate validation split.
thresholds = np.arange(0.0, 1.01, 0.01)

best_threshold = 0.5
best_f1 = 0

for t in thresholds:
    y_pred_custom = (y_score_rf >= t).astype(int)
    f1 = f1_score(y_true_rf, y_pred_custom)
    if f1 > best_f1:
        best_f1 = f1
        best_threshold = t

print(f"Melhor Threshold com base no F1-score: {best_threshold:.2f}")
print(f"Novo F1-score: {best_f1:.4f}")


# Final predictions at the selected threshold
y_pred_custom = (y_score_rf >= best_threshold).astype(int)

print("\n Métricas com threshold ajustado")
print("F1-score:", round(f1_score(y_true_rf, y_pred_custom), 4))
print("Precision:", round(precision_score(y_true_rf, y_pred_custom), 4))
print("Recall:", round(recall_score(y_true_rf, y_pred_custom), 4))
# ROC-AUC is computed from the scores, so it does not depend on the threshold
print("ROC-AUC (não muda):", round(roc_auc_score(y_true_rf, y_score_rf), 4))





In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score
from pyspark.sql.functions import when
import numpy as np

# 1. Assemble the feature vector from every column except the target
feature_cols = [name for name in df_clients.columns if name != "default next month"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_vector = assembler.transform(df_clients).select("features", "default next month")

# 2. Compute balanced class weights for the estimator's weightCol:
#    weight_c = total / (2 * count_c), so both classes carry equal total weight.
#    The counts are read straight from the collected rows, which keeps them
#    (and the weights) plain Python numbers.
class_counts = {
    row["default next month"]: row["count"]
    for row in df_vector.groupBy("default next month").count().collect()
}
count_0 = class_counts[0]
count_1 = class_counts[1]
total = count_0 + count_1
weight_0 = total / (2 * count_0)
weight_1 = total / (2 * count_1)

# 3. Attach the per-row weight column
df_weighted = df_vector.withColumn(
    "weight",
    when(df_vector["default next month"] == 0, weight_0).otherwise(weight_1)
)

# 4. Train/test split
train, test = df_weighted.randomSplit([0.8, 0.2], seed=42)

# 5. Train the GBT model with the row weights
gbt = GBTClassifier(
    labelCol="default next month",
    featuresCol="features",
    weightCol="weight",
    seed=42,
    maxIter=100
)
gbt_model = gbt.fit(train)
gbt_preds = gbt_model.transform(test)

# 6. Evaluation at the default threshold
# Bring label, hard prediction and probability to the driver in ONE action.
# Separate collect() calls re-run the whole pipeline each time and give no
# guarantee that the rows of the resulting arrays line up.
gbt_rows = gbt_preds.select("default next month", "prediction", "probability").collect()
y_true_gbt = np.array([row["default next month"] for row in gbt_rows])
# Probability of the positive class (index 1 of the probability vector)
y_score_gbt = np.array([float(row["probability"][1]) for row in gbt_rows])
y_pred_gbt = np.array([row["prediction"] for row in gbt_rows])

print("Métricas - GBT (threshold padrão = 0.5)")
print("F1-score:", round(f1_score(y_true_gbt, y_pred_gbt), 4))
print("Precision:", round(precision_score(y_true_gbt, y_pred_gbt), 4))
print("Recall:", round(recall_score(y_true_gbt, y_pred_gbt), 4))
print("ROC-AUC:", round(roc_auc_score(y_true_gbt, y_score_gbt), 4))

# 7. Scan thresholds 0.00, 0.01, ..., 1.00 and keep the one with the best F1.
# NOTE(review): the threshold is selected on the same test set on which the
# metrics below are reported, so those scores are optimistic - consider a
# separate validation split.
thresholds = np.arange(0.0, 1.01, 0.01)
best_f1 = 0
best_threshold = 0.5

for t in thresholds:
    y_pred_thresh = (y_score_gbt >= t).astype(int)
    f1 = f1_score(y_true_gbt, y_pred_thresh)
    if f1 > best_f1:
        best_f1 = f1
        best_threshold = t

y_pred_adjusted = (y_score_gbt >= best_threshold).astype(int)

# 8. Evaluation at the tuned threshold
print("\n Métricas - GBT com classWeightCol + threshold ajustado")
print("Melhor threshold:", round(best_threshold, 2))
print("F1-score:", round(f1_score(y_true_gbt, y_pred_adjusted), 4))
print("Precision:", round(precision_score(y_true_gbt, y_pred_adjusted), 4))
print("Recall:", round(recall_score(y_true_gbt, y_pred_adjusted), 4))
# ROC-AUC is computed from the scores, so it does not depend on the threshold
print("ROC-AUC (não muda):", round(roc_auc_score(y_true_gbt, y_score_gbt), 4))


In [0]:
# Report whether MLflow can be imported in this environment
try:
    import mlflow
except ImportError:
    print("MLflow NÃO está instalado.")
else:
    print("MLflow está instalado!")


### Salvando o melhor modelo e simulando deploy

In [0]:
%pip uninstall -y typing_extensions
%pip install typing_extensions==4.5.0
# The two commands above replace the installed typing_extensions with the
# pinned 4.5.0 release.
# NOTE(review): on Databricks, %pip commands that modify the environment
# presumably reset the notebook's Python state, so this cell most likely
# belongs at the very top of the notebook - verify against the platform docs.


In [0]:
# Restart the Python process after the package change above.
# NOTE(review): a restart presumably discards every Python variable and
# import, yet the cells that follow still read best_threshold, y_true_gbt,
# y_pred_adjusted, y_score_gbt, gbt_model, assembler and the sklearn metric
# functions without redefining them - confirm the intended execution order.
dbutils.library.restartPython()


In [0]:
import mlflow
import mlflow.spark

# Select the experiment (inside the user's folder) that will hold the run
mlflow.set_experiment("/Users/matheus_rpg90@hotmail.com/experiment_gbt")

with mlflow.start_run(run_name="GBTClassifier_model"):

    # Parameters of the run
    mlflow.log_param("maxIter", 100)
    mlflow.log_param("threshold_otimizado", round(best_threshold, 2))

    # Metrics of the GBT model at the tuned threshold, rounded to 4 decimals
    gbt_metrics = {
        "f1_score": f1_score(y_true_gbt, y_pred_adjusted),
        "precision": precision_score(y_true_gbt, y_pred_adjusted),
        "recall": recall_score(y_true_gbt, y_pred_adjusted),
        "roc_auc": roc_auc_score(y_true_gbt, y_score_gbt),
    }
    for metric_name, metric_value in gbt_metrics.items():
        mlflow.log_metric(metric_name, round(metric_value, 4))

    # Store the trained model as an artifact of the run
    mlflow.spark.log_model(gbt_model, "model")

    print("Modelo salvo e logado no MLflow com sucesso!")


In [0]:
# Load the most recently logged model from MLflow
import mlflow
from mlflow.spark import load_model

# Prefer the run logged in this Python session: a hard-coded run id goes
# stale as soon as a new run is logged and does not exist in other
# workspaces. The previously recorded id is kept only as a fallback for the
# case where no run has been started in this process.
last_run = mlflow.last_active_run()
run_id = last_run.info.run_id if last_run is not None else "8b8a561de9604eef83abf585e9911af9"
model_uri = f"runs:/{run_id}/model"

# Load the model
loaded_model = load_model(model_uri)


In [0]:
# Take a few existing rows (same columns as in training) and turn them into
# feature vectors with the assembler defined in the training cell
novos_dados = df_clients.limit(3)
novos_vetor = assembler.transform(novos_dados).select("features")

# Score them with the reloaded model and show class and probabilities
previsao = loaded_model.transform(novos_vetor)
resultado = previsao.select("prediction", "probability")
resultado.show()


In [0]:
from pyspark.ml.feature import VectorAssembler

# Feature values for one hypothetical client. The keys are listed in the same
# order as the columns in this notebook's rename/cast dictionaries (target
# excluded), which is presumably the order the model was trained on - verify
# against df_clients.columns.
novo_cliente_dados = {
    "client id": 20001,
    "credit limit": 20000.0,
    "gender": 1,
    "education level": 2,
    "marital status": 1,
    "age": 34,
    "payment status sep": -1,
    "payment status aug": -1,
    "payment status jul": 0,
    "payment status jun": 0,
    "payment status may": 0,
    "payment status apr": 0,
    "bill amount sep": 5000.0,
    "bill amount aug": 4500.0,
    "bill amount jul": 4000.0,
    "bill amount jun": 3500.0,
    "bill amount may": 3000.0,
    "bill amount apr": 2500.0,
    "payment amount sep": 1000.0,
    "payment amount aug": 1000.0,
    "payment amount jul": 1000.0,
    "payment amount jun": 1000.0,
    "payment amount may": 1000.0,
    "payment amount apr": 1000.0
}

# Column order to feed the model: the dict's insertion order, minus the
# target if it is present.
feature_cols = [name for name in novo_cliente_dados if name != "default next month"]

# Build the DataFrame from a tuple plus an explicit column list. Passing the
# dict itself lets PySpark infer the schema with the keys sorted
# alphabetically, which would scramble the feature order.
novo_cliente = spark.createDataFrame(
    [tuple(novo_cliente_dados[name] for name in novo_cliente_dados)],
    schema=list(novo_cliente_dados),
)

# ---------------------
# 3. Assemble the feature vector (explicit, training-ordered input columns)
# ---------------------
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
novo_cliente_vector = assembler.transform(novo_cliente).select("features")

# ---------------------
# 4. Run the prediction
# ---------------------
previsao = loaded_model.transform(novo_cliente_vector)
previsao.select("prediction", "probability").show(truncate=False)