In [19]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, Input, Output, load_component
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import Environment, ResourceConfiguration
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.ai.ml.parallel import parallel_run_function, RunFunction

In [20]:
# Authenticate to Azure. Prefer DefaultAzureCredential (non-interactive sources).
# Requesting a management-plane token up front verifies it actually works;
# if anything fails, fall back to an interactive browser login.
try:
    credential = DefaultAzureCredential()
    # Check that the credential can obtain a token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Report why the fallback happened instead of silently discarding the error,
    # so an unexpected browser prompt is explainable.
    print(
        f"DefaultAzureCredential failed ({type(ex).__name__}); "
        "falling back to InteractiveBrowserCredential."
    )
    credential = InteractiveBrowserCredential()

In [21]:
# Connect to the workspace described by the local config.json file.
ml_client = MLClient.from_config(credential=credential)

# Name of an existing, already-attached Azure ML compute cluster. Fetching it
# here fails fast on a wrong name and prints the cluster's configuration.
cpu_compute_target = "aml-eli-cluster-strong"
compute_cluster = ml_client.compute.get(cpu_compute_target)
print(compute_cluster)

Found the config file in: /config.json


enable_node_public_ip: true
id: /subscriptions/e02ff02a-3d3d-4fa7-828d-54c7d1f4899f/resourceGroups/aauki_eli_demos/providers/Microsoft.MachineLearningServices/workspaces/aauki_eli_demos_amlws01/computes/aml-eli-cluster-strong
idle_time_before_scale_down: 120
location: uksouth
max_instances: 17
min_instances: 0
name: aml-eli-cluster-strong
provisioning_state: Succeeded
size: Standard_D13_v2
ssh_public_access_enabled: false
tags:
  contact: eli.kling@avanade.com
tier: dedicated
type: amlcompute



# 2. Define components and jobs in pipeline

## 2.1 Load existing command component

In [22]:
# Command component defined in YAML; its `output_folder` output feeds the parallel step.
SCENARIO_PREP_YAML = "../components/scenario_prep/scenario_prep.yaml"
scenario_prep_component = load_component(source=SCENARIO_PREP_YAML)

## 2.2 Declare the parallel job with `parallel_run_function`


In [23]:
# Declare the parallel job with a run_function task: `input_data` is split into
# mini-batches that are processed by parallel_bird_models.py on the cluster.
parallel_bird_models_component = parallel_run_function(
    name="parallel_bird_models",
    display_name="run bird models in parallel",
    description="run bird models in parallel configured in previous step via CSVs",
    tags={
        "contact": "eli.kling@avanade.com",
        "type": "Demo",
    },
    inputs=dict(
        input_data=Input(
            type=AssetTypes.URI_FOLDER,
            description="Folder holding CSV files defining the configuration of the scenarios",
            mode=InputOutputModes.DOWNLOAD,
        ),
    ),
    outputs=dict(
        job_output_folder=Output(
            type=AssetTypes.URI_FOLDER,
            mode=InputOutputModes.RW_MOUNT,
        ),
    ),
    input_data="${{inputs.input_data}}",  # Which input is split into mini-batches.
    instance_count=2,  # Use 2 nodes from the compute cluster for this parallel job.
    max_concurrency_per_instance=2,  # Run 2 worker processes on each node to execute mini-batches.
    # error_threshold monitors failed items via the gap between mini-batch input
    # count and returned results. A 'many models' scenario doesn't fit that
    # accounting, so -1 disables counting item failures from mini-batch returns.
    error_threshold=-1,
    # Fail the whole parallel job once more mini-batches than this fail
    # (by exception, timeout, or null return).
    mini_batch_error_threshold=5,
    retry_settings=dict(
        max_retries=2,  # Retries for a mini-batch that fails by exception, timeout, or null return.
        timeout=60,  # Timeout in seconds for each mini-batch execution.
    ),
    logging_level="DEBUG",
    environment_variables={
        "AZUREML_PARALLEL_EXAMPLE": "notebook",
    },
    task=RunFunction(
        code="../components/parallel_bird_models/",
        entry_script="parallel_bird_models.py",
        # NOTE(review): this Ubuntu 18.04 base image differs from the one the
        # scenario_prep component uses (openmpi4.1.0-ubuntu20.04), and Ubuntu
        # 18.04 is past standard support. Consider aligning the images once
        # conda.yaml is confirmed to resolve on 20.04.
        environment=Environment(
            image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04",
            conda_file="../components/parallel_bird_models/conda.yaml",
        ),
        program_arguments="--input_data ${{inputs.input_data}} "
        "--output_folder ${{outputs.job_output_folder}} ",
    ),
)

# 3. Build pipeline

In [24]:
# Declare the pipeline structure.
@pipeline(
    display_name="parallel job for many bird model scenarios",
)
def partition_job_in_pipeline():
    """Prepare scenario configuration files, then run the bird models on them in parallel."""
    # Step 1: scenario configuration command job; writes its results to `output_folder`.
    scenario_prep_step = scenario_prep_component()

    # Step 2: parallel model runs over the folder produced by step 1.
    parallel_bird_models_step = parallel_bird_models_component(
        input_data=scenario_prep_step.outputs.output_folder,
    )

    # Run-level properties of the parallel component can be overridden per
    # invocation in the pipeline. These replace the component's declared values
    # (instance_count=2, mini_batch_error_threshold=5).
    parallel_bird_models_step.resources.instance_count = 3
    parallel_bird_models_step.max_concurrency_per_instance = 2
    parallel_bird_models_step.mini_batch_error_threshold = 10
    # To pin where results land, override the folder output's path, e.g.:
    # parallel_bird_models_step.outputs.job_output_folder.path = (
    #     "azureml://datastores/${{default_datastore}}/paths/${{name}}/"
    # )


# Create the pipeline instance.
my_job = partition_job_in_pipeline()

# Tag the pipeline job like the parallel component so its runs are easy to find.
# (The original `my_job.tags.update` only referenced the method and never called it.)
my_job.tags.update(
    {
        "contact": "eli.kling@avanade.com",
        "type": "Demo",
    }
)

# Pipeline-level compute: every step runs on the cluster looked up earlier.
my_job.settings.default_compute = cpu_compute_target
# Re-run all steps instead of reusing outputs of previous identical runs.
# `force_rerun` is the SDK's documented setting name; `ForceRerun` is not.
my_job.settings.force_rerun = True

In [25]:
# Show the pipeline definition as text before submitting it.
pipeline_spec_text = str(my_job)
print(pipeline_spec_text)

display_name: parallel job for many bird model scenarios
type: pipeline
jobs:
  scenario_prep_step:
    type: command
    component:
      name: scenario_prep
      display_name: scenario configuration
      type: command
      outputs:
        output_folder:
          type: uri_folder
          mode: rw_mount
      command: python scenario_prep.py  --output_folder ${{outputs.output_folder}};
      environment:
        name: CliV2AnonymousEnvironment
        version: 17c46db455cf0e6f082f5942c5f2d33e
        image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest
        conda_file:
          name: birdmodel
          channels:
          - conda-forge
          dependencies:
          - python=3.10
          - pip
          - pip:
            - mlflow
            - azureml-mlflow
      code: /mnt/batch/tasks/shared/LS_root/mounts/clusters/uki-aai-eli-aml01/code/Users/eli.kling/ado/azure_machine_learning/Exploration/bird_model/components/scenario_prep
      is_deterministic: true

# 4. Submit pipeline job

In [26]:
# Submit the pipeline under the named experiment. The returned job object
# displays as a summary table with a link to the studio page.
EXPERIMENT_NAME = "Notebook-parallel-job"
pipeline_job = ml_client.jobs.create_or_update(my_job, experiment_name=EXPERIMENT_NAME)
pipeline_job

Uploading parallel_bird_models (0.01 MBs): 100%|██████████| 5093/5093 [00:00<00:00, 41813.81it/s]



Experiment,Name,Type,Status,Details Page
Notebook-parallel-job,joyful_wing_9kzl8mxtq9,pipeline,Preparing,Link to Azure Machine Learning studio


In [27]:
# Block here, streaming the run's logs until the submitted pipeline job finishes.
submitted_job_name = pipeline_job.name
ml_client.jobs.stream(submitted_job_name)

RunId: joyful_wing_9kzl8mxtq9
Web View: https://ml.azure.com/runs/joyful_wing_9kzl8mxtq9?wsid=/subscriptions/e02ff02a-3d3d-4fa7-828d-54c7d1f4899f/resourcegroups/aauki_eli_demos/workspaces/aauki_eli_demos_amlws01

Streaming logs/azureml/executionlogs.txt

[2024-03-10 11:58:19Z] Submitting 1 runs, first five are: c7cdc814:e3aa2b7e-9a0e-410e-9026-da0345f0c88d
[2024-03-10 12:04:21Z] Completing processing run id e3aa2b7e-9a0e-410e-9026-da0345f0c88d.
[2024-03-10 12:04:21Z] Submitting 1 runs, first five are: b0cc04af:348fd0e7-0920-4451-90c1-c735a83cbe27
[2024-03-10 12:10:30Z] Completing processing run id 348fd0e7-0920-4451-90c1-c735a83cbe27.

Execution Summary
RunId: joyful_wing_9kzl8mxtq9
Web View: https://ml.azure.com/runs/joyful_wing_9kzl8mxtq9?wsid=/subscriptions/e02ff02a-3d3d-4fa7-828d-54c7d1f4899f/resourcegroups/aauki_eli_demos/workspaces/aauki_eli_demos_amlws01

