![](https://drive.google.com/uc?export=view&id=17aU5CN_8dGvyZI89lNTvtydVJ6PNZBdU)
***

![](https://drive.google.com/uc?export=view&id=1pWDTauVuTj4nRcjQNSF2fZ-rocpJtecU)

![](https://drive.google.com/uc?id=1YLdS-5ae_WjK4ux7K4dRRKG3CTIkIt40)


# 👋 Help me get to know you

![](https://drive.google.com/uc?id=1kcK7ocsz1ZK6HgtZsAT5ICIND7xn6JUM)

https://xebia.ai/scalable-ml
***

# ✅ Agenda

**This webinar is a preview of a course that is under development**

<br> This course is for you, if:

✔ You have no prior experience with distributed ML, but want to change that.

✔ You are interested in scaling your machine learning workflows across a cluster.

✔ You want to understand the practical challenges involved in scaling machine learning workflows.


## 💻 Code-along

Our trainings are designed to be hands-on and practical. <br> Please click on the link in the chat to access the notebook used for the webinar. <br> If you have questions, please add them to the Q&A, and we will discuss them at the end of the session.

https://colab.research.google.com/drive/1M88L3pb-uGG89pfRDjwA91PskHGl7cec?usp=sharing


# Scaling ML Workflows

Scaling your ML workflows is a crucial part of the model development lifecycle for a data scientist, especially when your data gets too big (memory bound) or your models get too complex (compute bound). 🧰



![](https://drive.google.com/uc?export=view&id=1BR7lg1KWnyessf29bGGI2ZvRCR1TUON6)

## Scaling paradigms

![](https://drive.google.com/uc?export=view&id=18fijm3KbNG8ShWX0StHEeH6BJHRNZTP1)

## Intermezzo: Do you need to scale your ML training? TL;DR: it depends 📢

&#x21AA; It’s worth emphasizing that not everyone needs distributed model training. Techniques like [sampling](https://en.wikipedia.org/wiki/Sampling_(statistics)) can sometimes be effective: if a model trained on a sample performs about as well as one trained on the full dataset, you may not need a cluster for training. Always plot your [learning curve](https://scikit-learn.org/stable/auto_examples/model_selection/plot_learning_curve.html) to check whether more data still improves the model. Other steps in your ML pipeline, however, often do benefit from distribution.

&#x21AA; e.g., hyperparameter search: tools like [hyperopt](http://hyperopt.github.io/hyperopt/scaleout/spark/) can distribute hyperparameter tuning and parallelize its search across a cluster.


&#x21AA; e.g., inference: Pandas UDFs allow you to use your favourite libraries (sklearn and pandas) while getting the benefits of parallelization and distribution on a Spark cluster.


&#x21AA; That being said, natively distributed tools like SparkML are worth exploring if you are already working in the Spark ecosystem (e.g., via a vendor like Databricks).
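
To make the learning-curve advice above concrete, here is a minimal sketch using scikit-learn's `learning_curve`. The synthetic dataset is an assumption that stands in for your own data. If the validation score flattens out well before the full dataset is used, training on a sample may be enough.

```python
import numpy as np
from sklearn.datasets import make_regression
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import learning_curve

# Synthetic data stands in for your own dataset (assumption for illustration).
X, y = make_regression(n_samples=2000, n_features=8, noise=10.0, random_state=0)

# Fit the model on growing subsets of the training data, with 5-fold CV.
train_sizes, train_scores, valid_scores = learning_curve(
    LinearRegression(),
    X,
    y,
    train_sizes=np.linspace(0.1, 1.0, 5),
    cv=5,
    scoring="r2",
)

# Average the validation score over the folds for each training size.
mean_valid = valid_scores.mean(axis=1)
for n, score in zip(train_sizes, mean_valid):
    print(f"{n:5d} training rows -> mean validation R2 = {score:.3f}")
```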

***

## Apache Spark Ecosystem

Apache Spark is an open-source analytics engine for big data processing and machine learning. <br> The top-level ML API abstracts away a lot of the Spark internals.


![](https://drive.google.com/uc?export=view&id=112rxhy7DkmecFWGCClx3J-UfaUULizlZ)
***

## Spark ML Demo: Predicting house prices

Let's now install `pyspark`, a high-level Python API for Spark.

In [None]:
# Uncomment the line below to install pyspark

#!pip install pyspark

We first need to set up a **`SparkSession`**. <br> A SparkSession is the entry point for interacting with the Spark internals.

In [None]:
# set up a spark session using pyspark
# the master is set to local[*] to use all available cores

import pyspark

master = 'local[*]'

spark = (
    pyspark.sql.SparkSession.builder
    .master(master)
    .getOrCreate()
)
spark

### Let's now read in the dataset 🏠

In [None]:
from sklearn.datasets import fetch_california_housing

california_df = fetch_california_housing(as_frame=True)
california_sdf = spark.createDataFrame(california_df['frame'])
california_sdf.show(6)

We can look at the description of the (sklearn) dataset:

In [None]:
print(california_df.DESCR)

Let's visualize the spatial patterns present in the dataset. Who would say no to a house by the beach? 🏖

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

sns.scatterplot(
    data=california_df['frame'],
    x="Longitude",
    y="Latitude",
    size="MedHouseVal",
    hue="MedHouseVal",
    palette="viridis",
    alpha=0.5,
)
plt.legend(title="MedHouseVal")
plt.title("Median house value based on\n their spatial locations");

### Fitting a model 🛠

&#x1F4A1; Spark.ml (by default) expects the **label** or **target** variable to be in a column named `label`. We can use the `withColumnRenamed` method that comes with the DataFrame API to rename our columns.

In [None]:
california_sdf = california_sdf.withColumnRenamed(existing="MedHouseVal", new="label")

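Let's now split the data into a training and a test dataset. We can use the `randomSplit` method that comes with the DataFrame API to split the rows randomly according to the given weights (here 80% / 20%). The split is done by random sampling, so the resulting sizes are approximate.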

In [None]:
train_sdf, test_sdf = california_sdf.randomSplit([0.8, 0.2], seed=707)

&#x1F4A1; Spark.ml expects all features in a **vector**. This can be done using a `VectorAssembler`.

By default a model assumes that the vector is in a column called `features`.

In [None]:
from pyspark.ml.feature import VectorAssembler

feature_cols = ["MedInc", "HouseAge", "AveRooms", "AveBedrms", "Population", "AveOccup"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

train_sdf_vector = (
    assembler.transform(train_sdf)
    .select(['features', 'label'])
)

test_sdf_vector = (
    assembler.transform(test_sdf)
    .select(['features', 'label'])
)

train_sdf_vector.show(6)

Training our model is now a familiar `.fit()` away.

In [None]:
from pyspark.ml.regression import LinearRegression

# build the model object
regressor = LinearRegression()

# fit the model object
model = regressor.fit(train_sdf_vector)

What would the model predict for the houses in the test set?

We get predictions with `.transform()`:

In [None]:
predictions = model.transform(test_sdf_vector)
predictions.show(6)

We can evaluate the model using an **`Evaluator`** class, here the `RegressionEvaluator`:

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# build the evaluator object
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='rmse')

# evaluate the predictions
rmse = evaluator.evaluate(predictions)

print(f'RMSE: {rmse:.2f}')
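
For reference, here is a small NumPy sketch of what the `rmse` metric (and the `r2` metric used in the exercise below) compute. The labels and predictions are made-up numbers for illustration, not output from the model above. Since `MedHouseVal` is expressed in units of $100,000, an RMSE of 0.5 on the housing data would correspond to a typical error of roughly $50,000.

```python
import numpy as np

# Hypothetical labels and predictions, made up purely for illustration.
labels = np.array([3.0, 1.5, 2.0, 4.5])
predictions = np.array([2.5, 1.0, 2.5, 4.0])

residuals = labels - predictions

# RMSE: square root of the mean squared residual (same unit as the label).
rmse = np.sqrt(np.mean(residuals ** 2))

# R2: 1 minus the ratio of residual variation to total variation around the mean.
ss_res = np.sum(residuals ** 2)
ss_tot = np.sum((labels - labels.mean()) ** 2)
r2 = 1 - ss_res / ss_tot

print(f"RMSE: {rmse:.2f}")
print(f"R2:   {r2:.2f}")
```

Lower is better for RMSE, while higher is better for R2.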


## Recap

What did we just do?

&#x2705; Converted a pandas DataFrame to a Spark DataFrame <br>
&#x2705; __Preprocessed__ our data by renaming the target column to `label` <br>
&#x2705; __Split__ our data into training and test sets with `randomSplit()` <br>
&#x2705; __Assembled__ our data with the `VectorAssembler` <br>
&#x2705; __Initialized__ a model and __trained__ it with `.fit()` <br>
&#x2705; __Predicted__ with `.transform()` <br>
&#x2705; __Evaluated__ the predictions with a `RegressionEvaluator`

## Exercise: Use other regressors and metrics

&#x1f4d6; Replace the Linear Regressor with other flavours and evaluate the performance. Click [here](https://spark.apache.org/docs/latest/ml-classification-regression.html#regression) to explore the other models.<br>
&#x1f4d6; In addition to the `rmse`, use the `r2` [metric](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.evaluation.RegressionMetrics.html) to evaluate your model. <br>

In [None]:
#your solution here

# Pipelines API


Spark ML exposes a **`Pipeline`** API, which can be used to encapsulate your ML workflows. <br>


&#x1F4A1; For people who have used sklearn: a lot of the concepts are similar to (derived from) those from sklearn.

## Pipeline API Abstractions

Build ML workflows as a chain of Transformers and Estimators.

Major classes:
- __Transformer__: transforms one DataFrame into another DataFrame.
- __Estimator__: fits on a DataFrame and produces a Transformer.
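
The contract between the two can be sketched in plain Python. This is a toy illustration with pandas DataFrames and made-up class names (`AddRatio`, `MeanCenter`, `MeanCenterModel`), not Spark's actual classes: a Transformer only has `transform`, while an Estimator only has `fit`, which returns a Transformer that carries the learned state.

```python
import pandas as pd


class AddRatio:
    """Toy Transformer: stateless, adds a derived column."""

    def transform(self, df):
        return df.assign(rooms_per_person=df["rooms"] / df["people"])


class MeanCenterModel:
    """Toy fitted model: a Transformer that holds a learned mean."""

    def __init__(self, column, mean):
        self.column = column
        self.mean = mean

    def transform(self, df):
        centered = df[self.column] - self.mean
        return df.assign(**{f"{self.column}_centered": centered})


class MeanCenter:
    """Toy Estimator: learns the mean in fit() and returns a Transformer."""

    def __init__(self, column):
        self.column = column

    def fit(self, df):
        return MeanCenterModel(self.column, df[self.column].mean())


toy_df = pd.DataFrame({"rooms": [4.0, 6.0, 8.0], "people": [2.0, 3.0, 2.0]})

with_ratio = AddRatio().transform(toy_df)      # Transformer: DataFrame -> DataFrame
center_model = MeanCenter("rooms").fit(toy_df)  # Estimator: DataFrame -> Transformer
centered = center_model.transform(with_ratio)
print(centered)
```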

## Transformers

Transform one DataFrame into another.

__Examples__:

- Feature engineering
    - Compute new/derived features, adding extra columns.
    - Assemble features into feature vectors.
- Learned models - predict response for given dataset, adding a new column.

In [None]:
from pyspark.ml import Estimator, Transformer
from pyspark.ml.feature import VectorAssembler

feature_cols = ["MedInc", "HouseAge", "AveRooms", "AveBedrms", "Population", "AveOccup"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

california_sdf_vector = (
    assembler.transform(california_sdf)
    .select(['features', 'label'])
)

print('assembler is Estimator:', isinstance(assembler, Estimator))
print('assembler is Transformer:', isinstance(assembler, Transformer))

## Estimators

Are fit on a DataFrame and produce a Transformer.

__Examples__:

- Feature engineering
    - Transformations that learn mapping from dataset
    - E.g., `QuantileDiscretizer`, `OneHotEncoder` (named `OneHotEncoderEstimator` before Spark 3.0)
- Models - fit model on given dataset, producing a learned model.

For example, a `LinearRegression` Estimator doesn't know how to transform as it has to be fit first:

In [None]:
from pyspark.ml.regression import LinearRegression

regressor = LinearRegression()

print('regressor is Estimator:', isinstance(regressor, Estimator))
print('regressor is Transformer:', isinstance(regressor, Transformer))

After fitting it knows how to predict and a `Transformer` is returned:

In [None]:
fitted_regressor = regressor.fit(california_sdf_vector)

print('fitted_regressor is Estimator:', isinstance(fitted_regressor, Estimator))
print('fitted_regressor is Transformer:', isinstance(fitted_regressor, Transformer))

Once we have a fitted model, making predictions is nothing more than a transform: calling `transform` on our model returns a `DataFrame` with an added `prediction` column:

In [None]:
transformed = fitted_regressor.transform(california_sdf_vector)
transformed.show(2)

## Pipeline: Glue everything together

- A Pipeline is an Estimator that chains other Transformers and Estimators.
- Pipelines return a Transformer after fitting.

In [None]:
from pyspark.ml.pipeline import Pipeline

# Initialize assembler and regressor.
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
regressor = LinearRegression()

# Combine steps in a single pipeline.
pipeline = Pipeline(stages=[assembler, regressor])
fitted_pipeline = pipeline.fit(train_sdf)

print('pipeline is Transformer:', isinstance(pipeline, Transformer))
print('fitted_pipeline is Transformer:', isinstance(fitted_pipeline, Transformer))

In [None]:
with_predictions = fitted_pipeline.transform(test_sdf)
with_predictions.select(["features", "label", "prediction"]).show(5)

# Model selection & evaluation

## Model selection

Model selection has to do with:

- finding the best hyperparameters for a model;
- finding the best model from various types;
- and making sure you've validated that properly!

### Cross Validation

Estimate the generalization error of your model by simulating multiple experiments:

<img src="http://i.stack.imgur.com/1fXzJ.png" alt="Drawing" style="width: 800px;"/>
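
The idea behind k-fold cross validation can be sketched with NumPy. This is a simplified illustration with a hypothetical helper `k_fold_indices`, not how Spark implements it internally: every row is used for validation exactly once and for training in all other rounds.

```python
import numpy as np


def k_fold_indices(n_rows, num_folds, seed=0):
    """Yield (train_idx, valid_idx) pairs for k-fold cross validation."""
    rng = np.random.default_rng(seed)
    shuffled = rng.permutation(n_rows)
    folds = np.array_split(shuffled, num_folds)
    for i in range(num_folds):
        valid_idx = folds[i]
        # All remaining folds together form the training set for this round.
        train_idx = np.concatenate(
            [folds[j] for j in range(num_folds) if j != i]
        )
        yield train_idx, valid_idx


splits = list(k_fold_indices(n_rows=10, num_folds=4))
for fold, (train_idx, valid_idx) in enumerate(splits):
    print(f"fold {fold}: train on {len(train_idx)} rows, validate on {len(valid_idx)} rows")
```

The model is fit once per fold, and the validation scores are averaged to estimate the generalization error.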

## Hyperparameter tuning

SparkML allows us to distribute hyperparameter tuning when using a SparkML Estimator.


First, we can create a model and a grid to search over:

In [None]:
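from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder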

regressor = LinearRegression()
evaluator = RegressionEvaluator(metricName='rmse')

reg_param = [0.001, 0.003, 0.005, 0.008, 0.01]
grid = (
    ParamGridBuilder()
    .addGrid(regressor.regParam, reg_param)
    .build()
)

Next, we create a `CrossValidator` and set the model, evaluator and grid.

After fitting, `cv_model` contains our best model based on the RMSE.

In [None]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

cv = CrossValidator(numFolds=4,
                    estimator=regressor,
                    estimatorParamMaps=grid,
                    evaluator=evaluator)

cv_model = cv.fit(train_sdf_vector)

How many models are we training in this example? And what was the optimal value for the `regParam` hyperparameter?

In [None]:
fig, ax = plt.subplots()
ax.plot(reg_param, cv_model.avgMetrics, '-o')
ax.set_xlabel('regParam')
ax.set_ylabel('rmse');

In [None]:
print(cv_model.bestModel.explainParams())
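
On the first question, the number of model fits follows from simple arithmetic: every grid point is fit once per fold, and `CrossValidator` then refits the best setting once on the full training data. A small sketch using the grid and fold count from the cells above:

```python
reg_param = [0.001, 0.003, 0.005, 0.008, 0.01]  # grid values used above
num_folds = 4                                   # numFolds used above

# One fit per (grid point, fold) combination during the search ...
fits_during_search = len(reg_param) * num_folds
# ... plus one final refit of the best setting on the full training set.
final_refit = 1

total_fits = fits_during_search + final_refit
print(total_fits)  # 21
```

For the second question, keep in mind that lower is better for RMSE, so the best grid point is the one with the smallest value in `cv_model.avgMetrics`.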

What's the score on the test set?

In [None]:
predictions = cv_model.transform(test_sdf_vector)

# rmse is the default metric; it is set explicitly here for readability
evaluator = RegressionEvaluator(metricName='rmse')
print('rmse: ', evaluator.evaluate(predictions))

# Would you be interested in knowing more about...

✅ scaling up feature engineering

✅ distributed hyperparameter tuning

✅ distributed model inference

✅ scaling single node model workflows

✅ using mlflow in a distributed workflow

✅ and more concepts in our lab sandboxes simulating a real-world prod environment
...

## ... then please contact our training advisor, Rozaliiya


![](https://drive.google.com/uc?id=1csdtbnC3MFa7LvGYw2Zzz7k3UIo0Y7dG)


https://xebia.ai/rozaliia

# Your opinion matters to us

![](https://drive.google.com/uc?id=1kcK7ocsz1ZK6HgtZsAT5ICIND7xn6JUM)

https://xebia.ai/scalable-ml

# Thank you!

💡 We love to share knowledge. Check out our [blog](https://xebia.com/blog/category/topics/data-science-and-ai/) to find out what we've been up to!