# E2E Machine Learning Workflow on Azure ML using the Python SDK v2 pt.1

##### Model training pipeline

**Learning Objectives** - By the end of this tutorial, you should be able to use Azure Machine Learning (Azure ML) to productionise your ML project.

This means you will be able to leverage the AzureML Python SDK to:

- connect to your Azure ML workspace
- create Azure ML data assets
- create reusable Azure ML components
- create, validate and run Azure ML pipelines
- deploy the newly-trained model as an endpoint
- call the Azure ML endpoint for inferencing

**Motivations** - This tutorial is intended to introduce Azure ML to data scientists who want to scale up or publish their ML projects. By completing a familiar end-to-end project, which starts by loading the data and ends by creating and calling an online inference endpoint, the user should become familiar with the core concepts of Azure ML and their most common usage. Each step of this tutorial can be modified or performed in other ways that might have security or scalability advantages.

**Requirements** - In order to benefit from this tutorial, you need to have:
- a basic understanding of machine learning project workflows
- an Azure subscription. If you don't have an Azure subscription, [create a free account](https://aka.ms/AMLFree) before you begin.
- a working Azure ML workspace. A workspace can be created via Azure Portal, Azure CLI, or Python SDK. [Read more](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-manage-workspace?tabs=python).
- a Python environment
- [installed Azure Machine Learning Python SDK v2](https://github.com/Azure/azureml-examples/blob/sdk-preview/sdk/setup.sh)

## Introduction

In this tutorial, you'll create an Azure ML pipeline to train a model for credit default prediction. The pipeline handles data cleaning, data preparation, training, and registration of the trained model. You'll then run the pipeline, deploy the model and use it.

### Set up the pipeline resources

The Azure ML framework can be used from the CLI, the Python SDK, or the studio interface. In this example, you'll use the Azure ML Python SDK v2 to create a pipeline.

Before creating the pipeline, you'll set up the resources the pipeline will use:

* The dataset for training
* The software environment to run the pipeline

### Connect to the workspace

Before we dive into the code, you'll need to connect to your Azure ML workspace. The workspace is the top-level resource for Azure Machine Learning, providing a centralized place to work with all the artifacts you create when you use Azure Machine Learning.

We use `DefaultAzureCredential` to get access to the workspace.
`DefaultAzureCredential` should be capable of handling most Azure SDK authentication scenarios.

If it does not work for you, see the following references for other available credentials: [configure credential example](../../configuration.ipynb), [azure-identity reference doc](https://docs.microsoft.com/en-us/python/api/azure-identity/azure.identity?view=azure-python).

### Step 1: Set-up - client handle & data exploration
This section sets up the `MLClient` object, registers the dataset and checks that it can be read back.

In [1]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

subscription_id = ''
resource_group = ''
workspace = ''

ml_client = MLClient(DefaultAzureCredential(), subscription_id, resource_group, workspace)
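
The three identifiers above are left blank and must be filled in before the client can make any call. As one possible way to keep them out of the notebook, the sketch below reads them from a JSON file and rejects blank values. The helper name `load_workspace_config` is hypothetical; the file name `config.json` and its three keys are assumed to follow the convention that `MLClient.from_config` reads.

```python
import json
from pathlib import Path

# Keys assumed to match the workspace config.json convention.
REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def load_workspace_config(path="config.json"):
    """Read workspace identifiers from a JSON file and reject blank values."""
    config = json.loads(Path(path).read_text(encoding="utf-8"))
    missing = [key for key in REQUIRED_KEYS if not str(config.get(key) or "").strip()]
    if missing:
        raise ValueError(f"Missing workspace settings: {', '.join(missing)}")
    return {key: config[key] for key in REQUIRED_KEYS}


# Usage (commented out because it needs a real workspace and credentials):
# cfg = load_workspace_config()
# ml_client = MLClient(
#     DefaultAzureCredential(),
#     cfg["subscription_id"],
#     cfg["resource_group"],
#     cfg["workspace_name"],
# )
```

Failing early on a blank value gives a clearer error than the one raised later by the first service call.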

> [!IMPORTANT]
> Creating `MLClient` does not connect to the workspace. Client initialization is lazy: it waits until the first time it needs to make a call (in this notebook, that happens during dataset registration).

## Register data from a local file

The data you use for training is usually in one of the locations below:

* Local machine
* Web
* Big Data Storage services (for example, Azure Blob, Azure Data Lake Storage, SQL)
 
Azure ML uses a [`Data`](https://docs.microsoft.com/azure/machine-learning/how-to-create-register-data-assets?tabs=Python-SDK) object to register a reusable definition of data, and consume data within a pipeline. In the section below, you'll register a local CSV file as one example. `Data` assets from other sources can be created as well.

In [2]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

credit_data = Data(
    name="credit_card_default_data",
    path="../data/credit_card_default_data_v1.csv",
    type=AssetTypes.URI_FILE,
    description="Dataset for credit card defaults",
    version="1",
)

ml_client.data.create_or_update(credit_data)

Data({'skip_validation': False, 'mltable_schema_url': None, 'referenced_uris': None, 'type': 'uri_file', 'is_anonymous': False, 'auto_increment_version': False, 'auto_delete_setting': None, 'name': 'credit_card_default_data', 'description': 'Dataset for credit card defaults', 'tags': {}, 'properties': {}, 'print_as_yaml': True, 'id': '/subscriptions/876b91eb-54d6-4433-af3b-5c9914d5ccea/resourceGroups/ej_vision_playground/providers/Microsoft.MachineLearningServices/workspaces/ej-workshop-workspace/data/credit_card_default_data/versions/3', 'Resource__source_path': None, 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/jonesethan2/code/Users/jonesethan/AML_workshop/e2e_ml_workflow/credit_card_training_pipe', 'creation_context': <azure.ai.ml.entities._system_data.SystemData object at 0x7f8b74be0e50>, 'serialize': <msrest.serialization.Serializer object at 0x7f8b60bed900>, 'version': '3', 'latest_version': None, 'path': 'azureml://subscriptions/876b91eb-54d6-4433-af3b-5c9914d5

In [3]:
credit_data = ml_client.data.get(name="credit_card_default_data", version="1")
print(f"Data asset URI: {credit_data.path}")

Data asset URI: azureml://subscriptions/876b91eb-54d6-4433-af3b-5c9914d5ccea/resourcegroups/ej_vision_playground/workspaces/ej-workshop-workspace/datastores/workspaceblobstore/paths/LocalUpload/b3ca843612e2ff4c8fab1474c9d0bb1a/credit_card_default_data_v1.csv
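
The URI printed above packs several identifiers into one string. The sketch below splits it into its parts with a regular expression. The pattern is inferred only from the output shown in this notebook, so treat it as an assumption rather than a specification of the URI format; `parse_datastore_uri` is a hypothetical helper and the identifiers in the example are placeholders.

```python
import re

# Pattern inferred from the URI printed in this notebook (an assumption).
_DATASTORE_URI = re.compile(
    r"^azureml://subscriptions/(?P<subscription_id>[^/]+)"
    r"/resourcegroups/(?P<resource_group>[^/]+)"
    r"/workspaces/(?P<workspace>[^/]+)"
    r"/datastores/(?P<datastore>[^/]+)"
    r"/paths/(?P<path>.+)$",
    re.IGNORECASE,
)


def parse_datastore_uri(uri):
    """Split a datastore URI into a dict of its named parts."""
    match = _DATASTORE_URI.match(uri)
    if match is None:
        raise ValueError(f"Not a recognised datastore URI: {uri!r}")
    return match.groupdict()


# Placeholder identifiers, not a real subscription or workspace.
parts = parse_datastore_uri(
    "azureml://subscriptions/my-subscription/resourcegroups/my-rg"
    "/workspaces/my-workspace/datastores/workspaceblobstore"
    "/paths/LocalUpload/abc123/credit_card_default_data_v1.csv"
)
print(parts["datastore"], parts["path"])
```

This makes it easy to see that the local file was uploaded to the `workspaceblobstore` datastore under a `LocalUpload/` prefix.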


In [4]:
import pandas as pd

raw_df = pd.read_csv(credit_data.path, header=0)

In [5]:
raw_df.describe()

| | ID | LIMIT_BAL | SEX | EDUCATION | MARRIAGE | AGE | PAY_0 | PAY_2 | PAY_3 | PAY_4 | ... | BILL_AMT4 | BILL_AMT5 | BILL_AMT6 | PAY_AMT1 | PAY_AMT2 | PAY_AMT3 | PAY_AMT4 | PAY_AMT5 | PAY_AMT6 | default payment next month |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 2500.0 | 2500.0 | 2500.0 | 2500.0 | 2500.0 | 2500.0 | 2500.0 | 2500.0 | 2500.0 | 2500.0 | ... | 2500.0 | 2500.0 | 2500.0 | 2500.0 | 2500.0 | 2500.0 | 2500.0 | 2500.0 | 2500.0 | 2500.0 |
| mean | 28750.5 | 165392.0 | 1.6368 | 1.9296 | 1.5288 | 35.8076 | -0.0576 | -0.1568 | -0.188 | -0.24 | ... | 43291.1628 | 38652.2748 | 37710.5552 | 6447.8484 | 6441.45 | 6233.4596 | 4774.9376 | 5024.6564 | 5487.6616 | 0.2168 |
| std | 721.83216 | 127514.98094 | 0.481018 | 0.832419 | 0.52429 | 9.330799 | 1.127648 | 1.21425 | 1.201513 | 1.231506 | ... | 64202.771958 | 59876.673254 | 59390.266423 | 27426.696364 | 37781.92 | 25715.8828 | 18620.056749 | 18159.838678 | 19088.935962 | 0.412148 |
| min | 27501.0 | 10000.0 | 1.0 | 1.0 | 0.0 | 21.0 | -2.0 | -2.0 | -2.0 | -2.0 | ... | -50616.0 | -53007.0 | -94625.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 25% | 28125.75 | 50000.0 | 1.0 | 1.0 | 1.0 | 28.0 | -1.0 | -1.0 | -1.0 | -1.0 | ... | 1923.0 | 1606.5 | 1050.0 | 969.25 | 736.75 | 646.75 | 286.5 | 179.0 | 239.5 | 0.0 |
| 50% | 28750.5 | 140000.0 | 2.0 | 2.0 | 2.0 | 34.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 18474.5 | 16520.5 | 15830.0 | 2100.0 | 2050.0 | 2000.0 | 1500.0 | 1500.0 | 1600.0 | 0.0 |
| 75% | 29375.25 | 230000.0 | 2.0 | 2.0 | 2.0 | 42.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 55975.75 | 49844.5 | 48498.25 | 5010.25 | 5000.0 | 4704.5 | 4000.0 | 4000.0 | 4100.0 | 0.0 |
| max | 30000.0 | 780000.0 | 2.0 | 6.0 | 3.0 | 74.0 | 6.0 | 6.0 | 8.0 | 7.0 | ... | 504474.0 | 587067.0 | 498316.0 | 873552.0 | 1227082.0 | 889043.0 | 621000.0 | 426529.0 | 443001.0 | 1.0 |
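
Because the target column `default payment next month` is binary, its mean in the summary above is the share of defaulting customers. The short calculation below, using only the `count` and `mean` values from that summary, shows how imbalanced the classes are and what accuracy a model would reach by always predicting "no default". This is a reading aid for the summary, not part of the pipeline.

```python
n_rows = 2500          # "count" row of the summary
default_rate = 0.2168  # "mean" of "default payment next month"

n_defaults = round(n_rows * default_rate)
n_non_defaults = n_rows - n_defaults

# Accuracy of a trivial model that always predicts class 0 (no default).
majority_baseline_accuracy = n_non_defaults / n_rows

print(f"defaults: {n_defaults}, non-defaults: {n_non_defaults}")
print(f"always-predict-0 accuracy: {majority_baseline_accuracy:.4f}")
```

Any trained classifier should be judged against this baseline of roughly 78% accuracy, which is why the training script later prints a full classification report rather than accuracy alone.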


### Step 2: Environment definitions and creation
So far, we've been using a development environment on the compute instance, your development machine. You'll also need an [environment](https://docs.microsoft.com/azure/machine-learning/concept-environments) to use for each step of the pipeline. Each step can have its own environment, or you can use some common environments for multiple steps.

#### Step 2.1: Environment definitions

We will use Conda YAML configuration files to create the custom environments:

1. Environment #1 for the data cleaning stage of the pipeline - this will handle removing any missing values.
2. Environment #2 for the data preparation and model training - as these two steps will require the sklearn library, we will create one environment for them to share.
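
The `%%writefile` cells in this tutorial write into `environments/` and into sub-folders of `pipeline_components/`, and `%%writefile` does not create missing folders. A minimal sketch for creating them up front is shown below; the folder names are taken from the cells in this notebook, and the helper name `ensure_folders` is hypothetical.

```python
from pathlib import Path

# Folder names taken from the %%writefile targets used in this tutorial.
FOLDERS = [
    "environments",
    "pipeline_components/clean_data",
    "pipeline_components/prep_data",
    "pipeline_components/train_model",
]


def ensure_folders(root="."):
    """Create the tutorial folders under root if they do not exist yet."""
    created = []
    for folder in FOLDERS:
        target = Path(root) / folder
        target.mkdir(parents=True, exist_ok=True)
        created.append(target)
    return created


if __name__ == "__main__":
    for path in ensure_folders():
        print(f"ready: {path}")
```

Running it more than once is harmless because existing folders are left untouched.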

In [10]:
%%writefile environments/data_clean.yaml
name: prep-env
channels:
  - conda-forge
dependencies:
  - python=3.11
  - numpy
  - pip
  - pandas
  - pip:
    - mlflow
    - azureml-mlflow

Writing environments/data_clean.yaml


In [11]:
%%writefile environments/model_train.yaml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.11
  - numpy
  - pip
  - scikit-learn
  - scipy
  - pandas
  - pip:
    - inference-schema[numpy-support]
    - xlrd
    - mlflow
    - azureml-mlflow

Writing environments/model_train.yaml


#### Step 2.2: Registering the environments within the workspace

In [6]:
import os
from azure.ai.ml.entities import Environment

custom_env_name = "credit-data-clean"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for Credit Card Defaults pipeline",
    conda_file=os.path.join("environments", "data_clean.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    version="1",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name credit-data-clean is registered to workspace, the environment version is 2


In [None]:
custom_env_name = "credit-model-train"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for Credit Card Defaults pipeline",
    conda_file=os.path.join("environments", "model_train.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    version="1",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

### Step 3: Building the pipeline: component definition
Now that you have all assets required to run your pipeline, it's time to build the pipeline itself, using the Azure ML Python SDK v2.

Azure ML pipelines are reusable ML workflows that usually consist of several components. The typical life of a component is:

* Write the specification of the component.
* Optionally, register the component with a name and version in your workspace, to make it reusable and shareable.
* Load that component from the pipeline code.
* Implement the pipeline using the component's inputs, outputs and parameters.
* Submit the pipeline.

In this tutorial we will define our components programmatically using the Python SDK, but there is also the option to do so via YAML file configuration.

#### Step 3.1: Programmatically defining the components

##### Component #1

This component handles the cleaning of the data. The cleaning task is performed in the `clean_data.py` Python file. MLflow is used to log parameters and metrics during the pipeline run.

In [13]:
%%writefile pipeline_components/clean_data/clean_data.py
import os
import argparse
import pandas as pd
import logging
import mlflow


def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--cleaned_data", type=str, help="out path to cleaned data")
    args = parser.parse_args()

    # Start Logging
    mlflow.start_run()
    credit_df_raw = pd.read_csv(args.data, header=0)
    credit_df_raw.dropna(inplace=True) 

    mlflow.log_metric("num_samples", credit_df_raw.shape[0])
    mlflow.log_metric("num_features", credit_df_raw.shape[1] - 1)

    # output paths are mounted as folder, therefore, we are adding a filename to the path
    credit_df_raw.to_csv(os.path.join(args.cleaned_data, "cleaned_data.csv"), index=False)

    # Stop Logging
    mlflow.end_run()

if __name__ == "__main__":
    main()

Writing pipeline_components/clean_data/clean_data.py


##### Component #2

This component handles the pre-processing of the data, performed in the `prep_data.py` Python file. This script performs the simple task of splitting the data into train and test datasets.

In [14]:
%%writefile pipeline_components/prep_data/prep_data.py
import os
import argparse
import pandas as pd
from sklearn.model_selection import train_test_split
import logging
import mlflow

def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """
    files = os.listdir(path)
    return os.path.join(path, files[0])


def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.25)
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    args = parser.parse_args()

    # Start Logging
    mlflow.start_run()

    print("input data:", args.data)
    
    credit_df = pd.read_csv(select_first_file(args.data), header=0)

    credit_train_df, credit_test_df = train_test_split(
        credit_df,
        test_size=args.test_train_ratio,
    )

    # output paths are mounted as folder, therefore, we are adding a filename to the path
    credit_train_df.to_csv(os.path.join(args.train_data, "data.csv"), index=False)
    credit_test_df.to_csv(os.path.join(args.test_data, "data.csv"), index=False)

    # Stop Logging
    mlflow.end_run()

if __name__ == "__main__":
    main()

Writing pipeline_components/prep_data/prep_data.py
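
Note that `prep_data.py` calls `train_test_split` without a `random_state`, so each pipeline run produces a different split. The sketch below is a standard-library stand-in, not scikit-learn's implementation, that illustrates what a seeded 25% hold-out looks like on 2500 rows. The function name `seeded_split` and the seed value are assumptions made for illustration; in `prep_data.py` itself, the analogous change would be to pass `random_state` to `train_test_split`.

```python
import math
import random


def seeded_split(rows, test_ratio=0.25, seed=42):
    """Shuffle a copy of rows with a fixed seed and hold out a test share."""
    if not 0.0 < test_ratio < 1.0:
        raise ValueError("test_ratio must be between 0 and 1")
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    # Round the test size up so a small dataset still gets a test row.
    n_test = math.ceil(len(shuffled) * test_ratio)
    return shuffled[n_test:], shuffled[:n_test]


# Row indices stand in for the 2500 rows of the credit dataset.
train_rows, test_rows = seeded_split(range(2500))
print(len(train_rows), len(test_rows))
```

With a fixed seed, repeated runs yield the same train and test rows, which makes metric changes between runs attributable to the model rather than to the split.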


##### Component #3

The last component that you'll create will consume the training and test data, train a tree-based model and return the output model. You'll use Azure ML logging capabilities to record and visualize the learning progress.

In [15]:
%%writefile pipeline_components/train_model/train.py
import argparse
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import classification_report
import os
import pandas as pd
import mlflow


def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """
    files = os.listdir(path)
    return os.path.join(path, files[0])


# Start Logging
mlflow.start_run()

# enable autologging
mlflow.sklearn.autolog()

os.makedirs("./outputs", exist_ok=True)


def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    parser.add_argument("--n_estimators", required=False, default=100, type=int)
    parser.add_argument("--learning_rate", required=False, default=0.1, type=float)
    parser.add_argument("--registered_model_name", type=str, help="model name")
    parser.add_argument("--model", type=str, help="path to model file")
    args = parser.parse_args()

    # paths are mounted as folder, therefore, we are selecting the file from folder
    train_df = pd.read_csv(select_first_file(args.train_data))

    # Extracting the label column
    y_train = train_df.pop("default payment next month")

    # convert the dataframe values to array
    X_train = train_df.values

    # paths are mounted as folder, therefore, we are selecting the file from folder
    test_df = pd.read_csv(select_first_file(args.test_data))

    # Extracting the label column
    y_test = test_df.pop("default payment next month")

    # convert the dataframe values to array
    X_test = test_df.values

    print(f"Training with data of shape {X_train.shape}")

    clf = GradientBoostingClassifier(
        n_estimators=args.n_estimators, learning_rate=args.learning_rate
    )
    clf.fit(X_train, y_train)

    y_pred = clf.predict(X_test)

    print(classification_report(y_test, y_pred))

    # Registering the model to the workspace
    print("Registering the model via MLFlow")
    mlflow.sklearn.log_model(
        sk_model=clf,
        registered_model_name=args.registered_model_name,
        artifact_path=args.registered_model_name,
    )

    # Saving the model to a file
    mlflow.sklearn.save_model(
        sk_model=clf,
        path=os.path.join(args.model, "trained_model"),
    )

    # Stop Logging
    mlflow.end_run()


if __name__ == "__main__":
    main()


Writing pipeline_components/train_model/train.py
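
Both `prep_data.py` and `train.py` rely on `select_first_file`, which assumes the folder holds exactly one file and returns whatever `os.listdir` lists first. A more defensive variant is sketched below under that same assumption; the name `select_single_file` and the `suffix` parameter are hypothetical additions, not part of the scripts above.

```python
import os


def select_single_file(path, suffix=".csv"):
    """Return path if it is a file, else the only matching file in the folder."""
    if os.path.isfile(path):
        return path
    candidates = sorted(
        name
        for name in os.listdir(path)
        if name.endswith(suffix) and os.path.isfile(os.path.join(path, name))
    )
    if len(candidates) != 1:
        raise ValueError(
            f"Expected exactly one {suffix} file in {path!r}, "
            f"found {len(candidates)}: {candidates}"
        )
    return os.path.join(path, candidates[0])
```

Raising an error when the folder is empty or holds several files makes a broken upstream step visible immediately, instead of silently training on an arbitrary file.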


#### Step 3.2: Registering the components within the workspace

In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

src_dir = "./pipeline_components/clean_data"

data_clean_component = command(
    name="data_clean_credit_card_defaults",
    display_name="Data cleaning for credit training",
    inputs={
        "data": Input(type="uri_file")
    },
    outputs=dict(
        cleaned_data=Output(type="uri_folder", mode="rw_mount")
    ),
    # The source folder of the component
    code=src_dir,
    command="""python clean_data.py \
            --data ${{inputs.data}} --cleaned_data ${{outputs.cleaned_data}}
            """,
    environment="credit-data-clean:1",
)

# Register the component in the workspace
data_clean_component = ml_client.create_or_update(data_clean_component.component)

# Confirm the registration
print(
    f"Component {data_clean_component.name} with Version {data_clean_component.version} is registered"
)
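
The `${{inputs.data}}` and `${{outputs.cleaned_data}}` placeholders in the `command` string are what connect the component's declared inputs and outputs to the `argparse` arguments of the script. The sketch below simulates that binding locally with a small substitution function. It only illustrates the idea and is not how Azure ML resolves commands internally; `render_command` is a hypothetical helper and the paths are made up.

```python
import re
import shlex

# Matches placeholders such as ${{inputs.data}} or ${{outputs.cleaned_data}}.
_PLACEHOLDER = re.compile(r"\$\{\{\s*(inputs|outputs)\.(\w+)\s*\}\}")


def render_command(template, inputs, outputs):
    """Substitute placeholders and return the resulting argument list."""
    values = {"inputs": inputs, "outputs": outputs}

    def substitute(match):
        kind, name = match.group(1), match.group(2)
        if name not in values[kind]:
            raise KeyError(f"No value bound for {kind}.{name}")
        return shlex.quote(str(values[kind][name]))

    return shlex.split(_PLACEHOLDER.sub(substitute, template))


template = """python clean_data.py \
        --data ${{inputs.data}} --cleaned_data ${{outputs.cleaned_data}}
        """

# Made-up local paths standing in for the locations resolved at run time.
argv = render_command(
    template,
    inputs={"data": "/mnt/in/credit.csv"},
    outputs={"cleaned_data": "/mnt/out/cleaned"},
)
print(argv)
```

The practical consequence is that every argument a script needs must appear in the `command` string, and every placeholder name must match a key declared in `inputs` or `outputs`.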

In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

src_dir = "./pipeline_components/prep_data"

data_prep_component = command(
    name="data_prep_credit_card_defaults",
    display_name="Data prep for credit training",
    inputs={
        "data": Input(type="uri_folder")
    },
    outputs=dict(
        train_data=Output(type="uri_folder", mode="rw_mount"),
        test_data=Output(type="uri_folder", mode="rw_mount")
    ),
    # The source folder of the component
    code=src_dir,
    command="""python prep_data.py \
            --data ${{inputs.data}} --train_data ${{outputs.train_data}} \
            --test_data ${{outputs.test_data}}
            """,
    environment="credit-model-train:1",
)

# Register the component in the workspace
data_prep_component = ml_client.create_or_update(data_prep_component.component)

# Confirm the registration
print(
    f"Component {data_prep_component.name} with Version {data_prep_component.version} is registered"
)

In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

src_dir = "./pipeline_components/train_model"

train_component = command(
    name="credit_default_model_training",
    display_name="Credit defaults model training",
    inputs={
        "train_data": Input(type="uri_folder"),
        "test_data": Input(type="uri_folder"),
        "learning_rate": Input(type="number"),
        "registered_model_name": Input(type="string")
    },
    outputs=dict(
        model=Output(type="uri_folder", mode="rw_mount")
    ),
    # The source folder of the component
    code=src_dir,
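    # train.py also needs the model name and the output folder for the saved model
    command="""python train.py \
            --train_data ${{inputs.train_data}} \
            --test_data ${{inputs.test_data}} \
            --learning_rate ${{inputs.learning_rate}} \
            --registered_model_name ${{inputs.registered_model_name}} \
            --model ${{outputs.model}}
            """,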
    environment="credit-model-train:1",
)

# Register the component in the workspace
train_component = ml_client.create_or_update(train_component.component)

# Confirm the registration
print(
    f"Component {train_component.name} with Version {train_component.version} is registered"
)

### Step 4: Create the pipeline from components
Now that all of the components are defined and registered, you can start implementing the pipeline.

Here, you'll use the input data, the learning rate and the registered model name as input variables. You'll then call the components and connect them via their input/output identifiers. The outputs of each step can be accessed via the `.outputs` property.

To code the pipeline, we use the `@dsl.pipeline` decorator, which identifies Azure ML pipelines. In the decorator, we can specify the pipeline description and default resources like compute and storage. Like a Python function, a pipeline can have inputs, so you can create multiple instances of a single pipeline with different inputs.

In [None]:
# the dsl decorator tells the sdk that we are defining an Azure Machine Learning pipeline
from azure.ai.ml import dsl, Input, Output

@dsl.pipeline(
    compute="ej-cluster2",  # "serverless" value runs pipeline on serverless compute
    description="E2E data_prep-train pipeline",
)
def credit_defaults_pipeline(
    pipeline_job_data_input,
    pipeline_job_learning_rate,
    pipeline_job_registered_model_name,
):
    # using the registered components like python calls with their own inputs

    data_clean_job = data_clean_component(
        data=pipeline_job_data_input
    )

    data_prep_job = data_prep_component(
        data=data_clean_job.outputs.cleaned_data,
    )

    # using train_component like a python call with its own inputs
    train_job = train_component(
        train_data=data_prep_job.outputs.train_data,  # note: using outputs from previous step
        test_data=data_prep_job.outputs.test_data,  # note: using outputs from previous step
        learning_rate=pipeline_job_learning_rate,  # note: using a pipeline input as parameter
        registered_model_name=pipeline_job_registered_model_name,
    )

    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        "pipeline_job_cleaned_data" : data_clean_job.outputs.cleaned_data,
        "pipeline_job_train_data": data_prep_job.outputs.train_data,
        "pipeline_job_test_data": data_prep_job.outputs.test_data,
    }
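
Because the pipeline is defined as a Python function, the same definition can be instantiated several times with different arguments. The sketch below only builds the keyword-argument sets for a few learning rates; the helper name `learning_rate_variants`, the chosen rates and the placeholder data input are assumptions for illustration. Turning each set into a job requires the pipeline function and data asset from this notebook, so that part is left as a comment.

```python
def learning_rate_variants(data_input, model_name, learning_rates):
    """Build one keyword-argument set per learning rate for the pipeline."""
    return [
        {
            "pipeline_job_data_input": data_input,
            "pipeline_job_learning_rate": rate,
            "pipeline_job_registered_model_name": model_name,
        }
        for rate in learning_rates
    ]


# "<data input>" is a placeholder for Input(type="uri_file", path=credit_data.path).
variants = learning_rate_variants(
    "<data input>", "credit_defaults_model", [0.01, 0.05, 0.1]
)
for kwargs in variants:
    print(kwargs["pipeline_job_learning_rate"])

# With the pipeline defined above, each set could become its own pipeline job:
# pipeline_jobs = [credit_defaults_pipeline(**kwargs) for kwargs in variants]
```

Submitting the resulting jobs under one experiment name keeps the runs side by side in Azure ML studio for comparison.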

### Step 5: Define and submit the job
It's now time to submit the job to run in Azure ML. This time you'll use `create_or_update` on `ml_client.jobs`.

Here you'll also pass an experiment name. An experiment is a container for all the iterations one does on a certain project. All the jobs submitted under the same experiment name would be listed next to each other in Azure ML studio.

Once completed, the pipeline will register a model in your workspace as a result of training.

In [None]:
registered_model_name = "credit_defaults_model"

# Let's instantiate the pipeline with the parameters of our choice
pipeline = credit_defaults_pipeline(
    pipeline_job_data_input=Input(type="uri_file", path=credit_data.path),
    pipeline_job_learning_rate=0.05,
    pipeline_job_registered_model_name=registered_model_name,
)

In [None]:
# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    # Project's name
    experiment_name="e2e_registered_components",
)
ml_client.jobs.stream(pipeline_job.name)

You can track the progress of your pipeline by using the link generated in the cell above, or from this notebook with the following code:


```python
ml_client.jobs.stream(pipeline_job.name)
```
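
If you prefer to poll for the final state instead of streaming logs, a small waiting loop is one option. The sketch below takes any function that returns the current status string, so it can be tried without a workspace. The helper name `wait_for_job` is hypothetical, and the terminal status strings `Completed`, `Failed` and `Canceled` are assumed here; check them against the statuses your jobs actually report.

```python
import time

# Status strings assumed to mark a finished job.
TERMINAL_STATUSES = {"Completed", "Failed", "Canceled"}


def wait_for_job(get_status, poll_seconds=30.0, timeout_seconds=3600.0, sleep=time.sleep):
    """Poll get_status() until it returns a terminal status or the timeout passes."""
    waited = 0.0
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Job still {status!r} after {waited:.0f} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# Simulated run with a fake status source, so no workspace is needed.
fake_statuses = iter(["Queued", "Running", "Completed"])
final = wait_for_job(lambda: next(fake_statuses), poll_seconds=1, sleep=lambda _: None)
print(final)

# Against a real workspace, the status source could be:
# wait_for_job(lambda: ml_client.jobs.get(pipeline_job.name).status)
```

Passing the status source as a function keeps the loop independent of the SDK and easy to test.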

When you select a component, you'll see more information about the results of that component.
There are two important parts to look for at this stage:
* `Outputs+logs` > `user_logs` > `std_log.txt`
This section shows the script run's stdout.
* `Outputs+logs` > `Metric`
This section shows the logged metrics. In this example, MLflow autologging has automatically logged the training metrics.