In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: pipeline control structures using the KFP SDK

<table align="left">
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/control_flow_kfp.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/control_flow_kfp.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
<a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/pipelines/control_flow_kfp.ipynb" target='_blank'>
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
     </a>
  </td>
</table>
<br/><br/><br/>

In [2]:
# Google Cloud project ID; passed to `aip.init` and to `gcloud projects describe` below.
# NOTE(review): this is a hard-coded, author-specific value — replace it with your own project ID.
PROJECT_ID = "epishova-joonix-sandbox"  # @param {type:"string"}

In [3]:
# Region to use for this tutorial's Google Cloud resources.
REGION = "us-central1"  # @param {type: "string"}

In [4]:
# Cloud Storage bucket URI; used as the Vertex AI staging bucket and as the base of PIPELINE_ROOT.
# NOTE(review): this is a hard-coded, author-specific value — replace it with your own bucket.
BUCKET_URI = "gs://epishova-team-pipelines"  # @param {type:"string"}

#### Service Account

**If you don't know your service account**, leave the placeholder value in the first cell below unchanged; the second cell then tries to look it up with the `gcloud` command.

In [6]:
# Service account e-mail. Leave the placeholder (or an empty string) in place to have
# the next cell look the account up with gcloud.
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [7]:
import sys

IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    if IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

Service Account: 933318019720-compute@developer.gserviceaccount.com


### Set up variables

Next, set up some variables used throughout the tutorial.
### Import libraries and define constants

In [8]:
import json

import google.cloud.aiplatform as aip
from kfp import compiler, dsl
from kfp.dsl import component

#### Vertex AI Pipelines constants

Set up the following constants for Vertex AI Pipelines:

In [9]:
# Cloud Storage location under the bucket that is used as the pipeline root for this tutorial.
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/control"

## Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [10]:
# Pass the region explicitly so that the SDK uses the REGION chosen above
# instead of silently falling back to its built-in default location.
aip.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

## Define pipeline components

The following example defines three simple pipeline components:

- A component that generates a list of dicts and outputs the stringified json.
(Note: This component requires an `import json` in the component function definition)
- A component that just prints its input string
- A component that does a 'coin flip' and outputs `heads` or `tails`.

In [11]:
@component
def args_generator_op() -> str:
    """Return a JSON string encoding a list of two dicts with "cats" and "dogs" keys."""
    # The import lives inside the function body, as the tutorial text requires
    # for this component.
    import json

    loop_args = [
        {"cats": "1", "dogs": "2"},
        {"cats": "10", "dogs": "20"},
    ]
    return json.dumps(loop_args, sort_keys=True)


@component
def print_op(msg: str):
    """Print the given message to standard output.

    Args:
        msg: The text to print.
    """
    print(msg)


@component
def flip_coin_op() -> str:
    """Simulate a coin toss and return either "heads" or "tails"."""
    import random

    # randint(0, 1) yields 0 or 1; 0 maps to heads, 1 to tails.
    if random.randint(0, 1) == 0:
        return "heads"
    return "tails"

## Define a pipeline that uses control structures

The following example defines a pipeline that uses these components and demonstrates the use of `dsl.Condition` and `dsl.ParallelFor`.

The `json_string` input's default value is a nested JSON list converted to a string. As the pipeline definition shows, the loop and conditional expressions are able to process this string as a list, and access list items and sub-items.
The same holds for the list output by the `args_generator_op`.

In [12]:
@dsl.pipeline(
    name="control",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    json_string: str = json.dumps(
        [
            {
                "snakes": "anaconda",
                "lizards": "anole",
                "bunnies": [{"cottontail": "bugs"}, {"cottontail": "thumper"}],
            },
            {
                "snakes": "cobra",
                "lizards": "gecko",
                "bunnies": [{"cottontail": "roger"}],
            },
            {
                "snakes": "boa",
                "lizards": "iguana",
                "bunnies": [
                    {"cottontail": "fluffy"},
                    {"fuzzy_lop": "petunia", "cottontail": "peter"},
                ],
            },
        ],
        sort_keys=True,
    )
):
    """Demonstrate `dsl.Condition` and `dsl.ParallelFor` with simple components.

    Args:
        json_string: A JSON-encoded list of dicts. The loops below iterate over
            it and read the "snakes", "lizards" and "bunnies" entries of each
            item; "bunnies" is itself a list of dicts that is looped over.
    """

    # Single coin flip whose result drives the conditions below.
    flip1 = flip_coin_op()

    # flip_coin_op only returns "heads" or "tails", so this comparison always
    # holds; the block shows a loop nested inside a condition.
    with dsl.Condition(
        flip1.output != "no-such-result", name="alwaystrue"
    ):  # always true

        # Loop over the items of the JSON list produced by args_generator_op.
        args_generator = args_generator_op()
        with dsl.ParallelFor(args_generator.output) as item:
            # Every iteration prints the whole pipeline input string.
            print_op(msg=json_string)

            # Print one field of the loop item depending on the coin flip.
            with dsl.Condition(flip1.output == "heads", name="heads"):
                print_op(msg=item.cats)

            with dsl.Condition(flip1.output == "tails", name="tails"):
                print_op(msg=item.dogs)

    # Loop over the pipeline input itself and print the fields of the item
    # whose "snakes" value is "boa".
    with dsl.ParallelFor(json_string) as item:
        with dsl.Condition(item.snakes == "boa", name="snakes"):
            print_op(msg=item.snakes)
            print_op(msg=item.lizards)
            print_op(msg=item.bunnies)

    # it is possible to access sub-items: the inner loop iterates over each
    # item's "bunnies" list and prints the "cottontail" entry.
    with dsl.ParallelFor(json_string) as item:
        with dsl.ParallelFor(item.bunnies) as item_bunnies:
            print_op(msg=item_bunnies.cottontail)

  with dsl.Condition(
  with dsl.Condition(flip1.output == "heads", name="heads"):
  with dsl.Condition(flip1.output == "tails", name="tails"):
  with dsl.Condition(item.snakes == "boa", name="snakes"):


## Compile the pipeline

Next, compile the pipeline.

In [18]:
# Base directory under which the compiled pipeline YAML is written and later read back.
# NOTE(review): hard-coded, machine-specific absolute path — adjust it for your environment.
REPO_HOME = "/home/jupyter/mlops-demo"

In [19]:
import os

# Make sure the output directory exists before compiling, so that a missing
# "pipelines" folder under REPO_HOME does not make the write fail.
os.makedirs(f"{REPO_HOME}/pipelines", exist_ok=True)

# Compile the pipeline function into a YAML definition on local disk.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=f"{REPO_HOME}/pipelines/control_pipeline.yaml",
)

## Run the pipeline

Next, run the pipeline.

In [22]:
# Display name given to the pipeline job.
DISPLAY_NAME = "control"

# Build the job from the compiled pipeline YAML, using the same pipeline root
# as the pipeline definition.
job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path=f"{REPO_HOME}/pipelines/control_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
)

# Submit the job. The captured output shows state updates followed by a
# "run completed" message, so the call appears to wait for the run to finish.
# NOTE(review): SERVICE_ACCOUNT is resolved earlier in the notebook but is not
# passed here — confirm whether `job.run(service_account=SERVICE_ACCOUNT)` was intended.
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/933318019720/locations/us-central1/pipelineJobs/control-20231006192217
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/933318019720/locations/us-central1/pipelineJobs/control-20231006192217')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/control-20231006192217?project=933318019720
PipelineJob projects/933318019720/locations/us-central1/pipelineJobs/control-20231006192217 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob run completed. Resource name: projects/933318019720/locations/us-central1/pipelineJobs/control-20231006192217
