# Run a pipeline job, deploy a model to an endpoint, and test the deployed model in Azure Machine Learning (Azure ML)

A pipeline enables you to combine several tasks into a single workflow. You can create a pipeline using components, where each component represents a Python script to execute. The details of a component are defined in a YAML file, which outlines both the script and instructions for running it.

## Before you start

The code in this notebook requires the latest version of the **azure-ai-ml** package.

Run the cell below to verify that it is installed. If it isn't, run **pip install azure-ai-ml** to install it.

In [None]:
pip show azure-ai-ml

## Connecting to your workspace

With the required SDK packages installed, we're ready to connect to our workspace.

Because we're working on a compute instance managed by Azure ML, we can use the default credential values to connect to the workspace. If `DefaultAzureCredential` can't get a token, the code falls back to `InteractiveBrowserCredential`.

In [None]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Environment

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential does not work
    credential = InteractiveBrowserCredential()

In [None]:
# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)

## Configuring and registering custom environment

We'll set up and register a custom environment for our pipeline job. We start from a base Docker image and add a conda specification file (`conda.yml`) that lists the necessary dependencies. This ensures all the required packages are installed and configured so the pipeline runs successfully.
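The `Environment` definition expects a `conda.yml` file next to the notebook. If your copy of the repo doesn't already include one, the sketch below writes a minimal version. The dependency list is an assumption inferred from the imports in `prep-data.py` and `train-model.py` (plus `azureml-mlflow`, which Azure ML jobs commonly need for MLflow tracking); the Python version is also an assumption, so pin versions to suit your workspace.

```python
from pathlib import Path

# Hypothetical dependency list, inferred from the scripts' imports; adjust and pin as needed.
CONDA_SPEC = """\
name: custom-env
channels:
  - conda-forge
dependencies:
  - python=3.10
  - pip
  - pip:
      - pandas
      - numpy
      - scikit-learn
      - nltk
      - langdetect
      - mlflow
      - azureml-mlflow
"""


def write_conda_file(path="conda.yml", overwrite=False):
    """Write the conda specification unless the file already exists; return its path."""
    target = Path(path)
    if overwrite or not target.exists():
        target.write_text(CONDA_SPEC)
    return target


conda_path = write_conda_file()
print(conda_path.read_text())
```

The `overwrite=False` default keeps an existing `conda.yml` from the repo intact.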

In [None]:
# Define the environment
env = Environment(
    image='mcr.microsoft.com/azureml/openmpi5.0-ubuntu24.04',
    conda_file='conda.yml',
    name='custom-env',
    description='Environment with sklearn, pandas, nltk, etc.'
)

In [None]:
# Register or update it in the workspace
ml_client.environments.create_or_update(env)

print(f"Environment '{env.name}' registered successfully.")

## Create the scripts

Our pipeline requires two steps:

1 - **Prepare the data**: Subset and clean the data, keep English tickets only, and perform feature engineering.

2 - **Train the model**: Build a scikit-learn pipeline that vectorizes the text, scales the features, and trains a logistic regression model.
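The two steps communicate only through a folder: the prep step writes `customer-support-tickets.csv` into its output folder, and the training step reads every `*.csv` file in its input folder. The stdlib-only sketch below mimics that handoff locally with a temporary directory. The helper names and the two ticket rows are made up for illustration, and nothing here calls Azure ML.

```python
import csv
import glob
import os
import tempfile


def write_prepared_data(output_dir, rows):
    """Mimic prep-data.py: write one CSV named customer-support-tickets.csv into the output folder."""
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, "customer-support-tickets.csv")
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["text", "label", "language", "len_words"])
        writer.writeheader()
        writer.writerows(rows)
    return path


def read_training_data(data_path):
    """Mimic get_data in train-model.py: read every *.csv file in the input folder."""
    rows = []
    for file_path in sorted(glob.glob(os.path.join(data_path, "*.csv"))):
        with open(file_path, newline="") as f:
            rows.extend(csv.DictReader(f))
    return rows


with tempfile.TemporaryDirectory() as tmp:
    handoff_dir = os.path.join(tmp, "output_data")
    write_prepared_data(handoff_dir, [
        {"text": "VPN connection drops every hour", "label": "Incident", "language": "en", "len_words": 5.0},
        {"text": "Please add a new user account", "label": "Request", "language": "en", "len_words": 6.0},
    ])
    training_rows = read_training_data(handoff_dir)

print(len(training_rows), training_rows[0]["label"])
```

Because the training step globs for `*.csv`, the file name chosen by the prep step doesn't matter to it, only the folder does.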

Run the following cells to create the **src** folder and the two scripts described above.

In [None]:
# Create src folder for the script files
import os

script_folder = 'src'
os.makedirs(script_folder, exist_ok=True)
print(script_folder, 'folder created')

In [None]:
%%writefile $script_folder/prep-data.py
# Import libraries
import argparse
import pandas as pd
import numpy as np
import nltk
nltk.download('punkt_tab')
from langdetect import detect_langs, DetectorFactory
from nltk import word_tokenize
from pathlib import Path

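def main(args):
    # Read data
    df = get_data(args.input_data)

    # Subset columns and drop rows with missing values
    cleaned_data = clean_data(df)

    # Keep English tickets and add the word-count feature
    feature_engineer_data = feature_engineer(cleaned_data)

    # Write the prepared data to the output folder (create it if needed)
    output_dir = Path(args.output_data)
    output_dir.mkdir(parents=True, exist_ok=True)
    feature_engineer_data.to_csv(output_dir / 'customer-support-tickets.csv', index=False)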

# Function that reads the data
def get_data(path):
    df = pd.read_csv(path)

    # Count the rows and print the result
    row_count = len(df)
    print('Preparing {} rows of data'.format(row_count))

    return df

# Function that removes missing values
def clean_data(df):
    # Subset data frame
    df_set = df[['body', 'type', 'language']].copy()

    # Remove missing values
    df_set = df_set.dropna().reset_index(drop=True)

    return df_set

# Function to feature engineer data
def feature_engineer(df):
    # Make langdetect deterministic (it is non-deterministic by default)
    DetectorFactory.seed = 9

    # Re-detect each ticket's language from its body text; detect_langs returns
    # candidates sorted by probability, so take the top candidate's language code
    df['language'] = [detect_langs(text)[0].lang for text in df['body']]

    # Keep English ('en') tickets only
    it_ticks = df[df['language'] == 'en'].copy()
    it_ticks.reset_index(inplace=True, drop=True)

    # Rename columns
    it_ticks.rename(columns={'body': 'text', 'type': 'label'}, inplace=True)

    # Add a len_words column: the number of NLTK word tokens in each ticket
    it_ticks['len_words'] = [len(word_tokenize(text)) for text in it_ticks['text']]

    # Convert ints to floats
    it_ticks['len_words'] = it_ticks['len_words'].astype(float)

    return it_ticks

def parse_args():
    # Setup arg parser
    parser = argparse.ArgumentParser()

    # Add arguments
    parser.add_argument('--input_data', dest='input_data',
                        type=str)
    parser.add_argument('--output_data', dest='output_data',
                        type=str)

    # Parse args
    args = parser.parse_args()

    # Return args
    return args

# Run script
if __name__ == '__main__':
    # Add space in logs
    print('\n\n')
    print('*' * 60)

    # Parse args
    args = parse_args()

    # Run main function
    main(args)

    # Add space in logs
    print('*' * 60)
    print('\n\n')

In [None]:
%%writefile $script_folder/train-model.py
# Import libraries
import mlflow
import mlflow.sklearn
import glob
import argparse
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MaxAbsScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from mlflow.models.signature import infer_signature

def main(args):

    # Start MLflow run
    with mlflow.start_run():

        # Read data
        df = get_data(args.training_data)

        # Split data
        X_train, X_test, y_train, y_test = split_data(df)

        # Create and fit the pipeline
        model = create_pipeline(args.reg_rate, X_train, y_train)

        # Evaluate model
        y_pred = eval_model(model, X_test, y_test)

        # Create the signature by inferring it from the training features and predictions
        signature = infer_signature(X_train, y_pred)

        # Save the model to the component's mlflow_model output folder
        mlflow.sklearn.save_model(model, args.model_output, signature=signature)

        # Log the entire pipeline model to the run under the 'model' artifact path
        mlflow.sklearn.log_model(model, 'model', signature=signature)

# Function that reads the data
def get_data(data_path):
    print('Reading data...')
    all_files = glob.glob(data_path + '/*.csv')
    df = pd.concat((pd.read_csv(f) for f in all_files), sort=False)

    return df

# Function that splits the data
def split_data(df):
    print('Splitting data...')
    X, y = df[['text', 'len_words']], df['label']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=9)

    return X_train, X_test, y_train, y_test

# Function that creates and fits the pipeline
def create_pipeline(reg_rate, X_train, y_train):
    mlflow.log_param('Regularization rate', reg_rate)

    # TF-IDF on the text (unigrams and bigrams); pass len_words through unchanged
    preprocessor = ColumnTransformer(
        transformers=[
            ('vect', TfidfVectorizer(lowercase=False, ngram_range=(1, 2)), 'text'),
            ('len', 'passthrough', ['len_words'])
        ]
    )

    # Scale the features, then fit a logistic regression (C is the inverse of the regularization rate)
    pipeline = Pipeline([
        ('preprocess', preprocessor),
        ('scaler', MaxAbsScaler()),
        ('logreg', LogisticRegression(C=1/reg_rate, max_iter=5000, random_state=9))
    ])

    model = pipeline.fit(X_train, y_train)

    return model

# Function that evaluates the model
def eval_model(model, X_test, y_test):
    # Calculate accuracy
    y_pred = model.predict(X_test)
    acc_score = accuracy_score(y_test, y_pred)
    print('LogisticRegression Accuracy score: {:.1%}'.format(acc_score))
    mlflow.log_metric('Accuracy', acc_score)
    
    # Display confusion matrix and classification report
    conf_matr = confusion_matrix(y_test, y_pred)
    cla_rep = classification_report(y_test, y_pred)
    print('\nConfusion Matrix:\n{}'.format(conf_matr))
    print('\nClassification Report:\n{}'.format(cla_rep))

    return y_pred

def parse_args():
    # Setup arg parser
    parser = argparse.ArgumentParser()

    # Add arguments
    parser.add_argument('--training_data', dest='training_data',
                        type=str)
    parser.add_argument('--reg_rate', dest='reg_rate',
                        type=float, default=1.0)
    parser.add_argument('--model_output', dest='model_output',
                        type=str)

    # Parse args
    args = parser.parse_args()

    # Return args
    return args

# Run script
if __name__ == '__main__':
    # Add space in logs
    print('\n\n')
    print('*' * 60)

    # Parse args
    args = parse_args()

    # Run main function
    main(args)

    # Add space in logs
    print('*' * 60)
    print('\n\n')


## Define the components

Because our pipeline includes two steps, we need to create a separate YAML file for each component we intend to execute as part of the pipeline job.

A YAML file specifies details such as the component's **Metadata**, **Interface**, and **Command, code & environment**.

Run the following cells to create a YAML file for each component.

In [None]:
%%writefile prep-data.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: prep_data
display_name: Prepare training data
version: 1
type: command
inputs:
  input_data: 
    type: uri_file
outputs:
  output_data:
    type: uri_folder
code: ./src
environment: azureml:custom-env@latest
command: >-
  python prep-data.py 
  --input_data ${{inputs.input_data}} 
  --output_data ${{outputs.output_data}}

In [None]:
%%writefile train-model.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: train_model
display_name: Train a logistic regression model
version: 1
type: command
inputs:
  training_data: 
    type: uri_folder
  reg_rate:
    type: number
    default: 1.0
outputs:
  model_output:
    type: mlflow_model
code: ./src
environment: azureml:custom-env@latest
command: >-
  python train-model.py 
  --training_data ${{inputs.training_data}}
  --reg_rate ${{inputs.reg_rate}} 
  --model_output ${{outputs.model_output}} 
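At run time, Azure ML replaces each `${{inputs.…}}` and `${{outputs.…}}` placeholder in `command` with a concrete value or path, and the script's `argparse` flags must match those names. The sketch below imitates that substitution locally to check that the `train-model.yml` command lines up with the flags in `train-model.py`. The `render_command` helper and the paths are hypothetical stand-ins; Azure ML performs the real substitution with its own mounted paths.

```python
import argparse
import re
import shlex

# The command from train-model.yml, folded onto one line as the YAML ">-" block does.
COMMAND = ("python train-model.py --training_data ${{inputs.training_data}} "
           "--reg_rate ${{inputs.reg_rate}} --model_output ${{outputs.model_output}}")


def render_command(template, values):
    """Hypothetical helper: replace ${{inputs.x}} / ${{outputs.x}} placeholders with values."""
    return re.sub(r"\$\{\{(inputs|outputs)\.(\w+)\}\}",
                  lambda m: str(values[m.group(2)]), template)


# Hypothetical local values; Azure ML supplies real paths when the job runs.
rendered = render_command(COMMAND, {
    "training_data": "/tmp/prepared",
    "reg_rate": 0.5,
    "model_output": "/tmp/model",
})

# Parse the rendered arguments with the same flags that train-model.py defines.
parser = argparse.ArgumentParser()
parser.add_argument("--training_data", type=str)
parser.add_argument("--reg_rate", type=float, default=1.0)
parser.add_argument("--model_output", type=str)
args = parser.parse_args(shlex.split(rendered)[2:])  # skip "python train-model.py"

print(rendered)
print(args.reg_rate, 1 / args.reg_rate)  # the script passes C=1/reg_rate to LogisticRegression
```

A mismatch between a placeholder name and an `argparse` flag shows up here as a `KeyError` or a parse error, rather than as a failed pipeline job.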

## Load the components

Next, we load the components by referring to their YAML files.

In [None]:
from azure.ai.ml import load_component
parent_dir = ''

prep_data = load_component(source=parent_dir + './prep-data.yml')
train_logistic_regression = load_component(source=parent_dir + './train-model.yml')

## Build the pipeline

Once the components are created and loaded, you can set up the pipeline by linking them together. Start by running the `prep_data` component, then use its output as the input for the `train_logistic_regression` component, which will handle the model training.

The `tickets_classification` function defines the complete pipeline for this process. It takes one input: `pipeline_job_input`. A data asset (`tickets-data`, version 1) was registered during setup, and you'll pass this registered asset as the input for the pipeline.

In [None]:
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline

@pipeline
def tickets_classification(pipeline_job_input):
    clean_data = prep_data(input_data=pipeline_job_input)
    train_model = train_logistic_regression(training_data=clean_data.outputs.output_data)

    return {
        'pipeline_job_transformed_data': clean_data.outputs.output_data,
        'pipeline_job_trained_model': train_model.outputs.model_output,
    }

pipeline_job = tickets_classification(Input(type=AssetTypes.URI_FILE, path='azureml:tickets-data:1'))

Check out the configuration of the pipeline job by printing the `pipeline_job` object.

In [None]:
print(pipeline_job)

You can change any of the pipeline job's configuration parameters by assigning a new value to it.

In [None]:
# Change the output mode
pipeline_job.outputs.pipeline_job_transformed_data.mode = 'upload'
pipeline_job.outputs.pipeline_job_trained_model.mode = 'upload'
# Set pipeline level compute
pipeline_job.settings.default_compute = 'aml-cluster'
# Set pipeline level datastore
pipeline_job.settings.default_datastore = 'workspaceblobstore'

# Print the pipeline job again to review the changes
print(pipeline_job)

## Submit the pipeline job

Now that we have built the pipeline and configured the pipeline job to run as required, we can finally submit the pipeline job by running the following cell.

In [None]:
# Submit the pipeline job to workspace
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name='pipeline_tickets'
)
pipeline_job

## Register the model

Because the `train-model` script uses MLflow to log our pipeline model, all of the model's metadata is stored in its MLmodel file.

To register the pipeline model, we look up the child job of the pipeline's `train_model` step and point to its logged `model` artifact. The pipeline job must finish first, because the model artifact doesn't exist until the training step completes. Registering the model as an MLflow model makes it simple to deploy later on.

In [None]:
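# Wait for the pipeline job to finish; the model artifact only exists once train_model completes
ml_client.jobs.stream(pipeline_job.name)

# Grab the run ID of the train_model step (a child job of the pipeline)
train_model_run_id = None
children = ml_client.jobs.list(parent_job_name=pipeline_job.name)

for child in children:
    if child.display_name == "train_model":
        train_model_run_id = child.name  # This is the run ID for train_model
        break

print("Train model run ID:", train_model_run_id)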

In [None]:
# Register the model/pipeline
from azure.ai.ml.entities import Model
from azure.ai.ml.constants import AssetTypes

run_model = Model(
    path=f'azureml://jobs/{train_model_run_id}/outputs/artifacts/paths/model/',
    name='mlflow-tickets',
    description='Model created from run.',
    type=AssetTypes.MLFLOW_MODEL,
)

ml_client.models.create_or_update(run_model)

## Define and create an endpoint

Our goal is to deploy the model to an HTTPS endpoint that an application can call to receive predictions. An application consumes an endpoint through its URI, authenticating with a key or a token.

Endpoint names must be unique within an Azure region, so we append a timestamp generated with the `datetime` module. Run the following cell to define the endpoint.
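The name built in the next cell is `endpoint-` followed by month, day, hour, minute, and microseconds (`%m%d%H%M%f`), which is 23 characters. The sketch below wraps the same format in a function and checks it against a naming rule. The rule (3 to 32 characters, starting with a letter, then letters, digits, or hyphens) reflects our understanding of Azure ML's endpoint naming constraints and should be verified against the current docs; `make_endpoint_name` and `ENDPOINT_NAME_RULE` are hypothetical names.

```python
import datetime
import re

# Assumed naming rule for online endpoints; verify against the current Azure ML documentation.
ENDPOINT_NAME_RULE = re.compile(r"^[A-Za-z][A-Za-z0-9-]{2,31}$")


def make_endpoint_name(prefix="endpoint-", now=None):
    """Append a month/day/hour/minute/microsecond timestamp, as the notebook does."""
    if now is None:
        now = datetime.datetime.now()
    name = prefix + now.strftime("%m%d%H%M%f")
    if not ENDPOINT_NAME_RULE.match(name):
        raise ValueError(f"Invalid endpoint name: {name!r}")
    return name


print(make_endpoint_name(now=datetime.datetime(2024, 3, 5, 14, 7, 9, 123456)))
# prints endpoint-03051407123456
```

Note that the format omits seconds and the year, so uniqueness relies mostly on the microsecond component.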

In [None]:
from azure.ai.ml.entities import ManagedOnlineEndpoint
import datetime

online_endpoint_name = 'endpoint-' + datetime.datetime.now().strftime('%m%d%H%M%f')

# Create an online endpoint
endpoint = ManagedOnlineEndpoint(
    name=online_endpoint_name,
    description='Online endpoint for MLFlow tickets model',
    auth_mode='key',
)

Now that the endpoint has been defined, it is time to create the endpoint.

In [None]:
ml_client.begin_create_or_update(endpoint).result()

## Configure the deployment

To configure the deployment, specify which model to assign to the endpoint. The next cell references the MLflow model that was trained and saved in the local `model` directory. **Note: Follow the steps outlined in this repo's `README.md` to save the model and upload it to Azure ML studio.**

This step also specifies the infrastructure for the deployment: the VM size (`instance_type`) and the number of instances (`instance_count`).

In [None]:
from azure.ai.ml.entities import Model, ManagedOnlineDeployment
from azure.ai.ml.constants import AssetTypes

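# Create blue deployment
# Reference the MLflow model in the local ./model folder (the folder containing the MLmodel file).
# Alternatively, deploy the model registered earlier:
# model = ml_client.models.get(name='mlflow-tickets', label='latest')
model = Model(
    path='./model',
    type=AssetTypes.MLFLOW_MODEL,
    description='my sample mlflow model',
)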

blue_deployment = ManagedOnlineDeployment(
    name='blue',
    endpoint_name=online_endpoint_name,
    model=model,
    instance_type='Standard_D2as_v4',
    instance_count=1,
)

## Create the deployment

Now we can deploy the model to the endpoint.

In [None]:
ml_client.online_deployments.begin_create_or_update(blue_deployment).result()

Since we only have one model deployed to the endpoint, we want this deployment to take 100% of the traffic.

In [None]:
# The blue deployment takes 100% of the traffic
endpoint.traffic = {'blue': 100}
ml_client.begin_create_or_update(endpoint).result()

## Test the deployment

We'll now test the deployed model by invoking the endpoint. The input is a JSON file (`sample-data.json`) containing a sample ticket text and its word count. The trained model predicts which class an IT customer support ticket belongs to: **Change,** **Incident,** **Problem,** or **Request.**
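If you need to create `sample-data.json` yourself, the sketch below writes one. It assumes the `input_data` request format (pandas "split" orientation with `columns`, `index`, and `data`) that Azure ML documents for no-code MLflow deployments; check the docs for your MLflow and Azure ML versions. The column names must match the training features captured in the model signature (`text` and `len_words`). The ticket text is made up, and its word count uses a whitespace split as a stand-in for the NLTK tokens used in training, so the two counts can differ slightly.

```python
import json
from pathlib import Path

# Hypothetical ticket text for illustration.
ticket_text = "My laptop keeps losing the office VPN connection."

payload = {
    "input_data": {
        "columns": ["text", "len_words"],
        "index": [0],
        # Whitespace word count as a stand-in for NLTK's word_tokenize count.
        "data": [[ticket_text, float(len(ticket_text.split()))]],
    }
}

# Don't overwrite a sample-data.json that already ships with the repo.
sample_path = Path("sample-data.json")
if not sample_path.exists():
    sample_path.write_text(json.dumps(payload, indent=2))
```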

In [None]:
# Test the blue deployment with some sample data
response = ml_client.online_endpoints.invoke(
    endpoint_name=online_endpoint_name,
    deployment_name='blue',
    request_file='sample-data.json',
)

In [None]:
print(response)
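`invoke` returns the response body as a JSON string. The exact shape depends on the MLflow and inference server versions (an assumption on our part): it is typically a bare list such as `["Incident"]`, but newer servers may wrap it as `{"predictions": [...]}`. The hypothetical helper below handles both shapes; the example string is illustrative, not real endpoint output.

```python
import json

VALID_LABELS = {"Change", "Incident", "Problem", "Request"}


def parse_predictions(response_text):
    """Return the predicted labels from a bare JSON list or a {'predictions': [...]} object."""
    parsed = json.loads(response_text)
    if isinstance(parsed, dict):
        if "predictions" not in parsed:
            raise ValueError(f"Unexpected response shape: {response_text!r}")
        parsed = parsed["predictions"]
    return list(parsed)


example = '["Incident"]'  # hypothetical response string
labels = parse_predictions(example)
print(labels, set(labels) <= VALID_LABELS)
```

In the notebook you would call `parse_predictions(response)` on the string returned by `invoke`.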

## List endpoints

Using the SDK, you can list all endpoints.

In [None]:
# List endpoints
endpoints = ml_client.online_endpoints.list()
for endp in endpoints:
    print(endp.name)

## Retrieve endpoint details

If you're curious and want more information about a specific endpoint, you can once again put the SDK to use.

In [None]:
# Get endpoint details
endpoint = ml_client.online_endpoints.get(name=online_endpoint_name)

# Existing traffic details
print(endpoint.traffic)

# Get the scoring URI
print(endpoint.scoring_uri)
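An application outside the SDK calls the endpoint through its `scoring_uri`, authenticating with the endpoint key as a bearer token. The stdlib sketch below builds that HTTPS POST without sending it. The URI and key are placeholders; in the notebook you'd use `endpoint.scoring_uri` and `ml_client.online_endpoints.get_keys(name=online_endpoint_name).primary_key`. The optional `azureml-model-deployment` header routes the call to one deployment instead of following the traffic split. `build_scoring_request` is a hypothetical helper name.

```python
import json
import urllib.request


def build_scoring_request(scoring_uri, api_key, payload, deployment=None):
    """Build (but do not send) an HTTPS POST to a key-authenticated online endpoint."""
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    if deployment:
        # Target a specific deployment instead of using the endpoint's traffic split.
        headers["azureml-model-deployment"] = deployment
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(scoring_uri, data=body, headers=headers, method="POST")


# Placeholder values for illustration only.
req = build_scoring_request(
    "https://example-endpoint.eastus.inference.ml.azure.com/score",
    "<your-endpoint-key>",
    {"input_data": {"columns": ["text", "len_words"], "index": [0],
                    "data": [["Printer is offline", 3.0]]}},
    deployment="blue",
)
# To send it: urllib.request.urlopen(req).read().decode("utf-8")
```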

## Delete the endpoint and deployment

An endpoint remains active at all times and can't be paused, so it keeps incurring charges. To avoid unnecessary costs, delete the endpoint when you're done; deleting an endpoint also deletes its deployments.

In [None]:
# Delete the endpoint and its deployments (endpoints are always available and can't be paused)
ml_client.online_endpoints.begin_delete(name=online_endpoint_name).result()