In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=22c3c468cbe4ab2200b5cb7a2b5305d6c35c8a4ee55cec70c51b1f5d1e62c749
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:


from pyspark import SparkFiles

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier,LogisticRegression,GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Build the SparkSession used by every cell below (getOrCreate reuses an active session if one exists).
# NOTE(review): the app name reads "cub_card" — possibly a typo for "club_card"; confirm before renaming.
spark = SparkSession.builder.appName("Tesco_retail_cub_card").getOrCreate()

In [None]:
# Load the dataset: first line is the header, column types are inferred by Spark.
# The path is relative to the kernel's working directory.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('Tesco.csv')
)

In [None]:
# Preview the loaded rows to sanity-check the parse (show() with no argument prints the first 20).
df.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+--------------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|LeftMembership|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+--------------+
|7590-VHVEG|Female|            0| 

In [None]:
# Confirm the reader returned a Spark DataFrame.
type(df)

pyspark.sql.dataframe.DataFrame

In [None]:
# List the column names in schema order.
df.schema.names

['customerID',
 'gender',
 'SeniorCitizen',
 'Partner',
 'Dependents',
 'tenure',
 'PhoneService',
 'MultipleLines',
 'InternetService',
 'OnlineSecurity',
 'OnlineBackup',
 'DeviceProtection',
 'TechSupport',
 'StreamingTV',
 'StreamingMovies',
 'Contract',
 'PaperlessBilling',
 'PaymentMethod',
 'MonthlyCharges',
 'TotalCharges',
 'LeftMembership']

In [None]:
# Print the inferred type and nullability of every column.
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- LeftMembership: string (nullable = true)



In [None]:
# Total number of rows loaded from the CSV.
df.count()

7043

In [None]:
from pyspark.sql.functions import isnan, when, col, count, trim

# Per-column count of missing values.
# A bare isNull() check only sees SQL NULLs. Cells that contain nothing but whitespace
# survive CSV parsing as non-null strings (TotalCharges is inferred as string in the
# schema printed above), so a value that is empty after trimming is counted as missing too.
# The explicit cast lets the same expression run on numeric columns.
df_null = df.select([
    count(
        when(col(c).isNull() | (trim(col(c).cast("string")) == ""), c)
    ).alias(c)
    for c in df.columns
])
df_null.show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+--------------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|LeftMembership|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+--------------+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|

In [None]:
# One StringIndexer stage that encodes every listed column as a numeric index.
# inputCols[i] is written to outputCols[i]; the target LeftMembership becomes 'label'.
# NOTE: 'gender_inedx' is misspelled, but the VectorAssembler cell refers to the same
# spelling, so it has to stay unless both cells are renamed together.
stringIndexer = StringIndexer(
    inputCols=[
        'gender',
        'SeniorCitizen',
        'Partner',
        'Dependents',
        'PhoneService',
        'MultipleLines',
        'InternetService',
        'OnlineSecurity',
        'OnlineBackup',
        'DeviceProtection',
        'TechSupport',
        'StreamingTV',
        'StreamingMovies',
        'PaperlessBilling',
        'LeftMembership',
    ],
    outputCols=[
        'gender_inedx',
        'SeniorCitizen_index',
        'Partner_index',
        'Dependents_index',
        'PhoneService_index',
        'MultipleLines_index',
        'InternetService_index',
        'OnlineSecurity_index',
        'OnlineBackup_index',
        'DeviceProtection_index',
        'TechSupport_index',
        'StreamingTV_index',
        'StreamingMovies_index',
        'PaperlessBilling_index',
        'label',
    ],
)

In [None]:
# Pack the 14 indexed feature columns into a single 'features' vector.
# The 'label' column produced by the indexer is deliberately not part of the inputs.
# 'gender_inedx' matches the (misspelled) output name used by the StringIndexer cell.
assembler = VectorAssembler(
    inputCols=[
        'gender_inedx',
        'SeniorCitizen_index',
        'Partner_index',
        'Dependents_index',
        'PhoneService_index',
        'MultipleLines_index',
        'InternetService_index',
        'OnlineSecurity_index',
        'OnlineBackup_index',
        'DeviceProtection_index',
        'TechSupport_index',
        'StreamingTV_index',
        'StreamingMovies_index',
        'PaperlessBilling_index',
    ],
    outputCol='features',
)

# Train and Test Split

In [None]:
# 70/30 random split of the raw frame; the fixed seed keeps the split repeatable.
train_data, test_data = df.randomSplit(weights=[0.7, 0.3], seed=42)

In [None]:
# Preview the training split.
train_data.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+--------------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|LeftMembership|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+--------------+
|0002-ORFBO|Female|            0| 

In [None]:
# Preview the first 10 rows of the test split.
test_data.show(10)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+--------------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|LeftMembership|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+--------------+
|0004-TLHLJ|  Male|            0| 

# Decision Trees

In [None]:
# Base decision-tree estimator; maxDepth and minInstancesPerNode are tuned by the grid below.
dt = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
)

In [None]:
# Indexing -> feature assembly -> decision tree, fitted as a single estimator.
pipeline = Pipeline(
    stages=[
        stringIndexer,
        assembler,
        dt,
    ]
)

In [None]:
# 3 x 3 hyperparameter grid for the decision tree (9 candidate settings).
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [3, 5, 7])
    .addGrid(dt.minInstancesPerNode, [1, 3, 5])
    .build()
)

In [None]:
# 5-fold cross-validated grid search over the decision-tree pipeline, scored by accuracy.
# The explicit seed makes the fold assignment repeatable across kernel restarts and
# matches the Random Forest CrossValidator further down, which already passes seed=42.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(
        labelCol='label', predictionCol='prediction', metricName='accuracy'),
    numFolds=5,
    seed=42,
)

In [None]:
# Run the grid search on the training split only (9 parameter settings x 5 folds).
cvModel = crossval.fit(train_data)

In [None]:
# Keep the fitted pipeline model that the cross-validation selected.
best_model = cvModel.bestModel

In [None]:
# Score the held-out test split with the selected decision-tree pipeline.
# NOTE: `predictions` is rebound by the Random Forest and GBT sections below, so the
# metric cells in this section must be run before those.
predictions = best_model.transform(test_data)

In [None]:
# Eyeball a few predictions next to the true labels and assembled feature vectors.
predictions.select("prediction", "label", "features").show(5)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       1.0|  1.0|      (14,[9],[1.0])|
|       0.0|  0.0|[1.0,1.0,1.0,0.0,...|
|       0.0|  0.0|(14,[0,1,6,7],[1....|
|       0.0|  0.0|[1.0,0.0,1.0,1.0,...|
|       0.0|  0.0|(14,[0,5,7,8,9,11...|
+----------+-----+--------------------+
only showing top 5 rows



In [None]:
# Accuracy of the current `predictions` frame (decision tree at this point in the notebook).
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy1 = evaluator.evaluate(predictions)

In [None]:
# Report test accuracy and the complementary error rate for the decision tree.
# The original used an f-string with no placeholders; a plain literal prints the same text.
print("DecisionTree Test Accuracy: ", accuracy1)
print("Test Error = %g" % (1.0 - accuracy1))

DecisionTree Test Accuracy:  0.7653213751868461
Test Error = 0.234679


In [None]:
# F1 score (metricName="f1") of the decision-tree predictions.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
f1_dt = evaluator.evaluate(predictions)
print("f1 for Decision Tree Classifier:", f1_dt)

f1 for Decision Tree Classifier: 0.7458719582506969


In [None]:
# Weighted precision of the decision-tree predictions.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedPrecision",
)
weightedPrecision_dt = evaluator.evaluate(predictions)
print("weightedPrecision for Decision Tree Classifier:", weightedPrecision_dt)

weightedPrecision for Decision Tree Classifier: 0.7445230723434431


In [None]:
# Weighted recall of the decision-tree predictions.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedRecall",
)
weightedRecall_dt = evaluator.evaluate(predictions)
print("weightedRecall for Decision Tree Classifier:", weightedRecall_dt)

weightedRecall for Decision Tree Classifier: 0.7653213751868461


In [None]:
# Summary of the four decision-tree metrics computed in the cells above.
print("Accuracy: ",accuracy1)
print("f1 for Decision Tree Classifier:", f1_dt)
print("weightedPrecision for Decision Tree Classifier:", weightedPrecision_dt)
print("weightedRecall for Decision Tree Classifier:", weightedRecall_dt)

Accuracy:  0.7653213751868461
f1 for Decision Tree Classifier: 0.7458719582506969
weightedPrecision for Decision Tree Classifier: 0.7445230723434431
weightedRecall for Decision Tree Classifier: 0.7653213751868461


# Random Forest

In [None]:
# Base random-forest estimator; numTrees and maxDepth are tuned by the grid below.
# Random forests bootstrap rows and subsample features, so the seed is fixed explicitly
# to keep the fitted model (and the reported metrics) repeatable across kernel restarts.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=42)

In [None]:
# Same preprocessing stages as the decision-tree pipeline, ending in the random forest.
# NOTE(review): the saved output of this cell records a NameError — presumably from a run
# in which one of the referenced objects was not yet defined; re-run after the cells above.
pipeline1 = Pipeline(stages=[stringIndexer, assembler, rf])

NameError: ignored

In [None]:
# 3 x 3 hyperparameter grid for the random forest (9 candidate settings).
paramGrid1 = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [10, 20, 30])
    .addGrid(rf.maxDepth, [5, 10, 15])
    .build()
)

In [None]:
# 5-fold cross-validated grid search over the random-forest pipeline, scored by accuracy,
# with a fixed seed.
cross_validator1 = CrossValidator(
    estimator=pipeline1,
    estimatorParamMaps=paramGrid1,
    evaluator=MulticlassClassificationEvaluator(
        labelCol="label",
        metricName="accuracy",
    ),
    numFolds=5,
    seed=42,
)

In [None]:
# Run the random-forest grid search on the training split only (9 parameter settings x 5 folds).
cv_model1 = cross_validator1.fit(train_data)

In [None]:
# Score the test split with the fitted cross-validation model.
# NOTE: this rebinds `predictions`, replacing the decision-tree predictions from the
# previous section; the GBT section rebinds it again.
predictions = cv_model1.transform(test_data)

In [None]:
# Eyeball a few random-forest predictions next to the true labels.
predictions.select("prediction", "label", "features").show(5)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       1.0|  1.0|      (14,[9],[1.0])|
|       0.0|  0.0|[1.0,1.0,1.0,0.0,...|
|       0.0|  0.0|(14,[0,1,6,7],[1....|
|       0.0|  0.0|[1.0,0.0,1.0,1.0,...|
|       0.0|  0.0|(14,[0,5,7,8,9,11...|
+----------+-----+--------------------+
only showing top 5 rows



In [None]:
# Accuracy evaluator for the random-forest predictions; predictionCol is not passed here,
# so the evaluator's default prediction column is used.
evaluator2 = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")

In [None]:
# Test accuracy and complementary error rate for the random forest.
accuracy2 = evaluator2.evaluate(predictions)
print("Random Forest accuracy = ",accuracy2)
print("Test Error= %g" %(1.0-accuracy2))

Random Forest accuracy =  0.772795216741405
Test Error= 0.227205


In [None]:
# F1 score (metricName="f1") of the random-forest predictions.
evaluator2 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
f1_rf = evaluator2.evaluate(predictions)
print("f1 for Random Forest Classifier:", f1_rf)

f1 for Random Forest Classifier: 0.7450644275837307


In [None]:
# Weighted precision of the random-forest predictions.
evaluator2 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedPrecision",
)
weightedPrecision_rf = evaluator2.evaluate(predictions)
print("weightedPrecision for Random Forest Classifier:", weightedPrecision_rf)

weightedPrecision for Random Forest Classifier: 0.7524744759681549


In [None]:
# Weighted recall of the random-forest predictions.
evaluator2 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedRecall",
)
weightedRecall_rf = evaluator2.evaluate(predictions)
print("weightedRecall for Random Forest Classifier:", weightedRecall_rf)

weightedRecall for Random Forest Classifier: 0.7727952167414052


In [None]:
# Summary of the four random-forest metrics computed in the cells above.
print("Accuracy: ",accuracy2)
print("f1 for Random Forest Classifier:", f1_rf)
print("weightedPrecision for Random Forest Classifier:", weightedPrecision_rf)
print("weightedRecall for Random Forest Classifier:", weightedRecall_rf)

Accuracy:  0.772795216741405
f1 for Random Forest Classifier: 0.7450644275837307
weightedPrecision for Random Forest Classifier: 0.7524744759681549
weightedRecall for Random Forest Classifier: 0.7727952167414052


# GBTClassifier

In [None]:

# Gradient-boosted trees capped at 10 boosting iterations; fitted directly, with no grid search.
gbt = GBTClassifier(
    labelCol="label",
    featuresCol="features",
    maxIter=10,
)
# NOTE: this rebinds `pipeline`, which earlier held the decision-tree pipeline.
pipeline = Pipeline(
    stages=[
        stringIndexer,
        assembler,
        gbt,
    ]
)


In [None]:
# Fit whatever `pipeline` currently refers to (the GBT pipeline when cells are run in order)
# on the training split.
model = pipeline.fit(train_data)


In [None]:
# Score the test split with the fitted model.
# NOTE: this rebinds `predictions`, replacing the random-forest predictions from the section above.
predictions = model.transform(test_data)

In [None]:
# Eyeball a few GBT predictions next to the true labels.
predictions.select("prediction", "label", "features").show(5)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       1.0|  1.0|      (14,[9],[1.0])|
|       0.0|  0.0|[1.0,1.0,1.0,0.0,...|
|       1.0|  0.0|(14,[0,1,6,7],[1....|
|       0.0|  0.0|[1.0,0.0,1.0,1.0,...|
|       0.0|  0.0|(14,[0,5,7,8,9,11...|
+----------+-----+--------------------+
only showing top 5 rows



In [None]:
# Test accuracy and complementary error rate for the GBT model.
evaluator3 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy3 = evaluator3.evaluate(predictions)
print("Accuracy: ", accuracy3)
print("Test Error = %g" % (1.0 - accuracy3))

Accuracy:  0.772795216741405
Test Error = 0.227205


In [None]:
# F1 score (metricName="f1") of the GBT predictions.
evaluator3 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
f1_gbt = evaluator3.evaluate(predictions)
print("f1 for GBT Classifier:", f1_gbt)

f1 for GBT Classifier: 0.758889647835172


In [None]:
# Weighted precision of the GBT predictions.
evaluator3 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedPrecision",
)
weightedPrecision_gbt = evaluator3.evaluate(predictions)
print("weightedPrecision for GBT Classifier:", weightedPrecision_gbt)

weightedPrecision for GBT Classifier: 0.7561859707331716


In [None]:
# Weighted recall of the GBT predictions.
evaluator3 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedRecall",
)
weightedRecall_gbt = evaluator3.evaluate(predictions)
print("weightedRecall for GBT Classifier:", weightedRecall_gbt)

weightedRecall for GBT Classifier: 0.772795216741405


In [None]:
# Summary of the four GBT metrics computed in the cells above.
print("Accuracy: ",accuracy3)
print("f1 for GBT Classifier:", f1_gbt)
print("weightedPrecision for GBT Classifier:", weightedPrecision_gbt)
print("weightedRecall for GBT Classifier:", weightedRecall_gbt)

Accuracy:  0.772795216741405
f1 for GBT Classifier: 0.758889647835172
weightedPrecision for GBT Classifier: 0.7561859707331716
weightedRecall for GBT Classifier: 0.772795216741405


# Logistic Regression Classifier

In [None]:

from pyspark.ml.classification import LogisticRegression

In [None]:
# Logistic-regression estimator with library defaults, behind the same preprocessing stages.
logistic_regression = LogisticRegression(
    labelCol="label",
    featuresCol="features",
)
pipeline5 = Pipeline(
    stages=[
        stringIndexer,
        assembler,
        logistic_regression,
    ]
)

In [None]:
# Fit the logistic-regression pipeline on the training split.
# NOTE: this rebinds `model`, which the GBT section also assigns.
model = pipeline5.fit(train_data)

In [None]:
# Score the test split; stored under its own name, so `predictions` is left untouched here.
predictions5 = model.transform(test_data)

In [None]:
# Eyeball a few logistic-regression predictions next to the true labels.
predictions5.select("prediction", "label", "features").show(5)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       1.0|  1.0|      (14,[9],[1.0])|
|       0.0|  0.0|[1.0,1.0,1.0,0.0,...|
|       0.0|  0.0|(14,[0,1,6,7],[1....|
|       0.0|  0.0|[1.0,0.0,1.0,1.0,...|
|       0.0|  0.0|(14,[0,5,7,8,9,11...|
+----------+-----+--------------------+
only showing top 5 rows



In [None]:
# Test accuracy and complementary error rate for logistic regression.
evaluator5 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy5 = evaluator5.evaluate(predictions5)
print("Accuracy: ", accuracy5)
print("Test Error = %g" % (1.0 - accuracy5))

Accuracy:  0.7658196312904834
Test Error = 0.23418


In [None]:
# F1 score (metricName="f1") of the logistic-regression predictions.
evaluator5 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
f1_lr = evaluator5.evaluate(predictions5)
print("f1 for Logistic Regression Classifier:", f1_lr)

f1 for Logistic Regression Classifier: 0.7311190615367359


In [None]:
# Weighted precision of the logistic-regression predictions.
evaluator5 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedPrecision",
)
weightedPrecision_lr = evaluator5.evaluate(predictions5)
print("weightedPrecision for Logistic Regression Classifier:", weightedPrecision_lr)

weightedPrecision for Logistic Regression Classifier: 0.742845072399753


In [None]:
# Weighted recall of the logistic-regression predictions.
evaluator5 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
weightedRecall_lr = evaluator5.evaluate(predictions5)
# Label spelling corrected ("Rgression" -> "Regression") so it matches the summary cell's wording.
print("weightedRecall for Logistic Regression Classifier:", weightedRecall_lr)

weightedRecall for Logistic Rgression Classifier: 0.7658196312904832


In [None]:
# Summary of the four logistic-regression metrics computed in the cells above.
print("Accuracy: ",accuracy5)
print("f1 for Logistic Regression Classifier:", f1_lr)
print("weightedPrecision for Logistic Regression Classifier:", weightedPrecision_lr)
print("weightedRecall for Logistic Regression Classifier:", weightedRecall_lr)

Accuracy:  0.7658196312904834
f1 for Logistic Regression Classifier: 0.7311190615367359
weightedPrecision for Logistic Regression Classifier: 0.742845072399753
weightedRecall for Logistic Regression Classifier: 0.7658196312904832


In [None]:
# Side-by-side test accuracy of the four models.
# "%f" prints six decimals, which is exactly what the original "%2f" produced: a minimum
# field width of 2 never pads these values, so the width only looked like a precision.
# It is not changed to "%.2f", because that would round every model to 0.77 and hide the
# differences being compared.
print("Accuracy of Logistic Regression Classifier: %f" % accuracy5)
print("Accuracy of Decision Tree Classifier: %f" % accuracy1)
print("Accuracy of GBT Classifier: %f" % accuracy3)
print("Accuracy of Random Forest Classifier: %f" % accuracy2)

Accuracy of Logistic Regression Classifier: 0.765820
Accuracy of Decision Tree Classifier: 0.765321
Accuracy of GBT Classifier: 0.772795
Accuracy of Random ForestClassifier: 0.772795


f1 for Decision Tree Classifier: 0.7458719582506969
f1 for Random Forest Classifier: 0.7450644275837307
f1 for GBT Classifier: 0.758889647835172
f1 for Logistic Regression Classifier: 0.7311190615367359
