# PySpark AWS EMR Notebook

with Eric Wolos

The objective of this project is to demonstrate that an EMR cluster can efficiently solve machine learning problems like the one presented below: predicting whether a bank telemarketer succeeds in convincing a potential customer to open an account.

This dataset was found in the UCI Machine Learning Repository.

In this notebook, I will compare Random Forest and Gradient-Boosted Tree (GBT) classifiers.

In [1]:
# Show the driver host to confirm the notebook is running on EMR in AWS us-west-1
sc.getConf().get('spark.driver.host')

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 11 | application_1660092954641_0012 | pyspark | idle | Link | Link | ✔ |


SparkSession available as 'spark'.


'ip-172-31-30-82.us-west-1.compute.internal'

In [2]:
sc.install_pypi_package('sklearn')

Collecting sklearn
  Using cached sklearn-0.0-py2.py3-none-any.whl
Collecting scikit-learn
  Using cached scikit_learn-0.24.2-cp36-cp36m-manylinux2010_x86_64.whl (22.2 MB)
Collecting joblib>=0.11
  Using cached joblib-1.1.0-py2.py3-none-any.whl (306 kB)
Collecting scipy>=0.19.1
  Using cached scipy-1.5.4-cp36-cp36m-manylinux1_x86_64.whl (25.9 MB)
Collecting threadpoolctl>=2.0.0
  Using cached threadpoolctl-3.1.0-py3-none-any.whl (14 kB)
Installing collected packages: threadpoolctl, scipy, joblib, scikit-learn, sklearn
Successfully installed joblib-1.1.0 scikit-learn-0.24.2 scipy-1.5.4 sklearn-0.0 threadpoolctl-3.1.0

In [3]:
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, Normalizer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import numpy as np

## Load Data from AWS S3 Bucket

In [4]:
filePath = "s3://bucket90024/"

In [5]:
data = spark.read \
    .option('sep',';') \
    .option('inferSchema',"True") \
    .csv(f'{filePath}bank-additional-full.csv',header=True)

In [6]:
# Replace the dots in column names with underscores
data = data \
    .withColumnRenamed('emp.var.rate', 'emp_var_rate') \
    .withColumnRenamed('cons.price.idx', 'cons_price_idx') \
    .withColumnRenamed('cons.conf.idx', 'cons_conf_idx') \
    .withColumnRenamed('nr.employed', 'nr_employed')
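
The renames above replace dots with underscores, presumably because Spark reads a dot in a column name as struct-field access unless the name is wrapped in backticks. A minimal sketch of a more general approach, assuming every dotted column should be renamed the same way; `sanitize_columns` is a hypothetical helper for illustration, not part of PySpark:

```python
def sanitize_columns(columns):
    """Return the column names with every dot replaced by an underscore."""
    return [name.replace('.', '_') for name in columns]


# A subset of the dataset's original header names.
raw = ['age', 'emp.var.rate', 'cons.price.idx', 'cons.conf.idx',
       'euribor3m', 'nr.employed', 'y']
clean = sanitize_columns(raw)
print(clean)

# On the cluster, the cleaned names could be applied in one call:
# data = data.toDF(*sanitize_columns(data.columns))
```

Names without dots pass through unchanged, so the helper can be applied to the full column list.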

In [7]:
data.columns

['age', 'job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'day_of_week', 'duration', 'campaign', 'pdays', 'previous', 'poutcome', 'emp_var_rate', 'cons_price_idx', 'cons_conf_idx', 'euribor3m', 'nr_employed', 'y']

## Pipeline 

In [8]:
catColList=['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'day_of_week', 'poutcome', 'y']
newCatCols = list(map(lambda x: f'{x}_new', catColList))
featureCols = [col for col in data.columns if col not in catColList]
featureCols.extend(newCatCols)
featureCols.remove('y_new')

In [9]:
labelsIndexed = []
for col in catColList:
    labelIndexer = StringIndexer(inputCol=col, outputCol=f'{col}_new', handleInvalid = 'keep')
    labelsIndexed.append(labelIndexer)
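
To make the indexing step concrete, here is a rough pure-Python sketch of what a `StringIndexer` with `handleInvalid='keep'` does: labels are ordered by descending frequency, and any label not seen during fitting is mapped to one extra index equal to the number of known labels. The function names and the toy values are hypothetical, and the alphabetical tie-breaking is an assumption made to keep the sketch deterministic.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index, most frequent label first.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_keep(mapping, values):
    """Index the values; unseen labels share the extra index len(mapping)."""
    unseen = float(len(mapping))
    return [mapping.get(v, unseen) for v in values]


# Toy training column, not real rows from the dataset.
train = ['married', 'single', 'married', 'divorced', 'married', 'single']
mapping = fit_string_indexer(train)
indexed = transform_keep(mapping, ['single', 'married', 'unknown'])
print(mapping)
print(indexed)
```

The resulting indices are ordinal codes with no intrinsic order, which tree-based models such as the two compared here can split on directly.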

In [10]:
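# featureCols already holds the numeric columns plus the indexed categorical
# columns (without the label y_new); adding newCatCols again would duplicate
# every categorical feature in the assembled vector.
inputs = list(featureCols)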
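
A feature list is easy to get subtly wrong: a column listed twice is assembled twice, and a label column left in the list leaks the target into the features. The following is a small sanity check one could run before building the `VectorAssembler`; `check_inputs` and the toy lists are hypothetical and only illustrate the idea.

```python
from collections import Counter


def check_inputs(input_cols, label_col):
    """Return (duplicated column names, whether the label is in the list)."""
    counts = Counter(input_cols)
    duplicates = sorted(name for name, n in counts.items() if n > 1)
    return duplicates, label_col in counts


# Toy lists standing in for the real column names.
good = ['age', 'duration', 'job_new', 'marital_new']
bad = good + ['job_new', 'y_new']

print(check_inputs(good, 'y_new'))
print(check_inputs(bad, 'y_new'))
```

In the notebook, the equivalent call would be `check_inputs(inputs, 'y_new')`, which should report no duplicates and no label.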

In [11]:
numVector = VectorAssembler(inputCols=inputs, outputCol = 'features')

In [12]:
normalizer = Normalizer(inputCol='features', outputCol='features_norm', p=1.0)
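
With `p=1.0`, the `Normalizer` rescales each row (not each column) so that the absolute values of its entries sum to 1. A minimal pure-Python sketch of that operation follows; `l1_normalize` is a hypothetical name, and leaving an all-zero row unchanged is an assumption of the sketch.

```python
def l1_normalize(row):
    """Scale a row so the absolute values of its entries sum to 1."""
    norm = sum(abs(x) for x in row)
    if norm == 0:
        return list(row)  # assumption: an all-zero row is left unchanged
    return [x / norm for x in row]


row = [2.0, -1.0, 1.0]
scaled = l1_normalize(row)
print(scaled)
```

Because the scaling factor differs from row to row, the relative size of a single feature across rows can change, which is worth keeping in mind when interpreting tree splits on `features_norm`.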

In [13]:
(trainingData, testData) = data.randomSplit([0.8, 0.2])
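
`randomSplit` assigns rows by independent random draws, so the 80/20 proportions are approximate, and without a seed the split (and therefore the scores reported later) changes from run to run. The sketch below imitates that behaviour in plain Python under those assumptions; `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to exactly one split by an independent random draw."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point round-off
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if draw < upper:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 800 and 200, but not exactly
```

In PySpark, a seed can be passed as `data.randomSplit([0.8, 0.2], seed=42)`; the value 42 is arbitrary, and the split may still differ if the input is partitioned differently between runs.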

In [14]:
gbt = GBTClassifier(labelCol="y_new", featuresCol="features_norm")

In [15]:
rf = RandomForestClassifier(labelCol="y_new", featuresCol="features_norm")

In [16]:
pipelineGBT = Pipeline(stages=labelsIndexed + [numVector, normalizer, gbt])
pipelineRF = Pipeline(stages=labelsIndexed + [numVector, normalizer, rf])

## Model Generation and Evaluation

In [17]:
def fitAndTestModel(pipeline):
    """Fit the pipeline on trainingData and return the area under ROC on testData."""
    model = pipeline.fit(trainingData)
    prediction = model.transform(testData)
    evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='y_new')
    return evaluator.evaluate(prediction)

By default, the BinaryClassificationEvaluator returns the area under the ROC curve, a value in [0, 1] that summarizes the performance of a binary classifier: 0.5 corresponds to random guessing, and 1 means every positive example is ranked above every negative one.

For more about the ROC curve: https://developers.google.com/machine-learning/crash-course/classification/roc-and-auc
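
The area under the ROC curve can also be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it directly from that pairwise definition on made-up labels and scores; `area_under_roc` is a hypothetical helper and is not how Spark's evaluator is implemented.

```python
def area_under_roc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    A tie between a positive and a negative score counts as half a win.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError('need at least one positive and one negative label')
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up labels and scores for illustration only.
labels = [1, 1, 0, 1, 0, 0]
scores = [0.9, 0.8, 0.7, 0.4, 0.3, 0.1]
auc = area_under_roc(labels, scores)
print(auc)
```

Here 8 of the 9 positive/negative pairs are ordered correctly, so the result is 8/9. The double loop is quadratic in the number of rows, so it is only suitable for small illustrations.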

In [18]:
print(f'GBT auROC: { round(fitAndTestModel(pipelineGBT), 3)}')
print(f'RF auROC: { round(fitAndTestModel(pipelineRF), 3)}')

GBT auROC: 0.947
RF auROC: 0.937