## Pipeline training and deployment

After having developed a machine learning model through with an exploratory approach in the previous notebook, the next step is to register the model to put it into production by deploying it on an online deployment. 

Although the previously developed model could have been logged via MLFlow and then registered and deployed, are trained again making use of pipeline jobs. Therefore, data preparation and training scripts are written to create yaml pipeline components that complete the training process in sequence. 

Finally the model logged by MLFlow during the pipeline job is registered and deployed to 100% traffic on an Azure managed online deployment.


#### Import libraries

In [None]:
import os
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
from azure.ai.ml import load_component


## Connect to the MLClient

In [1]:
ml_client = MLClient(
    DefaultAzureCredential(), "e1f27d73-68d8-4f59-900b-77783d4c5b3b", "End2endCPC", "End2End_CPC"
)


Create a _src_ directory to store the generated data preparation and training scripts.

In [2]:
script_folder = 'src'
os.makedirs(script_folder, exist_ok=True)
print(script_folder, 'folder created')

src folder created


### Training via pipeline jobs

##### Definition of scripts to prepare the data and train the model

In [3]:
%%writefile src/prepare-data.py
import argparse
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.preprocessing import RobustScaler

def main(args):
    df = get_data(args.input_data)

    cleaned_data = clean_data(df)

    imputed_data = impute(cleaned_data)

    scaled_data = scale_data(imputed_data)

    output_df = scaled_data.to_csv((Path(args.output_data)), index = False)

def get_data(path):
    df = pd.read_csv(path)

    print(f'Preparing {df.shape[1]} columns and {df.shape[0]} rows of data')

    df = df.drop("keyword",axis=1)
    
    return df

def clean_data(df):
    zero_mask = (df['lower_bid']==df['upper_bid'])|(df['lower_bid']==0)
    df = df[~zero_mask]
    lower_mask = (df['lower_bid']>df['upper_bid'])|(df['lower_bid']>df['CPC'])|(df['upper_bid']<df['CPC'])
    df = df[~lower_mask]

    return df

def impute(df):
    for column in df.columns:
        if df[column].dtype in ['float64', 'int64']:
            fill_value = df[column].median()
        else:
            fill_value = df[column].mode()[0]
        
        df[column].fillna(fill_value, inplace=True)
    
    return df

def scale_data(df):
    scaler = RobustScaler()
    num_cols = df.select_dtypes(['float64', 'int64']).columns.to_list()
    df[num_cols] = scaler.fit_transform(df[num_cols])

    return df

def parse_args():
    parser = argparse.ArgumentParser()

    parser.add_argument("--input_data", dest='input_data',
                        type=str)
    parser.add_argument("--output_data", dest='output_data',
                        type=str)

    args = parser.parse_args()

    return args


if __name__ == "__main__":
    print("\n\n")
    print("*" * 60)

    args = parse_args()

    main(args)

    print("*" * 60)
    print("\n\n")

Overwriting src/prepare-data.py


In [4]:
%%writefile src/train-model.py
import mlflow
import glob
import argparse
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.metrics import mean_absolute_error
import matplotlib.pyplot as plt

def main(args):
    mlflow.autolog()

    df = get_data(args.training_data)

    X_train, X_test, y_train, y_test = split_data(df,args.target_feature)

    model = train_model(args.algorithm, X_train, X_test, y_train, y_test)

    eval_model(model, X_test, y_test)

def get_data(path):
    df = pd.read_csv(path)

    print(f'Modeling with {df.shape[1]} columns and {df.shape[0]} rows of data')
    
    return df

def split_data(df,target_feature):
    print("Splitting data...")
    X, y = df.drop(target_feature,axis=1), np.ravel(df[target_feature])

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, shuffle=True, random_state=99)

    return X_train, X_test, y_train, y_test

def train_model(algorithm,X_train, X_test, y_train, y_test):
    print("Training model...")
    if algorithm == "gradient-boosting":
        model = GradientBoostingRegressor()
    if algorithm == "random-forest":
        model = RandomForestRegressor()
    else:
        model = LinearRegression()
    
    model.fit(X_train, y_train)

    mlflow.sklearn.save_model(model, args.model_output)

    return model


def eval_model(model, X_test, y_test):

    y_pred = model.predict(X_test)
    mae = mean_absolute_error(y_pred, y_test)
    mlflow.log_param('MAE',mae)
    print('MAE:', mae)


def parse_args():
    parser = argparse.ArgumentParser()

    parser.add_argument("--training_data", dest='training_data',
                        type=str)
    parser.add_argument("--algorithm", dest='algorithm',
                        type=str, default='linear-regression')
    parser.add_argument("--target_feature", dest='target_feature',
                        type=str, default='CPC')
    parser.add_argument("--model_output", dest='model_output',
                        type=str)

    args = parser.parse_args()

    return args

if __name__ == "__main__":
    print("\n\n")
    print("*" * 60)

    args = parse_args()

    main(args)

    print("*" * 60)
    print("\n\n")


Overwriting src/train-model.py


##### Definition of pipeline components out of prepare and train scripts

In [5]:
%%writefile prepare-data.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: prep_data
display_name: Prepare training data
version: 1
type: command
inputs:
  input_data: 
    type: uri_file
outputs:
  output_data:
    type: uri_file
code: ./src
environment: azureml://registries/azureml/environments/sklearn-1.1/versions/34
command: >-
  python prepare-data.py 
  --input_data ${{inputs.input_data}}
  --output_data ${{outputs.output_data}}

Overwriting prepare-data.yml


In [6]:
%%writefile train-model.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: train_model
display_name: Train a linear, gradient boosting or random forest regression model
version: 1
type: command
inputs:
  training_data: 
    type: uri_file
  algorithm:
    type: string
    default: 'linear-regression'
  target_feature:
    type: string
    default: 'CPC'
outputs:
  model_output:
    type: mlflow_model
code: ./src
environment: azureml://registries/azureml/environments/sklearn-1.1/versions/34
command: >-
  python train-model.py 
  --training_data ${{inputs.training_data}} 
  --algorithm ${{inputs.algorithm}} 
  --target_feature ${{inputs.target_feature}} 
  --model_output ${{outputs.model_output}} 

Overwriting train-model.yml


In [7]:
prep_data = load_component(source= "./prepare-data.yml")
train_regression = load_component(source="./train-model.yml")

In [8]:
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline

@pipeline()
def kw_CPC_prediction(pipeline_job_input,algorithm='linear-regression',target_feature='CPC'):
    clean_data = prep_data(input_data=pipeline_job_input)
    train_model = train_regression(training_data=clean_data.outputs.output_data,algorithm=algorithm,target_feature=target_feature)

    return {
        "pipeline_job_transformed_data": clean_data.outputs.output_data,
        "pipeline_job_trained_model": train_model.outputs.model_output,
    }

pipeline_job = kw_CPC_prediction(Input(type=AssetTypes.URI_FILE, path="azureml:kw-dataset:1"),'random-forest')

In [9]:
print(pipeline_job)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


display_name: kw_CPC_prediction
type: pipeline
inputs:
  pipeline_job_input:
    type: uri_file
    path: azureml:kw-dataset:1
  algorithm: random-forest
  target_feature: CPC
outputs:
  pipeline_job_transformed_data:
    type: uri_file
  pipeline_job_trained_model:
    type: mlflow_model
jobs:
  clean_data:
    type: command
    inputs:
      input_data:
        path: ${{parent.inputs.pipeline_job_input}}
    outputs:
      output_data: ${{parent.outputs.pipeline_job_transformed_data}}
    component:
      $schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
      name: prep_data
      version: '1'
      display_name: Prepare training data
      type: command
      inputs:
        input_data:
          type: uri_file
      outputs:
        output_data:
          type: uri_file
      command: python prepare-data.py  --input_data ${{inputs.input_data}} --output_data
        ${{outputs.output_data}}
      environment: azureml://registries/azureml/environments

In [10]:
from azure.ai.ml.entities import AmlCompute

cpu_compute_target = "aml-cluster2"

try:
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"There is already a cluster named {cpu_compute_target}. Reusing it."
    )

except Exception:
    print("Creating a new cpu compute target...")

    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        type="amlcompute",
        size="Standard_D11_v2",
        min_instances=0,
        max_instances=2,
        idle_time_before_scale_down=60,
        tier="Dedicated",
    )

    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster)


There is already a cluster named aml-cluster2. Reusing it.


In [11]:
from azure.ai.ml.entities import AmlCompute

cluster_scale = AmlCompute(
    name="aml-cluster2",
    max_instances=2,
)
ml_client.begin_create_or_update(cluster_scale)

<azure.core.polling._poller.LROPoller at 0x7f3a244da1c0>

In [12]:

pipeline_job.outputs.pipeline_job_transformed_data.mode = "upload"
pipeline_job.outputs.pipeline_job_trained_model.mode = "upload"
pipeline_job.settings.default_compute = "aml-cluster2"
pipeline_job.settings.default_datastore = "workspaceblobstore"

In [13]:
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_kw_CPC_prediction"
)
pipeline_job

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFileJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.MLFlowModelJobOutput'> and will be ignored


Experiment,Name,Type,Status,Details Page
pipeline_kw_CPC_prediction,bubbly_feast_ny2qlb9vj6,pipeline,NotStarted,Link to Azure Machine Learning studio


### Model registration and deployment

In [15]:
from azure.ai.ml.entities import Model
from azure.ai.ml.constants import AssetTypes

child_jobs = [child_job for child_job in ml_client.jobs.list(parent_job_name=pipeline_job.name)]
child_job_name = child_jobs[1].name

run_model = Model(
    path=f"azureml://jobs/{child_job_name}/outputs/artifacts/paths/model/",
    name="mlflow-kw-CPC",
    description="Model created from run.",
    type=AssetTypes.MLFLOW_MODEL,
)

ml_client.models.create_or_update(run_model)

Model({'job_name': 'a0577f77-9fb1-433f-a19c-6e1dc735679a', 'intellectual_property': None, 'is_anonymous': False, 'auto_increment_version': False, 'auto_delete_setting': None, 'name': 'mlflow-kw-CPC', 'description': 'Model created from run.', 'tags': {}, 'properties': {}, 'print_as_yaml': False, 'id': '/subscriptions/e1f27d73-68d8-4f59-900b-77783d4c5b3b/resourceGroups/End2endCPC/providers/Microsoft.MachineLearningServices/workspaces/End2End_CPC/models/mlflow-kw-CPC/versions/11', 'Resource__source_path': '', 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/notebook-compute-cpc/code/Users/miguel_lopez_virues/azure_keyword_CPC', 'creation_context': <azure.ai.ml.entities._system_data.SystemData object at 0x7f3a0eccc5b0>, 'serialize': <msrest.serialization.Serializer object at 0x7f3a0ece85e0>, 'version': '11', 'latest_version': None, 'path': 'azureml://subscriptions/e1f27d73-68d8-4f59-900b-77783d4c5b3b/resourceGroups/End2endCPC/workspaces/End2End_CPC/datastores/workspaceartifact

In [16]:
from azure.ai.ml.entities import ManagedOnlineEndpoint, ManagedOnlineDeployment
from azure.ai.ml.constants import AssetTypes
import datetime

online_endpoint_name = "endpoint-CPC-prediction"

endpoint = ManagedOnlineEndpoint(
    name=online_endpoint_name,
    description="Online endpoint for MLflow keyword CPC prediction.",
    auth_mode="key",
)


In [17]:


production_model = Model(
    path=f"azureml://jobs/{child_job_name}/outputs/artifacts/paths/model/",
    name="mlflow-kw-CPC-blue-deployment",
    description="Model created from run.",
    type=AssetTypes.MLFLOW_MODEL,
)

blue_deployment = ManagedOnlineDeployment(
    name="blue-kw-cpc",
    endpoint_name=online_endpoint_name,
    description="Blue online deployment for keyword CPC prediction",
    model=production_model,
    instance_type="Standard_F4s_v2",
    instance_count=1,
)

In [18]:
ml_client.begin_create_or_update(endpoint).result()

ManagedOnlineEndpoint({'public_network_access': 'Enabled', 'provisioning_state': 'Succeeded', 'scoring_uri': 'https://endpoint-cpc-prediction.francecentral.inference.ml.azure.com/score', 'openapi_uri': 'https://endpoint-cpc-prediction.francecentral.inference.ml.azure.com/swagger.json', 'name': 'endpoint-cpc-prediction', 'description': 'Online endpoint for MLflow keyword CPC prediction.', 'tags': {}, 'properties': {'createdBy': 'miguel_lopez_virues', 'createdAt': '2024-05-31T10:55:45.535660+0000', 'lastModifiedAt': '2024-05-31T10:55:45.535660+0000', 'azureml.onlineendpointid': '/subscriptions/e1f27d73-68d8-4f59-900b-77783d4c5b3b/resourcegroups/end2endcpc/providers/microsoft.machinelearningservices/workspaces/end2end_cpc/onlineendpoints/endpoint-cpc-prediction', 'AzureAsyncOperationUri': 'https://management.azure.com/subscriptions/e1f27d73-68d8-4f59-900b-77783d4c5b3b/providers/Microsoft.MachineLearningServices/locations/francecentral/mfeOperationsStatus/oe:5fc20ec8-dbaf-49b8-a9c2-e6072f4

In [19]:
ml_client.online_deployments.begin_create_or_update(blue_deployment).result()

Check: endpoint endpoint-CPC-prediction exists


..............................................................................

ManagedOnlineDeployment({'private_network_connection': None, 'package_model': False, 'provisioning_state': 'Succeeded', 'endpoint_name': 'endpoint-cpc-prediction', 'type': 'Managed', 'name': 'blue-kw-cpc', 'description': 'Blue online deployment for keyword CPC prediction', 'tags': {}, 'properties': {}, 'print_as_yaml': False, 'id': '/subscriptions/e1f27d73-68d8-4f59-900b-77783d4c5b3b/resourceGroups/End2endCPC/providers/Microsoft.MachineLearningServices/workspaces/End2End_CPC/onlineEndpoints/endpoint-cpc-prediction/deployments/blue-kw-cpc', 'Resource__source_path': '', 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/notebook-compute-cpc/code/Users/miguel_lopez_virues/azure_keyword_CPC', 'creation_context': None, 'serialize': <msrest.serialization.Serializer object at 0x7f3a0efe47c0>, 'model': '/subscriptions/e1f27d73-68d8-4f59-900b-77783d4c5b3b/resourceGroups/End2endCPC/providers/Microsoft.MachineLearningServices/workspaces/End2End_CPC/models/mlflow-kw-CPC-blue-deployment/

In [20]:
endpoint.traffic = {"blue-kw-cpc": 100}
ml_client.begin_create_or_update(endpoint).result()

ManagedOnlineEndpoint({'public_network_access': 'Enabled', 'provisioning_state': 'Succeeded', 'scoring_uri': 'https://endpoint-cpc-prediction.francecentral.inference.ml.azure.com/score', 'openapi_uri': 'https://endpoint-cpc-prediction.francecentral.inference.ml.azure.com/swagger.json', 'name': 'endpoint-cpc-prediction', 'description': 'Online endpoint for MLflow keyword CPC prediction.', 'tags': {}, 'properties': {'createdBy': 'miguel_lopez_virues', 'createdAt': '2024-05-31T10:55:45.535660+0000', 'lastModifiedAt': '2024-05-31T10:55:45.535660+0000', 'azureml.onlineendpointid': '/subscriptions/e1f27d73-68d8-4f59-900b-77783d4c5b3b/resourcegroups/end2endcpc/providers/microsoft.machinelearningservices/workspaces/end2end_cpc/onlineendpoints/endpoint-cpc-prediction', 'AzureAsyncOperationUri': 'https://management.azure.com/subscriptions/e1f27d73-68d8-4f59-900b-77783d4c5b3b/providers/Microsoft.MachineLearningServices/locations/francecentral/mfeOperationsStatus/oe:5fc20ec8-dbaf-49b8-a9c2-e6072f4

In [21]:
import json 
sample_deployment_data = {
  "input_data": {
    "columns": [
      "competition",
      "lower_bid",
      "upper_bid"
    ],
    "index": [1],
    "data": [
      [
      30,2.1,5
    ]
    ]
  }
}

sample_data_path = "data/sample_deployment_data.json"

with open(sample_data_path, 'w') as json_file:
    json.dump(sample_deployment_data, json_file)

response = ml_client.online_endpoints.invoke(
    endpoint_name=online_endpoint_name,
    deployment_name="blue-kw-cpc",
    request_file=sample_data_path,
)
response

'[2.5990833333333323]'