# Tutorial: Create production machine learning pipelines


Create a local source directory to hold training code.

In [7]:
import os

train_src_dir = "./local_src"
os.makedirs(train_src_dir, exist_ok=True)

Write the training script to disk for use in the pipeline.

In [8]:
%%writefile {train_src_dir}/main.py
import os
import argparse
import pandas as pd
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.25)
    parser.add_argument("--n_estimators", required=False, default=100, type=int)
    parser.add_argument("--learning_rate", required=False, default=0.1, type=float)
    parser.add_argument("--registered_model_name", type=str, help="model name")
    args = parser.parse_args()
   
    # Start Logging
    mlflow.start_run()

    # enable autologging
    mlflow.sklearn.autolog()

    ###################
    #<prepare the data>
    ###################
    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

    print("input data:", args.data)
    
    credit_df = pd.read_csv(args.data, header=1, index_col=0)

    mlflow.log_metric("num_samples", credit_df.shape[0])
    mlflow.log_metric("num_features", credit_df.shape[1] - 1)

    #Split train and test datasets
    train_df, test_df = train_test_split(
        credit_df,
        test_size=args.test_train_ratio,
    )
    ####################
    #</prepare the data>
    ####################

    ##################
    #<train the model>
    ##################
    # Extracting the label column
    y_train = train_df.pop("default payment next month")

    # convert the dataframe values to array
    X_train = train_df.values

    # Extracting the label column
    y_test = test_df.pop("default payment next month")

    # convert the dataframe values to array
    X_test = test_df.values

    print(f"Training with data of shape {X_train.shape}")

    clf = GradientBoostingClassifier(
        n_estimators=args.n_estimators, learning_rate=args.learning_rate
    )
    clf.fit(X_train, y_train)

    y_pred = clf.predict(X_test)

    ####################
    # Log classifier accuracy
    ####################

    accuracy = clf.score(X_test, y_test)
    print('Accuracy of GradientBoostingClassifier on test set: {:.2f}'.format(accuracy))
    mlflow.log_metric('accuracy', float(accuracy))

    print(classification_report(y_test, y_pred))
    ###################
    #</train the model>
    ###################

    ##########################
    #<save and register model>
    ##########################
    # Registering the model to the workspace
    print("Registering the model via MLFlow")
    mlflow.sklearn.log_model(
        sk_model=clf,
        registered_model_name=args.registered_model_name,
        artifact_path=args.registered_model_name,
    )

    # Saving the model to a file
    mlflow.sklearn.save_model(
        sk_model=clf,
        path=os.path.join(args.registered_model_name, "trained_model"),
    )
    ###########################
    #</save and register model>
    ###########################
    
    # Stop Logging
    mlflow.end_run()

if __name__ == "__main__":
    main()

Writing ./local_src/main.py
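
Before submitting a job, it can help to check how the script's command-line interface behaves. The following is a minimal sketch that rebuilds the same arguments as `main.py` and parses a sample argument list. The file name `credit.csv` and the sample values are assumptions for illustration only.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Rebuild the argument parser defined in main.py."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.25)
    parser.add_argument("--n_estimators", required=False, default=100, type=int)
    parser.add_argument("--learning_rate", required=False, default=0.1, type=float)
    parser.add_argument("--registered_model_name", type=str, help="model name")
    return parser


# Hypothetical argument list, standing in for what the job command would pass.
sample_argv = [
    "--data", "credit.csv",
    "--test_train_ratio", "0.2",
    "--registered_model_name", "credit_defaults_model",
]
args = build_parser().parse_args(sample_argv)

# Arguments that were not supplied fall back to their defaults.
print(args.test_train_ratio, args.n_estimators, args.learning_rate)
```

Any argument the job command omits, such as `--n_estimators` here, takes the default declared in the parser.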


Define the YAML configuration for the training component.

In [12]:
%%writefile train.yml
# <component>
name: train_todel_to_hyper_tune
display_name: Training Component to be Hypertuned
# version: 1 # Not specifying a version will automatically update the version
type: command
inputs:
  train_data_csv: 
    type: uri_folder
    mode: ro_mount
  test_data_csv: 
    type: uri_file
    mode: ro_mount
  learning_rate:
    type: number     
  registered_model_name:
    type: string
outputs:
  model:
    type: uri_folder
code: .
environment:
  # for this step, we'll use an AzureML curated environment
  azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1
command: >-
  python train.py 
  --train_data ${{inputs.train_data_csv}} 
  --test_data ${{inputs.test_data_csv}} 
  --learning_rate ${{inputs.learning_rate}}
  --registered_model_name ${{inputs.registered_model_name}} 
  --model ${{outputs.model}}
# </component>


Overwriting train.yml
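
The `command` field in the component uses `${{inputs.<name>}}` and `${{outputs.<name>}}` placeholders that are resolved to concrete values when the job runs. The sketch below illustrates the idea with a small regular-expression substitution. It is not how Azure Machine Learning implements the resolution, and the helper `render_command` and the `/mnt/...` paths are hypothetical.

```python
import re

# Matches ${{inputs.name}} and ${{outputs.name}} placeholders.
PLACEHOLDER = re.compile(r"\$\{\{\s*(inputs|outputs)\.(\w+)\s*\}\}")


def render_command(template: str, inputs: dict, outputs: dict) -> str:
    """Replace each placeholder with the matching input or output value."""
    values = {"inputs": inputs, "outputs": outputs}

    def substitute(match: re.Match) -> str:
        kind, name = match.group(1), match.group(2)
        return str(values[kind][name])

    return PLACEHOLDER.sub(substitute, template)


template = (
    "python train.py "
    "--train_data ${{inputs.train_data_csv}} "
    "--learning_rate ${{inputs.learning_rate}} "
    "--model ${{outputs.model}}"
)

# Hypothetical resolved values; a real job receives paths chosen by the service.
rendered = render_command(
    template,
    inputs={"train_data_csv": "/mnt/train", "learning_rate": 0.25},
    outputs={"model": "/mnt/model"},
)
print(rendered)
```

A placeholder that names an input or output missing from the dictionaries raises a `KeyError` in this sketch, which mirrors why every placeholder in the command must be declared in the component's `inputs` or `outputs`.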


Register the training component with Azure ML using the CLI.

In [13]:
!az ml component create -f  train.yml

{
  "code": "azureml:/subscriptions/781b03e7-6eb7-4506-bab8-cf3a0d89b1d4/resourceGroups/SandboxML/providers/Microsoft.MachineLearningServices/workspaces/quick-start-tutorial/codes/f4093bb3-4a9b-4dad-854d-3a5cfa5d660b/versions/1",
  "command": "python train.py  --train_data ${{inputs.train_data_csv}}  --test_data ${{inputs.test_data_csv}}  --learning_rate ${{inputs.learning_rate}} --registered_model_name ${{inputs.registered_model_name}}  --model ${{outputs.model}}",
  "creation_context": {
    "created_at": "2023-11-07T15:02:10.496358+00:00",
    "created_by": "Anton Slutsky",
    "created_by_type": "User",
    "last_modified_at": "2023-11-07T15:02:10.572551+00:00",
    "last_modified_by": "Anton Slutsky",
    "last_modified_by_type": "User"
  },
  "display_name": "Training Component to be Hypertuned",
  "environment": "azureml://registries/azureml/environments/AzureML-sklearn-1.0-ubuntu20.04-py38-cpu/versions/1",
  "id": "azureml:/subscriptions/781b03e7-6eb7-4506-bab8-cf3a0d89b1d4/res



Configure a command job and specify hyperparameters for sweeping.

In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input
from azure.ai.ml.sweep import Normal, Uniform

registered_model_name = "credit_defaults_model"
environment_name= custom_job_env.name + "@latest" 
job = command(
    inputs=dict(
        data=Input(
            type="uri_file",
            #path="azureml://subscriptions/f1ea6ed8-82f3-416d-881b-8b376218bc85/resourcegroups/rg_aml/workspaces/aml-default/datastores/workspaceblobstore/paths/LocalUpload/4b1dfc4d12429b46389cabdf25b886a2/default_of_credit_card_clients.csv",
            #path="https://azuremlexamples.blob.core.windows.net/datasets/credit_card/default_of_credit_card_clients.csv",
            path="azureml:credit_cards@latest",
        ),
        test_train_ratio=0.2,
        learning_rate=0.25,
        registered_model_name=registered_model_name,
    ),
    code="./local_src/",  # location of source code (the folder where main.py was written above)
    command="python main.py --data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}} --learning_rate ${{inputs.learning_rate}} --registered_model_name ${{inputs.registered_model_name}}",
    environment=environment_name,
    compute="cpu-cluster",
    display_name="03a_train_model_credit_default_prediction",
)

print(type(job))

command_job_for_sweep = job(
    learning_rate=Uniform(min_value=0.01, max_value=0.9)
)




<class 'azure.ai.ml.entities._builders.command.Command'>
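
`Uniform(min_value=0.01, max_value=0.9)` tells the sweep to draw `learning_rate` from a continuous uniform distribution over that range. As a rough illustration of what random sampling over this search space produces, the sketch below draws values with Python's `random` module. It does not use the Azure ML sampler, and the trial count of 20, the seed, and the helper name `sample_learning_rates` are assumptions.

```python
import random


def sample_learning_rates(n_trials, min_value, max_value, seed=0):
    """Draw one learning rate per trial from a uniform distribution."""
    rng = random.Random(seed)  # seeded so the sketch is repeatable
    return [rng.uniform(min_value, max_value) for _ in range(n_trials)]


rates = sample_learning_rates(n_trials=20, min_value=0.01, max_value=0.9)

# Every sampled value lies inside the declared search range.
print(len(rates), min(rates) >= 0.01, max(rates) <= 0.9)
```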


Configure the sweep job, set its limits, and specify an early termination policy for the hyperparameter sweep.

In [None]:
from azure.ai.ml.sweep import MedianStoppingPolicy

# Call sweep() on your command job to sweep over your parameter expressions
sweep_job = command_job_for_sweep.sweep(
    compute="cpu-cluster",
    sampling_algorithm="random",
    primary_metric="accuracy",
    goal="Maximize",
)

# Specify your experiment details
sweep_job.display_name = "credit-card-sweep-example"
sweep_job.experiment_name = "credit-card-sweep-example"
sweep_job.description = "Run a hyperparameter sweep job."

# Define the limits for this sweep
sweep_job.set_limits(max_total_trials=20, max_concurrent_trials=10, timeout=7200)

# Set early stopping on this one
sweep_job.early_termination = MedianStoppingPolicy(
    delay_evaluation=5, evaluation_interval=2
)
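
A median stopping policy ends trials that are performing worse than the median of the other trials. The sketch below is a simplified illustration of that idea for a metric that is maximized. The function `should_stop`, the way intervals are counted, and the sample metric histories are assumptions, and the service's actual behavior may differ in detail.

```python
from statistics import median


def should_stop(trial_metrics, other_trials, delay_evaluation=5, evaluation_interval=2):
    """Return True if a trial should be stopped at its current interval.

    trial_metrics: metric values this trial has reported so far, one per interval.
    other_trials: metric histories reported by the other trials.
    """
    interval = len(trial_metrics)
    # Assumption: no decision during the delay or between evaluation intervals.
    if interval <= delay_evaluation or interval % evaluation_interval != 0:
        return False
    # Running average of each other trial over the same number of intervals.
    averages = [
        sum(history[:interval]) / interval
        for history in other_trials
        if len(history) >= interval
    ]
    if not averages:
        return False
    # Stop when this trial's best value is below the median running average.
    return max(trial_metrics) < median(averages)


others = [[0.80] * 6, [0.75] * 6, [0.70] * 6]
print(should_stop([0.60] * 6, others))  # weak trial after the delay
print(should_stop([0.90] * 6, others))  # strong trial after the delay
print(should_stop([0.60] * 4, others))  # weak trial still inside the delay
```

With `delay_evaluation=5`, the weak trial is not considered for stopping until it has reported more than five intervals, which gives slow-starting trials a chance to improve.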

### Create the pipeline from components

Now that both your components are defined and registered, you can start implementing the pipeline.

A YAML configuration file represents the structure of an Azure Machine Learning pipeline as a directed acyclic graph (DAG). In the YAML configuration, you can specify the pipeline description and default resources such as compute and storage. Like components, pipelines can have inputs and outputs, so you can create multiple instances of a single pipeline with different inputs.

Here, you use *input data*, *split ratio*, and *registered model name* as input variables. You then call the components and connect them through their input and output identifiers. The outputs of each step can be accessed via the `.outputs` property.
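
Because the pipeline is a DAG, a step can run only after every step it depends on has finished. The following sketch shows how such an execution order can be derived with Python's standard `graphlib` module (Python 3.9 or later). The job names `data_prep` and `train` are hypothetical, and this is an illustration of the concept, not of the Azure ML scheduler.

```python
from graphlib import TopologicalSorter

# Hypothetical two-step pipeline: each key maps a job to the jobs it depends on.
dependencies = {
    "data_prep": set(),      # no upstream jobs
    "train": {"data_prep"},  # consumes the outputs of data_prep
}

# static_order() yields the jobs so that dependencies always come first.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

If the dependencies contained a cycle, `TopologicalSorter` would raise `graphlib.CycleError`, which is why a pipeline graph must be acyclic.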

Create a pipeline definition YAML describing the DAG of jobs.

In [2]:
%%writefile local_pipeline.yml
$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline

display_name: 04a_Pipeline_DataPrep_Train_CLI_Pipeline
description: Pipeline with 2 component jobs with data dependencies

settings:
  default_compute: azureml:cpu-cluster

outputs:
  final_model:
    mode: upload

jobs:
  component_data_prep:
    type: command
    inputs:
      # Assumed values: the data asset and split ratio used by the command job earlier in this tutorial
      data:
        type: uri_file
        path: azureml:credit_cards@latest
      test_train_ratio: 0.2
    outputs:
      # Assumed output types for the two locations referenced in the command below
      train_data_csv:
        type: uri_folder
      test_data_csv:
        type: uri_folder
    code: data_prep
    environment: 
      image: mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:latest
      conda_file: conda.yml
    compute: azureml:cpu-cluster
    command: >-
      python data_prep.py 
      --data ${{inputs.data}} 
      --test_train_ratio ${{inputs.test_train_ratio}} 
      --train_data_csv ${{outputs.train_data_csv}}
      --test_data_csv ${{outputs.test_data_csv}}

Writing local_pipeline.yml


Submit the pipeline job to Azure ML.

In [3]:
!az ml job create --file .\local_pipeline.yml


The pipeline definition is instantiated with your dataset, the split ratio of your choice, and the name you picked for your model.

You can track the progress of your pipeline by using the link generated in the previous cell. When you first select this link, you may see that the pipeline is still running. Once it's complete, you can examine each component's results.

Double-click the **Train Credit Defaults Model** component. 

There are two important results you'll want to see about training:

* View your logs:
    1. Select the **Outputs+logs** tab.
    1. Open the folders to `user_logs` > `std_log.txt`.
    This file shows the stdout of the script run.
    ![Screenshot of std_log.txt.](media/user-logs.jpg)

* View your metrics: Select the **Metrics** tab. This section shows the logged metrics. In this example, MLflow autologging has automatically logged the training metrics.
    
    ![Screenshot shows logged metrics.txt.](./media/metrics.jpg)
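
The `accuracy` value on the **Metrics** tab is the one the training script logs explicitly with `mlflow.log_metric`. For a classifier, it is the fraction of test samples whose predicted label matches the true label. A minimal sketch of that calculation follows. The helper `accuracy` and the sample labels are made up for illustration.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return correct / len(y_true)


# Made-up labels: 3 of the 5 predictions are correct.
y_true = [0, 1, 0, 0, 1]
y_pred = [0, 1, 1, 0, 0]
print(f"{accuracy(y_true, y_pred):.2f}")
```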

## Deploy the model as an online endpoint
To learn how to deploy your model to an online endpoint, see the [Deploy a model as an online endpoint](https://learn.microsoft.com/en-us/azure/machine-learning/tutorial-deploy-model) tutorial.


## Next Steps

Learn how to [schedule machine learning pipeline jobs](https://learn.microsoft.com/azure/machine-learning/how-to-schedule-pipeline-job).