In [None]:
# PARA TRABALHAR COM O GOOGLE COLAB É NECESSÁRIO:
# !apt-get update -qq
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
# !tar xf spark-3.1.2-bin-hadoop2.7.tgz
# !pip install -q findspark

import os
import findspark

os.environ["SPARK_HOME"] = "D:/spark-3.5.5-bin-hadoop3"
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()

from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

import pandas as pd

### Obs: para este projeto estamos trabalhando com o google colab

In [None]:
from google.colab import drive
drive.mount('/content/drive')
DADOS = spark.read.csv('/content/drive/ml/dados_clientes.csv', sep=',', header=True, inferSchema=True)

# PROJETO DE CLASSIFICAÇÃO DE CLIENTES (POSSIVEIS CANCELAMENTO DE CONTAS)

In [None]:
# TROCANDO COLUNAS COM SIM/NAO PARA 1/0
VARIAVEIS_STRING = ['Churn', 'Conjuge', 'Dependentes', 'TelefonixeFixo','MaisDeUmaLinhaTelefonica']

QUERY = [f.when(f.col(c) == 'Sim', 1).otherwise(0).alias(c) for c in VARIAVEIS_STRING]
# reversed abaixo apenas inverte a ordem das colunas, a ultima vira primeira e assim por diante
for coluna in reversed(DADOS.columns):
    if coluna not in VARIAVEIS_STRING:
        QUERY.insert(0, coluna)

SPARK = DADOS.select(QUERY)

In [None]:
# TRANSFORMAR COLUNAS MULTICATEGORICAS STRING PARA BINÁRIOS COM PIVOT (DUMMIES)
DUMMIES_1 = SPARK.groupBy('id'.pivot('Internet')).agg.(f.lit(1)).na.fill(0)
DUMMIES_2 = SPARK.groupBy('id'.pivot('TipoContrato')).agg.(f.lit(1)).na.fill(0)
DUMMIES_3 = SPARK.groupBy('id'.pivot('MetodoPagamento')).agg.(f.lit(1)).na.fill(0)

SPARK = SPARK\
            .join(DUMMIES_1, 'id', how='inner')\
            .join(DUMMIES_2, 'id', how='inner')\
            .join(DUMMIES_3, 'id', how='inner')\
            .select('*', f.col('DSL').alias('Internet_DSL'),
                        f.col('FibraOptica').alias('Internet_FibraOptica'), 
                        f.col('Nao').alias('Internet_Nao'), 
                        f.col('Mensalmente').alias('TipoContrato_Mensalmente'), 
                        f.col('UmAno').alias('TipoContrato_UmAno'), 
                        f.col('DoisAnos').alias('TipoContrato_DoisAnos'), 
                        f.col('DebitoEmConta').alias('MetodoPagamento_DebitoEmConta'), 
                        f.col('CartaoCredito').alias('MetodoPagamento_CartaoCredito'), 
                        f.col('BoletoEletronico').alias('MetodoPagamento_BoletoEletronico'), 
                        f.col('Boleto').alias('MetodoPagamento_Boleto')  
            )\
            .drop('Internet', 'TipoContrato', 'MetodoPagamento', 'DSL', 
                'FibraOptica', 'Nao', 'Mensalmente', 'UmAno', 'DoisAnos', 
                'DebitoEmConta', 'CartaoCredito', 'BoletoEletronico', 'Boleto')

In [None]:
# VETORIZANDO AS COLUNAS PARA QUE AS LIBS DE ML POSSAM TRABALHAR
from pyspark.ml.feature import VectorAssembler

SPARK = SPARK.withColumnRenamed('Churn', 'label')

X = SPARK.columns
X.remove('label')
X.remove('id')

# A VETORIZAÇÃO É FEITA PELO OBJETO VETOR E AS FUNÇÕES DO SPARK ESPERAM RECEBER O OUTPUT COM NOME 'features'
ASSEMBLER = VectorAssembler(inputCols=X, outputCol='features')

DF_PREP = ASSEMBLER.transform(SPARK).select('features', 'label')

TREINO, TESTE = DF_PREP.randomSplit([0.7, 0.3], seed=101)

# REGRESSÃO LOGISTICA (LOGISTIC REGRESSION)
### A principal diferença entre a regressão logistica e a linear é que ao final da execução do processo a regressão logistica aplica uma função chamada "Sigmoide" que transforma o resultado em valores entre 0 e 1, assim, separando em classes, multivaloradas, com base em uma porcentagem para cada valor da classe

In [None]:
from pyspark.ml.classification import LogisticRegression

LR = LogisticRegression()
MODELO_LR = LR.fit(TREINO)

PREVISOES_LR_TESTE = MODELO_LR.transform(TREINO)

In [None]:
########## MÉTRICAS ##########

RESUMO = MODELO_LR.summary

print("Acurácia: %f" % RESUMO.accuracy)
print("Precisão: %f" % RESUMO.precisionByLabel[1])
print("Recall: %f" % RESUMO.recallByLabel[1])
print("F1: %f" % RESUMO.fMeasureByLabel()[1])

# ARVORE DE DECISÃO PARA CLASSIFICAÇÃO

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

DTC = DecisionTreeClassifier(seed=101)

MODELO_DTC = DTC.fit(TREINO)
PREVISOES_DTC_TREINO = MODELO_DTC.transform(TREINO)

In [None]:
########## MÉTRICAS COM FERRAMENTA NOVA ##########
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator()

print("Acurácia: %f" % evaluator.evaluate(PREVISOES_DTC_TREINO, {evaluator.metricName: "accuracy"}))
print("Precisão: %f" % evaluator.evaluate(PREVISOES_DTC_TREINO, {evaluator.metricName: "precisionByLabel", evaluator.metricLabel: 1}))
print("Recall: %f" % evaluator.evaluate(PREVISOES_DTC_TREINO, {evaluator.metricName: "recallByLabel", evaluator.metricLabel: 1}))
print("F1: %f" % evaluator.evaluate(PREVISOES_DTC_TREINO, {evaluator.metricName: "fMeasureByLabel", evaluator.metricLabel: 1}))

In [None]:
PREVISOES_DTC_TESTE = MODELO_DTC.transform(TESTE)

print("Acurácia: %f" % evaluator.evaluate(PREVISOES_DTC_TESTE, {evaluator.metricName: "accuracy"}))
print("Precisão: %f" % evaluator.evaluate(PREVISOES_DTC_TESTE, {evaluator.metricName: "precisionByLabel", evaluator.metricLabel: 1}))
print("Recall: %f" % evaluator.evaluate(PREVISOES_DTC_TESTE, {evaluator.metricName: "recallByLabel", evaluator.metricLabel: 1}))
print("F1: %f" % evaluator.evaluate(PREVISOES_DTC_TESTE, {evaluator.metricName: "fMeasureByLabel", evaluator.metricLabel: 1}))

# RANDOM FOREST PARA CLASSIFICACAO

In [None]:
from pyspark.ml.classification import RandomForestClassifier

RFC = RandomForestClassifier(seed=101)

MODELO_RFC = RFC.fit(TREINO)
PREVISOES_RFC_TREINO = MODELO_RFC.transform(TREINO)
PREVISOES_RFC_TESTE = MODELO_RFC.transform(TESTE)


In [None]:
########## MÉTRICAS ##########

print(15*'=' + ' TREINO ' + 25*'=')
print("Acurácia: %f" % evaluator.evaluate(PREVISOES_RFC_TREINO, {evaluator.metricName: "accuracy"}))
print("Precisão: %f" % evaluator.evaluate(PREVISOES_RFC_TREINO, {evaluator.metricName: "precisionByLabel", evaluator.metricLabel: 1}))
print("Recall: %f" % evaluator.evaluate(PREVISOES_RFC_TREINO, {evaluator.metricName: "recallByLabel", evaluator.metricLabel: 1}))
print("F1: %f" % evaluator.evaluate(PREVISOES_RFC_TREINO, {evaluator.metricName: "fMeasureByLabel", evaluator.metricLabel: 1}))
print(15*'=' + ' TESTE ' + 25*'=')
print("Acurácia: %f" % evaluator.evaluate(PREVISOES_RFC_TESTE, {evaluator.metricName: "accuracy"}))
print("Precisão: %f" % evaluator.evaluate(PREVISOES_RFC_TESTE, {evaluator.metricName: "precisionByLabel", evaluator.metricLabel: 1}))
print("Recall: %f" % evaluator.evaluate(PREVISOES_RFC_TESTE, {evaluator.metricName: "recallByLabel", evaluator.metricLabel: 1}))
print("F1: %f" % evaluator.evaluate(PREVISOES_RFC_TESTE, {evaluator.metricName: "fMeasureByLabel", evaluator.metricLabel: 1}))

# TÉCNICAS DE OTIMIZAÇÃO: CROSS VALIDATION

In [None]:
# PARA O MÉTODO DE ARVORE DE DECISÃO

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#########################################################################
###       O CROSS VALIDATOR NOS PERMITE FAZER DIVERSAS SEPARAÇÕES     ###
###       DE TREINO/TESTE PARA AVALIAR/TREINAR MELHOR O MODELO        ###
###      E O PARAMGRIDBUILDER É RESPONSAVEL NOS DEIXA AVALIAR MAIS    ###
###    PARAMETROS DE UMA VEZ PARA ENTENDERMOS QUAL O MELHOR PARAMETRO ###
###                         PARA OS MODELOS                           ###
#########################################################################

GRID = ParamGridBuilder().addGrid(DTC.maxDepth, [2, 5, 10]).addGrid(DTC.maxBins, [10, 32, 45]).build()
DTR_CV = CrossValidator(estimator=DTC, estimatorParamMaps=GRID, evaluator=evaluator, numFolds=3, seed=101) 

In [None]:
MODELO_DTC_CV = DTR_CV.fit(TREINO)
PREVISOES_DTC_CV_TESTE = MODELO_DTC_CV.transform(TESTE)


print("Acurácia: %f" % evaluator.evaluate(PREVISOES_DTC_CV_TESTE, {evaluator.metricName: "accuracy"}))
print("Precisão: %f" % evaluator.evaluate(PREVISOES_DTC_CV_TESTE, {evaluator.metricName: "precisionByLabel", evaluator.metricLabel: 1}))
print("Recall: %f" % evaluator.evaluate(PREVISOES_DTC_CV_TESTE, {evaluator.metricName: "recallByLabel", evaluator.metricLabel: 1}))
print("F1: %f" % evaluator.evaluate(PREVISOES_DTC_CV_TESTE, {evaluator.metricName: "fMeasureByLabel", evaluator.metricLabel: 1}))

In [None]:
# PARA O MÉTODO RANDOM FOREST

GRID = ParamGridBuilder().addGrid(RFC.maxDepth, [2, 5, 10]).addGrid(RFC.maxBins, [10, 32, 45]).addGrid(RFC.numTrees, [10, 20, 50]).build()
RFC_CV = CrossValidator(estimator=RFC, estimatorParamMaps=GRID, evaluator=evaluator, numFolds=3, seed=101) 

In [None]:
MODELO_RFC_CV = DTR_CV.fit(TREINO)
PREVISOES_RFC_CV_TESTE = MODELO_RFC_CV.transform(TESTE)


print("Acurácia: %f" % evaluator.evaluate(PREVISOES_RFC_CV_TESTE, {evaluator.metricName: "accuracy"}))
print("Precisão: %f" % evaluator.evaluate(PREVISOES_RFC_CV_TESTE, {evaluator.metricName: "precisionByLabel", evaluator.metricLabel: 1}))
print("Recall: %f" % evaluator.evaluate(PREVISOES_RFC_CV_TESTE, {evaluator.metricName: "recallByLabel", evaluator.metricLabel: 1}))
print("F1: %f" % evaluator.evaluate(PREVISOES_RFC_CV_TESTE, {evaluator.metricName: "fMeasureByLabel", evaluator.metricLabel: 1}))

# DETERMINAÇÃO DO MELHOR MODELO PELAS METRICAS CROSS VALIDATION

In [None]:
BEST_MODEL_RFC_CV = MODELO_RFC_CV.bestModel

# AGORA PARA ENCONTRAR OS HIPERPARAMETROS MAIS ADEQUADOS:
DEPTH = BEST_MODEL_RFC_CV.getMaxDepth()
BINS = BEST_MODEL_RFC_CV.getMaxBins()
TREES = BEST_MODEL_RFC_CV.getNumTrees()

RFC_TUNING = RandomForestClassifier(maxDepth=DEPTH, maxBins=BINS, numTrees=TREES, seed=101)
MODELO_TUNING_RFC = RFC_TUNING.fir(DF_PREP)