<a href="https://colab.research.google.com/github/brianadit24/PySpark-Try/blob/main/Spark_End_to_End_Machine_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 66 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 65.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=ddc8db04ab245601e221f0a844f625404d18254021afe3407675e83b10ad9f95
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('MLApp').getOrCreate()

In [4]:
# File location and type
file_location = "/content/WA_Fn-UseC_-Telco-Customer-Churn.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .option('nanValue', ' ') \
  .option('nullValue', ' ') \
  .load(file_location)

df.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|  

In [None]:
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0| 

# Data Preprocessing

## Splitting Data

In [5]:
# Train Test Split

churn_df = df
(train_data, test_data) = churn_df.randomSplit([0.7, 0.3], 42)

print('Record for Training: {}'.format(str(train_data.count())))
print('Record for Evaluation: {}'.format(str(test_data.count())))

Record for Training: 5036
Record for Evaluation: 2007


## Pipeline

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

catColumns = ["gender", "SeniorCitizen", "Partner", "Dependents", "PhoneService",
              "MultipleLines", "InternetService", "OnlineSecurity","OnlineBackup",
              "DeviceProtection","TechSupport","StreamingTV","StreamingMovies",
              "Contract","PaperlessBilling","PaymentMethod"]

In [8]:
stages = []

for catCol in catColumns:

    stringIndexer = StringIndexer(inputCol=catCol, outputCol=catCol+'Index')
    
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                            outputCols=[catCol+'catVec'])
    
    stages += [stringIndexer, encoder]

In [12]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['TotalCharges'], outputCols=['Out_TotalCharges'])
stages += [imputer]

In [15]:
label_Idx = StringIndexer(inputCol="Churn", outputCol="label")
stages += [label_Idx]

In [18]:
temp = label_Idx.fit(train_data).transform(train_data)
temp.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|label|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+-----+
|0002-ORFBO|Female|            0|    Yes|  

In [19]:
df.stat.corr('TotalCharges', 'MonthlyCharges')

0.6511738315787813

In [23]:
## Binning Tenure Column
from pyspark.ml.feature import QuantileDiscretizer
tenure_bin = QuantileDiscretizer(numBuckets=3, inputCol='tenure', outputCol='tenure_bin')
stages += [tenure_bin]

In [28]:
numericCols = ["tenure_bin", "Out_TotalCharges","MonthlyCharges"]
assembleInputs = [c + "catVec" for c in catColumns] + numericCols
assembler = VectorAssembler(inputCols=assembleInputs, outputCol='features')
stages += [assembler]

In [29]:
stages

[StringIndexer_47544af2d8a2,
 OneHotEncoder_d65b9804a288,
 StringIndexer_ee57f3ae6d60,
 OneHotEncoder_91eacde7c957,
 StringIndexer_cca33d173917,
 OneHotEncoder_68ccc9acfa39,
 StringIndexer_2d8665173d05,
 OneHotEncoder_4a3794d97c73,
 StringIndexer_a134ef9e7ca5,
 OneHotEncoder_a5c0b40567be,
 StringIndexer_9294c7a7d5fc,
 OneHotEncoder_ce7d986a6a62,
 StringIndexer_420d5325fff9,
 OneHotEncoder_10d54e591c65,
 StringIndexer_a90fde61ba17,
 OneHotEncoder_bce3ac102ccd,
 StringIndexer_ee36e12dc393,
 OneHotEncoder_02612c9b8a66,
 StringIndexer_49287002685e,
 OneHotEncoder_8dfde9b08d5c,
 StringIndexer_ecd384d996df,
 OneHotEncoder_df5f770ec935,
 StringIndexer_ba470d08ad85,
 OneHotEncoder_12c35978ecf8,
 StringIndexer_167f670b599f,
 OneHotEncoder_cc0c05c8c61d,
 StringIndexer_a7b6f443607d,
 OneHotEncoder_ee51ddc67e2c,
 StringIndexer_566c0aaef23c,
 OneHotEncoder_ecd72acac1f1,
 StringIndexer_9cdd747845fc,
 OneHotEncoder_c6490a08c609,
 Imputer_5d796aaca4bd,
 StringIndexer_916de6db0d3d,
 QuantileDiscretizer

In [31]:
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(train_data)

In [33]:
trainprepDF = pipelineModel.transform(train_data)
testprepDF = pipelineModel.transform(test_data)

In [36]:
trainprepDF.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+-----------+-------------+------------------+-------------------+------------+-------------+---------------+----------------+-----------------+------------------+------------------+-------------------+--------------------+---------------------+-------------------+--------------------+-----------------+------------------+---------------------+----------------------+----------------+-----------------+----------------+-----------------+--------------------+---------------------+-------------+--------------+---------------------+----------------------+------------------+-------------------+----------------+-----+----------+--------------------+
|customerID|gender|SeniorCitizen|Partne

In [37]:
trainprepDF.select('tenure_bin').show()

+----------+
|tenure_bin|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       2.0|
|       2.0|
|       2.0|
|       0.0|
|       1.0|
|       0.0|
|       1.0|
|       2.0|
|       1.0|
|       0.0|
|       2.0|
|       2.0|
|       2.0|
|       2.0|
|       2.0|
+----------+
only showing top 20 rows



# Modelling

## Build ML Model

In [39]:
from pyspark.ml.classification import LogisticRegression

# Create initial Logistic Regression Model
lr = LogisticRegression(labelCol='label', featuresCol='features', maxIter=10)

# Train Model with Training Data
lrModel = lr.fit(trainprepDF)

In [40]:
print("Coefficients: {}".format(str(lrModel.coefficients)))
print("Intercept: {}".format(str(lrModel.intercept)))

Coefficients: [-0.021560642539512086,-0.32069679629425424,0.011636080729672184,0.11034936087031902,-0.5643041818835485,-0.22980847464271004,0.03057132309250357,0.537193969096821,-0.3649407295058954,0.28639022026858707,-0.1128858667963498,0.19958053383225854,-0.0014818358395158574,0.1086743423933517,0.09831928689627477,0.25802959209150744,-0.0761087452756491,-8.117642014732638e-05,0.20728169092984583,-0.020702866290663043,0.22691611298832548,0.6263658841247365,-0.8917783914975173,0.28383344169714414,0.22699869791407642,-0.09100472140025453,-0.064457288578794,-0.6446507313150069,-0.00011409316245573024,0.004719080994330909]
Intercept: -1.1337788028425855


## Evaluate Model

In [41]:
summary = lrModel.summary

In [45]:
accuracy = summary.accuracy
FalsePositiveRate = summary.weightedFalsePositiveRate
TruePositiveRate = summary.weightedTruePositiveRate
fMeasure = summary.weightedFMeasure()
precision = summary.weightedPrecision
recall = summary.weightedRecall
AreaUnderROC = summary.areaUnderROC

print("Accuracy: {}\nFPR: {}\nTPR: {}\nF-Measure: {}\nPrecision: {}\nRecall: {}\nAreaUnderROC: {}" \
      .format(accuracy, FalsePositiveRate, TruePositiveRate, fMeasure, precision, recall, AreaUnderROC))

Accuracy: 0.8000397140587768
FPR: 0.3729909978540669
TPR: 0.8000397140587767
F-Measure: 0.7928507673500943
Precision: 0.7905608902766834
Recall: 0.8000397140587767
AreaUnderROC: 0.8405516411890037


In [46]:
# Displaying Plot Worked for Databricks Only
display(lrModel, trainprepDF, "ROC")

LogisticRegressionModel: uid=LogisticRegression_b614da7662fc, numClasses=2, numFeatures=30

DataFrame[customerID: string, gender: string, SeniorCitizen: int, Partner: string, Dependents: string, tenure: int, PhoneService: string, MultipleLines: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, DeviceProtection: string, TechSupport: string, StreamingTV: string, StreamingMovies: string, Contract: string, PaperlessBilling: string, PaymentMethod: string, MonthlyCharges: double, TotalCharges: double, Churn: string, genderIndex: double, gendercatVec: vector, SeniorCitizenIndex: double, SeniorCitizencatVec: vector, PartnerIndex: double, PartnercatVec: vector, DependentsIndex: double, DependentscatVec: vector, PhoneServiceIndex: double, PhoneServicecatVec: vector, MultipleLinesIndex: double, MultipleLinescatVec: vector, InternetServiceIndex: double, InternetServicecatVec: vector, OnlineSecurityIndex: double, OnlineSecuritycatVec: vector, OnlineBackupIndex: double, OnlineBackupcatVec: vector, DeviceProtectionIndex: double, DeviceProtectioncatVec: vector, T

'ROC'

In [47]:
# Displaying Plot Worked for Databricks Only
display(lrModel, trainprepDF, "fittedVsResiduals")

LogisticRegressionModel: uid=LogisticRegression_b614da7662fc, numClasses=2, numFeatures=30

DataFrame[customerID: string, gender: string, SeniorCitizen: int, Partner: string, Dependents: string, tenure: int, PhoneService: string, MultipleLines: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, DeviceProtection: string, TechSupport: string, StreamingTV: string, StreamingMovies: string, Contract: string, PaperlessBilling: string, PaymentMethod: string, MonthlyCharges: double, TotalCharges: double, Churn: string, genderIndex: double, gendercatVec: vector, SeniorCitizenIndex: double, SeniorCitizencatVec: vector, PartnerIndex: double, PartnercatVec: vector, DependentsIndex: double, DependentscatVec: vector, PhoneServiceIndex: double, PhoneServicecatVec: vector, MultipleLinesIndex: double, MultipleLinescatVec: vector, InternetServiceIndex: double, InternetServicecatVec: vector, OnlineSecurityIndex: double, OnlineSecuritycatVec: vector, OnlineBackupIndex: double, OnlineBackupcatVec: vector, DeviceProtectionIndex: double, DeviceProtectioncatVec: vector, T

'fittedVsResiduals'

In [48]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = lrModel.transform(testprepDF)
evaluatorLR = BinaryClassificationEvaluator(rawPredictionCol='prediction')
AUC = evaluatorLR.evaluate(predictions)

# Default Evaluation is areaUnderROC
print("AUC = {}".format(AUC))

evaluatorLR.getMetricName()

AUC = 0.7237720519666331


'areaUnderROC'

In [56]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Initial SparkContext
sc = spark.sparkContext

results = predictions.select(['prediction', 'label'])
 
## prepare score-label set
results_collect = results.collect()
results_list = [(float(i[0]), float(i[1])) for i in results_collect]
predictionAndLabels = sc.parallelize(results_list)
 
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

predictions.show(1)

Area under PR = 0.5825077132121096
Area under ROC = 0.7237720519666331
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+----------------+--------------+------------+-----+-----------+-------------+------------------+-------------------+------------+-------------+---------------+----------------+-----------------+------------------+------------------+-------------------+--------------------+---------------------+-------------------+--------------------+-----------------+------------------+---------------------+----------------------+----------------+-----------------+----------------+-----------------+--------------------+---------------------+-------------+--------------+---------------------+----------------------+------------------+-------------------+----------------+-----+----------+--------------------+-----------

In [58]:
test_data.groupby('Churn').count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|   No| 1477|
|  Yes|  530|
+-----+-----+



In [59]:
count = predictions.count()
correct = results.filter(results.prediction == results.label).count()
wrong = results.filter(results.prediction != results.label).count()
tp = results.filter(results.prediction == 1.0).filter(results.prediction == results.label).count()
fp = results.filter(results.prediction == 1.0).filter(results.prediction != results.label).count()
fn = results.filter(results.prediction == 0.0).filter(results.prediction != results.label).count()
tn = results.filter(results.prediction == 0.0).filter(results.prediction == results.label).count()

accuracy = (tp+tn)/count

precision = tp/(tp+fp)

recall = tp/(tp+fn)

print("Correct: %s\nWrong: %s\ntp: %s\nfp: %s\nfn: %s\ntn: %s\nAccuracy: %s\nPrecision: %s\nRecall: %s"
      % (correct, wrong, tp, fp, fn, tn, accuracy, precision, recall))

Correct: 1627
Wrong: 380
tp: 286
fp: 136
fn: 244
tn: 1341
Accuracy: 0.8106626806178375
Precision: 0.6777251184834123
Recall: 0.539622641509434


## Tuning Parameter

In [62]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder() 
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [5, 10, 20])
             .build())

In [64]:
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluatorLR,
                    numFolds=5)
# Run Cross Validation
cvModel = cv.fit(trainprepDF)

In [65]:
predictions = cvModel.bestModel.transform(testprepDF)

In [66]:
evaluatorLR.evaluate(predictions)

0.7181838504873468

In [67]:
count = predictions.count()
correct = results.filter(results.prediction == results.label).count()
wrong = results.filter(results.prediction != results.label).count()
tp = results.filter(results.prediction == 1.0).filter(results.prediction == results.label).count()
fp = results.filter(results.prediction == 1.0).filter(results.prediction != results.label).count()
fn = results.filter(results.prediction == 0.0).filter(results.prediction != results.label).count()
tn = results.filter(results.prediction == 0.0).filter(results.prediction == results.label).count()

accuracy = (tp+tn)/count

precision = tp/(tp+fp)

recall = tp/(tp+fn)

print("Correct: %s\nWrong: %s\ntp: %s\nfp: %s\nfn: %s\ntn: %s\nAccuracy: %s\nPrecision: %s\nRecall: %s"
      % (correct, wrong, tp, fp, fn, tn, accuracy, precision, recall))

Correct: 1627
Wrong: 380
tp: 286
fp: 136
fn: 244
tn: 1341
Accuracy: 0.8106626806178375
Precision: 0.6777251184834123
Recall: 0.539622641509434


In [68]:
cvModel.explainParams()

"estimator: estimator to be cross-validated (current: LogisticRegression_b614da7662fc)\nestimatorParamMaps: estimator param maps (current: [{Param(parent='LogisticRegression_b614da7662fc', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent='LogisticRegression_b614da7662fc', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_b614da7662fc', name='maxIter', doc='max number of iterations (>= 0).'): 5}, {Param(parent='LogisticRegression_b614da7662fc', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent='LogisticRegression_b614da7662fc', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_b614da7662fc', name='maxIter', doc='max number of iterations

## A/B Testing

In [69]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", featuresCol="features").setImpurity("gini").setMaxDepth(6).setNumTrees(50).setFeatureSubsetStrategy("auto").setSeed(1010)

rfModel = rf.fit(trainprepDF)

In [70]:
predictions = rfModel.transform(testprepDF)

In [71]:
results = predictions.select(['prediction', 'label'])

count=predictions.count()
correct = results.filter(results.prediction == results.label).count()
wrong = results.filter(results.prediction != results.label).count()
tp = results.filter(results.prediction == 1.0).filter(results.prediction == results.label).count()
fp = results.filter(results.prediction == 1.0).filter(results.prediction != results.label).count()
fn = results.filter(results.prediction == 0.0).filter(results.prediction != results.label).count()
tn = results.filter(results.prediction == 0.0).filter(results.prediction == results.label).count()

accuracy = (tp+tn)/count

precision = tp/(tp+fp)

recall = tp/(tp+fn)

print("Correct: %s\nWrong: %s\ntp: %s\nfp: %s\nfn: %s\ntn: %s\nAccuracy: %s\nPrecision: %s\nRecall: %s"
      % (correct, wrong, tp, fp, fn, tn, accuracy, precision, recall))

Correct: 1612
Wrong: 395
tp: 228
fp: 93
fn: 302
tn: 1384
Accuracy: 0.8031888390632785
Precision: 0.7102803738317757
Recall: 0.43018867924528303
