# Deploying Python ML in PySpark

----

This notebook introduces a function built on PySpark's `pandas_udf` that can be used to **deploy both Python ML models and sophisticated pipelines**. With this in mind, please excuse the barbaric use of `RandomForestRegressor`.

In this notebook we deploy sklearn's `RandomForestRegressor` in PySpark. The [Titanic](https://raw.githubusercontent.com/amueller/scipy-2017-sklearn/091d371/notebooks/datasets/titanic3.csv) dataset is used simply for convenience. In the examples below a number of features are used to estimate the ticket "fare"; we fit our models/pipelines in pandas and deploy them in PySpark. **Please be aware** that this notebook uses the `pyspark` package, so anyone looking to run the cells below will need to install PySpark (together with PyArrow, which `pandas_udf` requires).

In practice the model used below can be replaced by any other predictive Python model, be that a `RandomForestClassifier`, `XGBoost`, `LightGBM` or any other package you care to use with an sklearn-like API.
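Because the `spark_predict` function used in this notebook declares `returnType=DoubleType()`, the practical requirement on a model is that `predict` accepts a pandas DataFrame and returns one numeric value per row. The sketch below checks that requirement locally before any Spark code runs; `MedianRegressor` and `is_double_udf_compatible` are hypothetical names chosen for illustration, and the data are toy values.

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier


class MedianRegressor:
    """Hypothetical non-sklearn model: predicts the training median for every row."""

    def fit(self, X: pd.DataFrame, y: pd.Series) -> "MedianRegressor":
        self.median_ = float(np.median(y))
        return self

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        return np.full(len(X), self.median_)


def is_double_udf_compatible(model, X: pd.DataFrame) -> bool:
    """Hypothetical helper: True if model.predict returns one numeric value per row,
    which is what a pandas_udf declared with DoubleType() expects."""
    preds = np.asarray(model.predict(X))
    return preds.shape == (len(X),) and np.issubdtype(preds.dtype, np.number)


# Toy data using two of the notebook's feature names
X = pd.DataFrame({"sibsp": [0.0, 1.0, 0.0, 2.0], "age": [22.0, 38.0, 26.0, 35.0]})
y_reg = pd.Series([7.25, 71.28, 7.93, 53.10])
y_str = pd.Series(["low", "high", "low", "high"])

median_model = MedianRegressor().fit(X, y_reg)
label_model = RandomForestClassifier(n_estimators=5, random_state=0).fit(X, y_str)

print(is_double_udf_compatible(median_model, X))  # True
print(is_double_udf_compatible(label_model, X))   # False: predict returns string labels
```

Any object with a suitable `predict` passes, whether or not it comes from sklearn. A classifier trained on string labels fails the check, so it would need numeric labels (or a different `returnType`) before being deployed this way.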

In [None]:
import shutil

import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, OrdinalEncoder
from sklearn.compose import ColumnTransformer
import pyspark.sql
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
from pyspark.sql.types import DoubleType
import pyarrow  # not used directly, but pandas_udf requires PyArrow; importing it fails fast if it is missing

In [None]:
# Defining data path, target and features
TITANIC_URL = "https://raw.githubusercontent.com/amueller/scipy-2017-sklearn/091d371/notebooks/datasets/titanic3.csv"
TARGET = "fare"
NUMERICAL_FEATURES = [
    "sibsp",
    "parch",
    "age"
]
CATEGORICAL_FEATURES = [
    "sex",
    "cabin"
]
ALL_FEATURES = NUMERICAL_FEATURES + CATEGORICAL_FEATURES

In [None]:
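# Remove artefacts left by previous Spark sessions, then create a local SparkSession
import os

for path in ["metastore_db", "derby.log", ".cache"]:
    if os.path.isdir(path):
        shutil.rmtree(path)
    elif os.path.isfile(path):
        os.remove(path)  # shutil.rmtree cannot delete a plain file such as derby.log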

spark = (SparkSession.builder
         .master("local[2]")
         .appName("sklearn-deploy")
         .config("spark.ui.enabled", "false")
         .getOrCreate()
         )

In [None]:
# Read the CSV, keep the relevant columns and drop any rows containing NaNs
df = (
    pd.read_csv(TITANIC_URL)[NUMERICAL_FEATURES + CATEGORICAL_FEATURES + [TARGET]]
    .dropna()
)

# Cast the numerical features to float, which Spark will infer as DoubleType
for num_feat in NUMERICAL_FEATURES:
    df[num_feat] = df[num_feat].astype(float)

In [None]:
# We have three numerical features: sibsp, parch and age,
# and two categorical features: cabin and sex.
# We will use these features to predict fare
df.head()

In [None]:
# In order to deploy our python model in PySpark we need a PySpark DataFrame
ddf = spark.createDataFrame(df)
ddf.show(5)

# (i) Deploying a simple Random Forest
----

To keep things simple we will start by using our predefined `NUMERICAL_FEATURES` to predict "fare". The cell below fits the model to all of the data; in practice this is not advisable. Our goal is simply to create an object capable of making predictions, so in this context the quality of those predictions is of no interest.

`spark_predict` is used to deploy our model in PySpark. The function wraps a `pandas_udf`; the wrapper is needed so that a Python ML model can be passed into the `pandas_udf`, which captures it in a closure. The function is based on the excellent blog post ["Prediction at Scale with scikit-learn and PySpark Pandas UDFs"](https://medium.com/civis-analytics/prediction-at-scale-with-scikit-learn-and-pyspark-pandas-udfs-51d5ebfb2cd8) written by **Michael Heilman**.
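The closure is what makes the wrapper work: `model` is captured by the inner function, PySpark serializes that function (model included) and sends it to the executors, and there it is called once per Arrow batch (at most `spark.sql.execution.arrow.maxRecordsPerBatch` rows, 10,000 by default). The sketch below imitates that batch-wise execution in plain pandas, without Spark. `make_batch_predictor` is a hypothetical helper, the data are synthetic, and the column names are reassigned because the incoming Series are not guaranteed to carry the Spark column names.

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor


def make_batch_predictor(model, col_names):
    """Hypothetical helper mirroring the body of a prediction pandas_udf.

    `model` and `col_names` are captured by the closure, just as the model is
    captured when the inner function is wrapped with pandas_udf.
    """
    def predict_batch(*series: pd.Series) -> pd.Series:
        # One pd.Series per input column, all covering the same batch of rows
        X = pd.concat(series, axis=1)
        X.columns = col_names  # restore the feature names the model was fitted on
        return pd.Series(model.predict(X))

    return predict_batch


# Synthetic data standing in for the Titanic features
rng = np.random.default_rng(0)
df = pd.DataFrame(rng.random((100, 3)), columns=["sibsp", "parch", "age"])
y = rng.random(100)
model = RandomForestRegressor(n_estimators=10, random_state=0).fit(df, y)

predict_batch = make_batch_predictor(model, list(df.columns))

# Simulate Spark splitting the rows into batches of (at most) 32 rows
batches = [df.iloc[i:i + 32] for i in range(0, len(df), 32)]
batch_preds = pd.concat(
    [predict_batch(*(b[c].reset_index(drop=True) for c in df.columns)) for b in batches],
    ignore_index=True,
)

# Batch-wise predictions equal a single predict call on the full DataFrame
print(np.allclose(batch_preds.to_numpy(), model.predict(df)))  # True
```

Because a random forest predicts each row independently, how the rows are split into batches does not change the result.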

In [None]:
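def spark_predict(model, cols) -> pyspark.sql.Column:
    """Deploy a Python ML model in PySpark using the `predict` method of the `model` parameter.

    Args:
        model: Python ML model with an sklearn-like API.
        cols (list-like): Names of the features used for predictions, required to be present as
            columns in the Spark DataFrame used to make predictions.

    Returns:
        pyspark.sql.Column: column of predictions with type DoubleType.
    """
    col_names = list(cols)

    # `model` and `col_names` are captured in this closure, so Spark serializes them
    # together with the UDF and ships them to the executors
    @sf.pandas_udf(returnType=DoubleType())
    def predict_pandas_udf(*series):
        # Each argument is a pd.Series holding one batch of one input column
        X = pd.concat(series, axis=1)
        X.columns = col_names  # use the feature names the model was fitted with
        return pd.Series(model.predict(X))

    return predict_pandas_udf(*col_names)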

In [None]:
rf = RandomForestRegressor()
rf = rf.fit(df[NUMERICAL_FEATURES], df[TARGET])
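Fitting on every row is fine for this demonstration, but a model you intend to deploy would normally be evaluated on held-out data first. A minimal sketch with `train_test_split`, using synthetic data in place of the Titanic columns (the data-generating formula is an arbitrary assumption):

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split

# Synthetic stand-in for df[NUMERICAL_FEATURES] and df[TARGET]
rng = np.random.default_rng(42)
X = pd.DataFrame(rng.random((200, 3)), columns=["sibsp", "parch", "age"])
y = pd.Series(10 * X["age"] + rng.normal(0, 0.5, 200), name="fare")

# Hold out 20% of the rows so the model is evaluated on data it has not seen
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)
rf_holdout = RandomForestRegressor(n_estimators=50, random_state=0).fit(X_train, y_train)

test_mae = mean_absolute_error(y_test, rf_holdout.predict(X_test))
print(f"hold-out MAE: {test_mae:.3f}")
```

The fitted `rf_holdout` could then be passed to `spark_predict` exactly like `rf`.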

In [None]:
# Let's make some predictions for comparison against our PySpark predictions.
rf.predict(df[NUMERICAL_FEATURES])[:5]

In [None]:
# Here we deploy our model in PySpark using our previously defined `spark_predict`.
# The DataFrame printed below shows that the predictions made in PySpark are the same as those made in Python
(
    ddf
    .select(NUMERICAL_FEATURES + [TARGET])
    .withColumn("prediction", spark_predict(rf, NUMERICAL_FEATURES))
    .show(5)
)
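Rather than comparing the two printed outputs by eye, one could collect the Spark predictions with `toPandas()` and compare them numerically. Spark does not in general guarantee row order, so the sketch below joins on a key column. `predictions_match` and the `row_id` column are hypothetical, and the Spark output is simulated here by a shuffled pandas DataFrame with toy values.

```python
import numpy as np
import pandas as pd


def predictions_match(local: pd.DataFrame, spark_pdf: pd.DataFrame, key: str,
                      col: str = "prediction", atol: float = 1e-9) -> bool:
    """Hypothetical helper: True if both frames hold the same prediction for every key."""
    merged = local[[key, col]].merge(
        spark_pdf[[key, col]], on=key, how="inner", suffixes=("_local", "_spark")
    )
    # Every local row must have a Spark counterpart, and the predictions must agree
    return len(merged) == len(local) and bool(
        np.allclose(merged[f"{col}_local"], merged[f"{col}_spark"], atol=atol)
    )


# Local predictions, e.g. built from rf.predict(df[NUMERICAL_FEATURES])
local = pd.DataFrame({"row_id": [0, 1, 2, 3], "prediction": [7.9, 53.1, 8.05, 26.0]})
# Stand-in for the collected Spark result: same values, different row order
spark_like = local.sample(frac=1.0, random_state=0).reset_index(drop=True)

print(predictions_match(local, spark_like, key="row_id"))  # True
```

In the notebook, a `row_id` column could be added to `df` (for example from its index) before calling `spark.createDataFrame`, so that it travels through Spark with the features.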

# (ii) Deploying a Pipeline with Feature Scaling
----

It is common practice to scale numerical features, so in the example below we make things a little more interesting by scaling our `NUMERICAL_FEATURES` before fitting our model and making predictions. Feature scaling is performed using sklearn's `Pipeline`.
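Inside a pipeline, `MinMaxScaler` learns each feature's minimum and maximum during `fit` and reuses them in `transform`, computing `x' = (x - min) / (max - min)`. This is why the fitted pipeline can be applied batch by batch in Spark: every batch is scaled with the training statistics, not with statistics of the batch itself. A small check with toy values:

```python
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

train = pd.DataFrame({"age": [2.0, 20.0, 38.0, 80.0]})
scaler = MinMaxScaler().fit(train)  # learns data_min_ = 2.0 and data_max_ = 80.0

# A small batch, like one a pandas_udf might receive
batch = pd.DataFrame({"age": [20.0, 41.0]})

# The fitted scaler applies x' = (x - min_train) / (max_train - min_train)
expected = (batch["age"] - 2.0) / (80.0 - 2.0)
print(np.allclose(scaler.transform(batch).ravel(), expected))  # True

# Refitting on the batch alone would give different values, which a fitted Pipeline avoids
print(MinMaxScaler().fit_transform(batch).ravel())  # [0. 1.]
```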

In [None]:
# Construct and fit a `Pipeline` to our Titanic dataset
pipe = Pipeline(steps=[("scaler", MinMaxScaler()), ("predictor", RandomForestRegressor())])
pipe = pipe.fit(df[NUMERICAL_FEATURES], df[TARGET])

In [None]:
# Again let's make some predictions for comparison against our PySpark predictions.
pipe.predict(df[NUMERICAL_FEATURES])[:5]

In [None]:
# Model deployment in PySpark using our `spark_predict` function
(
    ddf
    .select(NUMERICAL_FEATURES + [TARGET])
    .withColumn("pipe_predict", spark_predict(pipe, NUMERICAL_FEATURES))
    .show(5)
)

# (iii) Deploying a Pipeline with Mixed Feature Types
----

It is not uncommon to use both categorical and numerical features in an ML model. In the next example I demonstrate how we can build an sklearn `Pipeline` capable of encoding categorical features and scaling numerical features. This pipeline is then deployed in PySpark.

In [None]:
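# Create the preprocessing pipelines for both numerical and categorical data.
# The ColumnTransformer below selects columns by position in ALL_FEATURES:
# 0-2 are NUMERICAL_FEATURES and 3-4 are CATEGORICAL_FEATURES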
categorical_transformer = Pipeline(steps=[("encoder", OrdinalEncoder())])
numerical_transformer = Pipeline(steps=[("scaler", MinMaxScaler())])

preprocessor = ColumnTransformer(
    transformers=[
        ("cat", categorical_transformer, [3, 4]),
        ("num", numerical_transformer, [0, 1, 2])]
)

# Append random forest to preprocessing pipeline. We now have a full prediction pipeline.
preprocessor_pipe = Pipeline(steps=[("preprocessor", preprocessor), ("predictor", RandomForestRegressor())])
preprocessor_pipe = preprocessor_pipe.fit(df[ALL_FEATURES], df[TARGET])
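Two details of the pipeline above matter once it is deployed: the `ColumnTransformer` selects columns by position, and a default `OrdinalEncoder` raises an error on categories it did not see during `fit` (such as a new cabin). The sketch below shows an alternative that selects columns by name and encodes unseen categories as `-1`. It assumes scikit-learn 0.24 or later for `handle_unknown="use_encoded_value"`, and the data are toy values.

```python
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, OrdinalEncoder

NUMERICAL_FEATURES = ["sibsp", "parch", "age"]
CATEGORICAL_FEATURES = ["sex", "cabin"]

train = pd.DataFrame({
    "sibsp": [0.0, 1.0, 0.0, 1.0],
    "parch": [0.0, 0.0, 2.0, 1.0],
    "age": [29.0, 2.0, 30.0, 25.0],
    "sex": ["female", "male", "female", "male"],
    "cabin": ["A1", "B2", "A1", "B2"],
})
y = [200.0, 150.0, 200.0, 150.0]

preprocessor = ColumnTransformer(transformers=[
    # Unseen categories at prediction time are encoded as -1 instead of raising
    ("cat", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1), CATEGORICAL_FEATURES),
    ("num", MinMaxScaler(), NUMERICAL_FEATURES),
])
pipe = Pipeline(steps=[("preprocessor", preprocessor),
                       ("predictor", RandomForestRegressor(n_estimators=10, random_state=0))])
pipe.fit(train, y)

# A row with a cabin that never appeared during fit
new_rows = pd.DataFrame({"sibsp": [0.0], "parch": [0.0], "age": [40.0],
                         "sex": ["male"], "cabin": ["Z9"]})
print(pipe.predict(new_rows).shape)  # (1,)
```

Selecting columns by name requires that the DataFrame passed to `predict` inside the UDF has the original feature names as its columns.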

In [None]:
# Again let's make some predictions to compare our PySpark deployment against
preprocessor_pipe.predict(df[ALL_FEATURES])[:5]

In [None]:
# Again let's deploy our pipeline in PySpark using our `spark_predict` function
(
    ddf
    .select(ALL_FEATURES + [TARGET])
    .withColumn("pipe_predict", spark_predict(preprocessor_pipe, ALL_FEATURES))
    .show(5)
)

# Summary
----
The `spark_predict` function defined in this notebook is a versatile solution to Python ML deployment in PySpark.
We have demonstrated its use in three **deployment** examples:
- Deploying a `RandomForestRegressor` in PySpark
- Deploying an ML pipeline that scales numerical features
- Deploying an ML pipeline that preprocesses mixed feature types