# MLflow-Based Batch Deployment on Azure Machine Learning

This notebook demonstrates how to deploy and run MLflow-logged models using Azure Machine Learning's **batch endpoints**.

By leveraging **MLflow tracking** and **no-code deployment capabilities**, you can streamline the deployment process without the need to manually define a scoring script or custom environment.

### Key Features of No-Code Batch Deployment:
- Automatically uses a curated Azure ML environment with all necessary MLflow dependencies pre-installed.
- Generates an internal scoring script and batch scoring pipeline behind the scenes.
- Simplifies deployment by referencing a previously registered MLflow model artifact.
- Supports large-scale parallel processing of input data with batch endpoints.

This approach is ideal for:
- Rapidly operationalizing MLflow models.
- Running inference asynchronously on large datasets.
- Integrating batch prediction seamlessly into production MLOps pipelines.

In the following steps, you'll configure the deployment, define batch input/output, and submit a batch job without writing custom scoring logic.


## 1. Connect to Azure Machine Learning Workspace

This section initiates the setup required to enable batch inference with MLflow-logged models. The Azure Machine Learning workspace serves as the centralized resource for managing experiments, models, data, and compute targets. 

Establishing a connection to the workspace ensures access to previously registered assets and provides the foundation for executing batch deployments programmatically.

### 1.1. Import required libraries

The necessary modules from the Azure ML SDK v2 are imported to support batch endpoint creation, model deployment, compute resource configuration, and interaction with registered assets.


In [1]:
from azure.ai.ml import MLClient, Input
from azure.ai.ml.entities import (
    BatchEndpoint,
    ModelBatchDeployment,
    ModelBatchDeploymentSettings,
    Model,
    AmlCompute,
    Data,
    BatchRetrySettings,
    CodeConfiguration,
    Environment,
)
from azure.ai.ml.constants import AssetTypes, BatchDeploymentOutputAction
from azure.identity import DefaultAzureCredential

### 1.2. Configure workspace details and initialize MLClient

To interact with Azure Machine Learning programmatically, a connection to the workspace must be established using subscription ID, resource group, and workspace name. These identifiers are passed to the `MLClient` to authenticate and manage resources within the target environment.

This notebook uses `DefaultAzureCredential` to handle authentication, which simplifies credential management by supporting multiple authentication mechanisms. This method is well-suited for local development as well as cloud-hosted environments.

The client object created here is essential for performing operations such as model registration, deployment, and job submission throughout the batch inference workflow.


In [2]:
subscription_id = "43dae9af-3755-421b-bfae-29b91f9e85dd"
resource_group = "cyberml-canada-rg"
workspace = "cyberml-ws"

In [3]:
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)
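
Hard-coding identifiers in a notebook makes it harder to share or move between workspaces. A minimal sketch of one alternative follows, assuming the three values are supplied through environment variables. The variable names `AML_SUBSCRIPTION_ID`, `AML_RESOURCE_GROUP`, and `AML_WORKSPACE_NAME` and the helper `load_workspace_settings` are hypothetical choices made for this illustration.

```python
import os

# Hypothetical environment variable names, chosen for this sketch only.
REQUIRED_VARS = {
    "subscription_id": "AML_SUBSCRIPTION_ID",
    "resource_group": "AML_RESOURCE_GROUP",
    "workspace": "AML_WORKSPACE_NAME",
}


def load_workspace_settings(env=None):
    """Return the workspace identifiers read from environment variables.

    Raises KeyError naming every missing variable, so a misconfigured
    session fails before any call to Azure is made.
    """
    env = os.environ if env is None else env
    missing = [var for var in REQUIRED_VARS.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {key: env[var] for key, var in REQUIRED_VARS.items()}
```

The returned dictionary can then replace the literals above, for example `MLClient(DefaultAzureCredential(), settings["subscription_id"], settings["resource_group"], settings["workspace"])`.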

If an Azure Machine Learning compute instance is used, the workspace details can be read from its configuration file instead:

In [4]:
ml_client = MLClient.from_config(DefaultAzureCredential())

Found the config file in: /config.json
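
`MLClient.from_config` reads the workspace identifiers from a `config.json` file; the output above shows it being found at `/config.json`. As a sketch of what such a file contains, the snippet below writes one with placeholder values. The keys `subscription_id`, `resource_group`, and `workspace_name` are the ones the SDK expects, while the helper name `write_workspace_config` and the values passed to it are illustrative.

```python
import json
from pathlib import Path


def write_workspace_config(directory, subscription_id, resource_group, workspace_name):
    """Write a config.json holding the three workspace identifiers and return its path."""
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    path = Path(directory) / "config.json"
    path.write_text(json.dumps(config, indent=2), encoding="utf-8")
    return path
```

This can be useful when running the notebook outside a compute instance, where no configuration file is provided.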


## 2. Registering the Model

### 2.1 Model Overview

This project uses a machine learning model designed to detect potential cybersecurity intrusions. Unlike the default example, which references heart disease detection, this implementation focuses on identifying malicious activity using features derived from network traffic and system behavior logs.

The model is structured as an MLflow model pipeline and includes preprocessing and classification components. This allows raw input data to be directly passed through the model for inference, simplifying deployment and ensuring reproducibility.

### 2.2 Model Registration in Azure ML

To make the model available for batch inference deployment, it must be registered in the Azure Machine Learning model registry. The cell below registers the MLflow-packaged model located in the specified directory under the name `cyber_intrusion_model`. Because no version is specified, Azure ML assigns the next available version if a model with that name already exists.

Registering the model ensures version control and enables tracking across environments and endpoints.


In [8]:
model_name = "cyber_intrusion_model"
model_local_path = "/home/azureuser/cloudfiles/code/cyber_intrusion_model_"

model = ml_client.models.create_or_update(
    Model(name=model_name, path=model_local_path, type=AssetTypes.MLFLOW_MODEL)
)

Uploading cyber_intrusion_model_ (0.88 MBs): 100%|██████████| 878601/878601 [00:00<00:00, 14243109.38it/s]



Let's get the registered model:

In [9]:
model_name = "cyber_intrusion_model"
model = ml_client.models.get(name=model_name, label="latest")

## 3. Creating a Batch Endpoint

Batch endpoints facilitate asynchronous, large-scale inference tasks in Azure Machine Learning. They are particularly effective for scenarios involving high-volume data processing, such as analyzing network traffic or system logs for cybersecurity threats.

A batch endpoint enables decoupling of model scoring from real-time requirements by accepting pointers to data stored in blob storage or datastores. Jobs triggered on the endpoint are executed on compute clusters and results are written back to storage for downstream tasks such as reporting or auditing.

### 3.1 Endpoint Configuration

This section configures a batch endpoint intended to host a model deployment for cybersecurity intrusion detection. Because batch endpoint names must be unique within an Azure region, a random suffix is appended to the name. This helps prevent naming conflicts in test environments; in production, meaningful naming conventions are typically used for better traceability.

The `BatchEndpoint` configuration includes:
- `name`: A unique identifier for the endpoint.
- `description`: A textual summary of the endpoint’s purpose.
- `auth_mode`: Default authentication mode is Azure Active Directory token-based access.


In [14]:
endpoint_name = "batch-cyber-intrusion"

In [15]:
import random
import string

# Creating a unique endpoint name by including a random suffix
allowed_chars = string.ascii_lowercase + string.digits
endpoint_suffix = "".join(random.choice(allowed_chars) for x in range(5))
endpoint_name = f"{endpoint_name}-{endpoint_suffix}"

print(f"Endpoint name: {endpoint_name}")

Endpoint name: batch-cyber-intrusion-so9ut


To configure the endpoint:

In [16]:
endpoint = BatchEndpoint(
    name=endpoint_name,
    description="A cyber intrusion condition classifier for batch inference",
)

### 3.2 Creating the Endpoint

This step initiates the creation of the batch endpoint within the Azure Machine Learning workspace. The previously configured `MLClient` instance is used to submit the endpoint definition to the Azure backend.

`begin_create_or_update` starts a long-running operation and returns a poller immediately; calling `.result()` on it blocks until provisioning finishes and then returns the created endpoint. Batch scoring jobs can then be routed through a consistent endpoint interface that abstracts the underlying infrastructure details.


In [17]:
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

BatchEndpoint({'scoring_uri': 'https://batch-cyber-intrusion-so9ut.canadacentral.inference.ml.azure.com/jobs', 'openapi_uri': None, 'provisioning_state': 'Succeeded', 'name': 'batch-cyber-intrusion-so9ut', 'description': 'A cyber intrusion condition classifier for batch inference', 'tags': {}, 'properties': {'BatchEndpointCreationApiVersion': '2023-10-01', 'azureml.onlineendpointid': '/subscriptions/43dae9af-3755-421b-bfae-29b91f9e85dd/resourceGroups/cyberml-canada-rg/providers/Microsoft.MachineLearningServices/workspaces/cyberml-ws/batchEndpoints/batch-cyber-intrusion-so9ut'}, 'print_as_yaml': False, 'id': '/subscriptions/43dae9af-3755-421b-bfae-29b91f9e85dd/resourceGroups/cyberml-canada-rg/providers/Microsoft.MachineLearningServices/workspaces/cyberml-ws/batchEndpoints/batch-cyber-intrusion-so9ut', 'Resource__source_path': '', 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/cpucluster01/code', 'creation_context': None, 'serialize': <msrest.serialization.Serializer objec

## 4. Create a batch deployment

A deployment is the set of resources required to host the model that performs the actual inference. The deployment for this endpoint is created below using the `ModelBatchDeployment` class.

### 4.1 Creating a scoring script to work with the model

> MLflow models don't require a scoring script.

### 4.2 Creating the Compute

Batch deployments in Azure Machine Learning require compute resources capable of handling large-scale data processing workloads. These deployments can leverage any available compute cluster within the workspace, supporting scalability and reusability across multiple jobs.

This step ensures the existence of an AzureML compute cluster suitable for batch operations. If the specified cluster is not found in the workspace, it is created with a defined scale range, enabling dynamic provisioning based on job demand.


In [67]:
compute_name = "batch-cluster"

# Create the cluster only if it does not already exist in the workspace
if not any(c.name == compute_name for c in ml_client.compute.list()):
    compute_cluster = AmlCompute(
        name=compute_name, description="amlcompute", min_instances=0, max_instances=5
    )
    ml_client.begin_create_or_update(compute_cluster).result()

### 4.3 Creating the environment

> MLflow models don't require an environment.

### 4.4 Configuring the deployment

In [68]:
deployment = ModelBatchDeployment(
    name="cyber-intrusion-deployment",
    description="A cyber intrusion condition classifier for batch inference",
    endpoint_name=endpoint.name,
    model=model,
    compute=compute_name,
    settings=ModelBatchDeploymentSettings(
        instance_count=1,
        max_concurrency_per_instance=1,
        mini_batch_size=10,
        output_action=BatchDeploymentOutputAction.APPEND_ROW,
        output_file_name="predictions.csv",
        retry_settings=BatchRetrySettings(max_retries=3, timeout=300),
        logging_level="info",
    ),
)
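
To get a feel for how these settings interact, the sketch below estimates how a job is split. It assumes file-based inputs, where `mini_batch_size` is the number of files handed to each scoring call, and treats `instance_count * max_concurrency_per_instance` as the number of parallel workers. `plan_batch_job` is a hypothetical helper, not part of the SDK, and the estimate ignores scheduling overhead and retries.

```python
import math


def plan_batch_job(num_files, mini_batch_size, instance_count, max_concurrency_per_instance):
    """Estimate mini-batches, parallel workers, and scheduling rounds for a batch job."""
    if min(mini_batch_size, instance_count, max_concurrency_per_instance) < 1:
        raise ValueError("All settings must be at least 1")
    mini_batches = math.ceil(num_files / mini_batch_size)
    workers = instance_count * max_concurrency_per_instance
    # Each round runs up to `workers` mini-batches at the same time.
    rounds = math.ceil(mini_batches / workers) if mini_batches else 0
    return {"mini_batches": mini_batches, "workers": workers, "rounds": rounds}


# Settings from the deployment above, applied to a hypothetical 250-file input folder
print(plan_batch_job(250, mini_batch_size=10, instance_count=1, max_concurrency_per_instance=1))
```

Under these assumptions, a single input file forms exactly one mini-batch, so raising `instance_count` only pays off when the input is a folder containing many files.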

### 4.5 Create the deployment
Using the `MLClient` created earlier, create the deployment in the workspace. `begin_create_or_update` starts the operation and returns a poller; calling `.result()` waits until the deployment has been provisioned.

In [69]:
ml_client.batch_deployments.begin_create_or_update(deployment).result()

BatchDeployment({'provisioning_state': 'Succeeded', 'endpoint_name': 'batch-cyber-intrusion-so9ut', 'type': None, 'name': 'cyber-intrusion-deployment', 'description': 'A cyber intrusion condition classifier for batch inference', 'tags': {}, 'properties': {}, 'print_as_yaml': False, 'id': '/subscriptions/43dae9af-3755-421b-bfae-29b91f9e85dd/resourceGroups/cyberml-canada-rg/providers/Microsoft.MachineLearningServices/workspaces/cyberml-ws/batchEndpoints/batch-cyber-intrusion-so9ut/deployments/cyber-intrusion-deployment', 'Resource__source_path': '', 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/cpucluster01/code', 'creation_context': <azure.ai.ml.entities._system_data.SystemData object at 0x7fc1d94400a0>, 'serialize': <msrest.serialization.Serializer object at 0x7fc1d9443010>, 'model': '/subscriptions/43dae9af-3755-421b-bfae-29b91f9e85dd/resourceGroups/cyberml-canada-rg/providers/Microsoft.MachineLearningServices/workspaces/cyberml-ws/models/cyber_intrusion_model/versions

Once created, to configure this new deployment as the default one:

In [70]:
endpoint = ml_client.batch_endpoints.get(endpoint.name)
endpoint.defaults.deployment_name = deployment.name
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

BatchEndpoint({'scoring_uri': 'https://batch-cyber-intrusion-so9ut.canadacentral.inference.ml.azure.com/jobs', 'openapi_uri': None, 'provisioning_state': 'Succeeded', 'name': 'batch-cyber-intrusion-so9ut', 'description': 'A cyber intrusion condition classifier for batch inference', 'tags': {}, 'properties': {'BatchEndpointCreationApiVersion': '2023-10-01', 'azureml.onlineendpointid': '/subscriptions/43dae9af-3755-421b-bfae-29b91f9e85dd/resourceGroups/cyberml-canada-rg/providers/Microsoft.MachineLearningServices/workspaces/cyberml-ws/batchEndpoints/batch-cyber-intrusion-so9ut'}, 'print_as_yaml': False, 'id': '/subscriptions/43dae9af-3755-421b-bfae-29b91f9e85dd/resourceGroups/cyberml-canada-rg/providers/Microsoft.MachineLearningServices/workspaces/cyberml-ws/batchEndpoints/batch-cyber-intrusion-so9ut', 'Resource__source_path': '', 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/cpucluster01/code', 'creation_context': None, 'serialize': <msrest.serialization.Serializer objec

In [71]:
print(f"The default deployment is {endpoint.defaults.deployment_name}")

The default deployment is cyber-intrusion-deployment



### 4.6 Testing the deployment

Once the deployment is created, it is ready to receive jobs.

#### 4.6.1 Creating a data asset

This data asset is a file containing randomized entries from the original cyber intrusion dataset. The file is already stored on the compute instance, so the data asset is created directly from its local path:

In [None]:
data_path = "/home/azureuser/cloudfiles/code/Users/edwardhw.ng/cybersecurity_intrusion_data_test.csv"
dataset_name = "cybersecurity_intrusion_data_test"

cyber_dataset_unlabeled = Data(
    path=data_path,
    type=AssetTypes.URI_FILE,
    description="An unlabeled dataset for cyber intrusion classification",
    name=dataset_name,
)

In [31]:
ml_client.data.create_or_update(cyber_dataset_unlabeled)

Uploading cybersecurity_intrusion_data_test.csv (< 1 MB): 100%|██████████| 122k/122k [00:00<00:00, 4.45MB/s]

Data({'path': 'azureml://subscriptions/43dae9af-3755-421b-bfae-29b91f9e85dd/resourcegroups/cyberml-canada-rg/workspaces/cyberml-ws/datastores/workspaceblobstore/paths/LocalUpload/31f972944d728493651c169991abece5/cybersecurity_intrusion_data_test.csv', 'skip_validation': False, 'mltable_schema_url': None, 'referenced_uris': None, 'type': 'uri_file', 'is_anonymous': False, 'auto_increment_version': False, 'auto_delete_setting': None, 'name': 'cybersecurity_intrusion_data_test', 'description': 'An unlabeled dataset for intrusion classification', 'tags': {}, 'properties': {}, 'print_as_yaml': False, 'id': '/subscriptions/43dae9af-3755-421b-bfae-29b91f9e85dd/resourceGroups/cyberml-canada-rg/providers/Microsoft.MachineLearningServices/workspaces/cyberml-ws/data/cybersecurity_intrusion_data_test/versions/1', 'Resource__source_path': '', 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/cpucluster01/code', 'creation_context': <azure.ai.ml.entities._system_data.SystemData object at 

To get a reference to the new data asset:

In [44]:
cyber_dataset_unlabeled = ml_client.data.get(name=dataset_name, label="latest")

#### 4.6.2 Creating an input for the deployment

In [59]:
input = Input(type=AssetTypes.URI_FILE, path=cyber_dataset_unlabeled.id)

#### 4.6.3 Invoke the deployment

Using the `MLClient` created earlier, the endpoint can be invoked with the `invoke` method and the following parameters:
- `endpoint_name` - Name of the endpoint
- `input` - An `Input` object that points to the input data

In [72]:
job = ml_client.batch_endpoints.invoke(endpoint_name=endpoint.name, input=input)

Because the endpoint's default deployment was set earlier, the call above is routed to it. A specific deployment can also be targeted by passing the `deployment_name` argument:

In [74]:
job = ml_client.batch_endpoints.invoke(
    deployment_name=deployment.name, endpoint_name=endpoint.name, input=input
)

#### 4.6.4 Get the details of the invoked job

Let us get details and logs of the invoked job:

In [75]:
ml_client.jobs.get(job.name)

| Experiment | Name | Type | Status | Details Page |
| --- | --- | --- | --- | --- |
| batch-cyber-intrusion-so9ut | batchjob-a466a77d-6e62-4ff8-be1d-c39bce7ede6f | pipeline | Running | Link to Azure Machine Learning studio |


We can wait for the job to finish using the following code:

In [2]:
ml_client.jobs.stream(job.name)
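
`jobs.stream` blocks and prints logs until the job ends. When only the final status is needed, polling is an alternative. The sketch below is written against any callable that returns the current status string, so it can be tried without a workspace. `wait_for_status` is a hypothetical helper, and the set of terminal states (`Completed`, `Failed`, `Canceled`) is an assumption that may need adjusting.

```python
import time

# Assumed terminal job states; adjust if your jobs report others.
TERMINAL_STATES = {"Completed", "Failed", "Canceled"}


def wait_for_status(get_status, poll_seconds=30.0, timeout_seconds=3600.0, sleep=time.sleep):
    """Poll get_status() until it returns a terminal state or the timeout elapses."""
    waited = 0.0
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Job still '{status}' after {waited:.0f} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
```

In this notebook it could be called as `wait_for_status(lambda: ml_client.jobs.get(job.name).status)`.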


### 4.7 Exploring the results

The deployment creates a child job that executes the scoring.

In [1]:
scoring_job = list(ml_client.jobs.list(parent_job_name=job.name))[0]


In [None]:
print("Job name:", scoring_job.name)
print("Job status:", scoring_job.status)
print(
    "Job duration:",
    scoring_job.creation_context.last_modified_at
    - scoring_job.creation_context.created_at,
)

#### 4.7.1 Download the results

The outputs generated by the deployment job will be placed in an output named `score`:

In [None]:
ml_client.jobs.download(name=scoring_job.name, download_path=".", output_name="score")

In [None]:
import pandas as pd

score = pd.read_csv(
    "named-outputs/score/predictions.csv", names=["row", "prediction", "file"]
)
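
A quick sanity check on the downloaded file is to count how many rows fall into each predicted class. The sketch below does this with the standard library on a small in-memory sample. It assumes the same three headerless columns used above (`row`, `prediction`, `file`); the sample rows and the helper name `count_predictions` are made up for illustration.

```python
import csv
import io
from collections import Counter


def count_predictions(csv_file):
    """Count rows per predicted value in a headerless row,prediction,file CSV."""
    reader = csv.reader(csv_file)
    # Column 1 holds the prediction; blank lines yield empty records and are skipped.
    return Counter(record[1] for record in reader if record)


# Made-up sample in the same layout as predictions.csv
sample = io.StringIO(
    "0,0,cybersecurity_intrusion_data_test.csv\n"
    "1,1,cybersecurity_intrusion_data_test.csv\n"
    "2,0,cybersecurity_intrusion_data_test.csv\n"
)
print(count_predictions(sample))
```

With pandas, the equivalent on the real file is `score["prediction"].value_counts()`.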

#### 4.7.2 Logging Predictions for Auditability

With the batch endpoint live and scoring logic in place, a batch scoring job was submitted to generate predictions on new input data.

#### Batch Job Submission

The job was configured to:
- Use a previously uploaded file (`cyber_intrusion.csv`) from the workspace blob store as input  
- Mount the input file in read-only mode for performance and safety  
- Define the output as a `URI_FILE`, storing predictions as a single `.csv` file in a designated output path within the blob store  

Upon submission, Azure ML handled the job asynchronously: it spun up compute, ran inference, and wrote the results to the specified output location.

#### Why This Matters for Auditability

Logging predictions is a critical step for ensuring:
- **Traceability**: All outputs are stored with timestamps and paths that can be referenced later  
- **Reproducibility**: Given the same input data and model version, the predictions can be regenerated  
- **Transparency**: Outputs can be inspected manually or automatically to verify model behavior  
- **Compliance**: Especially in regulated domains like cybersecurity, maintaining a clear record of predictions supports operational accountability

This logged prediction output will also serve as input for the next steps: evaluating model performance and setting up drift detection.


In [None]:
# Logging Predictions for Auditability

from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
from datetime import datetime

# Initialize ML client
ml_client = MLClient.from_config(credential=DefaultAzureCredential())

# Optional: your endpoint name
batch_endpoint_name = "batch-cyber-intrusion"

# List all jobs in the workspace
all_jobs = list(ml_client.jobs.list())

# Filter for jobs likely related to batch inference
batch_jobs = [
    job for job in all_jobs
    if "batch" in job.name.lower() or "cyber" in job.name.lower()
]

# Safety check
if not batch_jobs:
    print("⚠️ No recent batch jobs found. Try submitting again or wait a few minutes.")
else:
    # Get the most recent job
    latest_job = sorted(
        batch_jobs,
        key=lambda x: x.creation_context.created_at,
        reverse=True
    )[0]

    # Log job details
    print(f"Latest Batch Job ID: {latest_job.name}")
    print(f"Status         : {latest_job.status}")
    print(f"Start Time     : {latest_job.creation_context.created_at}")
    print(f"Compute Target : {latest_job.compute}")
    
    # Save job info to audit log file
    output_path = "/home/azureuser/cloudfiles/code/Users/edwardhw.ng/named-outputs/score"
    log_entry = f"""Job Run: {datetime.now()}
- ID: {latest_job.name}
- Status: {latest_job.status}
- Compute: {latest_job.compute}
- Output Path: {output_path}
- Studio URI: {latest_job.services.get('Studio', {}).get('endpoint', 'Not Available') if latest_job.services else 'N/A'}

-------------------------
"""

    with open("batch_job_audit_log.txt", "a") as log_file:
        log_file.write(log_entry)

    print("\n Batch job info logged to 'batch_job_audit_log.txt'")


Found the config file in: /config.json


✅ Latest Batch Job ID: batchjob-f42d7086-b47a-419f-a06f-2b292fef07e5
Status         : Completed
Start Time     : 2025-04-06 04:12:13.733869+00:00
Compute Target : None

 Batch job info logged to 'batch_job_audit_log.txt'
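
A plain-text log is easy to read but awkward to query. One possible refinement, sketched below, is to append each record as a JSON line so that later steps can parse it reliably. The field names and the helpers `build_audit_record` and `append_audit_record` are assumptions for illustration rather than an Azure ML convention.

```python
import json
from datetime import datetime, timezone


def build_audit_record(job_name, status, compute, output_path, logged_at=None):
    """Return a JSON-serializable audit record for one batch job."""
    logged_at = logged_at or datetime.now(timezone.utc)
    return {
        "logged_at": logged_at.isoformat(),
        "job_name": job_name,
        "status": status,
        "compute": compute,
        "output_path": output_path,
    }


def append_audit_record(record, log_path="batch_job_audit_log.jsonl"):
    """Append the record to the log file as a single JSON line."""
    with open(log_path, "a", encoding="utf-8") as log_file:
        log_file.write(json.dumps(record) + "\n")
```

For the job above, this could be called as `append_audit_record(build_audit_record(latest_job.name, latest_job.status, latest_job.compute, output_path))`.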


## 5. Customize deployment with a scoring script

A custom scoring script can also be used with MLflow models in batch endpoints:

### 5.1 Create a scoring script

In [None]:
%%writefile code/batch_driver.py

import os
import glob
import mlflow
import numpy as np
import pandas as pd


def init():
    global model
    global model_input_types
    global model_output_names

    # Defaults, used when the model was logged without a signature
    model_input_types = None
    model_output_names = None

    # AZUREML_MODEL_DIR is an environment variable created during deployment; it points to the model folder
    model_path = glob.glob(os.environ["AZUREML_MODEL_DIR"] + "/*/")[0]

    # Load the model, its input types and output names
    model = mlflow.pyfunc.load_model(model_path)
    signature = model.metadata.signature
    if signature is not None and signature.inputs:
        model_input_types = dict(
            zip(signature.inputs.input_names(), signature.inputs.pandas_types())
        )
    if signature is not None and signature.outputs:
        if signature.outputs.has_input_names():
            model_output_names = signature.outputs.input_names()
        elif len(signature.outputs.input_names()) == 1:
            model_output_names = ["prediction"]


def run(mini_batch):
    print(f"run method start: {__file__}, run({len(mini_batch)} files)")

    # Read every file in the mini-batch and record which file each row came from.
    # ignore_index gives the rows a unique index so predictions align with them below.
    data = pd.concat(
        (pd.read_csv(fp).assign(filename=os.path.basename(fp)) for fp in mini_batch),
        ignore_index=True,
    )
    if model_input_types:
        data = data.astype(model_input_types)

    pred = model.predict(data)

    if not isinstance(pred, pd.DataFrame):
        # Normalize array-like predictions to 2-D so each output becomes a column
        pred = np.asarray(pred)
        if pred.ndim == 1:
            pred = pred.reshape(-1, 1)
        output_names = model_output_names or [
            "pred_col" + str(i) for i in range(pred.shape[1])
        ]
        pred = pd.DataFrame(pred, columns=output_names)

    return pd.concat([data, pred.reset_index(drop=True)], axis=1)


### 5.2 Indicate the environment:

In [None]:
environment = Environment(
    name="batch-mlflow-environment",
    conda_file="environment/conda.yaml",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
)

### 5.3 Configure the deployment

In [None]:
deployment = ModelBatchDeployment(
    name="classifier-cyber-custom",
    description="A cyber intrusion classifier with a custom scoring script",
    endpoint_name=endpoint.name,
    model=model,
    environment=environment,
    code_configuration=CodeConfiguration(code="code", scoring_script="batch_driver.py"),
    compute=compute_name,
    settings=ModelBatchDeploymentSettings(
        instance_count=2,
        max_concurrency_per_instance=2,
        mini_batch_size=10,
        output_action=BatchDeploymentOutputAction.APPEND_ROW,
        output_file_name="predictions.csv",
        retry_settings=BatchRetrySettings(max_retries=3, timeout=300),
        logging_level="info",
    ),
)

### 5.4 Create the deployment

In [None]:
ml_client.batch_deployments.begin_create_or_update(deployment).result()

## 6. Clean up resources

Clean up the resources created by deleting the endpoint; its deployments are deleted along with it.

In [None]:
ml_client.batch_endpoints.begin_delete(endpoint_name).result()