# Deploy mapping pipeline


# Install requirements

In [1]:
import sys
!{sys.executable} -m pip install python-dotenv



### Init workspace

In [2]:
from dotenv import dotenv_values
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
import os

env_vars = dotenv_values("env")
for key, value in env_vars.items():
     os.environ[key] = value
subscription_id = os.getenv("SUBSCRIPTION_ID")
workspace_name = os.getenv("WORKSPACE_NAME")
version = os.getenv("VERSION")
print(f"workspace_name={workspace_name}")
print(f"version={version}")

# authenticate
credential = DefaultAzureCredential()

# Get a handle to the workspace
ml_client = MLClient(
    credential=credential,
    subscription_id=subscription_id,
    resource_group_name="DocuBank",
    workspace_name=workspace_name,
)
cpu_cluster = None

workspace_name=bankmap
version=0.0.5


### Create a compute resource to run your pipeline (Optional)


In [7]:
from azure.ai.ml.entities import AmlCompute

# Name assigned to the compute cluster
cpu_compute_target = "cpu-cluster-lp"

try:
    # let's see if the compute target already exists
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
    )

except Exception:
    print("Creating a new cpu compute target...")

    # Let's create the Azure Machine Learning compute object with the intended parameters
    # if you run into an out of quota error, change the size to a comparable VM that is available.
    # Learn more on https://azure.microsoft.com/en-us/pricing/details/machine-learning/.
    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        # Azure Machine Learning Compute is the on-demand VM service
        type="amlcompute",
        # VM Family
        size="STANDARD_DS3_V2",
        # Minimum running nodes when there is no job running
        min_instances=0,
        # Nodes in cluster
        max_instances=20,
        # How many seconds will the node running after the job termination
        idle_time_before_scale_down=600,
        # Dedicated or LowPriority. The latter is cheaper but there is a chance of job termination
        tier="LowPriority",
    )
    print(
        f"AMLCompute with name {cpu_cluster.name} will be created, with compute size {cpu_cluster.size}"
    )
    # Now, we pass the object to MLClient's create_or_update method
    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster)

You already have a cluster named cpu-cluster-lp, we'll reuse it as is.


### Set Params

In [3]:
conda_file = "./conda.yaml"
code_dir = "./src"
env_name = "bankmap"
component_name = "bankmap"

config_path=f"azureml://subscriptions/ae0eff97-7885-4c1e-b23c-d8a627ef292f/resourcegroups/DocuBank/workspaces/{workspace_name}/datastores/configs/paths/"
input_path=f"azureml://subscriptions/ae0eff97-7885-4c1e-b23c-d8a627ef292f/resourcegroups/DocuBank/workspaces/{workspace_name}/datastores/datawork/paths/"

### Create environment


In [4]:
from azure.ai.ml.entities import Environment

pipeline_job_env = Environment(
    name=env_name,
    description="Env to run map pipeline",
    tags={"bankmap": version},
    conda_file=conda_file,
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    version=version,
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name bankmap is registered to workspace, the environment version is 0.0.5


### Create component

And register the component in the workspace for future reuse.


In [5]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

outputs={"output": Output(type="uri_file", mode="rw_mount")}
map_component = command(
    name=component_name,
    display_name="Maps statement to DB for a company",
    description="Maps statement to DB for a company",
    inputs={
        "data":  Input(type="uri_file"),
        "config_path":  Input(type="uri_folder"),
        "company": Input(type="string")
    },
    is_deterministic=True, 
    outputs=outputs,
    # The source folder of the component
    code=code_dir,
    version=version,
    command="""python map.py \
            --company '${{inputs.company}}' \
            --input_file '${{inputs.data}}'\
            --config_path '${{inputs.config_path}}'\
            --output '${{outputs.output}}'\
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)

# Now we register the component to the workspace
map_component = ml_client.create_or_update(map_component.component)

# Create (register) the component in your workspace
print(f"Component {map_component.name} with Version {map_component.version} is registered")

[32mUploading src (0.0 MBs):   0%|          | 0/4200 [00:00<?, ?it/s][32mUploading src (0.0 MBs): 100%|██████████| 4200/4200 [00:00<00:00, 39990.28it/s][32mUploading src (0.0 MBs): 100%|██████████| 4200/4200 [00:00<00:00, 39606.89it/s]
[39m



Component bankmap with Version 0.0.5 is registered


### Optional: Test component



In [8]:
# the dsl decorator tells the sdk that we are defining an Azure Machine Learning pipeline
from azure.ai.ml import dsl, Input, Output

map_component = ml_client.components.get(component_name)
print(f'output: {map_component}')

@dsl.pipeline(compute=cpu_compute_target, description="test map pipeline")
def map_pipeline(company, data, config_path):
    job = map_component(company=company, data=data, config_path=config_path)
    # job.outputs["output_path"] = Output(type="uri_folder", mode="rw_mount", path=output_path)
    print(f'output: {job.outputs["output"]}')
    return {
        "output": job.outputs.output
    }

output: name: bankmap
version: 0.0.5
display_name: Maps statement to DB for a company
description: Maps statement to DB for a company
type: command
inputs:
  data:
    type: uri_file
    optional: false
  config_path:
    type: uri_folder
    optional: false
  company:
    type: string
    optional: false
outputs:
  output:
    type: uri_file
command: 'python map.py             --company ''${{inputs.company}}''             --input_file
  ''${{inputs.data}}''            --config_path ''${{inputs.config_path}}''            --output
  ''${{outputs.output}}''            '
environment: azureml:/subscriptions/ae0eff97-7885-4c1e-b23c-d8a627ef292f/resourceGroups/DocuBank/providers/Microsoft.MachineLearningServices/workspaces/bankmap/environments/bankmap/versions/0.0.5
code: azureml:/subscriptions/ae0eff97-7885-4c1e-b23c-d8a627ef292f/resourceGroups/DocuBank/providers/Microsoft.MachineLearningServices/workspaces/bankmap/codes/9226ca34-952a-48b1-84a9-2ade8f2d942f/versions/1
resources:
  instance_

### Run job

In [11]:
# Let's instantiate the pipeline with the parameters of our choice
company=""
pipeline = map_pipeline(
    company=company,
    data=Input(type="uri_file", path=f"azureml://subscriptions/ae0eff97-7885-4c1e-b23c-d8a627ef292f/resourcegroups/DocuBank/workspaces/{workspace_name}/datastores/datacopy/paths/{company}.zip"),
    config_path=Input(type="uri_folder", path=config_path)
)

pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="Test fo map deploy",
)
print(f'output: {pipeline_job.outputs.output.path}')
print(f'output: {pipeline_job.name}')

output: ${{parent.jobs.None.outputs.output}}
output: None
output: ashy_heart_ptbyrb0c3d


In [44]:
# ml_client.jobs.stream(pipeline_job.name)
retrieved_job = ml_client.jobs.get(pipeline_job.name)
print(f'output: {retrieved_job}')

output: name: clever_neck_7w8w1hvx5s
display_name: map_pipeline
description: test map pipeline
type: pipeline
inputs:
  company: hum
  data:
    mode: ro_mount
    type: uri_file
    path: azureml://subscriptions/ae0eff97-7885-4c1e-b23c-d8a627ef292f/resourcegroups/DocuBank/workspaces/test/datastores/devcopy/paths/hum.zip
  config_path:
    mode: ro_mount
    type: uri_folder
    path: azureml://subscriptions/ae0eff97-7885-4c1e-b23c-d8a627ef292f/resourcegroups/DocuBank/workspaces/test/datastores/configs/paths/
outputs:
  output:
    mode: rw_mount
    type: uri_file
jobs:
  job:
    type: command
    inputs:
      data:
        path: ${{parent.inputs.data}}
      config_path:
        path: ${{parent.inputs.config_path}}
      company:
        path: ${{parent.inputs.company}}
    outputs:
      output: ${{parent.outputs.output}}
    resources:
      instance_count: 1
    component: azureml:bankmap@default
id: azureml:/subscriptions/ae0eff97-7885-4c1e-b23c-d8a627ef292f/resourceGroups/Docu

In [53]:
output = ml_client.jobs.download(name=pipeline_job.name, output_name="output")
print(f'output: {output}')

Downloading artifact azureml://subscriptions/ae0eff97-7885-4c1e-b23c-d8a627ef292f/resourcegroups/DocuBank/workspaces/test/datastores/workspaceblobstore/paths/azureml/22943b06-74c2-4587-bc45-7e93bc21636a/output to named-outputs/output


output: None
