In [None]:
from pyspark.sql import SparkSession

In [None]:
# Local Spark session using every available core; reuses an existing session if one is already running.
spark: SparkSession = (
  SparkSession.builder
  .master('local[*]')
  .appName('classification with spark')
  .getOrCreate()
)

In [None]:
# Display the session object (Jupyter renders a summary of the active session).
spark

# Data Import and Preprocessing

In [None]:
# Load the raw customer data: first row is the header, column types are inferred by Spark.
df = spark.read.csv('./data/dados_clientes.csv', header=True, inferSchema=True)

In [None]:
# Preview the raw data.
df.show()

In [None]:
# Total number of rows in the raw data.
df.count()

In [None]:
# Class balance of the target: number of rows per Churn value.
df.groupBy('Churn').count().show()

In [None]:
# Inspect the inferred column types.
df.printSchema()

In [None]:
# Yes/no columns to binarize: each is later encoded as 1 where its value is 'Sim'
# and 0 otherwise. 'Churn' is the target (renamed to 'label' before training).
# NOTE(review): assumes every listed column only distinguishes 'Sim' from other
# values; any non-'Sim' value (including nulls) becomes 0.
binary_columns = [
  'Churn',
  'Conjuge',
  'Dependentes',
  'TelefoneFixo',
  'MaisDeUmaLinhaTelefonica',
  'SegurancaOnline',
  'BackupOnline',
  'SeguroDispositivo',
  'SuporteTecnico',
  'TVaCabo',
  'StreamingFilmes',
  'ContaCorreio'
]

In [None]:
from pyspark.sql import functions as f

In [None]:
# One 0/1 expression per binary column ('Sim' -> 1, anything else -> 0), keeping the column name.
all_columns = list(
  f.when(f.col(name) == 'Sim', 1).otherwise(0).alias(name)
  for name in binary_columns
)

In [None]:
# Full projection for `dataset`: the non-binary columns first (in their original
# order), followed by the 0/1-encoded binary columns. Built as a single assignment
# rather than by inserting into `all_columns` inside a list comprehension, so the
# cell is idempotent on re-run and does not display a list of `None`s.
all_columns = (
  [c for c in df.columns if c not in binary_columns]
  + [f.when(f.col(c) == 'Sim', 1).otherwise(0).alias(c) for c in binary_columns]
)

In [None]:
# Show the projection list: plain column names plus the encoded binary expressions.
all_columns

In [None]:
# Apply the projection: untouched columns plus the 0/1-encoded binary columns.
dataset = df.select(*all_columns)

In [None]:
# Preview the dataset after binary encoding.
dataset.show()

In [None]:
# Confirm the binary columns are now integers.
dataset.printSchema()

In [None]:
# The remaining multi-valued categorical columns, which are one-hot encoded next.
dataset.select('Internet', 'TipoContrato', 'MetodoPagamento').show()

In [None]:
def one_hot_pivot(source, column):
  """One-hot encode `column` of `source` by pivoting on it per `id`.

  Returns a DataFrame with the `id` column plus one indicator column per
  distinct value of `column`, named `f'{column}_{value}'`. An indicator holds
  1 where that id had the value; cells left null by the pivot are filled with 0.
  """
  pivoted = source.groupBy('id').pivot(column).agg(f.lit(1)).fillna(0)
  # Prefix every pivoted column with the source column name; keep `id` as-is for the join.
  return pivoted.select(
    [f.col(c) if c == 'id' else f.col(c).alias(f'{column}_{c}') for c in pivoted.columns]
  )


internet = one_hot_pivot(dataset, 'Internet')
contract_type = one_hot_pivot(dataset, 'TipoContrato')
payment_method = one_hot_pivot(dataset, 'MetodoPagamento')

In [None]:
# Original categorical columns, replaced by their one-hot indicator columns below.
drop_columns = ['Internet', 'TipoContrato', 'MetodoPagamento']

# Attach the indicator columns by id, then discard the raw categorical columns.
dataset = (
  dataset
  .join(internet, on='id', how='inner')
  .join(contract_type, on='id', how='inner')
  .join(payment_method, on='id', how='inner')
  .drop(*drop_columns)
)

In [None]:
# Schema after one-hot encoding: all feature columns should now be numeric.
dataset.printSchema()

In [None]:
# Preview the fully encoded dataset.
dataset.show()

# Feature Vectorization and Train/Test Split

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
# Spark ML estimators and evaluators read the target from a column named 'label' by default.
dataset = dataset.withColumnRenamed('Churn', 'label')

In [None]:
# Feature columns: every column except the target and the row identifier.
# A plain comprehension builds the list directly, instead of calling
# `x.append` inside a list comprehension for its side effect (which also
# displayed a useless list of `None`s as the cell output).
drop_columns = ['label', 'id']
x = [c for c in dataset.columns if c not in drop_columns]

In [None]:
# Combine all feature columns listed in `x` into a single vector column 'features'.
assembler = VectorAssembler(inputCols=x, outputCol='features')

In [None]:
# Keep only the feature vector and the target for modelling.
dataset_prep = assembler.transform(dataset).select('features', 'label')

In [None]:
# truncate=False so the full feature vectors are visible.
dataset_prep.show(truncate=False)

In [None]:
# Shared random seed for the train/test split and the tree-based models.
seed = 101

In [None]:
# Random 70% / 30% split into training and test sets (seeded for reproducibility).
training, test = dataset_prep.randomSplit([0.7, 0.3], seed=seed)

In [None]:
# Number of training rows.
training.count()

In [None]:
# Number of test rows.
test.count()

# Model Evaluation Helper

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Shared evaluator; the specific metric (and class label) is chosen per call inside get_result.
evaluator = MulticlassClassificationEvaluator()

In [None]:
def get_result(df, result_name, evaluator, split_name='Teste'):
  """Print evaluation metrics and a confusion summary for a predictions DataFrame.

  Args:
    df: DataFrame with `label` and `prediction` columns (e.g. the output of a
      fitted model's `transform`), where 1 means churn and 0 means no churn.
    result_name: Title printed at the top of the report (e.g. the model name).
    evaluator: A MulticlassClassificationEvaluator; the metric and class label
      are supplied per call through a param map.
    split_name: Name of the data split `df` came from, used in the report
      headings. Defaults to 'Teste' because the predictions evaluated in this
      notebook come from the test set.

  Returns:
    None. The report is printed to stdout.
  """
  # A single aggregation job instead of four separately filtered counts.
  # Keys are the raw (label, prediction) values; numeric equality means
  # (1, 1.0) matches the lookup key (1, 1), exactly like `col == 1` did.
  counts = {
    (row['label'], row['prediction']): row['count']
    for row in df.groupBy('label', 'prediction').count().collect()
  }
  tp = counts.get((1, 1), 0)
  tn = counts.get((0, 0), 0)
  fp = counts.get((0, 1), 0)
  fn = counts.get((1, 0), 0)

  def by_label(metric_name):
    # Per-class metrics are reported for the positive (churn) class, label 1.
    return evaluator.evaluate(df, {evaluator.metricName: metric_name, evaluator.metricLabel: 1})

  rule = '-' * 80
  print('=' * 80)
  print(f'- {result_name}')
  print(rule)
  print(f'- {split_name}')
  print(rule)
  print("- Acurácia: %f" % evaluator.evaluate(df, {evaluator.metricName: "accuracy"}))
  print("- Precisão: %f" % by_label("precisionByLabel"))
  print("- Recall: %f" % by_label("recallByLabel"))
  print("- F1: %f" % by_label("fMeasureByLabel"))
  print(rule)
  print(f'- Resultado com os dados de {split_name.lower()}')
  print(rule)
  # Rows are grouped by *predicted* class: of the rows predicted as Churn, `tp`
  # were right and `fp` wrong; of those predicted as No-Churn, `tn` right and `fn` wrong.
  print(f'- Churn   : {tp} acertos | {fp} erros')
  print(f'- No-Churn: {tn} acertos | {fn} erros')
  print('=' * 80)

# Logistic Regression Model

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
# Logistic regression with default hyperparameters (reads 'features' / 'label').
lr = LogisticRegression()

In [None]:
# Fit on the training split.
model_lr = lr.fit(training)

In [None]:
# Predictions on the held-out test split.
predictions_lr_test = model_lr.transform(test)

In [None]:
# Preview test predictions.
predictions_lr_test.show()

In [None]:
# Summary of the fitted model computed on the training data.
training_lr_summary = model_lr.summary

In [None]:
# Training-set accuracy (compare with the test-set accuracy reported by get_result below).
training_lr_summary.accuracy

# Decision Tree Model

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
# Decision tree with default hyperparameters, seeded for reproducibility.
dtc = DecisionTreeClassifier(seed=seed)

In [None]:
# Fit on the training split.
modelo_dtc = dtc.fit(training)

In [None]:
# Predictions on the training split.
# NOTE(review): not used by the result cells below, which only evaluate test predictions.
predictions_dtc_training = modelo_dtc.transform(training)

In [None]:
# Predictions on the held-out test split.
predictions_dtc_test = modelo_dtc.transform(test)

# Random Forest Classifier Model

In [None]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:
# Random forest with default hyperparameters, seeded for reproducibility.
rfc = RandomForestClassifier(seed=seed)

In [None]:
# Fit on the training split.
modelo_rfc = rfc.fit(training)

In [None]:
# Predictions on the training split (previewed below; not passed to get_result).
predictions_rfc_training = modelo_rfc.transform(training)

In [None]:
# Preview training predictions.
predictions_rfc_training.show()

In [None]:
# Predictions on the held-out test split.
predictions_rfc_test = modelo_rfc.transform(test)

In [None]:
# Preview test predictions.
predictions_rfc_test.show()

# Model Results

In [None]:
# Test-set report for the logistic regression model.
get_result(predictions_lr_test, 'Modelo Regressão Logística', evaluator)


In [None]:
# Test-set report for the decision tree model.
get_result(predictions_dtc_test, 'Modelo Árvore Decisão', evaluator)

In [None]:
# Test-set report for the random forest model ("Random Forest" is "Floresta Aleatória" in Portuguese).
get_result(predictions_rfc_test, 'Modelo Floresta Aleatória', evaluator)