# End-to-end ML workflow with Databricks

This notebook illustrates an end-to-end data science workflow that uses features of both DataRobot and Databricks. You will use DataRobot for model training, selection, and MLOps monitoring. Databricks provides data source access, and its Spark backbone runs distributed scoring for large-scale use cases.

This notebook covers the following steps:
- Acquiring a training dataset from a data table
- Building a new DataRobot project
- Deploying a recommended model
- Scoring via Spark using DataRobot's exportable Java scoring code
- Scoring via prediction API
- Reporting monitoring data to DataRobot's MLOps agent framework
- Writing results back to a new table

Before running the notebook, install the following dependencies on the Databricks cluster:
- **datarobot**, provided via PyPI (Python library used to communicate with the DataRobot platform)
- **com.datarobot:datarobot-prediction:2.2.1**, provided via Maven Central (Java library used to establish interface with DataRobot scoring code)
- **com.datarobot:scoring-code-spark-api_3.0.0:0.0.4**, provided via Maven Central (Java library used to wrap scoring code with Spark functionality)
- **mlops_utils_for_spark_3_2_0_8_1_0-4c992.jar**, provided via downloadable MLOps package which is available on the Developer Tools page in the DataRobot UI (Java library used to report monitoring statistics to MLOps Agent)

## Setup

### Import libraries

The first cell of the notebook imports the necessary packages and sets up the connection to the DataRobot platform. It also defines the optional `project_id` and `deployment_id` values, which let you reuse an existing project and deployment. If they are left empty, the notebook starts a new Autopilot session and creates a new deployment from DataRobot's recommended model.

In [None]:
import datarobot as dr
from io import StringIO
import pandas as pd
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
import requests
import time

api_key = "" # Get this from the Developer Tools page in the DataRobot UI
endpoint = "https://app.datarobot.com/" # This should be the URL you use to access the DataRobot UI

client = dr.Client(
    token=api_key, 
    endpoint=endpoint,
    user_agent_suffix='AIA-E2E-DBX-8' #Optional but helps DataRobot improve this workflow
)

dr.client._global_client = client

# Set these to empty strings to create a new project and/or deployment
project_id = ""
deployment_id = ""
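The empty-string convention for `project_id` and `deployment_id` decides which later cells create new objects and which reuse existing ones. The hypothetical helper `plan_steps` below is not part of the DataRobot client. It is a rough sketch that spells out the control flow implied by the `if project_id == ""` and `if deployment_id == ""` checks used throughout this notebook:

```python
def plan_steps(project_id: str, deployment_id: str) -> list:
    """Hypothetical helper: list the actions this notebook takes for the given IDs."""
    steps = []
    if project_id == "":
        steps += ["create_project", "run_autopilot"]
    else:
        steps.append("reuse_project")
    # The recommended model is fetched in every case.
    steps.append("select_recommended_model")
    if deployment_id == "":
        steps.append("create_deployment")
    else:
        steps.append("reuse_deployment")
    return steps

print(plan_steps("", ""))
```

For example, supplying only a `project_id` skips Autopilot but still deploys that project's recommended model.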

### Connect to DataRobot

In [2]:
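# Only needed if you did not pass api_key and endpoint in the previous cell:
# with no arguments, dr.Client() reads credentials from a drconfig.yaml file.
dr.Client()
# The `config_path` should only be specified if the config file is not in the default location described in the API Quickstart guide
# dr.Client(config_path = 'path-to-drconfig.yaml')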

<datarobot.rest.RESTClientObject at 0x7fb7a067f940>

Read more about different options for [connecting to DataRobot from the client](https://docs.datarobot.com/en/docs/api/api-quickstart/api-qs.html).

### Import data

Here you'll pull in some data to work with. If a data table is available, provide the training table name, the destination table name for the scored results, and the target feature in this cell. If `training_table` is left empty, the cell instead loads the white wine quality sample dataset that ships with Databricks and replaces the spaces in its column names with underscores. This is also where any necessary data preparation would occur before sending the dataset to DataRobot. Note that DataRobot does not currently ingest Spark dataframes directly, so the dataframe is converted to a Pandas dataframe before upload.

In [None]:
training_table = ""
scoring_table = ""
target = ""

if training_table == "":
    scoring_table = "white_wine_scored"
    target = "quality"
    input_df = spark.read.option("header",True).option("delimiter",";").csv("dbfs:/databricks-datasets/wine-quality/winequality-white.csv")
    input_df = input_df.select([col(column).alias(column.replace(" ","_")) for column in input_df.columns])
else:
    input_df = spark.table(training_table)

df = input_df.toPandas()
display(input_df)
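Spark's CSV reader returns every column as a string unless schema inference is enabled. The cell above also renames columns so that spaces become underscores. The following stdlib-only sketch reproduces that parsing and renaming on a tiny made-up excerpt in the same semicolon-delimited layout. `read_semicolon_csv` is a hypothetical helper, not something the notebook or Spark provides:

```python
import csv
import io

# Tiny made-up excerpt in the semicolon-delimited layout of the sample file.
SAMPLE = (
    '"fixed acidity";"volatile acidity";"pH";"quality"\n'
    "7.0;0.27;3.0;6\n"
    "6.3;0.30;3.3;6\n"
)

def read_semicolon_csv(text: str) -> list:
    """Parse ';'-delimited CSV text and replace spaces in column names with underscores.

    Like Spark's CSV reader without schema inference, every value stays a string.
    """
    reader = csv.DictReader(io.StringIO(text), delimiter=";")
    return [{name.replace(" ", "_"): value for name, value in row.items()} for row in reader]

rows = read_semicolon_csv(SAMPLE)
print(rows[0])
```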

### Create a project

The Pandas dataframe is uploaded to the DataRobot platform and a name is given to the project.

In [None]:
# Create a project without setting the target
if project_id == "":
    project = dr.Project.create(project_name="New Test Project (Databricks)", sourcedata=df)
    print(project.id)

## Modeling

### Set the target feature

Here you can define any advanced options needed for your project, including the Autopilot mode you want to run (Standard Autopilot, Quick Mode, Comprehensive Mode, or Manual). This API call sets your target feature and then kicks off the EDA2 process, followed immediately by model training.

In [None]:
if project_id == "":
    mode = dr.enums.AUTOPILOT_MODE.QUICK

    project.analyze_and_model(
        target = target,
        mode = mode,
        worker_count = -1, #Setting the worker count to -1 will ensure that you use the maximum number of modeling workers available to your account
        max_wait = 600
    )
    # When you get control back, that means EDA is finished and model jobs are in flight

### Wait for Autopilot

This optional API call blocks execution of the notebook until Autopilot has completed. Depending on the Autopilot mode selected, the size of the dataset, and the type of problem you're solving, this can take anywhere from several minutes to several hours.

In [None]:
if project_id == "":
    # This is helpful if you want to keep execution serial:
    project.wait_for_autopilot()

    # Otherwise you can periodically ask the project for its current autopilot status:
    #project.stage
    #project.get_model_jobs()
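If you prefer polling to the blocking `wait_for_autopilot()` call, a generic loop like the one below works with any status check. `wait_until` is a hypothetical helper. Passing `lambda: len(project.get_model_jobs()) == 0` as the check is an assumption about how to detect completion, and the job queue may be empty briefly between Autopilot stages, so `wait_for_autopilot()` remains the more reliable option:

```python
import time
from typing import Callable

def wait_until(is_done: Callable[[], bool], timeout: float = 3600.0, interval: float = 30.0) -> bool:
    """Call is_done() every `interval` seconds until it returns True or `timeout` elapses.

    Returns True if the condition was met and False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if is_done():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```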

### List models

This API call outputs a list of all the models trained in the project, sorted by the selected validation metric.

In [None]:
# Optionally, skip Autopilot and start here:
if project_id != "":
    project = dr.Project.get(project_id)

# Pull the list of all models. You can iterate over these and examine them.
project.get_models()
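To examine the models yourself, you can rank them on the project metric. This sketch assumes each model exposes a `metrics` dictionary keyed by metric name, with a `"validation"` entry, and that `project.metric` names the project's metric. Check both against the client documentation for your version. Whether lower or higher scores are better depends on the metric (lower for RMSE, higher for AUC), so the hypothetical `rank_by_validation` helper takes that as a parameter. The stand-in objects and scores below are made up:

```python
from types import SimpleNamespace

def rank_by_validation(models, metric: str, higher_is_better: bool = False) -> list:
    """Return the models that have a validation score for `metric`, best first."""
    scored = []
    for model in models:
        score = model.metrics.get(metric, {}).get("validation")
        if score is not None:  # skip models without a validation score
            scored.append((score, model))
    scored.sort(key=lambda pair: pair[0], reverse=higher_is_better)
    return [model for _, model in scored]

# Stand-in objects shaped like the assumed metrics layout (scores are made up).
models = [
    SimpleNamespace(model_type="A", metrics={"RMSE": {"validation": 0.72}}),
    SimpleNamespace(model_type="B", metrics={"RMSE": {"validation": 0.65}}),
    SimpleNamespace(model_type="C", metrics={"RMSE": {"validation": None}}),
]
ranked = rank_by_validation(models, "RMSE")
print([m.model_type for m in ranked])
```

Against a real project, the call would look like `rank_by_validation(project.get_models(), project.metric)`, under the assumptions above.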

### Retrieve the recommended model

DataRobot provides a recommendation for an accurate and performant model at the end of Autopilot. This API call will fetch that recommendation.

In [None]:
print(dr.ModelRecommendation.get_all(project.id))
rec = dr.ModelRecommendation.get(
    project_id=project.id, 
    recommendation_type=dr.enums.RECOMMENDED_MODEL_TYPE.RECOMMENDED_FOR_DEPLOYMENT
)
selection = rec.get_model()

## Deploy a model

If no deployment ID was specified during setup, deploy DataRobot's recommended model. This makes the model available through the dedicated prediction API and wraps it in DataRobot's MLOps monitoring framework. Optional monitoring features are also enabled here: accuracy tracking (through an association ID column) and target and feature drift monitoring.

In [None]:
# When you are happy with your model you can automate deployment
if deployment_id == "":
    prediction_server = dr.PredictionServer.list()[0] # This line of code is only needed if you are using the DataRobot multi-tenant SaaS environment.
    deployment = dr.Deployment.create_from_learning_model(
        model_id = selection.id,
        label = "New Test Deployment",
        description = "Some extra data that I can use to search later.",
        default_prediction_server_id = prediction_server.id # This line of code is only needed if you are using the DataRobot multi-tenant SaaS environment.
    )
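    # Accuracy tracking matches predictions to actuals through an association ID column.
    # The sample wine dataset has no "id" column, so add one to your scoring data to use accuracy tracking.
    deployment.update_association_id_settings(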
        column_names = ["id"],
        required_in_prediction_requests = False
    )
    deployment.update_drift_tracking_settings(
        target_drift_enabled = True,
        feature_drift_enabled = True
    )
else:
    deployment = dr.Deployment.get(deployment_id)
    
print(deployment.id)
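The association ID settings above expect an `id` column, but the sample wine dataset has none, so accuracy tracking has nothing to match actuals against. In Pandas, `df.insert(0, "id", range(len(df)))` adds such a column. The stdlib sketch below does the same for a list of row dictionaries. `add_association_ids` is hypothetical. You need to keep the IDs yourself so that you can later upload actuals, for example with `deployment.submit_actuals`:

```python
def add_association_ids(records: list, column: str = "id", start: int = 0) -> list:
    """Return copies of the row dicts with a unique integer association ID as the first key."""
    if any(column in row for row in records):
        raise ValueError(f"column {column!r} already exists")
    return [{column: start + i, **row} for i, row in enumerate(records)]

rows = [{"alcohol": "9.5"}, {"alcohol": "10.1"}]
with_ids = add_association_ids(rows)
print(with_ids)
```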

## Score a Spark Dataframe

The Spark wrapper that you installed on your cluster lets you use the distributed power of Spark to score large datasets quickly. The following cells show how to score a Spark dataframe using Python or Scala.

### Score with Python

You can call DataRobot's Java scoring methods from Python through the Py4J gateway that PySpark exposes as `spark._jvm`. The call to `getPredictorFromDeployment` dynamically reaches out to the DataRobot platform to download the Scoring Code and make it available on your classpath. To avoid waiting for the network transfer, you can download the Scoring Code ahead of time and install it as a new library on the Databricks cluster.

To apply the scoring transformation to the Spark dataframe, pass its underlying Java dataframe (`_jdf`) to the model and wrap the result back into a PySpark dataframe. The cell also records how long scoring took so that you can report that metric to DataRobot MLOps in a later step.

In [None]:
java_import(spark._jvm, "com.datarobot.prediction.Predictors")
java_import(spark._jvm, "com.datarobot.prediction.spark30.Model")
java_import(spark._jvm, "com.datarobot.prediction.spark30.Predictors")

start_time = time.time() # Grab timestamps before and after scoring to provide MLOps with an estimated execution time.
# This next method call uses the endpoint, API token, and deployment ID defined in previous cells to fetch the Scoring Code.
dr_model = spark._jvm.com.datarobot.prediction.spark30.Predictors.getPredictorFromDeployment(endpoint,deployment.id,api_key)

output_df = DataFrame(dr_model.transform(input_df._jdf), spark) # Apply the scoring transformation
output_df.cache().count() # Spark evaluates lazily: force scoring to run (and cache the result) so the timing includes it
score_time = time.time() - start_time # Total runtime of fetching the model and scoring

display(output_df)
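Spark transformations are lazy. A timer stopped right after `transform()` can miss the scoring work, which only runs when an action such as `count()` or `display()` executes. The sketch below illustrates the effect with a Python generator that stands in for a lazy Spark plan. `timed` and `lazy_score` are hypothetical helpers, and the sleep simulates a per-row scoring cost:

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(result: dict, key: str):
    """Record the wall-clock duration of the with-block in result[key]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        result[key] = time.perf_counter() - start

def lazy_score(rows):
    """Generator standing in for a lazy transform: no work happens until it is consumed."""
    for row in rows:
        time.sleep(0.002)  # simulated per-row scoring cost
        yield row * 2

timings = {}
with timed(timings, "build_only"):
    plan = lazy_score(range(50))  # like transform(): only builds the plan
with timed(timings, "build_and_run"):
    results = list(lazy_score(range(50)))  # like count(): forces the work to run
print(timings)
```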

### Score with Scala

The following cell performs the same scoring action as the previous one, only using Scala instead of Python.

This cell is commented out by default since variable values aren't shared between language contexts.

In [None]:
%scala
/**
import com.datarobot.prediction.spark30.Predictors

val apiKey = "" // Provide your DataRobot API token here
val endpoint = "https://app.datarobot.com/" // The URL you use to access the DataRobot UI
val deploymentId = "" // The ID of the deployment you'd like to use for scoring
val inputDf = spark.sql("select * from loans") // Substitute your table name here

val javaModel = Predictors.getPredictorFromDeployment(endpoint,deploymentId,apiKey)

val outputDf = javaModel.transform(inputDf)
display(outputDf)
**/


### Score with the Prediction API

This cell scores a Pandas dataframe with the native DataRobot Prediction API. This method is limited to payloads under 50MB, so it is not ideal for large datasets. Its advantage is easier access to monitoring data: requests go through the deployment itself, so no agent-based external monitoring framework needs to be set up.

In [None]:
host = "https://example.dynamic.orm.datarobot.com" # This should be the URL of your prediction server, which you can find in the Deployment Overview page of the UI
headers = {
    "Content-Type": "application/json; charset=utf-8",
    "Accept": "text/csv",
    "datarobot-key": "", # This line of code is only needed if you are using the DataRobot multi-tenant SaaS environment.
    "Authorization": "Bearer %s" % (api_key)
}

params = {
    "passthroughColumnsSet" : "all" # This line tells the API to reflect back the input data along with the predictions
}

data = df.to_json(orient="records")
response = requests.post("{:}/predApi/v1.0/deployments/{:}/predictions".format(host, deployment.id), data=data, headers=headers, params=params)
response.raise_for_status() # Stop on HTTP errors instead of parsing an error message as CSV

api_df = pd.read_csv(StringIO(response.text)) # Here you read the API's CSV output into a Pandas dataframe
display(api_df)
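When a Pandas dataframe is too large for a single request, one option is to split the rows into several payloads that each stay under the size limit and post them one at a time. The sketch below assumes the records are already JSON-serializable dictionaries, for example from `df.to_dict(orient="records")`. It uses 50,000,000 bytes as a conservative reading of the 50MB limit. `batch_json_payloads` is a hypothetical helper:

```python
import json

def batch_json_payloads(records: list, max_bytes: int = 50_000_000) -> list:
    """Split records into JSON-array strings, each at most max_bytes when UTF-8 encoded."""
    payloads = []
    batch = []
    batch_bytes = 2  # the enclosing "[" and "]"
    for record in records:
        encoded = json.dumps(record)
        size = len(encoded.encode("utf-8"))
        if 2 + size > max_bytes:
            raise ValueError("a single record exceeds max_bytes")
        extra = size + (1 if batch else 0)  # +1 for the comma separator
        if batch and batch_bytes + extra > max_bytes:
            payloads.append("[" + ",".join(batch) + "]")  # flush the full batch
            batch, batch_bytes, extra = [], 2, size
        batch.append(encoded)
        batch_bytes += extra
    if batch:
        payloads.append("[" + ",".join(batch) + "]")
    return payloads
```

You could send each payload with the same `requests.post` call as above, passing it as `data`, and then combine the CSV responses with `pd.concat`.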

### Report monitoring data

Pass monitoring data to the appropriate message channel, in this case a Kafka topic. From there, the external monitoring agent picks up the data and sends it to the DataRobot platform for display in the MLOps dashboard.

Note that **this cell is provided as an example only** and will not run until the MLOps monitoring agent has been fully set up. More information about setting up the MLOps monitoring agent can be found [in the DataRobot documentation](https://docs.datarobot.com/en/docs/mlops/deployment/mlops-agent/monitoring-agent/index.html). This cell illustrates the client library call that pushes monitoring data to a message queue. Additional services must be set up outside this notebook to complete the transfer of monitoring data to DataRobot.

In [None]:
# java_import(spark._jvm, "com.datarobot.mlops_spark_utils.MLOpsSparkUtils")
# channelConfig = "spooler_type=kafka;kafka_topic_name=monitoring-agent-topic"

# spark._jvm.com.datarobot.mlops_spark_utils.MLOpsSparkUtils.reportPredictions(
#     output_df._jdf, # scoring data
#     deployment.id, # DeploymentId
#     selection.id, # ModelId
#     channelConfig, # MLOps channel configuration
#     float(score_time), # scoring time
#     ['target_1_PREDICTION','target_0_PREDICTION'] # prediction columns; adjust to match the columns in output_df
# )
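The `channelConfig` value above is a semicolon-separated list of `key=value` pairs. If you assemble it from settings, a small helper avoids quoting mistakes. `build_channel_config` and `parse_channel_config` are hypothetical, and the only keys shown are the two used in this notebook. See the MLOps agent documentation for any others:

```python
def build_channel_config(settings: dict) -> str:
    """Join settings into 'key=value;key=value' form, rejecting separator characters."""
    parts = []
    for key, value in settings.items():
        key, value = str(key), str(value)
        if any(sep in key + value for sep in (";", "=")):
            raise ValueError(f"separator character in setting {key!r}")
        parts.append(f"{key}={value}")
    return ";".join(parts)

def parse_channel_config(config: str) -> dict:
    """Split a 'key=value;key=value' string back into a dict."""
    return dict(pair.split("=", 1) for pair in config.split(";") if pair)

channel_config = build_channel_config({
    "spooler_type": "kafka",
    "kafka_topic_name": "monitoring-agent-topic",
})
print(channel_config)
```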

### Write results

You can now write the results back to a table. This example creates a new table, because the original source table's schema doesn't include columns to hold the scores or prediction explanations. The results from the DataRobot Prediction API are converted back to a Spark dataframe so that they can be written to a table.

In [None]:
api_spark_df = spark.createDataFrame(api_df)
api_spark_df.write.mode("overwrite").saveAsTable(scoring_table)