# Data Scientist Training Program
### Project with Feedback 4
### Predicting Customer Churn at Telecom Operators

### Gabriel Quaiotti - Apr 2020

Customer churn (loosely, customer turnover) is a customer's decision to end the business relationship. The term also refers to the resulting loss of customers. Customer loyalty and customer churn always add up to 100%: if a company has a loyalty rate of 60%, its churn rate is 40%. According to the 80/20 customer profitability rule, 20% of customers generate 80% of revenue. It is therefore very important to predict which users are likely to end the relationship and which factors drive their decisions.

In this project, you must predict customer churn at a telecom operator.

The training and test datasets are attached to this project. Your task is to build a machine learning model that predicts whether a customer will cancel their plan and the probability of that happening. The dataset header describes the type of information in each column.

Using Python, we recommend building a Logistic Regression model that determines whether a customer will cancel their plan (Yes or No) and the probability of each outcome.
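To make the recommendation concrete, here is a minimal pure-Python sketch of what a logistic regression computes. It learns weights by batch gradient descent on the log-loss, turns the linear score into a probability with the sigmoid, and converts that probability into a yes/no label with a 0.5 threshold. This is illustrative only. The notebook below loads a model trained with PySpark's `LogisticRegression`, which uses its own optimizer and regularization settings. The toy data and the function names are hypothetical.

```python
import math


def sigmoid(z: float) -> float:
    """Logistic function: maps a real-valued score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def fit_logistic(X, y, lr=0.1, epochs=2000):
    """Fit weights and bias by batch gradient descent on the average log-loss."""
    n_features = len(X[0])
    w = [0.0] * n_features
    b = 0.0
    n = len(X)
    for _ in range(epochs):
        grad_w = [0.0] * n_features
        grad_b = 0.0
        for xi, yi in zip(X, y):
            p = sigmoid(sum(wj * xj for wj, xj in zip(w, xi)) + b)
            err = p - yi  # gradient of the log-loss w.r.t. the linear score
            for j in range(n_features):
                grad_w[j] += err * xi[j]
            grad_b += err
        w = [wj - lr * gj / n for wj, gj in zip(w, grad_w)]
        b -= lr * grad_b / n
    return w, b


def predict(w, b, x, threshold=0.5):
    """Return (label, probability of class 1) for one feature vector."""
    p = sigmoid(sum(wj * xj for wj, xj in zip(w, x)) + b)
    return (1 if p > threshold else 0), p


# Hypothetical one-feature toy data: larger values mean "churn" (1)
X = [[0.0], [1.0], [2.0], [3.0]]
y = [0, 0, 1, 1]
w, b = fit_logistic(X, y)
label_low, p_low = predict(w, b, [0.0])
label_high, p_high = predict(w, b, [3.0])
```

The same model therefore gives both outputs the project asks for: the probability `p` and the yes/no label derived from it.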

In [169]:
# Libraries
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType

from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import udf
from pyspark.sql.functions import stddev
from pyspark.sql.functions import mean

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import PipelineModel


from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel


from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.mllib.evaluation import MulticlassMetrics

In [170]:
# Spark Session - entry point for working with Spark
spSession = SparkSession.builder.master("local").appName("DSA-TELECOM-TEST").getOrCreate()

# TEST

## Load TEST dataset

In [171]:
# Read the TEST dataset as raw text lines via the session's SparkContext
features_rdd = spSession.sparkContext.textFile('../data/projeto4_telecom_teste.csv')

In [172]:
features_rdd.take(5)

['"","state","account_length","area_code","international_plan","voice_mail_plan","number_vmail_messages","total_day_minutes","total_day_calls","total_day_charge","total_eve_minutes","total_eve_calls","total_eve_charge","total_night_minutes","total_night_calls","total_night_charge","total_intl_minutes","total_intl_calls","total_intl_charge","number_customer_service_calls","churn"',
 '"1","HI",101,"area_code_510","no","no",0,70.9,123,12.05,211.9,73,18.01,236,73,10.62,10.6,3,2.86,3,"no"',
 '"2","MT",137,"area_code_510","no","no",0,223.6,86,38.01,244.8,139,20.81,94.2,81,4.24,9.5,7,2.57,0,"no"',
 '"3","OH",103,"area_code_408","no","yes",29,294.7,95,50.1,237.3,105,20.17,300.3,127,13.51,13.7,6,3.7,1,"no"',
 '"4","NM",99,"area_code_415","no","no",0,216.8,123,36.86,126.4,88,10.74,220.6,82,9.93,15.7,2,4.24,1,"no"']

In [173]:
# Remove header and split by ','
header = features_rdd.first()
features_rdd2 = features_rdd.filter(lambda line: line != header).map(lambda line: line.split(","))
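Splitting on `","` works for this file because no field contains a comma. However, it leaves the surrounding double quotes in place, which is why the quotes have to be stripped with `regexp_replace` later. A quote-aware parser avoids that step. The sketch below uses Python's standard `csv` module on the first two lines shown above. In Spark itself, `spSession.read.csv(path, header=True, inferSchema=True)` is the usual quote-aware alternative. The helper name `parse_csv_lines` is hypothetical.

```python
import csv


def parse_csv_lines(lines):
    """Split CSV lines into a header and data rows; quoted fields come back unquoted."""
    reader = csv.reader(lines)
    header = next(reader)
    rows = [row for row in reader]
    return header, rows


# First two lines of the TEST file, as printed by features_rdd.take(5) above
sample = [
    '"","state","account_length","area_code","international_plan","voice_mail_plan","number_vmail_messages","total_day_minutes","total_day_calls","total_day_charge","total_eve_minutes","total_eve_calls","total_eve_charge","total_night_minutes","total_night_calls","total_night_charge","total_intl_minutes","total_intl_calls","total_intl_charge","number_customer_service_calls","churn"',
    '"1","HI",101,"area_code_510","no","no",0,70.9,123,12.05,211.9,73,18.01,236,73,10.62,10.6,3,2.86,3,"no"',
]

header, rows = parse_csv_lines(sample)
# rows[0][4] is the international_plan value "no", without quotes
```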

In [174]:
# Define the dataFrame columns
test_fields = [StructField("id", StringType(), True), 
     StructField("state", StringType(), True),
     StructField("account_length", StringType(), True),
     StructField("area_code", StringType(), True),
     StructField("international_plan", StringType(), True),
     StructField("voice_mail_plan", StringType(), True),
     StructField("number_vmail_messages", StringType(), True),
     StructField("total_day_minutes", StringType(), True),
     StructField("total_day_calls", StringType(), True),
     StructField("total_day_charge", StringType(), True),
     StructField("total_eve_minutes", StringType(), True),
     StructField("total_eve_calls", StringType(), True),
     StructField("total_eve_charge", StringType(), True),
     StructField("total_night_minutes", StringType(), True),
     StructField("total_night_calls", StringType(), True),
     StructField("total_night_charge", StringType(), True),
     StructField("total_intl_minutes", StringType(), True),
     StructField("total_intl_calls", StringType(), True),
     StructField("total_intl_charge", StringType(), True),
     StructField("number_customer_service_calls", StringType(), True),
     StructField("churn", StringType(), True)]     

In [175]:
# Define the dataFrame schema
features_schema = StructType( test_fields )

In [176]:
# Create dataFrame
test_ds = spSession.createDataFrame(features_rdd2, features_schema)

In [177]:
# Keep the four predictors used by the model, plus the churn label
test_ds = test_ds.select('international_plan', 'number_customer_service_calls', 'total_day_minutes', 'total_eve_charge', 'churn')

In [178]:
# Encode the yes/no string columns as 0/1 and strip the leftover double quotes
test_ds = test_ds.withColumn('international_plan', regexp_replace(col('international_plan'), 'no', '0') )
test_ds = test_ds.withColumn('international_plan', regexp_replace(col('international_plan'), 'yes', '1') )
test_ds = test_ds.withColumn('international_plan', regexp_replace(col('international_plan'), '"', '') )

test_ds = test_ds.withColumn('churn', regexp_replace(col('churn'), 'no', '0') )
test_ds = test_ds.withColumn('churn', regexp_replace(col('churn'), 'yes', '1') )
test_ds = test_ds.withColumn('churn', regexp_replace(col('churn'), '"', '') )
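The chained `regexp_replace` calls assume every value is exactly `"yes"` or `"no"`. Any other spelling would pass through unchanged into the later cast to `DoubleType`. An explicit mapping that fails loudly on unexpected values is a safer pattern. The sketch below shows it in plain Python. In PySpark the same idea could be written with `pyspark.sql.functions.when`, for example `when(col(c) == '"yes"', 1.0).when(col(c) == '"no"', 0.0)`. The function name `encode_yes_no` is hypothetical.

```python
def encode_yes_no(raw: str) -> float:
    """Map a possibly quoted "yes"/"no" field to 1.0/0.0 and reject anything else."""
    value = raw.strip().strip('"')
    mapping = {"yes": 1.0, "no": 0.0}
    if value not in mapping:
        raise ValueError(f"unexpected yes/no value: {raw!r}")
    return mapping[value]


# Values as they appear after the naive split(",") above, quotes included
encoded = [encode_yes_no(v) for v in ['"no"', '"yes"', '"no"']]
```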

In [179]:
# Cast the columns to Double
test_ds = test_ds.withColumn("total_day_minutes", test_ds["total_day_minutes"].cast(DoubleType()))
test_ds = test_ds.withColumn("total_eve_charge", test_ds["total_eve_charge"].cast(DoubleType()))
test_ds = test_ds.withColumn("number_customer_service_calls", test_ds["number_customer_service_calls"].cast(DoubleType()))
test_ds = test_ds.withColumn("international_plan", test_ds["international_plan"].cast(DoubleType()))
test_ds = test_ds.withColumn("churn", test_ds["churn"].cast(DoubleType()))

In [180]:
test_ds.toPandas().head()

|   | international_plan | number_customer_service_calls | total_day_minutes | total_eve_charge | churn |
|---|---|---|---|---|---|
| 0 | 0.0 | 3.0 | 70.9 | 18.01 | 0.0 |
| 1 | 0.0 | 0.0 | 223.6 | 20.81 | 0.0 |
| 2 | 0.0 | 1.0 | 294.7 | 20.17 | 0.0 |
| 3 | 0.0 | 1.0 | 216.8 | 10.74 | 0.0 |
| 4 | 0.0 | 2.0 | 197.4 | 10.54 | 0.0 |


## Transform features to vector

In [181]:
# Convert predictor columns to vector
vector_ds = VectorAssembler(inputCols = test_ds.drop('churn').columns, outputCol="features").transform(test_ds)

In [182]:
vector_ds

DataFrame[international_plan: double, number_customer_service_calls: double, total_day_minutes: double, total_eve_charge: double, churn: double, features: vector]

## Scale TEST dataset

In [183]:
# Get the columns to scale (predictors)
columns_to_scale = test_ds.drop('churn').columns

# Wrap each column in a one-element vector (MinMaxScaler requires vector input)
assemblers = [VectorAssembler(inputCols=[c], outputCol=c + "_vec") for c in columns_to_scale]

# Rescale each vector column to [0, 1]
scalers = [MinMaxScaler(inputCol=c + "_vec", outputCol=c + "_scaled") for c in columns_to_scale]

# Chain the stages (vectorize, then scale)
pipeline = Pipeline(stages=assemblers + scalers)

# Fit the scaler; this learns min/max from the TEST data itself, whereas
# reusing the scaler fitted on the training data would avoid test-set leakage
scalerModel = pipeline.fit(test_ds)

# Apply the fitted scaler
scaled_ds = scalerModel.transform(test_ds)
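The usual practice is to learn the scaling range from the training data and apply that same range to the test data, so that test statistics never influence preprocessing. The plain-Python sketch below follows the formula documented for Spark's `MinMaxScaler`: `(x - E_min) / (E_max - E_min) * (max - min) + min`, with a constant column mapped to `0.5 * (max + min)`. The toy columns and helper names are hypothetical. In the notebook's own terms, this would mean saving the pipeline fitted on the training data (for example with `scalerModel.write().overwrite().save(...)`) and loading it here with `PipelineModel.load(...)`. The source does not give such a path.

```python
def fit_min_max(values):
    """Learn the observed (min, max) of a column, in practice from training data only."""
    return min(values), max(values)


def transform_min_max(values, value_range, new_min=0.0, new_max=1.0):
    """Rescale values with a previously learned range, following the MinMaxScaler formula."""
    e_min, e_max = value_range
    if e_max == e_min:
        # Constant column: map every value to the midpoint of [new_min, new_max]
        return [0.5 * (new_max + new_min) for _ in values]
    return [(v - e_min) / (e_max - e_min) * (new_max - new_min) + new_min for v in values]


# Hypothetical toy columns, not taken from the project data
train_minutes = [0.0, 5.0, 10.0]
test_minutes = [2.5, 12.0]

learned_range = fit_min_max(train_minutes)
scaled_test = transform_min_max(test_minutes, learned_range)
# The unseen value 12.0 lands above 1.0, which is expected when the range comes from training data
```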

In [184]:
# Transform vector to double
unlist = udf(lambda x: float(list(x)[0]), DoubleType())

# Get scaled values
for i in columns_to_scale:
    # Transform original column
    scaled_ds = scaled_ds.withColumn(i, unlist(i + "_scaled"))
    # Drop _vec column
    scaled_ds = scaled_ds.drop(i + "_vec")
    # Drop _scaled column
    scaled_ds = scaled_ds.drop(i + "_scaled")

## Standardize TEST dataset

In [185]:
# Get the columns to standardize (predictors)
columns_to_standard = scaled_ds.drop('id', 'churn').columns
columns_to_standard

['international_plan',
 'number_customer_service_calls',
 'total_day_minutes',
 'total_eve_charge']

In [186]:
# Z-score standardization: (x - mean) / sample standard deviation,
# with both statistics computed on this (TEST) dataset
for i in columns_to_standard:
    stats = scaled_ds.select(mean(i).alias('mean'), stddev(i).alias('std')).collect()[0]
    scaled_ds = scaled_ds.withColumn(i, (col(i) - stats['mean']) / stats['std'])
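Z-scores do not change under a positive affine transformation of a column, and min-max scaling is exactly such a transformation whenever max > min. Standardizing after `MinMaxScaler` therefore yields the same values as standardizing the raw column, so the earlier scaling stage does not affect the final features. The plain-Python check below uses the first five `total_day_minutes` values printed earlier. It uses `statistics.stdev`, the sample (n - 1) standard deviation, which matches Spark's `stddev` aggregate (an alias for `stddev_samp`). These five-value z-scores will not match the notebook output, which used every row of the TEST set.

```python
import statistics


def standardize(values):
    """Z-score using the sample standard deviation (n - 1), like Spark's stddev()."""
    m = statistics.mean(values)
    s = statistics.stdev(values)
    return [(v - m) / s for v in values]


def min_max(values):
    """Rescale to [0, 1] using the column's own min and max."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]


# First five total_day_minutes values from the TEST output above
day_minutes = [70.9, 223.6, 294.7, 216.8, 197.4]

z_direct = standardize(day_minutes)
z_after_min_max = standardize(min_max(day_minutes))
# Both lists agree up to floating-point rounding
```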

# PREDICT

In [187]:
# Convert predictor columns to vector
vector_ds = VectorAssembler(inputCols=scaled_ds.drop('churn').columns, outputCol="features").transform(scaled_ds)

In [188]:
vector_ds.show()

+--------------------+-----------------------------+--------------------+--------------------+-----+--------------------+
|  international_plan|number_customer_service_calls|   total_day_minutes|    total_eve_charge|churn|            features|
+--------------------+-----------------------------+--------------------+--------------------+-----+--------------------+
|-0.31435656731631717|           1.0980750080169768| -2.0939056396184457|  0.2375069325583213|  0.0|[-0.3143565673163...|
|-0.31435656731631717|           -1.230793997535554|  0.8018596606739083|  0.8932817674646958|  0.0|[-0.3143565673163...|
|-0.31435656731631717|          -0.4545043290180438|  2.1501826001027644|  0.7433903766289537|  0.0|[-0.3143565673163...|
|-0.31435656731631717|          -0.4545043290180438|  0.6729061306019508| -1.4651655852164471|  0.0|[-0.3143565673163...|
|-0.31435656731631717|          0.32178533949946647|  0.3050092948084224| -1.5120066448526168|  0.0|[-0.3143565673163...|
...

In [189]:
model = LogisticRegressionModel.load("../obj/logistic_regression.obj")

In [190]:
prediction = model.transform(vector_ds)

In [191]:
prediction.toPandas().head()

|   | international_plan | number_customer_service_calls | total_day_minutes | total_eve_charge | churn | features | rawPrediction | probability | prediction |
|---|---|---|---|---|---|---|---|---|---|
| 0 | -0.314357 | 1.098075 | -2.093906 | 0.237507 | 0.0 | [-0.31435656731631717, 1.0980750080169768, -2.... | [0.7504101832107734, -0.7504101832107734] | [0.6792680694739247, 0.3207319305260753] | 0.0 |
| 1 | -0.314357 | -1.230794 | 0.80186 | 0.893282 | 0.0 | [-0.31435656731631717, -1.230793997535554, 0.8... | [0.6884941765504509, -0.6884941765504509] | [0.665631865153395, 0.33436813484660494] | 0.0 |
| 2 | -0.314357 | -0.454504 | 2.150183 | 0.74339 | 0.0 | [-0.31435656731631717, -0.4545043290180438, 2.... | [-1.076142456507508, 1.076142456507508] | [0.2542367148420343, 0.7457632851579656] | 1.0 |
| 3 | -0.314357 | -0.454504 | 0.672906 | -1.465166 | 0.0 | [-0.31435656731631717, -0.4545043290180438, 0.... | [0.4650273424971406, -0.4650273424971406] | [0.6142061217921756, 0.3857938782078244] | 0.0 |
| 4 | -0.314357 | 0.321785 | 0.305009 | -1.512007 | 0.0 | [-0.31435656731631717, 0.32178533949946647, 0.... | [-0.003832589247174896, 0.003832589247174896] | [0.49904185386103594, 0.500958146138964] | 1.0 |
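The prediction columns above are consistent with how a binary logistic regression reports its output. `rawPrediction` holds `[-m, m]`, where `m` is the linear margin, and `probability` holds `[1 - sigmoid(m), sigmoid(m)]`. The label is 1.0 when the churn probability exceeds the 0.5 threshold. The plain-Python check below reproduces three of the printed rows. The helper names are hypothetical.

```python
import math


def sigmoid(z: float) -> float:
    """Logistic function mapping a margin to a probability."""
    return 1.0 / (1.0 + math.exp(-z))


def churn_probability(raw):
    """Probability of class 1 (churn) from a binary rawPrediction [-m, m]."""
    return sigmoid(raw[1])


def predict_label(raw, threshold=0.5):
    """1.0 if the churn probability exceeds the threshold, else 0.0."""
    return 1.0 if churn_probability(raw) > threshold else 0.0


# rawPrediction values copied from rows 0, 2 and 4 of the output above
raw_row0 = [0.7504101832107734, -0.7504101832107734]
raw_row2 = [-1.076142456507508, 1.076142456507508]
raw_row4 = [-0.003832589247174896, 0.003832589247174896]

p0, p2, p4 = (churn_probability(r) for r in (raw_row0, raw_row2, raw_row4))
```

Row 4 shows how close to the threshold a prediction can be. A churn probability of about 0.501 is enough to flip the label to 1.0.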


# EVALUATE

In [192]:
# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol = "churn")
print(evaluator.getMetricName(), evaluator.evaluate(prediction))

areaUnderROC 0.8197888822888871
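`areaUnderROC` measures ranking quality independently of the 0.5 threshold. It equals the probability that a randomly chosen churner receives a higher score than a randomly chosen non-churner, with ties counted as half. A value of about 0.82 therefore means that roughly 82% of churner/non-churner pairs are ranked in the right order. The sketch below computes AUC directly from that definition on hypothetical scores. Spark computes the ROC curve with its own implementation, and this pairwise loop is only suitable for small inputs.

```python
def auc_by_ranking(scores, labels):
    """AUC as P(score of a random positive > score of a random negative); ties count 1/2.

    This pairwise loop is O(n_pos * n_neg), fine for a small illustration only.
    """
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical churn scores and true labels: 3 of the 4 positive/negative pairs are ordered correctly
auc = auc_by_ranking([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1])
```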


In [193]:
# View confusion matrix as counts per (churn, prediction) pair; '%' holds the fraction of all rows
total = prediction.count()
prediction.groupBy('churn', 'prediction').count().withColumn('%', col('count') / total).show()

+-----+----------+-----+--------------------+
|churn|prediction|count|                   %|
+-----+----------+-----+--------------------+
|  1.0|       1.0|  196| 0.11757648470305938|
|  0.0|       1.0|  543| 0.32573485302939414|
|  1.0|       0.0|   28|0.016796640671865627|
|  0.0|       0.0|  900|  0.5398920215956808|
+-----+----------+-----+--------------------+
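The counts above give the usual per-class metrics directly. Taking churn = 1 as the positive class, the model flags 196 of the 224 actual churners (recall 0.875). However, only about 27% of the customers it flags actually churned (precision 196 / 739), which explains the modest overall accuracy. The plain-Python sketch below computes these metrics from the four counts. The helper name is hypothetical.

```python
def binary_metrics(tp, fp, fn, tn):
    """Accuracy, precision and recall for the positive class (churn = 1)."""
    total = tp + fp + fn + tn
    return {
        "accuracy": (tp + tn) / total,
        "precision": tp / (tp + fp),
        "recall": tp / (tp + fn),
    }


# Counts from the table above (churn = actual label, prediction = model output):
# TP: churn 1 / prediction 1, FP: churn 0 / prediction 1,
# FN: churn 1 / prediction 0, TN: churn 0 / prediction 0
m = binary_metrics(tp=196, fp=543, fn=28, tn=900)
# recall = 196 / 224 = 0.875; accuracy = 1096 / 1667
```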



In [194]:
# MulticlassMetrics expects an RDD of (prediction, label) pairs
preds = prediction.select('prediction', 'churn')

In [195]:
metrics = MulticlassMetrics(preds.rdd)

In [196]:
metrics.accuracy

0.6574685062987402

In [197]:
# Rows are actual labels (churn), columns are predicted labels
metrics.confusionMatrix().toArray()

array([[900., 543.],
       [ 28., 196.]])
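`MulticlassMetrics` takes `(prediction, label)` pairs, and its `confusionMatrix()` places actual labels in rows and predicted labels in columns, sorted by label value. The argument order therefore matters. Swapping the two values in each pair transposes the matrix, although accuracy is unaffected. The plain-Python sketch below mirrors that convention and rebuilds both orientations from the groupBy counts shown earlier. The helper name is hypothetical.

```python
def confusion_matrix(prediction_and_labels, classes=(0.0, 1.0)):
    """Count (prediction, label) pairs; rows are actual labels, columns are predictions."""
    index = {c: i for i, c in enumerate(classes)}
    matrix = [[0] * len(classes) for _ in classes]
    for prediction, label in prediction_and_labels:
        matrix[index[label]][index[prediction]] += 1
    return matrix


# Rebuild (prediction, label) pairs from the groupBy counts above
pairs = ([(0.0, 0.0)] * 900 + [(1.0, 0.0)] * 543 +
         [(0.0, 1.0)] * 28 + [(1.0, 1.0)] * 196)

correct_order = confusion_matrix(pairs)
# Feeding (label, prediction) instead produces the transposed matrix
swapped_order = confusion_matrix([(label, pred) for pred, label in pairs])
```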