# Create Prediction with Machine Learning

In [1]:
import seaborn as sns
import matplotlib.pylab as plt
from google.colab import files, drive
import pyspark
from pyspark.sql import SparkSession
drive.mount('/content/drive')

# file location
loc = '/content/drive/MyDrive/Machine learning folder/Customer Churn/WA_Fn-UseC_-Telco-Customer-Churn.csv'

spark = SparkSession.builder.appName('churn_prediction').getOrCreate()
spark

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
df = spark.read.csv(loc, header = True, inferSchema = True)
df.show(5)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    

## Find and Input Missing Value

In [3]:
from pyspark.sql.functions import isnan, when, count, col
missing_val = df.select([count(when(isnan(c), c)).alias(c) for c in df.columns])
missing_val.show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0| 

## Find and Delete Duplicate

In [4]:
from pyspark.sql.functions import count
duplicated_count = df.groupBy(df.columns).agg(count("*").alias("count")).where('count > 1')
duplicated_count.show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|count|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+-----+
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+

## Check and Change Data type

In [5]:
# Total Charges still string data type
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import col
df = df.withColumn('TotalCharges', col('TotalCharges').cast(DoubleType()))
df = df.withColumn('tenure', col('tenure').cast(IntegerType()))
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)



In [6]:
# Find null value
from pyspark.sql.functions import isnan, when, count, col, isnull
miss_val = df.select([count(when(isnan(c) | isnull(c), c)).alias(c) for c in df.columns])
miss_val.show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0| 

In [7]:
# fill median value into null value
from pyspark.sql.functions import when, col, mean
med = df.select(mean(col('TotalCharges'))).collect()[0][0]
df = df.withColumn('TotalCharges', when(isnull(col('TotalCharges')), med).otherwise(col('TotalCharges')))

# CHECK THE NULL VALUE
miss_val = df.select([count(when(isnan(c) | isnull(c), c)).alias(c) for c in df.columns])
miss_val.show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0| 

## Encode the String Data with One Hot Encoded

In [8]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline


string_type = [col_name for col_name, col_type in df.dtypes if col_type == 'string' and col_name != 'customerID' and col_name != 'Churn']

# Create StringIndexer stages for each categorical column
string_inds= [StringIndexer(inputCol = col, outputCol= col + '_indexed', handleInvalid='keep') for col in string_type]

# Create OneHotEncoder Stages for each indexed categorical column
enc_cols = [col + '_indexed' for col in string_type]
ohe_enc = [OneHotEncoder(inputCol = col, outputCol= col + '_dummy') for col in enc_cols]

# create pipeline to chain the stages
pipeline = Pipeline(stages = string_inds + ohe_enc)

# fit the pipeline
df_encoded = pipeline.fit(df).transform(df)

df_encoded.show(5)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+--------------+---------------+------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+------------------------+-------------------+-------------------+-----------------------+----------------+------------------------+---------------------+--------------------+---------------------+------------------------+--------------------------+---------------------------+-----------------------------+----------------------------+--------------------------+------------------------------+-------------------------+-------------------------+-----------------------------+----------------------+------------------------------+---------------------------+
|cus

## Encode Monthly and Total Charges

In [9]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

charges = ['MonthlyCharges', 'TotalCharges']

# create vector assembler to combine those charges
assembler = VectorAssembler(inputCols = charges, outputCol = 'scaled_charges')
df_clean = assembler.transform(df_encoded)

# Applied Min Max Scaler
scaler = MinMaxScaler(inputCol = 'scaled_charges', outputCol = 'scaled_charges_final')
df_clean = scaler.fit(df_clean).transform(df_clean)

# Show the scaled feature
df_clean.select('scaled_charges', 'scaled_charges_final').show(5)


+--------------+--------------------+
|scaled_charges|scaled_charges_final|
+--------------+--------------------+
| [29.85,29.85]|[0.11542288557213...|
|[56.95,1889.5]|[0.38507462686567...|
|[53.85,108.15]|[0.35422885572139...|
|[42.3,1840.75]|[0.23930348258706...|
| [70.7,151.65]|[0.52189054726368...|
+--------------+--------------------+
only showing top 5 rows



## Lets Drop String Type Data

In [10]:
clean_df = df_encoded.drop(*string_type)
clean_df.show(5)

+----------+-------------+------+--------------+------------+-----+--------------+---------------+------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+------------------------+-------------------+-------------------+-----------------------+----------------+------------------------+---------------------+--------------------+---------------------+------------------------+--------------------------+---------------------------+-----------------------------+----------------------------+--------------------------+------------------------------+-------------------------+-------------------------+-----------------------------+----------------------+------------------------------+---------------------------+
|customerID|SeniorCitizen|tenure|MonthlyCharges|TotalCharges|Churn|gender_indexed|Partner_indexed|Dependents_indexed|PhoneService_indexed|MultipleLines_indexed|InternetService_indexed|OnlineSecurity_indexed|OnlineB

# Create model without Dummy Columns

## Drop '_dummy' Columns

In [11]:
indexed_df = clean_df.select([col for col in clean_df.columns if '_dummy' not in col])
indexed_df = indexed_df.drop('customerID')
indexed_df.show(5)

+-------------+------+--------------+------------+-----+--------------+---------------+------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+------------------------+-------------------+-------------------+-----------------------+----------------+------------------------+---------------------+
|SeniorCitizen|tenure|MonthlyCharges|TotalCharges|Churn|gender_indexed|Partner_indexed|Dependents_indexed|PhoneService_indexed|MultipleLines_indexed|InternetService_indexed|OnlineSecurity_indexed|OnlineBackup_indexed|DeviceProtection_indexed|TechSupport_indexed|StreamingTV_indexed|StreamingMovies_indexed|Contract_indexed|PaperlessBilling_indexed|PaymentMethod_indexed|
+-------------+------+--------------+------------+-----+--------------+---------------+------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+------------------------+----------------

## Create Vector Assembler

In [12]:
from pyspark.ml.feature import VectorAssembler
feat_assembler = VectorAssembler(inputCols = [col for col in indexed_df.columns if col != 'Churn' ], outputCol = 'independet_feature', handleInvalid = 'keep')
output_indexed = feat_assembler.transform(indexed_df)
output_indexed.show()

+-------------+------+--------------+------------+-----+--------------+---------------+------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+------------------------+-------------------+-------------------+-----------------------+----------------+------------------------+---------------------+--------------------+
|SeniorCitizen|tenure|MonthlyCharges|TotalCharges|Churn|gender_indexed|Partner_indexed|Dependents_indexed|PhoneService_indexed|MultipleLines_indexed|InternetService_indexed|OnlineSecurity_indexed|OnlineBackup_indexed|DeviceProtection_indexed|TechSupport_indexed|StreamingTV_indexed|StreamingMovies_indexed|Contract_indexed|PaperlessBilling_indexed|PaymentMethod_indexed|  independet_feature|
+-------------+------+--------------+------------+-----+--------------+---------------+------------------+--------------------+---------------------+-----------------------+----------------------+--------------------

In [14]:
# Drop unused columns
final_df = output_indexed.select('independet_feature', 'Churn')

# map the Churn
map_churn = {'No':0, 'Yes':1}
final_df = final_df.rdd.map(lambda x: (map_churn[x[1]], x[0])).toDF(['Churn', 'independet_feature'])
final_df.show(5)

+-----+--------------------+
|Churn|  independet_feature|
+-----+--------------------+
|    0|(19,[1,2,3,4,5,7,...|
|    0|(19,[1,2,3,9,10,1...|
|    1|(19,[1,2,3,9,10,1...|
|    0|[0.0,45.0,42.3,18...|
|    1|(19,[1,2,3,4],[2....|
+-----+--------------------+
only showing top 5 rows



# Train Machine Learning Model

## Logistic Regression

In [26]:
# Import and fit the model
from pyspark.ml.classification import LogisticRegression
train_data, test_data = final_df.randomSplit([0.75, 0.25], seed = 42)

print('Training Model Dataset: ', train_data.count())
print('Testing Model Dataset: ', test_data.count())

lr = LogisticRegression(featuresCol = 'independet_feature', labelCol = 'Churn', maxIter = 200, elasticNetParam=0.05, regParam = 0.03)
lr_model = lr.fit(train_data)

predictions = lr_model.transform(test_data)
predictions.select('Churn', 'prediction', 'probability').show(10)

Training Model Dataset:  5356
Testing Model Dataset:  1687
+-----+----------+--------------------+
|Churn|prediction|         probability|
+-----+----------+--------------------+
|    0|       1.0|[0.31727162981171...|
|    0|       1.0|[0.30663236908517...|
|    0|       0.0|[0.59807633979288...|
|    0|       1.0|[0.44459410024351...|
|    0|       1.0|[0.29545133131310...|
|    0|       0.0|[0.59612527237768...|
|    0|       0.0|[0.76547911417658...|
|    0|       0.0|[0.61287353053462...|
|    0|       0.0|[0.70989508479705...|
|    0|       1.0|[0.42276084282819...|
+-----+----------+--------------------+
only showing top 10 rows



## Evaluate the ML model

In [27]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

binary_evaluator = BinaryClassificationEvaluator(rawPredictionCol = 'rawPrediction' ,labelCol='Churn', metricName='areaUnderROC')
auc = binary_evaluator.evaluate(predictions)

multiclass_evaluator = MulticlassClassificationEvaluator(labelCol='Churn', metricName='accuracy')
acc = multiclass_evaluator.evaluate(predictions)

multiclass_evaluator.setMetricName('precisionByLabel')
precision = multiclass_evaluator.evaluate(predictions)

print(f"Area Under ROC: {round(auc,2) * 100} %")
print(f"Accuracy of Model: {round(acc,2)*100} %")
print(f"Precision of Model: {round(precision,2)*100} %")


Area Under ROC: 84.0 %
Accuracy of Model: 80.0 %
Precision of Model: 84.0 %
