## Installation

Install the packages required for executing this notebook.

In [None]:
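import os

# USER_FLAG is not defined elsewhere in this notebook. An empty string installs
# into the active environment; set it to "--user" if your environment needs it.
USER_FLAG = ""

! pip install --upgrade google-cloud-aiplatform {USER_FLAG} -q
! pip install -U google-cloud-storage {USER_FLAG} -q
! pip install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade -q
! pip install kfp==1.8.14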

### Restart the kernel

Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [None]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Check the versions of the packages you installed. The KFP SDK version should be >=1.6; the install cell above pins it to 1.8.14.

In [None]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
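
If you prefer to check the minimum version programmatically rather than by eye, a minimal sketch follows. The helpers `parse_version` and `version_at_least` are hypothetical names introduced here, not part of KFP, and the parsing only handles simple dotted versions.

```python
import re


def parse_version(version: str) -> tuple:
    """Turn a dotted version such as '1.8.14' into a tuple of ints: (1, 8, 14)."""
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break  # stop at the first non-numeric piece, e.g. 'dev0'
        parts.append(int(match.group()))
    return tuple(parts)


def version_at_least(installed: str, minimum: str) -> bool:
    """Return True when `installed` is at or above `minimum`."""
    return parse_version(installed) >= parse_version(minimum)


print(version_at_least("1.8.14", "1.6"))  # True
print(version_at_least("1.5.0", "1.6"))   # False
```

Comparing tuples of integers avoids the trap of comparing version strings directly, where `"1.10"` would sort before `"1.6"`. In the notebook you could pass `kfp.__version__` as the first argument.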

In [17]:
PROJECT_ID = "vertex-nirvana-poc"  # @param {type:"string"}

#### Region

You can also change the `REGION` variable, which is used for operations throughout the rest of this notebook. The regions below are supported for Vertex AI. We recommend that you choose the region closest to you.

- Americas: `us-central1`
- Europe: `europe-west4`
- Asia Pacific: `asia-east1`

You may not use a multi-regional bucket for training with Vertex AI. Not all regions provide support for all Vertex AI services.

Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [18]:
REGION = "us-central1"  # @param {type:"string"}

In [19]:
if REGION == "[your-region]":
    REGION = "us-central1"

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users, create a timestamp for each session and append it to the names of the resources you create in this tutorial.

In [20]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
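
As an illustration of how the timestamp keeps resource names apart, here is a small sketch. The helpers `make_timestamp` and `unique_name` are hypothetical names used only for this example, and a fixed date is used so the result is reproducible.

```python
from datetime import datetime


def make_timestamp(now: datetime) -> str:
    """Format a datetime the same way the notebook's TIMESTAMP is formatted."""
    return now.strftime("%Y%m%d%H%M%S")


def unique_name(prefix: str, timestamp: str) -> str:
    """Append the session timestamp to a resource name prefix."""
    return f"{prefix}_{timestamp}"


# A fixed moment, chosen only to make the example deterministic
fixed = datetime(2023, 10, 30, 11, 30, 26)
print(unique_name("intro", make_timestamp(fixed)))  # intro_20231030113026
```

The timestamp has one-second resolution, so two sessions started within the same second would still produce the same name.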

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

When you initialize the Vertex AI SDK for Python, you specify a Cloud Storage staging bucket. The staging bucket is where all the data associated with your dataset and model resources are retained across sessions.

Set the name of your Cloud Storage bucket below. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.

In [21]:
BUCKET_NAME = "child-pipelines"  # Replace with your bucket name
BUCKET_URI = f"gs://{BUCKET_NAME}"

In [22]:
if BUCKET_NAME == "" or BUCKET_NAME is None or BUCKET_NAME == "[your-bucket-name]":
    BUCKET_NAME = PROJECT_ID + "-aip-" + TIMESTAMP
    BUCKET_URI = "gs://" + BUCKET_NAME
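
Before creating a bucket, it can help to sanity-check the name locally. The sketch below covers only a subset of the Cloud Storage naming rules (3 to 63 characters; lowercase letters, digits, dashes, underscores, and dots; starting and ending with a letter or digit). The helper `looks_like_valid_bucket_name` is a hypothetical name, and passing this check does not guarantee that the name is available or accepted.

```python
import re

# First and last characters must be a lowercase letter or digit;
# the 1 to 61 characters in between may also be '.', '_' or '-'.
_BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]")


def looks_like_valid_bucket_name(name: str) -> bool:
    """Check a bucket name against a subset of the Cloud Storage naming rules."""
    return _BUCKET_RE.fullmatch(name) is not None


print(looks_like_valid_bucket_name("child-pipelines"))  # True
print(looks_like_valid_bucket_name("My_Bucket"))        # False (uppercase letters)
print(looks_like_valid_bucket_name("ab"))               # False (too short)
```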

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

Finally, validate access to your Cloud Storage bucket by examining its contents:

#### Service Account

**If you don't know your service account**, try to get it with the `gcloud` command by executing the second cell below.

#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step -- you only need to run these once per service account.

### Set up variables

Next, set up some variables used throughout the tutorial.

### Import libraries and define constants

In [23]:
from typing import NamedTuple

import google.cloud.aiplatform as aip
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component

#### Vertex AI constants

Set up the following constants for Vertex AI:

- `API_ENDPOINT`: The Vertex AI API service endpoint for `Dataset`, `Model`, `Job`, `Pipeline` and `Endpoint` services.

In [24]:
# API service endpoint
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)

#### Vertex AI Pipelines constants

Set up the following constant for Vertex AI Pipelines, the Cloud Storage path where pipeline artifacts are written:

In [25]:
PIPELINE_ROOT = "{}/pipeline_root/intro".format(BUCKET_URI)

## Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [None]:
# Initialize the Vertex AI SDK for your project and bucket
aip.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

In [26]:
@component(output_component_file="C1.yaml", base_image="python:3.9")
def Input_Number(number: str) -> str:
    return number

As you'll see below, evaluating this component definition creates a [task factory function](https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/), called `Input_Number`, that you can use in defining a pipeline step. Because `output_component_file` is set, the component specification is also written to `C1.yaml`.

While not shown here, if you want to share this component definition, or use it in another context, you could also load it from its YAML file like this:
`input_number_op = components.load_component_from_file('./C1.yaml')` (after `from kfp import components`).
You can also use the `load_component_from_url` method if your component YAML file is stored online. (For GitHub URLs, load the 'raw' file.)

#### Define Add_Two_Numbers component

The next component, `Add_Two_Numbers`, demonstrates installing a package through `packages_to_install`, in this case the `google-cloud-storage` package. Alternatively, you can specify a base image that includes the necessary installations.

*Note:* The component function won't actually use the package.

The `Add_Two_Numbers` component returns two named outputs, `output_one` and `output_two`.

In [27]:
@component(packages_to_install=["google-cloud-storage"])
def Add_Two_Numbers(
    number: str,
) -> NamedTuple(
    "Outputs",
    [
        ("output_one", str),  # Return parameters
        ("output_two", str),
    ],
):
    # the import is not actually used for this simple example, but the import
    # is successful, as it was included in the `packages_to_install` list.
    from google.cloud import storage  

    o1 = f"output one from number + 7: {int(number)+7}"
    o2 = f"output two from number + 5: {int(number)+5}"
    print("output one: {}; output_two: {}".format(o1, o2))
    return (o1, o2)
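
To see what "two named outputs" means without running a pipeline, here is a plain-Python sketch of the same function body. It involves no KFP at all; `add_two_numbers` and the `Outputs` class are stand-ins written for this illustration, and they only mimic how the outputs are named.

```python
from typing import NamedTuple


class Outputs(NamedTuple):
    output_one: str
    output_two: str


def add_two_numbers(number: str) -> Outputs:
    """Plain-Python stand-in for the component body (no KFP involved)."""
    o1 = f"output one from number + 7: {int(number) + 7}"
    o2 = f"output two from number + 5: {int(number) + 5}"
    return Outputs(o1, o2)


result = add_two_numbers("999")
print(result.output_one)  # output one from number + 7: 1006
print(result.output_two)  # output two from number + 5: 1004

# Looking a value up by name loosely mirrors task.outputs["output_one"] in a pipeline
print(result._asdict()["output_one"])
```

The field names declared in the `NamedTuple` are the names you later use to select an output of the task.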

In [28]:
@component
def sub_pipeline(number1: str, number2: str, number3: str):
    print(f"number1: {number1}; number2: {number2}; number3: {number3}")

In [29]:
@component
def trigger_childflow(message: str) -> str:
    print(f"The next flow message is {message}")
    return message

### Define a pipeline that uses the components

Next, define two pipeline functions that use these four components.

By evaluating the component definitions above, you've created task factory functions that are used in the pipeline definitions to create the pipeline steps.

`pipeline1` takes an input parameter, `number`, and passes it as an argument to the first pipeline step (`hw_task`). The second step (`two_outputs_task`) consumes the output of the first. Because the `Input_Number` component definition just returns one unnamed output, you refer to it as `hw_task.output`. The `Add_Two_Numbers` task returns two named outputs, which you access as `two_outputs_task.outputs["<output_name>"]`.

`pipeline1` then calls `pipeline2` with those two named outputs. Inside `pipeline2`, the `trigger_childflow` step (`next_flow`) returns a message, and the final step (`consumer_task`) consumes that message together with the two values passed in from `pipeline1`.

*Note:* In the `@dsl.pipeline` decorator, you're defining the `PIPELINE_ROOT` Cloud Storage path to use.  If you had not included that info here, it would be required to specify it when creating the pipeline run, as you'll see below.

In [30]:
# Define the first pipeline
@dsl.pipeline(
    name="pipeline1",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline1(number: str = "999"):
    hw_task = Input_Number(number)
    two_outputs_task = Add_Two_Numbers(hw_task.output)
    # Call the second pipeline
    pipeline2_task = pipeline2(two_outputs_task.outputs["output_one"], two_outputs_task.outputs["output_two"])


# Define the second pipeline
@dsl.pipeline(
    name="pipeline2",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline2(number1_output: str, number2_output: str):
    # Emit a hand-off message, then pass it and both inputs to the final step
    next_flow = trigger_childflow("The addition of two numbers are done on pipeline 1")
    consumer_task = sub_pipeline(
        number1=next_flow.output,
        number2=number1_output,
        number3=number2_output,
    )

## Compile the pipeline

Next, compile the pipeline.

In [31]:
# Compile the pipeline
from kfp.v2 import compiler  

compiler.Compiler().compile(pipeline_func=pipeline1, package_path="two-pipelines-v2.json")
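
One way to confirm which steps ended up in the compiled file is to list the task names it contains. The sketch below assumes the compiled JSON keeps its tasks under `root.dag.tasks`, optionally wrapped in a top-level `pipelineSpec` key; that layout is an assumption about the compiler output, so inspect your own file if the result is empty. The helper names and the trimmed `sample` dictionary are hypothetical.

```python
import json


def list_task_names(job_spec: dict) -> list:
    """Return the sorted task names found in a compiled pipeline dictionary."""
    # Assumed layout: the spec may be wrapped in a "pipelineSpec" key.
    spec = job_spec.get("pipelineSpec", job_spec)
    tasks = spec.get("root", {}).get("dag", {}).get("tasks", {})
    return sorted(tasks)


def list_task_names_from_file(path: str) -> list:
    """Load a compiled pipeline JSON file and list its task names."""
    with open(path) as f:
        return list_task_names(json.load(f))


# Hypothetical, heavily trimmed sample; a real compiled file has many more fields
sample = {
    "pipelineSpec": {
        "pipelineInfo": {"name": "pipeline1"},
        "root": {
            "dag": {
                "tasks": {
                    "input-number": {},
                    "add-two-numbers": {},
                    "trigger-childflow": {},
                    "sub-pipeline": {},
                }
            }
        },
    }
}
print(list_task_names(sample))
```

In the notebook you could call `list_task_names_from_file("two-pipelines-v2.json")` after the compile step.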

## Run the pipeline

Next, run the pipeline.

In [None]:
# Create and run the pipeline job
DISPLAY_NAME = "intro_" + TIMESTAMP

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="two-pipelines-v2.json",
    pipeline_root=PIPELINE_ROOT,
)


job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/665554893092/locations/us-central1/pipelineJobs/pipeline1-20231030113026
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/665554893092/locations/us-central1/pipelineJobs/pipeline1-20231030113026')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipeline1-20231030113026?project=665554893092
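
The resource name printed above follows the pattern `projects/<project>/locations/<location>/pipelineJobs/<job id>`. If you need the individual parts, for example to log the job ID, a small parsing sketch follows. The helper `parse_pipeline_job_name` is a hypothetical name and is not part of the Vertex AI SDK.

```python
import re

_RESOURCE_RE = re.compile(
    r"projects/(?P<project>[^/]+)"
    r"/locations/(?P<location>[^/]+)"
    r"/pipelineJobs/(?P<job_id>[^/]+)"
)


def parse_pipeline_job_name(resource_name: str) -> dict:
    """Split a PipelineJob resource name into project, location, and job ID."""
    match = _RESOURCE_RE.fullmatch(resource_name)
    if match is None:
        raise ValueError(f"Not a PipelineJob resource name: {resource_name!r}")
    return match.groupdict()


name = (
    "projects/665554893092/locations/us-central1"
    "/pipelineJobs/pipeline1-20231030113026"
)
parts = parse_pipeline_job_name(name)
print(parts["location"])  # us-central1
print(parts["job_id"])    # pipeline1-20231030113026
```

With the import alias used in this notebook, the retrieval call shown in the log would be written as `aip.PipelineJob.get(...)`.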


### Delete the pipeline job

You can delete the pipeline job with the method `delete()`.

In [None]:
job.delete()