# Big Data Machine Learning Classification with Spark

In [631]:
#pip install pyspark

In [632]:
from pyspark import SparkConf, SparkContext 
import collections
from pyspark.ml.classification import GBTClassifier

#from __future__ import print_function #must apperantly occur at the beginning of the file

from pyspark.ml.regression import LinearRegression

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

import pandas as pd

In [633]:
df=pd.read_csv("churn.csv")
df.sample(5)

Unnamed: 0.1,Unnamed: 0,Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Churn
122,122,James Newman,35.0,9766.95,1,4.59,13.0,1
76,76,Jennifer King,43.0,8024.08,0,4.08,12.0,1
110,110,Eileen Reyes,34.0,9228.84,1,5.29,12.0,1
75,75,Ryan Weaver,39.0,10110.4,1,5.78,13.0,1
376,376,James Rivas,38.0,9557.2,0,4.47,8.0,0


In [634]:
df["Account_Manager"].value_counts()

0    467
1    433
Name: Account_Manager, dtype: int64

In [635]:
df["Num_Sites"].value_counts()

8.0     213
9.0     186
7.0     146
10.0    131
11.0     83
6.0      67
12.0     32
5.0      22
13.0      8
14.0      6
4.0       4
3.0       2
Name: Num_Sites, dtype: int64

### Create Spark Session

In [636]:
from pyspark.conf import SparkConf

spark=SparkSession.builder\
    .master("local")\
    .appName("CustomerBehaviour")\
    .getOrCreate()
sc=spark.sparkContext

### Preparing the data

In [637]:
spark_df = spark.read.csv("churn.csv",header=True,inferSchema=True,sep=",")
#inferSchema = True, helps so that every column is not a string
#header = True, takes the column names into the df, other wise the column names will be c0, c1...

In [638]:
spark_df.show(5)

+---+----------------+----+--------------+---------------+-----+---------+-----+
|_c0|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+---+----------------+----+--------------+---------------+-----+---------+-----+
|  0|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|  1|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|  2|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|  3|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|  4|  Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
+---+----------------+----+--------------+---------------+-----+---------+-----+
only showing top 5 rows



In [639]:
spark_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Churn: integer (nullable = true)



In [640]:
#rename first column to index
spark_df=spark_df.withColumnRenamed("_c0","index")

In [641]:
#rename churn column to "label"
spark_df=spark_df.withColumnRenamed("churn","label")

When I tried to show the accuracy below in the code, it showed me an error that the column label doesn't exist, so I renamed the churn column to label and the error didn't occur.

P.S.: Later on I read somewhere that Pyspark ML models wants to have "features" ,"label" datas

In [642]:
spark_df.columns

['index',
 'Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'label']

In [643]:
spark_df.describe().show()

+-------+------------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|summary|             index|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|              label|
+-------+------------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|  count|               900|          900|              900|              900|               900|              900|               900|                900|
|   mean|             449.5|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|0.16666666666666666|
| stddev|259.95191863111916|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.7648355920350969| 0.3728852122772358|
|    min|                 0|   Aaron King|             22.0|          

In [644]:
spark_df.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|  150|
|    0|  750|
+-----+-----+



To combine all feature data and separate 'churn' data in a dataset, we use VectorAssembler. Then we split our data into train and test data

In [645]:
from pyspark.ml.feature import VectorAssembler

In [646]:
#Index and Names columns will not have an impact on our predictions, so we don't take them into our Assembler
vect = VectorAssembler(inputCols=['Age','Total_Purchase','Account_Manager','Years','Num_Sites'],outputCol='features')

In [647]:
vect_df = vect.transform(spark_df) #transform our DataFrame to a vector
vect_df = vect_df.select(['features', 'label']) # we only select 'features' and 'label' columns because Pyspark ML models wants the dataset like this
vect_df.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[42.0,11066.8,0.0...|    1|
|[41.0,11916.22,0....|    1|
|[38.0,12884.75,0....|    1|
+--------------------+-----+
only showing top 3 rows



In [648]:
# split data into train and test 
(train, test) = vect_df.randomSplit([0.9, 0.1])

In [664]:
train.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[22.0,11254.38,1....|    0|
|[25.0,9672.03,0.0...|    0|
|[26.0,8787.39,1.0...|    1|
+--------------------+-----+
only showing top 3 rows



(None,
 <bound method DataFrame.count of DataFrame[features: vector, label: int]>)

In [650]:
test.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[27.0,8628.8,1.0,...|    0|
|[30.0,12788.37,0....|    0|
|[32.0,5756.12,0.0...|    0|
+--------------------+-----+
only showing top 3 rows



### Using  Gradient-boosted Tree Classifier to fit and predict our model

In [651]:
# fit the model on train data
gbtc = GBTClassifier(labelCol="label", maxIter=20) #max itteration 20
gbtc = gbtc.fit(train) # fit train data

# predict
pred = gbtc.transform(test) #transform() method is for predicting instead of predict()

In [652]:
pred.show(5)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[27.0,8628.8,1.0,...|    0|[1.26113490121831...|[0.92568834420212...|       0.0|
|[30.0,12788.37,0....|    0|[0.78232502750770...|[0.82701959333826...|       0.0|
|[32.0,5756.12,0.0...|    0|[1.28328904187560...|[0.92867938095815...|       0.0|
|[32.0,12254.75,1....|    0|[0.98188947904986...|[0.87694134079162...|       0.0|
|[32.0,13630.93,0....|    0|[1.66467796868417...|[0.96542227599104...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



### Check the accuracy using MulticlassClassificationEvaluator

In [653]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import confusion_matrix

In [654]:
evaluator=MulticlassClassificationEvaluator(metricName="accuracy")

In [655]:
acc = evaluator.evaluate(pred)
print("Accuracy Score: ", acc)

Accuracy Score:  0.9523809523809523


Create a confusion matrix

In [656]:
y_predict=pred.select("prediction").collect()
y_original=pred.select("label").collect()
confmatrix = confusion_matrix(y_original, y_predict)

In [657]:
print("Confusion Matrix:")
print(confmatrix)

Confusion Matrix:
[[73  3]
 [ 1  7]]


### Linear Regression with Spark

In [658]:
from pyspark.ml.regression import LinearRegression

In [659]:
#Create Linear regression model
lir = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Train the model using our training data
model = lir.fit(train)

# Now see if we can predict values in our test data.
# Generate predictions using our linear regression model for all features in our
# test dataframe:
fullPredictions = model.transform(test).cache()

# Extract the predictions and the "known" correct labels.
predictions = fullPredictions.select("prediction").rdd.map(lambda x: x[0])
labels = fullPredictions.select("label").rdd.map(lambda x: x[0])

# Zip them together
predictionAndLabel = predictions.zip(labels).collect()

# Print out the predicted and actual values for each point
for prediction in predictionAndLabel:
  print(prediction)

(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 1)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 1)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 1)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 1)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 0)
(0.17401960784313725, 1)


I didn't understand the outcome of Linear Regression

### Logistic Regression with Spark

In [669]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [661]:
# Create Logistic Regression model 
lg = LogisticRegression(labelCol='label')
lg_model = lg.fit(train) #fit the model
training_sum = lg_model.summary
training_sum.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|              label|         prediction|
+-------+-------------------+-------------------+
|  count|                816|                816|
|   mean|0.17401960784313725|0.12990196078431374|
| stddev|0.37935886450868445| 0.3364017320791374|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [662]:
preds = lg_model.evaluate(test)
preds.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[27.0,8628.8,1.0,...|    0|[5.37084399282023...|[0.99537131939924...|       0.0|
|[30.0,12788.37,0....|    0|[2.47087909669794...|[0.92207495381312...|       0.0|
|[32.0,5756.12,0.0...|    0|[4.08599598278138...|[0.98347139413087...|       0.0|
|[32.0,12254.75,1....|    0|[2.53956208329779...|[0.92686914893637...|       0.0|
|[32.0,13630.93,0....|    0|[2.27723086649077...|[0.90697367151261...|       0.0|
|[34.0,7324.32,0.0...|    0|[1.07493410726540...|[0.74553411362035...|       0.0|
|[35.0,7361.92,1.0...|    0|[2.74423847211813...|[0.93958713445352...|       0.0|
|[35.0,7814.68,1.0...|    0|[2.14348096666331...|[0.89505802466631...|       0.0|
|[35.0,9766.95,1.0...|    1|[-1.8858521844061...|[0.13171812450298...|       1.0|
|[35.0,12654.35,

In [663]:
# Using AUC (Area Under Curve)
eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='label')
acc = eval.evaluate(preds.predictions)
print(acc)

0.924342105263158


I have a question here: I started the Runtime over and over and always got very different acc scores from 70%-90%+. Why does it differ so much everytime i restart the code. If I remember correctly, in our other classification or regression models without using spark our model would always have the same acc score

### Decision Tree Classifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

In [666]:
# Create a DecisionTreeClassifier and fit the training data
dtc = DecisionTreeClassifier()
dtc_model = dtc.fit(train)

# Create predictions for the testing data and take a look at the predictions
prediction = dtc_model.transform(test)
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+-----------------------------------------+
|label|prediction|probability                              |
+-----+----------+-----------------------------------------+
|0    |0.0       |[0.9753086419753086,0.024691358024691357]|
|0    |0.0       |[0.9245283018867925,0.07547169811320754] |
|0    |0.0       |[0.9753086419753086,0.024691358024691357]|
|0    |0.0       |[0.9135802469135802,0.08641975308641975] |
|0    |0.0       |[0.9245283018867925,0.07547169811320754] |
+-----+----------+-----------------------------------------+
only showing top 5 rows



In [667]:
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count() #True Negatives
TP = prediction.filter('prediction = 1 AND label = prediction').count() #True Positives
FN = prediction.filter('prediction = 0 AND label = 1').count() #False Negatives
FP = prediction.filter('prediction = 1 AND label = 0').count() #False Psotives

# Accuracy measures the proportion of correct predictions
acc = (TN + TP) / (TN + TP + FN + FP)
print(acc)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|    1|
|    0|       0.0|   75|
|    1|       1.0|    7|
|    0|       1.0|    1|
+-----+----------+-----+

0.9761904761904762


In [668]:
sc.stop()