<a href="https://colab.research.google.com/github/Melvinmcrn/DataScience/blob/master/PySpark/4_Pyspark_Classification_Pipeline_Churn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
# Point findspark/PySpark at the JDK and the Spark distribution unpacked by the install cell.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
})
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Local-mode session using all available cores.
spark = SparkSession.builder.master("local[*]").getOrCreate()

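# PySpark Classification Pipeline: Customer Churn (Pyspark_Classification_Pipeline_Churn)

Train a decision-tree churn classifier with a Spark ML pipeline (StringIndexer → VectorAssembler → DecisionTreeClassifier). The model is tuned with 3-fold cross-validation and evaluated on a held-out test split, then saved and reloaded.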

In [0]:
#1 - import module
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
#2 - Create spark context
# getOrCreate() hands back the already-running SparkContext (the one behind the
# SparkSession built in the setup cell) rather than starting a second one.
sc = SparkContext.getOrCreate()

In [0]:
# Show the active SparkContext via the notebook's rich repr.
sc

In [0]:
# List the effective Spark configuration as (key, value) pairs.
# Uses the public getConf() API instead of the private `sc._conf` attribute;
# getConf() returns a copy, so this inspection cannot mutate the running config.
sc.getConf().getAll()

[(u'spark.driver.host', u'c15872cb2e6f'),
 (u'spark.rdd.compress', u'True'),
 (u'spark.app.id', u'local-1564039798062'),
 (u'spark.serializer.objectStreamReset', u'100'),
 (u'spark.master', u'local[*]'),
 (u'spark.executor.id', u'driver'),
 (u'spark.submit.deployMode', u'client'),
 (u'spark.driver.port', u'32839'),
 (u'spark.ui.showConsoleProgress', u'true'),
 (u'spark.app.name', u'pyspark-shell')]

In [0]:
# Same configuration, rendered as one key=value pair per line.
conf_dump = sc.getConf().toDebugString()
print (conf_dump)

spark.app.id=local-1564039798062
spark.app.name=pyspark-shell
spark.driver.host=c15872cb2e6f
spark.driver.port=32839
spark.executor.id=driver
spark.master=local[*]
spark.rdd.compress=True
spark.serializer.objectStreamReset=100
spark.submit.deployMode=client
spark.ui.showConsoleProgress=true


In [0]:
#3 - Setup SparkSession(SparkSQL)
# NOTE(review): a SparkSession already exists (setup cell), so getOrCreate() returns it.
# The running SparkContext presumably keeps its original app name ('pyspark-shell' in
# the config dump above), i.e. appName() here does not rename the application.
spark = SparkSession.builder.appName("Pyspark_Classification_Pipeline_Churn").getOrCreate()
print (spark)

<pyspark.sql.session.SparkSession object at 0x7fc5731ef590>


In [0]:
#4 - Read file to spark DataFrame
data = (spark
        .read
        .option("header","true")
        .option("inferSchema", "true")
        .csv("churn.csv"))
data.cache()
# cache() is lazy: it only marks the DataFrame for caching. Run an action so the
# rows are actually materialized in memory before the message below says so.
data.count()
print ("finish caching data")

finish caching data


In [0]:
#5 - Understand data and problems
# Model inputs: categorical (Yes/No) columns and numeric columns.
category = ['International plan','Voice mail plan']
continuous = ['Number vmail messages','Total day minutes','Total day calls','Total day charge','Total eve minutes','Total eve calls','Total eve charge','Total night minutes','Total night calls','Total night charge','Total intl minutes','Total intl calls','Total intl charge','Customer service calls']
# Target. The CSV header is 'Churn'; Spark resolves column names case-insensitively
# by default, and the withColumn in step #6 renames the column to 'churn'.
label = 'churn'

# Account/location attributes excluded from the model
# (treated as identifiers; presumably not predictive -- revisit if needed).
unique_features = ['State','Account length','Area code']
# Each charge column is a fixed per-minute rate times the matching minutes column
# (sample rows: day ~0.17, eve ~0.085, night ~0.045, intl ~0.27 per minute),
# so the charges duplicate the minutes features and are dropped.
unused_features = ['Total day charge','Total eve charge','Total night charge','Total intl charge']

print (len(category) + len(continuous))


16


In [0]:
# Per-column count/mean/stddev/min/max, collected to pandas for display.
data.describe().toPandas()

Unnamed: 0,summary,State,Account length,Area code,International plan,Voice mail plan,Number vmail messages,Total day minutes,Total day calls,Total day charge,Total eve minutes,Total eve calls,Total eve charge,Total night minutes,Total night calls,Total night charge,Total intl minutes,Total intl calls,Total intl charge,Customer service calls
0,count,3000,3000.0,3000.0,3000,3000,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0
1,mean,,101.09333333333332,437.04566666666665,,,8.011,179.5585000000005,100.56133333333334,30.525506666666693,201.1894666666669,100.01833333333332,17.101323333333337,201.26770000000008,99.97733333333332,9.057113333333342,10.263133333333348,4.489,2.771566666666659,1.5616666666666668
2,stddev,,39.56984390169308,42.26102514103134,,,13.629996500422669,54.64951726833965,20.05754238578949,9.290379267048117,51.20153176094548,19.975977500853205,4.352130347501413,50.48089883403788,19.5420090129182,2.271682573422988,2.8103131739364486,2.4672335547743733,0.7587626676487821,1.3206482976923195
3,min,AK,1.0,408.0,No,No,0.0,0.0,0.0,0.0,0.0,0.0,0.0,23.2,33.0,1.04,0.0,0.0,0.0,0.0
4,max,WY,243.0,510.0,Yes,Yes,51.0,350.8,165.0,59.64,363.7,170.0,30.91,395.0,175.0,17.77,20.0,20.0,5.4,9.0


In [0]:
# Column names and the types inferred from the CSV (inferSchema=true in step #4).
data.printSchema()

root
 |-- State: string (nullable = true)
 |-- Account length: integer (nullable = true)
 |-- Area code: integer (nullable = true)
 |-- International plan: string (nullable = true)
 |-- Voice mail plan: string (nullable = true)
 |-- Number vmail messages: integer (nullable = true)
 |-- Total day minutes: double (nullable = true)
 |-- Total day calls: integer (nullable = true)
 |-- Total day charge: double (nullable = true)
 |-- Total eve minutes: double (nullable = true)
 |-- Total eve calls: integer (nullable = true)
 |-- Total eve charge: double (nullable = true)
 |-- Total night minutes: double (nullable = true)
 |-- Total night calls: integer (nullable = true)
 |-- Total night charge: double (nullable = true)
 |-- Total intl minutes: double (nullable = true)
 |-- Total intl calls: integer (nullable = true)
 |-- Total intl charge: double (nullable = true)
 |-- Customer service calls: integer (nullable = true)
 |-- Churn: boolean (nullable = true)



In [0]:
# ~0.1% sample without replacement (seed=1234) to eyeball a few raw rows.
data.sample(False, 0.001, 1234).toPandas()

Unnamed: 0,State,Account length,Area code,International plan,Voice mail plan,Number vmail messages,Total day minutes,Total day calls,Total day charge,Total eve minutes,Total eve calls,Total eve charge,Total night minutes,Total night calls,Total night charge,Total intl minutes,Total intl calls,Total intl charge,Customer service calls,Churn
0,MD,106,510,No,No,0,213.9,95,36.36,151.9,70,12.91,260.1,124,11.7,12.2,5,3.29,3,False
1,KY,151,408,No,Yes,17,214.7,97,36.5,138.5,90,11.77,169.1,44,7.61,8.6,4,2.32,1,False
2,OH,74,415,No,No,0,136.7,106,23.24,228.6,105,19.43,265.3,114,11.94,9.8,4,2.65,0,False
3,NJ,96,510,No,No,0,150.0,122,25.5,218.5,116,18.57,212.4,89,9.56,9.8,1,2.65,3,False
4,NM,232,408,No,No,0,165.6,104,28.15,195.9,115,16.65,118.3,77,5.32,11.8,3,3.19,1,False
5,TX,71,415,No,Yes,39,183.2,103,31.14,209.4,111,17.8,172.4,109,7.76,11.9,6,3.21,1,False


In [0]:
# Class balance of the target column; churners (True) are the minority class.
data.groupBy(label).count().toPandas()

Unnamed: 0,churn,count
0,True,438
1,False,2562


In [0]:
#6 - Change column type from boolean to string
# After the cast the label holds the strings 'true'/'false', which the label
# StringIndexer in step #10 maps to numeric indices.
label_before = data.select(label)
label_before.printSchema()
data = data.withColumn(label, data[label].astype("string"))
data.select(label).printSchema()

root
 |-- churn: boolean (nullable = true)

root
 |-- churn: string (nullable = true)



In [0]:
#8 - Remove unused variables
# Column counts come from the schema (`.columns`) rather than head(): head() launches
# a Spark job just to count fields and returns None on an empty DataFrame, which
# would make len() raise TypeError.
columns_to_drop = unique_features + unused_features
print ("number of features : " + str(len(data.drop(label).columns)))
for unused_feature in columns_to_drop:
    print (unused_feature)
# Drop everything in one call instead of building one projection per column.
data = data.drop(*columns_to_drop)
print ("\nnumber of features remain : " + str(len(data.drop(label).columns)))


# Keep the feature-name lists in sync with the remaining columns.
category = [feature for feature in category if feature not in columns_to_drop]
continuous = [feature for feature in continuous if feature not in columns_to_drop]

print ("\nnumber of features remain : " + str(len(category) + len(continuous)))

number of features : 19
State
Account length
Area code
Total day charge
Total eve charge
Total night charge
Total intl charge

number of features remain : 12

number of features remain : 12


In [0]:
#9 - split Train and Test data
# NOTE(review): randomSplit does not need sorted input; the sort presumably only
# affects which rows land in each split. Kept so the counts below reproduce.
data = data.sort(label)
trainingData, testData = data.randomSplit([0.7, 0.3], seed=50)

named_frames = [("data", data), ("trainingData", trainingData), ("testData", testData)]

for _name, frame in named_frames:
    print(type(frame))

for frame_name, frame in named_frames:
    print ("%s count : %s" % (frame_name, frame.count()))

# Per-split class balance, to check both splits keep the churn ratio.
for _name, frame in named_frames:
    frame.groupBy(label).count().show()

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
data count : 3000
trainingData count : 2100
testData count : 900
+-----+-----+
|churn|count|
+-----+-----+
|false| 2562|
| true|  438|
+-----+-----+

+-----+-----+
|churn|count|
+-----+-----+
|false| 1800|
| true|  300|
+-----+-----+

+-----+-----+
|churn|count|
+-----+-----+
|false|  762|
| true|  138|
+-----+-----+



In [0]:
#10 - String indexer
# Label column -> numeric 'label'; each categorical column c -> c + 'idx'.
label_indexer = StringIndexer(inputCol=label, outputCol='label')
category_indexers = [StringIndexer(inputCol=col_name, outputCol=col_name + 'idx') for col_name in category]
featureidx_list = [label_indexer] + category_indexers

print (featureidx_list)

[StringIndexer_223f569d739f, StringIndexer_793442c17a1d, StringIndexer_52a4f52ae653]


In [0]:
#11 - Create Vector
# Vector order: continuous columns first, then the indexed categoricals. The
# tree's "feature i" in step #19 refers to position i of this list.
indexed_category = [c + 'idx' for c in category]
features = continuous + indexed_category
assem = VectorAssembler(outputCol="features", inputCols=features)

print (type(assem))

<class 'pyspark.ml.feature.VectorAssembler'>


In [0]:
#12 - Create model
# Reads the assembled 'features' vector and the indexed 'label' column.
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")

print (dt)


DecisionTreeClassifier_548508eb9738


In [0]:
#13 - Set ML pipeline
for stage_desc in (featureidx_list, assem, dt):
    print (stage_desc)
print ("\n")

# Stage order matters: indexers -> assembler -> classifier.
all_process_list = list(featureidx_list) + [assem, dt]
print (all_process_list)

pipeline = Pipeline(stages=all_process_list)
print ("\n")
print (pipeline)


[StringIndexer_223f569d739f, StringIndexer_793442c17a1d, StringIndexer_52a4f52ae653]
VectorAssembler_4029dd90a7c3
DecisionTreeClassifier_548508eb9738


[StringIndexer_223f569d739f, StringIndexer_793442c17a1d, StringIndexer_52a4f52ae653, VectorAssembler_4029dd90a7c3, DecisionTreeClassifier_548508eb9738]


Pipeline_a89eaaeb0be5


In [0]:
#14 - Train model
# Baseline fit with the default tree parameters. If the optional steps #15/#16
# are run, `model` is overwritten with the cross-validated best model.
model = pipeline.fit(trainingData)

In [0]:
#15 - (Optional) Assign multiple parameter lists used to train multiple models
# 3 depths x 2 min-instance settings x 2 impurities = 12 parameter maps.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(dt.maxDepth, [4, 5, 6])
grid_builder.addGrid(dt.minInstancesPerNode, [1, 10])
grid_builder.addGrid(dt.impurity, ["gini", "entropy"])
paramGrid = grid_builder.build()

for param_map in paramGrid:
    print (param_map)
    print ("\n\n")

{Param(parent=u'DecisionTreeClassifier_548508eb9738', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 4, Param(parent=u'DecisionTreeClassifier_548508eb9738', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'gini', Param(parent=u'DecisionTreeClassifier_548508eb9738', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 1}



{Param(parent=u'DecisionTreeClassifier_548508eb9738', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 4, Param(parent=u'DecisionTreeClassifier_548508eb9738', name='impurity', doc='Criterion used for information gain calculation (case

In [0]:
#16 - (Optional) Train multiple models with multiple parameters
# Model selection by F1 over 3 folds of the training split.
f1_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=f1_evaluator, numFolds=3)
cvModel = crossval.fit(trainingData)
# The PipelineModel for the best cross-validated parameter map replaces the baseline.
model = cvModel.bestModel

print (model)

PipelineModel_bee484e18683


In [0]:
#17 - Make predictions
# Apply the fitted pipeline (indexers, assembler, tree) to the held-out test split.
predictions = model.transform(testData)

In [0]:
# Print sample result
# Collect only the first rows to the driver; toPandas() on the full result would
# pull the whole test set into driver memory just to look at a sample.
predictions.limit(10).toPandas()

Unnamed: 0,International plan,Voice mail plan,Number vmail messages,Total day minutes,Total day calls,Total eve minutes,Total eve calls,Total night minutes,Total night calls,Total intl minutes,Total intl calls,Customer service calls,churn,label,International planidx,Voice mail planidx,features,rawPrediction,probability,prediction
0,No,No,0,7.8,86,171.4,100,186.5,80,12.9,2,2,false,0.0,0.0,0.0,"[0.0, 7.8, 86.0, 171.4, 100.0, 186.5, 80.0, 12...","[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0
1,No,No,0,34.0,133,278.6,61,129.6,120,11.5,3,0,false,0.0,0.0,0.0,"[0.0, 34.0, 133.0, 278.6, 61.0, 129.6, 120.0, ...","[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0
2,No,No,0,39.5,78,264.3,106,185.8,90,10.0,6,0,false,0.0,0.0,0.0,"[0.0, 39.5, 78.0, 264.3, 106.0, 185.8, 90.0, 1...","[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0
3,No,No,0,40.4,105,172.4,83,145.1,89,9.0,2,2,false,0.0,0.0,0.0,"[0.0, 40.4, 105.0, 172.4, 83.0, 145.1, 89.0, 9...","[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0
4,No,No,0,45.0,108,151.3,74,152.9,94,9.8,6,2,false,0.0,0.0,0.0,"[0.0, 45.0, 108.0, 151.3, 74.0, 152.9, 94.0, 9...","[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0
5,No,No,0,48.4,101,281.1,138,218.5,87,18.2,1,1,false,0.0,0.0,0.0,"[0.0, 48.4, 101.0, 281.1, 138.0, 218.5, 87.0, ...","[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0
6,No,No,0,49.9,123,150.7,81,188.2,67,10.1,4,2,false,0.0,0.0,0.0,"[0.0, 49.9, 123.0, 150.7, 81.0, 188.2, 67.0, 1...","[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0
7,No,No,0,51.1,106,208.6,137,198.0,92,12.3,3,1,false,0.0,0.0,0.0,"[0.0, 51.1, 106.0, 208.6, 137.0, 198.0, 92.0, ...","[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0
8,No,No,0,51.9,108,162.0,83,223.5,115,10.1,3,3,false,0.0,0.0,0.0,"[0.0, 51.9, 108.0, 162.0, 83.0, 223.5, 115.0, ...","[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0
9,No,No,0,55.6,65,242.7,121,176.3,134,11.3,4,0,false,0.0,0.0,0.0,"[0.0, 55.6, 65.0, 242.7, 121.0, 176.3, 134.0, ...","[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0


In [0]:
# Print sample result
# Only the prediction-related columns, limited to a few rows so the whole
# test set is not collected to the driver.
predictions.select("prediction", "rawPrediction", "probability", "label", "features").limit(10).toPandas()

Unnamed: 0,prediction,rawPrediction,probability,label,features
0,0.0,"[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0,"[0.0, 7.8, 86.0, 171.4, 100.0, 186.5, 80.0, 12..."
1,0.0,"[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0,"[0.0, 34.0, 133.0, 278.6, 61.0, 129.6, 120.0, ..."
2,0.0,"[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0,"[0.0, 39.5, 78.0, 264.3, 106.0, 185.8, 90.0, 1..."
3,0.0,"[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0,"[0.0, 40.4, 105.0, 172.4, 83.0, 145.1, 89.0, 9..."
4,0.0,"[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0,"[0.0, 45.0, 108.0, 151.3, 74.0, 152.9, 94.0, 9..."
5,0.0,"[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0,"[0.0, 48.4, 101.0, 281.1, 138.0, 218.5, 87.0, ..."
6,0.0,"[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0,"[0.0, 49.9, 123.0, 150.7, 81.0, 188.2, 67.0, 1..."
7,0.0,"[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0,"[0.0, 51.1, 106.0, 208.6, 137.0, 198.0, 92.0, ..."
8,0.0,"[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0,"[0.0, 51.9, 108.0, 162.0, 83.0, 223.5, 115.0, ..."
9,0.0,"[1329.0, 35.0]","[0.9743401759530792, 0.025659824046920823]",0.0,"[0.0, 55.6, 65.0, 242.7, 121.0, 176.3, 134.0, ..."


In [0]:
#18 - Evaluate model
# One evaluator; the metric is overridden per call through the params argument.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
for metric in ['accuracy', 'weightedPrecision', 'weightedRecall', 'f1']:
    result = evaluator.evaluate(predictions, {evaluator.metricName: metric})
    print ('%s = %g' % (metric, result))
    if metric == 'accuracy':
        # error rate is the complement of accuracy
        print("error = %g " % (1.0 - result))


accuracy = 0.937778
error = 0.0622222 
weightedPrecision = 0.935472
weightedRecall = 0.937778
f1 = 0.934827


In [0]:
#19 - Show tree diagram
# The classifier is the last pipeline stage; its debug string lists every split.
# "feature i" is position i in the VectorAssembler's input list (`features`).
pipeline_stages = model.stages
treeModel = pipeline_stages[-1]
treeModel_debug_str = treeModel.toDebugString
print (treeModel_debug_str)


DecisionTreeClassificationModel (uid=DecisionTreeClassifier_548508eb9738) of depth 6 with 55 nodes
  If (feature 1 <= 263.54999999999995)
   If (feature 10 in {1.0})
    If (feature 8 <= 2.5)
     Predict: 1.0
    Else (feature 8 > 2.5)
     If (feature 7 <= 13.05)
      If (feature 9 <= 3.5)
       Predict: 0.0
      Else (feature 9 > 3.5)
       If (feature 1 <= 160.05)
        Predict: 1.0
       Else (feature 1 > 160.05)
        Predict: 0.0
     Else (feature 7 > 13.05)
      Predict: 1.0
   Else (feature 10 not in {1.0})
    If (feature 9 <= 3.5)
     If (feature 1 <= 220.14999999999998)
      Predict: 0.0
     Else (feature 1 > 220.14999999999998)
      If (feature 3 <= 241.35000000000002)
       Predict: 0.0
      Else (feature 3 > 241.35000000000002)
       If (feature 0 <= 2.0)
        Predict: 1.0
       Else (feature 0 > 2.0)
        Predict: 0.0
    Else (feature 9 > 3.5)
     If (feature 1 <= 174.64999999999998)
      If (feature 3 <= 188.3)
       Predict: 1.0
      Else

In [0]:
#20 - Save model
# NOTE(review): absolute path; with master local[*] this presumably resolves on the
# local filesystem -- make it configurable if running elsewhere.
model_dir = "/user/admin/"
modelFile = "dt_churn"
plmodel_path = "%s%s.plmodel" % (model_dir, modelFile)
tree_path = "%s%s.model" % (model_dir, modelFile)

# Whole fitted pipeline (indexers + assembler + tree), overwriting any previous save
model.write().overwrite().save(plmodel_path)

# Just the decision tree stage
treeModel.write().overwrite().save(tree_path)

print ("finish save model")


finish save model


In [0]:
#21 - Load Pipeline model
# Reload the saved pipeline; its stages should match the ones that were saved.
plmodel_path = model_dir + modelFile + ".plmodel"
read_plmodel = PipelineModel.load(plmodel_path)
print (read_plmodel.stages)


[StringIndexer_223f569d739f, StringIndexer_793442c17a1d, StringIndexer_52a4f52ae653, VectorAssembler_4029dd90a7c3, DecisionTreeClassificationModel (uid=DecisionTreeClassifier_548508eb9738) of depth 6 with 55 nodes]


In [0]:
#22 - Load DecisionTree model
read_model = DecisionTreeClassificationModel.read().load(model_dir + modelFile + ".model")
print ("depth : " + str(read_model.depth))
print ("numNodes : " + str(read_model.numNodes))
print ("featureImportances : " + str(read_model.featureImportances))

# numClasses / numFeatures are available in PySpark 2.1 or above; this notebook
# installs Spark 2.4.5, so they can be shown directly.
print ("numClasses : " + str(read_model.numClasses))
print ("numFeatures : " + str(read_model.numFeatures))


depth : 6
numNodes : 55
featureImportances : (12,[0,1,2,3,5,6,7,8,9,10,11],[0.02477754857220218,0.2856032077079659,0.0066445867554996004,0.14271101802532418,0.0327020560178828,0.006013576732871346,0.09241092174525296,0.0821047226532208,0.15727843892885698,0.13318129744014037,0.03657262542078297])
