# LOAD

In [34]:
!pip install pyspark


from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]')\
    .appName('Classificação com Spark')\
    .getOrCreate()

spark



dados = spark.read.csv('/content/drive/MyDrive/alura/dados_clientes.csv',\
               sep=',',\
               header=True, inferSchema=True
               )


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# TRANSFORM

In [35]:
ColunasBinarias = [
    'Churn',
    'Conjuge',
    'Dependentes',
    'TelefoneFixo',
    'MaisDeUmaLinhaTelefonica',
    'SegurancaOnline',
    'BackupOnline',
    'SeguroDispositivo',
    'SuporteTecnico',
    'TVaCabo',
    'StreamingFilmes',
    'ContaCorreio'
]

from pyspark.sql import functions as f
 
todasColunas = [f.when(f.col(c) == 'Sim',1).otherwise(0).alias(c) for c in ColunasBinarias]

for coluna in reversed(dados.columns):
  if coluna not in ColunasBinarias:
    todasColunas.insert(0, coluna)

dataset = dados.select(todasColunas)
dataset.show()

+---+----------+---------------+-----------+------------+----------------+-------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+
| id|Mais65anos|MesesDeContrato|   Internet|TipoContrato| MetodoPagamento|MesesCobrados|Churn|Conjuge|Dependentes|TelefoneFixo|MaisDeUmaLinhaTelefonica|SegurancaOnline|BackupOnline|SeguroDispositivo|SuporteTecnico|TVaCabo|StreamingFilmes|ContaCorreio|
+---+----------+---------------+-----------+------------+----------------+-------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+
|  0|         0|              1|        DSL| Mensalmente|BoletoEletronico|        29.85|    0|      1|          0|           0|                       0|              0|           1|                0|             0|      0|              0|      

In [36]:
dataset.select([f.count(f.when(f.isnan(c)  |  f.isnull(c), True)).alias(c)  for c in dataset.columns])\
.show()

+---+----------+---------------+--------+------------+---------------+-------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+
| id|Mais65anos|MesesDeContrato|Internet|TipoContrato|MetodoPagamento|MesesCobrados|Churn|Conjuge|Dependentes|TelefoneFixo|MaisDeUmaLinhaTelefonica|SegurancaOnline|BackupOnline|SeguroDispositivo|SuporteTecnico|TVaCabo|StreamingFilmes|ContaCorreio|
+---+----------+---------------+--------+------------+---------------+-------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+
|  0|         0|              0|       0|           0|              0|            0|    0|      0|          0|           0|                       0|              0|           0|                0|             0|      0|              0|           0|
+---+---

In [37]:
internet = dataset.groupBy('id').pivot('Internet').agg(f.lit(1)).na.fill(0)
tipocontrato = dataset.groupBy('id').pivot('TipoContrato').agg(f.lit(1)).na.fill(0)
metodopagamento = dataset.groupBy('id').pivot('MetodoPagamento').agg(f.lit(1)).na.fill(0)

In [38]:
dataset = dataset\
  .join(internet ,'id',how='inner')\
  .join(tipocontrato ,'id',how='inner')\
  .join(metodopagamento ,'id',how='inner')\
  .select(
      '*',
      f.col('DSL').alias('Internet_DSL'),
      f.col('FibraOptica').alias('Internet_FibraOptica'),
      f.col('Nao').alias('Internet_Nao'),
      f.col('Mensalmente').alias('Internet_Mensalmente'),
      f.col('UmAno').alias('Internet_UmAno'),
      f.col('DoisAnos').alias('Internet_DoisAnos'),
      f.col('DebitoEmConta').alias('MetodoPagamento_DebitoEmConta'),
      f.col('CartaoCredito').alias('MetodoPagamento_CartaoCredito'),
      f.col('BoletoEletronico').alias('MetodoPagamento_BoletoEletronico'),
      f.col('Boleto').alias('MetodoPagamento_Boleto'),
  )\
  .drop(
      'Internet','TipoContrato','MetodoPagamento','DSL','FibraOptica','Nao','Mensalmente','UmAno','DoisAnos',
      'DebitoEmConta','CartaoCredito','BoletoEletronico','Boleto'
  )

In [39]:
from pyspark.ml.feature import VectorAssembler  

In [40]:
dataset = dataset.withColumnRenamed('Churn','label')

In [41]:
X = dataset.columns
X.remove('label')
X.remove('id')
X

['Mais65anos',
 'MesesDeContrato',
 'MesesCobrados',
 'Conjuge',
 'Dependentes',
 'TelefoneFixo',
 'MaisDeUmaLinhaTelefonica',
 'SegurancaOnline',
 'BackupOnline',
 'SeguroDispositivo',
 'SuporteTecnico',
 'TVaCabo',
 'StreamingFilmes',
 'ContaCorreio',
 'Internet_DSL',
 'Internet_FibraOptica',
 'Internet_Nao',
 'Internet_Mensalmente',
 'Internet_UmAno',
 'Internet_DoisAnos',
 'MetodoPagamento_DebitoEmConta',
 'MetodoPagamento_CartaoCredito',
 'MetodoPagamento_BoletoEletronico',
 'MetodoPagamento_Boleto']

In [42]:
assembler = VectorAssembler(inputCols=X,outputCol='features')
dataset_prep = assembler.transform(dataset).select('features','label')

In [43]:
seed = 101
treino, teste = dataset_prep.randomSplit([0.7,0.3],seed=seed)

# Criando o primeiro Modelo

In [44]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression()
modelo_lr = lr.fit(treino)

previsos_lr_teste = modelo_lr.transform(teste)

In [45]:
previsos_lr_teste.select('label','prediction').show()

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       1.0|
|    1|       0.0|
|    1|       0.0|
|    0|       1.0|
|    0|       1.0|
|    0|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows



In [46]:
tp = previsos_lr_teste.select('label','prediction').where((f.col('label') == 1) & (f.col('prediction') == 1)).count() #  true positive
tn = previsos_lr_teste.select('label','prediction').where((f.col('label') == 0) & (f.col('prediction') == 0)).count()
fp = previsos_lr_teste.select('label','prediction').where((f.col('label') == 0) & (f.col('prediction') == 1)).count()
fn = previsos_lr_teste.select('label','prediction').where((f.col('label') == 1) & (f.col('prediction') == 0)).count() # false negative

In [47]:
# crio a função que vai receber os dados para serem avaliados
def calcula_mostra_metricas(modelo_lr, df_transform_modelo, normalize=False, percentage=True):
# os passos para montagem da matriz de confusão são os mesmos da aula
  tp = df_transform_modelo.select('label', 'prediction').where((f.col('label') == 1) & (f.col('prediction') == 1)).count()
  tn = df_transform_modelo.select('label', 'prediction').where((f.col('label') == 0) & (f.col('prediction') == 0)).count()
  fp = df_transform_modelo.select('label', 'prediction').where((f.col('label') == 0) & (f.col('prediction') == 1)).count()
  fn = df_transform_modelo.select('label', 'prediction').where((f.col('label') == 1) & (f.col('prediction') == 0)).count()

  valorP = 1
  valorN = 1

  if normalize:
    valorP = tp + fn
    valorN = fp + tn

  if percentage and normalize:
    valorP = valorP / 100
    valorN = valorN / 100

  # ‘s’ será minha string de retorno
  # ela vai coletar e montar minha matriz de confusão
  # e também os valores de acurácia, precisão, recall e F1-score
  s = ''

  # construção da minha string da matriz de confusão  
  s += ' '*20 + 'Previsto\n'
  s += ' '*15 +  'Churn' + ' '*5 + 'Não-Churn\n'
  s += ' '*4 + 'Churn' + ' '*6 +  str(int(tp/valorP)) + ' '*7 + str(int(fn/valorP)) + '\n'
  s += 'Real\n'
  s += ' '*4 + 'Não-Churn' + ' '*2 + str(int(fp/valorN)) +  ' '*7 + str(int(tn/valorN))  + '\n'
  s += '\n'

  # coleto o resumo das métricas com summary
  resumo_lr_treino = modelo_lr.summary

  # adiciono os valores de cada métrica a minha string de retorno
  s += f'Acurácia: {resumo_lr_treino.accuracy}\n'
  s += f'Precisão: {resumo_lr_treino.precisionByLabel[1]}\n'
  s += f'Recall: {resumo_lr_treino.recallByLabel[1]}\n'
  s += f'F1: {resumo_lr_treino.fMeasureByLabel()[1]}\n'

  return s

In [48]:
s = calcula_mostra_metricas(modelo_lr,previsos_lr_teste)
print(s)

                    Previsto
               Churn     Não-Churn
    Churn      1256       307
Real
    Não-Churn  400       1179

Acurácia: 0.7849014709963918
Precisão: 0.7706855791962175
Recall: 0.8125173082248685
F1: 0.7910488002156916



In [51]:
fn# essas métricas de confusão, estão para predição.

307

# Criando o segundo modelo

In [52]:
from pyspark.ml.classification import DecisionTreeClassifier

In [53]:
dtc = DecisionTreeClassifier(seed=seed)

In [54]:
modelo_dtc = dtc.fit(treino)
previsoes_dtc_teste = modelo_dtc.transform(teste)

In [55]:
previsoes_dtc_teste.show()

+--------------------+-----+--------------+--------------------+----------+
|            features|label| rawPrediction|         probability|prediction|
+--------------------+-----+--------------+--------------------+----------+
|(24,[0,1,2,3,4,5,...|    0|[2056.0,334.0]|[0.86025104602510...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|  [62.0,128.0]|[0.32631578947368...|       1.0|
|(24,[0,1,2,3,4,5,...|    1| [239.0,205.0]|[0.53828828828828...|       0.0|
|(24,[0,1,2,3,4,5,...|    1| [239.0,205.0]|[0.53828828828828...|       0.0|
|(24,[0,1,2,3,4,5,...|    0| [239.0,205.0]|[0.53828828828828...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|  [51.0,141.0]| [0.265625,0.734375]|       1.0|
|(24,[0,1,2,3,4,5,...|    0|[331.0,1951.0]|[0.14504820333041...|       1.0|
|(24,[0,1,2,3,4,5,...|    0| [239.0,205.0]|[0.53828828828828...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|  [63.0,118.0]|[0.34806629834254...|       1.0|
|(24,[0,1,2,3,4,5,...|    0|[2056.0,334.0]|[0.86025104602510...|       0.0|
|(24,[0,1,2,

# Testando o segundo modelo

In [56]:
from pyspark.sql import functions as f # importo a biblioteca functions
from pyspark.ml.evaluation import MulticlassClassificationEvaluator # importo a classe MulticlassClassificationEvaluator

# crio a função que vai receber os dados para serem avaliados

def calcula_mostra_metricas_evaluate(df_transform_modelo, normalize=False, percentage=True):
# os passos para montagem da matriz de confusão são os mesmos da aula
  tp = df_transform_modelo.select('label', 'prediction').where((f.col('label') == 1) & (f.col('prediction') == 1)).count()
  tn = df_transform_modelo.select('label', 'prediction').where((f.col('label') == 0) & (f.col('prediction') == 0)).count()
  fp = df_transform_modelo.select('label', 'prediction').where((f.col('label') == 0) & (f.col('prediction') == 1)).count()
  fn = df_transform_modelo.select('label', 'prediction').where((f.col('label') == 1) & (f.col('prediction') == 0)).count()

  valorP = 1
  valorN = 1

  if normalize:
    valorP = tp + fn
    valorN = fp + tn

  if percentage and normalize:
    valorP = valorP / 100
    valorN = valorN / 100

  # ‘s’ será minha string de retorno
  # ela vai coletar e montar minha matriz de confusão
  # e também os valores de acurácia, precisão, recall e F1-score
  s = ''

  # construção da minha string da matriz de confusão  
  s += ' '*20 + 'Previsto\n'
  s += ' '*15 +  'Churn' + ' '*5 + 'Não-Churn\n'
  s += ' '*4 + 'Churn' + ' '*6 +  str(int(tp/valorP)) + ' '*7 + str(int(fn/valorP)) + '\n'
  s += 'Real\n'
  s += ' '*4 + 'Não-Churn' + ' '*2 + str(int(fp/valorN)) +  ' '*7 + str(int(tn/valorN))  + '\n'
  s += '\n'

  # adiciono os valores de cada métrica a minha string de retorno com MulticlassClassificationEvaluator
  evaluator = MulticlassClassificationEvaluator()

  s += f'Acurácia: {evaluator.evaluate(df_transform_modelo, {evaluator.metricName: "accuracy"})}\n'
  s += f'Precisão: {evaluator.evaluate(df_transform_modelo, {evaluator.metricName: "precisionByLabel", evaluator.metricLabel: 1})}\n'
  s += f'Recall: {evaluator.evaluate(df_transform_modelo, {evaluator.metricName: "recallByLabel", evaluator.metricLabel: 1})}\n'
  s += f'F1: {evaluator.evaluate(df_transform_modelo, {evaluator.metricName: "fMeasureByLabel", evaluator.metricLabel: 1})}\n'

  return s

In [57]:
z = calcula_mostra_metricas_evaluate(previsoes_dtc_teste, normalize=False)
print(z) 

                    Previsto
               Churn     Não-Churn
    Churn      1181       382
Real
    Não-Churn  336       1243

Acurácia: 0.7714831317632082
Precisão: 0.7785102175346078
Recall: 0.7555982085732565
F1: 0.7668831168831168



# Criando o mlehor modelo entre os dois

In [58]:
from pyspark.ml.classification import RandomForestClassifier

In [59]:
rfc = RandomForestClassifier(seed=seed)

In [60]:
modelo_rfc = rfc.fit(treino)
previsoes_rfc_treino = modelo_rfc.transform(treino)
previsoes_rfc_treino.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(24,[0,1,2,3,4,5,...|    0|[15.0052773466704...|[0.75026386733352...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|[16.9295040273249...|[0.84647520136624...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|[9.13052909106814...|[0.45652645455340...|       1.0|
|(24,[0,1,2,3,4,5,...|    0|[9.13052909106814...|[0.45652645455340...|       1.0|
|(24,[0,1,2,3,4,5,...|    0|[8.59288938528764...|[0.42964446926438...|       1.0|
|(24,[0,1,2,3,4,5,...|    1|[5.59647122885698...|[0.27982356144284...|       1.0|
|(24,[0,1,2,3,4,5,...|    0|[9.33276328267787...|[0.46663816413389...|       1.0|
|(24,[0,1,2,3,4,5,...|    1|[5.21616013157118...|[0.26080800657855...|       1.0|
|(24,[0,1,2,3,4,5,...|    0|[5.45640255581361...|[0.27282012779068...|       1.0|
|(24,[0,1,2,3,4,

# Cross Validation - RFC

In [61]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
rfc = RandomForestClassifier(seed=seed)

In [62]:
grid = ParamGridBuilder()\
  .addGrid(rfc.maxDepth, [2,5,10])\
  .addGrid(rfc.maxBins, [10,32,45])\
  .addGrid(rfc.numTrees,[10,20,50])\
  .build()

In [63]:
evaluator = MulticlassClassificationEvaluator()

In [64]:
rfc_cv = CrossValidator(
    estimator=rfc,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3,
    seed=seed
)

In [66]:
modelo_rfc_cv = rfc_cv.fit(treino)
melhor_modelo_rfc_cv = modelo_rfc_cv.bestModel

In [68]:
rfc_tunning = RandomForestClassifier(maxDepth=10, maxBins=45, numTrees=10, seed=seed)
modelo_rfc_tunning = rfc_tunning.fit(dataset_prep)

In [100]:
novo_cliente = [{
    'Mais65anos': 1,
    'MesesDeContrato': 1,
    'MesesCobrados': 45.30540797610398,
    'Conjuge': 0,
    'Dependentes': 0,
    'TelefoneFixo': 0,
    'MaisDeUmaLinhaTelefonica': 0,
    'SegurancaOnline': 0,
    'BackupOnline': 0,
    'SeguroDispositivo': 0,
    'SuporteTecnico': 0,
    'TVaCabo': 1,
    'StreamingFilmes': 1,
    'ContaCorreio': 1,
    'Internet_DSL': 1,
    'Internet_FibraOptica': 0,
    'Internet_Nao': 0,
    'Internet_Mensalmente': 1,
    'Internet_UmAno': 0,
    'Internet_DoisAnos': 0,
    'MetodoPagamento_DebitoEmConta': 0,
    'MetodoPagamento_CartaoCredito': 0,
    'MetodoPagamento_BoletoEletronico': 1,
    'MetodoPagamento_Boleto': 0
}]


In [101]:
novo_cliente = spark.createDataFrame(novo_cliente)
novo_cliente.show()

+------------+-------+------------+-----------+------------+-----------------+--------------------+--------------------+------------+--------------+----------+------------------------+-----------------+---------------+----------------------+--------------------------------+-----------------------------+-----------------------------+---------------+-----------------+---------------+--------------+-------+------------+
|BackupOnline|Conjuge|ContaCorreio|Dependentes|Internet_DSL|Internet_DoisAnos|Internet_FibraOptica|Internet_Mensalmente|Internet_Nao|Internet_UmAno|Mais65anos|MaisDeUmaLinhaTelefonica|    MesesCobrados|MesesDeContrato|MetodoPagamento_Boleto|MetodoPagamento_BoletoEletronico|MetodoPagamento_CartaoCredito|MetodoPagamento_DebitoEmConta|SegurancaOnline|SeguroDispositivo|StreamingFilmes|SuporteTecnico|TVaCabo|TelefoneFixo|
+------------+-------+------------+-----------+------------+-----------------+--------------------+--------------------+------------+--------------+----------

In [102]:
assembler = VectorAssembler(inputCols = X, outputCol = 'features')

In [103]:
novo_cliente_prep = assembler.transform(novo_cliente).select('features')

In [104]:
modelo_rfc_tunning.transform(novo_cliente_prep).show()

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(24,[0,1,2,11,12,...|[1.24640781461703...|[0.12464078146170...|       1.0|
+--------------------+--------------------+--------------------+----------+

