<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Objectives" data-toc-modified-id="Objectives-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Objectives</a></span></li><li><span><a href="#Set-Up-Spark-Context" data-toc-modified-id="Set-Up-Spark-Context-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Set Up Spark Context</a></span></li><li><span><a href="#Loading-and-Preprocessing-the-Example-Data" data-toc-modified-id="Loading-and-Preprocessing-the-Example-Data-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Loading and Preprocessing the Example Data</a></span><ul class="toc-item"><li><span><a href="#Process-the-Features" data-toc-modified-id="Process-the-Features-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>Process the Features</a></span></li></ul></li><li><span><a href="#Train-and-Predict-with-Random-Forest" data-toc-modified-id="Train-and-Predict-with-Random-Forest-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Train and Predict with Random Forest</a></span></li><li><span><a href="#Evaluate-the-Model" data-toc-modified-id="Evaluate-the-Model-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Evaluate the Model</a></span></li><li><span><a href="#Using-Pipeline-and-Performing-a-Grid-Search-for-Optimal-Parameters" data-toc-modified-id="Using-Pipeline-and-Performing-a-Grid-Search-for-Optimal-Parameters-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Using Pipeline and Performing a Grid Search for Optimal Parameters</a></span><ul class="toc-item"><li><span><a href="#Evaluate-with-Cross-Validation-to-Find-Optimal-Model" data-toc-modified-id="Evaluate-with-Cross-Validation-to-Find-Optimal-Model-6.1"><span class="toc-item-num">6.1&nbsp;&nbsp;</span>Evaluate with Cross Validation to Find Optimal Model</a></span></li></ul></li></ul></div>

<a href="https://colab.research.google.com/github/flatiron-school/ds-spark/blob/main/spark-ml.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Run for Google Colab environment
!pip install pyspark
!apt install openjdk-8-jdk-headless -qq
!pip install mlflow

Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m570.0 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.5
[1;31mE: [0mCould not open lock file /var/lib/dpkg/lock-frontend - open (13: Permission denied)[0m
[1;31mE: [0mUnable to acquire the dpkg frontend lock (/var/lib/dpkg/lock-frontend), are you root?[0m
Collecting mlflow
  Downloading mlflow-1.30.0-py3-none-any.whl (17.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.0/17.0 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting Flask<3
  Downloading Flask-2.2.2-py3-none-any.whl (101 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m101.5/101.5 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting gitpython<4,>=2.1.0
  Downloading GitPython-3.1.29-py3-none-any.wh

In [3]:
import pyspark
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import feature
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [4]:
# Get data directly from repo
!wget https://github.com/flatiron-school/ds-spark/releases/download/v1.0/US_births_2000-2014_SSA.csv

--2022-10-31 04:37:36--  https://github.com/flatiron-school/ds-spark/releases/download/v1.0/US_births_2000-2014_SSA.csv
Resolving github.com (github.com)... 192.30.255.112
Connecting to github.com (github.com)|192.30.255.112|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/379727666/12461180-d431-11eb-8163-e15e52afc9a9?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20221031%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20221031T043736Z&X-Amz-Expires=300&X-Amz-Signature=820171703b8074b5f30e06c1806248d522dc5e0d739c6d9d296f066730af8000&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=379727666&response-content-disposition=attachment%3B%20filename%3DUS_births_2000-2014_SSA.csv&response-content-type=application%2Foctet-stream [following]
--2022-10-31 04:37:36--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/379727666/12461180-d4

# Objectives

- Use `pyspark` to build machine learning models

# Set Up Spark Context

In [5]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Loading and Preprocessing the Example Data

This example assumes that we have a holdout validation dataset somewhere else, so we don't need to perform a train-test split, we only need to perform cross validation

In [6]:
# Load the file since we downloaded it earlie
df = spark.read.format('csv').option('header', 'true').\
load('US_births_2000-2014_SSA.csv')

In [7]:
df.toPandas().head(3)

Unnamed: 0,year,month,date_of_month,day_of_week,births
0,2000,1,1,6,9083
1,2000,1,2,7,8006
2,2000,1,3,1,11363


## Process the Features

In [8]:
df.dtypes

[('year', 'string'),
 ('month', 'string'),
 ('date_of_month', 'string'),
 ('day_of_week', 'string'),
 ('births', 'string')]

In [9]:
df = df.withColumn('births', df['births'].cast('int'))
df = df.withColumn('day_of_week', df['day_of_week'].cast('int'))
df = df.withColumn('date_of_month', df['date_of_month'].cast('int'))
df = df.withColumn('month', df['month'].cast('int'))
df = df.withColumn('year', df['year'].cast('int'))

In [10]:
ohe = feature.OneHotEncoder(inputCols=['date_of_month',
                                                'day_of_week'],
                                     outputCols=['date_vec',
                                                  'day_vec'],
                                     dropLast=True)
one_hot_encoded = ohe.fit(df).transform(df)
one_hot_encoded.head()

Row(year=2000, month=1, date_of_month=1, day_of_week=6, births=9083, date_vec=SparseVector(31, {1: 1.0}), day_vec=SparseVector(7, {6: 1.0}))

Note the 'SparseVector' we've created!

In [11]:
features = ['year', 'month', 'date_of_month', 'day_of_week']

target = 'births'

vector = VectorAssembler(inputCols=features, outputCol='features')
vectorized_df = vector.transform(one_hot_encoded)

The Vector Assembler is often what we want when we're building a model in Spark. [How does the VectorAssembler work?](https://spark.apache.org/docs/2.1.0/ml-features.html#vectorassembler)

In [12]:
vectorized_df.columns

['year',
 'month',
 'date_of_month',
 'day_of_week',
 'births',
 'date_vec',
 'day_vec',
 'features']

# Train and Predict with Random Forest

In [13]:
rf_model = RandomForestRegressor(featuresCol='features',
                                 labelCol='births',
                                 predictionCol="prediction").fit(vectorized_df)

In [14]:
predictions = rf_model.transform(vectorized_df).select("births", "prediction")
predictions.head(3)

[Row(births=9083, prediction=8503.872001591328),
 Row(births=8006, prediction=7886.311287850204),
 Row(births=11363, prediction=11577.272845643252)]

# Evaluate the Model

Let's evaluate our model! [Here](https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html) is a reference for the many metrics available in Spark.

In [15]:
from pyspark.ml.evaluation import RegressionEvaluator

In [16]:
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='births')

evaluator.evaluate(predictions, {evaluator.metricName:"r2"})

0.8842592605849672

In [17]:
evaluator.evaluate(predictions, {evaluator.metricName:"mae"})

494.42298907920986

# Using Pipeline and Performing a Grid Search for Optimal Parameters

In [18]:
one_hot_encoder = OneHotEncoder(inputCols=['date_of_month',
                                                'day_of_week'],
                                     outputCols=['date_vec',
                                                  'day_vec'],
                                     dropLast=True)
vector_assember = VectorAssembler(inputCols=features,
                                  outputCol='features')
random_forest = RandomForestRegressor(featuresCol='features',
                                      labelCol='births')
stages = [one_hot_encoder, vector_assember, random_forest]

pipeline = Pipeline(stages=stages)

Note: The stages in a pipeline can be either *Transformers* or *Estimators*. An estimator fits a DataFrame to produce a Transformer.

In [19]:
random_forest.params

[Param(parent='RandomForestRegressor_bbaa171aebcb', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'),
 Param(parent='RandomForestRegressor_bbaa171aebcb', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'),
 Param(parent='RandomForestRegressor_bbaa171aebcb', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'),
 Param(parent='RandomForestRegressor_bbaa171aebcb', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto'

In [20]:
params = ParamGridBuilder().addGrid(random_forest.numTrees, [20,50,100]).build()

In [21]:
reg_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='births',
                                    metricName = 'mae')

## Evaluate with Cross Validation to Find Optimal Model

In [22]:
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=params,
    evaluator=reg_evaluator,
    parallelism=4
)

In [23]:
cross_validated_model = cv.fit(df.cache())

In [24]:
cross_validated_model.avgMetrics

[468.2125503112479, 446.04567076367033, 434.6522353390137]

In [25]:
cross_validated_model.bestModel

PipelineModel_2f1d461a67f2

In [26]:
cross_validated_model.bestModel.stages

[OneHotEncoderModel: uid=OneHotEncoder_1f86f7530607, dropLast=true, handleInvalid=error, numInputCols=2, numOutputCols=2,
 VectorAssembler_40f43b614899,
 RandomForestRegressionModel: uid=RandomForestRegressor_bbaa171aebcb, numTrees=100, numFeatures=4]

In [27]:
cross_validated_model.bestModel.stages[2].getNumTrees

100

In [28]:
sc.stop()

In [29]:
spark.stop()