## Using Azure ML Datasets as an approach to data mesh

In this notebook, I'll demonstrate how to use Azure ML Datasets as the basis of a data mesh strategy for model development across different compute targets, including Databricks and Azure ML, orchestrated with `Azure ML Pipelines`.

<div style="text-align:center; width: 1000px"><img src="./assets/pipeline.jpg" /></div>

*The AML pipeline*

To run this example, you need a Databricks cluster running an ML Runtime, with the `azureml-sdk[databricks]` package installed on it.

The overall idea is to create data lineage for the entire life cycle of a model, from data processing through model registration and deployment.
A simple training exercise is used so that the focus stays on the use of AML Datasets.

In this example, the data preprocessing happens on Databricks in a `DatabricksStep`, and the model training takes place on an AML Compute cluster in a `PythonScriptStep`.

The first step, the `DatabricksStep`, receives three input AML Datasets and prepares them for a model training exercise. The final dataframe is then saved as either a `Parquet` or a `Delta` table. Finally, the written data is registered as an AML Dataset: a `Tabular` dataset for the `Parquet` file format, or a `File` dataset for the `Delta` file format.

Every time this step is executed, a new dataset called `feature_titanic` is generated and registered as an AML Dataset, which is then consumed by the next step. If the `allow_reuse` parameter of the `DatabricksStep` constructor is set to `True` (as it is in this notebook) and the step's inputs and parameters have not changed, the step is not re-executed and the dataset registered by the previous run is reused by the next step.

<div style="text-align:center; width: 500px"><img src="./assets/ADBStep.jpg" /></div>

*ADB Step details page; the input and output datasets.*
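Inside the Databricks script, the values passed through `python_script_params` arrive as ordinary command-line arguments. The sketch below shows one way a script such as `adb_run.py` might parse them, assuming it uses `argparse` (the actual script is not shown in this notebook). The argument names match those in this notebook's `DatabricksStep` definition; `parse_adb_args` is a hypothetical helper. `parse_known_args` is used defensively in case the step appends arguments of its own, which is an assumption worth checking against what your script actually receives.

```python
import argparse


def parse_adb_args(argv=None):
    """Parse the parameters the DatabricksStep passes via python_script_params."""
    parser = argparse.ArgumentParser(description="Feature engineering on Databricks")
    parser.add_argument("--feature_set_1", required=True, help="Name of the first input AML Dataset")
    parser.add_argument("--feature_set_2", required=True, help="Name of the second input AML Dataset")
    parser.add_argument("--feature_set_3", required=True, help="Name of the third input AML Dataset")
    parser.add_argument("--output_datastore_name", required=True,
                        help="Datastore where the feature table is written")
    parser.add_argument("--output_feature_set_name", required=True,
                        help="Name under which the output AML Dataset is registered")
    # Ignore (rather than fail on) any arguments this parser does not declare
    args, _unknown = parser.parse_known_args(argv)
    return args
```

With the parsed names, the script could look up each input with `Dataset.get_by_name(ws, name=args.feature_set_1)` and register its output under `args.output_feature_set_name`.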

The registered `AML Dataset` is passed to the subsequent `PythonScriptStep`, which performs the training. The data is read according to the incoming dataset type, either Delta or Parquet. A Parquet dataset is read either through the native `dataset.to_pandas_dataframe()` or directly from the mount point to the ADLS storage with `pd.read_parquet()`. A Delta dataset should be read directly from the mounted ADLS storage with an open-source package called `deltalake`. Either way, the data is converted into a pandas dataframe, a transformation is applied to it, and a model is trained on the transformed dataframe. Finally, the model is registered and linked to the dataset used for training.

<div style="text-align:center; width: 500px"><img src="./assets/AMLStep.jpg" /></div>

*AML Step details page; the input and output datasets.*
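The notebook chooses between `aml_run.py` and `aml_run_delta.py` with the `isDeltaUsed` flag. As an alternative sketch (an assumption, not the author's scripts), a single training script could detect the format on disk instead: a Delta table keeps its transaction log in a `_delta_log` sub-folder, which plain Parquet output does not have. `detect_table_format` and `read_table` are hypothetical helpers; the Delta branch needs the `deltalake` package and the Parquet branch needs pandas with a Parquet engine such as `pyarrow`.

```python
from pathlib import Path


def detect_table_format(folder):
    """Return 'delta' if the folder looks like a Delta table, otherwise 'parquet'."""
    # Delta tables store their transaction log in a `_delta_log` sub-folder
    if (Path(folder) / "_delta_log").is_dir():
        return "delta"
    return "parquet"


def read_table(folder):
    """Read a Parquet folder or a Delta table from a mounted path into pandas."""
    if detect_table_format(folder) == "delta":
        # deltalake (delta-rs) reads Delta tables without a Spark session
        from deltalake import DeltaTable
        return DeltaTable(str(folder)).to_pandas()
    import pandas as pd
    # pandas reads every Parquet part file in the folder into one dataframe
    return pd.read_parquet(folder)
```

In the training script this might be called as `read_table(args.data_folder)`, assuming the mounted folder is the root of the written table.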

To register the model, the `AML Dataset` object is passed through the `datasets` parameter of the `Model.register` function. This links the model to the dataset that was used to train it.

<div style="text-align:center; width: 1000px"><img src="./assets/Model.jpg" /></div>

*Registered Model data tab; link to the feature_titanic AML Dataset.*

The link also works in the other direction: the `AML Dataset` shows the models that were trained on it.

<div style="text-align:center; width: 1000px"><img src="./assets/DatasetToModel.jpg" /></div>

*Model tab of the Featurized AML Dataset; link to the titanic_model AML Model.*

During the lifecycle of the model and dataset, we used the `tags` parameter of the `register` functions of `AML Datasets` and `AML Models`. This keeps important parameters attached to the model and dataset objects, such as the `dataset schema`, the `input dataset`, the `run_id`, etc.

<div style="text-align:center; width: 500px"><img src="./assets/DatasetTags.jpg" /></div>

*Tags of the feature_titanic dataset. They identify the input datasets, the Databricks feature store, the data types of the final pandas dataframe, etc.*
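A minimal sketch of how such tags could be assembled before calling `register(..., tags=...)`. AML tags are string-to-string dictionaries, so the dataframe schema is serialised to JSON. The helper `build_dataset_tags` and the tag keys (`input_datasets`, `run_id`, `schema`) are hypothetical; the notebook's scripts may use different keys.

```python
import json

import pandas as pd


def build_dataset_tags(df, input_dataset_names, run_id):
    """Build a str -> str tags dictionary describing a feature dataset."""
    # Record each column's dtype so consumers can check the schema without loading data
    schema = {col: str(dtype) for col, dtype in df.dtypes.items()}
    return {
        "input_datasets": ",".join(input_dataset_names),
        "run_id": str(run_id),
        "schema": json.dumps(schema),
    }
```

The result could then be passed as `dataset.register(ws, name=feature_dataset_name, tags=tags, create_new_version=True)`; `Model.register` accepts a `tags` dictionary in the same way.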

The `Environment` object for the `PythonScriptStep` declares the required packages, which include:
* deltalake
* scikit-learn
* pandas
* azureml-core

*Make sure `azureml-defaults` is removed from the `Environment` object, as it conflicts with a dependency of the `deltalake` package.*

<div style="text-align:center; width: 500px"><img src="./assets/Environment.jpg" /></div>

*The environment definition for the AML Step*
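The same package list could also be kept in a conda specification file and loaded with `Environment.from_conda_specification(name, file_path)`, which makes the environment easy to review and version. This is an alternative to the `CondaDependencies` code used in this notebook, not what the notebook does. The sketch writes such a file with the standard library only; `write_conda_spec` is a hypothetical helper and the default Python version `3.8` is an assumption.

```python
from pathlib import Path


def write_conda_spec(path, pip_packages, python_version="3.8"):
    """Write a minimal conda specification for the AML training environment."""
    # Refuse the package that conflicts with a deltalake dependency
    if "azureml-defaults" in pip_packages:
        raise ValueError("azureml-defaults conflicts with deltalake; leave it out")
    lines = [
        "name: deltalake",
        "dependencies:",
        f"  - python={python_version}",
        "  - pip",
        "  - pip:",
    ]
    # Nest each pip package under the `pip:` entry
    lines += [f"    - {pkg}" for pkg in pip_packages]
    out = Path(path)
    out.write_text("\n".join(lines) + "\n")
    return out
```

For example, `write_conda_spec("conda_env.yml", ["scikit-learn", "deltalake", "azureml-core", "pandas"])` followed by `Environment.from_conda_specification(name="deltalake", file_path="conda_env.yml")`.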

In [None]:
import azureml.core
from azureml.core import Workspace, Environment, Experiment, Datastore, Dataset
from azureml.core.compute import ComputeTarget, DatabricksCompute
from azureml.exceptions import ComputeTargetException
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import DatabricksStep, PythonScriptStep

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

In [None]:
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

In [None]:
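db_compute_name = "ADBCluster"  # name of the attached Databricks compute target

try:
    # Raises ComputeTargetException if the Databricks workspace has not been attached yet
    databricks_compute = DatabricksCompute(workspace=ws, name=db_compute_name)
    print('Compute target {} already exists'.format(db_compute_name))
except ComputeTargetException:
    print('Compute target {} was not found; attach the Databricks workspace first'.format(db_compute_name))
    raise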


In [None]:
# Datastore that holds the input datasets and the feature dataset
def_blob_store = Datastore(ws, "generalpurposeaccount")
print('Datastore {} will be used'.format(def_blob_store.name))

In [None]:
# Intermediate output of the Databricks step, consumed by the training step as a dataset
step_1_output = PipelineData("output", datastore=def_blob_store)
ds_step_1_output = step_1_output.as_dataset()

In [None]:
cluster_name = "cpu-cluster-4"
compute_target = ComputeTarget(workspace=ws, name=cluster_name)

In [None]:
def register_dataset(datastore, dataset_name):
    # Upload the local Titanic CSV into a dataset-specific folder on the datastore
    remote_path = f'dataset-demo/{dataset_name}/'
    local_path = './data/titanic.csv'
    datastore.upload_files(files=[local_path],
                           target_path=remote_path,
                           overwrite=True,
                           show_progress=False)

    # Create a Tabular dataset from the uploaded CSV and register it as a new version
    dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, remote_path)])
    dataset = dataset.register(ws, name=dataset_name, create_new_version=True)
    return dataset

In [None]:
ds_titanic_1 = register_dataset(def_blob_store, 'titanic_1')
ds_titanic_2 = register_dataset(def_blob_store, 'titanic_2')
ds_titanic_3 = register_dataset(def_blob_store, 'titanic_3')

In [None]:
source_directory = "./scripts"

isDeltaUsed = False

if isDeltaUsed:
    databricks_script_name = "adb_run_delta.py"
    aml_script_name = 'aml_run_delta.py'
else:
    databricks_script_name = "adb_run.py"
    aml_script_name = 'aml_run.py'

feature_dataset_name = "feature_titanic"

In [None]:

dbNbStep = DatabricksStep(
    name="ADBFeatureEng",
    outputs=[ds_step_1_output],
    compute_target=databricks_compute,
    existing_cluster_id="0517-170422-mxoe0n2x",  # must be a Databricks cluster with an ML Runtime and azureml-sdk[databricks] installed
    python_script_params=["--feature_set_1", ds_titanic_1.name,
                          "--feature_set_2", ds_titanic_2.name,
                          "--feature_set_3", ds_titanic_3.name,
                          '--output_datastore_name', def_blob_store.name,
                          "--output_feature_set_name", feature_dataset_name],
    permit_cluster_restart=True,
    python_script_name=databricks_script_name,
    source_directory=source_directory,
    run_name='ADB_Feature_Eng',
    allow_reuse=True
)

In [None]:
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies

aml_env = Environment('deltalake')

conda_dep = CondaDependencies()

# "sklearn" on PyPI is a deprecated placeholder; the real package is scikit-learn
conda_dep.add_pip_package("scikit-learn")
conda_dep.add_pip_package("deltalake")
# azureml-defaults conflicts with a dependency of deltalake, so drop it from the defaults
conda_dep.remove_pip_package('azureml-defaults')
conda_dep.add_pip_package('azureml-core')
conda_dep.add_pip_package('pandas')

# Attach the dependencies to the Python section of the environment
aml_env.python.conda_dependencies = conda_dep

aml_env = aml_env.register(workspace=ws)

rcfg = RunConfiguration()
rcfg.environment = aml_env

In [None]:
aml_step = PythonScriptStep(script_name=aml_script_name,
                            name="AML Train",
                            source_directory=source_directory,
                            inputs=[ds_step_1_output],
                            compute_target=compute_target,
                            arguments=['--data_folder', ds_step_1_output,
                                       '--featureset_name', feature_dataset_name,
                                       '--model_name', 'titanic_model'],
                            allow_reuse=False,
                            runconfig=rcfg)


In [None]:
# List both steps so the dependency is explicit: aml_step consumes dbNbStep's output
steps = [dbNbStep, aml_step]
pipeline = Pipeline(workspace=ws, steps=steps)
pipeline_run = Experiment(ws, 'DB_FeatureStore').submit(pipeline)


In [None]:
pipeline_run

In [None]:
pipeline_run.wait_for_completion()


Once the pipeline has completed, you can access the `dataset` information of the registered model through its `datasets` property. In this example, you'll receive a dictionary whose key is the name provided when the model was registered, `featurized data` in this case.
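If the expected key is missing, a lookup such as `model_datasets['featurized data'][0]` fails with a bare `KeyError`. The small helper below reports which names are actually linked instead. It is hypothetical and assumes, as the next cells do, that `model.datasets` maps each name to a list of datasets.

```python
def get_linked_dataset(model_datasets, name="featurized data"):
    """Return the first dataset linked to a model under `name`."""
    linked = model_datasets.get(name) or []
    if not linked:
        # List the linked names so a mismatched key is easy to spot
        available = ", ".join(sorted(model_datasets)) or "<none>"
        raise KeyError(f"no dataset linked as {name!r}; linked names: {available}")
    return linked[0]
```

Usage: `input_dataset = get_linked_dataset(model.datasets)`.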

In [None]:
from azureml.core import Model

model = Model(ws, name='titanic_model')
model

In [None]:
model_datasets = model.datasets
input_dataset = model_datasets['featurized data'][0]


In [None]:
input_dataset.tags

In [None]:
pdf = input_dataset.to_pandas_dataframe()

In [None]:
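# Alternatively, fetch the latest registered version of the feature dataset by name
feature_ds = Dataset.get_by_name(ws, name=feature_dataset_name)
feature_ds.tags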