In [1]:
from azureml.core import Workspace,Dataset,Datastore

In [2]:
ws=Workspace.from_config()

In [5]:
import pandas as pd
import numpy as np

In [10]:
df=pd.read_csv("data/loan_data.csv")

In [11]:
dstore=ws.get_default_datastore()

In [17]:
Dataset.Tabular.register_pandas_dataframe(dataframe=df,target=(dstore,"/loandata"),name="loans",description="loan dataset")

Validating arguments.
Arguments validated.
Successfully obtained datastore reference and path.
Uploading file to /loandata/cbd512e6-0730-47fe-bf88-46ff95382a81/
Successfully uploaded file to datastore.
Creating and registering a new dataset.
Successfully created and registered a new dataset.


{
  "source": [
    "('workspaceblobstore', '/loandata/cbd512e6-0730-47fe-bf88-46ff95382a81/')"
  ],
  "definition": [
    "GetDatastoreFiles",
    "ReadParquetFile",
    "DropColumns"
  ],
  "registration": {
    "id": "9174958c-e81a-483b-9043-2e28ba018501",
    "name": "loans",
    "version": 1,
    "description": "loan dataset",
    "workspace": "Workspace.create(name='eymlws', subscription_id='afe91b36-9760-4bb8-9dd6-72761af8d4ef', resource_group='eymlops')"
  }
}

In [18]:
loans_ds=ws.datasets['loans']

In [19]:
loans_ds

{
  "source": [
    "('workspaceblobstore', '/loandata/cbd512e6-0730-47fe-bf88-46ff95382a81/')"
  ],
  "definition": [
    "GetDatastoreFiles",
    "ReadParquetFile",
    "DropColumns"
  ],
  "registration": {
    "id": "9174958c-e81a-483b-9043-2e28ba018501",
    "name": "loans",
    "version": 1,
    "description": "loan dataset",
    "workspace": "Workspace.create(name='eymlws', subscription_id='afe91b36-9760-4bb8-9dd6-72761af8d4ef', resource_group='eymlops')"
  }
}

In [20]:
compute_target=ws.compute_targets["akt-cluster"]

You will need to create a configuration object that dictates which environment is used 
when each step gets executed. This notebook uses the curated 
AzureML-lightgbm-3.2-ubuntu18.04-py37-cpu environment. To do that, 
you will need to create a RunConfiguration using the following code:

In [21]:
from azureml.core import RunConfiguration

In [23]:
runconfig=RunConfiguration()
runconfig.environment=ws.environments["AzureML-lightgbm-3.2-ubuntu18.04-py37-cpu"]

You will then need to define a temporary storage folder where the first step will drop 
its output files. You do this with the PipelineData class, as shown in the following code:

In [24]:
from azureml.pipeline.core import PipelineData

In [26]:
step01_output=PipelineData(
    "training_data",
    datastore=ws.get_default_datastore(),
    is_directory=True)

In this code, you are creating an intermediate data location named training_data, 
which is stored as a folder in the default datastore that is registered in your 
AzureML workspace. You do not need to care about the actual path of this temporary 
data, but if you are curious, the actual path of that folder in the default storage 
container is something like azureml/{step01_run_id}/training_data.

## Now that you have all the prerequisites for your pipeline's first step, it is time to define it.

In [27]:
from azureml.pipeline.steps import PythonScriptStep

In [28]:
step01=PythonScriptStep(script_name="prepare_data.py",
                        source_directory="step01",
                        arguments = [
                             "--dataset", loans_ds.as_named_input('loans'), 
                             "--output-path", step01_output],
                        name="Prepare data",
                        runconfig=runconfig,
                        compute_target=compute_target,
                        outputs=[step01_output],
                        allow_reuse=True)

This code defines a PythonScriptStep that will be using the source code in the 
step01 folder. It will execute the script named prepare_data.py, passing the 
following arguments:

• --dataset: This passes the loans_ds dataset ID to that variable. This dataset 
ID is a unique GUID representing the specific version of the dataset as it is 
registered within your AzureML workspace. The code goes one step further and 
makes a call to the as_named_input method. This method is available in both 
FileDataset and TabularDataset and is only applicable when a Run
executes within the AzureML workspace. To invoke the method, you must provide 
a name, in this case, loans, that can be used within the script to retrieve the 
dataset. The AzureML SDK will make the TabularDataset object available 
within the prepare_data.py script in the input_datasets dictionary of the 
run object. Within the prepare_data.py script, you can get a reference to that 
dataset using the following code:

    
from azureml.core import Run

run = Run.get_context()

loans_dataset = run.input_datasets["loans"]


• --output-path: This passes the PipelineData object you created in previous steps. 
This parameter will be a string representing a path where the script can store its 
output files. The datastore location is mounted to the local storage of the compute 
node that is about to execute the specific step. This mounting path is passed to 
the script, allowing your script to transparently write the outputs directly to 
the datastore.

The allow_reuse Boolean parameter allows you to reuse the outputs of this 
PythonScriptStep if the inputs of the script and the source code within the 
step01 folder haven't changed since the last execution of the pipeline.
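
As a minimal sketch of the receiving side, the following shows how a script such as 
prepare_data.py might read the two arguments with argparse. The source does not show 
the script's contents, so the function name parse_step_args and the sample values are 
hypothetical. Only the standard library is used, so the parsing logic can be tried 
outside AzureML.

```python
import argparse


def parse_step_args(argv=None):
    """Parse the arguments that the pipeline step passes to the script."""
    parser = argparse.ArgumentParser(description="Prepare data (sketch)")
    # Receives the value that the pipeline substitutes for the dataset input.
    parser.add_argument("--dataset", type=str, required=True)
    # Receives the path that stands behind the PipelineData object.
    parser.add_argument("--output-path", dest="output_path", type=str, required=True)
    return parser.parse_args(argv)


# Hypothetical values, used only to exercise the parser locally.
args = parse_step_args(["--dataset", "some-dataset-id", "--output-path", "training_data"])
print(args.dataset, args.output_path)
```

Inside a real run, the script would call parse_step_args() without arguments so that 
argparse reads the values from the command line.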

## Defining the prerequisites for the second step

step02 requires a parameter named learning_rate. Instead of hardcoding the learning 
rate hyperparameter of the LightGBM model within your training script, you will 
use a PipelineParameter to pass in this value. This parameter will be defined at 
the pipeline level, and it will be passed to the training script as an argument.

In [30]:
from azureml.pipeline.core import PipelineParameter

In [32]:
learning_rate_param = PipelineParameter( name="learning_rate", default_value=0.05)

You will store the trained model in the /models/loans/ folder of the default 
datastore attached to the AzureML workspace. To specify the exact location where 
you want to store the files, you will use the OutputFileDatasetConfig class.

In [33]:
from azureml.data import OutputFileDatasetConfig

datastore = ws.get_default_datastore()

step02_output = OutputFileDatasetConfig(
 name = "model_store",
 destination = (datastore, '/models/loans/'))

Now that you have all the prerequisites defined, you can define the second step of 
your Pipeline.

In [37]:
step02 = PythonScriptStep(
 'train_model.py', 
 source_directory='step02',
 arguments = [
 "--learning-rate", learning_rate_param,
 "--input-path", step01_output,
 "--output-path", step02_output],
 name='Train model',
 runconfig=runconfig,
 compute_target=compute_target,
 inputs=[step01_output],
 outputs=[step02_output]
)

Similar to the first step, this code defines 
a PythonScriptStep that will invoke the train_model.py script located in 
the step02 folder. It will populate the --learning-rate argument using the 
value passed to the PipelineParameter you defined earlier. It will also pass 
the output of step01 to the --input-path argument. 

Note that step01_output is also added to the list of inputs of this PythonScriptStep. This forces 
step02 to wait for step01 to complete in order to consume the data stored in 
step01_output. 

The last script argument is --output-path, where you pass 
the OutputFileDatasetConfig object you created in the previous step. This 
object is also added to the list of outputs of this PythonScriptStep.
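
The following sketch illustrates what the receiving side of these three arguments could 
look like. The source does not show train_model.py, so parse_train_args, 
write_placeholder_model, and the model.txt file name are hypothetical, and a text file 
stands in for the trained model. The point is that the learning rate is converted to 
a float and that the script treats the output path as an ordinary folder.

```python
import argparse
import os
import tempfile


def parse_train_args(argv=None):
    """Parse the three arguments that the training step receives."""
    parser = argparse.ArgumentParser(description="Train model (sketch)")
    # argparse converts the incoming string to a float.
    parser.add_argument("--learning-rate", type=float, required=True)
    parser.add_argument("--input-path", type=str, required=True)
    parser.add_argument("--output-path", type=str, required=True)
    return parser.parse_args(argv)


def write_placeholder_model(output_path, learning_rate):
    """Create the output folder if needed and write a stand-in model file."""
    os.makedirs(output_path, exist_ok=True)
    model_file = os.path.join(output_path, "model.txt")
    with open(model_file, "w", encoding="utf-8") as handle:
        handle.write(f"learning_rate={learning_rate}\n")
    return model_file


# Local dry run in a scratch folder; no AzureML resources are involved.
with tempfile.TemporaryDirectory() as scratch:
    args = parse_train_args([
        "--learning-rate", "0.02",
        "--input-path", os.path.join(scratch, "training_data"),
        "--output-path", os.path.join(scratch, "models"),
    ])
    saved = write_placeholder_model(args.output_path, args.learning_rate)
    print(os.path.basename(saved), args.learning_rate)
```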

## It's time to define the actual Pipeline you have been building so far

In [38]:
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[step01, step02])

## To execute the pipeline, you will need to submit it in an Experiment.

In [39]:
from azureml.core import Experiment

experiment = Experiment(ws, "pipeline_exp")
pipeline_run = experiment.submit(
 pipeline,
 pipeline_parameters= {
 "learning_rate" : 0.5
 }
)
pipeline_run.wait_for_completion()

Created step Prepare data [88b2da09][3690b903-651a-4e18-9819-48bc1e7812df], (This step will run and generate new outputs)
Created step Train model [d6fb068c][5d3ab1b2-c8c8-46ba-8e44-7dba9768b2f2], (This step will run and generate new outputs)
Submitted PipelineRun 5f85466f-1b84-40ae-ad01-2cf7a9d9cc04
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/5f85466f-1b84-40ae-ad01-2cf7a9d9cc04?wsid=/subscriptions/afe91b36-9760-4bb8-9dd6-72761af8d4ef/resourcegroups/eymlops/workspaces/eymlws&tid=bdf67597-3a9e-4e19-958e-1d8222afa3de
PipelineRunId: 5f85466f-1b84-40ae-ad01-2cf7a9d9cc04
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/5f85466f-1b84-40ae-ad01-2cf7a9d9cc04?wsid=/subscriptions/afe91b36-9760-4bb8-9dd6-72761af8d4ef/resourcegroups/eymlops/workspaces/eymlws&tid=bdf67597-3a9e-4e19-958e-1d8222afa3de
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: f4aea2da-ed30-4905-b0de-f3093539eb67
Link to Azure Machine Learning Portal: https://ml.azure

'Finished'

Suppose you rerun the pipeline by executing the preceding code a second time. In that 
case, you will notice that the execution is almost instant (just a few seconds, compared 
to the minute-long execution you should have seen the first time). The pipeline detected 
that no input had changed and reused the outputs of the previously executed steps. This 
demonstrates what allow_reuse=True does in the first step. It also shows that the default 
value is True, because the second step was reused even though that parameter was not 
specified for it. This means that, by default, all steps will reuse previous executions 
if the inputs and the code files are the same as those of an earlier execution. If you want 
to force a retrain even if the same learning_rate value is passed to the pipeline, 
you can specify allow_reuse=False.
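
One way to picture the reuse decision is as a comparison of fingerprints computed from 
a step's inputs and source files. The sketch below is a conceptual illustration only. It 
is not how AzureML implements reuse internally, and step_fingerprint is a hypothetical 
helper.

```python
import hashlib
import json


def step_fingerprint(arguments, source_files):
    """Return a stable hash of a step's arguments and source file contents."""
    digest = hashlib.sha256()
    # Serialize the arguments deterministically so equal inputs hash equally.
    digest.update(json.dumps(arguments, sort_keys=True).encode("utf-8"))
    # Include each file name and its contents, in a fixed order.
    for name in sorted(source_files):
        digest.update(name.encode("utf-8"))
        digest.update(source_files[name].encode("utf-8"))
    return digest.hexdigest()


code = {"train_model.py": "print('train')"}
first = step_fingerprint({"learning_rate": 0.5}, code)
second = step_fingerprint({"learning_rate": 0.5}, code)
changed = step_fingerprint({"learning_rate": 0.02}, code)

print(first == second)   # True: same inputs and code, so outputs could be reused
print(first == changed)  # False: a different parameter means the step must run again
```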

## Please note:
If you wanted to pass the training dataset as a PipelineParameter, 
you would have to use the following code:

from azureml.pipeline.core import PipelineParameter
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig

ds_pipeline_param = PipelineParameter(name="dataset", default_value=loans_ds)

dataset_consumption = DatasetConsumptionConfig("loans", ds_pipeline_param)

Using this code and passing the dataset_consumption object
instead of loans_ds.as_named_input('loans') would allow 
you to select the input dataset and its version while submitting a pipeline 
to execute.

So far, you have defined a pipeline that executes two Python scripts. step01 
pre-processes the training data and stores it in an intermediate data store for step02 
to pick up. From there, the second step trains a LightGBM model and stores it in the 
/models/loans/ folder of the default datastore attached to the AzureML workspace.

## Troubleshooting code issues

What happens if a script has a coding issue 
or if a dependency is missing? In that case, your pipeline will fail. In the graphical 
representation of the run in the experiment, you will be able to identify the failing step. 
If you want to get the details of a specific child step, you first have to locate it using 
the find_step_run method of the pipeline_run object you got when you executed the pipeline. 

In [40]:
train_step_run = pipeline_run.find_step_run("Train model")[0]
train_step_run.get_details_with_logs()


{'runId': '550eedd7-8aab-43c5-8a9c-4c0e3092088a',
 'target': 'akt-cluster',
 'status': 'Completed',
 'startTimeUtc': '2022-10-27T13:46:12.137293Z',
 'endTimeUtc': '2022-10-27T13:46:29.833216Z',
 'services': {},
 'properties': {'ContentSnapshotId': '94d70895-9222-4018-bdbc-2cb13af3f02d',
  'StepType': 'PythonScriptStep',
  'ComputeTargetType': 'AmlCompute',
  'azureml.moduleid': '5d3ab1b2-c8c8-46ba-8e44-7dba9768b2f2',
  'azureml.moduleName': 'Train model',
  'azureml.runsource': 'azureml.StepRun',
  'azureml.nodeid': 'd6fb068c',
  'azureml.pipelinerunid': '5f85466f-1b84-40ae-ad01-2cf7a9d9cc04',
  'azureml.pipeline': '5f85466f-1b84-40ae-ad01-2cf7a9d9cc04',
  'azureml.pipelineComponent': 'masterescloud',
  '_azureml.ComputeTargetType': 'amlctrain',
  'ProcessInfoFile': 'azureml-logs/process_info.json',
  'ProcessStatusFile': 'azureml-logs/process_status.json'},
 'inputDatasets': [],
 'outputDatasets': [{'identifier': {'savedId': '7d23aeb9-c3ea-4983-b7cb-1135cd7a896e'},
   'outputType': 'R

This code finds all steps with the name Train model and selects the first result, 
at index 0. This retrieves a StepRun object for the step02 step you 
defined in the previous section. StepRun inherits from the base Run class and exposes 
the get_details_with_logs method, which is also available in the ScriptRun class.
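
The returned dictionary can be large, so it often helps to pull out a few fields. The 
sketch below works on a plain dictionary that uses only keys visible in the output shown 
earlier (runId, status, startTimeUtc, and endTimeUtc). The helper name 
summarize_run_details is hypothetical, and the code assumes that both timestamps are 
present, which may not hold for a run that has not finished.

```python
from datetime import datetime


def summarize_run_details(details):
    """Extract the run ID, status, and duration in seconds from a details dict."""
    fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
    start = datetime.strptime(details["startTimeUtc"], fmt)
    end = datetime.strptime(details["endTimeUtc"], fmt)
    return {
        "runId": details["runId"],
        "status": details["status"],
        "duration_seconds": (end - start).total_seconds(),
    }


# Values copied from the step details printed earlier in this notebook.
sample = {
    "runId": "550eedd7-8aab-43c5-8a9c-4c0e3092088a",
    "status": "Completed",
    "startTimeUtc": "2022-10-27T13:46:12.137293Z",
    "endTimeUtc": "2022-10-27T13:46:29.833216Z",
}
summary = summarize_run_details(sample)
print(summary["status"], round(summary["duration_seconds"], 1))  # Completed 17.7
```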

## Publishing a pipeline to expose it as an endpoint

So far, you have defined a pipeline using the AzureML SDK. If you had to restart the 
kernel of your Jupyter notebook, you would lose the reference to the pipeline you defined, 
and you would have to rerun all the cells to recreate the pipeline object. The AzureML 
SDK allows you to publish a pipeline that effectively registers it as a versioned object 
within the workspace. Once a pipeline is published, it can be submitted without the 
Python code that constructed it.

In [41]:
published_pipeline = pipeline.publish(
 "Loans training pipeline", 
 description="A pipeline to train a LightGBM model")

This code publishes the pipeline and returns a PublishedPipeline object, the 
versioned object registered within the workspace. The most interesting attribute of that 
object is the endpoint, which returns the REST endpoint URL to trigger the execution 
of the specific pipeline.

To invoke the published pipeline, you will need an authentication header. To acquire this 
security header, you can use the InteractiveLoginAuthentication class, as seen 
in the following code snippet:

In [43]:
from azureml.core.authentication import InteractiveLoginAuthentication
auth = InteractiveLoginAuthentication()
aad_token = auth.get_authentication_header()

Then you can use the Python requests package to make a POST request to the specific 
endpoint using the following code:

In [45]:
import requests
response = requests.post(published_pipeline.endpoint, 
 headers=aad_token, 
 json={"ExperimentName": "pipeline_run_test",
 "ParameterAssignments": {"learning_rate" : 0.02}})
print(f"Made a POST request to {published_pipeline.endpoint} and got {response.status_code}.")


Made a POST request to https://eastus.api.azureml.ms/pipelines/v1.0/subscriptions/afe91b36-9760-4bb8-9dd6-72761af8d4ef/resourceGroups/eymlops/providers/Microsoft.MachineLearningServices/workspaces/eymlws/PipelineRuns/PipelineSubmit/9539b997-83c7-48e6-920f-c36ba5419b52 and got 200.


In [46]:
print(f"The portal url for the run is {response.json()['RunUrl']}")

The portal url for the run is https://ml.azure.com/experiments/pipeline_run_test/runs/9d6dcbf5-e0d2-4efc-8194-1abae2a14256?tid=bdf67597-3a9e-4e19-958e-1d8222afa3de&wsid=/subscriptions/afe91b36-9760-4bb8-9dd6-72761af8d4ef/resourcegroups/eymlops/workspaces/eymlws


The preceding script invokes the REST endpoint using the HTTP POST verb, passing the 
value 0.02 as the learning_rate parameter. It only needs the URL and not the actual 
pipeline code. If you ever lose the endpoint URL, you can retrieve it in code through 
the list method of the PublishedPipeline class, which enumerates all the published 
pipelines registered in the workspace. 
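
If you call the endpoint from several places, it can help to build the request body in 
one function. The sketch below uses the two keys from the request shown earlier, 
ExperimentName and ParameterAssignments. The helper name build_submit_payload and its 
validation rule are assumptions made for illustration.

```python
import json


def build_submit_payload(experiment_name, parameters):
    """Build the JSON body for a published pipeline submission request."""
    if not experiment_name:
        raise ValueError("experiment_name must be a non-empty string")
    return {
        "ExperimentName": experiment_name,
        # Copy the mapping so later changes by the caller do not leak in.
        "ParameterAssignments": dict(parameters),
    }


payload = build_submit_payload("pipeline_run_test", {"learning_rate": 0.02})
print(json.dumps(payload))
```

The resulting dictionary can be passed as the json argument of requests.post, in place 
of the inline dictionary used in the earlier cell.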

Published pipelines are also visible in the AzureML studio. Once you select a pipeline 
there, you can trigger it using a graphical wizard that allows you to specify the 
pipeline parameters and the experiment under which the pipeline will execute.

## Scheduling a recurring pipeline

Being able to invoke a pipeline through the published REST endpoint is great when you 
have third-party systems that need to invoke a training process after a specific event has 
occurred. For example, suppose you are using Azure Data Factory to copy data from your 
on-premises databases. You could use the Machine Learning Execute Pipeline activity 
to trigger the published pipeline.

If you wanted to schedule the pipeline to be triggered monthly, you would need to publish 
the pipeline as you did in the previous section, get the published pipeline ID, create 
a ScheduleRecurrence, and then create the Schedule.

In [48]:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule
from datetime import datetime

recurrence = ScheduleRecurrence(frequency="Month", 
                                interval=1, 
                                start_time = datetime.now())

schedule = Schedule.create(workspace=ws, name="pipeline-schedule",
                           pipeline_id=published_pipeline.id, 
                           experiment_name="pipeline-scheduled-run",
                           recurrence=recurrence,
                           wait_for_provisioning=True,
                           description="Schedule to retrain model")

print("Created schedule with id: {}".format(schedule.id))

Provisioning status: Completed
Created schedule with id: 9f0ddb70-34dc-4716-aaf2-7a1d68c8e5f3


In this code, you define a ScheduleRecurrence with monthly frequency. By 
specifying start_time=datetime.now(), you are preventing the immediate 
execution of the pipeline, which is the default behavior when creating a new Schedule. 
Once you have the recurrence you want to use, you can schedule the pipeline execution 
by calling the create method of the Schedule class. You pass in the ID of the 
published_pipeline you want to trigger, and you specify the experiment name 
under which each execution will occur.
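
To get a feel for what a monthly recurrence with interval=1 means in calendar terms, 
the following sketch lists the dates that follow a given start time. It is plain date 
arithmetic, not the service's scheduling algorithm. The helper names, and the choice to 
clamp the day to the end of shorter months, are assumptions made for illustration.

```python
import calendar
from datetime import datetime


def add_months(moment, months):
    """Return `moment` shifted forward by `months`, clamping the day if needed."""
    month_index = moment.month - 1 + months
    year = moment.year + month_index // 12
    month = month_index % 12 + 1
    last_day = calendar.monthrange(year, month)[1]
    return moment.replace(year=year, month=month, day=min(moment.day, last_day))


def monthly_occurrences(start_time, interval, count):
    """List `count` dates spaced `interval` months apart after `start_time`."""
    return [add_months(start_time, interval * i) for i in range(1, count + 1)]


start = datetime(2022, 10, 27, 14, 0)
for occurrence in monthly_occurrences(start, interval=1, count=3):
    print(occurrence.isoformat())
# 2022-11-27T14:00:00
# 2022-12-27T14:00:00
# 2023-01-27T14:00:00
```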

## If you want to disable a scheduled execution, you can use the disable method of the Schedule class. The following code disables all scheduled pipelines in your workspace:

In [49]:
from azureml.pipeline.core.schedule import Schedule

schedules = Schedule.list(ws, active_only=True) 
print("Your workspace has the following schedules set up:")
for schedule in schedules:
    print(f"Disabling {schedule.id} (Published pipeline: {schedule.pipeline_id}")
    schedule.disable(wait_for_provisioning=True)

Your workspace has the following schedules set up:
Disabling 9f0ddb70-34dc-4716-aaf2-7a1d68c8e5f3 (Published pipeline: 9539b997-83c7-48e6-920f-c36ba5419b52
Provisioning status: Completed


This code lists all active schedules within the workspace and then disables them one by 
one. Make sure you don't accidentally disable a pipeline that should have been scheduled 
in your workspace. 
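
One way to reduce that risk is to narrow the list before disabling anything. The sketch 
below filters by the pipeline_id attribute that the loop above already prints. The helper 
name schedules_for_pipeline is hypothetical, and stand-in objects are used so that the 
filter can be tried without a workspace.

```python
from types import SimpleNamespace


def schedules_for_pipeline(schedules, pipeline_id):
    """Return only the schedules attached to the given published pipeline."""
    return [s for s in schedules if s.pipeline_id == pipeline_id]


# Stand-in objects; real code would pass the result of Schedule.list(...).
fake_schedules = [
    SimpleNamespace(id="schedule-a", pipeline_id="pipeline-1"),
    SimpleNamespace(id="schedule-b", pipeline_id="pipeline-2"),
]
selected = schedules_for_pipeline(fake_schedules, "pipeline-1")
print([s.id for s in selected])  # ['schedule-a']
```

With real Schedule objects, you would then call disable only on the items in the 
filtered list.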
