# Use Flow in Azure ML Pipeline

**Requirements** - In order to benefit from this tutorial, you will need:
- A basic understanding of Machine Learning
- An Azure account with an active subscription - [Create an account for free](https://azure.microsoft.com/free/?WT.mc_id=A261C142F)
- An Azure ML workspace - [Configure workspace](../../configuration.ipynb)
- A Python environment
- The Azure Machine Learning Python SDK v2 installed
- The PromptFlow SDK installed

**Learning Objectives** - By the end of this tutorial, you should be able to:
- Connect to your AML workspace from the Python SDK
- Load a flow as a `ParallelComponent`
- Use the component together with other components loaded from YAML in one `PipelineJob`

**Motivations** - This guide shows how to use a flow alongside other data processing steps in a pipeline.

**Known issues** - This feature is not yet stable. We are actively working on the following known issues:
- The flow directory must contain a `.promptflow/flow.tools.json` file. This file is generated automatically when you run the flow locally.
- A component with a given name can be created only once, even with a different version. When neither a component name nor a version is provided, an auto-generated, hash-based component name is used.
- Flow nodes can run only on a compute cluster whose managed identity is assigned the Azure ML Data Scientist role.
- Overwriting connections or `columns_mapping` does not work yet.
- This feature works on canary workspace only for now: [sample job link](https://ml.azure.com/experiments/id/9ce1a534-9d3d-4761-a5e7-5299dd6912f1/runs/clever_leek_4xh6x9z7s5?wsid=/subscriptions/96aede12-2f73-41cb-b983-6d11a904839b/resourcegroups/promptflow/workspaces/promptflow-canary-dev&tid=72f988bf-86f1-41af-91ab-2d7cd011db47)
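
The first known issue can be checked before a flow is loaded. The following is a minimal sketch that only tests for the file path named above; `has_tools_json` is a hypothetical helper written for this guide, not part of the PromptFlow SDK.

```python
from pathlib import Path


def has_tools_json(flow_dir: str) -> bool:
    """Return True if <flow_dir>/.promptflow/flow.tools.json exists as a file."""
    return (Path(flow_dir) / ".promptflow" / "flow.tools.json").is_file()


# Example: check the flow directory used later in this guide.
flow_dir = "../../flows/standard/web-classification/"
if not has_tools_json(flow_dir):
    print(f"flow.tools.json not found under {flow_dir}; run the flow locally first.")
```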

## 0. Install dependent packages

Please follow [configuration.ipynb](../../configuration.ipynb) to install dependent packages and connect to a workspace first.

In [None]:
%pip install -r ../../requirements.txt

## 1. Connect to PFClient and create necessary connections
As with other SDKs in azure-ai-ml, you first need to import the related packages and prepare a client connected to a specific workspace.

In [None]:
# Import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component
from promptflow.azure import PFClient

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception:
    # Fall back to InteractiveBrowserCredential if DefaultAzureCredential does not work
    credential = InteractiveBrowserCredential()

# Create PFClient connected to workspace
pf = PFClient.from_config(credential=credential)
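
`PFClient.from_config` has to find the workspace details somewhere. The sketch below assumes it follows the same convention as `MLClient.from_config` in azure-ai-ml, which reads a `config.json` containing `subscription_id`, `resource_group`, and `workspace_name` from the working directory or one of its parents. The helper name `write_workspace_config` and all values are placeholders for illustration.

```python
import json
from pathlib import Path


def write_workspace_config(directory, subscription_id, resource_group, workspace_name):
    """Write a config.json with the three workspace fields and return its path."""
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    path = Path(directory) / "config.json"
    path.write_text(json.dumps(config, indent=2))
    return path


# Example usage (placeholder values, replace with your own):
# write_workspace_config(".", "<SUBSCRIPTION_ID>", "<RESOURCE_GROUP>", "<WORKSPACE_NAME>")
```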

## 2. Load flow as a component

If you have already authored a flow, you can load it as a component:

In [None]:
flow_component = pf.load_as_component(
    "../../flows/standard/web-classification/",
    columns_mapping={
        "url": "Channel",
        "groundtruth": "${data.answer}",
    },
    component_type="parallel",
)

print(flow_component)
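
To make the `columns_mapping` argument more concrete, here is an illustrative sketch of how such a mapping could be applied to one input row. It assumes that `${data.<column>}` selects a column from the row and that any other value is passed through as a literal. This is not PromptFlow's implementation; `resolve_columns_mapping` and the sample row are hypothetical.

```python
import re

# Matches references of the form ${data.<column>}
_DATA_REF = re.compile(r"^\$\{data\.([^}]+)\}$")


def resolve_columns_mapping(columns_mapping, row):
    """Build the flow inputs for one row from a columns mapping."""
    resolved = {}
    for flow_input, expr in columns_mapping.items():
        match = _DATA_REF.match(expr) if isinstance(expr, str) else None
        if match:
            # Reference: take the value from the named column of the row
            resolved[flow_input] = row[match.group(1)]
        else:
            # Anything else is treated as a literal value
            resolved[flow_input] = expr
    return resolved


sample_row = {"url": "https://example.com", "answer": "App"}
mapping = {"url": "Channel", "groundtruth": "${data.answer}"}
print(resolve_columns_mapping(mapping, sample_row))
# → {'url': 'Channel', 'groundtruth': 'App'}
```

Under these assumptions, the `"url": "Channel"` entry passes the literal string `Channel` to every row, while `groundtruth` is read from the `answer` column.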

## 3. Use the component in a pipeline

Then you can use this component along with other components in a pipeline:

In [None]:
tsv2jsonl_component = load_component("./tsv2jsonl-component/component_spec.yaml")


@pipeline
def pipeline_with_flow(input_data):
    data_transfer = tsv2jsonl_component(input_data=input_data)

    flow_node = flow_component(
        # this can be either a URI jsonl file or a URI folder containing multiple jsonl files
        data=data_transfer.outputs.output_data,
        # you can overwrite inputs mapping here
        groundtruth="${data.url}",
        # this is to overwrite connection settings
        connections={
            # this is to overwrite connection-related settings for an LLM node
            # "summarize_text_content" is the node name
            "summarize_text_content": {
                "deployment_name": "gpt-35-turbo",
            },
            # you can overwrite custom connection input of a python node here
            # "convert_to_dict": {
            #     "conn1": "another_connection"
            # }
        },
    )
    # node-level run settings for a flow node are similar to those of `ParallelComponent`
    flow_node.logging_level = "DEBUG"
    flow_node.max_concurrency_per_instance = 2
    return flow_node.outputs


# Use a distinct name so the imported `pipeline` decorator is not shadowed
pipeline_job = pipeline_with_flow(
    input_data=Input(path="./data.tsv", type=AssetTypes.URI_FILE),
)

pipeline_job.settings.default_compute = "cpu-cluster"

created_job = pf.ml_client.jobs.create_or_update(pipeline_job)

As with other pipeline jobs in azure-ai-ml, you can monitor the job status with `ml_client.jobs.stream`:

In [None]:
pf.ml_client.jobs.stream(created_job.name)
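
The `tsv2jsonl_component` used in the pipeline is loaded from YAML, and its implementation is not shown in this guide. As a rough illustration of what such a step might do, the sketch below converts TSV text with a header row into JSON Lines, one JSON object per data row. The function `tsv_to_jsonl` is hypothetical, and the actual component may behave differently.

```python
import csv
import io
import json


def tsv_to_jsonl(tsv_text: str) -> str:
    """Convert TSV text (first line is the header) to JSON Lines text."""
    reader = csv.DictReader(io.StringIO(tsv_text), delimiter="\t")
    # One JSON object per row, keyed by the header names
    return "".join(json.dumps(row) + "\n" for row in reader)


print(tsv_to_jsonl("url\tanswer\nhttps://example.com\tApp\n"), end="")
# → {"url": "https://example.com", "answer": "App"}
```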