## AzureML Model Monitoring through Operationalization

In this sample notebook, you will walk through the end-to-end lifecycle of operationalizing a machine learning (ML) model. You will complete the following steps to train your model, deploy it to production, and monitor it to ensure it continues to perform well:

1) Set up the environment
2) Register data assets
3) Train the model
4) Deploy the model
5) Simulate inference requests
6) Monitor the model

Let's begin. 

## Set up your environment

To start, connect to your project workspace.

In [None]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# Connect to the project workspace
ml_client = MLClient.from_config(credential=DefaultAzureCredential())

Set up a compute cluster to train your model.

In [None]:
from azure.ai.ml.entities import AmlCompute

cluster_basic = AmlCompute(
    name="cpu-cluster",
    type="amlcompute",
    size="STANDARD_F2S_V2",  # you can replace it with other supported VM SKUs
    location=ml_client.workspaces.get(ml_client.workspace_name).location,
    min_instances=0,
    max_instances=1,
    idle_time_before_scale_down=360,
)

ml_client.begin_create_or_update(cluster_basic).result()

## Register data assets

Next, let's use some sample data to train our model. We split the dataset by row position into a reference set (the first 80% of rows) and a production set (the remaining 20%). We add a `TIMESTAMP` column to simulate production-like data, since production data typically comes with timestamps: every reference row gets the same timestamp (45 days before the notebook is run), and production rows advance from that point in ten-minute increments. The dataset used in this notebook has several columns describing credit card borrowers, plus a column indicating whether they defaulted on their credit card debt. We will train a model to predict `DEFAULT_NEXT_MONTH`, that is, whether a borrower will default on their debt next month.

In [None]:
import pandas as pd
import datetime

# Read the default_of_credit_card_clients dataset into a pandas data frame
data_path = "https://azuremlexamples.blob.core.windows.net/datasets/credit_card/default_of_credit_card_clients.csv"
df = pd.read_csv(data_path, header=1, index_col=0).rename(
    columns={"default payment next month": "DEFAULT_NEXT_MONTH"}
)

# Split the data into production_data_df and reference_data_df
# Use the iloc method to select the first 80% and the last 20% of the rows
reference_data_df = df.iloc[: int(0.8 * len(df))].copy()
production_data_df = df.iloc[int(0.8 * len(df)) :].copy()

# Add a timestamp column in ISO8601 format
timestamp = datetime.datetime.now() - datetime.timedelta(days=45)
reference_data_df["TIMESTAMP"] = timestamp.strftime("%Y-%m-%dT%H:%M:%S")
production_data_df["TIMESTAMP"] = [
    timestamp + datetime.timedelta(minutes=i * 10)
    for i in range(len(production_data_df))
]
production_data_df["TIMESTAMP"] = production_data_df["TIMESTAMP"].apply(
    lambda x: x.strftime("%Y-%m-%dT%H:%M:%S")
)
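
To make the split and timestamp logic above concrete, here is a minimal standard-library sketch on a toy list of rows. The helper names `split_reference_production` and `spaced_timestamps` are illustrative and not part of the notebook, and the fixed start date is an assumption chosen only to keep the output reproducible.

```python
import datetime


def split_reference_production(rows, reference_fraction=0.8):
    """Split rows by position: the first fraction is reference, the rest production."""
    cutoff = int(reference_fraction * len(rows))
    return rows[:cutoff], rows[cutoff:]


def spaced_timestamps(start, count, step_minutes=10):
    """Return `count` ISO 8601 strings starting at `start`, `step_minutes` apart."""
    return [
        (start + datetime.timedelta(minutes=i * step_minutes)).strftime(
            "%Y-%m-%dT%H:%M:%S"
        )
        for i in range(count)
    ]


rows = list(range(10))  # stand-in for the DataFrame rows
reference_rows, production_rows = split_reference_production(rows)

# Hypothetical fixed start time; the notebook uses "now minus 45 days" instead
start = datetime.datetime(2024, 1, 1, 0, 0, 0)
production_timestamps = spaced_timestamps(start, len(production_rows))

print(len(reference_rows), len(production_rows))
print(production_timestamps)
```

Because the split is positional rather than random, any ordering present in the source file carries over into the two sets.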

In [None]:
import os


def write_df(df, local_path, file_name):
    # Create directory if it does not exist
    os.makedirs(local_path, exist_ok=True)

    # Write data
    df.to_csv(f"{local_path}/{file_name}", index=False)


# Write data to local directory
reference_data_dir_local_path = "../data/reference"
production_data_dir_local_path = "../data/production"

write_df(reference_data_df, reference_data_dir_local_path, "01.csv")
write_df(production_data_df, production_data_dir_local_path, "01.csv")

In [None]:
import mltable
from mltable import MLTableHeaders, MLTableFileEncoding

from azureml.fsspec import AzureMachineLearningFileSystem
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes


def upload_data_and_create_data_asset(
    local_path, remote_path, datastore_uri, data_name, data_version
):
    # Write MLTable file
    tbl = mltable.from_delimited_files(
        paths=[{"pattern": f"{datastore_uri}{remote_path}*.csv"}],
        delimiter=",",
        header="all_files_same_headers",
        infer_column_types=True,
        include_path_column=False,
        encoding="utf8",
    )

    tbl.save(local_path)

    # Instantiate file system
    fs = AzureMachineLearningFileSystem(datastore_uri)

    # Upload data
    fs.upload(
        lpath=local_path,
        rpath=remote_path,
        recursive=False,
        **{"overwrite": "MERGE_WITH_OVERWRITE"},
    )

    # Define the Data asset object
    data = Data(
        path=f"{datastore_uri}{remote_path}",
        type=AssetTypes.MLTABLE,
        name=data_name,
        version=data_version,
    )

    # Create the data asset in the workspace
    ml_client.data.create_or_update(data)

    return data


# Datastore uri for data
datastore_uri = "azureml://subscriptions/{}/resourcegroups/{}/workspaces/{}/datastores/workspaceblobstore/paths/".format(
    ml_client.subscription_id, ml_client.resource_group_name, ml_client.workspace_name
)

# Define paths
reference_data_dir_remote_path = "data/credit-default/reference/"
production_data_dir_remote_path = "data/credit-default/production/"

# Define data asset names
reference_data_asset_name = "credit-default-reference"
production_data_asset_name = "credit-default-production"

# Write data to remote directory and create data asset
reference_data = upload_data_and_create_data_asset(
    reference_data_dir_local_path,
    reference_data_dir_remote_path,
    datastore_uri,
    reference_data_asset_name,
    "1",
)
production_data = upload_data_and_create_data_asset(
    production_data_dir_local_path,
    production_data_dir_remote_path,
    datastore_uri,
    production_data_asset_name,
    "1",
)
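
The datastore URI and the file pattern passed to `mltable.from_delimited_files` are plain string concatenations, so they can be checked without touching Azure. The sketch below shows how they compose. The helpers `build_datastore_uri` and `remote_glob` are hypothetical names, and the subscription, resource group, and workspace values are placeholders, not real resources.

```python
def build_datastore_uri(
    subscription_id, resource_group, workspace, datastore="workspaceblobstore"
):
    """Compose the azureml:// datastore URI in the same shape the notebook uses."""
    return (
        f"azureml://subscriptions/{subscription_id}"
        f"/resourcegroups/{resource_group}"
        f"/workspaces/{workspace}"
        f"/datastores/{datastore}/paths/"
    )


def remote_glob(datastore_uri, remote_path, pattern="*.csv"):
    """Compose the file pattern that selects the uploaded CSV files."""
    return f"{datastore_uri}{remote_path}{pattern}"


# Placeholder identifiers (assumptions for illustration only)
uri = build_datastore_uri(
    "00000000-0000-0000-0000-000000000000", "my-resource-group", "my-workspace"
)
pattern = remote_glob(uri, "data/credit-default/reference/")

print(uri)
print(pattern)
```

Note that both the URI and the remote path end with a trailing slash; dropping either one would fuse two path segments together.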

## Train the model

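Submit the training pipeline defined in `../configurations/training_pipeline.yaml`, then stream its logs until the job completes.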

In [None]:
from azure.ai.ml import load_job

# Define training pipeline directory
training_pipeline_path = "../configurations/training_pipeline.yaml"

# Trigger training
training_pipeline_definition = load_job(source=training_pipeline_path)
training_pipeline_job = ml_client.jobs.create_or_update(training_pipeline_definition)



In [None]:
ml_client.jobs.stream(training_pipeline_job.name)

## Deploy the model

Deploy the model to an AzureML batch endpoint.

## Create Batch Endpoint

In [None]:
from time import sleep
from azure.ai.ml import MLClient, Input
from azure.ai.ml.entities import (
    BatchEndpoint,
    ModelBatchDeployment,
    ModelBatchDeploymentSettings,
    Model,
    AmlCompute,
    BatchRetrySettings,
    CodeConfiguration,
    Environment,
)
from azure.ai.ml.constants import AssetTypes, BatchDeploymentOutputAction
from azure.identity import DefaultAzureCredential

In [None]:
endpoint_name = "BATCH_ENDPOINT_NAME"  # placeholder: replace with your batch endpoint name
endpoint = BatchEndpoint(
    name=endpoint_name,
    description="A custom batch endpoint for inference",
)
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

In [None]:
compute_name = "batch-cluster"
# Create the cluster only if it does not already exist
if not any(c.name == compute_name for c in ml_client.compute.list()):
    compute_cluster = AmlCompute(
        name=compute_name, description="amlcompute", min_instances=0, max_instances=5
    )
    ml_client.begin_create_or_update(compute_cluster).result()

In [None]:
environment = Environment(
    name="batch-custom-env",
    conda_file="../environments/score.yaml",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
)

In [None]:
model = ml_client.models.get(
    name="credit-default-model",
    version="1"
)

In [None]:
deployment = ModelBatchDeployment(
    name="batch_deployment_name",
    description="batch deployment",
    endpoint_name=endpoint.name,
    model=model,
    environment=environment,
    code_configuration=CodeConfiguration(
        code="../code",
        scoring_script="batch_driver.py",
    ),
    compute=compute_name,
    settings=ModelBatchDeploymentSettings(
        mini_batch_size=1,
        output_action=BatchDeploymentOutputAction.SUMMARY_ONLY,
    ),
)

In [None]:
ml_client.batch_deployments.begin_create_or_update(deployment).result()

## Simulate production inference data

### Generate Sample Data

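We generate sample inference data by sampling records (with replacement) from the production data and adding zero-mean Gaussian noise to each numeric feature, scaled by that feature's standard deviation. Categorical features are left unchanged.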

In [None]:
import numpy as np
import datetime

# Define numeric and categorical feature columns
NUMERIC_FEATURES = [
    "LIMIT_BAL",
    "AGE",
    "BILL_AMT1",
    "BILL_AMT2",
    "BILL_AMT3",
    "BILL_AMT4",
    "BILL_AMT5",
    "BILL_AMT6",
    "PAY_AMT1",
    "PAY_AMT2",
    "PAY_AMT3",
    "PAY_AMT4",
    "PAY_AMT5",
    "PAY_AMT6",
]
CATEGORICAL_FEATURES = [
    "SEX",
    "EDUCATION",
    "MARRIAGE",
    "PAY_0",
    "PAY_2",
    "PAY_3",
    "PAY_4",
    "PAY_5",
    "PAY_6",
]


def generate_sample_inference_data(df_production, number_of_records=20):
    # Sample records
    df_sample = df_production.sample(n=number_of_records, replace=True)

    # Generate numeric features with random noise
    df_numeric_generated = pd.DataFrame(
        {
            feature: np.random.normal(
                0, df_production[feature].std(), number_of_records
            ).astype(np.int64)
            for feature in NUMERIC_FEATURES
        }
    ) + df_sample[NUMERIC_FEATURES].reset_index(drop=True)

    # Take categorical columns
    df_categorical = df_sample[CATEGORICAL_FEATURES].reset_index(drop=True)

    # Combine numerical and categorical columns
    df_combined = pd.concat([df_numeric_generated, df_categorical], axis=1)
    # Add a timestamp column in ISO8601 format
    timestamp = datetime.datetime.now() + datetime.timedelta(days=5)
    df_combined["TIMESTAMP"] = [
        timestamp + datetime.timedelta(minutes=i * 10)
        for i in range(len(df_combined))
    ]
    df_combined["TIMESTAMP"] = df_combined["TIMESTAMP"].apply(
        lambda x: x.strftime("%Y-%m-%dT%H:%M:%S")
    )

    return df_combined
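
The noise step can be illustrated on a single numeric column without pandas or NumPy. This is a minimal sketch under stated assumptions: `add_gaussian_noise` is a hypothetical helper, the `population` values are made up, and `statistics.stdev` is used because it computes the sample standard deviation, which is also the pandas default for `.std()`. As in the notebook, the noise is truncated to an integer before being added.

```python
import random
import statistics


def add_gaussian_noise(values, population, rng):
    """Add zero-mean Gaussian noise, truncated to int, scaled by the population's sample std."""
    sigma = statistics.stdev(population)
    return [value + int(rng.gauss(0, sigma)) for value in values]


rng = random.Random(0)  # seeded so repeated runs give the same draw
population = [20, 25, 30, 35, 40, 45, 50]  # made-up values for one numeric feature

# Sample with replacement, mirroring df_production.sample(..., replace=True)
sampled = [rng.choice(population) for _ in range(5)]
noisy = add_gaussian_noise(sampled, population, rng)

print(sampled)
print(noisy)
```

Because the noise scale equals the feature's own standard deviation, generated values can fall well outside the observed range (for example, a negative `AGE`), which may be worth clamping if the downstream model assumes valid ranges.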

In [None]:
import os
import mltable
import pandas as pd
from azure.ai.ml import MLClient
import datetime

# Load production / inference data
data_asset = ml_client.data.get("credit-default-production", version="1")
tbl = mltable.load(data_asset.path)
df_production = tbl.to_pandas_dataframe()

# Generate sample data for inference
number_of_records = 40
os.makedirs("../batch_data", exist_ok=True)
for i in range(5):
    df_generated = generate_sample_inference_data(df_production, number_of_records)
    df_generated.to_csv(f"../batch_data/batch_{i}.csv", index=False)


## Call batch endpoint

In [None]:
endpoint_name = "BATCH_ENDPOINT_NAME"  # placeholder: must match the endpoint created above
endpoint = ml_client.batch_endpoints.get(endpoint_name)
deployment_name = "batch_deployment_name"  # must match the deployment created above
deployment = ml_client.batch_deployments.get(deployment_name, endpoint_name)

print(endpoint.name)

In [None]:
# Named batch_input to avoid shadowing Python's built-in input()
batch_input = Input(
    type=AssetTypes.URI_FOLDER,
    path="../batch_data/",
)

In [None]:
job = ml_client.batch_endpoints.invoke(
    endpoint_name=endpoint.name, deployment_name=deployment.name, input=batch_input
)

In [None]:
job.name

In [None]:
scoring_job = list(ml_client.jobs.list(parent_job_name=job.name))[0]

In [None]:
scoring_job

In [None]:
os.makedirs("../scoring_output", exist_ok=True)
ml_client.jobs.download(name=scoring_job.name, download_path="../scoring_output/", output_name="score")

In [None]:
from azureml.fsspec import AzureMachineLearningFileSystem
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes


# Datastore uri for data
datastore_uri = "azureml://subscriptions/{}/resourcegroups/{}/workspaces/{}/datastores/workspaceblobstore/paths/".format(
    ml_client.subscription_id, ml_client.resource_group_name, ml_client.workspace_name
)

# Define paths
output_data_dir_remote_path = "data/credit-default/output/"

# Instantiate file system
fs = AzureMachineLearningFileSystem(datastore_uri)

# Upload data
fs.upload(
    lpath="../scoring_output/named-outputs/score/",
    rpath=output_data_dir_remote_path,
    recursive=False,
    **{"overwrite": "MERGE_WITH_OVERWRITE"},
)

output_data_asset_name = "credit-default-output-folder"

# Define the Data asset object
data = Data(
    path=f"{datastore_uri}{output_data_dir_remote_path}",
    type=AssetTypes.URI_FOLDER,
    name=output_data_asset_name,
    version="2",
)

# Create the data asset in the workspace
ml_client.data.create_or_update(data)


## Create preprocessing component

In [None]:
from azure.ai.ml import load_component
parent_dir = ".."
# Join with an explicit separator; plain concatenation would yield "..components/..."
spec_file = load_component(
    source=f"{parent_dir}/components/custom_preprocessing/spec.yaml"
)
print(spec_file)

In [None]:
try:
    # Try to fetch the component if it is already registered
    spec_file = ml_client.components.get(name=spec_file.name, version=spec_file.version)
except Exception:
    # Otherwise register it in the workspace
    spec_file = ml_client.components.create_or_update(spec_file)

print(spec_file)
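
The cell above follows a get-or-create pattern: look the component up by name and version, and register it only when the lookup fails. The sketch below illustrates that control flow against an in-memory stand-in. `InMemoryRegistry` and `get_or_create` are hypothetical and are not part of the Azure ML SDK; the stand-in raises `KeyError` for a missing entry, whereas the real client raises its own exception type.

```python
class InMemoryRegistry:
    """Hypothetical stand-in for a component registry, keyed by (name, version)."""

    def __init__(self):
        self._items = {}

    def get(self, name, version):
        # Raises KeyError when the (name, version) pair is not registered
        return self._items[(name, version)]

    def create_or_update(self, name, version, payload):
        self._items[(name, version)] = payload
        return payload


def get_or_create(registry, name, version, payload):
    """Return (item, created): reuse an existing item or register a new one."""
    try:
        return registry.get(name, version), False
    except KeyError:
        return registry.create_or_update(name, version, payload), True


registry = InMemoryRegistry()
first, created_first = get_or_create(registry, "custom_preprocessing", "1", {"spec": "v1"})
second, created_second = get_or_create(registry, "custom_preprocessing", "1", {"spec": "other"})

print(created_first, created_second, second)
```

One consequence of this pattern is that an already registered version is reused as-is, so local changes to the spec take effect only after the version is bumped.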

## Create the schedule pipeline

Create the schedule by passing the file `batch_endpoint_monitoring.yaml` to the Azure CLI: `az ml schedule create -f batch_endpoint_monitoring.yaml --subscription <subscription_id> --workspace <workspace> --resource-group <resource_group>`