# Machine Learning in PySpark

We will look at the implementation of machine learning techniques such as logistic regression in PySpark.
The dataset used can be found at https://github.com/sam16tyagi/Machine-Learning-techniques-in-python/blob/master/logistic%20regression%20dataset-Social_Network_Ads.csv


## Exercise 1
Load your dataset.

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

In [2]:
spark

In [3]:
df = spark.read.csv(
    "Social_Network_Ads.csv", inferSchema=True, header=True
)

Use `printSchema()` to show a summary of your data.

In [4]:
df.printSchema()

root
 |-- User ID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- EstimatedSalary: integer (nullable = true)
 |-- Purchased: integer (nullable = true)



Show the top 2 rows.

In [5]:
df.show(2)

+--------+------+---+---------------+---------+
| User ID|Gender|Age|EstimatedSalary|Purchased|
+--------+------+---+---------------+---------+
|15624510|  Male| 19|          19000|        0|
|15810944|  Male| 35|          20000|        0|
+--------+------+---+---------------+---------+
only showing top 2 rows



## Exercise 2 
Using `select()`, take all features except `User ID`.

In [6]:
df2 = df.select("Gender","Age", "EstimatedSalary", "Purchased")
df2.show(2)

+------+---+---------------+---------+
|Gender|Age|EstimatedSalary|Purchased|
+------+---+---------------+---------+
|  Male| 19|          19000|        0|
|  Male| 35|          20000|        0|
+------+---+---------------+---------+
only showing top 2 rows



## Exercise 3 
Use a 70-30 ratio for the train/test split.

In [7]:
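# randomSplit returns one DataFrame per weight, in the same order:
# the first (30%) is the test set, the second (70%) the training set
test, train = df2.randomSplit([0.3, 0.7], seed=10)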

In [8]:
[test.count(), train.count()]

[120, 280]
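`randomSplit` does not promise exact 70/30 counts. Conceptually, each row is assigned to a split by a seeded uniform draw compared against the cumulative, normalized weights, and the splits come back in the order of the weights. The plain-Python sketch below illustrates only that idea; `random_split` is a hypothetical helper, and Spark's actual implementation samples partition by partition, so it will not reproduce these assignments.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split using cumulative, normalized weights."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.3, 0.7] -> [0.3, 1.0]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)  # seeded, so the split is reproducible
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0.0, 1.0)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point round-off in the last bound
            splits[-1].append(row)
    return splits


test_rows, train_rows = random_split(list(range(400)), [0.3, 0.7], seed=10)
print(len(test_rows), len(train_rows))  # roughly 120 and 280, not exactly
```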

In [9]:
test.show(2)

+------+---+---------------+---------+
|Gender|Age|EstimatedSalary|Purchased|
+------+---+---------------+---------+
|Female| 18|          44000|        0|
|Female| 19|          26000|        0|
+------+---+---------------+---------+
only showing top 2 rows



In [10]:
train.show(2)

+------+---+---------------+---------+
|Gender|Age|EstimatedSalary|Purchased|
+------+---+---------------+---------+
|Female| 18|          68000|        0|
|Female| 18|          86000|        0|
+------+---+---------------+---------+
only showing top 2 rows



### Dtypes
In this dataset, any column of type string is treated as a categorical feature, but sometimes we might have numeric features we want treated as categorical or vice versa. We’ll need to carefully identify which columns are numeric and which are categorical.

In [11]:
train.dtypes

[('Gender', 'string'),
 ('Age', 'int'),
 ('EstimatedSalary', 'int'),
 ('Purchased', 'int')]
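One way to handle the "vice versa" cases is to start from the dtypes and then apply explicit overrides. A minimal sketch, assuming a hypothetical `force_categorical` set of numeric columns you want one-hot encoded (none are needed for this dataset); `split_columns` is likewise a hypothetical helper that takes a list shaped like `train.dtypes`.

```python
def split_columns(dtypes, label, force_categorical=frozenset()):
    """Split (name, type) pairs into categorical and numeric feature lists."""
    cat_cols, num_cols = [], []
    for name, dtype in dtypes:
        if name == label:
            continue  # the label is not a feature
        if dtype == "string" or name in force_categorical:
            cat_cols.append(name)
        else:
            # Assumes every remaining (non-string) column is numeric
            num_cols.append(name)
    return cat_cols, num_cols


dtypes = [("Gender", "string"), ("Age", "int"), ("EstimatedSalary", "int"), ("Purchased", "int")]
print(split_columns(dtypes, label="Purchased"))
# (['Gender'], ['Age', 'EstimatedSalary'])
print(split_columns(dtypes, label="Purchased", force_categorical={"Age"}))
# (['Gender', 'Age'], ['EstimatedSalary'])
```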

### One-hot encoding for categorical values

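StringIndexer:
Maps a string column of labels to a column of label indices. By default, labels are ordered by frequency, so the most frequent label gets index 0.
http://spark.apache.org/docs/latest/ml-features#stringindexer

OneHotEncoder:
Maps a column of category indices to a column of binary (sparse) vectors. By default (`dropLast=True`) the last category is dropped, so a feature with k categories becomes a vector of size k - 1.
http://spark.apache.org/docs/latest/ml-features#onehotencoder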
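To make the two transformers concrete, here is a plain-Python sketch of what they compute for a single column, assuming the defaults: `stringOrderType="frequencyDesc"` (most frequent label first, ties broken alphabetically) and `dropLast=True`. Sparse vectors are represented as `(size, indices, values)` tuples, mirroring how Spark prints them; `string_index` and `one_hot` are hypothetical helpers. With two genders, this is why `Gender_OneHotEncoder` later appears as the one-element vector `(1,[0],[1.0])`.

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to an index, most frequent first."""
    counts = Counter(values)
    # Sort by descending frequency, then alphabetically to break ties
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a sparse vector (size, indices, values)."""
    size = num_categories - 1 if drop_last else num_categories
    i = int(index)
    if i < size:
        return (size, [i], [1.0])
    return (size, [], [])  # the dropped last category: all zeros


genders = ["Female", "Male", "Female"]
mapping = string_index(genders)
print(mapping)                                   # {'Female': 0.0, 'Male': 1.0}
print(one_hot(mapping["Female"], len(mapping)))  # (1, [0], [1.0])
print(one_hot(mapping["Male"], len(mapping)))    # (1, [], [])
```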



In [12]:
catCols = [x for (x, dataType) in df2.dtypes if dataType == "string"]
numCols = [x for (x, dataType) in df2.dtypes if dataType == "int" and x != "Purchased"]
print(numCols)
print(catCols)

['Age', 'EstimatedSalary']
['Gender']


In [13]:
df2.agg(F.countDistinct("Gender")).show()

+-------------+
|count(Gender)|
+-------------+
|            2|
+-------------+



In [14]:
df2.groupBy("Gender").count().show()

+------+-----+
|Gender|count|
+------+-----+
|Female|  204|
|  Male|  196|
+------+-----+



In [18]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

In [19]:
string_indexer = [
    StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip")
    for x in catCols
]

In [20]:
string_indexer

[StringIndexer_942a8b8d0c45]

In [21]:
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in catCols],
        outputCols=[f"{x}_OneHotEncoder" for x in catCols],
    )
]

In [22]:
one_hot_encoder

[OneHotEncoder_f359c9e202b1]

### Vector assembling

VectorAssembler:
Combines the values of input columns into a single vector.
http://spark.apache.org/docs/latest/ml-features#vectorassembler
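Conceptually, `VectorAssembler` concatenates the input columns in order, expanding vector-valued columns into their elements. A minimal plain-Python sketch, assuming sparse vectors are represented as `(size, indices, values)` tuples; `assemble` is a hypothetical helper that always returns a dense list, whereas Spark may store the result sparsely when that is more compact.

```python
def assemble(*columns):
    """Concatenate scalars and sparse vectors into one dense feature list."""
    features = []
    for col in columns:
        if isinstance(col, tuple):
            # Expand a (size, indices, values) sparse vector into dense form
            size, indices, values = col
            dense = [0.0] * size
            for i, v in zip(indices, values):
                dense[i] = float(v)
            features.extend(dense)
        else:
            features.append(float(col))
    return features


print(assemble(18, 44000, (1, [0], [1.0])))  # [18.0, 44000.0, 1.0]
print(assemble(35, 20000, (1, [], [])))      # [35.0, 20000.0, 0.0]
```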


In [23]:
from pyspark.ml.feature import VectorAssembler

In [24]:
assemblerInput = [x for x in numCols]
assemblerInput += [f"{x}_OneHotEncoder" for x in catCols]

In [25]:
assemblerInput

['Age', 'EstimatedSalary', 'Gender_OneHotEncoder']

In [26]:
vector_assembler = VectorAssembler(
    inputCols=assemblerInput, outputCol="VectorAssembler_features"
)

## Exercise 4
Combine all of your processing steps into pipeline stages.



In [27]:
# Concatenate the lists so every indexer/encoder is kept, not just the first
stages = string_indexer + one_hot_encoder + [vector_assembler]

In [28]:
stages

[StringIndexer_942a8b8d0c45,
 OneHotEncoder_f359c9e202b1,
 VectorAssembler_ad5737ca619f]

In [29]:
import os
from os.path import isfile, join

In [30]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)


CPU times: user 2.06 ms, sys: 0 ns, total: 2.06 ms
Wall time: 2.38 ms


## Exercise 4 (continued)
Describe the timings reported above. Refer to the Spark pipeline documentation.

In [31]:
# "user" is CPU time spent executing the notebook's Python process
#   itself: 2.06 ms
# "sys" is CPU time spent in the OS kernel on its behalf: 0 ns
# "total" is user + sys: 2.06 ms
# Wall time is the elapsed real (clock) time: 2.38 ms
# The numbers are tiny because Pipeline().setStages(stages) only records
# the list of stages; no data is processed until pipeline.fit() is called.
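The `Pipeline` contract can be sketched in plain Python: construction only stores the stages, and `fit` walks them in order, fitting each estimator (such as `StringIndexer`) on the output of the earlier stages, then passing the transformed data on. All class names below are hypothetical toys, not Spark's API, and the toy indexer orders labels alphabetically for simplicity.

```python
class ToyPipeline:
    """Records stages; does no work until fit() is called."""

    def __init__(self, stages):
        self.stages = list(stages)  # construction is cheap: just store the list

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators learn from the data; plain transformers are used as-is
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    """Holds only fitted transformers and applies them in order."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyIndexer:
    """Estimator: learns a label -> index mapping for one key."""

    def __init__(self, key):
        self.key = key

    def fit(self, rows):
        labels = sorted({row[self.key] for row in rows})  # alphabetical order
        return ToyIndexerModel(self.key, {label: i for i, label in enumerate(labels)})


class ToyIndexerModel:
    """Transformer: adds an index column using the learned mapping."""

    def __init__(self, key, mapping):
        self.key, self.mapping = key, mapping

    def transform(self, rows):
        return [{**row, self.key + "_idx": self.mapping[row[self.key]]} for row in rows]


train_rows = [{"Gender": "Male"}, {"Gender": "Female"}]
model = ToyPipeline([ToyIndexer("Gender")]).fit(train_rows)
print(model.transform([{"Gender": "Female"}]))  # [{'Gender': 'Female', 'Gender_idx': 0}]
```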

In [32]:
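# Fit the pipeline on the training split, then apply it to the test split.
# Note: pp_df therefore contains only the transformed test rows.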
model = pipeline.fit(train)
pp_df = model.transform(test)

## Exercise 5 
Select the correct feature vectors 

In [33]:
temp = pp_df.select("Age", "EstimatedSalary", "Gender_OneHotEncoder", "VectorAssembler_features", "Purchased")
temp.show(20)

+---+---------------+--------------------+------------------------+---------+
|Age|EstimatedSalary|Gender_OneHotEncoder|VectorAssembler_features|Purchased|
+---+---------------+--------------------+------------------------+---------+
| 18|          44000|       (1,[0],[1.0])|      [18.0,44000.0,1.0]|        0|
| 19|          26000|       (1,[0],[1.0])|      [19.0,26000.0,1.0]|        0|
| 22|          27000|       (1,[0],[1.0])|      [22.0,27000.0,1.0]|        0|
| 24|          55000|       (1,[0],[1.0])|      [24.0,55000.0,1.0]|        0|
| 26|          15000|       (1,[0],[1.0])|      [26.0,15000.0,1.0]|        0|
| 26|          43000|       (1,[0],[1.0])|      [26.0,43000.0,1.0]|        0|
| 26|          52000|       (1,[0],[1.0])|      [26.0,52000.0,1.0]|        0|
| 26|          80000|       (1,[0],[1.0])|      [26.0,80000.0,1.0]|        0|
| 28|          59000|       (1,[0],[1.0])|      [28.0,59000.0,1.0]|        0|
| 28|          84000|       (1,[0],[1.0])|      [28.0,84000.0,1.

### Logistic Regression

In [34]:
from pyspark.ml.classification import LogisticRegression
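For intuition about what a fitted binary logistic regression does at prediction time: it computes a raw score w · x + b, maps it through the sigmoid to get P(y = 1), and predicts 1 when that probability exceeds a threshold (Spark's default `threshold` is 0.5). A plain-Python sketch; the weights and intercept below are made up for illustration and are not the coefficients learned in this notebook.

```python
import math


def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)  # avoids overflow for large negative z
    return e / (1.0 + e)


def predict(features, weights, intercept, threshold=0.5):
    """Return (probability of class 1, predicted label)."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    p = sigmoid(margin)
    return p, 1 if p > threshold else 0


# Hypothetical coefficients for [Age, EstimatedSalary, Gender_OneHotEncoder]
weights = [0.2, 0.00003, -0.3]
intercept = -10.0
print(predict([18.0, 44000.0, 1.0], weights, intercept))   # probability near 0, label 0
print(predict([60.0, 120000.0, 0.0], weights, intercept))  # probability near 1, label 1
```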

## Exercise 6
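Select the feature vector and label columns, renaming them to `features` and `label`, the default column names `LogisticRegression` expects.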

In [35]:
data = pp_df.selectExpr("VectorAssembler_features as features", "Purchased as label")

In [36]:
data.show(5, truncate=False)

+------------------+-----+
|features          |label|
+------------------+-----+
|[18.0,44000.0,1.0]|0    |
|[19.0,26000.0,1.0]|0    |
|[22.0,27000.0,1.0]|0    |
|[24.0,55000.0,1.0]|0    |
|[26.0,15000.0,1.0]|0    |
+------------------+-----+
only showing top 5 rows



In [37]:
%%time
model = LogisticRegression().fit(data)

CPU times: user 29.4 ms, sys: 14.6 ms, total: 44 ms
Wall time: 5.75 s


In [38]:
model.summary.areaUnderROC

0.9362026862026862
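`areaUnderROC` can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counting one half. The plain-Python sketch below uses that pairwise definition; it is quadratic in the number of examples and meant only for intuition (Spark integrates the ROC curve numerically instead). `auc_pairwise` is a hypothetical helper.

```python
def auc_pairwise(scores, labels):
    """P(score of a random positive > score of a random negative); ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


print(auc_pairwise([0.9, 0.8, 0.3, 0.1], [1, 1, 0, 0]))  # 1.0 (perfect ranking)
print(auc_pairwise([0.9, 0.4, 0.6, 0.1], [1, 1, 0, 0]))  # 0.75
```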

In [39]:
model.summary.pr.show()

+--------------------+------------------+
|              recall|         precision|
+--------------------+------------------+
|                 0.0|               1.0|
|0.023809523809523808|               1.0|
|0.047619047619047616|               1.0|
| 0.07142857142857142|               1.0|
| 0.09523809523809523|               1.0|
| 0.11904761904761904|               1.0|
| 0.14285714285714285|               1.0|
| 0.16666666666666666|               1.0|
| 0.19047619047619047|               1.0|
| 0.21428571428571427|               1.0|
| 0.23809523809523808|               1.0|
|  0.2619047619047619|               1.0|
|  0.2857142857142857|               1.0|
| 0.30952380952380953|               1.0|
|  0.3333333333333333|               1.0|
| 0.35714285714285715|               1.0|
| 0.38095238095238093|               1.0|
| 0.38095238095238093|0.9411764705882353|
| 0.40476190476190477|0.9444444444444444|
| 0.42857142857142855|0.9473684210526315|
+--------------------+------------------+
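Each row of `model.summary.pr` is a (recall, precision) pair obtained by lowering the decision threshold through the predicted scores. The plain-Python sketch below emits one point per distinct score and prepends a recall-0 point carrying the first threshold's precision, consistent with the (0.0, 1.0) first row above; it is a simplified illustration, and Spark's exact binning may differ. `pr_curve` is a hypothetical helper.

```python
def pr_curve(scores, labels):
    """Return (recall, precision) pairs, one per distinct score threshold."""
    total_pos = sum(labels)
    if total_pos == 0:
        raise ValueError("need at least one positive example")
    # Highest score first, so lowering the threshold admits more rows
    ranked = sorted(zip(scores, labels), key=lambda pair: pair[0], reverse=True)
    points = []
    tp = fp = 0
    for i, (score, label) in enumerate(ranked):
        if label == 1:
            tp += 1
        else:
            fp += 1
        # Emit a point only after the last example sharing this score
        if i + 1 == len(ranked) or ranked[i + 1][0] != score:
            points.append((tp / total_pos, tp / (tp + fp)))
    # Prepend a recall-0 point using the first threshold's precision
    return [(0.0, points[0][1])] + points


for recall, precision in pr_curve([0.9, 0.8, 0.7, 0.4], [1, 1, 0, 1]):
    print(recall, precision)
```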

## Exercise 7 
Obtain the confusion matrix of the classifier.

In [40]:
summary = model.summary

accuracy = summary.accuracy
falsePositiveRate = summary.weightedFalsePositiveRate
truePositiveRate = summary.weightedTruePositiveRate
fMeasure = summary.weightedFMeasure()  # a method (default beta=1.0), not a property
precision = summary.weightedPrecision
recall = summary.weightedRecall
print(
    f"Accuracy: {accuracy} \nPrecision: {precision}\nRecall: {recall}\n\n"
    f"False Positive Rate: {falsePositiveRate}\nTrue Positive Rate: {truePositiveRate}\nF-measure: {fMeasure}"
)

Accuracy: 0.85 
Precision: 0.8483311938382541
Recall: 0.8500000000000001

False Positive Rate: 0.20164835164835165
True Positive Rate: 0.8500000000000001
F-measure: 0.848125
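The metrics above are summaries; the confusion matrix itself can be obtained in Spark by counting (label, prediction) pairs, for example with `model.summary.predictions.groupBy("label", "prediction").count().show()`. Keep in mind that `model.summary` describes the DataFrame the model was fit on, which here is `data`, built from the transformed test split. The plain-Python sketch below shows the same counting and how the *weighted* precision and recall follow from it: each class's metric is weighted by that class's share of the true labels, which is why weighted recall equals accuracy in the output above. `confusion_matrix` and `weighted_precision_recall` are hypothetical helpers.

```python
from collections import Counter


def confusion_matrix(labels, predictions):
    """Count (true label, predicted label) pairs."""
    return Counter(zip(labels, predictions))


def weighted_precision_recall(labels, predictions):
    """Per-class precision/recall, weighted by each class's true-label frequency."""
    cm = confusion_matrix(labels, predictions)
    classes = sorted(set(labels) | set(predictions))
    n = len(labels)
    w_precision = w_recall = 0.0
    for c in classes:
        tp = cm[(c, c)]
        actual = sum(v for (t, _), v in cm.items() if t == c)
        predicted = sum(v for (_, p), v in cm.items() if p == c)
        weight = actual / n
        if predicted:
            w_precision += weight * tp / predicted
        if actual:
            w_recall += weight * tp / actual
    return w_precision, w_recall


labels = [0, 0, 0, 1, 1, 0, 1, 0]
predictions = [0, 0, 1, 1, 0, 0, 1, 0]
print(confusion_matrix(labels, predictions))
print(weighted_precision_recall(labels, predictions))
```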
