# Driverless AI + MLFlow

![Integration Diagram](files/tables/DB_diag.png)

In [0]:
import datatable as dt
import daimojo.model
import pandas as pd
import os

In [0]:
import os
import shutil
from sys import version_info
import argparse
import cloudpickle
import mlflow.pyfunc
def get_env():
    """Build the conda environment definition stored with the logged model.

    The Python version is pinned to the interpreter executing this code and
    cloudpickle is pinned to the version currently imported. The daimojo
    runtime is installed from a fixed wheel URL.

    Returns:
        dict: conda environment specification named "dai_env".
    """
    python_version = ".".join(
        str(part)
        for part in (version_info.major, version_info.minor, version_info.micro)
    )
    daimojo_wheel = "http://artifacts.h2o.ai.s3.amazonaws.com/releases/ai/h2o/daimojo/2.5.8/x86_64-centos7/daimojo-2.5.8-cp37-cp37m-linux_x86_64.whl"
    # Packages installed through pip inside the conda environment.
    pip_requirements = [
        "mlflow",
        "datatable",
        "--no-binary=protobuf protobuf",
        daimojo_wheel,
        f"cloudpickle=={cloudpickle.__version__}",
    ]
    return {
        "channels": ["defaults"],
        "dependencies": [
            f"python={python_version}",
            "pip",
            {"pip": pip_requirements},
        ],
        "name": "dai_env",
    }
def get_artifacts(mojo_path, license_path, feature_names):
    """Map the artifact keys used by the model wrapper to their locations.

    Args:
        mojo_path: location of the MOJO pipeline file.
        license_path: location of the license file.
        feature_names: location of the comma-separated feature-name file.

    Returns:
        dict: keys "dai_model", "license_key" and "feature_names".
    """
    return dict(
        dai_model=mojo_path,
        license_key=license_path,
        feature_names=feature_names,
    )
class DAIWrappedModel(mlflow.pyfunc.PythonModel):
    """MLflow pyfunc wrapper that scores data with a Driverless AI MOJO pipeline.

    Expects three artifacts on the MLflow context: "dai_model" (the MOJO
    file), "license_key" (the license file) and "feature_names" (a text file
    holding the comma-separated input column names).
    """

    def load_context(self, context):
        """Load the MOJO pipeline and the feature names from the artifacts.

        Args:
            context: MLflow model context whose ``artifacts`` mapping holds
                the local paths of the logged artifacts.
        """
        import daimojo.model
        import os

        # Export the license location under the correctly spelled variable
        # name, and do it before the model is constructed so it is already
        # set in this process when the MOJO runtime is first used.
        os.environ["DRIVERLESS_AI_LICENSE_FILE"] = context.artifacts["license_key"]
        self.model = daimojo.model(context.artifacts["dai_model"])
        with open(context.artifacts["feature_names"], "r") as file:
            # Drop a trailing line break so the last name stays clean if the
            # file was saved with a final newline.
            self.feature_names = file.read().rstrip("\r\n").split(",")

    def predict(self, context, x):
        """Score ``x`` and return the predictions as a pandas object.

        Args:
            context: MLflow model context (unused here).
            x: pandas DataFrame whose columns are, positionally, the features
                listed in the feature-name file.

        Returns:
            The pipeline output converted with ``to_pandas()``. When the
            pipeline emits exactly two columns only the second one is kept.
        """
        import datatable as dt

        # Rename on a shallow copy so the caller's frame keeps its own labels.
        frame = x.copy(deep=False)
        frame.columns = self.feature_names
        pred = self.model.predict(dt.Frame(frame))
        if pred.shape[1] == 2:
            # presumably a binary classifier with the positive class second
            # (TODO confirm against the MOJO's output columns)
            pred = pred[:, 1]
        return pred.to_pandas()
def log_model(model_path, mojo_path, license_path, feature_names):
    """Log a DAIWrappedModel to the active MLflow run.

    Args:
        model_path: artifact path under which the model is stored in the run.
        mojo_path: location of the MOJO pipeline file.
        license_path: location of the license file.
        feature_names: location of the comma-separated feature-name file.
    """
    logging_kwargs = {
        "conda_env": get_env(),
        "artifacts": get_artifacts(mojo_path, license_path, feature_names),
    }
    mlflow.pyfunc.log_model(
        artifact_path=model_path,
        python_model=DAIWrappedModel(),
        **logging_kwargs,
    )

In [0]:
# Copy the MOJO pipeline, the license file and the example CSV from
# /FileStore/tables to /databricks on the local filesystem (file:// scheme).
dbutils.fs.cp("/FileStore/tables/pipeline.mojo","file:///databricks/pipeline.mojo")
dbutils.fs.cp("/FileStore/tables/license.sig","file:///databricks/license.sig")
dbutils.fs.cp("/FileStore/tables/example.csv","file:///databricks/example.csv")

In [0]:
# Specify license file location: point DRIVERLESS_AI_LICENSE_FILE at the
# license copied to /databricks for the current (driver) Python process.
os.environ["DRIVERLESS_AI_LICENSE_FILE"]= "/databricks/license.sig"

## Data Prep in Spark/Delta

In [0]:
# read in data
# add column
# split for training
# update delta table

In [0]:
# Read the credit-card train and test CSVs from S3 into Spark DataFrames,
# treating the first row as a header and letting Spark infer column types.
train_df = spark.read.csv('s3://h2o-public-test-data/smalldata/kaggle/CreditCard/creditcard_train_cat.csv', header = True, inferSchema = True)
test_df = spark.read.csv('s3://h2o-public-test-data/smalldata/kaggle/CreditCard/creditcard_test_cat.csv', header = True, inferSchema = True)


## Train model in DAI

In [0]:
# connect to DAI instance
# NOTE(review): the WAVE_DAI_* names are not defined in this notebook;
# presumably they are template placeholders substituted before the notebook
# is run (confirm).

import driverlessai

address = WAVE_DAI_ADDRESS
username = WAVE_DAI_USERNAME
password = WAVE_DAI_PASSWORD
# Client handle reused by every later cell that talks to Driverless AI.
dai = driverlessai.Client(address = address, username = username, password = password)

In [0]:
# Send data to DAI from S3 path
# NOTE(review): WAVE_TRAIN_FILE_PATH / WAVE_TEST_FILE_PATH are not defined in
# this notebook; presumably template placeholders (confirm).

train_path = WAVE_TRAIN_FILE_PATH
test_path = WAVE_TEST_FILE_PATH

# Register both files as Driverless AI datasets using the 's3' data source.
train = dai.datasets.create(data=train_path, data_source='s3')
test = dai.datasets.create(data=test_path, data_source='s3')

In [0]:
# Show the datasets currently registered on the Driverless AI server.
dai.datasets.list()

In [0]:
#dai.datasets

# Re-bind train/test to datasets taken from the server listing.
# NOTE(review): the positional indices assume a particular ordering of
# dai.datasets.list(); confirm, or select the datasets by name instead.
train = dai.datasets.list()[1]
test = dai.datasets.list()[0]

In [0]:
# set up experiment

In [0]:
# Experiment settings shared by the preview below and the experiment itself.
# NOTE(review): the WAVE_DAI_* names are not defined in this notebook;
# presumably template placeholders (confirm).
dai_target = WAVE_DAI_TARGET
dai_acc = WAVE_DAI_ACC
dai_time = WAVE_DAI_TIME
dai_int = WAVE_DAI_INTERPRETABILITY
dai_scorer = WAVE_DAI_SCORER
dai_cols_to_drop = WAVE_DAI_COLS2DROP
# Preview a classification experiment on the training dataset.
# NOTE(review): the preview passes enable_gpus=False and omits the scorer and
# cols_to_drop, whereas the experiment created later passes enable_gpus=True
# with both; the preview may not describe the run that is launched (confirm
# which is intended).
exp_preview = dai.experiments.preview(train_dataset=train,
                                      task='classification',
                                      target_column=dai_target,
                                      enable_gpus=False,
                                      accuracy=dai_acc,
                                      time=dai_time,
                                      interpretability=dai_int,
                                      config_overrides=None)
exp_preview

In [0]:
# run experiment

In [0]:
# Launch the classification experiment named "credit_mlflow" on the train and
# test datasets with a fixed seed, and keep its handle in `ex`.
ex = dai.experiments.create(train_dataset=train,
                            test_dataset=test,
                            target_column=dai_target,
                            task='classification',
                            accuracy=dai_acc,
                            time=dai_time,
                            interpretability=dai_int,
                            scorer=dai_scorer,
                            enable_gpus=True,
                            seed=1234,
                            cols_to_drop=dai_cols_to_drop,
                            name = "credit_mlflow")

In [0]:
# Show the experiments currently known to the Driverless AI server.
dai.experiments.list()

In [0]:
# Re-bind `ex` to the first listed experiment whose name contains 'credit'
# (raises IndexError when there is none).
ex = [candidate for candidate in dai.experiments.list() if 'credit' in candidate.name][0]

In [0]:
# get model results

In [0]:
# Fetch the experiment's metrics and report the validation score to 3 places.
metrics = ex.metrics()

print(f"Final model Score on Validation Data: {round(metrics['val_score'], 3)}")

In [0]:
# Display the experiment summary.
ex.summary()

In [0]:
# Print the experiment's variable-importance table.
print(ex.variable_importance())

## Log model and metrics in MLFlow

In [0]:
# log model artifact

In [0]:
# Ask Driverless AI to build the MOJO pipeline artifact for this experiment.
ex.artifacts.create('mojo_pipeline')

In [0]:
# Download only the MOJO pipeline artifact into /dbfs/FileStore/, replacing
# any existing file of the same name.
ex.artifacts.download(only='mojo_pipeline', dst_dir='/dbfs/FileStore/', overwrite=True)

In [0]:
%sh

# Unpack the downloaded MOJO archive next to it in /dbfs/FileStore/.
unzip /dbfs/FileStore/mojo.zip -d /dbfs/FileStore/


In [0]:
# Persist the test DataFrame's column names as one comma-separated line; the
# model wrapper reads this file back to label its input columns.
column_line = ",".join(test_df.columns)
with open("/databricks/feature_names.txt", "w") as file:
    file.write(column_line)

In [0]:
# Log the wrapped MOJO under the artifact path "DAI_test3", together with the
# license file and the feature-name file written above.
log_model("DAI_test3", "dbfs:/FileStore/mojo-pipeline/pipeline.mojo", "/databricks/license.sig", "/databricks/feature_names.txt")

In [0]:
# Display the metrics dictionary fetched from the experiment.
metrics

In [0]:
# log model metrics
# NOTE(review): the metrics dictionary is recorded with log_params, i.e. as
# run parameters rather than MLflow metrics; confirm this is intended.

import mlflow

mlflow.log_params(metrics)

In [0]:
# leverage autodoc artifacts for tracking more metrics

##Get 
# pipeline.mojo
#Four files from summary for tracking:
# ensemble_model_description.json
# ensemble_scores.json
# features_orig.json
# features.json

##Track:
#input feature names
#original feature importance
#transformed feature importance
#Ensemble model algorithms, weights, and hyperparameters for each
#Ensemble model performance on training and test sets




In [0]:
# add experiment to model registry

In [0]:
# view experiment in mlflow

## Score new data

In [0]:
# Load Test Data (re-binds test_df) with a header row and inferred schema.
# NOTE(review): WAVE_TEST_FILE_PATH is not defined in this notebook;
# presumably a template placeholder (confirm).
test_df = spark.read.csv(WAVE_TEST_FILE_PATH, header = True, inferSchema = True)

In [0]:
# score with MLFlow
import mlflow
# NOTE(review): this artifact URI is hard-coded to one specific tracking run;
# it must be updated whenever the model is logged again.
logged_model = 'dbfs:/databricks/mlflow-tracking/1272602360004101/16bb39ce9ebf4390bafa6fcfa5a9eb25/artifacts/DAI_test3'

# Load model as a Spark UDF.
loaded_model = mlflow.pyfunc.spark_udf(spark, logged_model)

# Every column of the test DataFrame is passed to the UDF, in order.
features = test_df.columns

# Predict on a Spark DataFrame.
pred = test_df.withColumn('my_predictions', loaded_model(*features))

In [0]:
# Render the scored DataFrame, including the 'my_predictions' column.
display(pred)

ID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_1,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,my_predictions
24001,50000,male,university,single,23,2,2,0,0,0,0,51246,49758,48456,44116,21247,20066,8,2401,2254,2004,704,707,0.615347370505333
24002,60000,male,university,single,26,0,0,0,0,0,0,58072,59040,57416,55736,26958,28847,2282,2324,2049,2000,3000,1120,0.1626371704041957
24003,400000,male,university,single,27,0,0,0,0,0,0,15330,8626,11470,10745,20737,9545,2501,10009,1437,1105,510,959,0.069121959619224
24004,20000,male,other,single,27,5,4,3,2,2,2,21673,21051,20440,19709,20113,19840,0,0,0,900,0,0,0.6552194207906723
24005,50000,male,highschool,single,27,0,0,-2,-2,-1,-1,32590,-100,0,0,70,120,0,100,0,70,200,100,0.1221985165029764
24006,110000,male,university,single,27,0,0,0,0,0,0,102551,103550,105089,107164,105988,108617,5500,6000,6000,4000,5000,4000,0.0872640684247016
24007,30000,male,highschool,single,23,0,0,-2,-1,0,0,4443,370,380,590,7704,20204,430,400,601,7504,15005,5674,0.1133869532495737
24008,230000,male,university,single,27,0,0,0,0,0,0,34592,23689,27652,8430,9811,9865,1816,5105,1293,2000,528,3000,0.0682964492589235
24009,20000,male,highschool,divorce,23,0,0,0,0,0,0,18455,19990,20215,19298,19807,12294,2000,20000,1612,1121,702,1000,0.1028802599757909
24010,30000,male,graduate,single,24,0,0,0,2,0,0,6003,4912,6214,3311,4430,906,1440,2259,0,1500,425,895,0.2496905550360679


In [0]:
# score new data with Rest Server 
# kick off Megan's model monitoring and reporting (drift detection etc.) (where to store and show? MLFlow model registry?)

In [0]:
# score new data with DB scorer
# kick off Megan's model monitoring and reporting (drift detection etc.)

In [0]:
# update delta table

In [0]:
# Display the Spark UDF object created for the logged model.
loaded_model

In [0]:
#TO DO
#can we use feature names from MOJO to pass to Spark UDF? instead of writing to a feature_names file
#provide license.sig to every worker without setting an env variable at the cluster level

#use the DAI model name when calling the model - mlflow experiment tracking
#from MOJO can track features, DAI version, tree hyperparameters
#need to track performance metrics - when training easy, but when scoring?

#full circle: push DAI model to mlflow, generate tracking ID, then load model into Databricks using the tracking ID and score 
#auto update experiments - which mlflow to push it to?

#can we transform, not just score the data on spark?

In [0]:
#workflow

#data manipulation/prep/ETL in DB
#send data to DAI for model training with Python client (extend driverlessai to include this functionality, end user: pip install driverlessai on DB)
#track and log resulting model in MLflow
#for ex mlflow.dai.log_model


In [0]:
#try pip install driverless ai and create experiment
#download artifacts

In [0]:
#data in delta
#data prep using Spark
#send to DAI