<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Objectives" data-toc-modified-id="Objectives-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Objectives</a></span></li><li><span><a href="#Set-Up-Spark-Context" data-toc-modified-id="Set-Up-Spark-Context-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Set Up Spark Context</a></span></li><li><span><a href="#Loading-and-Preprocessing-the-Example-Data" data-toc-modified-id="Loading-and-Preprocessing-the-Example-Data-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Loading and Preprocessing the Example Data</a></span><ul class="toc-item"><li><span><a href="#Process-the-Features" data-toc-modified-id="Process-the-Features-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>Process the Features</a></span></li></ul></li><li><span><a href="#Train-and-Predict-with-Random-Forest" data-toc-modified-id="Train-and-Predict-with-Random-Forest-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Train and Predict with Random Forest</a></span></li><li><span><a href="#Evaluate-the-Model" data-toc-modified-id="Evaluate-the-Model-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Evaluate the Model</a></span></li><li><span><a href="#Using-Pipeline-and-Performing-a-Grid-Search-for-Optimal-Parameters" data-toc-modified-id="Using-Pipeline-and-Performing-a-Grid-Search-for-Optimal-Parameters-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Using Pipeline and Performing a Grid Search for Optimal Parameters</a></span><ul class="toc-item"><li><span><a href="#Evaluate-with-Cross-Validation-to-Find-Optimal-Model" data-toc-modified-id="Evaluate-with-Cross-Validation-to-Find-Optimal-Model-6.1"><span class="toc-item-num">6.1&nbsp;&nbsp;</span>Evaluate with Cross Validation to Find Optimal Model</a></span></li></ul></li></ul></div>

<a href="https://colab.research.google.com/github/flatiron-school/DS-Live-022122/blob/main/Phase4/62-spark-ml.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Run for Google Colab environment

!pip install mlflow

In [2]:
import pyspark
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import feature
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [None]:
# Get data directly from repo
!wget https://github.com/flatiron-school/ds-spark/releases/download/v1.0/US_births_2000-2014_SSA.csv

# Objectives

- Use `pyspark` to build machine learning models

# Set Up Spark Context

In [None]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Loading and Preprocessing the Example Data

This example assumes that a holdout validation dataset exists elsewhere, so we don't need to perform a train-test split here; we only need to perform cross-validation.

In [None]:
# Load the CSV file we downloaded earlier
df = (spark.read.format('csv')
      .option('header', 'true')
      .load('US_births_2000-2014_SSA.csv'))

In [None]:
# Limit in Spark first so only three rows are collected to the driver
df.limit(3).toPandas()

## Process the Features

In [None]:
# Fix Types

In [None]:
# OHE!

Note the 'SparseVector' we've created!
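
To make the sparse output less mysterious, here is a minimal plain-Python sketch of the idea behind one-hot encoding with `dropLast=True`. It is not Spark's implementation; the helper names `one_hot_sparse` and `to_dense` and the four-category example are assumptions made for illustration. A sparse vector stores only its size plus the positions and values of the non-zero entries, and with `dropLast=True` the last category is represented by an all-zero vector.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values): the three parts of a sparse vector."""
    if not 0 <= index < num_categories:
        raise ValueError("index must be in [0, num_categories)")
    # Dropping the last category shortens the vector by one slot.
    size = num_categories - 1 if drop_last else num_categories
    if index < size:
        return (size, [index], [1.0])
    # The dropped (last) category has no non-zero entries at all.
    return (size, [], [])


def to_dense(sparse):
    """Expand (size, indices, values) into a plain list of floats."""
    size, indices, values = sparse
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


# Hypothetical example: a column with 4 distinct category indices (0..3).
for category_index in range(4):
    sparse = one_hot_sparse(category_index, num_categories=4)
    print(category_index, sparse, to_dense(sparse))
```

Because each encoded row has at most one non-zero entry, the sparse form stays small even when a column has many categories.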

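The `VectorAssembler` is often what we want when building a model in Spark: it combines a list of input columns into the single vector column that Spark ML models expect as their features. [How does the VectorAssembler work?](https://spark.apache.org/docs/2.1.0/ml-features.html#vectorassembler)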
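
As a rough illustration of what assembling means, the plain-Python sketch below concatenates scalar columns and vector columns, in the order given, into one flat feature list. It is a conceptual stand-in rather than the Spark API: the `assemble` helper is hypothetical, the row values are made up, and `year` and `month` are assumed column names (only `day_vec` appears elsewhere in this notebook).

```python
def assemble(row, input_cols):
    """Concatenate the named columns of one row into a flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # Vector-valued columns contribute all of their entries.
            features.extend(float(v) for v in value)
        else:
            # Scalar columns contribute a single entry.
            features.append(float(value))
    return features


# Made-up row: two numeric columns plus a one-hot encoded vector column.
row = {"year": 2000, "month": 1, "day_vec": [0.0, 1.0, 0.0]}
assembled = assemble(row, ["year", "month", "day_vec"])
print(assembled)
```

The order of the input columns determines the order of the entries in the assembled vector, so the same column list should be reused whenever the model is applied to new data.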

In [None]:
# Create the vector

# Train and Predict with Random Forest

In [None]:
# Instantiate

In [None]:
# Predictions

# Evaluate the Model

Let's evaluate our model! [Here](https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html) is a reference for the many metrics available in Spark.
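
For a regression model, `RegressionEvaluator` takes a `metricName` such as `"rmse"`, `"mse"`, `"mae"`, or `"r2"`. The plain-Python sketch below shows how those four quantities are computed from labels and predictions, which can help when interpreting the number Spark returns. The `regression_metrics` helper and the sample values are assumptions for illustration only, not output from this notebook's model.

```python
import math


def regression_metrics(labels, predictions):
    """Compute common regression metrics from paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    ss_res = sum(e * e for e in errors)            # residual sum of squares
    mean_label = sum(labels) / n
    ss_tot = sum((y - mean_label) ** 2 for y in labels)  # total sum of squares
    mse = ss_res / n
    return {
        "mse": mse,
        "rmse": math.sqrt(mse),
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1 - ss_res / ss_tot,
    }


# Made-up values, chosen so the arithmetic is easy to check by hand.
labels = [10.0, 12.0, 14.0, 16.0]
predictions = [11.0, 11.0, 15.0, 15.0]
metrics = regression_metrics(labels, predictions)
print(metrics)
```

RMSE and MAE are in the same units as the label (here, births), while R² is unitless and is easier to compare across datasets.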

In [None]:
# Create it

In [None]:
# Evaluate it!

# Using Pipeline and Performing a Grid Search for Optimal Parameters

In [None]:
# Instantiate and create steps
one_hot_encoder = OneHotEncoder(inputCols=['date_of_month',
                                           'day_of_week'],
                                outputCols=['date_vec',
                                            'day_vec'],
                                dropLast=True)

# `features` is assumed to be the list of input column names
# defined in an earlier cell; it is not defined in this one
vector_assembler = VectorAssembler(inputCols=features,
                                   outputCol='features')

random_forest = RandomForestRegressor(featuresCol='features',
                                      labelCol='births')

In [None]:
# Create Pipeline stages

In [None]:
# Instantiate pipeline

Note: The stages in a pipeline can be either *Transformers* or *Estimators*. An Estimator is fit on a DataFrame to produce a Transformer (the fitted model), which can then transform DataFrames.
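
The Estimator/Transformer contract can be sketched with a toy example in plain Python. None of the classes below exist in Spark; `MeanCenterer`, `MeanCentererModel`, and `fit_pipeline` are hypothetical names, and lists of dicts stand in for DataFrames. The point is only the shape of the interaction: fitting an Estimator returns a Transformer, and a pipeline fits its stages in order, feeding each stage's transformed output to the next.

```python
class MeanCenterer:
    """Toy Estimator: learns a column mean when fit."""

    def __init__(self, column):
        self.column = column

    def fit(self, rows):
        mean = sum(row[self.column] for row in rows) / len(rows)
        return MeanCentererModel(self.column, mean)


class MeanCentererModel:
    """Toy Transformer produced by MeanCenterer.fit."""

    def __init__(self, column, mean):
        self.column = column
        self.mean = mean

    def transform(self, rows):
        new_col = self.column + "_centered"
        return [{**row, new_col: row[self.column] - self.mean} for row in rows]


def fit_pipeline(stages, rows):
    """Fit stages in order, passing each stage's output to the next."""
    fitted = []
    for stage in stages:
        # Estimators are fit first; plain Transformers are used as-is.
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        rows = model.transform(rows)
        fitted.append(model)
    return fitted, rows


# Made-up data standing in for a DataFrame with a `births` column.
rows = [{"births": 10.0}, {"births": 14.0}]
fitted_stages, transformed = fit_pipeline([MeanCenterer("births")], rows)
print(transformed)
```

In the pipeline built in this notebook, the random forest is the Estimator whose fitted model produces the predictions.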

In [None]:
# Get possible params

In [None]:
# Build parameter grid

In [None]:
# Build Evaluator

## Evaluate with Cross Validation to Find Optimal Model

In [None]:
# Cross-validate!
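
To get a feel for how much work a grid search with cross-validation does, the plain-Python sketch below expands a parameter grid and builds k-fold splits. It is a conceptual illustration, not Spark's `ParamGridBuilder` or `CrossValidator`: the helper names are hypothetical, the candidate values for `numTrees` and `maxDepth` are made up, and folds are assigned round-robin here for determinism rather than at random.

```python
from itertools import product


def build_param_grid(options):
    """Expand {name: [values]} into one dict per parameter combination."""
    names = list(options)
    value_lists = [options[name] for name in names]
    return [dict(zip(names, combo)) for combo in product(*value_lists)]


def k_fold_indices(n_rows, num_folds):
    """Return (train_indices, validation_indices) pairs, one per fold."""
    folds = [list(range(start, n_rows, num_folds)) for start in range(num_folds)]
    splits = []
    for held_out in folds:
        held = set(held_out)
        train = [i for i in range(n_rows) if i not in held]
        splits.append((train, held_out))
    return splits


# Hypothetical candidate values for two random forest parameters.
grid = build_param_grid({"numTrees": [20, 50], "maxDepth": [5, 10, 15]})
splits = k_fold_indices(n_rows=10, num_folds=3)

# Every parameter combination is trained once per fold.
total_fits = len(grid) * len(splits)
print(len(grid), len(splits), total_fits)
```

The number of model fits grows multiplicatively with each added parameter and each added fold, so it is worth keeping the grid small while experimenting.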