In [2]:
# Import Required Libraries
import os
import logging
import yaml
import json
import pandas as pd
from azure.ai.ml import MLClient, Input
from azure.ai.ml.entities import (
    Environment, 
    BatchEndpoint, 
    BatchDeployment, 
    CodeConfiguration,
    BatchRetrySettings,
    ResourceConfiguration,
    AmlCompute
)
from azure.ai.ml.constants import AssetTypes
from azure.identity import DefaultAzureCredential
from tqdm import tqdm

In [3]:
# Uncomment the following line to log in to Azure
#!az login

In [4]:
# Set global logging level
logging.getLogger().setLevel(logging.WARNING)

# Specifically reduce Azure-related logging in this notebook
logging.getLogger("azure").setLevel(logging.ERROR)
logging.getLogger("azure.identity").setLevel(logging.ERROR)
logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(logging.ERROR)

In [68]:
# Read the Azure ML workspace configuration from config.yml
with open("config.yml", "r") as f:
    config = yaml.safe_load(f)

# Azure ML workspace configuration
subscription_id = config["subscription_id"]
resource_group = config["resource_group"]
workspace_name = config["workspace_name"]

# Fintuned Model configuration
finetuned_model_name = config["finetuned_model_name"]
finetuned_model_version = config["finetuned_model_version"]

# Inference environment configuration
inference_env_name = config["inference_env_name"]
inference_env_version = config["inference_env_version"]
inference_env_conda_file = config["inference_env_conda_file"]
inference_env_base_image = config["inference_env_base_image"]

# Finetuned model batch endpoint configuration
batch_endpoint_name = config["batch_endpoint_name"]
batch_deployment_name = config["batch_deployment_name"]
batch_cluster_name = config["batch_cluster_name"]

In [6]:
# Initialize ML Client
ml_client = MLClient(DefaultAzureCredential(), subscription_id, resource_group, workspace_name)
print("ML Client initialized successfully")

ML Client initialized successfully


In [49]:
# Get the registered model
registered_model = ml_client.models.get(name=finetuned_model_name, version=finetuned_model_version)

In [17]:
# Create or get inference environment
try:
    # Try to get existing environment
    env_asset = ml_client.environments.get(name=inference_env_name, version=inference_env_version)
    print(f"Using existing environment: {inference_env_name}:{inference_env_version}")
except:
    # Create new environment if it doesn't exist
    print(f"Creating new environment: {inference_env_name}")
    env_asset = Environment(
        name=inference_env_name,
        conda_file=inference_env_conda_file,
        image=inference_env_base_image,
        description="Environment for batch inference with fine-tuned model"
    )
    env_asset = ml_client.environments.create_or_update(env_asset)
    print(f"Environment created: {env_asset.id}")

✓ Using existing environment: finetuned-phi4-model-env:4


In [9]:
# Create the compute -  I have done this via the Azure ML Studio

In [13]:
# Create Batch Endpoint
try:
    # Try to get existing endpoint
    batch_endpoint = ml_client.batch_endpoints.get(batch_endpoint_name)
    print(f"Using existing batch endpoint: {batch_endpoint_name}")
except:
    print(f"Creating new batch endpoint: {batch_endpoint_name}")
    
    # Create new batch endpoint
    batch_endpoint = BatchEndpoint(
        name=batch_endpoint_name,
        description="Batch endpoint for fine-tuned model inference"
    )
    batch_endpoint = ml_client.batch_endpoints.begin_create_or_update(batch_endpoint).result()
    print(f"Batch endpoint '{batch_endpoint_name}' created successfully")

Creating new batch endpoint: b-ft-phi4-mini-instruct-endpoint
Batch endpoint 'b-ft-phi4-mini-instruct-endpoint' created successfully


In [None]:
# Create Batch Deployment
try:
    # Try to get existing deployment
    batch_deployment = ml_client.batch_deployments.get(batch_deployment_name, batch_endpoint_name)
    print(f"Using existing batch deployment: {batch_deployment_name}")
except:
    print(f"Creating new batch deployment: {batch_deployment_name}")
    
    batch_deployment = BatchDeployment(
        name=batch_deployment_name,
        endpoint_name=batch_endpoint_name,
        model=registered_model.id,
        environment=env_asset.id,
        code_configuration=CodeConfiguration(
            code="./serve", 
            scoring_script="score_batch.py"
        ),
        # Use the dedicated compute cluster
        compute=batch_cluster_name,
        
        # Resource configuration for batch processing
        resources=ResourceConfiguration(
            instance_count=1,  # Number of nodes for parallel processing
        ),
        
        # Retry settings for handling low-priority VM interruptions
        retry_settings=BatchRetrySettings(
            max_retries=3,  # Retry up to 3 times if interrupted
            timeout=300  # 5 minutes timeout per batch
        ),
        
        # Batch-specific settings
        max_concurrency_per_instance=1,
        mini_batch_size=5,  # Process 5 items per mini-batch
        logging_level="info",
        description="Batch deployment"
    )
    
    print("Creating batch deployment... This may take several minutes.")
    batch_deployment = ml_client.batch_deployments.begin_create_or_update(batch_deployment).result()
    print(f"Batch deployment '{batch_deployment_name}' created successfully")

Creating new batch deployment: green
Creating batch deployment... This may take several minutes.


* ModelBatchDeployment - For model-based batch deployments
* PipelineComponentBatchDeployment - For pipeline component-based batch deployments


Batch deployment 'green' created successfully


In [70]:
# Set default deployment for the batch endpoint
batch_endpoint.defaults.deployment_name = batch_deployment_name
ml_client.batch_endpoints.begin_create_or_update(batch_endpoint).result()
print(f"Set '{batch_deployment_name}' as default deployment for batch endpoint")

Set 'green' as default deployment for batch endpoint


# Testing the batch end point

In [20]:
# Load test data
with open("data/test.jsonl", "r", encoding='utf-8') as f:
    test_data = [json.loads(line) for line in f]

print(f"Loaded {len(test_data)} test samples")
print(f"Sample item keys: {list(test_data[0].keys()) if test_data else 'No data'}")

Loaded 1273 test samples
Sample item keys: ['question', 'answer', 'options', 'meta_info', 'answer_idx']


In [None]:
# Create batch input data for scoring
output_file = "batch_input.jsonl"
max_items = 20
batch_data = []
for i, item in enumerate(test_data[:max_items]):
    question = item["question"]
    options = item["options"]
    answer_idx = item["answer_idx"]
    
    # Format options as A. Option text...
    formatted_options = "\n".join([f"{key}. {val}" for key, val in sorted(options.items())])
    
    # Create request in the format expected by the model
    request = {
        "id": f"item_{i}",  # Unique identifier for tracking
        "messages": [
            {
                "role": "system",
                "content": "You are a medical expert. Read the following USMLE question and choose the best answer. Give me the answer as A/B/C/D/E."
            },
            {
                "role": "user",
                "content": f"Question:\n{question}\n\nOptions:\n{formatted_options}"
            }
        ],
        "max_tokens": 10,
        "temperature": 0.1,
        "ground_truth": answer_idx  # For evaluation purposes
    }
    batch_data.append(request)

# Save to JSONL format for batch processing
with open(output_file, "w", encoding='utf-8') as f:
    for item in batch_data:
        f.write(json.dumps(item) + "\n")

print(f"Created batch input file '{output_file}' with {len(batch_data)} items")

Created batch input file 'batch_input.jsonl' with 20 items


In [None]:
# Submit batch scoring job
input_file = "batch_input.jsonl"
try:
    # Use direct file path approach
    input_data = Input(
        type=AssetTypes.URI_FILE,
        path=f"./{input_file}"
    )
    
    print(f"Invoking batch endpoint...")
    job = ml_client.batch_endpoints.invoke(
        endpoint_name=batch_endpoint_name,
        input=input_data,
        deployment_name=batch_deployment_name
    )
    
    print(f"Batch job submitted successfully!")
    print(f"Job name: {job.name}")


except Exception as e:
    print(f"Batch scoring job failed: {e}")
    print(f"Error details: {str(e)}")

Invoking batch endpoint...
Batch job submitted successfully!
Job name: batchjob-95796ea7-142b-47b6-bdcc-e9bfeeead3e4


In [46]:
# Download the results
ml_client.jobs.download(job.name, download_path="./batch_results")
print("Results downloaded to './batch_results'")

Downloading artifact azureml://datastores/workspaceblobstore/paths/azureml/addd61c0-bb32-4580-b793-e8abd5b27e29/score/ to batch_results


Results downloaded to './batch_results'


In [53]:
# Show the results
results_file = os.path.join("batch_results", "batch_input_results.json")
# read the json results
with open(results_file, "r", encoding='utf-8') as f:
    results = json.load(f)
# Display the results
if results:
    df_results = pd.DataFrame(results)
    print("Batch scoring results:")
    print(df_results.head())


Batch scoring results:
       id prediction full_response ground_truth  prompt_length  \
0  item_0          B             B            C            219   
1  item_1          C             C            E            169   
2  item_2          C             C            C            338   
3  item_3          E             E            D            349   
4  item_4          B             B            B            233   

   response_length  
0                3  
1                3  
2                3  
3                3  
4                3  


### Make sure to delete the clusters (dedicated ones) and endpoints after evaluation. Low-priority clusters scale to 0 automatically, so they don't incur costs when idle.