## **PySpark Machine Learning library tutorial**



### P1. Presentation of the chosen topic

This notebook shows how to carry out basic statistical analysis (summary statistics, contingency tables, correlation matrices, feature selection) and how to solve typical machine learning problems (regression, classification and clustering) in the PySpark environment, using spark.sql and DataFrames to some extent and, for the most part, the spark.ml library.

Spark provides two libraries for ML: spark.mllib and spark.ml. The former is the older, RDD-based API; it is now in maintenance mode, although it is still supported. The latter is the newer, DataFrame-based API, which has been the primary ML library in Spark since version 2.0, partly thanks to its interoperability with SQL/DataFrame queries. The exercises in the following paragraphs are based on spark.ml.

Performing machine learning tasks in Spark can be much quicker than using standard single-machine libraries and sequential programming, particularly on large datasets. In a cluster environment especially, letting the system process data in parallel speeds up the computations considerably, as can be seen in the solution timings provided in the final paragraph of this document.

A complete list of classes and functions available in spark.ml, with some useful examples, can be found here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html. Some of the most classical ML methods will be presented in this project.

### P2. Programming questions

Given the *bank.csv* (https://www.kaggle.com/rouseguy/bankbalanced/data) dataset,
1. Read and explore the dataset, describe quantitative and qualitative variables (using DataFrames and spark.sql).
2. Compute the correlation matrix using spark.ml.
3. Perform a feature selection in spark.ml using PCA and show results (new computed features, explained variance). Choose an appropriate number of features. Each column in the dataset must have mean = 0 and std = 1 (standardised columns) before applying PCA.
4. Perform a linear regression in spark.ml to estimate the values of the 'pdays' variable using all (or part of) the other quantitative variables in the dataset. Split the dataset into training and test sets, show estimated coefficients, RMSE and R2 indices (or some equivalent).
5. Perform a logistic regression in spark.ml to classify records by their 'marital' variable, using all (or part of) the other quantitative variables. Compute area under ROC and accuracy score on both training and test sets.
6. Use a Support Vector Machine in spark.ml to perform a binary classification on the 'deposit' attribute. Compute area under ROC and accuracy score on both training and test sets.
7. Use k-Means in spark.ml to perform a clustering operation. Use the silhouette score (or some equivalent) to select the most appropriate number of clusters to build.
8. Choose one of the methods used in the previous questions and repeat the regression/classification procedure but this time also perform Cross Validation on the training set to select the best hyperparameters. Run the best model on the test set and show results (may be pretty slow).

For questions strictly related to machine learning, i.e. questions 4, 5, 6, 7 and 8, try to apply the following pipeline: prepare the data (use VectorAssembler and StringIndexer when appropriate, see below), split the data into training and test sets, build the model, fit the model to the training data, and evaluate it on both the training and test sets. If needed, go back and adjust the hyperparameters.

### P3. Educational goals

Spark ML libraries are particularly useful in that they make it possible to solve ML problems on large-scale data in a distributed environment, offering a highly scalable approach. In a cluster environment, the details of data distribution and server coordination are hidden from the user, who is left with what looks very similar to sequential programming - not much different, in this respect, from building ML pipelines with SKLearn or Keras. However, on large datasets Spark can be much more efficient in terms of solution time, and its approach is therefore worth exploring. The methods proposed in the questions above cover most of the material of a basic ML course and could support lab activities in this respect. Pipelines are fairly standard and consist of data preparation, splitting, training and evaluation. With hopefully little effort, the ML methods covered here can be adapted to the other algorithms available in spark.ml, of which the documentation provides a long list.

### Practical note on the usage of spark.ml

The least intuitive aspect of preparing data to fit ML models in Spark is having to first pass the (numerical) predictor variables to a VectorAssembler object, which puts them all in a single Vector of features. In the case of a classification problem, the target variable also needs to be passed through a StringIndexer object, which transforms it to a label format (0/1, 0/1/2, ...), especially if it comes as a string (e.g. yes/no, male/female/other, ...).

The StringIndexer step is not required if the target variable is already in label format. In the case of a regression problem, there is no need to transform the target variable, which can be used as it is, but the numerical predictor variables still need to be assembled into a feature Vector.
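To make the two preparation steps concrete, here is a minimal pure-Python sketch of what they do conceptually. It is not Spark code: the helper names `index_labels` and `assemble` and the toy rows are hypothetical, and the sketch assumes StringIndexer's default ordering, in which the most frequent string receives index 0.0.

```python
from collections import Counter

def index_labels(values):
    """Map each distinct string to a float index, most frequent first
    (mimics the default frequency-descending ordering of StringIndexer)."""
    counts = Counter(values)
    # sort by descending count, then alphabetically so ties are deterministic
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}

def assemble(row, input_cols):
    """Collect the chosen numeric columns of one row into a single feature list
    (conceptually what VectorAssembler does for every row)."""
    return [float(row[c]) for c in input_cols]

# toy rows, made up for illustration only
rows = [
    {"age": 59, "balance": 2343, "deposit": "yes"},
    {"age": 35, "balance": 120, "deposit": "no"},
    {"age": 41, "balance": 1270, "deposit": "no"},
]

mapping = index_labels([r["deposit"] for r in rows])
prepared = [(mapping[r["deposit"]], assemble(r, ["age", "balance"])) for r in rows]

print(mapping)      # 'no' is more frequent, so it gets index 0.0
print(prepared[0])  # (label, features) pair for the first row
```

This ordering is consistent with the outputs in the solutions: 'married', the most frequent marital status, is indexed as 0.0, while deposit = 'yes', which is less frequent than 'no', is indexed as 1.0.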

### P4. Proposed solutions

I propose a solution for each of the 8 programming questions, together with the timings obtained. I tried to always use the same approach while answering the questions, i.e. preparing the dataset, splitting it into training and test sets, creating the model with some chosen parameters, fitting it to the training set and evaluating it on both the training and test sets. I sometimes use the NumPy and Pandas libraries (or RDDs) to handle the results more easily - sometimes left just as comments - because spark.ml results often come in formats that cannot be used directly in related contexts, such as matplotlib visualisations. However, I try to stick mostly to Spark programming, which lets the user solve an ML task from start to finish anyway. Timings refer only to the execution of the Spark code.

### Solutions

### Q0. Useful libraries

In [None]:
import findspark
findspark.init()

# Fundamentals
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import numpy as np
import pandas as pd
import time

# Vector Assembler and String Indexer
#from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

# Correlation
from pyspark.ml.stat import Correlation

# PCA
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler

# Linear Regression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator # default metric rmse

# Logistic Regression and classification evaluators
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator # default metric area under roc, does not support accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator # default metric f1, supports accuracy (also good for binary classification)

# Linear SVC
from pyspark.ml.classification import LinearSVC

# KMeans
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# initialise spark session (the session creates the SparkContext if none exists yet)
spark = SparkSession.builder.appName("PySpark Project").getOrCreate()
sc = spark.sparkContext

### Q1. Statistics

In [None]:
# Read the dataset and print some info

start = time.time()
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("Datasets/bank/bank.csv")

In [None]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [None]:
df.show(5)

+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|       job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
| 59|    admin.|married|secondary|     no|   2343|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|    yes|
| 56|    admin.|married|secondary|     no|     45|     no|  no|unknown|  5|  may|    1467|       1|   -1|       0| unknown|    yes|
| 41|technician|married|secondary|     no|   1270|    yes|  no|unknown|  5|  may|    1389|       1|   -1|       0| unknown|    yes|
| 55|  services|married|secondary|     no|   2476|    yes|  no|unknown|  5|  may|     579|       1|   -1|       0| unknown|    yes|
| 54|    admin.|married| tertiary|     no|    184|     no|  no|unknown|  5| 

In [None]:
# Summary statistics

# quantitative variables
df.describe(['age','balance','duration']).show()
df.describe(['campaign','pdays','previous']).show()

+-------+------------------+------------------+------------------+
|summary|               age|           balance|          duration|
+-------+------------------+------------------+------------------+
|  count|             11162|             11162|             11162|
|   mean|41.231947679627304|1528.5385235620856|371.99381831213043|
| stddev|11.913369192215518| 3225.413325946149|347.12838571630687|
|    min|                18|             -6847|                 2|
|    max|                95|             81204|              3881|
+-------+------------------+------------------+------------------+

+-------+------------------+------------------+------------------+
|summary|          campaign|             pdays|          previous|
+-------+------------------+------------------+------------------+
|  count|             11162|             11162|             11162|
|   mean| 2.508421429851281| 51.33040673714388|0.8325568894463358|
| stddev|2.7220771816614824|108.75828197197717| 2.29200721867

In [None]:
# qualitative variables

df.select('job','age','balance','duration')\
        .groupby('job')\
        .agg(f.mean('age'),f.mean('balance'),f.mean('duration'))\
        .show()

df.select('marital','age','balance','duration')\
        .groupby('marital')\
        .agg(f.mean('age'),f.mean('balance'),f.mean('duration'))\
        .show()
             
df.select('education','age','balance','duration')\
        .groupby('education')\
        .agg(f.mean('age'),f.mean('balance'),f.mean('duration'))\
        .show()
             
df.select('housing','age','balance','duration')\
        .groupby('housing')\
        .agg(f.mean('age'),f.mean('balance'),f.mean('duration'))\
        .show()

df.select('deposit','age','balance','duration')\
        .groupby('deposit')\
        .agg(f.mean('age'),f.mean('balance'),f.mean('duration'))\
        .show()

+-------------+------------------+------------------+------------------+
|          job|          avg(age)|      avg(balance)|     avg(duration)|
+-------------+------------------+------------------+------------------+
|   management| 40.21823850350741|1793.6636788776304| 361.4820732657833|
|      retired|  65.4241645244216|2417.2506426735217| 389.9935732647815|
|      unknown|46.857142857142854|1945.4571428571428|330.37142857142857|
|self-employed|39.809876543209874|1865.3728395061728|396.15555555555557|
|      student|26.102777777777778|1500.7833333333333| 330.6722222222222|
|  blue-collar| 39.50514403292181|1203.9264403292182|394.65895061728395|
| entrepreneur| 42.88719512195122|1621.9420731707316| 370.1829268292683|
|       admin.|39.374062968515744|1195.8665667166417| 347.9295352323838|
|   technician|  39.0016456390565|1556.2945693911136|363.79155238617665|
|     services| 38.14192849404117|1081.1711809317444|385.95557963163594|
|    housemaid| 47.44525547445255|1366.160583941605

In [None]:
# qualitative variables

#df.groupBy("job").count().show()
df.groupBy("marital").count().show()
df.groupBy("education").count().show()
df.groupBy("housing").count().show()
df.groupBy("deposit").count().show()
df.crosstab('marital','deposit').show()
df.crosstab('education','deposit').show()
df.crosstab('housing','deposit').show()

+--------+-----+
| marital|count|
+--------+-----+
|divorced| 1293|
| married| 6351|
|  single| 3518|
+--------+-----+

+---------+-----+
|education|count|
+---------+-----+
|  unknown|  497|
| tertiary| 3689|
|secondary| 5476|
|  primary| 1500|
+---------+-----+

+-------+-----+
|housing|count|
+-------+-----+
|     no| 5881|
|    yes| 5281|
+-------+-----+

+-------+-----+
|deposit|count|
+-------+-----+
|     no| 5873|
|    yes| 5289|
+-------+-----+

+---------------+----+----+
|marital_deposit|  no| yes|
+---------------+----+----+
|        married|3596|2755|
|         single|1606|1912|
|       divorced| 671| 622|
+---------------+----+----+

+-----------------+----+----+
|education_deposit|  no| yes|
+-----------------+----+----+
|         tertiary|1693|1996|
|        secondary|3026|2450|
|          primary| 909| 591|
|          unknown| 245| 252|
+-----------------+----+----+

+---------------+----+----+
|housing_deposit|  no| yes|
+---------------+----+----+
|            yes|33

In [None]:
df.groupBy("default").count().show()
df.groupBy("loan").count().show()
df.groupBy("contact").count().show()

+-------+-----+
|default|count|
+-------+-----+
|     no|10994|
|    yes|  168|
+-------+-----+

+----+-----+
|loan|count|
+----+-----+
|  no| 9702|
| yes| 1460|
+----+-----+

+---------+-----+
|  contact|count|
+---------+-----+
|  unknown| 2346|
| cellular| 8042|
|telephone|  774|
+---------+-----+



### Q2. Correlation matrix

In [None]:
# Prepare dataset for ML operations

qual_vars = ['job', 'marital', 'education', 'housing', 'deposit', 'default', 'loan', 'contact'] 
quan_vars = ['age','balance','duration','campaign','pdays','previous']

# put quantitative variables in a Vector of features
assembler = VectorAssembler(inputCols=quan_vars, outputCol = 'features')
input_dat = assembler.transform(df).select(qual_vars + ['features'])
input_dat.printSchema()

# Correlation matrix
pearsonCorr = Correlation.corr(input_dat, 'features', 'pearson').collect()[0][0]
print('Correlation matrix :')
print(pearsonCorr)
print('Time elapsed: {}'.format(time.time()-start))

#pandas visualisation
df_corr = pd.DataFrame(pearsonCorr.toArray(), columns = quan_vars, index = quan_vars)
df_corr

root
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- deposit: string (nullable = true)
 |-- default: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- features: vector (nullable = true)

Correlation matrix :
DenseMatrix([[ 1.00000000e+00,  1.12299889e-01,  1.89228074e-04,
              -5.27793616e-03,  2.77383431e-03,  2.01685612e-02],
             [ 1.12299889e-01,  1.00000000e+00,  2.24361313e-02,
              -1.38938225e-02,  1.74111486e-02,  3.08052469e-02],
             [ 1.89228074e-04,  2.24361313e-02,  1.00000000e+00,
              -4.15574588e-02, -2.73915532e-02, -2.67161713e-02],
             [-5.27793616e-03, -1.38938225e-02, -4.15574588e-02,
               1.00000000e+00, -1.02726048e-01, -4.96994980e-02],
             [ 2.77383431e-03,  1.74111486e-02, -2.73915532e-02,
              -1.02726048e-01,  1.000

Unnamed: 0,age,balance,duration,campaign,pdays,previous
age,1.0,0.1123,0.000189,-0.005278,0.002774,0.020169
balance,0.1123,1.0,0.022436,-0.013894,0.017411,0.030805
duration,0.000189,0.022436,1.0,-0.041557,-0.027392,-0.026716
campaign,-0.005278,-0.013894,-0.041557,1.0,-0.102726,-0.049699
pdays,0.002774,0.017411,-0.027392,-0.102726,1.0,0.507272
previous,0.020169,0.030805,-0.026716,-0.049699,0.507272,1.0
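As a reminder of what each entry of the matrix above measures, the following is a minimal pure-Python sketch of the Pearson correlation coefficient between two columns. The function name `pearson` and the toy lists are hypothetical and are not part of the notebook.

```python
import math

def pearson(x, y):
    """Pearson correlation: covariance divided by the product of the standard deviations."""
    n = len(x)
    mean_x = sum(x) / n
    mean_y = sum(y) / n
    # the 1/n (or 1/(n-1)) factors cancel out, so plain sums are enough
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    return cov / math.sqrt(var_x * var_y)

# toy data, made up for illustration
x = [1.0, 2.0, 3.0, 4.0]
y = [2.0, 4.0, 6.0, 8.0]   # perfectly increasing with x
z = [4.0, 3.0, 2.0, 1.0]   # perfectly decreasing with x

r_xy = pearson(x, y)
r_xz = pearson(x, z)
print(r_xy, r_xz)
```

Values close to +1 or -1 indicate a strong linear relationship, while values near 0, like most entries of the matrix above, indicate a weak one.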


### Q3. Feature selection

In [None]:
# Feature selection
start = time.time()

# centre and scale column-wise before applying PCA (Q3 asks for mean = 0 and std = 1)
# NOTE: the outputs shown below were produced with withStd=False (centring only)
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

# Compute summary statistics (column means and standard deviations) by fitting the StandardScaler
scalerModel = scaler.fit(input_dat)

# Transform each feature to have zero mean and unit standard deviation
scaledData = scalerModel.transform(input_dat)
scaledData.select(qual_vars + ['scaledFeatures']).show()

+-----------+--------+---------+-------+-------+-------+----+-------+--------------------+
|        job| marital|education|housing|deposit|default|loan|contact|      scaledFeatures|
+-----------+--------+---------+-------+-------+-------+----+-------+--------------------+
|     admin.| married|secondary|    yes|    yes|     no|  no|unknown|[17.7680523203725...|
|     admin.| married|secondary|     no|    yes|     no|  no|unknown|[14.7680523203725...|
| technician| married|secondary|    yes|    yes|     no|  no|unknown|[-0.2319476796274...|
|   services| married|secondary|    yes|    yes|     no|  no|unknown|[13.7680523203725...|
|     admin.| married| tertiary|     no|    yes|     no|  no|unknown|[12.7680523203725...|
| management|  single| tertiary|    yes|    yes|     no| yes|unknown|[0.76805232037259...|
| management| married| tertiary|    yes|    yes|     no| yes|unknown|[14.7680523203725...|
|    retired|divorced|secondary|    yes|    yes|     no|  no|unknown|[18.7680523203725...|
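Standardising a column means subtracting its mean and dividing by its standard deviation, which is what Q3 asks for before PCA. The sketch below does this in pure Python for a single column. The helper name `standardise` is hypothetical, and the use of the sample standard deviation (n - 1 denominator) is an assumption chosen to match what the StandardScaler documentation describes.

```python
import statistics

def standardise(column):
    """Return the column rescaled to zero mean and unit (sample) standard deviation."""
    mean = statistics.mean(column)
    std = statistics.stdev(column)  # sample standard deviation, n - 1 denominator
    return [(v - mean) / std for v in column]

# the first five 'age' values of the dataset, as printed by df.show(5)
ages = [59.0, 56.0, 41.0, 55.0, 54.0]
scaled = standardise(ages)

print(scaled)
print(statistics.mean(scaled), statistics.stdev(scaled))
```

Without the division by the standard deviation, columns with large numeric ranges (such as 'balance') dominate the principal components simply because of their scale.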

In [None]:
# Apply PCA

pca = PCA(k=3, inputCol = scaler.getOutputCol(), outputCol="pcaFeatures")
model = pca.fit(scaledData)
transformed_features = model.transform(scaledData)

In [None]:
# Principal components and explained variance

print('Principal components:')
print(model.pc) # principal components
transformed_features.select('pcaFeatures').show(5)
print('Explained variance: {}'.format(model.explainedVariance)) # explained variance
print('Time elapsed:',np.round(time.time()-start,2))

#pandas and numpy visualisations
#from pyspark.sql import Row
#pcs = np.round(model.pc.toArray(),4)
#df_pc = pd.DataFrame(pcs, columns = ['PC1','PC2','PC3'], index = quan_vars)
#df_pc
#transformed_features.select('pcaFeatures').rdd.map(lambda x: Row(pcaFeatures = str([x for x in list(np.round(x[0].toArray(),4))]))).toDF().show(5,truncate = False)

Principal components:
DenseMatrix([[-4.14792301e-04,  8.11977961e-05,  8.30434367e-05],
             [-9.99996757e-01,  2.43703351e-03, -6.11336625e-04],
             [-2.44285482e-03, -9.99950324e-01,  9.65648450e-03],
             [ 1.17364875e-05,  3.21116157e-04, -2.60118942e-03],
             [-5.87509530e-04,  9.65731689e-03,  9.99892808e-01],
             [-2.18924525e-05,  1.91221157e-04,  1.06760443e-02]])
+--------------------+
|         pcaFeatures|
+--------------------+
|[-816.07218813510...|
|[1480.88339123302...|
|[256.084128093277...|
|[-947.93905590571...|
|[1343.82431001608...|
+--------------------+
only showing top 5 rows

Explained variance: [0.987431930087071,0.011432185128056245,0.001121519333585062]
Time elapsed: 3.32
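One common way to choose the number of components is to keep the smallest k whose cumulative explained variance reaches a chosen threshold. The sketch below applies this rule to the ratios printed above. The function name `components_needed` and the 0.95 and 0.99 thresholds are assumptions made for illustration, not part of the original solution.

```python
from itertools import accumulate

def components_needed(explained_variance, threshold):
    """Smallest number of leading components whose cumulative explained variance >= threshold."""
    cumulative = list(accumulate(explained_variance))
    for k, total in enumerate(cumulative, start=1):
        if total >= threshold:
            return k
    # threshold never reached: all available components are needed
    return len(explained_variance)

# explained variance ratios as printed by model.explainedVariance above
ratios = [0.987431930087071, 0.011432185128056245, 0.001121519333585062]

k_95 = components_needed(ratios, 0.95)
k_99 = components_needed(ratios, 0.99)
print(k_95, k_99)
```

With these ratios the first component alone accounts for about 98.7% of the variance, so a single component already passes the 95% threshold.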


### Q4. Linear regression

In [None]:
# Build a new assembled dataset and try to predict the value of the 'pdays' variable using the 'previous' variable
# note: corr(pdays, previous) = 0.5 is the only sizeable correlation, so it's worth trying this setting

start = time.time()
#reg_vars = ['age','balance','duration','campaign','previous']
assembler = VectorAssembler(inputCols=['previous'], outputCol = 'features')
input_reg = assembler.transform(df).select('pdays', 'features')
input_reg.show(5, truncate = False)

+-----+--------+
|pdays|features|
+-----+--------+
|-1   |[0.0]   |
|-1   |[0.0]   |
|-1   |[0.0]   |
|-1   |[0.0]   |
|-1   |[0.0]   |
+-----+--------+
only showing top 5 rows



In [None]:
# split training and test sets

splits = input_reg.randomSplit([0.9, 0.1])
train_df = splits[0]
test_df = splits[1]
print(train_df.count(),test_df.count())

10056 1106
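The split sizes above are not exactly 90% and 10% of the 11162 rows because the split is random: each row is assigned to a split by an independent random draw. The pure-Python sketch below illustrates the idea. The function `random_split` is a hypothetical stand-in, not Spark's implementation, and the seed value is arbitrary.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # cumulative normalised boundaries, e.g. [0.9, 1.0] for weights [0.9, 0.1]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # the last split catches any floating-point leftovers
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(11162)), [0.9, 0.1], seed=42)
print(len(train), len(test))  # close to, but usually not exactly, a 90/10 split
```

In PySpark, `randomSplit` also accepts an optional `seed` argument, which makes the split reproducible across runs.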


In [None]:
# perform linear regression for variable 'pdays'

lr = LinearRegression(featuresCol='features', labelCol='pdays', maxIter=100, regParam=0.00001, elasticNetParam=0.85)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [23.551551677403662]
Intercept: 31.239931851804332


In [None]:
# summary of results on training data

trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("R2: %f" % trainingSummary.r2)

RMSE: 93.148806
R2: 0.253946


In [None]:
# summary of results on test data

lr_predictions = lr_model.transform(test_df)
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="pdays", metricName="r2")
test_result = lr_model.evaluate(test_df)
print("RMSE: %g" % test_result.rootMeanSquaredError)
print("R2: %g" % lr_evaluator.evaluate(lr_predictions))
# NOTE: R2 and RMSE are not very good as a result of all variables being poorly correlated in the first place.

RMSE: 98.8671
R2: 0.28124
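For reference, the two metrics reported above can be written out in a few lines of pure Python. This is a sketch of the standard definitions with hypothetical function names and made-up toy values; it is not how Spark computes them internally.

```python
import math

def rmse(y_true, y_pred):
    """Root mean squared error: square root of the mean squared residual."""
    squared = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared) / len(squared))

def r2(y_true, y_pred):
    """Coefficient of determination: 1 - residual sum of squares / total sum of squares."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot

# toy values, made up for illustration
y_true = [1.0, 2.0, 3.0, 4.0]
y_pred = [1.0, 2.0, 3.0, 6.0]

print(rmse(y_true, y_pred), r2(y_true, y_pred))
```

RMSE is expressed in the units of the target (days, for 'pdays'), whereas R2 is unitless and compares the model with a constant prediction equal to the mean of the target.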


In [None]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","pdays","features").show(5)

+------------------+-----+--------+
|        prediction|pdays|features|
+------------------+-----+--------+
|31.239931851804332|   -1|   [0.0]|
|31.239931851804332|   -1|   [0.0]|
|31.239931851804332|   -1|   [0.0]|
|31.239931851804332|   -1|   [0.0]|
|31.239931851804332|   -1|   [0.0]|
+------------------+-----+--------+
only showing top 5 rows



In [None]:
print('Time elapsed:',(time.time()-start))

Time elapsed: 5.2130045890808105


### Q5. Logistic regression

In [None]:
# Ternary classification of 'marital' variable (0-1-2) using all the other quantitative variables

start = time.time()

# turn target string variable to label format
label_stringIdx = StringIndexer(inputCol = 'marital', outputCol = 'label')
model = label_stringIdx.fit(df)
df1 = model.transform(df)

# turn quantitative variables to feature vector (all quan_vars)
assembler = VectorAssembler(inputCols=quan_vars, outputCol = 'features')
input_log = assembler.transform(df1).select('label', 'features')
input_log.show(5, truncate = False)

# split into training and test
splits = input_log.randomSplit([0.95, 0.05])
train_df = splits[0]
test_df = splits[1]
print(train_df.count(),test_df.count())

# create and fit LR model
log = LogisticRegression(featuresCol='features', labelCol='label', regParam=1e-4,
                         maxIter=100, elasticNetParam=0.65, tol=1e-8)
logModel = log.fit(train_df)

+-----+---------------------------------+
|label|features                         |
+-----+---------------------------------+
|0.0  |[59.0,2343.0,1042.0,1.0,-1.0,0.0]|
|0.0  |[56.0,45.0,1467.0,1.0,-1.0,0.0]  |
|0.0  |[41.0,1270.0,1389.0,1.0,-1.0,0.0]|
|0.0  |[55.0,2476.0,579.0,1.0,-1.0,0.0] |
|0.0  |[54.0,184.0,673.0,2.0,-1.0,0.0]  |
+-----+---------------------------------+
only showing top 5 rows

10589 573


In [None]:
# evaluate model

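# Create evaluators
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction") # accuracy is requested explicitly below (the default metric is f1)
# NOTE: 'marital' has three classes and the hard 'prediction' column is passed as rawPredictionCol,
# so the area under ROC reported here is only a rough indicator
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName='areaUnderROC') # for roc

# Make predictions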
out_tr = logModel.transform(train_df).select("label", "prediction")
out_te = logModel.transform(test_df).select("label", "prediction")

# Get metrics
acc_tr = evaluatorMulti.evaluate(out_tr, {evaluatorMulti.metricName: "accuracy"})
auc_tr = evaluator.evaluate(out_tr)
acc_te = evaluatorMulti.evaluate(out_te, {evaluatorMulti.metricName: "accuracy"})
auc_te = evaluator.evaluate(out_te)
print('Training, accuracy: {:.4f}, area under ROC: {:.4f}'.format(acc_tr,auc_tr))
print('Test, accuracy: {:.4f}, area under ROC: {:.4f}'.format(acc_te,auc_te))

print('Time elapsed:',(time.time()-start)) # end time for cluster comparison

Training, accuracy: 0.6642, area under ROC: 0.6522
Test, accuracy: 0.6928, area under ROC: 0.6814
Time elapsed: 19.11789298057556


### Q6. Support Vector Machine

In [None]:
# Binary classification of 'deposit' variable using all the other quantitative variables

start = time.time()

# turn target string variable to label format
label_stringIdx = StringIndexer(inputCol = 'deposit', outputCol = 'label')
model = label_stringIdx.fit(df)
df2 = model.transform(df)

# turn quantitative variables to feature vector (all quan_vars)
assembler = VectorAssembler(inputCols=quan_vars, outputCol = 'features')
input_svm = assembler.transform(df2).select('label', 'features')
input_svm.show(5, truncate = False)

# split into training and test
splits = input_svm.randomSplit([0.95, 0.05])
train_df = splits[0]
test_df = splits[1]
print(train_df.count(),test_df.count())

svm = LinearSVC(featuresCol='features', labelCol='label', regParam=0.1, maxIter=200)
svmModel = svm.fit(train_df)

+-----+---------------------------------+
|label|features                         |
+-----+---------------------------------+
|1.0  |[59.0,2343.0,1042.0,1.0,-1.0,0.0]|
|1.0  |[56.0,45.0,1467.0,1.0,-1.0,0.0]  |
|1.0  |[41.0,1270.0,1389.0,1.0,-1.0,0.0]|
|1.0  |[55.0,2476.0,579.0,1.0,-1.0,0.0] |
|1.0  |[54.0,184.0,673.0,2.0,-1.0,0.0]  |
+-----+---------------------------------+
only showing top 5 rows

10608 554


In [None]:
# evaluate model

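# Create evaluators
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction") # accuracy is requested explicitly below (the default metric is f1)
# NOTE: the hard 'prediction' column is passed as rawPredictionCol, so the ROC is built from 0/1 predictions;
# keeping the 'rawPrediction' column and passing it here instead would use the model's continuous scores
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName='areaUnderROC') # for roc

# Make predictions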
out_tr = svmModel.transform(train_df).select("label", "prediction")
out_te = svmModel.transform(test_df).select("label", "prediction")

# Get metrics
acc_tr = evaluatorMulti.evaluate(out_tr, {evaluatorMulti.metricName: "accuracy"})
auc_tr = evaluator.evaluate(out_tr)
acc_te = evaluatorMulti.evaluate(out_te, {evaluatorMulti.metricName: "accuracy"})
auc_te = evaluator.evaluate(out_te)

print('Training, accuracy: {:.4f}, area under ROC: {:.4f}'.format(acc_tr,auc_tr))
print('Test, accuracy: {:.4f}, area under ROC: {:.4f}'.format(acc_te,auc_te))

print('Time elapsed:',(time.time()-start)) # end time for cluster comparison

Training, accuracy: 0.7328, area under ROC: 0.7264
Test, accuracy: 0.7310, area under ROC: 0.7293
Time elapsed: 12.639104843139648
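The evaluator cells above pass the hard `prediction` column as `rawPredictionCol`, so the ROC curve is built from 0/1 values rather than from continuous scores. The pure-Python sketch below shows why this matters: it computes the area under ROC as the probability that a randomly chosen positive is ranked above a randomly chosen negative (ties count one half), first on scores and then on thresholded predictions. The function name `auc`, the toy labels and scores, and the 0.5 threshold are all made up for illustration.

```python
def auc(labels, scores):
    """Area under ROC as a pairwise ranking probability (ties count 0.5)."""
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# toy binary problem, made up for illustration
labels = [0, 0, 1, 1, 1, 0]
scores = [0.1, 0.4, 0.35, 0.8, 0.9, 0.7]

# thresholding throws away the ranking information inside each predicted class
hard = [1.0 if s >= 0.5 else 0.0 for s in scores]

auc_scores = auc(labels, scores)
auc_hard = auc(labels, hard)
print(auc_scores, auc_hard)
```

In this toy case the score-based value is higher than the one obtained from hard predictions. In spark.ml the continuous scores of a fitted classifier are available in the `rawPrediction` output column.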


### Q7. K-Means

In [None]:
# K-means clustering of records by quantitative variables

start = time.time()
silhouette_score=[]
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='features', \
                                metricName='silhouette', distanceMeasure='squaredEuclidean')

for i in range(2,10):
    kmeans = KMeans(featuresCol='features', k=i)
    model_km = kmeans.fit(input_dat)
    output = model_km.transform(input_dat)
    score = evaluator.evaluate(output)
    silhouette_score.append(score)
    print("k = {}, silhouette score: {}".format(i,score))

k = 2, silhouette score: 0.9398805175144744
k = 3, silhouette score: 0.8777665380478766
k = 4, silhouette score: 0.8278052038397186
k = 5, silhouette score: 0.8234949531297522
k = 6, silhouette score: 0.7725640000849053
k = 7, silhouette score: 0.7202539616957179
k = 8, silhouette score: 0.7310444194004343
k = 9, silhouette score: 0.6110342465304519


In [None]:
print('Time elapsed:',np.round(time.time()-start,2))

Time elapsed: 26.96


### Q8. Cross-Validation

In [None]:
# Example with logistic regression on variable 'marital'

start = time.time()
input_log.show(5, truncate = False)

# split into training and test
splits = input_log.randomSplit([0.95, 0.05])
train_df = splits[0]
test_df = splits[1]
print(train_df.count(),test_df.count())

lr = LogisticRegression(featuresCol='features', labelCol='label')

# Create ParamGrid for Cross Validation
lrparamGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.0001, 0.001, 0.01, 0.1])
             #  .addGrid(lr.regParam, [0.01, 0.1, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             #  .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

# Evaluator used by the cross validation (no metricName is given, so the default f1 metric is optimised)
lrevaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator = lr,
                    estimatorParamMaps = lrparamGrid,
                    evaluator = lrevaluator,
                    numFolds = 5)

# Run cross validations
lrcvModel = lrcv.fit(train_df)

+-----+---------------------------------+
|label|features                         |
+-----+---------------------------------+
|0.0  |[59.0,2343.0,1042.0,1.0,-1.0,0.0]|
|0.0  |[56.0,45.0,1467.0,1.0,-1.0,0.0]  |
|0.0  |[41.0,1270.0,1389.0,1.0,-1.0,0.0]|
|0.0  |[55.0,2476.0,579.0,1.0,-1.0,0.0] |
|0.0  |[54.0,184.0,673.0,2.0,-1.0,0.0]  |
+-----+---------------------------------+
only showing top 5 rows

10606 556


In [None]:
# Create evaluator for area under roc (not optimised by the cross validation)
# NOTE: 'marital' has three classes and the hard 'prediction' column is passed as rawPredictionCol,
# so the area under ROC reported here is only a rough indicator
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName='areaUnderROC') # for roc

# Make predictions using best model
out_tr = lrcvModel.bestModel.transform(train_df).select("label", "prediction")
out_te = lrcvModel.bestModel.transform(test_df).select("label", "prediction")

# Get metrics (request accuracy through lrevaluator's own metricName param)
acc_tr = lrevaluator.evaluate(out_tr, {lrevaluator.metricName: "accuracy"})
auc_tr = evaluator.evaluate(out_tr)
acc_te = lrevaluator.evaluate(out_te, {lrevaluator.metricName: "accuracy"})
auc_te = evaluator.evaluate(out_te)

print('Training, accuracy: {:.4f}, area under ROC: {:.4f}'.format(acc_tr,auc_tr))
print('Test, accuracy: {:.4f}, area under ROC: {:.4f}'.format(acc_te,auc_te))
print('Time elapsed:',(time.time()-start))

Training, accuracy: 0.6229, area under ROC: 0.6542
Test, accuracy: 0.6133, area under ROC: 0.6388
Time elapsed: 468.4383044242859
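The long running time is easier to understand by counting how many models the cross validation has to fit: one per combination of hyperparameters and per fold, plus a final refit of the best combination on the whole training set. The pure-Python sketch below rebuilds the grid used above and shows how k-fold splitting works. The helper `k_fold_indices` is hypothetical and assigns rows to folds in a fixed strided pattern for simplicity, whereas Spark assigns them randomly.

```python
from itertools import product

# same hyperparameter values as in the ParamGridBuilder cell above
reg_params = [0.0001, 0.001, 0.01, 0.1]
elastic_net_params = [0.0, 0.5, 1.0]
grid = [{"regParam": r, "elasticNetParam": e}
        for r, e in product(reg_params, elastic_net_params)]

def k_fold_indices(n_rows, n_folds):
    """Yield (training, validation) index lists, each fold used once for validation."""
    folds = [list(range(i, n_rows, n_folds)) for i in range(n_folds)]
    for i in range(n_folds):
        validation = folds[i]
        training = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield training, validation

n_folds = 5
n_fits = len(grid) * n_folds  # models fitted during the search, before the final refit
splits = list(k_fold_indices(10, n_folds))

print(len(grid), n_fits)
print(splits[0])
```

With 12 combinations and 5 folds, the search alone fits 60 logistic regressions, which is consistent with this being by far the slowest step in the notebook.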
