# **1. Introdução**

A regressão logística é um método popular para prever uma resposta categórica. É um caso especial de modelos Lineares Generalizados que prevê a probabilidade dos resultados. Em `spark.ml`, a regressão logística pode ser usada para prever um resultado binário usando regressão logística binomial ou pode ser usada para prever um resultado multiclasse usando regressão logística multinomial. Use o parâmetro `family` para selecionar entre esses dois algoritmos ou deixe-o indefinido e o Spark inferirá a variante correta.

<font size=2>**Fonte:** [MLlib](https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression)</font>

 <img src="https://miro.medium.com/max/1400/0*1KnKYuv0UDu_1-qM.gif?width=1191&height=670" alt="Minha Figura">

## **1.1 Carregando o pyspark**

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName("Classificação com Spark 1.1 - v1").getOrCreate()

In [2]:
spark

## **1.2 Carregando as principais funções**

In [3]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
import pandas as pd
from datetime import datetime
import sys
import os

# Adiciona a pasta work ao sys.path
sys.path.append('/home/jovyan/')

from work.src.utils import *
start_time = datetime.now()


n = 'best_lr_model'
caminho_modelo = 'work/models'

## **1.3 Carregamento dos Dados**

In [4]:
dados = spark.read.csv(f'work/data/raw/dados_clientes.csv', sep=',', header=True, inferSchema=True)

In [5]:
dados

DataFrame[id: int, Churn: string, Mais65anos: int, Conjuge: string, Dependentes: string, MesesDeContrato: int, TelefoneFixo: string, MaisDeUmaLinhaTelefonica: string, Internet: string, SegurancaOnline: string, BackupOnline: string, SeguroDispositivo: string, SuporteTecnico: string, TVaCabo: string, StreamingFilmes: string, TipoContrato: string, ContaCorreio: string, MetodoPagamento: string, MesesCobrados: double]

In [6]:
dados.show()

+---+-----+----------+-------+-----------+---------------+------------+------------------------+-----------+------------------+------------------+------------------+------------------+------------------+------------------+------------+------------+----------------+-------------+
| id|Churn|Mais65anos|Conjuge|Dependentes|MesesDeContrato|TelefoneFixo|MaisDeUmaLinhaTelefonica|   Internet|   SegurancaOnline|      BackupOnline| SeguroDispositivo|    SuporteTecnico|           TVaCabo|   StreamingFilmes|TipoContrato|ContaCorreio| MetodoPagamento|MesesCobrados|
+---+-----+----------+-------+-----------+---------------+------------+------------------------+-----------+------------------+------------------+------------------+------------------+------------------+------------------+------------+------------+----------------+-------------+
|  0|  Nao|         0|    Sim|        Nao|              1|         Nao|    SemServicoTelefonico|        DSL|               Nao|               Sim|              

In [7]:
dados.count()

10348

In [8]:
dados.groupBy('Churn').count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|  Sim| 5174|
|  Nao| 5174|
+-----+-----+



In [9]:
dados.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Churn: string (nullable = true)
 |-- Mais65anos: integer (nullable = true)
 |-- Conjuge: string (nullable = true)
 |-- Dependentes: string (nullable = true)
 |-- MesesDeContrato: integer (nullable = true)
 |-- TelefoneFixo: string (nullable = true)
 |-- MaisDeUmaLinhaTelefonica: string (nullable = true)
 |-- Internet: string (nullable = true)
 |-- SegurancaOnline: string (nullable = true)
 |-- BackupOnline: string (nullable = true)
 |-- SeguroDispositivo: string (nullable = true)
 |-- SuporteTecnico: string (nullable = true)
 |-- TVaCabo: string (nullable = true)
 |-- StreamingFilmes: string (nullable = true)
 |-- TipoContrato: string (nullable = true)
 |-- ContaCorreio: string (nullable = true)
 |-- MetodoPagamento: string (nullable = true)
 |-- MesesCobrados: double (nullable = true)



## **1.4 Separa treino e validação e teste**

In [10]:
# Divida o DataFrame em 3 partes: treino (60%), validação (20%) e teste (20%)
train_df, val_df, test_df = dados.randomSplit([0.6, 0.2, 0.2], seed=101)

In [11]:
print(f'train_df: {train_df.count()}')
print(f'val_df: {val_df.count()}')
print(f'test_df: {test_df.count()}')

train_df: 6182
val_df: 2080
test_df: 2086


## **1.5 Transformando os Dados**

In [12]:
def transform_df(df: DataFrame) -> DataFrame:
    """
    Aplica transformação ao DataFrame para pivotar e renomear colunas categóricas, e remover colunas originais.

    Args:
        df (DataFrame): DataFrame que será transformado.

    Returns:
        DataFrame: DataFrame transformado.
    """
    # Lista das colunas binárias
    colunasBinarias = [
        'Churn',
        'Conjuge',
        'Dependentes',
        'TelefoneFixo',
        'MaisDeUmaLinhaTelefonica',
        'SegurancaOnline',
        'BackupOnline',
        'SeguroDispositivo',
        'SuporteTecnico',
        'TVaCabo',
        'StreamingFilmes',
        'ContaCorreio'
    ]

    # Criação da lista das transformações para as colunas binárias
    colunasBinarias_transformadas = [ F.when(F.col(c) == 'Sim', 1).otherwise(0).alias(c) for c in colunasBinarias if c in train_df.columns]

    # Criação da lista das colunas originais que não são binárias
    colunas_outras = [F.col(c) for c in train_df.columns if c not in colunasBinarias]
    
    df_binario = df.select(*[colunas_outras + colunasBinarias_transformadas])
    
    # Pivotagem e transformação das colunas categóricas
    Internet = df_binario.groupBy('id').pivot('Internet').agg(F.lit(1)).na.fill(0)
    TipoContrato = df_binario.groupBy('id').pivot('TipoContrato').agg(F.lit(1)).na.fill(0)
    MetodoPagamento = df_binario.groupBy('id').pivot('MetodoPagamento').agg(F.lit(1)).na.fill(0)

    # Aplicando as transformações ao DataFrame
    df_transformed = df_binario\
        .join(Internet, 'id', how='inner')\
        .join(TipoContrato, 'id', how='inner')\
        .join(MetodoPagamento, 'id', how='inner')\
        .select(
            '*',
            F.col('DSL').alias('Internet_DSL'), 
            F.col('FibraOptica').alias('Internet_FibraOptica'), 
            F.col('Nao').alias('Internet_Nao'), 
            F.col('Mensalmente').alias('TipoContrato_Mensalmente'), 
            F.col('UmAno').alias('TipoContrato_UmAno'), 
            F.col('DoisAnos').alias('TipoContrato_DoisAnos'), 
            F.col('DebitoEmConta').alias('MetodoPagamento_DebitoEmConta'), 
            F.col('CartaoCredito').alias('MetodoPagamento_CartaoCredito'), 
            F.col('BoletoEletronico').alias('MetodoPagamento_BoletoEletronico'), 
            F.col('Boleto').alias('MetodoPagamento_Boleto')        
        )\
        .drop(
            'Internet', 'TipoContrato', 'MetodoPagamento', 'DSL', 
            'FibraOptica', 'Nao', 'Mensalmente', 'UmAno', 'DoisAnos', 
            'DebitoEmConta', 'CartaoCredito', 'BoletoEletronico', 'Boleto'
        )
    
    return df_transformed


In [13]:
# Aplicar a transformação aos três DataFrames
train_df_transformed = transform_df(train_df)
val_df_transformed = transform_df(val_df)
test_df_transformed = transform_df(test_df)

In [14]:
print(f'train_df_transformed: {train_df_transformed.count()}')
print(f'val_df_transformed: {val_df_transformed.count()}')
print(f'test_df_transformed: {test_df_transformed.count()}')

train_df_transformed: 6182
val_df_transformed: 2080
test_df_transformed: 2086


In [15]:
# train_df_transformed.show(3)
# +----+----------+---------------+-----------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+------------+--------------------+------------+------------------------+------------------+---------------------+-----------------------------+-----------------------------+--------------------------------+----------------------+
# |  id|Mais65anos|MesesDeContrato|    MesesCobrados|Churn|Conjuge|Dependentes|TelefoneFixo|MaisDeUmaLinhaTelefonica|SegurancaOnline|BackupOnline|SeguroDispositivo|SuporteTecnico|TVaCabo|StreamingFilmes|ContaCorreio|Internet_DSL|Internet_FibraOptica|Internet_Nao|TipoContrato_Mensalmente|TipoContrato_UmAno|TipoContrato_DoisAnos|MetodoPagamento_DebitoEmConta|MetodoPagamento_CartaoCredito|MetodoPagamento_BoletoEletronico|MetodoPagamento_Boleto|
# +----+----------+---------------+-----------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+------------+--------------------+------------+------------------------+------------------+---------------------+-----------------------------+-----------------------------+--------------------------------+----------------------+
# |7982|         0|              1|45.30540797610398|    1|      0|          0|           0|                       0|              0|           0|                0|             0|      1|              1|           1|           1|                   0|           0|                       1|                 0|                    0|                            0|                            0|                               1|                     0|
# |6654|         0|              7|             86.5|    1|      1|          0|           1|                       1|              0|           0|                0|             0|      1|              0|           1|           0|                   1|           0|                       1|                 0|                    0|                            0|                            0|                               1|                     0|
# |7880|         0|             14|85.03742670311915|    1|      0|          0|           1|                       1|              0|           0|                0|             0|      0|              1|           1|           0|                   1|           0|                       1|                 0|                    0|                            0|                            0|                               1|                     0|
# +----+----------+---------------+-----------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+------------+--------------------+------------+------------------------+------------------+---------------------+-----------------------------+-----------------------------+--------------------------------+----------------------+

In [16]:
train_df_transformed.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Mais65anos: integer (nullable = true)
 |-- MesesDeContrato: integer (nullable = true)
 |-- MesesCobrados: double (nullable = true)
 |-- Churn: integer (nullable = false)
 |-- Conjuge: integer (nullable = false)
 |-- Dependentes: integer (nullable = false)
 |-- TelefoneFixo: integer (nullable = false)
 |-- MaisDeUmaLinhaTelefonica: integer (nullable = false)
 |-- SegurancaOnline: integer (nullable = false)
 |-- BackupOnline: integer (nullable = false)
 |-- SeguroDispositivo: integer (nullable = false)
 |-- SuporteTecnico: integer (nullable = false)
 |-- TVaCabo: integer (nullable = false)
 |-- StreamingFilmes: integer (nullable = false)
 |-- ContaCorreio: integer (nullable = false)
 |-- Internet_DSL: integer (nullable = true)
 |-- Internet_FibraOptica: integer (nullable = true)
 |-- Internet_Nao: integer (nullable = true)
 |-- TipoContrato_Mensalmente: integer (nullable = true)
 |-- TipoContrato_UmAno: integer (nullable = true)
 |-- TipoContr

# **2. Treinamento do modelo**

In [17]:
def data_prep(df_transformed):
    df_transformed = df_transformed.withColumnRenamed('Churn', 'label')
    X = df_transformed.columns
    X.remove('label')
    X.remove('id')
    assembler = VectorAssembler(inputCols=X, outputCol='features')
    df_prep = assembler.transform(df_transformed).select('features', 'label')
    return df_prep

In [18]:
train_df_transformed_prep = data_prep(train_df_transformed)
val_df_transformed_prep = data_prep(val_df_transformed)
test_df_transformed_prep = data_prep(test_df_transformed)

In [19]:
# train_df_transformed.show(3)
# +----+----------+---------------+-----------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+------------+--------------------+------------+------------------------+------------------+---------------------+-----------------------------+-----------------------------+--------------------------------+----------------------+
# |  id|Mais65anos|MesesDeContrato|    MesesCobrados|label|Conjuge|Dependentes|TelefoneFixo|MaisDeUmaLinhaTelefonica|SegurancaOnline|BackupOnline|SeguroDispositivo|SuporteTecnico|TVaCabo|StreamingFilmes|ContaCorreio|Internet_DSL|Internet_FibraOptica|Internet_Nao|TipoContrato_Mensalmente|TipoContrato_UmAno|TipoContrato_DoisAnos|MetodoPagamento_DebitoEmConta|MetodoPagamento_CartaoCredito|MetodoPagamento_BoletoEletronico|MetodoPagamento_Boleto|
# +----+----------+---------------+-----------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+------------+--------------------+------------+------------------------+------------------+---------------------+-----------------------------+-----------------------------+--------------------------------+----------------------+
# |7982|         0|              1|45.30540797610398|    1|      0|          0|           0|                       0|              0|           0|                0|             0|      1|              1|           1|           1|                   0|           0|                       1|                 0|                    0|                            0|                            0|                               1|                     0|
# |6654|         0|              7|             86.5|    1|      1|          0|           1|                       1|              0|           0|                0|             0|      1|              0|           1|           0|                   1|           0|                       1|                 0|                    0|                            0|                            0|                               1|                     0|
# |7880|         0|             14|85.03742670311915|    1|      0|          0|           1|                       1|              0|           0|                0|             0|      0|              1|           1|           0|                   1|           0|                       1|                 0|                    0|                            0|                            0|                               1|                     0|
# +----+----------+---------------+-----------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+------------+--------------------+------------+------------------------+------------------+---------------------+-----------------------------+-----------------------------+--------------------------------+----------------------+

In [20]:
train_df_transformed_prep.show(3, truncate=False)

+----------------------------------------------------------------------------------+-----+
|features                                                                          |label|
+----------------------------------------------------------------------------------+-----+
|(24,[1,2,11,12,13,14,17,22],[1.0,45.30540797610398,1.0,1.0,1.0,1.0,1.0,1.0])      |1    |
|(24,[1,2,3,5,6,11,13,15,17,22],[7.0,86.5,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])        |1    |
|(24,[1,2,5,6,12,13,15,17,22],[14.0,85.03742670311915,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|1    |
+----------------------------------------------------------------------------------+-----+
only showing top 3 rows



In [21]:
train_df_transformed_prep.groupBy('label').count().show()
# +-----+-----+
# |label|count|
# +-----+-----+
# |    1| 3068|
# |    0| 3114|
# +-----+-----+

+-----+-----+
|label|count|
+-----+-----+
|    1| 3068|
|    0| 3114|
+-----+-----+



## **2.2 Ajuste e Previsão**

In [22]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [23]:
# Criação do modelo de Regressão Logística
lr = LogisticRegression()

# Definindo a métrica de avaliação
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")

# Configurando o grid de parâmetros
grid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .addGrid(lr.maxIter, [10, 50, 100])\
    .build()

# Configurando o CrossValidator
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3,
    seed=42
)

# Ajusta o CrossValidator com o DataFrame de treino transformado
modelo_lr = cv.fit(train_df_transformed_prep)

# Obtendo o melhor modelo
best_model = modelo_lr.bestModel

# Extraindo os melhores hiperparâmetros
best_reg_param = best_model.getRegParam()
best_elastic_net_param = best_model.getElasticNetParam()
best_max_iter = best_model.getMaxIter()

print(f"Melhor regParam: {best_reg_param}")
print(f"Melhor elasticNetParam: {best_elastic_net_param}")
print(f"Melhor maxIter: {best_max_iter}")


Melhor regParam: 0.01
Melhor elasticNetParam: 0.0
Melhor maxIter: 50


In [24]:
# Obter o grid de parâmetros
paramMaps = cv.getEstimatorParamMaps()

# Obter o índice do melhor modelo
# O índice do melhor modelo é o mesmo do grid de parâmetros
best_metrics = modelo_lr.avgMetrics
best_metric = max(best_metrics)  # A métrica máxima (supondo que estamos maximizando a métrica)
best_index = best_metrics.index(best_metric)

# Obter os melhores parâmetros
best_params = paramMaps[best_index]
print(f"Melhores hiperparâmetros encontrados: {best_params}")

Melhores hiperparâmetros encontrados: {Param(parent='LogisticRegression_6d12eab82dc1', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent='LogisticRegression_6d12eab82dc1', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_6d12eab82dc1', name='maxIter', doc='max number of iterations (>= 0).'): 50}


In [25]:
# Criando um novo modelo de Regressão Logística com os melhores hiperparâmetros
best_lr = LogisticRegression(
    regParam=best_reg_param,
    elasticNetParam=best_elastic_net_param,
    maxIter=best_max_iter
)
# Ajustando o novo modelo com o DataFrame de treino
best_lr_model = best_lr.fit(train_df_transformed_prep)

In [26]:
## Salvando o modelo
best_lr_model.save(f'{caminho_modelo}/Regressor_logistico_{n}')

In [27]:
resumo_lr_treino = best_lr_model.summary

In [28]:
print('METRICAS RESUMO DO TREINO')
print('--------')
print("Acurácia: %f" % resumo_lr_treino.accuracy)
print("Precisão: %f" % resumo_lr_treino.precisionByLabel[1])
print("Recall: %f" % resumo_lr_treino.recallByLabel[1])
print("F1: %f" % resumo_lr_treino.fMeasureByLabel()[1])

METRICAS RESUMO DO TREINO
--------
Acurácia: 0.784536
Precisão: 0.768398
Recall: 0.809974
F1: 0.788639


## **2.3 Métricas BASE DE VALIDAÇÃO**

<font size=2>**Documentação:**</font>
<font size=2>[LogisticRegressionTrainingSummary](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegressionTrainingSummary.html)</font>

In [29]:
# Faz previsões
predictions_val = best_lr_model.transform(val_df_transformed_prep)
predictions_val.write.mode('overwrite').orc(f'work/data/final/predictions_val_{n}.orc')

In [30]:
# predictions_val.show(truncate=False)
# +-----------------------------------------------------------------------------------------------------------+-----+------------------------------------------+----------------------------------------+----------+
# |features                                                                                                   |label|rawPrediction                             |probability                             |prediction|
# +-----------------------------------------------------------------------------------------------------------+-----+------------------------------------------+----------------------------------------+----------+
# |(24,[1,2,5,6,10,11,12,13,14,18,23],[12.0,75.85,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                       |0    |[0.05103567311346513,-0.05103567311346513]|[0.5127561496338285,0.48724385036617146]|0.0       |
# |(24,[1,2,3,5,8,12,13,14,19,21],[69.0,61.45,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                               |0    |[2.382789551862372,-2.382789551862372]    |[0.9155054708840569,0.08449452911594313]|0.0       |
# |(24,[1,2,3,5,12,13,15,17,22],[46.0,80.8824189403559,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                          |1    |[-1.1704365714764247,1.1704365714764247]  |[0.23677608099237907,0.7632239190076209]|1.0       |

In [30]:
print('METRICAS RESUMO DA BASE DE VALIDAÇÃO')

auc_roc = calculate_auc_roc(predictions_val)
print(f"AUC ROC: {auc_roc}")
auc_pr = calculate_auc_pr(predictions_val)
print(f"AUC PR: {auc_pr}")
ks = calculate_ks(predictions_val)
print(f"KS: {ks}")

METRICAS RESUMO DA BASE DE VALIDAÇÃO
AUC ROC: 0.7897105323794532
AUC PR: 0.7617664882695159
KS: 0.5794210647589065


In [31]:
calculate_confusion_matrix(predictions_val)

{'TP': 870, 'TN': 774, 'FP': 235, 'FN': 201}

In [32]:
calcula_mostra_matriz_confusao(predictions_val, normalize=False)

                     Previsto
                Churn       Não-Churn
     Churn        870         201
Real
     Não-Churn    235         774


## **2.4 Métricas BASE DE TESTE**

In [33]:
# Faz previsões
predictions_test = best_lr_model.transform(test_df_transformed_prep)
predictions_test.write.mode('overwrite').orc(f'work/data/final/predictions_test_{n}.orc')

In [34]:
print('METRICAS RESUMO DA BASE DE TESTE')

auc_roc = calculate_auc_roc(predictions_test)
print(f"AUC ROC: {auc_roc}")
auc_pr = calculate_auc_pr(predictions_test)
print(f"AUC PR: {auc_pr}")
ks = calculate_ks(predictions_test)
print(f"KS: {ks}")

METRICAS RESUMO DA BASE DE TESTE
AUC ROC: 0.7630023396167441
AUC PR: 0.7168174012626157
KS: 0.5260046792334883


In [35]:
calculate_confusion_matrix(predictions_test)

{'TP': 830, 'TN': 761, 'FP': 290, 'FN': 205}

In [36]:
calcula_mostra_matriz_confusao(predictions_test, normalize=False)

                     Previsto
                Churn       Não-Churn
     Churn        830         205
Real
     Não-Churn    290         761


In [37]:
end_time = datetime.now()
execution_time = end_time - start_time

print(f"Tempo de execução: {execution_time}")

Tempo de execução: 0:12:24.153315


In [38]:
# spark.stop()