




# Training with Pandas Function API

This notebook demonstrates how to use the Pandas Function API to manage and scale machine learning models for IoT devices.

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Learning Objectives

By the end of this lesson, you should be able to:

* Define a pandas function and apply it to train a model
* Use <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.GroupedData.applyInPandas.html" target="_blank"> **.groupBy().applyInPandas()** </a> to build many models in parallel
* Serve multiple models from a registered model

## 📌 Requirements

**Required Databricks Runtime Version:** 
* Please note that in order to run this notebook, you must use one of the following Databricks Runtime(s): **12.2.x-cpu-ml-scala2.12**

## Lesson Setup

The first thing we're going to do is **run the setup script**. This script defines the required configuration variables, which are scoped to each user.

In [0]:
%run ./Includes/Classroom-Setup

Python interpreter will be restarted.


Resetting the learning environment:
| No action taken

Skipping install of existing datasets to "dbfs:/mnt/dbacademy-datasets/scalable-machine-learning-with-apache-spark/v02"

Validating the locally installed datasets:
| listing local files...(2 seconds)
| validation completed...(2 seconds total)

Creating & using the schema "charlie_ohara_4mi2_da_sml" in the catalog "hive_metastore"...(1 seconds)

Predefined tables in "charlie_ohara_4mi2_da_sml":
| -none-

Predefined paths variables:
| DA.paths.working_dir: dbfs:/mnt/dbacademy-users/charlie.ohara@standard.ai/scalable-machine-learning-with-apache-spark
| DA.paths.user_db:     dbfs:/mnt/dbacademy-users/charlie.ohara@standard.ai/scalable-machine-learning-with-apache-spark/database.db
| DA.paths.datasets:    dbfs:/mnt/dbacademy-datasets/scalable-machine-learning-with-apache-spark/v02

Setup completed (7 seconds)



## Create a Spark DataFrame

Create dummy data with:
- **`device_id`**: 10 different devices
- **`record_id`**: 100k unique records (10k per device)
- **`feature_1`**: a feature for model training
- **`feature_2`**: a feature for model training
- **`feature_3`**: a feature for model training
- **`label`**: the variable we're trying to predict

In [0]:
import pyspark.sql.functions as f

# Create synthetic data
df = (spark
      .range(1000*100)
      .select(f.col("id").alias("record_id"), (f.col("id")%10).alias("device_id"))
      .withColumn("feature_1", f.rand() * 1)
      .withColumn("feature_2", f.rand() * 2)
      .withColumn("feature_3", f.rand() * 3)
      .withColumn("label", (f.col("feature_1") + f.col("feature_2") + f.col("feature_3")) + f.rand())
     )

display(df)

record_id,device_id,feature_1,feature_2,feature_3,label
0,0,0.5913269812644489,1.112624479962482,0.9383567332587058,3.599200876731657
1,1,0.2472129573119317,1.339393994322151,0.3635448965751505,2.8284911486508206
2,2,0.0524923268142902,1.2435866204951875,1.48153079167651,2.899007213485999
3,3,0.3645876131568811,0.583744962019102,0.7329966016597367,2.165171666135468
4,4,0.552657693182898,1.7127538422257746,1.446013545423248,4.382039674422539
5,5,0.2731973411636208,1.2702604513854396,0.6874138777647782,2.395843585005227
6,6,0.004520761335917,1.2435503189591823,0.309191731389813,2.0764288822250303
7,7,0.8191065070385846,0.3495661086740078,2.210135357499151,4.005836631215531
8,8,0.9984137037595776,0.8510422700126778,2.538979499547241,5.233024923012909
9,9,0.2480896138905483,1.2971139690365017,1.9046794438060035,4.367748918180926
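
As a quick sanity check on the data layout, here is a minimal local sketch (not part of the original notebook) that mirrors the same construction in pandas. It assumes NumPy's random generator as a stand-in for `f.rand()`, and the name `local_df` is hypothetical. It shows that `id % 10` assigns exactly 10,000 of the 100,000 records to each device.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)  # seed chosen only to make this sketch reproducible
n_records = 1000 * 100           # same row count as spark.range(1000*100)

# Mirror the Spark columns: ids, device assignment, three scaled uniform features
local_df = pd.DataFrame({"record_id": np.arange(n_records)})
local_df["device_id"] = local_df["record_id"] % 10
local_df["feature_1"] = rng.random(n_records) * 1
local_df["feature_2"] = rng.random(n_records) * 2
local_df["feature_3"] = rng.random(n_records) * 3

# Label is the feature sum plus uniform noise, as in the Spark version
local_df["label"] = (
    local_df[["feature_1", "feature_2", "feature_3"]].sum(axis=1)
    + rng.random(n_records)
)

rows_per_device = local_df.groupby("device_id").size()
print(rows_per_device.to_dict())
```

Because each device receives the same number of rows, every per-device model later in the notebook trains on an equally sized group.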





Define the return schema

In [0]:
train_return_schema = "device_id integer, n_used integer, model_path string, mse float"
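
When the returned pandas DataFrame uses string column labels, `applyInPandas` expects those labels to match the field names in this schema. The following is a small sketch of a pre-flight check you could run on a single group before launching the Spark job. The helpers `schema_column_names` and `check_columns` are hypothetical and only handle simple `name type` pairs like the ones above.

```python
import pandas as pd

train_return_schema = "device_id integer, n_used integer, model_path string, mse float"

def schema_column_names(ddl: str) -> list:
    """Extract column names from a simple 'name type, name type' string.

    Assumption: no nested or parameterized types (e.g. decimal(10,2)).
    """
    return [field.strip().split()[0] for field in ddl.split(",")]

def check_columns(result: pd.DataFrame, ddl: str) -> None:
    """Raise ValueError if the DataFrame's column names differ from the schema's."""
    expected = schema_column_names(ddl)
    actual = list(result.columns)
    if sorted(actual) != sorted(expected):
        raise ValueError(f"expected columns {expected}, got {actual}")

# Placeholder row shaped like the training function's return value
example_result = pd.DataFrame(
    [[0, 10000, "runs:/<run_id>/0", 0.01]],
    columns=["device_id", "n_used", "model_path", "mse"],
)
check_columns(example_result, train_return_schema)  # passes silently
```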


## Define a *pandas* Function

Define a pandas function that takes all the data for a given device, trains a model, logs it in a nested MLflow run, and returns a pandas DataFrame matching the schema above.

In [0]:
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

def train_model(df_pandas: pd.DataFrame) -> pd.DataFrame:
    """
    Trains an sklearn model on grouped instances
    """
    # Pull metadata
    device_id = df_pandas["device_id"].iloc[0]
    n_used = df_pandas.shape[0]
    run_id = df_pandas["run_id"].iloc[0] # Pulls run ID to do a nested run

    # Train the model
    X = df_pandas[["feature_1", "feature_2", "feature_3"]]
    y = df_pandas["label"]
    rf = RandomForestRegressor()
    rf.fit(X, y)

    # Evaluate the model
    predictions = rf.predict(X)
    mse = mean_squared_error(y, predictions) # Note we could add a train/test split

    # Resume the top-level training
    with mlflow.start_run(run_id=run_id) as outer_run:
        # Small hack for running as a job
        experiment_id = outer_run.info.experiment_id
        print(f"Current experiment_id = {experiment_id}")

        # Create a nested run for the specific device
        with mlflow.start_run(run_name=str(device_id), nested=True, experiment_id=experiment_id) as run:
            mlflow.sklearn.log_model(rf, str(device_id))
            mlflow.log_metric("mse", mse)
            mlflow.set_tag("device", str(device_id))

            artifact_uri = f"runs:/{run.info.run_id}/{device_id}"
            # Create a return pandas DataFrame that matches the schema above
            return_df = pd.DataFrame([[device_id, n_used, artifact_uri, mse]], 
                                    columns=["device_id", "n_used", "model_path", "mse"])

    return return_df 
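
The function above follows a simple contract: it receives every row for one device as a pandas DataFrame and returns a pandas DataFrame. The sketch below emulates that split-apply-combine flow locally with plain pandas so the contract can be tested without a cluster. It is an illustration only: `apply_in_pandas_locally` and `summarize_group` are hypothetical names, and ordinary least squares via NumPy stands in for the random forest and the MLflow logging.

```python
import numpy as np
import pandas as pd

def summarize_group(df_pandas: pd.DataFrame) -> pd.DataFrame:
    """Toy per-device function: fits least squares instead of a random forest."""
    device_id = int(df_pandas["device_id"].iloc[0])
    X = df_pandas[["feature_1", "feature_2", "feature_3"]].to_numpy()
    X = np.column_stack([X, np.ones(len(X))])  # add an intercept column
    y = df_pandas["label"].to_numpy()
    coef, *_ = np.linalg.lstsq(X, y, rcond=None)
    mse = float(np.mean((X @ coef - y) ** 2))
    return pd.DataFrame([[device_id, len(df_pandas), mse]],
                        columns=["device_id", "n_used", "mse"])

def apply_in_pandas_locally(df: pd.DataFrame, key: str, func) -> pd.DataFrame:
    """Split by key, call func once per group, and concatenate the results."""
    pieces = [func(group) for _, group in df.groupby(key)]
    return pd.concat(pieces, ignore_index=True)

# Small synthetic data set with 4 devices and 250 rows each
rng = np.random.default_rng(0)
n = 1000
toy = pd.DataFrame({
    "device_id": np.arange(n) % 4,
    "feature_1": rng.random(n),
    "feature_2": rng.random(n) * 2,
    "feature_3": rng.random(n) * 3,
})
toy["label"] = toy[["feature_1", "feature_2", "feature_3"]].sum(axis=1) + rng.random(n)

summary = apply_in_pandas_locally(toy, "device_id", summarize_group)
print(summary)
```

On Spark, the groups are processed in parallel across executors rather than in a Python loop, but each call sees the same kind of input and must return the same kind of output.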





## Apply the *pandas* Function

Apply the pandas function to grouped data. 

Note that the way you would apply this in practice depends largely on where the data for inference is located. In this example, we'll reuse the training data, which contains our device and run IDs.

In [0]:
with mlflow.start_run(run_name="Training session for all devices") as run:
    run_id = run.info.run_id

    model_directories_df = (df
        .withColumn("run_id", f.lit(run_id)) # Add run_id
        .groupby("device_id")
        .applyInPandas(train_model, schema=train_return_schema)
        .cache()
    )

combined_df = df.join(model_directories_df, on="device_id", how="left")
display(combined_df)

device_id,record_id,feature_1,feature_2,feature_3,label,n_used,model_path,mse
0,0,0.5913269812644489,1.112624479962482,0.9383567332587058,3.599200876731657,10000,runs:/af779da7d0f449a59df513faef788148/0,0.013517693
1,1,0.2472129573119317,1.339393994322151,0.3635448965751505,2.8284911486508206,10000,runs:/7ba333312f55447a8965b47c3bb986fa/1,0.013608545
2,2,0.0524923268142902,1.2435866204951875,1.48153079167651,2.899007213485999,10000,runs:/f4865b6c49364f518a9d289e5b91da10/2,0.013606665
3,3,0.3645876131568811,0.583744962019102,0.7329966016597367,2.165171666135468,10000,runs:/f1924115e5594d79862270d344b6d7b9/3,0.013553851
4,4,0.552657693182898,1.7127538422257746,1.446013545423248,4.382039674422539,10000,runs:/92995879b7464e5681125569b684f62c/4,0.013987291
5,5,0.2731973411636208,1.2702604513854396,0.6874138777647782,2.395843585005227,10000,runs:/4fa2dac857d1408caf87f270365bf208/5,0.013939485
6,6,0.004520761335917,1.2435503189591823,0.309191731389813,2.0764288822250303,10000,runs:/659b376e2f0a401d879dd7ba3c227200/6,0.013189818
7,7,0.8191065070385846,0.3495661086740078,2.210135357499151,4.005836631215531,10000,runs:/36450efbf22e47be900804cdc1159314/7,0.0134388
8,8,0.9984137037595776,0.8510422700126778,2.538979499547241,5.233024923012909,10000,runs:/ec80991be2b944d4af946ad87a64d4ee/8,0.013699838
9,9,0.2480896138905483,1.2971139690365017,1.9046794438060035,4.367748918180926,10000,runs:/37e991ce1dca4c429fab04150452a33c/9,0.013551955
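
A note on reading the `mse` column: the label was built as the feature sum plus `f.rand()`, which is uniform noise on [0, 1). The variance of that noise, 1/12 (about 0.083), is the lowest expected MSE any model can reach on unseen data. The values near 0.0135 above are computed on the training rows and sit well below that floor, which is consistent with the forest partly memorizing the noise. The short check below works through the arithmetic; `reported_train_mse` is an approximate value read from the table above.

```python
import numpy as np

# Variance of a Uniform(a, b) variable is (b - a)^2 / 12; here a = 0, b = 1
noise_floor = (1 - 0) ** 2 / 12

# Monte Carlo check: the best possible prediction of the noise is its mean, 0.5
rng = np.random.default_rng(7)
noise = rng.random(1_000_000)
empirical_floor = float(np.mean((noise - 0.5) ** 2))

reported_train_mse = 0.0135  # approximate in-sample value from the output above

print(f"noise floor      : {noise_floor:.4f}")
print(f"empirical check  : {empirical_floor:.4f}")
print(f"train MSE / floor: {reported_train_mse / noise_floor:.2f}")
```

This is why the train/test split mentioned in the `train_model` comment matters: a held-out MSE would be expected to land at or above the noise floor.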





Define a pandas function and return schema to apply the model.  *This needs only one read from DBFS per device.*

In [0]:
apply_return_schema = "record_id integer, prediction float"

def apply_model(df_pandas: pd.DataFrame) -> pd.DataFrame:
    """
    Applies model to data for a particular device, represented as a pandas DataFrame
    """
    model_path = df_pandas["model_path"].iloc[0]

    input_columns = ["feature_1", "feature_2", "feature_3"]
    X = df_pandas[input_columns]

    model = mlflow.sklearn.load_model(model_path)
    prediction = model.predict(X)

    return_df = pd.DataFrame({
        "record_id": df_pandas["record_id"],
        "prediction": prediction
    })
    return return_df

prediction_df = combined_df.groupby("device_id").applyInPandas(apply_model, schema=apply_return_schema)
display(prediction_df)

record_id,prediction
0,3.3567488
10,3.1955097
20,4.3956113
30,4.244173
40,2.777165
50,5.8057604
60,4.0739546
70,4.0036645
80,4.7343607
90,1.92923
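
The "one read per device" point can be illustrated locally. In this sketch a hypothetical `fake_load_model` stands in for `mlflow.sklearn.load_model` and records each call, and `ConstantModel` is a stub in place of a trained forest. The point is only to show that grouping by device leads to one load per group rather than one per row.

```python
import numpy as np
import pandas as pd

load_calls = []  # records every model path requested from the fake loader

class ConstantModel:
    """Stand-in for a trained model; always predicts the same value."""
    def __init__(self, value):
        self.value = value

    def predict(self, X):
        return np.full(len(X), self.value, dtype=float)

def fake_load_model(model_path: str) -> ConstantModel:
    """Hypothetical loader that records the call instead of reading from storage."""
    load_calls.append(model_path)
    return ConstantModel(float(model_path.rsplit("/", 1)[-1]))

def apply_model_locally(df_pandas: pd.DataFrame) -> pd.DataFrame:
    model = fake_load_model(df_pandas["model_path"].iloc[0])  # one load per group
    X = df_pandas[["feature_1", "feature_2", "feature_3"]]
    return pd.DataFrame({"record_id": df_pandas["record_id"],
                         "prediction": model.predict(X)})

# 12 rows spread over 3 devices, each row carrying its device's model path
toy = pd.DataFrame({"record_id": range(12), "device_id": [i % 3 for i in range(12)]})
for name in ["feature_1", "feature_2", "feature_3"]:
    toy[name] = 0.5
toy["model_path"] = "runs:/example/" + toy["device_id"].astype(str)

predictions = pd.concat(
    [apply_model_locally(group) for _, group in toy.groupby("device_id")],
    ignore_index=True,
)
print(len(load_calls), "loads for", len(toy), "rows")
```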





## Serving Multiple Models from a Registered Model

MLflow allows models to be deployed as real-time REST APIs. At the moment, a single MLflow model is served from one instance (typically one VM). However, sometimes multiple models need to be served from a single endpoint. Imagine 1000 similar models that need to be served with different inputs. Running 1000 endpoints could waste resources, especially if some of the models are underutilized.

One way around this is to package many models into a single custom model that internally routes each input to the appropriate model, and then to deploy that 'bundle' of models as a single 'model.'

Below we demonstrate creating such a custom model that bundles all of the models we trained for each device. For every row of data fed to this model, the model will determine the device id and then use the appropriate model trained on that device id to make predictions for a given row. 

First, we need to access the models for each device id.

In [0]:
experiment_id = run.info.experiment_id

model_df = (spark.read.format("mlflow-experiment")
            .load(experiment_id)
            .filter("tags.device IS NOT NULL")
            .orderBy("end_time", ascending=False)
            .select("tags.device", "run_id")
            .limit(10))

display(model_df)

device,run_id
2,f4865b6c49364f518a9d289e5b91da10
4,92995879b7464e5681125569b684f62c
8,ec80991be2b944d4af946ad87a64d4ee
5,4fa2dac857d1408caf87f270365bf208
1,7ba333312f55447a8965b47c3bb986fa
3,f1924115e5594d79862270d344b6d7b9
9,37e991ce1dca4c429fab04150452a33c
6,659b376e2f0a401d879dd7ba3c227200
0,af779da7d0f449a59df513faef788148
7,36450efbf22e47be900804cdc1159314





We create a dictionary mapping device ids to the model trained on that device id.

In [0]:
# Load each device's model from its nested run, keyed by the device tag
device_to_model = {
    row["device"]: mlflow.sklearn.load_model(f"runs:/{row['run_id']}/{row['device']}")
    for row in model_df.collect()
}

device_to_model

Out[11]: {'2': RandomForestRegressor(),
 '4': RandomForestRegressor(),
 '8': RandomForestRegressor(),
 '5': RandomForestRegressor(),
 '1': RandomForestRegressor(),
 '3': RandomForestRegressor(),
 '9': RandomForestRegressor(),
 '6': RandomForestRegressor(),
 '0': RandomForestRegressor(),
 '7': RandomForestRegressor()}
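
Note that the keys in the output above are strings such as `'2'`, since MLflow stores tag values as strings. Any lookup therefore needs the device id in the same string form. A small sketch of the pitfall follows, using a hypothetical `device_key` helper and placeholder values instead of real models:

```python
# Placeholder map with string keys, shaped like device_to_model
device_to_model_demo = {"0": "model for device 0", "1": "model for device 1"}

def device_key(device_id) -> str:
    """Normalize an int, float, or numeric string device id to the string key format."""
    return str(int(float(device_id)))

print(device_to_model_demo.get(0))                # None: an int key does not match "0"
print(device_to_model_demo.get(str(0.0)))         # None: "0.0" does not match "0"
print(device_to_model_demo.get(device_key(0.0)))  # model for device 0
```

The float case matters because pandas can upcast integer columns to floats when a row is extracted from an all-numeric DataFrame.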




We create a custom model that takes the device id to model mappings as an attribute and delegates input to the appropriate model based on the device id.

In [0]:
from mlflow.pyfunc import PythonModel

class OriginDelegatingModel(PythonModel):

    def __init__(self, device_to_model_map):
        self.device_to_model_map = device_to_model_map

    def predict_for_device(self, row):
        """
        Generates a prediction for a single row of data by
        fetching the model trained for that row's device id
        """
        # Keys in the map are strings (MLflow tags), so normalize the id first
        model = self.device_to_model_map.get(str(int(row["device_id"])))
        data = row[["feature_1", "feature_2", "feature_3"]].to_frame().T
        return model.predict(data)[0]

    def predict(self, context, model_input):
        # MLflow passes a PythonModelContext as the first argument when serving
        return model_input.apply(self.predict_for_device, axis=1)




Here we demonstrate the use of this model.

In [0]:
example_model = OriginDelegatingModel(device_to_model)
# No context is needed for a direct call, so pass None
example_model.predict(None, combined_df.toPandas().head(20))

Out[13]: 0     3.356749
1     2.785805
2     3.108209
3     2.143637
4     4.388737
5     2.559558
6     2.022091
7     3.901708
8     4.986222
9     4.193384
10    3.195510
11    4.470499
12    2.669877
13    1.439910
14    4.875395
15    4.658165
16    3.996539
17    2.561680
18    2.754425
19    4.023048
dtype: float64
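
The custom model above predicts one row at a time, which calls a model once per row. For larger batches, one alternative is to group the input by device and call each model once on all of its rows. The following is a sketch of that variant, not the notebook's implementation: `predict_grouped` is a hypothetical helper, `SumPlusOffset` is a stub in place of the trained forests, and the input index is assumed to be unique.

```python
import numpy as np
import pandas as pd

FEATURES = ["feature_1", "feature_2", "feature_3"]

class SumPlusOffset:
    """Stand-in model: predicts the feature sum plus a device-specific offset."""
    def __init__(self, offset):
        self.offset = offset

    def predict(self, X):
        return np.asarray(X).sum(axis=1) + self.offset

def predict_grouped(model_input: pd.DataFrame, device_to_model_map: dict) -> pd.Series:
    """Call each device's model once on all of its rows, keeping the input order."""
    result = pd.Series(np.nan, index=model_input.index, dtype=float)
    for device_id, rows in model_input.groupby("device_id"):
        model = device_to_model_map[str(device_id)]  # map keys are strings
        result.loc[rows.index] = model.predict(rows[FEATURES])
    return result

stub_models = {"0": SumPlusOffset(0.0), "1": SumPlusOffset(10.0)}
batch = pd.DataFrame({
    "device_id": [0, 1, 0, 1],
    "feature_1": [1.0, 1.0, 2.0, 2.0],
    "feature_2": [0.0, 0.0, 0.0, 0.0],
    "feature_3": [0.0, 0.0, 0.0, 0.0],
})
print(predict_grouped(batch, stub_models).tolist())  # [1.0, 11.0, 2.0, 12.0]
```

The row-wise version is simpler to read, while the grouped version reduces the number of `predict` calls from one per row to one per device in the batch.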




From here we can log the model so that it can then be registered and used to serve predictions for all the device ids from one instance.

In [0]:
with mlflow.start_run():
    model = OriginDelegatingModel(device_to_model)
    mlflow.pyfunc.log_model("model", python_model=model)




## Classroom Cleanup

Run the following cell to remove lesson-specific assets created during this lesson:

In [0]:
DA.cleanup()

Resetting the learning environment:
| dropping the schema "charlie_ohara_4mi2_da_sml"...(1 seconds)
| removing the working directory "dbfs:/mnt/dbacademy-users/charlie.ohara@standard.ai/scalable-machine-learning-with-apache-spark"...(0 seconds)

Validating the locally installed datasets:
| listing local files...(3 seconds)
| validation completed...(3 seconds total)


&copy; 2023 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>