In [3]:
!pip install pyspark
import pandas as pd
import numpy as np



In [None]:
from pyspark.sql import SparkSession

from pyspark.sql import SparkSession
session = SparkSession.builder.appName('Spark_ML').master('local').getOrCreate()

#we use customer churn dataset gotten from kaggle.
data = session.read.csv('customer-churn.csv', header=True, inferSchema=True)
data.show(10)
data.count()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    

6999

In [5]:
#next we handle the live data gotten from external sources..i.e live streaming

stream_data = session.read.csv('customer-churn.csv', header=True, inferSchema=True)
print(stream_data.show)
print(stream_data.columns)
print(stream_data.schema)

<bound method DataFrame.show of DataFrame[customerID: string, gender: string, SeniorCitizen: int, Partner: string, Dependents: string, tenure: int, PhoneService: string, MultipleLines: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, DeviceProtection: string, TechSupport: string, StreamingTV: string, StreamingMovies: string, Contract: string, PaperlessBilling: string, PaymentMethod: string, MonthlyCharges: double, TotalCharges: string, Churn: string]>
['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn']
StructType([StructField('customerID', StringType(), True), StructField('gender', StringType(), True), StructField('SeniorCitizen', IntegerType(), True), StructField('Partner', StringType(), 

In [6]:
stream = session.readStream.schema(stream_data.schema).csv('CustomerDir/')

In [7]:
import shutil
src = r"customer-churn-stream1.csv"
dst = r"CustomerDir/"
shutil.copy(src, dst)

'CustomerDir/customer-churn-stream1.csv'

In [8]:
stream.writeStream.queryName("Customer_Stream").format("memory").outputMode("append").start()


<pyspark.sql.streaming.query.StreamingQuery at 0x79ffdc78a350>

In [9]:
streamed_df = session.sql("SELECT * FROM Customer_Stream")
streamed_df.show()
streamed_df.count()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-

0

In [10]:
customer_data = data.union(streamed_df)
customer_data.count()
customer_data.head()

Row(customerID='7590-VHVEG', gender='Female', SeniorCitizen=0, Partner='Yes', Dependents='No', tenure=1, PhoneService='No', MultipleLines='No phone service', InternetService='DSL', OnlineSecurity='No', OnlineBackup='Yes', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='No', Contract='Month-to-month', PaperlessBilling='Yes', PaymentMethod='Electronic check', MonthlyCharges=29.85, TotalCharges='29.85', Churn='No')

In [11]:
customer_data = customer_data.dropna()

In [12]:
from pyspark.sql.functions import col, when
customer_data = customer_data.withColumn(
    "TotalCharges",
    when(col("TotalCharges") == " ", None).otherwise(col("TotalCharges").cast("double"))
)

customer_data = customer_data.dropna(subset=["TotalCharges"])

customer_data.head()

Row(customerID='7590-VHVEG', gender='Female', SeniorCitizen=0, Partner='Yes', Dependents='No', tenure=1, PhoneService='No', MultipleLines='No phone service', InternetService='DSL', OnlineSecurity='No', OnlineBackup='Yes', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='No', Contract='Month-to-month', PaperlessBilling='Yes', PaymentMethod='Electronic check', MonthlyCharges=29.85, TotalCharges=29.85, Churn='No')

In [13]:
#Feature extraction.
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

str_obj = StringIndexer(inputCols=["gender", "Partner", "Dependents", "PhoneService" ,"MultipleLines", "InternetService" ,"OnlineSecurity" ,"OnlineBackup" ,"DeviceProtection" ,"TechSupport","StreamingTV" , "StreamingMovies","Contract", "PaperlessBilling" ,"PaymentMethod"],
                        outputCols=["newGender", "newPartner", "newDependents", "newPhoneService", "newMultipleLines", "newInternetService", "newOnlineSecurity", "newOnlineBackup", "newDeviceProtection", "newTechSupport", "newStreamingTV", "newStreamingMovies", "newContract", "newPaperlessBilling", "newPaymentMethod"])

str_obj2 = StringIndexer(inputCol="Churn", outputCol="newChurn", stringOrderType="alphabetAsc")


In [14]:
one_hot = OneHotEncoder(inputCols=["newGender","newPartner", "newDependents", "newPhoneService", "newMultipleLines", "newInternetService", "newOnlineSecurity", "newOnlineBackup", "newDeviceProtection", "newTechSupport", "newStreamingTV", "newStreamingMovies", "newContract", "newPaperlessBilling", "newPaymentMethod"],
                        outputCols=["newGender_encoded", "newPartner_encoded", "newDependents_encoded", "newPhoneService_encoded", "newMultipleLines_encoded", "newInternetService_encoded", "newOnlineSecurity_encoded", "newOnlineBackup_encoded", "newDeviceProtection_encoded", "newTechSupport_encoded", "newStreamingTV_encoded", "newStreamingMovies_encoded", "newContract_encoded", "newPaperlessBilling_encoded", "newPaymentMethod_encoded"])

In [15]:
#I applied minmax scaling to ensure good engineered data i.e the data fall between specific ranges
from pyspark.ml.feature import MinMaxScaler

tenure_assembler = VectorAssembler(inputCols=["tenure", "MonthlyCharges","TotalCharges"], outputCol="tenure_vec")
mmscaler = MinMaxScaler(inputCol="tenure_vec", outputCol="scaledTenure")

In [16]:
vec_asm = VectorAssembler(inputCols=["newGender_encoded", "newPartner_encoded", "newDependents_encoded", "newPhoneService_encoded", "newMultipleLines_encoded", "newInternetService_encoded", "newOnlineSecurity_encoded", "newOnlineBackup_encoded", "newDeviceProtection_encoded", "newTechSupport_encoded", "newStreamingTV_encoded", "newStreamingMovies_encoded", "newContract_encoded", "newPaperlessBilling_encoded", "newPaymentMethod_encoded", "scaledTenure"],
                          outputCol="features")

In [17]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='features', labelCol='newChurn')

In [18]:
#Creating a pipeline
from pyspark.ml import Pipeline
lrpipline = Pipeline(stages=[str_obj, str_obj2, one_hot, tenure_assembler, mmscaler, vec_asm, lr])

#splitting the data into train test split
training, test =  customer_data.randomSplit([0.8, 0.2], seed=42)

In [19]:
#trainig the model
lrmodel = lrpipline.fit(training)

lrresults = lrmodel.transform(test)

lrresults.show(2, truncate=False)

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+---------+----------+-------------+---------------+----------------+------------------+-----------------+---------------+-------------------+--------------+--------------+------------------+-----------+-------------------+----------------+--------+-----------------+------------------+---------------------+-----------------------+------------------------+--------------------------+-------------------------+-----------------------+---------------------------+----------------------+----------------------+--------------------------+-------------------+---------------------------+------------------------+--------------------+-------------------------------------------------------------+-------------------------------

In [20]:
#Evaluating the results.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lreval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='newChurn')
print("Area under ROC:",lreval.evaluate(lrresults))

Area under ROC: 0.720354563908023


In [21]:
#Displaying Area Under PR (Precision Recall)
lrevalPR=BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="newChurn", metricName='areaUnderPR')
print("Area under PR:",lrevalPR.evaluate(lrresults))

Area under PR: 0.5875652612419574


In [22]:
#Determining the accuracy
lrresults=lrresults.withColumn("compare",lrresults['newChurn']-lrresults['prediction'])
correct=lrresults.filter(lrresults['compare']==0).count()
incorrect=lrresults.filter(lrresults['compare']!=0).count()
print(correct/(correct+incorrect))

0.802680565897245


In [23]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol='features', labelCol='newChurn', numTrees=100, maxDepth=10)

In [24]:
#Creating a pipeline
from pyspark.ml import Pipeline
rfpipline = Pipeline(stages=[str_obj, str_obj2, one_hot, tenure_assembler, mmscaler, vec_asm, rf])

#splitting the data into train test split
training, test =  customer_data.randomSplit([0.8, 0.2], seed=42)

In [25]:
rfmodel = rfpipline.fit(training)

rfresults = rfmodel.transform(test)

rfresults.show(2, truncate=False)

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+---------+----------+-------------+---------------+----------------+------------------+-----------------+---------------+-------------------+--------------+--------------+------------------+-----------+-------------------+----------------+--------+-----------------+------------------+---------------------+-----------------------+------------------------+--------------------------+-------------------------+-----------------------+---------------------------+----------------------+----------------------+--------------------------+-------------------+---------------------------+------------------------+--------------------+-------------------------------------------------------------+-------------------------------

In [26]:
 #Evaluating the results for random forest.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

rfeval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='newChurn')
print("Area under ROC:",lreval.evaluate(rfresults))

Area under ROC: 0.714079121049771


In [27]:
#Displaying Area Under PR (Precision Recall) for random forest
rfevalPR=BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="newChurn", metricName='areaUnderPR')
print("Area under PR:",lrevalPR.evaluate(rfresults))

Area under PR: 0.6000031872556743


In [28]:
#Determining the accuracy for rf
rfresults=rfresults.withColumn("compare",rfresults['newChurn']-rfresults['prediction'])
correct=rfresults.filter(rfresults['compare']==0).count()
incorrect=rfresults.filter(rfresults['compare']!=0).count()
print(correct/(correct+incorrect))

0.8056589724497394


In [29]:
lr_results = lrresults.select("prediction", "newChurn").toPandas()
rf_results = rfresults.select("prediction", "newChurn").toPandas()

In [30]:
rf_results

Unnamed: 0,prediction,newChurn
0,1.0,1.0
1,0.0,0.0
2,0.0,0.0
3,0.0,0.0
4,1.0,1.0
...,...,...
1338,0.0,0.0
1339,0.0,0.0
1340,0.0,1.0
1341,0.0,0.0
