In [1]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

credentials = DefaultAzureCredential()

ml_client = MLClient(
    credential=credentials,
    subscription_id="a9ea30e6-5728-4375-ac04-cbe4918c3def",
    resource_group_name="rg-training",
    workspace_name="ws-training"
)

In [2]:
ws = ml_client.workspaces.get("ws-training")
print(ws.location, ":", ws.resource_group)

eastus : rg-training


In [3]:
# Access registered data asset
credit_data = ml_client.data.get(name="credit-card", version="initial")
print(f"Data asset URI: {credit_data.path}")

Data asset URI: azureml://subscriptions/a9ea30e6-5728-4375-ac04-cbe4918c3def/resourcegroups/rg-training/workspaces/ws-training/datastores/workspaceblobstore/paths/LocalUpload/f4315d633696cbc4576bf53f59a90e8f/default_of_credit_card_clients.csv


# Create job environment for pipeline steps

In [4]:
import os

dependencies_dir = "./dependencies"
os.makedirs(dependencies_dir, exist_ok=True)

In [5]:
%%writefile {dependencies_dir}/conda.yaml
name: model-env
channels: 
    - conda-forge
dependencies:
    - python=3.8
    - numpy=1.21.2
    - pip=21.2.4
    - scikit-learn=0.24.2
    - scipy=1.7.1
    - pandas>=1.1,<1.2
    - pip:
        - inference-schema[numpy-support]==1.3.0
        - xlrd==2.0.1
        - mlflow==2.4.1
        - azureml-mlflow==1.51.0

Overwriting ./dependencies/conda.yaml


In [7]:
from azure.ai.ml.entities import Environment

custom_env_name = "aml-scikit-learn"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for Credit Card Defaults pipeline",
    tags={"scikit-learn": "0.24.2"},
    conda_file=os.path.join(dependencies_dir, "conda.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    version="0.3.0",
)

pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name aml-scikit-learn is registered to workspace, the environment version is 0.3.0


# Build the training pipeline

In [8]:
data_prep_src_dir = "./components/data_prep"
os.makedirs(data_prep_src_dir, exist_ok=True)

# component 1: data prep 

In [9]:
%%writefile {data_prep_src_dir}/data_prep.py
import os
import argparse
import pandas as pd
from sklearn.model_selection import train_test_split
import logging
import mlflow

def main():

    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.25)
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    args = parser.parse_args()

    mlflow.start_run()

    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))
    print("input data:", args.data)

    credit_df = pd.read_csv(args.data, header=1, index_col=0)

    mlflow.log_metric("num_samples", credit_df.shape[0])
    mlflow.log_metric("num_features", credit_df.shape[1] - 1)

    credit_train_df, credit_test_df = train_test_split(
        credit_df,
        test_size=args.test_train_ratio
    )

    credit_train_df.to_csv(os.path.join(args.train_data, "data.csv"), index=False)
    credit_test_df.to_csv(os.path.join(args.test_data, "data.csv"), index=False)

    mlflow.end_run()


if __name__ == "__main__":
    main()

Overwriting ./components/data_prep/data_prep.py


In [10]:
# create an Azure Machine Learning Component

from azure.ai.ml import command
from azure.ai.ml import Input, Output

data_prep_component = command(
    name="data_prep_credit_defaults",
    display_name="Data preperation for training",
    description="reads a .xl input, split the input data to train and test",
    inputs={
        "data": Input(type="uri_folder"),
        "test_train_ratio": Input(type="number")
    },
    outputs=dict(
        train_data=Output(type="uri_folder", mode="rw_mount"),
        test_data=Output(type="uri_folder", mode="rw_mount")
    ),
    code=data_prep_src_dir,
    command="""python data_prep.py \
    --data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}} \
    --train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}} \
    """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}"
)

In [11]:
# Optionally, register the component in the workspace for future reuse

data_prep_component = ml_client.create_or_update(data_prep_component.component)

print(
    f"Component {data_prep_component.name} with version {data_prep_component.version} is registered"
)

Component data_prep_credit_defaults with version 2024-07-06-04-23-47-7685213 is registered


# component 2: training

In [12]:
import os

train_src_dir = "./components/train"
os.makedirs(train_src_dir, exist_ok=True)

In [25]:
%%writefile {train_src_dir}/train.py
import argparse
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import classification_report
import os
import pandas as pd
import mlflow

def select_first_file(path):
    files = os.listdir(path)
    return os.path.join(path, files[0])

mlflow.start_run()
mlflow.sklearn.autolog()

os.makedirs("./outputs", exist_ok=True)

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    parser.add_argument("--n_estimators", required=False, default=100, type=int)
    parser.add_argument("--learning_rate", required=False, default=0.1, type=float)
    parser.add_argument("--registered_model_name", type=str, help="model name")
    parser.add_argument("--model", type=str, help="path to model file")
    args = parser.parse_args()

    train_df = pd.read_csv(select_first_file(args.train_data))
    y_train = train_df.pop("default payment next month")
    X_train = train_df.values
    
    test_df = pd.read_csv(select_first_file(args.test_data))
    y_test = test_df.pop("default payment next month")
    X_test = test_df.values

    print(f"Training with data of shape {X_train.shape}")

    clf = GradientBoostingClassifier(
        n_estimators=args.n_estimators, 
        learning_rate=args.learning_rate
    )

    clf.fit(X_train, y_train)

    y_pred = clf.predict(X_test)
    print(classification_report(y_test, y_pred))

    print("Registering the model via MLFlow")

    mlflow.sklearn.log_model(
        sk_model=clf,
        registered_model_name=args.registered_model_name,
        artifact_path=args.registered_model_name
    )

    mlflow.sklearn.save_model(
        sk_model=clf,
        path=os.path.join(args.model, "trained_model")
    )

    mlflow.end_run()

if __name__ == "__main__":
    main()

Overwriting ./components/train/train.py


In [26]:
%%writefile {train_src_dir}/train.yml
name: train_credit_defaults_model
display_name: Train Credit Defaults Model
type: command
inputs:
    train_data:
        type: uri_folder
    test_data:
        type: uri_folder
    learning_rate:
        type: number
    registered_model_name:
        type: string
outputs:
    model:
        type: uri_folder
code: .
environment: 
    # for this step, we'll use an AzureML curate environment
    azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1
command: >-
  python train.py 
  --train_data ${{inputs.train_data}} 
  --test_data ${{inputs.test_data}} 
  --learning_rate ${{inputs.learning_rate}}
  --registered_model_name ${{inputs.registered_model_name}} 
  --model ${{outputs.model}}

Overwriting ./components/train/train.yml


In [27]:
from azure.ai.ml import load_component

train_component = load_component(
    source=os.path.join(train_src_dir, "train.yml")
)

train_component = ml_client.create_or_update(train_component)

print(
    f"Component {train_component.name} with Version {train_component.version} is registered"
)

[32mUploading train (0.0 MBs):   0%|          | 0/2898 [00:00<?, ?it/s][32mUploading train (0.0 MBs): 100%|██████████| 2898/2898 [00:00<00:00, 64239.58it/s]
[39m



Component train_credit_defaults_model with Version 2024-07-06-04-41-32-0893536 is registered


# Create the pipeline from components

In [28]:
from azure.ai.ml import dsl, Input, Output

@dsl.pipeline(
    compute="serverless", # "serverless" value runs pipeline on serverless compute
    description="E2E data_perp-train pipeline"
)
def credit_defaults_pipeline(
    pipeline_job_data_input,
    pipeline_job_test_train_ratio,
    pipeline_job_learning_rate,
    pipeline_job_registered_model_name
):
    data_prep_job = data_prep_component(
        data=pipeline_job_data_input,
        test_train_ratio=pipeline_job_test_train_ratio
    )

    train_job = train_component(
        train_data=data_prep_job.outputs.train_data,
        test_data=data_prep_job.outputs.test_data,
        learning_rate=pipeline_job_learning_rate,
        registered_model_name=pipeline_job_registered_model_name
    )

    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        "pipeline_job_train_data": data_prep_job.outputs.train_data,
        "pipeline_job_test_data": data_prep_job.outputs.test_data
    }

In [29]:
registered_model_name = "credit_defaults_model"

pipeline = credit_defaults_pipeline(
    pipeline_job_data_input=Input(type="uri_file", path=credit_data.path),
    pipeline_job_test_train_ratio=0.25,
    pipeline_job_learning_rate=0.05,
    pipeline_job_registered_model_name=registered_model_name
)

# Submit the job

In [30]:
pipeline_job =  ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="e2e_registered_components"
)
ml_client.jobs.stream(pipeline_job.name)

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored


RunId: strong_rod_tqlyg1ymg0
Web View: https://ml.azure.com/runs/strong_rod_tqlyg1ymg0?wsid=/subscriptions/a9ea30e6-5728-4375-ac04-cbe4918c3def/resourcegroups/rg-training/workspaces/ws-training

Streaming logs/azureml/executionlogs.txt

[2024-07-06 04:41:43Z] Completing processing run id dc8a3102-6805-4b55-a478-a02ed0936366.
[2024-07-06 04:41:44Z] Submitting 1 runs, first five are: f22cd48f:8e6a879f-06dc-494e-a634-e9f6df81aab3
[2024-07-06 04:42:53Z] Completing processing run id 8e6a879f-06dc-494e-a634-e9f6df81aab3.

Execution Summary
RunId: strong_rod_tqlyg1ymg0
Web View: https://ml.azure.com/runs/strong_rod_tqlyg1ymg0?wsid=/subscriptions/a9ea30e6-5728-4375-ac04-cbe4918c3def/resourcegroups/rg-training/workspaces/ws-training

