In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: Pipelines introduction for KFP


## Overview

This notebook provides an introduction to using [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines) with [the Kubeflow Pipelines (KFP) SDK](https://www.kubeflow.org/docs/components/pipelines/).

Learn more about [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction).

### Objective

In this tutorial, you learn how to use the KFP SDK to build and run a simple pipeline made of Python function-based components.

This tutorial uses the following Google Cloud ML services:

- `Vertex AI Pipelines`

The steps performed include:

- Define and compile a `Vertex AI` pipeline.
- Specify which service account to use for a pipeline run.

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Learn about [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing),
[Cloud Storage pricing](https://cloud.google.com/storage/pricing),
and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Install additional packages

Install the following packages required to execute this notebook.

In [14]:
!pip install --upgrade --no-warn-conflicts --user -q \
    google-cloud-pipeline-components \
    kfp \
    tensorflow

After you install the additional packages, you need to restart the notebook kernel so it can find the packages.

In [15]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):

    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

## Before you begin

#### Set your project ID

In [1]:
GCP_PROJECTS = !gcloud config get-value project
PROJECT_ID = GCP_PROJECTS[0]
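The `!gcloud` syntax above only works inside IPython. As a rough sketch of the same lookup in plain Python, the hypothetical helper `resolve_project_id` below first checks an environment variable (assumed here to be `GOOGLE_CLOUD_PROJECT`) and only then falls back to the `gcloud` CLI. Neither the helper name nor the fallback order comes from the original notebook.

```python
import os
import shutil
import subprocess
from typing import Optional


def resolve_project_id(env_var: str = "GOOGLE_CLOUD_PROJECT") -> Optional[str]:
    """Return the project ID from the environment, else from gcloud, else None."""
    # Hypothetical helper for illustration; not part of any Google SDK.
    project = os.environ.get(env_var)
    if project:
        return project
    if shutil.which("gcloud") is None:
        return None  # the gcloud CLI is not installed
    result = subprocess.run(
        ["gcloud", "config", "get-value", "project"],
        capture_output=True,
        text=True,
        check=False,
    )
    value = result.stdout.strip()
    return value or None
```

Returning `None` instead of raising leaves it to the caller to decide whether a missing project ID is fatal.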

#### Region

You can also change the `REGION` variable used by Vertex AI. Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [2]:
REGION = "us-central1"  # @param {type: "string"}

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [3]:
BUCKET_URI = f"gs://{PROJECT_ID}-mlops"  # @param {type:"string"}

Run the following cell to create your Cloud Storage bucket.

In [4]:
! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

Creating gs://qinetiq-workshop23lon-5240-mlops/...
ServiceException: 409 A Cloud Storage bucket named 'qinetiq-workshop23lon-5240-mlops' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.
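The 409 response above means the bucket name is already in use. If the existing bucket is yours, you can simply reuse it. If the name is taken by someone else, one common workaround is to append a short random suffix. The following is a minimal sketch under that assumption; `unique_bucket_uri` is a hypothetical helper, not part of the notebook or of any Google library.

```python
import uuid


def unique_bucket_uri(project_id: str, label: str = "mlops") -> str:
    """Build a gs:// URI whose bucket name ends in a random 8-character suffix."""
    # Hypothetical helper for illustration only.
    suffix = uuid.uuid4().hex[:8]
    name = f"{project_id}-{label}-{suffix}".lower()
    # Bucket names without dots must be between 3 and 63 characters long.
    if not 3 <= len(name) <= 63:
        raise ValueError(f"bucket name has invalid length: {name!r}")
    return f"gs://{name}"
```

You could then assign `BUCKET_URI = unique_bucket_uri(PROJECT_ID)` before running the `gsutil mb` cell.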


### Set up variables

Next, set up some variables used throughout the tutorial.

### Import libraries and define constants

In [5]:
from typing import NamedTuple

import google.cloud.aiplatform as aip
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component, Input, Output, Artifact

#### Vertex AI Pipelines constants

Set up the following constants for Vertex AI Pipelines:

In [6]:
PIPELINE_ROOT = "{}/pipeline_root/intro".format(BUCKET_URI)

## Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [7]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

### Define Python function-based pipeline components

In this tutorial, you define a simple pipeline that has three steps, where each step is defined as a component.

#### Define hello_world component

First, define a component based on a very simple Python function. It takes a string input parameter and returns that value as output.

Note the use of the `@component` decorator, which compiles the function to a KFP component when evaluated. For illustration, this example specifies a base image for the component (`python:3.9`) and a component YAML file, `hw.yaml`, to which the compiled component specification is written. (The default base image is `python:3.7`, which would work just as well.)

In [8]:
@component(output_component_file="hw.yaml", base_image="python:3.9")
def hello_world(text: str) -> str:
    print(text)
    return text

As you'll see below, compiling this component creates a [task factory function](https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/), called `hello_world`, that you can use to define a pipeline step.

While not shown here, if you want to share this component definition or use it in another context, you can also load it from its YAML file, after `from kfp import components`, like this:
`hello_world_op = components.load_component_from_file('./hw.yaml')`.
You can also use the `load_component_from_url` method if your component YAML file is stored online. (For GitHub URLs, load the 'raw' file.)

#### Define three_outputs component

The second component, `three_outputs`, demonstrates installing a package, in this case the `google-cloud-storage` package. Alternatively, you can specify a base image that includes the necessary installations.

This component also shows how to declare different kinds of inputs and outputs in a component.

##### Input/Output parameters
Input and output parameters are declared with a `str`, `int`, `float`, `bool`, `dict`, or `list` type annotation. Data passed to parameters typed as `dict` or `list` may contain only JSON-serializable Python primitives. Union types are not permitted. When a component produces multiple outputs, use a `NamedTuple`. In the cell below, `text` is an input parameter.
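The two rules above (a `NamedTuple` for multiple outputs, JSON-serializable values for `dict` and `list` parameters) can be tried out in plain Python without KFP. This is only a sketch: `Outputs`, `make_outputs`, and `is_json_serializable` are hypothetical names, and the serializability check is a rough approximation rather than the validation KFP itself performs.

```python
import json
from typing import NamedTuple


class Outputs(NamedTuple):
    output_one: str
    output_two: str


def make_outputs(text: str) -> Outputs:
    """Return two named string outputs derived from one input parameter."""
    return Outputs(
        output_one=f"output one from text: {text}",
        output_two=f"output two from text: {text}",
    )


def is_json_serializable(value: object) -> bool:
    """Rough check that a dict or list value holds only JSON-friendly primitives."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True
```

For example, `is_json_serializable({"a": [1, 2]})` passes, while a dict containing a Python `set` does not, which is the kind of value a `dict` parameter should avoid.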


##### Input/Output artifacts
Artifacts are declared with an `Input[<ArtifactClass>]` or `Output[<ArtifactClass>]` annotation. In the following cell, `file_output` is an output artifact.

At component runtime, input artifacts are copied to the local filesystem by the executing backend. This removes the need for the component author to know where artifacts are stored in remote storage, so a component that uses an artifact only has to interact with the local filesystem. All artifacts expose a `.path` attribute, which gives the local path where the artifact file has been copied, and a `.uri` attribute, which gives the artifact's Cloud Storage URI.
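One way to picture the relationship between an artifact's URI and its local path is the sketch below. It assumes that Cloud Storage objects appear under a `/gcs/` directory inside the component container. That convention is an assumption made for illustration, and `local_path_for_uri` is a hypothetical helper, not a KFP function.

```python
def local_path_for_uri(uri: str) -> str:
    """Map a gs:// URI to a path under /gcs/, leaving other URIs unchanged."""
    # Hypothetical helper; it mirrors an assumed convention and is not
    # the KFP implementation itself.
    prefix = "gs://"
    if uri.startswith(prefix):
        return "/gcs/" + uri[len(prefix):]
    return uri
```

Inside a real component you would not call a helper like this. You would read or write `file_output.path` directly, as the `three_outputs` component does.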
    
There are also other specialised Artifact types accepted as part of a pipeline, such as Model, Dataset, Metrics, HTML.
    
The `three_outputs` component returns:
- two named outputs, which are returned as the return value of the Python function.
- one `Artifact` output: a file produced by the component that is tracked as part of the pipeline lineage.

In [9]:
@component(packages_to_install=["google-cloud-storage"])
def three_outputs(
    text: str,
    file_output: Output[Artifact]
) -> NamedTuple(
    "Outputs",
    [
        ("output_one", str),  # Return parameters
        ("output_two", str),
    ],
):
    # the import is not actually used for this simple example, but the import
    # is successful, as it was included in the `packages_to_install` list.
    from google.cloud import storage
    
    o1 = f"output one from text: {text}"
    o2 = f"output two from text: {text}"
    print("output one: {}; output_two: {}".format(o1, o2))
    
    with open(file_output.path, 'w') as f: 
        f.write("third output")
    
    return (o1, o2)

#### Define the consumer component

The third component, `consumer`, takes three string inputs plus the artifact produced by `three_outputs` (this time declared as `Input[Artifact]`) and prints them out.

In [10]:
@component
def consumer(text1: str, text2: str, text3: str, file: Input[Artifact]):
    with open(file.path) as f:
        print(f.read())
    print(f"text1: {text1}; text2: {text2}; text3: {text3}")

### Define a pipeline that uses the components

Next, define a pipeline that uses these three components.

By evaluating the component definitions above, you've created task factory functions that are used in the pipeline definition to create the pipeline steps.

The pipeline takes an input parameter, and passes that parameter as an argument to the first two pipeline steps (`hw_task` and `three_outputs_task`).

Then, the third pipeline step (`consumer_task`) consumes the outputs of the first and second steps.  Because the `hello_world` component definition just returns one unnamed output, you refer to it as `hw_task.output`.  The `three_outputs` task returns two named outputs and the Artifact Output, which you access as `three_outputs_task.outputs["<output_name>"]`.

*Note:* The `@dsl.pipeline` decorator sets the `PIPELINE_ROOT` Cloud Storage path to use. If you omit it here, you must specify it when creating the pipeline run, as you'll see below.
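Because `consumer_task` consumes outputs of the other two tasks, it can only run after both have finished, while the first two steps do not depend on each other. The sketch below models that dependency graph in plain Python with the standard-library `graphlib` module (Python 3.9+). It illustrates the ordering idea only and is not how KFP or Vertex AI schedules tasks internally.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks whose outputs it consumes.
dependencies = {
    "hello_world": set(),
    "three_outputs": set(),
    "consumer": {"hello_world", "three_outputs"},
}

# Any valid order places "consumer" after both of its upstream tasks.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

The relative order of `hello_world` and `three_outputs` is not fixed, which reflects that independent steps can run in parallel.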

In [11]:
@dsl.pipeline(
    name="intro-pipeline-unique",
    description="A simple intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(text: str = "hi there"):
    hw_task = hello_world(text)
    three_outputs_task = three_outputs(text)
    consumer_task = consumer(
        file=three_outputs_task.outputs["file_output"],
        text1=hw_task.output,
        text2=three_outputs_task.outputs["output_one"],
        text3=three_outputs_task.outputs["output_two"],
    )

## Compile the pipeline

Next, compile the pipeline.

In [12]:
from kfp.v2 import compiler  # noqa: F811

compiler.Compiler().compile(pipeline_func=pipeline, package_path="intro_pipeline.json")
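The compiled file is plain JSON, so you can inspect it before submitting a run. The sketch below lists the task names in a pipeline spec. It assumes the tasks live under `root.dag.tasks`, optionally nested under a top-level `pipelineSpec` key depending on the SDK version. `list_pipeline_tasks` is a hypothetical helper, and `example_spec` is a hand-written stand-in for the real file, which contains many more fields.

```python
import json
from typing import Any, Dict, List


def list_pipeline_tasks(spec: Dict[str, Any]) -> List[str]:
    """Return the sorted task names found under root.dag.tasks."""
    # Assumption: some SDK versions nest the spec under a "pipelineSpec" key.
    pipeline_spec = spec.get("pipelineSpec", spec)
    tasks = pipeline_spec.get("root", {}).get("dag", {}).get("tasks", {})
    return sorted(tasks)


# Stand-in for the contents of intro_pipeline.json (illustrative only).
example_spec = json.loads(
    '{"pipelineSpec": {"root": {"dag": {"tasks": '
    '{"hello-world": {}, "consumer": {}}}}}}'
)
print(list_pipeline_tasks(example_spec))
```

To try it on the real output, load the file with `json.load` and pass the resulting dictionary to the helper.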



## Run the pipeline

Next, run the pipeline.

In [13]:
DISPLAY_NAME = "intro_pipeline_job_unique"

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="intro_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/450304494793/locations/us-central1/pipelineJobs/intro-pipeline-unique-20230122175407
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/450304494793/locations/us-central1/pipelineJobs/intro-pipeline-unique-20230122175407')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/intro-pipeline-unique-20230122175407?project=450304494793
PipelineJob projects/450304494793/locations/us-central1/pipelineJobs/intro-pipeline-unique-20230122175407 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/450304494793/locations/us-central1/pipelineJobs/intro-pipeline-unique-20230122175407 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/450304494793/locations/us-central1/pipelineJobs/intro-pipeline-unique-20230122175407 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/450304494793/l

Click on the generated link to see your run in the Cloud Console.


In the UI, many of the pipeline DAG nodes will expand or collapse when you click on them. Here is a partially-expanded view of the DAG (click image to see larger version).

<a href="https://storage.googleapis.com/amy-jo/images/mp/intro_pipeline.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/intro_pipeline.png" width="60%"/></a>
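The run above uses the pipeline's default `text` value and the project's default service account. As a hedged sketch of how you might override both, the code below passes `parameter_values` to `PipelineJob` and `service_account` to `run()`. The helper names, the placeholder bucket path, and the parameter text are illustrative assumptions. The SDK import is deferred so the block can be loaded without the SDK installed.

```python
from typing import Any, Dict


def build_job_kwargs(
    display_name: str, template_path: str, pipeline_root: str, text: str
) -> Dict[str, Any]:
    """Collect PipelineJob constructor arguments, overriding the `text` parameter."""
    return {
        "display_name": display_name,
        "template_path": template_path,
        "pipeline_root": pipeline_root,
        "parameter_values": {"text": text},
    }


def run_with_service_account(job_kwargs: Dict[str, Any], service_account: str) -> None:
    """Submit the job so that it runs as the given service account."""
    # Imported lazily so this sketch loads even where the SDK is missing.
    import google.cloud.aiplatform as aip

    job = aip.PipelineJob(**job_kwargs)
    job.run(service_account=service_account)


job_kwargs = build_job_kwargs(
    display_name="intro_pipeline_job_unique",
    template_path="intro_pipeline.json",
    pipeline_root="gs://your-bucket/pipeline_root/intro",  # placeholder path
    text="hello from a parameter override",
)
```

Calling `run_with_service_account(job_kwargs, "<your service account email>")` would then submit the run, provided that account has the permissions Vertex AI Pipelines needs.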