# Deploy to a batch endpoint

Imagine that accident records are collected throughout the day, with the details saved in a set of CSV files. Overnight, the accident survival prediction model can process all of the day's records as a batch, so that the predictions are waiting the following morning. With Azure Machine Learning, you can accomplish this by creating a batch endpoint, which is what you'll implement in this exercise.

## Before you start

You'll need the latest version of the **azure-ai-ml** package to run the code in this notebook. Run the cell below to verify that it's installed.

> **Note**:
> If the **azure-ai-ml** package is not installed, run `pip install azure-ai-ml` to install it.
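As an alternative to the `pip show` cell, the standard library's `importlib.metadata.version()` returns the installed version of a distribution and raises `PackageNotFoundError` when it isn't installed. A small sketch (the helper name `installed_version` is illustrative, not part of the lab):

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it isn't installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


ml_version = installed_version("azure-ai-ml")
if ml_version is None:
    print("azure-ai-ml is not installed; run `pip install azure-ai-ml`.")
else:
    print(f"azure-ai-ml {ml_version} is installed.")
```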

In [1]:
pip show azure-ai-ml

Name: azure-ai-ml
Version: 1.25.0
Summary: Microsoft Azure Machine Learning Client Library for Python
Home-page: https://github.com/Azure/azure-sdk-for-python
Author: Microsoft Corporation
Author-email: azuresdkengsysadmins@microsoft.com
License: MIT License
Location: /anaconda/envs/azureml_py38/lib/python3.10/site-packages
Requires: azure-common, azure-core, azure-mgmt-core, azure-monitor-opentelemetry, azure-storage-blob, azure-storage-file-datalake, azure-storage-file-share, colorama, isodate, jsonschema, marshmallow, msrest, pydash, pyjwt, pyyaml, strictyaml, tqdm, typing-extensions
Required-by: 
Note: you may need to restart the kernel to use updated packages.


## Connect to your workspace

With the required SDK packages installed, now you're ready to connect to your workspace.

To connect to a workspace, you need identifier parameters: a subscription ID, a resource group name, and a workspace name. Because you're working with a compute instance managed by Azure Machine Learning, `MLClient.from_config` can read these values from the workspace's **config.json** file, so you don't need to enter them yourself.
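`MLClient.from_config` reads the three identifiers from a **config.json** file (the output below shows that it found one at `/config.json`). As a rough, simplified sketch of that kind of lookup, and not the SDK's actual implementation, the following standard-library code walks up from a starting folder looking for `config.json` (or `.azureml/config.json`) and checks that the file contains the `subscription_id`, `resource_group`, and `workspace_name` keys. Both function names are hypothetical.

```python
import json
from pathlib import Path
from typing import Optional

REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def find_workspace_config(start: str = ".") -> Optional[Path]:
    """Walk from `start` up to the filesystem root and return the first
    config.json found (directly or in an .azureml subfolder).
    A simplified approximation, not the SDK's own search logic."""
    current = Path(start).resolve()
    for folder in [current, *current.parents]:
        for candidate in (folder / "config.json", folder / ".azureml" / "config.json"):
            if candidate.is_file():
                return candidate
    return None


def load_workspace_config(path: Path) -> dict:
    """Read the three identifiers MLClient needs; raise KeyError if any is missing."""
    config = json.loads(path.read_text())
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError(f"config.json is missing: {missing}")
    return {key: config[key] for key in REQUIRED_KEYS}
```

Searching parent folders means a notebook in any subfolder of your user directory still finds the same workspace configuration.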

In [2]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential doesn't work
    credential = InteractiveBrowserCredential()

In [3]:
# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)

Found the config file in: /config.json


## Create the model

The following cells create a **src** folder for script files, then train a logistic regression classification model on the **accident.csv** file in the current directory. The fitted feature transformer is saved to **src/features_transformer.joblib** so that new data can be transformed in the same way, and the model is saved locally in MLflow format by using `mlflow.sklearn.save_model`.

In [4]:
import os

# create a folder for the script files
script_folder = 'src'
os.makedirs(script_folder, exist_ok=True)
print(script_folder, 'folder created')

src folder created


In [6]:
# Import necessary libraries
import os
import joblib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import LogisticRegression
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, roc_curve
import mlflow

In [6]:
stores = ml_client.datastores.list()
for ds_name in stores:
    print(ds_name.name)

workspacefilestore
workspaceworkingdirectory
workspaceblobstore
workspaceartifactstore


In [7]:


# Load the dataset (Ensure the file exists in the directory)
try:
    df = pd.read_csv('accident.csv')
except FileNotFoundError:
    raise FileNotFoundError("accident.csv not found in the current directory")

df['Age'] = df['Age'].astype(float)
df['Speed_of_Impact'] = df['Speed_of_Impact'].astype(float)
df = df.dropna().copy()
df.head(5)

# Define categorical and numeric columns
cat_columns = ['Gender', 'Helmet_Used', 'Seatbelt_Used']
num_columns = ['Age', 'Speed_of_Impact']

# Define feature transformations
numeric_transformer = make_pipeline(
    SimpleImputer(strategy="mean"),
    StandardScaler()
)

categorical_transformer = make_pipeline(
    SimpleImputer(strategy="most_frequent"),
    OneHotEncoder(drop='first')
)

# Combine transformations
features_transformer = ColumnTransformer(
    transformers=[
        ("numeric", numeric_transformer, num_columns),
        ("categorical", categorical_transformer, cat_columns)
    ]
)

# Extract features and labels
X = df[['Age', 'Gender', 'Speed_of_Impact', 'Helmet_Used', 'Seatbelt_Used']]
y = df['Survived'].values

# Split data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=42)

# Apply transformations
X_train_transformed = features_transformer.fit_transform(X_train)
X_test_transformed = features_transformer.transform(X_test)

# Define path for saving transformer
transformer_path = os.path.join('./src', "features_transformer.joblib")

# Save the feature transformer for future use
joblib.dump(features_transformer, transformer_path)
print(f"Feature transformer saved at: {transformer_path}")

print("Training model...")

# Train logistic regression model
model = LogisticRegression(C=1/0.01, solver="liblinear")
model.fit(X_train_transformed, y_train)  # Use Transformed Data

# Make predictions
y_hat = model.predict(X_test_transformed)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)

# Calculate AUC Score
y_scores = model.predict_proba(X_test_transformed)  # Compute Probabilities
auc = roc_auc_score(y_test, y_scores[:, 1])
print("AUC Score:", auc)

# Plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:, 1])
plt.figure(figsize=(6, 4))
plt.plot([0, 1], [0, 1], 'k--')  # Diagonal reference line
plt.plot(fpr, tpr)  # ROC curve
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()

# Save the model locally in MLflow format; the `model` folder is registered in the next section.
# To log the model to an MLflow run instead, use:
# mlflow.sklearn.log_model(sk_model=model, artifact_path="model")
# save_model raises an error if the target folder already exists and isn't empty,
# so remove any copy left over from a previous run first.
import shutil
shutil.rmtree("model", ignore_errors=True)
mlflow.sklearn.save_model(
    sk_model=model,
    path="model"
)

print("Model saved locally")

Feature transformer saved at: ./src/features_transformer.joblib
Training model...
Accuracy: 0.5254237288135594
AUC Score: 0.5816091954022988
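The cell above fits the feature transformer and the classifier as two separate objects and saves only the classifier as the MLflow model, so anything that scores the model must apply the transformer first (which is why this lab registers the transformer as its own asset). A possible alternative, sketched here with a few synthetic rows standing in for **accident.csv**, is to chain both steps in one scikit-learn `Pipeline` so that the saved model accepts the raw columns directly. The synthetic data and the name `full_model` are illustrative only.

```python
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

cat_columns = ["Gender", "Helmet_Used", "Seatbelt_Used"]
num_columns = ["Age", "Speed_of_Impact"]

# A handful of synthetic rows with the same columns as accident.csv
raw = pd.DataFrame({
    "Age": [56.0, 69.0, 46.0, 32.0, 60.0, 25.0, 41.0, 38.0],
    "Gender": ["Female", "Female", "Male", "Male", "Female", "Male", "Female", "Male"],
    "Speed_of_Impact": [27.0, 46.0, 46.0, 117.0, 40.0, 90.0, 33.0, 75.0],
    "Helmet_Used": ["No", "No", "Yes", "No", "Yes", "Yes", "No", "Yes"],
    "Seatbelt_Used": ["No", "Yes", "Yes", "Yes", "Yes", "No", "No", "Yes"],
})
survived = [1, 0, 1, 0, 1, 0, 1, 0]

preprocess = ColumnTransformer(transformers=[
    ("numeric", make_pipeline(SimpleImputer(strategy="mean"), StandardScaler()), num_columns),
    ("categorical", make_pipeline(SimpleImputer(strategy="most_frequent"), OneHotEncoder(drop="first")), cat_columns),
])

# One estimator: preprocessing runs automatically inside fit, predict, and predict_proba
full_model = Pipeline(steps=[
    ("preprocess", preprocess),
    ("classifier", LogisticRegression(C=1/0.01, solver="liblinear")),
])
full_model.fit(raw, survived)

# Raw, untransformed rows can now be scored directly
probabilities = full_model.predict_proba(raw.head(3))[:, 1]
print(probabilities)
```

Saving `full_model` with `mlflow.sklearn.save_model` would remove the need for a separate transformer asset and transformation step. This is a design alternative, not what the rest of this lab does.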




## Register the model

Batch deployments can only deploy models registered in the workspace. You'll register an MLflow model, which is stored in the local `model` folder. 
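Before registering, it can help to confirm that `./model` really is an MLflow model folder: `mlflow.sklearn.save_model` writes an `MLmodel` metadata file at the top level of the folder, next to the serialized model. A small standard-library sketch (the helper name `looks_like_mlflow_model` is hypothetical):

```python
from pathlib import Path


def looks_like_mlflow_model(path: str) -> bool:
    """Return True if `path` is a directory containing an MLmodel file,
    which MLflow writes at the top level of every saved model."""
    folder = Path(path)
    return folder.is_dir() and (folder / "MLmodel").is_file()


if not looks_like_mlflow_model("./model"):
    print("./model is not an MLflow model folder; rerun the training cell first.")
```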

In [8]:
from azure.ai.ml.entities import Model
from azure.ai.ml.constants import AssetTypes

model_name = "accident-survival-model"
model = ml_client.models.create_or_update(
    Model(name=model_name, path='./model', type=AssetTypes.MLFLOW_MODEL)
)

Uploading model (0.0 MBs): 100%|██████████| 1929/1929 [00:00<00:00, 32894.96it/s]



In [9]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

my_path = './src/features_transformer.joblib'

transformer = Data(
    path=my_path,
    type=AssetTypes.URI_FILE,
    description="Data asset pointing to a pkl file for transforming, automatically uploaded to the default datastore",
    name="accident-transformer"
)

ml_client.data.create_or_update(transformer)

Uploading features_transformer.joblib (< 1 MB): 100%|██████████| 4.85k/4.85k [00:00<00:00, 308kB/s]



Data({'path': 'azureml://subscriptions/cda9116f-5326-4a9b-9407-bc3a4391c27c/resourcegroups/gabby102/workspaces/health-update/datastores/workspaceblobstore/paths/LocalUpload/6812fe07f6f59ac0e574121d1e03e0d9/features_transformer.joblib', 'skip_validation': False, 'mltable_schema_url': None, 'referenced_uris': None, 'type': 'uri_file', 'is_anonymous': False, 'auto_increment_version': False, 'auto_delete_setting': None, 'name': 'accident-transformer', 'description': 'Data asset pointing to a pkl file for transforming, automatically uploaded to the default datastore', 'tags': {}, 'properties': {}, 'print_as_yaml': False, 'id': '/subscriptions/cda9116f-5326-4a9b-9407-bc3a4391c27c/resourceGroups/gabby102/providers/Microsoft.MachineLearningServices/workspaces/health-update/data/accident-transformer/versions/1', 'Resource__source_path': '', 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/captgt0071/code/Users/captgt007/azure-ml-labs/Labs/11', 'creation_context': <azure.ai.ml.entit

In [6]:
df = pd.read_csv('accident.csv')
experiment = df.head().drop('Survived', axis = 1)
experiment

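|   | Age | Gender | Speed_of_Impact | Helmet_Used | Seatbelt_Used |
|---|-----|--------|-----------------|-------------|---------------|
| 0 | 56 | Female | 27.0 | No | No |
| 1 | 69 | Female | 46.0 | No | Yes |
| 2 | 46 | Male | 46.0 | Yes | Yes |
| 3 | 32 | Male | 117.0 | No | Yes |
| 4 | 60 | Female | 40.0 | Yes | Yes |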


In [7]:
# 1. Get the registered data asset
asset = ml_client.data.get(name="accident-transformer", version="1")
asset.path

'azureml://subscriptions/cda9116f-5326-4a9b-9407-bc3a4391c27c/resourcegroups/gabby102/workspaces/health-update/datastores/workspaceblobstore/paths/LocalUpload/6812fe07f6f59ac0e574121d1e03e0d9/features_transformer.joblib'
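The long URI returned by `asset.path` encodes the subscription, resource group, workspace, datastore, and the file's path inside the datastore. Its last segment is the file name, which is why a later cell can use `os.path.basename(asset.path)` to locate the downloaded file. A sketch that splits such a URI into its parts; the regular expression is an assumption based on the URI shown above, not an official grammar, and matching is case-insensitive because the segment casing varies (`resourcegroups` here, `resourceGroups` in the asset `id`).

```python
import re
from typing import Dict

# Assumed shape, based on the URI above:
# azureml://subscriptions/<id>/resourcegroups/<rg>/workspaces/<ws>/datastores/<ds>/paths/<relative path>
DATASTORE_URI = re.compile(
    r"^azureml://subscriptions/(?P<subscription>[^/]+)"
    r"/resourcegroups/(?P<resource_group>[^/]+)"
    r"/workspaces/(?P<workspace>[^/]+)"
    r"/datastores/(?P<datastore>[^/]+)"
    r"/paths/(?P<path>.+)$",
    re.IGNORECASE,
)


def parse_datastore_uri(uri: str) -> Dict[str, str]:
    """Split a long-form azureml:// datastore URI into its named parts."""
    match = DATASTORE_URI.match(uri)
    if match is None:
        raise ValueError(f"Not a long-form datastore URI: {uri}")
    return match.groupdict()
```

In the notebook, `parse_datastore_uri(asset.path)["datastore"]` would return `workspaceblobstore`.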

In [12]:
import os
import sys
import warnings
import joblib
import fsspec

# 1. Disable OpenTelemetry before anything else
os.environ["OTEL_PYTHON_DISABLED"] = "true"
os.environ["OTEL_TRACES_EXPORTER"] = "none"
os.environ["OTEL_METRICS_EXPORTER"] = "none"
os.environ["OTEL_LOGS_EXPORTER"] = "none"
os.environ["OTEL_PROPAGATORS"] = "none"
os.environ["OTEL_PYTHON_LOG_LEVEL"] = "ERROR"

# 2. Suppress warnings from OpenTelemetry and others
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", message="Attempting to instrument while already instrumented")
warnings.filterwarnings("ignore", message="Overriding of current TracerProvider is not allowed")
warnings.filterwarnings("ignore", message="Overriding of current LoggerProvider is not allowed")
warnings.filterwarnings("ignore", message="Overriding of current MeterProvider is not allowed")

# 3. Unload OpenTelemetry modules if they are loaded
for module in list(sys.modules.keys()):
    if "opentelemetry" in module:
        del sys.modules[module]

# 4. Load the joblib transformer file from the registered data asset
#    (opening azureml:// URIs with fsspec requires the azureml-fsspec package)
uri = f"azureml://subscriptions/{ml_client.subscription_id}/resourceGroups/{ml_client.resource_group_name}/workspaces/{ml_client.workspace_name}/data/{asset.name}/versions/{asset.version}"

try:
    with fsspec.open(uri, "rb") as f:
        features_transformer = joblib.load(f)
    print("Feature transformer loaded successfully!")
except Exception as e:
    print(f"Error loading transformer: {e}")

# 5. Apply the transformation to the new data
try:
    transformed_data = features_transformer.transform(experiment)
    print("Transformation applied successfully!")
    print(transformed_data)
except Exception as e:
    print(f"Error during transformation: {e}")


Feature transformer loaded successfully!
Transformation applied successfully!
[[ 0.83358372 -1.37479361  0.          0.          0.        ]
 [ 1.68976168 -0.75843333  0.          0.          1.        ]
 [ 0.17498528 -0.75843333  1.          1.          1.        ]
 [-0.74705253  1.5448077   1.          0.          1.        ]
 [ 1.09702309 -0.95307342  0.          1.          1.        ]]


In [37]:
asset = ml_client.data.get(name="accident-transformer", version="1")
asset


Data({'path': 'azureml://subscriptions/cda9116f-5326-4a9b-9407-bc3a4391c27c/resourcegroups/gabby102/workspaces/health-update/datastores/workspaceblobstore/paths/LocalUpload/6812fe07f6f59ac0e574121d1e03e0d9/features_transformer.joblib', 'skip_validation': False, 'mltable_schema_url': None, 'referenced_uris': None, 'type': 'uri_file', 'is_anonymous': False, 'auto_increment_version': False, 'auto_delete_setting': None, 'name': 'accident-transformer', 'description': 'Data asset pointing to a pkl file for transforming, automatically uploaded to the default datastore', 'tags': {}, 'properties': {}, 'print_as_yaml': False, 'id': '/subscriptions/cda9116f-5326-4a9b-9407-bc3a4391c27c/resourceGroups/gabby102/providers/Microsoft.MachineLearningServices/workspaces/health-update/data/accident-transformer/versions/1', 'Resource__source_path': '', 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/captgt0071/code/Users/captgt007/azure-ml-labs/Labs/11', 'creation_context': <azure.ai.ml.entit

In [40]:
import os
import joblib
# Note: _artifact_utilities is a private azure-ai-ml module, so this call may change between SDK versions
import azure.ai.ml._artifacts._artifact_utilities as artifact_utils

# Create a local directory to download the asset
download_dir = "./temp_transformer"
os.makedirs(download_dir, exist_ok=True)

# Download the registered transformer file from the datastore
asset = ml_client.data.get(name="accident-transformer", version="1")
artifact_utils.download_artifact_from_aml_uri(uri=asset.path, destination=download_dir, datastore_operation=ml_client.datastores)

# Determine the local file name by taking the basename of the asset's path.
local_file = os.path.join(download_dir, os.path.basename(asset.path))
print("Local file path:", local_file)

# Load the joblib transformer from the downloaded file.
with open(local_file, "rb") as f:
    features_transformer = joblib.load(f)

print("Feature transformer loaded successfully!")

Local file path: ./temp_transformer/features_transformer.joblib
Feature transformer loaded successfully!


In [13]:
## Prepare the data
import pandas as pd
import os
import numpy as np

# Load the dataset
df = pd.read_csv('accident.csv').dropna()
df['Age'] = df['Age'].astype(float)
df['Speed_of_Impact'] = df['Speed_of_Impact'].astype(float)

# Drop the target column 'Survived'
df = df.drop(columns=['Survived'])

# Split the dataframe into 5 equal parts
split_dfs = np.array_split(df, 5)

# Create the "data" folder if it doesn't exist
os.makedirs("data", exist_ok=True)

# Save each split dataset into the "data" folder
for i, split_df in enumerate(split_dfs):
    file_path = f"data/accident_part_{i+1}.csv"
    split_df.to_csv(file_path, index=False)
    print(f" Saved: {file_path}")

 Saved: data/accident_part_1.csv
 Saved: data/accident_part_2.csv
 Saved: data/accident_part_3.csv
 Saved: data/accident_part_4.csv
 Saved: data/accident_part_5.csv
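`np.array_split` divides the rows as evenly as possible: when the row count isn't a multiple of the number of parts, the first parts get one extra row each. Recent pandas versions can emit a `FutureWarning` when `np.array_split` is called on a DataFrame directly, so here is a sketch that applies the same sizing rule by position with `iloc` (the function name `split_into_parts` is illustrative):

```python
import pandas as pd


def split_into_parts(df: pd.DataFrame, n_parts: int) -> list:
    """Split `df` into `n_parts` consecutive chunks whose sizes differ by at most one,
    with the larger chunks first (the same sizing rule as np.array_split)."""
    base, extra = divmod(len(df), n_parts)
    parts, start = [], 0
    for i in range(n_parts):
        size = base + (1 if i < extra else 0)
        parts.append(df.iloc[start:start + size])
        start += size
    return parts


example = pd.DataFrame({"Age": range(12)})
print([len(part) for part in split_into_parts(example, 5)])  # [3, 3, 2, 2, 2]
```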


In [14]:
first_data = pd.read_csv('data/accident_part_1.csv')

# Define required columns
required_columns = ['Age', 'Gender', 'Speed_of_Impact', 'Helmet_Used', 'Seatbelt_Used']

# Check for missing columns
for col in required_columns:
    if col not in first_data.columns:
        print(f"Column '{col}' is not present in the dataset.")
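The cell above checks only the first file. Because every file in **data** will be scored, it may be more useful to check the header of each CSV file in the folder. A sketch that reads just the first line of each file with the standard `csv` module (the function name `missing_columns_by_file` is illustrative):

```python
import csv
from pathlib import Path
from typing import Dict, List, Sequence

REQUIRED_COLUMNS = ('Age', 'Gender', 'Speed_of_Impact', 'Helmet_Used', 'Seatbelt_Used')


def missing_columns_by_file(folder: str, required: Sequence[str] = REQUIRED_COLUMNS) -> Dict[str, List[str]]:
    """Read only the header row of every CSV file in `folder` and report,
    per file, which required columns are absent (complete files are omitted)."""
    report = {}
    for csv_path in sorted(Path(folder).glob("*.csv")):
        with csv_path.open(newline="") as handle:
            header = next(csv.reader(handle), [])
        missing = [col for col in required if col not in header]
        if missing:
            report[csv_path.name] = missing
    return report


print(missing_columns_by_file("./data") or "All CSV files have the required columns.")
```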

In [15]:
f = os.listdir('./data')
f

['accident_part_1.csv',
 'accident_part_2.csv',
 'accident_part_3.csv',
 'accident_part_4.csv',
 'accident_part_5.csv']

In [16]:
# List all files in the directory
f = os.listdir('./data')

# Iterate through the files and filter only CSV files
for i in f:
    if i.endswith('.csv'):
        print(i)  # Prints only CSV files


accident_part_1.csv
accident_part_2.csv
accident_part_3.csv
accident_part_4.csv
accident_part_5.csv


In [21]:
%%writefile src/transform_data.py
import os
import sys
import warnings
import argparse
import tempfile
from pathlib import Path

import joblib
import pandas as pd
import numpy as np

from azure.ai.ml import MLClient
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
import azure.ai.ml._artifacts._artifact_utilities as artifact_utils

def get_data(ml_client, input_dir, feature_transformer_name):
    """
    Reads CSV files from input_dir, applies the feature transformer asset,
    and returns a list of transformed DataFrames.
    """
    required_columns = ['Age', 'Gender', 'Speed_of_Impact', 'Helmet_Used', 'Seatbelt_Used']
    all_files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.endswith('.csv')]
    missing_columns = []
    dataframes = []
    
    # Load the joblib transformer asset
    try:
        asset = ml_client.data.get(name=feature_transformer_name, version="1")
        with tempfile.TemporaryDirectory() as temp_dir:
            # Download the artifact into the temporary directory.
            # Note: We use temp_dir as the destination directory.
            artifact_utils.download_artifact_from_aml_uri(
                uri=asset.path,
                destination=temp_dir,
                datastore_operation=ml_client.datastores
            )
            # Construct a candidate local path by joining the temp_dir and basename of the asset's path.
            candidate_path = os.path.join(temp_dir, os.path.basename(asset.path))
            print("Downloaded candidate path:", candidate_path)
            
            # Check if the candidate_path is a directory.
            if os.path.isdir(candidate_path):
                files_in_dir = os.listdir(candidate_path)
                if not files_in_dir:
                    raise Exception("Downloaded directory is empty.")
                # If only one file exists, use that; otherwise, search for a .joblib file.
                if len(files_in_dir) == 1:
                    local_file = os.path.join(candidate_path, files_in_dir[0])
                else:
                    joblib_files = [f for f in files_in_dir if f.endswith('.joblib')]
                    if not joblib_files:
                        raise Exception("No joblib file found in the downloaded directory.")
                    local_file = os.path.join(candidate_path, joblib_files[0])
            else:
                local_file = candidate_path
            
            print("Using local file:", local_file)
            with open(local_file, "rb") as f:
                features_transformer = joblib.load(f)
            print("Feature transformer loaded successfully!")
    except Exception as e:
        print(f"Error loading transformer: {e}")
        return None

    # Process each CSV file
    for file in all_files:
        try:
            df = pd.read_csv(file)
        except Exception as e:
            print(f"Error reading file {file}: {e}")
            continue

        # Check if all required columns are present
        for col in required_columns:
            if col not in df.columns and col not in missing_columns:
                missing_columns.append(col)

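        # Apply the transformation to the data.
        # The transformer outputs Age and Speed_of_Impact (scaled) followed by the one-hot
        # columns for Gender, Helmet_Used, and Seatbelt_Used; the original names below are
        # positional labels only, reused so the scoring script can select the columns.
        try:
            transformed_data = features_transformer.transform(df)
            transformed_df = pd.DataFrame(transformed_data, columns=required_columns)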
            print(f"Transformation applied successfully on {file}!")
        except Exception as e:
            print(f"Error during transformation for {file}: {e}")
            continue

        dataframes.append(transformed_df)
    
    if missing_columns:
        print(f"Missing columns: {missing_columns}")
        return None
    return dataframes

def main(args):
    try:
        credential = DefaultAzureCredential()
        ml_client = MLClient(
            credential=credential,
            subscription_id="cda9116f-5326-4a9b-9407-bc3a4391c27c",
            resource_group_name="gabby102",
            workspace_name="health-update"
        )
    except Exception as e:
        print(f"Default credential failed: {e}")
        print("Falling back to interactive browser login...")
        try:
            credential = InteractiveBrowserCredential()
            ml_client = MLClient(
                credential=credential,
                subscription_id="cda9116f-5326-4a9b-9407-bc3a4391c27c",
                resource_group_name="gabby102",
                workspace_name="health-update"
            )
        except Exception as e:
            print(f"Interactive login failed: {e}")
            sys.exit(1)
    
    datas = get_data(ml_client, args.input_data, args.feature_transformer_name)
    if datas is None or len(datas) == 0:
        print("Error: No data was transformed successfully")
        sys.exit(1)
    
    os.makedirs(args.output_data, exist_ok=True)
    try:
        for i, split_df in enumerate(datas):
            output_path = Path(args.output_data) / f"accident_part_{i+1}.csv"
            split_df.to_csv(output_path, index=False)
            print(f"Saved: {output_path}")
    except Exception as e:
        print(f"Error saving transformed data: {e}")
        sys.exit(1)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Transform accident data in Azure ML")
    parser.add_argument(
        "--input_data",
        type=str,
        default="./data",
        help="Path to the input data directory containing CSV files"
    )
    parser.add_argument(
        "--output_data",
        type=str,
        required=True,
        help="Path where the transformed data will be written"
    )
    parser.add_argument(
        "--feature_transformer_name",
        type=str,
        default="accident-transformer",
        help="Name of the feature transformer asset"
    )
    
    print("\n" + "*" * 60)
    args = parser.parse_args()
    main(args)
    print("*" * 60 + "\n")


Overwriting src/transform_data.py
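`get_data()` has to handle the downloaded artifact landing either as a single file or as a folder that contains it. The same resolution logic, pulled out into a standalone standard-library function that is easier to test, might look like this (the name `resolve_downloaded_file` is hypothetical; unlike the script, it sorts the folder entries so the choice is deterministic):

```python
import os


def resolve_downloaded_file(candidate_path: str, suffix: str = ".joblib") -> str:
    """Return the file to load from a downloaded artifact.

    - If candidate_path is not a directory, return it unchanged.
    - If it is a directory with exactly one entry, return that entry.
    - Otherwise return the first entry (alphabetically) ending in `suffix`.
    """
    if not os.path.isdir(candidate_path):
        return candidate_path
    entries = sorted(os.listdir(candidate_path))
    if not entries:
        raise FileNotFoundError(f"Downloaded directory is empty: {candidate_path}")
    if len(entries) == 1:
        return os.path.join(candidate_path, entries[0])
    matches = [name for name in entries if name.endswith(suffix)]
    if not matches:
        raise FileNotFoundError(f"No {suffix} file found in {candidate_path}")
    return os.path.join(candidate_path, matches[0])
```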


In [8]:
%%writefile src/batch_scoring.py

import os
import logging
from typing import List

import mlflow
import pandas as pd

# Columns the model expects, in training order (as written by the transform_data step)
EXPECTED_COLUMNS = ['Age', 'Gender', 'Speed_of_Impact', 'Helmet_Used', 'Seatbelt_Used']


def init():
    """Initialize the scoring environment by loading the model."""
    global model
    try:
        model_path = os.path.join(os.environ["AZUREML_MODEL_DIR"], "model")
        # Load the native scikit-learn model: the generic pyfunc wrapper has no predict_proba()
        model = mlflow.sklearn.load_model(model_path)
    except Exception as e:
        logging.error(f"Error loading model: {e}")
        raise


def run(mini_batch: List[str]) -> List[str]:
    """Score one mini-batch. Azure ML passes a list of file paths, not a folder path."""
    results = []
    for file_path in mini_batch:
        # Ignore any non-CSV files in the input folder
        if not str(file_path).endswith(".csv"):
            continue
        try:
            # Read the CSV file
            data = pd.read_csv(file_path)

            # Validate input data
            if not all(col in data.columns for col in EXPECTED_COLUMNS):
                raise ValueError(f"Missing required columns in {file_path}")

            input_data = data[EXPECTED_COLUMNS]

            # Make predictions
            predictions = model.predict(input_data)
            probabilities = model.predict_proba(input_data)

            # Validate predictions
            if len(predictions) != len(data):
                raise ValueError("Prediction length mismatch")

            # Create output dataframe
            output_df = data.copy()
            output_df['prediction'] = predictions
            output_df['survival_probability'] = probabilities[:, 1]

            # With the append_row output action, each returned element is written to the output file
            results.append(output_df.to_json(orient='records'))
        except Exception as e:
            logging.error(f"Error processing file {file_path}: {str(e)}")
            continue

    return results


Overwriting src/batch_scoring.py
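In a batch deployment, Azure Machine Learning calls `init()` once per worker and then `run()` once per mini-batch, passing a list of file paths. The per-file scoring logic can be exercised locally, without a deployment, by passing a model in explicitly. In this sketch, `score_files` is a hypothetical refactoring of the body of `run()`, and the tiny logistic regression trained on random numbers only stands in for the registered model (like the real model, it expects five numeric, already-transformed columns).

```python
import os
import tempfile
from typing import List

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

EXPECTED_COLUMNS = ['Age', 'Gender', 'Speed_of_Impact', 'Helmet_Used', 'Seatbelt_Used']


def score_files(model, mini_batch: List[str]) -> List[str]:
    """Score each CSV file in `mini_batch`; return one JSON string per successfully scored file."""
    results = []
    for file_path in mini_batch:
        data = pd.read_csv(file_path)
        if not all(col in data.columns for col in EXPECTED_COLUMNS):
            continue  # skip files without the expected columns
        output_df = data.copy()
        output_df['prediction'] = model.predict(data[EXPECTED_COLUMNS])
        output_df['survival_probability'] = model.predict_proba(data[EXPECTED_COLUMNS])[:, 1]
        results.append(output_df.to_json(orient='records'))
    return results


# Stand-in model fitted on random numeric data with the same five column names
rng = np.random.default_rng(0)
train = pd.DataFrame(rng.normal(size=(20, 5)), columns=EXPECTED_COLUMNS)
stub_model = LogisticRegression().fit(train, np.array([0, 1] * 10))

with tempfile.TemporaryDirectory() as folder:
    good = os.path.join(folder, "part_1.csv")
    bad = os.path.join(folder, "part_2.csv")
    train.head(3).to_csv(good, index=False)
    pd.DataFrame({"Age": [1.0]}).to_csv(bad, index=False)
    outputs = score_files(stub_model, [good, bad])

print(len(outputs))  # 1: the file without the expected columns is skipped
```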


In [9]:
%%writefile src/batch_endpoint.py

import argparse
import time
from azure.ai.ml import MLClient, Input
from azure.ai.ml.constants import AssetTypes, BatchDeploymentOutputAction
from azure.ai.ml.entities import BatchEndpoint, BatchDeployment, BatchRetrySettings, CodeConfiguration, Data
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

# Function to create batch endpoint if it doesn't exist
def create_batch_endpoint(ml_client, endpoint_name):
    try:
        # Try to get existing endpoint
        endpoint = ml_client.batch_endpoints.get(endpoint_name)
        print(f"Batch endpoint {endpoint_name} already exists.")
    except Exception:
        # Create new endpoint
        endpoint = BatchEndpoint(
            name=endpoint_name,
            description="Batch endpoint for accident prediction",
        )
        ml_client.batch_endpoints.begin_create_or_update(endpoint).result()
        print(f"Created new batch endpoint: {endpoint_name}")
    return endpoint


# Function to create and deploy the batch deployment
def create_batch_deployment(ml_client, endpoint_name, env_name, model_name, deployment_name, code_path, scoring_path, compute):
    # Get the latest versions of the registered model and environment
    model = ml_client.models.get(name=model_name, label="latest")
    environment = ml_client.environments.get(name=env_name, label="latest")

    # Code configuration
    code_configuration = CodeConfiguration(
        code=code_path,  # Local path to the code directory
        scoring_script=scoring_path  # The script that contains the init() and run() functions
    )
    # Create the deployment
    deployment = BatchDeployment(
        name=deployment_name,
        description="Deployment for accident prediction model",
        endpoint_name=endpoint_name,
        model=model,
        environment=environment,  # The registered environment
        code_configuration=code_configuration,
        compute=compute,  # Name of a compute cluster; batch deployments don't run on compute instances
        instance_count=1,
        max_concurrency_per_instance=1,
        mini_batch_size=10,
        output_action=BatchDeploymentOutputAction.APPEND_ROW,  # Write every run() result to one output file
        output_file_name="predictions.csv",
        retry_settings=BatchRetrySettings(max_retries=3, timeout=300),
        logging_level="info",
    )

    # Create or update the deployment
    ml_client.batch_deployments.begin_create_or_update(deployment).result()
    print("Deployment created successfully!")

    # Set the deployment as the default for the endpoint
    endpoint = ml_client.batch_endpoints.get(endpoint_name)
    endpoint.defaults.deployment_name = deployment_name
    ml_client.batch_endpoints.begin_create_or_update(endpoint).result()
    return deployment

# Function to register the input data and submit a batch job
def submit_batch_job(ml_client, endpoint_name, training_data_path, name_of_data):
    # Register the (transformed) input folder as a data asset
    dataset_in_use = Data(
        path=training_data_path,
        type=AssetTypes.URI_FOLDER,
        description="An unlabeled dataset for survivor classification",
        name=name_of_data,
    )

    try:
        registered_data = ml_client.data.create_or_update(dataset_in_use)
        print(f"Dataset registered with name: {registered_data.name}")
        print(f"Dataset version: {registered_data.version}")
        print(f"Dataset registration path: {registered_data.path}")
    except Exception as e:
        print(f"Error registering dataset: {e}")
        return None

    # Create the job input from the registered data asset
    job_input = Input(type=AssetTypes.URI_FOLDER, path=registered_data.id)

    # Submit the job to the endpoint's default deployment
    job = ml_client.batch_endpoints.invoke(
        endpoint_name=endpoint_name,
        input=job_input
    )
    print(f"Submitted batch job with ID: {job.name}")

    # Monitor job progress until it reaches a terminal state
    while True:
        job_status = ml_client.jobs.get(job.name)
        print(f"Job status: {job_status.status}")

        if job_status.status in ['Completed', 'Failed', 'Canceled']:
            print(f"Job finished with status: {job_status.status}")
            if job_status.status == 'Completed':
                print(f"Output available at: {job_status.outputs}")
            break

        time.sleep(30)  # Wait 30 seconds before checking again

    return job

# Main function
def main(args):
    # Get Azure ML client
    try:
        # Try DefaultAzureCredential first
        credential = DefaultAzureCredential()
        ml_client = MLClient(
            credential=credential,
            subscription_id="cda9116f-5326-4a9b-9407-bc3a4391c27c",
            resource_group_name="gabby102",
            workspace_name="health-update"
        )
    except Exception as e:
        print(f"Default credential failed: {e}")
        print("Falling back to interactive browser login...")
        try:
            # Fall back to interactive login
            credential = InteractiveBrowserCredential()
            ml_client = MLClient(
                credential=credential,
                subscription_id="cda9116f-5326-4a9b-9407-bc3a4391c27c",
                resource_group_name="gabby102",
                workspace_name="health-update"
            )
        except Exception as e:
            print(f"Interactive login failed: {e}")
            return None

    # Create or get the batch endpoint
    endpoint = create_batch_endpoint(ml_client, args.endpoint_name)
    # Create the batch deployment and make it the endpoint's default
    deployment = create_batch_deployment(
        ml_client,
        args.endpoint_name,
        args.env_name,
        args.model_name,
        args.deployment_name,
        args.code_path,
        args.scoring_path,
        args.compute
    )
    # Register the input data and submit the batch job
    job = submit_batch_job(
        ml_client,
        args.endpoint_name,
        args.training_data_path,
        args.name_of_data
    )

    return endpoint


# Run script
if __name__ == "__main__":
    # Set up argument parser
    parser = argparse.ArgumentParser(description='Deploy a batch endpoint in Azure ML')

    parser.add_argument('--endpoint_name', type=str, default="accident-prediction-endpoint",
                        help='Name of the batch endpoint')
    parser.add_argument('--model_name', type=str, required=True,
                        help='Name of the registered model to use for batch deployment')
    parser.add_argument('--deployment_name', type=str, default="accident-prediction-deployment",
                        help='Name for the batch deployment')
    parser.add_argument('--env_name', type=str, default="accident-prediction-environment",
                        help='Name of an already registered environment')
    parser.add_argument('--code_path', type=str, default="./src",
                        help='Path to the code directory')
    parser.add_argument('--scoring_path', type=str, default="batch_scoring.py",
                        help='Name of the scoring script')
    parser.add_argument('--compute', type=str, default="captgt0071",
                        help='Name of the compute cluster (batch deployments run on compute clusters, not compute instances)')
    parser.add_argument('--training_data_path', type=str, required=True,
                        help='Path to the transformed data (the output of the transform_data step)')
    parser.add_argument('--name_of_data', type=str, default='accident_survival_data',
                        help='Name under which the input data is registered for the job')
    
    
    print("\n" + "*" * 60)
    args = parser.parse_args()  
    main(args)
    print("*" * 60 + "\n")

Overwriting src/batch_endpoint.py
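`submit_batch_job` polls the job every 30 seconds until it reaches a terminal state, with no upper limit. A variant with an overall timeout, so that a stuck job can't block the script forever, is sketched below. It takes a status callback rather than an `MLClient` so that it can be tried without Azure; in the script, the callback would be something like `lambda: ml_client.jobs.get(job.name).status`. The function name and the default timeout are assumptions, not part of the lab.

```python
import time
from typing import Callable

TERMINAL_STATES = {"Completed", "Failed", "Canceled"}


def wait_for_job(get_status: Callable[[], str], poll_seconds: float = 30,
                 timeout_seconds: float = 3600) -> str:
    """Poll `get_status` until it returns a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        print(f"Job status: {status}")
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job still '{status}' after {timeout_seconds} seconds")
        time.sleep(poll_seconds)


# Local usage with a fake sequence of statuses
statuses = iter(["NotStarted", "Running", "Running", "Completed"])
print(wait_for_job(lambda: next(statuses), poll_seconds=0))
```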


# Define the components

To define a component, you need to specify:

- **Metadata**: *name*, *display name*, *version*, *description*, *type*, and so on. The metadata helps describe and manage the component.
- **Interface**: *inputs* and *outputs*. For example, a model training component takes training data and a regularization rate as inputs and generates a trained model file as output.
- **Command, code, and environment**: the *command*, *code*, and *environment* used to run the component. The command is the shell command that executes the component, the code usually refers to a source code directory, and the environment can be an Azure Machine Learning environment (curated or custom), a Docker image, or a conda environment.
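To make the three parts concrete, here is a minimal sketch in plain Python that represents the `transform_data` component as a dictionary and fills in its `${{inputs.<name>}}` and `${{outputs.<name>}}` placeholders to show what the final command line looks like. Azure Machine Learning performs this substitution itself when the job runs; the dictionary layout, the `render_command` helper, and the `/mnt/example/...` paths are hypothetical and only illustrate how the interface binds to the command.

```python
import re

# Hypothetical in-memory version of the transform_data component, grouped by the three parts above
component = {
    # Metadata
    "name": "transform_data",
    "version": "1",
    "type": "command",
    # Interface
    "inputs": {"input_data": "uri_folder", "feature_transformer_name": "string"},
    "outputs": {"output_data": "uri_folder"},
    # Command, code, and environment
    "code": ".",
    "environment": "azureml:accident-prediction-environment:1",
    "command": (
        "python transform_data.py "
        "--input_data ${{inputs.input_data}} "
        "--feature_transformer_name ${{inputs.feature_transformer_name}} "
        "--output_data ${{outputs.output_data}}"
    ),
}

# Matches ${{inputs.<name>}} and ${{outputs.<name>}}
PLACEHOLDER = re.compile(r"\$\{\{(inputs|outputs)\.(\w+)\}\}")


def render_command(spec, bindings):
    """Substitute each placeholder with the value bound to it, e.g. bindings["inputs.input_data"]."""

    def substitute(match):
        kind, name = match.group(1), match.group(2)
        if name not in spec[kind]:
            raise KeyError(f"{kind}.{name} is not declared in the component interface")
        return str(bindings[f"{kind}.{name}"])

    return PLACEHOLDER.sub(substitute, spec["command"])


cmd = render_command(
    component,
    {
        "inputs.input_data": "/mnt/example/input",
        "inputs.feature_transformer_name": "accident-transformer",
        "outputs.output_data": "/mnt/example/output",
    },
)
print(cmd)
```

Every placeholder in the command must name something declared in the interface; that is why the helper raises a `KeyError` for an undeclared name instead of silently leaving the placeholder in place.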

Run the following cells to create the environment and a YAML file for each component that you want to run as a pipeline step.

In [10]:
%%writefile $script_folder/accident-prediction-env.yml
name: accident-prediction-env
channels:
  - conda-forge
dependencies:
  - python=3.11
  - pip
  - pip:
      - mlflow
      - pandas
      - scikit-learn
      - numpy
      - azureml-core  
      - azureml-defaults
      - fsspec 
      - azure-identity
      - azure-ai-ml

Overwriting src/accident-prediction-env.yml


In [11]:
from azure.ai.ml.entities import Environment

# Define an environment from a Docker base image plus the conda specification written by the previous cell
environment_conda = Environment(
    image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04",
    conda_file="src/accident-prediction-env.yml",  # This file must exist before registering
    name="accident-prediction-environment",
    description="Environment for accident prediction batch deployment",
)

# Register the environment
registered_env = ml_client.environments.create_or_update(environment_conda)
print(f" Environment registered: {registered_env.name}, Version: {registered_env.version}")

 Environment registered: accident-prediction-environment, Version: 1
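The component YAML files reference this registered environment by name in two different ways: `azureml:accident-prediction-environment:1` pins version 1, while `azureml:accident-prediction-environment@latest` resolves to whatever version is newest when the job is submitted. The sketch below is a hypothetical helper that only handles these two short forms (not full ARM resource IDs); it shows how such a reference breaks down into its parts.

```python
from typing import NamedTuple, Optional


class AssetRef(NamedTuple):
    name: str
    version: Optional[str]  # set for the azureml:<name>:<version> form
    label: Optional[str]    # set for the azureml:<name>@<label> form


def parse_asset_ref(ref: str) -> AssetRef:
    """Split a short-form azureml: asset reference into name, version, and label."""
    prefix = "azureml:"
    if not ref.startswith(prefix):
        raise ValueError(f"not an azureml: reference: {ref!r}")
    body = ref[len(prefix):]
    if "@" in body:
        name, label = body.split("@", 1)
        return AssetRef(name, None, label)
    if ":" in body:
        name, version = body.rsplit(":", 1)
        return AssetRef(name, version, None)
    raise ValueError(f"expected a version (:N) or a label (@label) in {ref!r}")


pinned = parse_asset_ref("azureml:accident-prediction-environment:1")
floating = parse_asset_ref("azureml:accident-prediction-environment@latest")
print(pinned)
print(floating)
```

Pinning a version keeps runs reproducible, while `@latest` picks up a re-registered environment automatically. Mixing the two, as the transform and prediction components do, means the two steps could run on different environment versions once the environment is updated.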


In [22]:
%%writefile src/transform_data.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: transform_data
display_name: transform Data in Azure ML
version: 1
type: command
inputs:
  input_data: 
    type: uri_folder
    description: Directory containing multiple CSV files
  feature_transformer_name:
    type: string
    description: Name of the feature transformer to use
outputs:
  output_data:
    type: uri_folder
    description: Output location for the transformed data used by the batch prediction step
code: .
environment: azureml:accident-prediction-environment:1
command: >-
  python transform_data.py 
  --input_data ${{inputs.input_data}}
  --feature_transformer_name ${{inputs.feature_transformer_name}}
  --output_data ${{outputs.output_data}}


Overwriting src/transform_data.yml


In [13]:
%%writefile src/batch_prediction.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: batch_prediction
display_name: Batch Prediction
version: 1
type: command
inputs:
  training_data:
    type: uri_folder
    description: Input data for batch prediction
  model_name:
    type: string
    default: accident-survival-model
    description: Name of the model to use for prediction
outputs:
  predictions:
    type: uri_folder
    description: Output directory for predictions
code: .
environment: azureml:accident-prediction-environment@latest
command: >-
  python batch_scoring.py
  --training_data_path ${{inputs.training_data}}
  --model_name ${{inputs.model_name}}


Overwriting src/batch_prediction.yml
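Notice that `batch_prediction.yml` declares a `predictions` output but never passes `${{outputs.predictions}}` to `batch_scoring.py`. Unless the script discovers the output path some other way, it has nowhere to write its results. A small consistency check can catch this kind of mismatch before you submit a pipeline. The sketch below is a hypothetical helper that works on a plain dictionary mirroring the YAML; it is not part of the Azure ML SDK.

```python
import re

# Matches ${{inputs.<name>}} and ${{outputs.<name>}}
PLACEHOLDER = re.compile(r"\$\{\{(inputs|outputs)\.(\w+)\}\}")


def check_bindings(spec):
    """Return human-readable problems with how a command uses the declared interface."""
    referenced = set(PLACEHOLDER.findall(spec["command"]))  # {(kind, name), ...}
    problems = []
    # Declared but unused: for outputs this usually means the script never learns where to write
    for kind in ("inputs", "outputs"):
        for name in spec.get(kind, {}):
            if (kind, name) not in referenced:
                problems.append(f"{kind}.{name} is declared but never passed to the command")
    # Used but undeclared: the placeholder cannot be resolved
    for kind, name in sorted(referenced):
        if name not in spec.get(kind, {}):
            problems.append(f"{kind}.{name} is used in the command but not declared")
    return problems


# Mirror of batch_prediction.yml (interface and command only)
batch_prediction = {
    "inputs": {"training_data": "uri_folder", "model_name": "string"},
    "outputs": {"predictions": "uri_folder"},
    "command": (
        "python batch_scoring.py "
        "--training_data_path ${{inputs.training_data}} "
        "--model_name ${{inputs.model_name}}"
    ),
}

problems = check_bindings(batch_prediction)
print(problems)
```

One possible fix is to append an argument such as `--predictions_path ${{outputs.predictions}}` to the command and have `batch_scoring.py` write `predictions.csv` into that folder. The argument name is hypothetical and must match whatever the scoring script actually parses.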


In [23]:
from azure.ai.ml import MLClient, Input, dsl, load_component
from azure.ai.ml.constants import AssetTypes
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

@dsl.pipeline(
    description="End-to-end accident classification pipeline",
    default_compute="captgt0071"
)
def accident_classification(
    pipeline_job_input: Input(type=AssetTypes.URI_FOLDER)
):
    # Load components with explicit paths
    prep_data = load_component(source="./src/transform_data.yml")
    batch_work = load_component(source="./src/batch_prediction.yml")
    
    clean_data = prep_data(
        input_data=pipeline_job_input,
        feature_transformer_name="accident-transformer"
    )
    
    # Make predictions using batch endpoint
    predictions = batch_work(
        training_data=clean_data.outputs.output_data,
        model_name="accident-survival-model"
    )
    
    return {
        "transformed_batch_data": clean_data.outputs.output_data,
        "computed_predictions": predictions.outputs.predictions
    }

def submit_pipeline(ml_client):
    try:
        # Create pipeline job
        pipeline_job = accident_classification(
            pipeline_job_input=Input(type=AssetTypes.URI_FOLDER, path="./data")
        )
        print(pipeline_job)
        
        # Configure pipeline settings
        pipeline_job.settings.default_compute = "captgt0071"
        pipeline_job.settings.default_datastore = "workspaceblobstore"
        
        # Set output modes
        pipeline_job.outputs.transformed_batch_data.mode = "upload"
        pipeline_job.outputs.computed_predictions.mode = "upload"
        
        # Submit the pipeline
        submitted_pipeline = ml_client.jobs.create_or_update(
            pipeline_job,
            experiment_name="pipeline_accident_survival_prediction"
        )
        
        return submitted_pipeline
        
    except Exception as e:
        print(f"Error submitting pipeline: {e}")
        raise



if __name__ == "__main__":
    # Connect to the Azure ML workspace
    subscription_id = "cda9116f-5326-4a9b-9407-bc3a4391c27c"
    resource_group_name = "gabby102"
    workspace_name = "health-update"
    try:
        # Try DefaultAzureCredential first. Constructing a credential never fails on its own,
        # so request a token to check that it actually works before relying on it.
        credential = DefaultAzureCredential()
        credential.get_token("https://management.azure.com/.default")
    except Exception as e:
        print(f"Default credential failed: {e}")
        print("Falling back to interactive browser login...")
        credential = InteractiveBrowserCredential()

    ml_client = MLClient(
        credential=credential,
        subscription_id=subscription_id,
        resource_group_name=resource_group_name,
        workspace_name=workspace_name,
    )

    # Submit the pipeline job to the workspace
    try:
        pipeline_job = submit_pipeline(ml_client)
        print(f"Pipeline submitted successfully. Job ID: {pipeline_job.id}")
    except Exception as e:
        print(f"Failed to submit pipeline: {e}")

Overriding of current TracerProvider is not allowed
Overriding of current LoggerProvider is not allowed
Overriding of current MeterProvider is not allowed
Attempting to instrument while already instrumented
Uploading src (0.02 MBs): 100%|██████████| 22867/22867 [00:00<00:00, 689616.33it/s]

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored


display_name: accident_classification
description: End-to-end accident classification pipeline
type: pipeline
inputs:
  pipeline_job_input:
    type: uri_folder
    path: azureml:./data
outputs:
  transformed_batch_data:
    type: uri_folder
  computed_predictions:
    type: uri_folder
jobs:
  clean_data:
    type: command
    inputs:
      input_data:
        path: ${{parent.inputs.pipeline_job_input}}
      feature_transformer_name: accident-transformer
    outputs:
      output_data: ${{parent.outputs.transformed_batch_data}}
    component:
      $schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
      name: transform_data
      version: '1'
      display_name: transform Data in Azure ML
      type: command
      inputs:
        input_data:
          type: uri_folder
          description: Directory containing multiple CSV files
        feature_transformer_name:
          type: string
          description: Name of the feature transformer to use
      

In [24]:


ml_client.jobs.get('plucky_fowl_9zxf0dznl9')

| Experiment | Name | Type | Status | Details Page |
|---|---|---|---|---|
| pipeline_accident_survival_prediction | plucky_fowl_9zxf0dznl9 | pipeline | Completed | Link to Azure Machine Learning studio |
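Instead of checking the status by hand, you can poll it until the job reaches a final state. This sketch assumes that `Completed`, `Failed`, and `Canceled` are the terminal status strings, and it takes the status lookup as a callable so it can be tried without a workspace. With a real client you would pass something like `lambda: ml_client.jobs.get(job_name).status`; alternatively, `ml_client.jobs.stream(job_name)` streams the job logs and blocks until the run finishes.

```python
import time
from typing import Callable

# Assumed terminal states; check the job status values documented for your SDK version
TERMINAL_STATES = {"Completed", "Failed", "Canceled"}


def wait_for_job(
    get_status: Callable[[], str],
    poll_seconds: float = 30.0,
    timeout_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns a terminal state, then return that state."""
    waited = 0.0
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"job still {status!r} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds


# Demo with a fake status sequence and a no-op sleep, so nothing actually waits
fake_statuses = iter(["Queued", "Running", "Completed"])
final_status = wait_for_job(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(final_status)
```

Injecting `sleep` keeps the helper testable; in a notebook you would leave it at the default `time.sleep`.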


## Get the results

When the pipeline job that invokes the batch endpoint completes, you can view the results. All predictions are collected in the `predictions.csv` file, which is stored in the default datastore. Run the following cells to download the file and inspect the predictions.
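Where the downloaded file ends up depends on the kind of job and the output you request. For a pipeline job, the SDK typically places a named output under a subfolder such as `named-outputs/<output_name>` rather than directly in `download_path`. Instead of hard-coding the location, a hypothetical helper like the one below can search the download folder for `predictions.csv`.

```python
import tempfile
from pathlib import Path
from typing import Optional


def find_output_file(download_path: str, file_name: str = "predictions.csv") -> Optional[Path]:
    """Return the first match for file_name anywhere under download_path, or None."""
    matches = sorted(Path(download_path).rglob(file_name))
    return matches[0] if matches else None


# Demo: recreate a nested named-outputs/<output_name>/predictions.csv layout in a temp folder
with tempfile.TemporaryDirectory() as tmp:
    expected = Path(tmp, "named-outputs", "computed_predictions", "predictions.csv")
    expected.parent.mkdir(parents=True)
    expected.write_text("['patient-0.csv', 1]\n")
    found = find_output_file(tmp)
    missing = find_output_file(tmp, "does-not-exist.csv")
    print(found == expected, missing)
```

A `None` result after downloading is a useful signal that the job never wrote a `predictions.csv` into that output, rather than a problem with the path you opened.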

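In [25]:
# "score" is not an output of this pipeline job; its named outputs are
# "transformed_batch_data" and "computed_predictions"
ml_client.jobs.download(name='plucky_fowl_9zxf0dznl9', download_path=".", output_name="computed_predictions")

In [26]:
# Named outputs of a pipeline job are downloaded to <download_path>/named-outputs/<output_name>
with open("named-outputs/computed_predictions/predictions.csv", "r") as f:
    data = f.read()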

In [None]:
from ast import literal_eval
import pandas as pd

score = pd.DataFrame(
    literal_eval(data.replace("\n", ",")), columns=["file", "prediction"]
)
score
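The cell above joins all lines with commas and evaluates the result in one go, which assumes each line of `predictions.csv` is a Python-style pair such as `['patient-0.csv', 1]`. Under that same assumption, parsing line by line is more robust: it tolerates blank lines (which become `,,` and a `SyntaxError` in the one-shot version), reports the offending line when one is malformed, and still returns a list of rows for a file holding a single prediction with no trailing newline. The helper name and the sample rows below are made up for illustration.

```python
from ast import literal_eval


def parse_prediction_lines(text):
    """Parse one [file, prediction] literal per line into a list of (file, prediction) tuples."""
    rows = []
    for raw_line in text.splitlines():
        line = raw_line.strip().rstrip(",")
        if not line:
            continue  # skip blank lines instead of producing ",," in a joined string
        value = literal_eval(line)
        if not isinstance(value, (list, tuple)) or len(value) != 2:
            raise ValueError(f"expected a [file, prediction] pair, got {line!r}")
        rows.append((value[0], value[1]))
    return rows


sample = "['patient-0.csv', 1]\n\n['patient-1.csv', 0]"
rows = parse_prediction_lines(sample)
print(rows)  # [('patient-0.csv', 1), ('patient-1.csv', 0)]
```

The result can be passed straight to pandas, for example `pd.DataFrame(parse_prediction_lines(data), columns=["file", "prediction"])`.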