In [None]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

from api_key import Az_subscription_id, Az_resource_group_name, Az_workspace_name

# Authenticate with DefaultAzureCredential.
credential = DefaultAzureCredential()

# Workspace coordinates, imported from the local api_key module.
workspace_scope = {
    "subscription_id": Az_subscription_id,
    "resource_group_name": Az_resource_group_name,
    "workspace_name": Az_workspace_name,
}

# Handle to the workspace; the later cells go through this client.
ml_client = MLClient(credential=credential, **workspace_scope)

In [None]:
from azure.ai.ml.entities import Environment
import os

custom_env_name = "openai"

# Describe the environment: a base image plus the conda.yaml in this directory.
env_spec = Environment(
    name=custom_env_name,
    version="0.1.1",
    description="Custom environment for OpenAI Validation pipeline",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    conda_file=os.path.join("./", "conda.yaml"),
)

# Register the environment and keep the object returned by the service call.
pipeline_job_env = ml_client.environments.create_or_update(env_spec)

registered_msg = f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
print(registered_msg)

In [None]:
# Print each environment returned by the workspace listing, one per line.
envs = ml_client.environments.list()
for workspace_env in envs:
    print(workspace_env)

In [None]:
from azure.ai.ml.entities import AmlCompute

# Name assigned to the compute cluster
cpu_compute_target = "mycompute"

try:
    # Reuse the cluster if one with this name already exists.
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
    )

except Exception:
    # NOTE(review): every failure of the lookup above lands here, not only
    # "cluster does not exist" — consider narrowing the exception type.
    print("Creating a new cpu compute target...")

    # Build the Azure Machine Learning compute object with the intended parameters.
    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        # Azure Machine Learning Compute is the on-demand VM service
        type="amlcompute",
        # VM family
        size="STANDARD_DS3_V2",
        # Minimum number of nodes kept running when no job is running
        min_instances=0,
        # Maximum number of nodes in the cluster
        max_instances=4,
        # Seconds a node stays up after its job terminates before scaling down
        idle_time_before_scale_down=180,
        # Dedicated or LowPriority. The latter is cheaper but jobs may be terminated
        tier="Dedicated",
    )
    print(
        f"AMLCompute with name {cpu_cluster.name} will be created, with compute size {cpu_cluster.size}"
    )
    # begin_create_or_update only starts the long-running operation and returns a
    # poller. Wait for it so cpu_cluster holds the resulting compute entity (as in
    # the reuse branch) rather than the poller object.
    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster).result()

In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

input_path = '../data/input.jsonl'

# Single-file data asset for the JSONL input.
input_data = Data(
    name="input_data",
    version='1.0.1',
    description="input data",
    type=AssetTypes.URI_FILE,
    path=input_path,
)

# Register the asset; the returned object is the cell's displayed output.
ml_client.data.create_or_update(input_data)

In [None]:
# Report the name, version and path held by the input_data object.
dataset_summary = f"Dataset with name {input_data.name} was registered to workspace, the dataset version is {input_data.version}, the path is {input_data.path}"
print(dataset_summary)

In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

output_path = '../data/output.jsonl'

# Single-file data asset for the first step's JSONL output.
output_file = Data(
    name="output",
    version='1.0.1',
    description="output file",
    type=AssetTypes.URI_FILE,
    path=output_path,
)

# Register the asset; the returned object is the cell's displayed output.
ml_client.data.create_or_update(output_file)

In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output
from azure.ai.ml.constants import AssetTypes, InputOutputModes

# Inputs are mounted read-only, outputs read-write.
input_mode = InputOutputModes.RO_MOUNT
output_mode = InputOutputModes.RW_MOUNT

langchain_job_inputs = dict(
    input_data=Input(type=AssetTypes.URI_FILE, path=input_data.path, mode=input_mode),
)

langchain_job_outputs = dict(
    output_data=Output(type=AssetTypes.URI_FILE, path=output_file.path, mode=output_mode),
)

# Command step: runs OpenAIApp.py from this directory, passing the input and
# output locations on the command line.
langchain_job = command(
    display_name="extract information",
    code="./",  # location of source code
    command="python OpenAIApp.py --input ${{inputs.input_data}} --output ${{outputs.output_data}}",
    environment="openai@latest",
    compute="mycompute",
    inputs=langchain_job_inputs,
    outputs=langchain_job_outputs,
)

In [None]:
# Submit the extraction step on its own through the jobs operations.
ml_client.jobs.create_or_update(langchain_job)

In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

validation_path = '../data/decision.jsonl'

# Single-file data asset for the validation step's JSONL result.
validation_file = Data(
    name="validation",
    version='1.0.1',
    description="validation file",
    type=AssetTypes.URI_FILE,
    path=validation_path,
)

# Register the asset; the returned object is the cell's displayed output.
ml_client.data.create_or_update(validation_file)

In [None]:
# The validation step reads the first step's output file and writes the
# validation file, using the mount modes defined for the first step.
validation_job_inputs = dict(
    input_data=Input(type=AssetTypes.URI_FILE, path=output_file.path, mode=input_mode),
)

validation_job_outputs = dict(
    output_data=Output(type=AssetTypes.URI_FILE, path=validation_file.path, mode=output_mode),
)

# Command step: runs OpenAIValidation.py from this directory.
validation_job = command(
    display_name="validation result",
    code="./",  # location of source code
    command="python OpenAIValidation.py --input ${{inputs.input_data}} --output ${{outputs.output_data}}",
    environment="openai@latest",
    compute="mycompute",
    inputs=validation_job_inputs,
    outputs=validation_job_outputs,
)

In [None]:
# Submit the validation step on its own through the jobs operations.
ml_client.jobs.create_or_update(validation_job)

In [None]:
# the dsl decorator tells the sdk that we are defining an Azure Machine Learning pipeline
from azure.ai.ml import dsl, Input, Output


# Two-step pipeline: langchain_job runs first and its output file is fed to
# validation_job. Both steps' outputs are exposed as pipeline outputs.
# NOTE(review): the function name does not describe this Q&A/validation flow;
# it is left unchanged because a later cell calls it by this name.
@dsl.pipeline(
    compute=cpu_compute_target,
    description="E2E OpenAI Q&A and validation pipeline",
)
def credit_defaults_pipeline(
    pipeline_job_data_input,
):
    # step 1: call langchain_job like a python function, passing the pipeline input
    qa_job = langchain_job(
        input_data=pipeline_job_data_input,
    )

    # step 2: call validation_job on the file produced by step 1
    result_job = validation_job(
        input_data=qa_job.outputs.output_data,  # note: using outputs from previous step
    )

    # a pipeline returns a dictionary of outputs;
    # each key is the identifier of one pipeline output
    return {
        "pipeline_qa_job__data": qa_job.outputs.output_data,
        "pipeline_result_job_data": result_job.outputs.output_data,
    }

In [None]:
# Bind the pipeline's single input to the input file, then instantiate the pipeline.
pipeline_source = Input(type="uri_file", path=input_data.path)
pipeline = credit_defaults_pipeline(pipeline_job_data_input=pipeline_source)

In [None]:
# Submit the pipeline under its experiment name, then stream the submitted job by name.
submission_options = {"experiment_name": "e2e_registered_components"}
pipeline_job = ml_client.jobs.create_or_update(pipeline, **submission_options)
ml_client.jobs.stream(pipeline_job.name)