In [None]:
#template base: https://medium.com/@dhiraj.p.rai/logistic-regression-in-spark-ml-8a95b5f5434c
# Inicialização da sessão
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Telecom-CustomeChurn").getOrCreate()

In [None]:
#carregando dataset - apesar de ter disponibilizado teste separado, farei o split da treino para simplificar o experimento
dados = spark.read.format("csv").option("header","true").option("inferSchema", "true").load(r"projeto4_telecom_treino.csv")
dados.columns

In [None]:
#análise exploratória
dados.describe().select('_c0',
 'state',
 'account_length',
 'area_code',
 'international_plan',
 'voice_mail_plan',
 'number_vmail_messages',
 'total_day_minutes',
 'total_day_calls',
 'total_day_charge',
 'total_eve_minutes',
 'total_eve_calls',
 'total_eve_charge',
 'total_night_minutes',
 'total_night_calls',
 'total_night_charge',
 'total_intl_minutes',
 'total_intl_calls',
 'total_intl_charge',
 'number_customer_service_calls',
 'churn').show()

In [None]:
#análise exploratória - campo Summary é parâmetro estatístico e não coluna do dataset
dados.describe().select('Summary','_c0', 'state', 'account_length', 'area_code', 'international_plan', 'voice_mail_plan',
 'churn').show()

In [None]:
#análise exploratória - campo Summary é parâmetro estatístico e não coluna do dataset
dados.describe().select('Summary',
 'number_vmail_messages',
 'total_day_minutes',
 'total_day_calls',
 'total_day_charge',
 'churn').show()

In [None]:
#análise exploratória - campo Summary é parâmetro estatístico e não coluna do dataset
dados.describe().select('Summary',
 'total_eve_minutes',
 'total_eve_calls',
 'total_eve_charge',
 'churn').show()

In [None]:
#análise exploratória - campo Summary é parâmetro estatístico e não coluna do dataset
dados.describe().select('Summary',
 'total_night_minutes',
 'total_night_calls',
 'total_night_charge',
 'churn').show()

In [None]:
#análise exploratória - campo Summary é parâmetro estatístico e não coluna do dataset
dados.describe().select('Summary',
 'total_intl_minutes',
 'total_intl_calls',
 'total_intl_charge',
 'churn').show()

In [None]:
#análise exploratória - campo Summary é parâmetro estatístico e não coluna do dataset
dados.describe().select('Summary',
 'number_customer_service_calls',
 'churn').show()

In [None]:
#pré-processamento - trocaria null, zero ou outliers por NaN caso estes valores atrapalhassem a análise - inicialmente não é o caso
#import numpy as np
#from pyspark.sql.functions import when
#treino =treino.withColumn("number_customer_service_calls",when(raw_data.number_customer_service_calls==null,np.nan).otherwise(raw_data.number_customer_service_calls))
#treino =treino.withColumn("total_intl_charge",when(raw_data.total_intl_charge==0,np.nan).otherwise(raw_data.total_intl_charge))


In [None]:
#utilizaria caso tivesse feito o ajuste anterior
#from pyspark.ml.feature import Imputer
#imputer=Imputer(inputCols=["number_customer_service_calls","total_intl_charge"],outputCols=["number_customer_service_calls","total_intl_charge"])
#model=imputer.fit(treino)
#treino = model.transform(treino)
#treino.show(5)

In [None]:
#transformando variáveis fator ou string em numéricas
dados.describe()


In [None]:
dados

In [None]:
dados.show(1,vertical=True)

In [None]:
#transformando variáveis fator ou string em numéricas
from pyspark.sql.types import IntegerType
dados = dados.na.replace(['yes', 'no'], ['1', '0'], 'international_plan')
dados = dados.withColumn('international_plan', dados['international_plan'].cast(IntegerType()))
dados = dados.na.replace(['yes', 'no'], ['1', '0'], 'voice_mail_plan')
dados = dados.withColumn('voice_mail_plan', dados['voice_mail_plan'].cast(IntegerType()))
dados = dados.na.replace(['yes', 'no'], ['1', '0'], 'churn')
dados = dados.withColumn('churn', dados['churn'].cast(IntegerType()))

In [None]:
from pyspark.sql.functions import substring
dados = dados.withColumn('area_code', substring(dados.area_code, 11,3).alias('area_code'))
dados = dados.withColumn('area_code', dados['area_code'].cast(IntegerType()))

In [None]:
dados.show(1,vertical=True)

In [None]:
dados

In [None]:
#combinando as variáveis em um único vetor
cols = dados.columns
#removendo variável target e id e state
cols.remove("churn") #target
cols.remove("_c0") #id
cols.remove("state") #string diversas, poderia substituir por nros

In [None]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=cols,outputCol="features")

In [None]:
dados=assembler.transform(dados)
dados.select("features").show(truncate=False)

In [None]:
#padronizando a escala
from pyspark.ml.feature import StandardScaler
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
dados=standardscaler.fit(dados).transform(dados)
dados.select("features","Scaled_features").show(5)

In [None]:
#split treino e teste
treino, teste = dados.randomSplit([0.7, 0.3], seed=12345)

In [None]:
#verificando equilibrio (se tiver muitas ocorrências da mesma target o ml pode aprender mais sobre este)
dataset_size=float(treino.select("churn").count())
numPositives=treino.select("churn").where('churn == 1').count()
per_ones=(float(numPositives)/float(dataset_size))*100
numNegatives=float(dataset_size-numPositives)
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

In [None]:
#ajustando o desequilibrio caso exista
BalancingRatio= numNegatives/dataset_size
print('BalancingRatio = {}'.format(BalancingRatio))

In [None]:
from pyspark.sql.functions import when
treino=treino.withColumn("classWeights", when(treino.churn == 1,BalancingRatio).otherwise(1-BalancingRatio))
treino.select("classWeights").show(5)

In [None]:
# Feature selection using chisquareSelector
from pyspark.ml.feature import ChiSqSelector
css = ChiSqSelector(featuresCol='Scaled_features',outputCol='Aspect',labelCol='churn',fpr=0.05)
treino=css.fit(treino).transform(treino)
teste=css.fit(teste).transform(teste)
teste.select("Aspect").show(5,truncate=False)

In [None]:
#construindo o modelo de classificação com algoritmo ML regressão logística
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="churn", featuresCol="Aspect",weightCol="classWeights",maxIter=10)
model=lr.fit(treino)
predict_train=model.transform(treino)
predict_test=model.transform(teste)
predict_test.select("churn","prediction").show(10)

In [None]:
#avaliação do modelo
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="churn")
predict_test.select("churn","rawPrediction","prediction","probability").show(5)
print("The area under ROC for train set is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_test)))

In [None]:
# Create 5-fold CrossValidator
#obs. meu note não aguentou essa parte...
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = ParamGridBuilder()\
    .addGrid(lr.aggregationDepth,[2,5,10])\
    .addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\
    .addGrid(lr.fitIntercept,[False, True])\
    .addGrid(lr.maxIter,[10, 100, 1000])\
    .addGrid(lr.regParam,[0.01, 0.5, 2.0]) \
    .build()

cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# Run cross validations
cvModel = cv.fit(treino)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing
predict_train=cvModel.transform(treino)
predict_test=cvModel.transform(teste)
print("The area under ROC for train set after CV  is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set after CV  is {}".format(evaluator.evaluate(predict_test)))