## Model training and registration

This notebook trains a Spark ML linear regression model, converts it to ONNX, and uploads the ONNX model to Azure Storage.

### Mount Azure storage accounts in Azure Databricks

We use `dbutils` to mount the remote Azure storage accounts as local DBFS folders.

Once they are mounted, we can read from and write to Azure Storage directly through the mount points.

>**IMPORTANT!**
>In the code below, perform the following replacements:
>- Replace `<data_storage_account_name>` with the name of the Data Lake storage account (has the `asadatalakeNNNNNN` form).
>- Replace `<data_storage_account_key>` with the access key associated with the Data Lake storage account.
>- Replace `<model_storage_account_name>` with the name of the blob storage account (has the `asastoreNNNNNN` form).
>- Replace `<model_storage_account_key>` with the access key associated with the blob storage account.

In [0]:
data_storage_account_name = '<data_storage_account_name>'
data_storage_account_key = '<data_storage_account_key>'
model_storage_account_name = '<model_storage_account_name>'
model_storage_account_key = '<model_storage_account_key>'

data_mount_point = '/mnt/data'
model_mount_point = '/mnt/model'

data_file_path = '/bronze/wwi-factsale.csv'
model_file_path = '/onnx/model.onnx'

dbutils.fs.mount(
  source = f"wasbs://dev@{data_storage_account_name}.blob.core.windows.net",
  mount_point = data_mount_point,
  extra_configs = {f"fs.azure.account.key.{data_storage_account_name}.blob.core.windows.net": data_storage_account_key})

dbutils.fs.mount(
  source = f"wasbs://models@{model_storage_account_name}.blob.core.windows.net",
  mount_point = model_mount_point,
  extra_configs = {f"fs.azure.account.key.{model_storage_account_name}.blob.core.windows.net": model_storage_account_key})
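
Each `dbutils.fs.mount` call above combines the storage account name, a container name (`dev` or `models`), and the account key into a `wasbs://` source URI and a Hadoop configuration key. The following sketch reproduces that string construction in plain Python so it can be checked outside Databricks. `build_mount_args` and `is_mounted` are hypothetical helpers, and `asadatalake000000` is a placeholder account name. `is_mounted` assumes objects shaped like the entries returned by `dbutils.fs.mounts()` (with a `mountPoint` attribute); a check like this is useful because `dbutils.fs.mount` fails when the mount point is already in use, for example when the cell is run a second time.

```python
from collections import namedtuple


def build_mount_args(container, account_name, account_key, mount_point):
    """Hypothetical helper: keyword arguments for dbutils.fs.mount() with an account key."""
    host = f"{account_name}.blob.core.windows.net"
    return {
        # wasbs://<container>@<account>.blob.core.windows.net
        "source": f"wasbs://{container}@{host}",
        "mount_point": mount_point,
        # Hadoop configuration key that carries the storage account access key
        "extra_configs": {f"fs.azure.account.key.{host}": account_key},
    }


def is_mounted(mounts, mount_point):
    """Hypothetical helper: True if any entry in `mounts` has the given mountPoint."""
    return any(m.mountPoint == mount_point for m in mounts)


# Stand-in for the objects returned by dbutils.fs.mounts() (assumption for illustration)
FakeMountInfo = namedtuple("FakeMountInfo", ["mountPoint", "source"])

args = build_mount_args("dev", "asadatalake000000", "<data_storage_account_key>", "/mnt/data")
print(args["source"])  # wasbs://dev@asadatalake000000.blob.core.windows.net

existing = [FakeMountInfo("/mnt/data", args["source"])]
print(is_mounted(existing, "/mnt/data"), is_mounted(existing, "/mnt/model"))  # True False
```

In a Databricks notebook, the same guard could be written as `if not is_mounted(dbutils.fs.mounts(), data_mount_point): dbutils.fs.mount(**build_mount_args(...))`.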

Test the data mount point.

In [0]:
dbutils.fs.ls(f'{data_mount_point}/bronze')

Test the model mount point.

In [0]:
dbutils.fs.ls(f'{model_mount_point}')

### Explore the training data

The following cells load the source CSV file into a Spark DataFrame and create a temporary view that can be used to query the data with Spark SQL.

In [0]:
from pyspark.sql.functions import col
df = spark.read.load(f'{data_mount_point}{data_file_path}', format="csv", header=True, sep="|")

df.createOrReplaceTempView("facts")
display(spark.sql("SELECT * FROM facts WHERE `Customer Key` == '11' ORDER BY `Stock Item Key`"))
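
Because the reader is called without `inferSchema`, Spark reads every column as a string, so the filter compares against the string `'11'` and `ORDER BY` on `Stock Item Key` sorts lexicographically rather than numerically. The following standard-library sketch mimics the `header=True` and `sep="|"` options and the query above on a few hypothetical rows to make that visible; it is an illustration, not how Spark parses CSV internally.

```python
import csv
import io

# Hypothetical rows in the layout the notebook expects: a header row and "|" separators
sample = io.StringIO(
    "Customer Key|Stock Item Key|Quantity\n"
    "11|5|10\n"
    "12|3|4\n"
    "11|2|7\n"
    "11|10|1\n"
)

# header=True -> the first row supplies the column names; sep="|" -> field delimiter
rows = list(csv.DictReader(sample, delimiter="|"))

# WHERE `Customer Key` == '11' ORDER BY `Stock Item Key`, with all values kept as strings
result = sorted(
    (r for r in rows if r["Customer Key"] == "11"),
    key=lambda r: r["Stock Item Key"],
)
print([r["Stock Item Key"] for r in result])  # ['10', '2', '5']: lexicographic order
```

The later cast to `double` when building `df3` is what turns these strings into numbers for training.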

You can also query the `facts` temporary view directly in SQL by using the `%sql` magic command.

In [0]:
%sql

SELECT * FROM facts LIMIT 100;

### Predict Quantity given Customer Key and Stock Item Key

The following cell selects only the fields needed for training, casts them to `double`, and drops rows in which `quantity` is null.

In [0]:
df3 = spark.sql("SELECT double(`Customer Key`) as customerkey, double(`Stock Item Key`) as stockitemkey, double(`Quantity`) as quantity FROM facts").where(col("quantity").isNotNull())
df3.cache()
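
The `double(...)` casts and the `isNotNull` filter can be pictured row by row as in the following sketch. It assumes Spark's non-ANSI cast behavior, in which a string that cannot be parsed becomes `NULL` instead of raising an error. `to_double` and `select_training_fields` are hypothetical names, and the input rows are made up.

```python
def to_double(value):
    """Mimic a non-ANSI SQL cast to double: missing or unparseable values become None."""
    if value is None:
        return None
    try:
        return float(value)
    except ValueError:
        return None


def select_training_fields(rows):
    """Project, rename, and cast the three training columns; keep rows with a quantity."""
    out = []
    for r in rows:
        rec = {
            "customerkey": to_double(r.get("Customer Key")),
            "stockitemkey": to_double(r.get("Stock Item Key")),
            "quantity": to_double(r.get("Quantity")),
        }
        # Equivalent of .where(col("quantity").isNotNull())
        if rec["quantity"] is not None:
            out.append(rec)
    return out


rows = [
    {"Customer Key": "11", "Stock Item Key": "5", "Quantity": "10"},
    {"Customer Key": "12", "Stock Item Key": "3", "Quantity": ""},  # dropped: empty quantity
    {"Customer Key": "13", "Stock Item Key": "8", "Quantity": "n/a"},  # dropped: unparseable
]
print(select_training_fields(rows))
# [{'customerkey': 11.0, 'stockitemkey': 5.0, 'quantity': 10.0}]
```

Only `quantity` is checked for nulls here; a row whose key fails to cast still passes this filter.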

Next, we package the data into the format expected by Spark ML's `LinearRegression`.

It requires a vector column of features (`features`) and a numeric column with the labels to predict (`quantity` in this case). `VectorAssembler` builds the `features` column from `customerkey` and `stockitemkey`.

In [0]:
# Combine the two key columns into a single `features` vector column
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['customerkey', 'stockitemkey'], outputCol = 'features')
df4 = vectorAssembler.transform(df3)
df5 = df4.select(['features', 'quantity'])
df5.show(10)
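
Conceptually, `VectorAssembler` concatenates the listed input columns, in order, into one vector per row, and the `select` keeps that vector plus the label. The following plain-Python sketch shows the resulting row shape. `assemble` is a hypothetical helper, and it uses a list where Spark uses a `DenseVector` or `SparseVector`. It also raises on missing values, in the spirit of `VectorAssembler`'s default `handleInvalid="error"` setting, which matters here because only `quantity` was filtered for nulls.

```python
def assemble(rows, input_cols, output_col="features", label_col="quantity"):
    """Sketch of VectorAssembler(inputCols, outputCol) followed by select([output_col, label_col])."""
    assembled = []
    for r in rows:
        values = [r[c] for c in input_cols]
        if any(v is None for v in values):
            # Comparable to handleInvalid="error": refuse rows with missing features
            raise ValueError(f"null value in {input_cols}: {r}")
        assembled.append({output_col: values, label_col: r[label_col]})
    return assembled


rows = [
    {"customerkey": 11.0, "stockitemkey": 5.0, "quantity": 10.0},
    {"customerkey": 12.0, "stockitemkey": 3.0, "quantity": 4.0},
]
for row in assemble(rows, ["customerkey", "stockitemkey"]):
    print(row)
# {'features': [11.0, 5.0], 'quantity': 10.0}
# {'features': [12.0, 3.0], 'quantity': 4.0}
```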

Now, we split our DataFrame into training and testing DataFrames.

In [0]:
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 42
df_train, df_test = df5.randomSplit([trainingFraction, testingFraction], seed=seed)
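
`randomSplit` normalizes the weights and assigns each row to exactly one split based on a pseudo-random draw. As a result, the 70/30 proportions are approximate rather than exact, and the fixed `seed` makes the split repeatable for the same input data and partitioning. The following sketch illustrates that idea with Python's `random` module. `random_split` is a hypothetical helper and does not reproduce the rows Spark would choose.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split by comparing a uniform draw with cumulative weights."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point round-off in the last bound
            splits[-1].append(row)
    return splits


train, test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 700 and 300, but not exactly
```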

In the following cell, we train our `LinearRegression` model.

In [0]:
from pyspark.ml.regression import LinearRegression

lin_reg = LinearRegression(featuresCol = 'features', labelCol='quantity', maxIter = 10, regParam=0.3)
lin_reg_model = lin_reg.fit(df_train)
print("Coefficients: " + str(lin_reg_model.coefficients))
print("Intercept: " + str(lin_reg_model.intercept))
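
With `regParam=0.3` and the default `elasticNetParam` of 0, the penalty is purely L2, so the model is a ridge regression. The following sketch solves the two-feature ridge problem in closed form with the intercept left unpenalized, minimizing the objective given in the docstring. It rests on simplifying assumptions: Spark standardizes features by default (`standardization=True`) and chooses its own solver, so its coefficients will generally not match this sketch exactly. `fit_ridge_2d` is a hypothetical helper, and the data are synthetic.

```python
def fit_ridge_2d(X, y, reg_param):
    """Fit y ~ b + w1*x1 + w2*x2 by minimizing
        (1 / (2n)) * sum((y_i - b - w . x_i)^2) + (reg_param / 2) * (w1^2 + w2^2)
    with the intercept b left unpenalized and no feature standardization.
    """
    n = len(y)
    mean_x1 = sum(r[0] for r in X) / n
    mean_x2 = sum(r[1] for r in X) / n
    mean_y = sum(y) / n

    # Centering removes the intercept from the linear system
    c1 = [r[0] - mean_x1 for r in X]
    c2 = [r[1] - mean_x2 for r in X]
    cy = [v - mean_y for v in y]

    # Normal equations: (Xc^T Xc / n + reg_param * I) w = Xc^T yc / n
    a11 = sum(u * u for u in c1) / n + reg_param
    a12 = sum(u * v for u, v in zip(c1, c2)) / n
    a22 = sum(v * v for v in c2) / n + reg_param
    b1 = sum(u * t for u, t in zip(c1, cy)) / n
    b2 = sum(v * t for v, t in zip(c2, cy)) / n

    # Solve the 2x2 system with Cramer's rule
    det = a11 * a22 - a12 * a12
    w1 = (b1 * a22 - a12 * b2) / det
    w2 = (a11 * b2 - a12 * b1) / det
    intercept = mean_y - w1 * mean_x1 - w2 * mean_x2
    return [w1, w2], intercept


# Synthetic data generated from y = 1 + 2*x1 + 3*x2 (not the notebook's data)
X = [(1.0, 2.0), (2.0, 1.0), (3.0, 5.0), (4.0, 3.0), (5.0, 7.0)]
y = [1 + 2 * x1 + 3 * x2 for x1, x2 in X]

print(fit_ridge_2d(X, y, reg_param=0.0))  # coefficients close to [2, 3], intercept close to 1
print(fit_ridge_2d(X, y, reg_param=0.3))  # the L2 penalty shrinks the coefficients
```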

With a trained model in hand, we can use it to make predictions against the test DataFrame.

In [0]:
df_pred = lin_reg_model.transform(df_test)
display(df_pred)
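
For each test row, `transform` appends a `prediction` column (the default `predictionCol`) whose value is the intercept plus the dot product of the learned coefficients with the row's `features` vector. The following sketch spells out that formula. `predict` is a hypothetical helper, and the coefficient values are made up, not results from this notebook.

```python
def predict(coefficients, intercept, features):
    """Linear model prediction: intercept + sum_j coefficients[j] * features[j]."""
    if len(coefficients) != len(features):
        raise ValueError("feature vector length must match the number of coefficients")
    return intercept + sum(c * x for c, x in zip(coefficients, features))


# Hypothetical values; in the notebook they would come from
# lin_reg_model.coefficients and lin_reg_model.intercept
print(predict([2.0, 3.0], 1.0, [11.0, 5.0]))  # 38.0
```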

### Convert model to ONNX format

The following cell converts the trained model to ONNX and displays the resulting ONNX representation of the Spark ML model.

In [0]:
from onnxmltools import convert_sparkml
from onnxmltools.convert.common.data_types import FloatTensorType

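# Declare the model input: a float32 tensor named "features" (matching the
# featuresCol used in training) with 1 row and lin_reg_model.numFeatures columns
initial_types = [
    ("features", FloatTensorType([1, lin_reg_model.numFeatures])),
]

# The second argument is the name stored in the ONNX model
model_onnx = convert_sparkml(lin_reg_model, 'sparkml LinearRegression', initial_types)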
model_onnx
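
`FloatTensorType` declares the ONNX input as 32-bit floats, whereas Spark ML trains and predicts with 64-bit doubles, so predictions computed from the ONNX model can differ slightly from those in `df_pred`. The following standard-library sketch shows the rounding involved by round-tripping values through IEEE 754 single precision. `to_float32` is a hypothetical helper. Integer keys below `2**24` are represented exactly, but larger integers and most fractional values are rounded.

```python
import struct


def to_float32(x):
    """Round-trip a Python float (IEEE 754 double) through IEEE 754 single precision."""
    return struct.unpack("f", struct.pack("f", x))[0]


print(to_float32(11.0) == 11.0)  # True: small integers are exact in float32
print(to_float32(16777217.0))  # 16777216.0: 2**24 + 1 is not representable
print(to_float32(0.1) == 0.1)  # False: 0.1 is rounded differently in float32
```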

### Upload the model to Azure Storage

Because the blob storage container is mounted at `model_mount_point`, we can upload the ONNX model by writing it to DBFS and copying it into the mounted folder.

In [0]:
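# Serialize the ONNX model (a protobuf message) to bytes
contents = model_onnx.SerializeToString()
print(f"Serialized ONNX model: {len(contents)} bytes")

# Write the bytes to DBFS through the local /dbfs path, then copy the file into the mounted container
with open("/dbfs/tmp/model.onnx", "wb") as f:
    f.write(contents)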
dbutils.fs.cp("dbfs:/tmp/model.onnx", f'{model_mount_point}{model_file_path}')
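
The cell above writes through `/dbfs/tmp/model.onnx` but copies from `dbfs:/tmp/model.onnx`: both name the same file, one through the local file API and one as a DBFS URI. The following sketch makes that mapping explicit. `dbfs_uri_to_local_path` is a hypothetical helper, and it assumes the `/dbfs` local mount that the notebook itself relies on is available on the cluster.

```python
def dbfs_uri_to_local_path(uri):
    """Map a 'dbfs:/...' URI to the '/dbfs/...' path used by local file APIs such as open()."""
    prefix = "dbfs:"
    if not uri.startswith(prefix):
        raise ValueError(f"not a DBFS URI: {uri}")
    path = uri[len(prefix):]
    if not path.startswith("/"):
        path = "/" + path
    return "/dbfs" + path


print(dbfs_uri_to_local_path("dbfs:/tmp/model.onnx"))  # /dbfs/tmp/model.onnx
print(dbfs_uri_to_local_path("dbfs:/mnt/model/onnx/model.onnx"))  # /dbfs/mnt/model/onnx/model.onnx
```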

Finally, we list the mounted folder to verify that the ONNX model is present in remote storage.

In [0]:
dbutils.fs.ls(f'{model_mount_point}/onnx')