# Work with customer dataset of a telecommunications company




# Task

Using customer data from a telecommunications company, train a model that predicts their outflow.


## Initializing a spark session

In [1]:
!killall ngrok

In [2]:
!pip install pyspark --quiet
!pip install -U -q PyDrive --quiet
!apt install openjdk-8-jdk-headless &> /dev/null

import os
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/jdk-21.jdk/Contents/Home"

!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip &> /dev/null
!unzip ngrok-stable-linux-amd64.zip &> /dev/null
get_ipython().system_raw('/opt/homebrew/bin/ngrok http 4050 &')


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('PySpark_Tutorial').getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/04 15:27:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Data loading

In [4]:
sparkDataframe = spark.read.option('header', True).option('delimiter', ',').csv('WA_Fn-UseC_-Telco-Customer-Churn.csv')

In [5]:
type(sparkDataframe)

pyspark.sql.dataframe.DataFrame

## Understanding the structure of a dataframe


In [6]:
sparkDataframe.show(5, 8)

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|  7590-...|Female|            0|    Yes|        No|     1|          No|     No ph...|            DSL|            No|         Yes|              No|         No|         No|             No|Month...|             Yes| 

In [7]:
sparkDataframe.count()

7043

In [8]:
sparkDataframe.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: string (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: string (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: string (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



## Conversion of numerical types

In [10]:
sparkDataframe.columns

['customerID',
 'gender',
 'SeniorCitizen',
 'Partner',
 'Dependents',
 'tenure',
 'PhoneService',
 'MultipleLines',
 'InternetService',
 'OnlineSecurity',
 'OnlineBackup',
 'DeviceProtection',
 'TechSupport',
 'StreamingTV',
 'StreamingMovies',
 'Contract',
 'PaperlessBilling',
 'PaymentMethod',
 'MonthlyCharges',
 'TotalCharges',
 'Churn']

In [11]:
from pyspark.sql.functions import expr, col

sparkDataframe_ = sparkDataframe.select(
col('SeniorCitizen').cast('int'), 
    col('Tenure').cast('Double'), 
    col('MonthlyCharges').cast('Double'),
    col('TotalCharges').cast('Double'), 
    col('customerID').cast('string'), 
    col('gender').cast('string'), 
    col('Partner').cast('string'),
    col('Dependents').cast('string'),
    col('PhoneService').cast('string'), 
    col('InternetService').cast('string'), 
    col('OnlineSecurity').cast('string'),
    col('OnlineBackup').cast('string'),
    col('DeviceProtection').cast('string'), 
    col('TechSupport').cast('string'), 
    col('StreamingTV').cast('string'),
    col('StreamingMovies').cast('string'),
    col('Contract').cast('string'), 
    col('PaperlessBilling').cast('string'),
    col('PaymentMethod').cast('string'),
    col('Churn').cast('string')
    
)


## Data cleaning

In [12]:
cols= sparkDataframe_.columns

In [13]:
cols

['SeniorCitizen',
 'Tenure',
 'MonthlyCharges',
 'TotalCharges',
 'customerID',
 'gender',
 'Partner',
 'Dependents',
 'PhoneService',
 'InternetService',
 'OnlineSecurity',
 'OnlineBackup',
 'DeviceProtection',
 'TechSupport',
 'StreamingTV',
 'StreamingMovies',
 'Contract',
 'PaperlessBilling',
 'PaymentMethod',
 'Churn']

In [14]:
for i in range(len(cols)):
    sparkDataframe_ = sparkDataframe_.filter(cols[i] +' is not NULL')
    

## Feature engineering


In [15]:
from pyspark.sql.functions import when

sparkDataframe_1 = sparkDataframe_.drop('CustomerID', 'Gender', 'PhoneService', 'MultipleLines', 'Tenure')

#Gender does not affect whether a person remains a customer of the company, as well as the telephone service.
##Tenure can be obtained as follows Total Charges/Monthly Charges.

In [16]:
sparkDataframe_1.columns

['SeniorCitizen',
 'MonthlyCharges',
 'TotalCharges',
 'Partner',
 'Dependents',
 'InternetService',
 'OnlineSecurity',
 'OnlineBackup',
 'DeviceProtection',
 'TechSupport',
 'StreamingTV',
 'StreamingMovies',
 'Contract',
 'PaperlessBilling',
 'PaymentMethod',
 'Churn']

In [17]:
sparkDataframe_2  = sparkDataframe_1.withColumn('Tenure', col('TotalCharges')/col('MonthlyCharges')).drop('TotalCharges',
                                                                                                         'MonthlyCharges')

## Vectorization of features

In [18]:
sparkDataframe_2.printSchema()

root
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- Churn: string (nullable = true)
 |-- Tenure: double (nullable = true)



24/02/04 15:27:56 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [19]:
from pyspark import mllib
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

#Converting text columns to numbers via StringIndexer

text_cols = ["SeniorCitizen", "Partner", "Dependents", "InternetService", 
                "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", 
                "StreamingMovies", "Contract", "PaperlessBilling", "PaymentMethod", "Churn"]


indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(sparkDataframe_2) for column in text_cols]
pipeline = Pipeline(stages=indexers)
new_sparkDataframe = pipeline.fit(sparkDataframe_2).transform(sparkDataframe_2)

In [20]:
new_sparkDataframe.drop("Partner", "Dependents", "InternetService", 
                "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", 
                "StreamingMovies", "Contract", "PaperlessBilling", "PaymentMethod", "Churn")

DataFrame[SeniorCitizen: int, Tenure: double, SeniorCitizen_index: double, Partner_index: double, Dependents_index: double, InternetService_index: double, OnlineSecurity_index: double, OnlineBackup_index: double, DeviceProtection_index: double, TechSupport_index: double, StreamingTV_index: double, StreamingMovies_index: double, Contract_index: double, PaperlessBilling_index: double, PaymentMethod_index: double, Churn_index: double]

In [21]:
new_sparkDataframe.printSchema()

root
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- Churn: string (nullable = true)
 |-- Tenure: double (nullable = true)
 |-- SeniorCitizen_index: double (nullable = false)
 |-- Partner_index: double (nullable = false)
 |-- Dependents_index: double (nullable = false)
 |-- InternetService_index: double (nullable = false)
 |-- OnlineSecurity_index: double (nullable = false)
 |-- OnlineBackup_index: double (nullable = false)
 |-- DeviceProtection_index: double (

In [22]:
from pyspark.ml.feature import OneHotEncoder

#After indexing, we use OHE to encode categorical feature

features_inp  = ["Partner_index", "Dependents_index",  "InternetService_index", 
                "OnlineSecurity_index", "OnlineBackup_index", "DeviceProtection_index", "TechSupport_index",
                 "StreamingTV_index", 
                "StreamingMovies_index", "Contract_index", "PaperlessBilling_index", "PaymentMethod_index"]


In [45]:
new_columns = [OneHotEncoder(inputCol=column, outputCol=column+"_ohe").fit(new_sparkDataframe) for column in features_inp]
pipeline = Pipeline(stages=new_columns)
new_sparkDataframe_ = pipeline.fit(new_sparkDataframe).transform(new_sparkDataframe)

In [46]:
new_sparkDataframe_

DataFrame[SeniorCitizen: int, Partner: string, Dependents: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, DeviceProtection: string, TechSupport: string, StreamingTV: string, StreamingMovies: string, Contract: string, PaperlessBilling: string, PaymentMethod: string, Churn: string, Tenure: double, SeniorCitizen_index: double, Partner_index: double, Dependents_index: double, InternetService_index: double, OnlineSecurity_index: double, OnlineBackup_index: double, DeviceProtection_index: double, TechSupport_index: double, StreamingTV_index: double, StreamingMovies_index: double, Contract_index: double, PaperlessBilling_index: double, PaymentMethod_index: double, Churn_index: double, Partner_index_ohe: vector, Dependents_index_ohe: vector, InternetService_index_ohe: vector, OnlineSecurity_index_ohe: vector, OnlineBackup_index_ohe: vector, DeviceProtection_index_ohe: vector, TechSupport_index_ohe: vector, StreamingTV_index_ohe: vector, StreamingMovies_index_ohe

In [47]:
new_sparkDataframe_ = new_sparkDataframe_.drop("Partner", "Dependents", "InternetService", 
                "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", 
                "StreamingMovies", "Contract", "PaperlessBilling", "PaymentMethod", "Churn")

In [48]:
new_sparkDataframe_ = new_sparkDataframe_.drop("Partner_index", "Dependents_index", "InternetService_index", 
                "OnlineSecurity_index", "OnlineBackup_index", "DeviceProtection_index", "TechSupport_index", "StreamingTV_index", 
                "StreamingMovies_index", "Contract_index", "PaperlessBilling_index", "PaymentMethod_index")


In [49]:
new_sparkDataframe_

DataFrame[SeniorCitizen: int, Tenure: double, SeniorCitizen_index: double, Churn_index: double, Partner_index_ohe: vector, Dependents_index_ohe: vector, InternetService_index_ohe: vector, OnlineSecurity_index_ohe: vector, OnlineBackup_index_ohe: vector, DeviceProtection_index_ohe: vector, TechSupport_index_ohe: vector, StreamingTV_index_ohe: vector, StreamingMovies_index_ohe: vector, Contract_index_ohe: vector, PaperlessBilling_index_ohe: vector, PaymentMethod_index_ohe: vector]

In [50]:
from pyspark.ml.feature import VectorAssembler


#Combining features into one vector

features= list(new_sparkDataframe_.drop('Churn_index').columns)
target= 'Churn_index'

In [51]:
vectorizer = VectorAssembler(inputCols = features,  outputCol = "features_vec")

In [52]:
sparkDataframe_vectorised = vectorizer.transform(new_sparkDataframe_)


In [53]:
sparkDataframe_vectorised.printSchema()

root
 |-- SeniorCitizen: integer (nullable = true)
 |-- Tenure: double (nullable = true)
 |-- SeniorCitizen_index: double (nullable = false)
 |-- Churn_index: double (nullable = false)
 |-- Partner_index_ohe: vector (nullable = true)
 |-- Dependents_index_ohe: vector (nullable = true)
 |-- InternetService_index_ohe: vector (nullable = true)
 |-- OnlineSecurity_index_ohe: vector (nullable = true)
 |-- OnlineBackup_index_ohe: vector (nullable = true)
 |-- DeviceProtection_index_ohe: vector (nullable = true)
 |-- TechSupport_index_ohe: vector (nullable = true)
 |-- StreamingTV_index_ohe: vector (nullable = true)
 |-- StreamingMovies_index_ohe: vector (nullable = true)
 |-- Contract_index_ohe: vector (nullable = true)
 |-- PaperlessBilling_index_ohe: vector (nullable = true)
 |-- PaymentMethod_index_ohe: vector (nullable = true)
 |-- features_vec: vector (nullable = true)



In [54]:
sparkDataframe_vectorised.columns

['SeniorCitizen',
 'Tenure',
 'SeniorCitizen_index',
 'Churn_index',
 'Partner_index_ohe',
 'Dependents_index_ohe',
 'InternetService_index_ohe',
 'OnlineSecurity_index_ohe',
 'OnlineBackup_index_ohe',
 'DeviceProtection_index_ohe',
 'TechSupport_index_ohe',
 'StreamingTV_index_ohe',
 'StreamingMovies_index_ohe',
 'Contract_index_ohe',
 'PaperlessBilling_index_ohe',
 'PaymentMethod_index_ohe',
 'features_vec']

In [55]:
sparkDataframe_vectorised = sparkDataframe_vectorised.drop('SeniorCitizen',
 'Tenure',
 'SeniorCitizen_index',
 'Partner_index_ohe',
 'Dependents_index_ohe',
 'InternetService_index_ohe',
 'OnlineSecurity_index_ohe',
 'OnlineBackup_index_ohe',
 'DeviceProtection_index_ohe',
 'TechSupport_index_ohe',
 'StreamingTV_index_ohe',
 'StreamingMovies_index_ohe',
 'Contract_index_ohe',
 'PaperlessBilling_index_ohe',
 'PaymentMethod_index_ohe')

In [57]:
sparkDataframe_vectorised.show(2,100)

+-----------+---------------------------------------------------------------------------------------------+
|Churn_index|                                                                                 features_vec|
+-----------+---------------------------------------------------------------------------------------------+
|        0.0|     (25,[1,4,6,7,10,11,13,15,17,19,21,22],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|        0.0|(25,[1,3,4,6,8,9,12,13,15,17,23],[33.17822651448639,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
+-----------+---------------------------------------------------------------------------------------------+
only showing top 2 rows



## Creating and training a model

In [58]:
from pyspark.ml.classification import LogisticRegression

In [59]:
model = LogisticRegression(featuresCol='features_vec', labelCol = target)

In [60]:
data_train, data_test = sparkDataframe_vectorised.randomSplit([0.8, 0.2])

### Selection of optimal hyperparameters

In [61]:
from pyspark.ml.tuning import ParamGridBuilder

grid = ParamGridBuilder().addGrid(model.regParam, [0.5, 5]).addGrid(model.elasticNetParam, [0.01, 0.1]).addGrid(model.maxIter, [5, 15]).build()


In [63]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

eval = BinaryClassificationEvaluator( rawPredictionCol ='prediction', labelCol = target)

In [66]:
from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(estimator = model, estimatorParamMaps = grid, evaluator = eval) 


In [67]:
cv_model = cv.fit(data_train)

24/02/04 21:53:00 WARN CacheManager: Asked to cache already cached data.
24/02/04 21:53:00 WARN CacheManager: Asked to cache already cached data.


### The best model

In [69]:
best_model = cv_model.bestModel
best_model.extractParamMap()

{Param(parent='LogisticRegression_8d3527345b22', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_8d3527345b22', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.01,
 Param(parent='LogisticRegression_8d3527345b22', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_8d3527345b22', name='featuresCol', doc='features column name.'): 'features_vec',
 Param(parent='LogisticRegression_8d3527345b22', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_8d3527345b22', name='labelCol', doc='label column name.'): 'Churn_index',
 Param(parent='LogisticRegression_8d3527345b22', name='maxBlockSizeInMB', doc='maximum memory 

In [70]:
test_model = cv.fit(data_test)

In [72]:
test_model.transform(data_test).show(6, 8)

+-----------+------------+-------------+-----------+----------+
|Churn_index|features_vec|rawPrediction|probability|prediction|
+-----------+------------+-------------+-----------+----------+
|        0.0|    (25,[...|     [-0.2...|   [0.43...|       1.0|
|        0.0|    (25,[...|     [-0.2...|   [0.43...|       1.0|
|        0.0|    (25,[...|     [-0.2...|   [0.44...|       1.0|
|        0.0|    (25,[...|     [0.01...|   [0.50...|       0.0|
|        0.0|    (25,[...|     [0.10...|   [0.52...|       0.0|
|        0.0|    (25,[...|     [-0.2...|   [0.44...|       1.0|
+-----------+------------+-------------+-----------+----------+
only showing top 6 rows



In [73]:
eval.evaluate(test_model.transform(data_test))


0.5993365608750224