# Creating a pipeline job

In [3]:
from azure.identity import AzureCliCredential
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
#import mlflow
import argparse
import pandas as pd
from azure.ai.ml import MLClient

credentials = AzureCliCredential(tenant_id='79fa5f3d-6f5b-4abe-a5bc-25abcb328320')

try:
    ml_client = MLClient.from_config(credential=credentials)
except Exception as ex:
    # NOTE: Update following workspace information if not correctly configure before
    client_config = {
        "subscription_id": "a8b0cced-98ab-4669-bf6a-a77f4c102009",
        "resource_group": "ml_resource",
        "workspace_name": "ml_workspace",
    }

    if client_config["subscription_id"].startswith("<"):
        print(
            "please update your <SUBSCRIPTION_ID> <RESOURCE_GROUP> <AML_WORKSPACE_NAME> in notebook cell"
        )
        raise ex
    else:  # write and reload from config file
        import json, os

        config_path = "../config/config.json"
        os.makedirs(os.path.dirname(config_path), exist_ok=True)
        with open(config_path, "w") as fo:
            fo.write(json.dumps(client_config))
        ml_client = MLClient.from_config(credential=credentials, path=config_path)
print(ml_client)

from azure.ai.ml.entities import AmlCompute

# specify aml compute name.
cpu_compute_target = "cpu-cluster"

try:
    ml_client.compute.get(cpu_compute_target)
except Exception:
    print("Creating a new cpu compute target...")
    compute = AmlCompute(
        name=cpu_compute_target, size="STANDARD_E16S_V3", min_instances=0, max_instances=4
    )
    ml_client.compute.begin_create_or_update(compute).result()
    

Found the config file in: ../config/config.json


MLClient(credential=<azure.identity._credentials.azure_cli.AzureCliCredential object at 0x7fcea09c4ac0>,
         subscription_id=a8b0cced-98ab-4669-bf6a-a77f4c102009,
         resource_group_name=ml_resource,
         workspace_name=ml_workspace)


## Pipeline job environment

In [4]:
import os

dependencies_dir = "./dependencies"
os.makedirs(dependencies_dir, exist_ok=True)

#### create the conda dependencies- required libraries

In [5]:
%%writefile {dependencies_dir}/conda.yaml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.8
  - numpy=1.21.2
  - pip=21.2.4
  - scikit-learn=0.24.2
  - scipy=1.7.1
  - pandas>=1.1,<1.2
  - pip:
    - inference-schema[numpy-support]==1.3.0
    - xlrd==2.0.1
    - mlflow== 2.4.1
    - azureml-mlflow==1.51.0

Overwriting ./dependencies/conda.yaml


##### create the environment

In [6]:
from azure.ai.ml.entities import Environment

custom_env_name = "aml-scikit-learn"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for Credit Card Defaults pipeline",
    tags={"scikit-learn": "0.24.2"},
    conda_file=os.path.join(dependencies_dir, "conda.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    version="0.2.0",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name aml-scikit-learn is registered to workspace, the environment version is 0.2.0


## data prep job component

In [7]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

prep_data_component = command(
    inputs=dict(
        prep_input=Input(
            type="uri_file",
            path="/Users/ejenamvictor/Desktop/mlops_classicRegression/data/taxi-data.csv",
        )
    ),
    outputs=dict(
        train_data=Output(type="uri_folder",
                          path='azureml://subscriptions/a8b0cced-98ab-4669-bf6a-a77f4c102009/resourcegroups/ml_resource/workspaces/ml_workspace/datastores/workspaceblobstore/paths/LocalUpload/9292ec840b5d1db6306dba71da69ab7f/train_data',
                          mode="rw_mount"),
        val_data=Output(type="uri_folder",
                        path='azureml://subscriptions/a8b0cced-98ab-4669-bf6a-a77f4c102009/resourcegroups/ml_resource/workspaces/ml_workspace/datastores/workspaceblobstore/paths/LocalUpload/9292ec840b5d1db6306dba71da69ab7f/val_data',
                        mode="rw_mount"),
        test_data=Output(type="uri_folder",
                         path='azureml://subscriptions/a8b0cced-98ab-4669-bf6a-a77f4c102009/resourcegroups/ml_resource/workspaces/ml_workspace/datastores/workspaceblobstore/paths/LocalUpload/9292ec840b5d1db6306dba71da69ab7f/test_data',
                         mode="rw_mount"),
    ),
    code="./src/",  # location of source code
    command="python prep.py --raw_data ${{inputs.prep_input}} --train_data ${{outputs.train_data}} --val_data ${{outputs.val_data}} --test_data ${{outputs.test_data}}",
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    compute="cpu-cluster", # delete this line to use serverless compute
)


In [9]:
prep_data_component = ml_client.create_or_update(prep_data_component.component)
# Create (register) the component in your workspace
print(
    f"Component {prep_data_component.name} with Version {prep_data_component.version} is registered"
)

Component azureml_anonymous with Version 1 is registered


## train and test component

In [10]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

train_data_component = command(
    inputs={
        "train_input":Input(
            type="uri_folder",
            path="azureml://subscriptions/a8b0cced-98ab-4669-bf6a-a77f4c102009/resourcegroups/ml_resource/workspaces/ml_workspace/datastores/workspaceblobstore/paths/LocalUpload/9292ec840b5d1db6306dba71da69ab7f/train_data/",
            #mode="rw_mount",
        ),
        "test_data":Input(type="uri_folder",
                         path="azureml://subscriptions/a8b0cced-98ab-4669-bf6a-a77f4c102009/resourcegroups/ml_resource/workspaces/ml_workspace/datastores/workspaceblobstore/paths/LocalUpload/9292ec840b5d1db6306dba71da69ab7f/test_data/"),
        "n_estimators": 100,
        "max_depth": 10,
        "max_features": "auto",
        "min_samples_leaf": 1,
        "min_samples_split": 2,
        "registered_model_name": "RFregressor"
    },
    outputs={
        "model_output":Output(type="uri_folder",
                          path='azureml://subscriptions/a8b0cced-98ab-4669-bf6a-a77f4c102009/resourcegroups/ml_resource/workspaces/ml_workspace/datastores/workspaceblobstore/paths/LocalUpload/9292ec840b5d1db6306dba71da69ab7f/model_folder',),
        "evaluation_output":Output(type="uri_folder",
                          path='azureml://subscriptions/a8b0cced-98ab-4669-bf6a-a77f4c102009/resourcegroups/ml_resource/workspaces/ml_workspace/datastores/workspaceblobstore/paths/LocalUpload/9292ec840b5d1db6306dba71da69ab7f/eval_outputs',)
                         
                          },
    code="./src/train/",  # location of source code
    command="python train.py --training_data ${{inputs.train_input}} --n_estimators ${{inputs.n_estimators}} --max_depth ${{inputs.max_depth}} --max_features ${{inputs.max_features}} --min_samples_leaf ${{inputs.min_samples_leaf}} --test_data ${{inputs.test_data}} --evaluation_output ${{outputs.evaluation_output}}\
    --min_samples_split ${{inputs.min_samples_split}} --registered_model_name ${{inputs.registered_model_name}} --model ${{outputs.model_output}} ", 
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    compute="cpu-cluster", # delete this line to use serverless compute
    display_name="traininig-model",
    experiment_name="taxi-training",
)

In [11]:
train_data_component = ml_client.create_or_update(train_data_component.component)
train_data_component

CommandComponent({'intellectual_property': None, 'auto_increment_version': False, 'source': 'REMOTE.WORKSPACE.COMPONENT', 'is_anonymous': True, 'auto_delete_setting': None, 'name': 'azureml_anonymous', 'description': None, 'tags': {}, 'properties': {}, 'print_as_yaml': True, 'id': '/subscriptions/a8b0cced-98ab-4669-bf6a-a77f4c102009/resourceGroups/ml_resource/providers/Microsoft.MachineLearningServices/workspaces/ml_workspace/components/azureml_anonymous/versions/3246f705-8df1-4929-8e21-05bde4af2d1a', 'Resource__source_path': None, 'base_path': '/Users/ejenamvictor/Desktop/mlproject_pipeline/codes', 'creation_context': <azure.ai.ml._restclient.v2022_10_01.models._models_py3.SystemData object at 0x7fcea79348e0>, 'serialize': <msrest.serialization.Serializer object at 0x7fcea791bd90>, 'command': 'python train.py --training_data ${{inputs.train_input}} --n_estimators ${{inputs.n_estimators}} --max_depth ${{inputs.max_depth}} --max_features ${{inputs.max_features}} --min_samples_leaf ${{in

# Define the pipeline

In [13]:
# the dsl decorator tells the sdk that we are defining an Azure Machine Learning pipeline
from azure.ai.ml import dsl, Input, Output

cpu_compute_target = "cpu-cluster"
@dsl.pipeline(
    compute=cpu_compute_target
    #if (cpu_cluster)
    #else "serverless",  # "serverless" value runs pipeline on serverless compute
    #description="E2E data_perp-train pipeline",
)
def taxi_price_pipeline(
    pipeline_job_data_input,
    pipeline_job_registered_model_name,
):
    # using data_prep_function like a python call with its own inputs
    data_prep_job = prep_data_component(
        prep_input=pipeline_job_data_input,
    )

    # using train_func like a python call with its own inputs
    train_job = train_data_component(
        train_input=data_prep_job.outputs.train_data,  # note: using outputs from previous step
        test_data = data_prep_job.outputs.test_data, # note: using a pipeline input as parameter
        registered_model_name=pipeline_job_registered_model_name,
    )
    # keys will code for the pipeline output identifier
    return {
        "pipeline_job_train_data": data_prep_job.outputs.train_data,
        "pipeline_job_test_data": data_prep_job.outputs.test_data,
    }

In [14]:
registered_model_name = "RFregressor"

# Let's instantiate the pipeline with the parameters of our choice
pipeline = taxi_price_pipeline(
    pipeline_job_data_input=Input(type="uri_file", path="/Users/ejenamvictor/Desktop/mlops_classicRegression/data/taxi-data.csv"),
    pipeline_job_registered_model_name=registered_model_name,
)

In [15]:
# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    # Project's name
    experiment_name="taxi_price_registered_components",
)
ml_client.jobs.stream(pipeline_job.name)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


RunId: busy_stone_6gvzjp0bj8
Web View: https://ml.azure.com/runs/busy_stone_6gvzjp0bj8?wsid=/subscriptions/a8b0cced-98ab-4669-bf6a-a77f4c102009/resourcegroups/ml_resource/workspaces/ml_workspace

Streaming logs/azureml/executionlogs.txt

[2023-08-31 20:11:12Z] Completing processing run id b9c1db9d-081d-441d-8ec0-5f7531eefdde.
[2023-08-31 20:11:13Z] Completing processing run id adaac684-cf6f-4e18-bd67-e7d931ec11dd.
[2023-08-31 20:11:14Z] Finishing experiment: no runs left and nothing to schedule.

Execution Summary
RunId: busy_stone_6gvzjp0bj8
Web View: https://ml.azure.com/runs/busy_stone_6gvzjp0bj8?wsid=/subscriptions/a8b0cced-98ab-4669-bf6a-a77f4c102009/resourcegroups/ml_resource/workspaces/ml_workspace

