In [1]:
# import required libraries
from azure.ai.ml import MLClient, command, Input, Output, load_component
from azure.identity import DefaultAzureCredential
from azure.ai.ml.entities import Data, Environment

In [2]:
# Enter details of your AML workspace
# Replace each "<...>" placeholder with the values of your own workspace.
# These three names are passed to MLClient when the workspace handle is created.
subscription_id = "<subscription_id>"
resource_group = "<resouce_group_name>"
workspace = "<workspace_name>"

In [3]:
# Build the workspace client; every later cell talks to Azure ML through it.
# Keyword arguments make explicit which identifier goes where.
ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    workspace_name=workspace,
)

# Batch Endpoint

**Batch endpoints** are endpoints that are used to do batch inferencing on large volumes of data over a period of time. 

**Batch endpoints** receive pointers to data and run jobs asynchronously to process the data in parallel on compute clusters. Batch endpoints store outputs to a data store for further analysis.

<center>
<img src="../../imgs/endpoint_batch_concept.png" width = "700px" alt="Concept batch endpoint">
</center>

## 1. Create Batch Compute Cluster (Optional)

``` python
# create the compute cluster that the batch deployment will run on
from azure.ai.ml.entities import AmlCompute

my_cluster = AmlCompute(
    name="batch-cluster",
    type="amlcompute", 
    size="STANDARD_DS3_V2", 
    min_instances=0, 
    max_instances=3,
    location="westeurope", 	
)
ml_client.compute.begin_create_or_update(my_cluster)
```

In [4]:
from azure.ai.ml.entities import AmlCompute

# Single source of truth for the cluster name: the existence check and the
# creation request must refer to the same cluster, which is also the compute
# target named by the batch deployment ("batch-cluster").
compute_name = "batch-cluster"

try:
    # Look the cluster up first so re-running the cell does not recreate it.
    ml_client.compute.get(name=compute_name)
    print("Compute already exists")

except Exception:
    # A failed lookup is treated as "cluster missing". `Exception` (rather than
    # a bare `except:`) lets KeyboardInterrupt / SystemExit propagate.
    # NOTE(review): this also hides auth/network errors; narrow it to the SDK's
    # not-found error if that dependency is acceptable.
    print("Compute not found; Proceding to create")

    my_cluster = AmlCompute(
        name=compute_name,
        type="amlcompute",
        size="STANDARD_DS3_V2",
        min_instances=0,
        max_instances=3,
    )
    # Wait for the long-running operation so the cluster exists before any
    # later cell tries to deploy onto it.
    ml_client.compute.begin_create_or_update(my_cluster).wait()

<azure.core.polling._poller.LROPoller at 0x7f085c2d3550>

## 2. Create Batch Endpoint

We can create the **batch endpoint** with CLI v2 or SDK v2 using the following syntax:


<center>
<img src="../../imgs/create_batch_endpoint.png" width = "700px" alt="Create batch endpoint cli vs sdk">
</center>

In [5]:
# Define a batch endpoint with a randomised name and submit it to the workspace
from azure.ai.ml.entities import BatchEndpoint
import random

# numeric suffix appended to the endpoint name (presumably to keep it unique)
rand = random.randint(0, 10000)
endpoint_name = "taxi-batch-endpoint-" + str(rand)

batch_endpoint = BatchEndpoint(
    name=endpoint_name,
    description="Taxi batch endpoint",
    tags={"model": "taxi-model@latest"},
)

# Submit the request and block until the long-running operation has finished;
# `poller` is kept so its status can be inspected afterwards.
poller = ml_client.begin_create_or_update(batch_endpoint)
poller.wait()


In [6]:
from azure.ai.ml.exceptions import DeploymentException

# Fail fast when the creation poller did not finish in the "Succeeded" state.
status = poller.status()
if status != "Succeeded":
    raise DeploymentException(status)

# Only reached on success: report it and show the created endpoint.
print("Endpoint creation succeeded")
endpoint = poller.result()
print(endpoint)

Endpoint creation succeeded
{'additional_properties': {}, 'id': '/subscriptions/14585b9f-5c83-4a76-8055-42149123f99f/resourceGroups/mldemorg/providers/Microsoft.MachineLearningServices/workspaces/mldemo/batchEndpoints/taxi-batch-endpoint-6853', 'name': 'taxi-batch-endpoint-6853', 'type': 'Microsoft.MachineLearningServices/workspaces/batchEndpoints', 'system_data': <azure.ai.ml._restclient.v2022_05_01.models._models_py3.SystemData object at 0x7f085c30ceb0>, 'tags': {'model': 'taxi-model@latest'}, 'location': 'eastus2', 'identity': <azure.ai.ml._restclient.v2022_05_01.models._models_py3.ManagedServiceIdentity object at 0x7f085c30e620>, 'kind': None, 'properties': <azure.ai.ml._restclient.v2022_05_01.models._models_py3.BatchEndpointDetails object at 0x7f085c30e200>, 'sku': None}


## 3. Create Batch Deployment

We can create the **batch deployment** with CLI v2 or SDK v2 using the following syntax:

<center>
<img src="../../imgs/create_batch_deployment.png" width = "700px" alt="Create batch deployment cli vs sdk">
</center>

Note that if you're deploying **MLflow models**, there's no need to provide a **scoring script** or an execution **environment**, as both are autogenerated.

In [7]:
# Describe the batch deployment and submit it under the endpoint named by `endpoint_name`
from azure.ai.ml.entities import BatchDeployment, Model, Environment
from azure.ai.ml.constants import BatchDeploymentOutputAction

# model reference string, handed to the deployment unchanged
model = "taxi-model@latest"

# Collect the settings first so they can be read (and tweaked) in one place.
deployment_settings = dict(
    name="taxi-batch-dp",
    description="this is a sample batch deployment",
    endpoint_name=endpoint_name,
    model=model,
    compute="batch-cluster",
    instance_count=2,
    max_concurrency_per_instance=2,
    mini_batch_size=10,
    output_action=BatchDeploymentOutputAction.APPEND_ROW,
    output_file_name="predictions.csv",
)
batch_deployment = BatchDeployment(**deployment_settings)

# Submit and block until the long-running operation has finished.
poller = ml_client.begin_create_or_update(batch_deployment)
poller.wait()


Set the deployment as the default deployment of the endpoint:

In [8]:
# Re-read the endpoint from the workspace, point its default deployment at the
# deployment defined above, and push the change back.
batch_endpoint = ml_client.batch_endpoints.get(name=endpoint_name)
batch_endpoint.defaults.deployment_name = batch_deployment.name

# Block until the update has been applied.
poller = ml_client.batch_endpoints.begin_create_or_update(batch_endpoint)
poller.wait()

## 4. Invoke and Test Endpoint

We can invoke the **batch deployment** with CLI v2 or SDK v2 using the following syntax:

<center>
<img src="../../imgs/invoke_batch_deployment.png" width = "700px" alt="Invoke batch deployment cli vs sdk">
</center>

In [9]:
# invoke and test endpoint
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes, InputOutputModes

# Named `batch_input` rather than `input` so the Python builtin `input()` is
# not shadowed for the rest of the kernel session.
batch_input = Input(
    path="../../data/taxi-batch.csv",
    type=AssetTypes.URI_FILE,
    mode=InputOutputModes.DOWNLOAD,
)

# invoke the endpoint for batch scoring job; the returned job object is bound
# to `job` so it can be referenced after this cell.
job = ml_client.batch_endpoints.invoke(
    endpoint_name=endpoint_name,
    input=batch_input,
    deployment_name="taxi-batch-dp",
)

# Last expression of the cell: still displays the job, as the original did.
job


[32mUploading taxi-batch.csv[32m (< 1 MB): 100%|██████████| 133k/133k [00:00<00:00, 7.89MB/s]
[39m



<azure.ai.ml._restclient.v2020_09_01_dataplanepreview.models._models_py3.BatchJobResource at 0x7f085c160460>