# SQL UDF / ML Example
Notes on SQL UDF
* When you use `CREATE FUNCTION` to create a pure SQL UDF, it IS written to the metadata catalog and persists across cluster restarts
* When you use `spark.udf.register` to expose a Python wrapper as a SQL UDF, it is NOT written to the metadata catalog and does NOT persist across cluster restarts
## Catboost to ONNX Conversion Experiment


### Outline of experiments
1. Persist baseline random dataset for scoring sample of 100,000 rows
1. Apply current MLflow model and clock end to end run time
1. Generate histogram of output and save output
1. Extract native Catboost from MLFlow
1. Convert model to ONNX
1. Build UDF to predict on native ONNX model 
1. Apply ONNX model and clock end to end run time
1. Persist ONNX results, generate histogram and compare
1. Register an ONNX based model in MLFlow
1. Import ONNX model and rerun Steps 6-8

## Setup

In [None]:
!pip install dbldatagen
!pip install catboost
!pip install onnx
!pip install onnxruntime

#### Version pins required because the DBR runtime is not current

In [None]:
!pip install mlflow==1.3
!pip install cloudpickle==2.0.0
!pip install scikit-learn==0.24.2

## 1. Persist baseline random dataset for scoring sample of 100,000 rows

In [None]:
import dbldatagen as dg
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, FloatType
# Dataset size and Spark parallelism settings for the synthetic scoring sample.
data_rows = 100000                # total records to be generated
partitions_requested = 32         # controls the number of Spark tasks
shuffle_partitions_requested = 8

spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Twenty random float feature columns named c1..c20, each generated with
# minValue=0 and maxValue=1000.
dataspec = dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested)
for feature_idx in range(1, 21):
    dataspec = dataspec.withColumn(
        f"c{feature_idx}", FloatType(), minValue=0, maxValue=1000, random=True
    )

df = dataspec.build()

In [None]:
df.write.format("delta").mode("overwrite").save('/dbfs/FileStore/num_cb_op/python_sudf_onnx/numerical_reg_cb_op_1_20col')

## 2. Apply current MLflow model and clock end to end run time

In [None]:
input_test = spark.read.format("delta").load('/dbfs/FileStore/num_cb_op/python_sudf_onnx/numerical_reg_cb_op_1_20col')

In [None]:
import mlflow
import pandas as pd
from pyspark.sql.functions import struct, col
logged_model = 'runs:/4d02005792134f94b350c615f0637853/model'

# Load model
loaded_model = mlflow.pyfunc.load_model(logged_model)

In [None]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import mlflow
import catboost
@pandas_udf(returnType=DoubleType())
def predict_pandas_udf(*features):
    """Score one batch of rows with the MLflow-loaded model.

    Parameters
    ----------
    features : tuple of pd.Series
        One pandas Series per feature column, in the order the columns
        were passed to the UDF.

    Returns
    -------
    pd.Series
        The output of ``loaded_model.predict`` for the batch.
    """
    # Place the per-column Series side by side to form a 2-D array
    # (one row per record, one column per feature).
    feature_matrix = pd.concat(features, axis=1).values
    # `loaded_model` is the module-level pyfunc model; the whole batch is
    # scored in a single predict call.
    return pd.Series(loaded_model.predict(feature_matrix))

# Feature columns c1..c20, handed to the UDF in order.
baseline_feature_cols = [col(f"c{i}") for i in range(1, 21)]

# Append the baseline model's prediction as a new column.
catboost_baseline = input_test.withColumn(
    "prediction",
    predict_pandas_udf(*baseline_feature_cols),
)

In [None]:
import catboost
catboost_baseline.write.format("delta").mode("overwrite").save('/dbfs/FileStore/num_cb_op/python_sudf_onnx/numerical_reg_cb_op_1_20col_baseline')

## 4. Extract native Catboost from MLFlow

### Extract model pickle from MLFlow

In [None]:
 
import pickle
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Download the pickled model artifact logged under the run to a local path.
tmp_path = client.download_artifacts(run_id='4d02005792134f94b350c615f0637853', path='model/model.pkl')

# NOTE: pickle.load can execute arbitrary code contained in the file; only
# unpickle artifacts that come from a trusted run.
# The context manager closes the file even if unpickling raises.
with open(tmp_path, 'rb') as f:
    model = pickle.load(f)

### Get Catboost model out of SKLearn wrapper

In [None]:
catmodel = model.best_estimator_
type(catmodel)

## 5. Convert model to ONNX

In [None]:
# Export the extracted CatBoost estimator to an ONNX file in the current
# working directory; the same file name is reloaded by the ONNX cells below.
# NOTE(review): the doc string and graph name below say "BinaryClassification",
# while the scoring UDFs in this notebook declare a DoubleType result and read
# the 'predictions' output — confirm these labels match the actual model type.
catmodel.save_model(
    "catboost_ransserachCV.onnx",
    format="onnx",
    export_parameters={
        'onnx_domain': 'ai.catboost',
        'onnx_model_version': 1,
        'onnx_doc_string': 'test model for BinaryClassification',
        'onnx_graph_name': 'CatBoostModel_for_BinaryClassification'
    }
)

## 6. Build UDF to predict on native ONNX model

### Practice getting ONNX model working

In [None]:
import onnxruntime as rt

# Build a small pandas sample of the 20 feature columns so this cell is
# self-contained (only c1..c20 are selected; any other column in the table
# is not a model input).
feature_names = [f"c{i}" for i in range(1, 21)]
dfp = input_test.select(*feature_names).limit(1000).toPandas()

cat_onnx_sess = rt.InferenceSession("catboost_ransserachCV.onnx")
# Cast to float32 — assumes the exported graph's 'features' input is a float32
# tensor, as in CatBoost's ONNX examples; TODO confirm against the session's
# input metadata (cat_onnx_sess.get_inputs()).
probabilities = cat_onnx_sess.run(['predictions'],
                                  {'features': dfp.to_numpy(dtype="float32")})
probabilities[0]

### UDF and test run

In [None]:
import onnxruntime as rt
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
 
@pandas_udf(returnType=DoubleType())
def predict_pandas_udf_onnx(*features):
    """Score one batch of rows with the exported ONNX model.

    Parameters
    ----------
    features : tuple of pd.Series
        One pandas Series per feature column, in the order the columns
        were passed to the UDF.

    Returns
    -------
    pd.Series
        One float64 prediction per input row.
    """
    # Stack the per-column Series into a 2-D array. `features` is a tuple, so
    # it has no .to_numpy(); the concatenated frame is what must be fed to the
    # session. Cast to float32 — assumes the graph's 'features' input is a
    # float32 tensor; TODO confirm.
    X = pd.concat(features, axis=1).to_numpy(dtype="float32")

    # NOTE(review): the session is rebuilt for every batch and the relative
    # path must be readable by the process executing the UDF — confirm the
    # file is available there.
    cat_onnx_sess = rt.InferenceSession("catboost_ransserachCV.onnx")
    y = cat_onnx_sess.run(['predictions'], {'features': X})[0]

    # Flatten in case the output has a trailing singleton dimension, and
    # widen to float64 to match the declared DoubleType.
    return pd.Series(y.ravel().astype("float64"))

# Feature columns c1..c20, handed to the ONNX UDF in order.
onnx_feature_cols = [col(f"c{i}") for i in range(1, 21)]

# Append the ONNX model's prediction as a new column.
df = df.withColumn(
    "prediction",
    predict_pandas_udf_onnx(*onnx_feature_cols),
)

### Load ONNX model from disk

In [None]:
import mlflow
import onnx

model_path = "catboost_ransserachCV.onnx"
onnx_model = onnx.load(model_path)





### Create a dummy run to write artifacts and register

In [None]:
# Log the ONNX model as the "model" artifact of a fresh run, then register
# that artifact in the model registry under "CatBoostONNXModel".
with mlflow.start_run() as run:
    mlflow.onnx.log_model(onnx_model, artifact_path="model")

model_uri = f"runs:/{run.info.run_id}/model"
mv = mlflow.register_model(model_uri, "CatBoostONNXModel")
print(f"Name: {mv.name}")
print(f"Version: {mv.version}")

## 10. Use MLFlow based ONNX model and rerun Steps 6-8

## Miscellaneous code for understanding prior work and debugging

In [None]:
import mlflow
import catboost
mlflow.__version__

In [None]:
import mlflow.tracking

client = mlflow.tracking.MlflowClient()
run = client.get_run(run_id="4d02005792134f94b350c615f0637853")
experiment_id = run.info.experiment_id
experiment = client.get_experiment(experiment_id)
experiment


In [None]:
type(cat_onnx_sess)


In [None]:
import mlflow.pyfunc
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, DoubleType
def predict_udf(model_uri):
    """Build a scalar-iterator pandas UDF that scores batches with an MLflow model.

    The model at ``model_uri`` is loaded once via ``mlflow.pyfunc.load_model``
    when this factory is called; the returned UDF closes over it.

    Parameters
    ----------
    model_uri : str
        MLflow model URI to load.

    Returns
    -------
    The decorated ``predict`` pandas UDF (declared result type: DoubleType).
    """
    pyfunc_model = mlflow.pyfunc.load_model(model_uri)
    feature_names = ['col1', 'col2', 'col3']

    @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER)
    def predict(iterator):
        # Each batch is indexed by the three hard-coded feature names.
        for batch in iterator:
            yield pyfunc_model.predict(batch[feature_names])

    return predict
 

In [None]:
import mlflow.pyfunc
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, DoubleType
def predict_udf_onnx(model_uri):
    """Build a scalar-iterator pandas UDF that scores batches with an MLflow model.

    The model at ``model_uri`` is loaded once via ``mlflow.pyfunc.load_model``
    when this factory is called; the returned UDF closes over it.

    Parameters
    ----------
    model_uri : str
        MLflow model URI to load (e.g. a registered ONNX model).

    Returns
    -------
    The decorated ``predict`` pandas UDF (declared result type: DoubleType).
    """
    model = mlflow.pyfunc.load_model(model_uri)

    @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER)
    def predict(iterator):
        for input_df in iterator:
            # NOTE(review): the feature names are hard-coded to col1..col3 —
            # confirm they match the columns the caller passes in.
            predictions = model.predict(input_df[['col1', 'col2', 'col3']])
            yield predictions

    # Only the UDF is returned; scoring happens inside `predict`, so no
    # inference code belongs at factory level (it has no `features` to use).
    return predict
 

In [None]:
# Register the ONNX pandas UDF so it can be called from SQL
from pyspark.sql.types import StructType, StructField, DoubleType
# No returnType argument: when the function being registered is already a
# UDF object, Spark reuses that UDF's own return type and rejects an
# explicit one with a TypeError.
spark.udf.register("predict_udf_onnx", predict_pandas_udf_onnx)

# The query below reads from this temp view, so make sure it exists first.
input_test.createOrReplaceTempView("input_test_tbl")

# Use the UDF in a SQL query. SQL can only call the registered function name
# with column arguments; a Python UDF factory cannot be invoked from SQL.
result = spark.sql(
    "SELECT id, "
    "predict_udf_onnx(c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20) AS prediction "
    "FROM input_test_tbl"
)
result.show()


In [None]:
input_test.createOrReplaceTempView("input_test_tbl")

In [None]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, DoubleType, ArrayType
# No returnType argument: predict_pandas_udf_onnx is already a pandas UDF
# declared with DoubleType, and Spark raises a TypeError if a return type is
# passed when registering an existing UDF object.
spark.udf.register("predict_pandas_udf_onnx_sql", predict_pandas_udf_onnx)

In [None]:
%sql
-- Call the ONNX pandas UDF by the name it is registered under for SQL use
-- (predict_udf is a Python factory and is never registered as a SQL function).
SELECT *, predict_pandas_udf_onnx_sql(c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20) FROM input_test_tbl limit 10

In [None]:
# Scratch cell: the feature column list kept for copy/paste into the SQL cells.
# NOTE(review): c1..c20 are table columns, not Python variables defined in this
# notebook, so evaluating this cell as Python raises NameError.
c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20

In [None]:
model_uri= f'models:/CatBoostONNXModel/2'

In [None]:
%sql
-- A Python UDF factory such as predict_udf(model_uri) cannot be invoked from
-- SQL; call the ONNX UDF by its registered SQL name with the feature columns.
SELECT predict_pandas_udf_onnx_sql(c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20) as prediction FROM input_test_tbl

In [None]:
import mlflow.pyfunc

# Wrap the registered MLflow model as a Spark UDF and expose it to SQL under
# its own name; a Python UDF factory cannot be called from inside a SQL string.
mlflow_onnx_udf = mlflow.pyfunc.spark_udf(spark, model_uri, result_type="double")
spark.udf.register("predict_mlflow_onnx", mlflow_onnx_udf)

result = spark.sql(
    "SELECT "
    "predict_mlflow_onnx(c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20) AS prediction "
    "FROM input_test_tbl"
)
result.show()