<a href="https://colab.research.google.com/github/cagBRT/PySpark/blob/master/PySpark_ML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

https://towardsdatascience.com/machine-learning-on-a-large-scale-2eef3bb749ee

# Machine Learning with PySpark

In this notebook you use PySpark and create, train, test an ML model. <br>
The dataset used for this example is not large enough to require PySpark, but is used to demonstrate the techniques. 

In [None]:
!pip install pyspark

The model used for this notebook comes from the PySpark library. <br><br>
PySpark is designed to handle large datasets that are not feasible to work with on a single machine using pandas. If you have a dataset that is too large to fit in memory, or if you need to perform iterative or distributed computations, PySpark is the better choice.<br><br>



In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from pyspark.ml import Pipeline, Estimator, Transformer, Model, PipelineModel
import pyspark.ml.feature as MFT
import pyspark.ml.functions as MF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

Start the Spark session

In [None]:
# get a spark session, UI at http://localhost:4040/executors/
spark = (
    SparkSession.builder
        .appName('learn')
        .config('spark.driver.memory', '8g')
        .master('local[4]',)
        .config('spark.sql.execution.arrow.pyspark.enabled', True)
        .config('spark.sql.execution.arrow.pyspark.fallback.enabled', False)
        .getOrCreate()
)

In [None]:
import seaborn as sns

Load the Iris dataset form the Seaborn library

In [None]:
df = sns.load_dataset('iris')

This dataset is small: 150 rows and four features

In [None]:
df.describe()

In [None]:
sns.pairplot(df, hue='species')

Create a binary class datset: virginica and not virginica<br>
There are:<br>
>100 not virginica data points<br>
50 virginica data points


In [None]:
df_binary = df.assign(species=df['species'].where(df['species']=='virginica', 'not virginica'))

df_binary['species'].value_counts()

In [None]:
df_binary = spark.createDataFrame(df_binary)
df_binary.printSchema()

In [None]:
df_binary.head(15)

Create a features column of the 4 features 

In [None]:
# pack the features into a vector (this is a transformer)
feature_assembler = MFT.VectorAssembler(inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'], outputCol='features')

tmp = feature_assembler.transform(df_binary)
tmp.show(n=3, truncate=False)

**The pprint module** in Python is a utility module that you can use to print data structures in a readable, pretty way. It's a part of the standard library that's especially useful for debugging code dealing with API requests, large JSON files, and data in general.

In [None]:
from pprint import pprint
pprint(tmp.schema['features'].metadata)
# {'ml_attr': {'attrs': {'numeric': [{'idx': 0, 'name': 'sepal_length'},
#                                    {'idx': 1, 'name': 'sepal_width'},
#                                    {'idx': 2, 'name': 'petal_length'},
#                                    {'idx': 3, 'name': 'petal_width'}]},
#              'num_attrs': 4}}

pprint([col['name'] for col in tmp.schema['features'].metadata['ml_attr']['attrs']['numeric']])
# ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

**Normalize the data using the MinMaxScaler function**<br>
Rescale each feature individually to a common range [min, max] linearly using column summary statistics, which is also known as min-max normalization or Rescaling

In [None]:
# pack the features into a vector (this is a transformer)
from sklearn.preprocessing import MinMaxScaler
#feature_assembler = MFT.VectorAssembler(inputCols=['sepal_length', 'sepal_width', 
#                                                   'petal_length', 'petal_width'], outputCol='features')

# create a min max scaler (this is an estimator)
minMax_scaler = MFT.MinMaxScaler(min=0., max=1., inputCol='features', outputCol='features_scaled')
minmax_scaler_model=minMax_scaler.fit(tmp)

tmp = minmax_scaler_model.transform(tmp)
tmp.show(n=3, truncate=False)

Create a pipeline to automate assembling the features and the data normalization.

In [None]:
# initiate the pipeline
pipeline = Pipeline(stages=[feature_assembler, minMax_scaler])

# fit the pipeline
pipeline_model = pipeline.fit(df_binary)

tmp = pipeline_model.transform(df_binary)
tmp.show(n=3, truncate=False)


StringIndexer encodes a string column of labels to a column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels).

By default, this is ordered by label frequencies so the most frequent label gets index 0.

In [None]:
# string indexer (this is a transformer)
fromlabelsModel = MFT.StringIndexerModel.from_labels(["not virginica", "virginica"],
                                                     inputCol="species", outputCol="species_indexed",
                                                     handleInvalid="error")
# add a stage to the pipeline
pipeline = pipeline.setStages(pipeline.getStages() + [fromlabelsModel])

pipeline.getStages()
# [VectorAssembler_d5758e224502, MinMaxScaler_f73f5af81295, StringIndexerModel: uid=strIdx_f67ba1442b96, handleInvalid=error]   

In [None]:
print(fromlabelsModel.labels)
print(fromlabelsModel.getOutputCol())

In [None]:
train, test = df_binary.randomSplit([0.5, 0.5], seed=11)
train.cache()

In [None]:
for npar in [10, 20, 30]:
    df_binary = df_binary.repartition(npar)
    train, test = df_binary.randomSplit([0.5, 0.5], seed=14)
    print(train.count(), test.count())

In [None]:
# first attempt to build a model (whole code given)
import seaborn as sns
df = sns.load_dataset('iris')

# binary classification
df_binary = df.assign(species=df['species'].where(df['species']=='virginica', 'not virginica'))
df_binary = spark.createDataFrame(df_binary)

# pack the features into a vector (this is a transformer)
# we do not use all features to allow some room for model tuning
# feature_assembler = MFT.VectorAssembler(inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'], outputCol='features')
feature_assembler = MFT.VectorAssembler(inputCols=['sepal_width', 'petal_width'], outputCol='features')

# create a min max scaler (this is an estimator)
minMax_scaler = MFT.MinMaxScaler(min=0., max=1., inputCol='features', outputCol='features_scaled')

# string indexer (this is a transformer)
labels = ["not virginica", "virginica"]
fromlabelsModel = MFT.StringIndexerModel.from_labels(labels,
                                                     inputCol="species", outputCol="species_indexed",
                                                     handleInvalid="error")

# create a binomial logistic regression model (this is an estimator)
lr = LogisticRegression(featuresCol='features_scaled', labelCol='species_indexed', predictionCol='prediction')

# build the pipeline
pipeline = Pipeline(stages=[feature_assembler, minMax_scaler, fromlabelsModel, lr])

# cache the dataset to ensure deterministic behaviour and ensure fast model fitting
df_binary.cache()
train, test = df_binary.randomSplit([0.2, 0.8], seed=8)

# fit the model
pipeline_model = pipeline.fit(train)

# apply the model to the test set
results = pipeline_model.transform(test)
results = MFT.IndexToString(inputCol='prediction', outputCol='species_predicted', labels=labels).transform(results)

In [None]:
# compute the confusion matrix manually (for the training set)
confusion_matrix = (results.groupby('species').pivot('species_predicted').count().toPandas()
                    .set_index('species').sort_index(axis='index', ascending=False).sort_index(axis='columns', ascending=False)
                    .fillna(0).astype(int)
                    )

In [None]:
# plot the confusion matrix
ax = sns.heatmap(confusion_matrix, cmap='Blues', annot=True, cbar=False)
ax.set_xlabel('Prediction')
ax.set_ylabel('True value')

In [None]:
# evaluate the model (test set)
lr_model = pipeline_model.stages[-1]
metrics = lr_model.evaluate(results.select('features_scaled', 'species_indexed'))

print(f'Recall, sensitivity or true positive rate {metrics.recallByLabel[1]:.4f}')
print(f'Precision or positive predictive value {metrics.precisionByLabel[1]:.4f}')
print(f'Specificity or true negative rate {1-metrics.falsePositiveRateByLabel[1]:.4f}')
print(f'False positive rate or (1 - specificity) {metrics.falsePositiveRateByLabel[1]:.4f}')


In [None]:
# compute the ROC curve manually
import pandas as pd
import numpy as np

from tqdm import tqdm
results = results.withColumn('probability', MF.vector_to_array(F.col('probability'))).withColumn('prob 0', F.col('probability')[0]).withColumn('prob 1', F.col('probability')[1])
raw = results.select(MF.vector_to_array(F.col('rawPrediction')).alias('raw prediction')).select(F.col('raw prediction')[0].alias('raw prediction 0'), F.col('raw prediction')[1].alias('raw prediction 1'))
roc_manually = pd.DataFrame({'threshold':[0., 1.], 'FPR': [1., 0.], 'TPR': [1., 0.]})
for threshold in tqdm(np.concatenate((np.arange(0.001, 0.02, 0.001),
                                      np.arange(0.02, 1.0, 0.02),
                                      np.arange(0.981, 1.00, 0.001)
                                      ))):
    TP = results.where((F.col('prob 1')>=threshold) & (F.col('species_indexed')==1.)).count()
    TN = results.where((F.col('prob 1')<threshold) & (F.col('species_indexed')==0.)).count()
    FP = results.where((F.col('prob 1')>=threshold) & (F.col('species_indexed')==0.)).count()
    FN = results.where((F.col('prob 1')<threshold) & (F.col('species_indexed')==1.)).count()
    TPR = TP/(TP+FN)
    FPR = FP/(TN+FP)
    print(TP, TN, FP, FN)
    roc_manually = pd.concat([roc_manually, pd.DataFrame([[threshold, FPR, TPR]], columns=roc_manually.columns)], axis='index', ignore_index=True)

# compute the ROC curve using the PySpark API
roc_pyspark = metrics.roc.toPandas()

# visualise the ROC curve
ax = sns.scatterplot(data=roc_manually.sort_values(by='FPR'), x='FPR', y='TPR', marker='o', ec='b', facecolor='none', label='ROC (manually)')
sns.scatterplot(data=roc_pyspark, x='FPR', y='TPR', color='r', marker='+', ax=ax, label='ROC (PySpark)')
ax.set_xlabel('False positive rate (1 - specificity)')
ax.set_xlabel('True positive rate (sensitivity)')

In [None]:
# evaluate the model performance
evaluator = BinaryClassificationEvaluator(labelCol='species_indexed', rawPredictionCol='rawPrediction', metricName='areaUnderROC')
areaUnderROC = evaluator.evaluate(results.select('species_indexed', 'rawPrediction'))

print(f'The area under the ROC curve for the test set is {areaUnderROC:.4f}')

**Pyspark cache()** method is used to cache the intermediate results of the transformation so that other transformation runs on top of cached will perform faster. Caching the result of the transformation is one of the optimization tricks to improve the performance of the long-running PySpark applications/jobs.



Caching a DataFrame that can be reused for multi-operations will significantly improve any PySpark job. Below are the benefits of cache().<br>

Cost-efficient – Spark computations are very expensive hence reusing the computations are used to save cost.<br>
Time-efficient – Reusing repeated computations saves lots of time.
Execution time – Saves execution time of the job and we can perform more jobs on the same cluster.


In [None]:
# complete code, incuding cross-validation
import seaborn as sns
df = sns.load_dataset('iris')

# binary classification
df_binary = df.assign(species=df['species'].where(df['species']=='virginica', 'not virginica'))
df_binary = spark.createDataFrame(df_binary)

# cache the dataset to ensure deterministic behaviour and ensure fast model fitting
df_binary.cache()
train, test = df_binary.randomSplit([0.6, 0.4], seed=8)

# pack the features into a vector (this is a transformer)
feature_assembler = MFT.VectorAssembler(inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'], outputCol='features')

# create a min max scaler (this is an estimator)
minMax_scaler = MFT.MinMaxScaler(min=0., max=1., inputCol='features', outputCol='features_scaled')

# string indexer (this is a transformer)
labels = ["not virginica", "virginica"]
fromlabelsModel = MFT.StringIndexerModel.from_labels(labels,
                                                     inputCol="species", outputCol="species_indexed",
                                                     handleInvalid="error")

# create a binomial logistic regression model (this is an estimator)
lr = LogisticRegression(featuresCol='features_scaled', labelCol='species_indexed', predictionCol='prediction')

# build the pipeline
pipeline = Pipeline(stages=[feature_assembler, minMax_scaler, fromlabelsModel, lr])

# build the parameter map (grid)
from itertools import combinations, chain
from pprint import pprint
from time import time
cols = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
cols_combinations = list(chain(combinations(cols, 2), combinations(cols, 3), combinations(cols, 4)))
pprint(cols_combinations)
# [('sepal_length', 'sepal_width'),
#  ('sepal_length', 'petal_length'),
#  ('sepal_length', 'petal_width'),
#  ('sepal_width', 'petal_length'),
#  ('sepal_width', 'petal_width'),
#  ('petal_length', 'petal_width'),
#  ('sepal_length', 'sepal_width', 'petal_length'),
#  ('sepal_length', 'sepal_width', 'petal_width'),
#  ('sepal_length', 'petal_length', 'petal_width'),
#  ('sepal_width', 'petal_length', 'petal_width'),
#  ('sepal_length', 'sepal_width', 'petal_length', 'petal_width')]
param_grid = (ParamGridBuilder()
              .addGrid(lr.elasticNetParam, [0., 0.5, 1.])
              .addGrid(lr.regParam, [0., 0.1, 1., 2., 5.])
              .addGrid(feature_assembler.inputCols, cols_combinations)
              .build())

# evaluate the model performance
evaluator = BinaryClassificationEvaluator(labelCol='species_indexed', rawPredictionCol='rawPrediction', metricName='areaUnderROC')

# setup the cross validation estimator
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=4,
                    seed=8,
                    collectSubModels=True)

# fit the model using cross-validation and grid search
start_time = time()
cv_model = cv.fit(train)

print(f'Hyper-parameter tuning using 4-fold validation took {time()-start_time: 0.2f} sec')
# Hyper-parameter tuning using 4-fold validation took  348.70 sec

In [None]:
# obtain the best model and the optimised hyper-parameters
best_model = cv_model.bestModel
print(f'features maintained in the best model {best_model.stages[0].getInputCols()}')
print(f'elastic net parameter in the best model {best_model.stages[-1].getElasticNetParam()}')
print(f'regularization parameter in the best model {best_model.stages[-1].getRegParam()}')
# features maintained in the best model ['sepal_width', 'petal_width']
# elastic net parameter in the best model 0.0
# regularization parameter in the best model 0.0

# apply the model to the training set
results = best_model.transform(train)
results = MFT.IndexToString(inputCol='prediction', outputCol='species_predicted', labels=labels).transform(results)

# compute the confusion matrix manually (for the training set)
confusion_matrix = (results.groupby('species').pivot('species_predicted').count().toPandas()
                    .set_index('species').sort_index(axis='index', ascending=False).sort_index(axis='columns', ascending=False)
                    .fillna(0).astype(int)
                    )
print(confusion_matrix)


In [None]:
# retrieve the coefficients
results = best_model.transform(train)
results = MFT.IndexToString(inputCol='prediction', outputCol='species_predicted', labels=labels).transform(results)
coef_names = ['intercept'] + [x['name'] for x in results.schema['features'].metadata['ml_attr']['attrs']['numeric']]
coef_values = [best_model.stages[-1].intercept] + list(best_model.stages[-1].coefficients)
coefs = pd.Series(coef_values, index=coef_names)

print(coefs)

In [None]:
# draw the decision boundary and show the training set as a scatter plot
results = results.select('features_scaled', 'species', 'species_predicted')
results = results.withColumn('sepal_width_scaled', MF.vector_to_array(F.col('features_scaled'))[0])
results = results.withColumn('petal_width_scaled', MF.vector_to_array(F.col('features_scaled'))[1])
results_pd = results.select('sepal_width_scaled', 'petal_width_scaled', 'species', 'species_predicted').toPandas()

ax = sns.scatterplot(data=results_pd, x='sepal_width_scaled', y='petal_width_scaled', hue='species')
xs = np.linspace(results_pd['sepal_width_scaled'].min(), results_pd['sepal_width_scaled'].max(), 3)
ys = (-coefs['intercept'] - coefs['sepal_width']*xs)/coefs['petal_width']
sns.lineplot(x=xs, y=ys, ax=ax)
ax.lines[0].set_linestyle("--")
ax.lines[0].set_color("k")
ax.set_xlabel('sepal width (scaled)')
ax.set_ylabel('petal width (scaled)')

In [None]:
# multinomial logistic regression (softmax)
import seaborn as sns
df = sns.load_dataset('iris')

# binary classification
df_multinomial = spark.createDataFrame(df)

# pack the features into a vector (this is a transformer)
feature_assembler = MFT.VectorAssembler(inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'], outputCol='features')

# create a min max scaler (this is an estimator)
minMax_scaler = MFT.MinMaxScaler(min=0., max=1., inputCol='features', outputCol='features_scaled')

# string indexer (this is a transformer)
labels = ['setosa', 'versicolor', 'virginica']
fromlabelsModel = MFT.StringIndexerModel.from_labels(labels,
                                                     inputCol="species", outputCol="species_indexed",
                                                     handleInvalid="error")

# create a binomial logistic regression model (this is an estimator)
lr = LogisticRegression(featuresCol='features_scaled', labelCol='species_indexed', predictionCol='prediction')

# build the pipeline
pipeline = Pipeline(stages=[feature_assembler, minMax_scaler, fromlabelsModel, lr])

# cache the dataset to ensure deterministic behaviour and ensure fast model fitting
df_multinomial.cache()
train, test = df_multinomial.randomSplit([0.8, 0.2], seed=8)

# fit the model
pipeline_model = pipeline.fit(train)

# apply the model to the test set
results = pipeline_model.transform(test)
results = MFT.IndexToString(inputCol='prediction', outputCol='species_predicted', labels=labels).transform(results)

# compute the confusion matrix manually (for the test set)
confusion_matrix = (results.groupby('species').pivot('species_predicted').count().toPandas()
                    .set_index('species').sort_index(axis='index', ascending=False).sort_index(axis='columns', ascending=False)
                    .fillna(0).astype(int)
                    )
print(confusion_matrix)