Reference: https://colab.research.google.com/drive/1x2EGrZ1WdgBVfsihB5ihxnnMfL2mJJZo#scrollTo=5UmtuvhYixkz

In [None]:
# Dependencies pinned to the latest versions as of November 2022
!pip install google-cloud-aiplatform==1.19.0 --upgrade
!pip install google-cloud-pipeline-components==1.0.27 --upgrade
!pip install kfp==1.8.16 --upgrade # for kubeflow pipeline

In [42]:
import kfp

from typing import NamedTuple

from kfp.v2.dsl import pipeline
from kfp.v2.dsl import component
from kfp.v2.dsl import OutputPath
from kfp.v2.dsl import InputPath

from kfp.v2.dsl import Output
from kfp.v2.dsl import Metrics

from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient


from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

from google_cloud_pipeline_components import aiplatform as gcc_aip

from google_cloud_pipeline_components.v1.model import ModelUploadOp

# Authentication

Authenticate this Colab session so it can communicate with Google Cloud.

In [43]:
from google.colab import auth
auth.authenticate_user()

credentials = auth._check_adc()
print(credentials)

True


Set the project ID and the Cloud Storage path where pipeline artifacts are saved.

In [44]:
PROJECT_ID = "vertex-basic" #"sascha-playground-doit"
PIPELINE_ROOT = "gs://doit-vertex-demo/"

# Clients

In [45]:
# Initialize the Vertex AI SDK for this project and region
aiplatform.init(project=PROJECT_ID,
                location='us-central1')

# Pipeline Basic

## Components

We now try to create some very simple components for the pipeline.

**Component**: a specific task (in this case, a Python function).

In [46]:
# First component:
# take two strings and concatenate them
@component
def concat(a: str, b: str) -> str:
  return a + b

In [47]:
# Second component:
# take the output of the first component and return it both as-is and reversed
@component
def reverse(a: str) -> NamedTuple("outputs", [("before", str), ("after", str)]):
  return a, a[::-1]
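
To see what the two components compute before anything runs on Vertex AI, the same logic can be exercised as plain Python. This is a minimal sketch and not part of the original notebook: `concat_local`, `reverse_local`, and `ReverseOutputs` are hypothetical stand-ins without the `@component` decorator, so nothing here touches KFP.

```python
from typing import NamedTuple


class ReverseOutputs(NamedTuple):
    """Hypothetical local mirror of the component's two named outputs."""
    before: str
    after: str


def concat_local(a: str, b: str) -> str:
    # Same body as the `concat` component, minus the decorator.
    return a + b


def reverse_local(a: str) -> ReverseOutputs:
    # Same logic as the `reverse` component: keep the input and its reverse.
    return ReverseOutputs(before=a, after=a[::-1])


combined = concat_local("stres", "sed")
outputs = reverse_local(combined)
print(outputs.before, outputs.after)  # stressed desserts
```

With the pipeline defaults used later (`'stres'` and `'sed'`), the `before` output is `stressed` and the `after` output is `desserts`.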

Once the components are defined, we connect them in a pipeline.

## Pipeline

**Pipeline**: a workflow that wires the components together and runs them as a single job. Here, the output of `concat` becomes the input of `reverse`.

In [48]:
# pipeline declaration 
@pipeline(name="basic-pipeline",
          pipeline_root=PIPELINE_ROOT + "basic-pipeline")
def basic_pipeline(a: str='stres', b: str='sed'):
    concat_task = concat(a, b)  # task for the first component
    reverse_task = reverse(concat_task.output)  # pass its output to the second component
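
Note that `PIPELINE_ROOT + "basic-pipeline"` only produces a well-formed path because `PIPELINE_ROOT` ends with a slash. A minimal sketch of a more forgiving join, assuming a hypothetical helper `join_gcs_path` (it is not part of KFP or the Vertex AI SDK):

```python
def join_gcs_path(root: str, *parts: str) -> str:
    """Join a gs:// root and sub-paths with exactly one slash between them."""
    # Drop trailing slashes from the root and surrounding slashes from each part,
    # skipping parts that are empty after stripping.
    cleaned = [root.rstrip("/")] + [p.strip("/") for p in parts if p.strip("/")]
    return "/".join(cleaned)


print(join_gcs_path("gs://doit-vertex-demo/", "basic-pipeline"))
print(join_gcs_path("gs://doit-vertex-demo", "basic-pipeline"))
```

Both calls print `gs://doit-vertex-demo/basic-pipeline`, whether or not the root has a trailing slash.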

## Compile

The compiler takes our pipeline function and compiles it into a pipeline specification, saved as a JSON file. We can use this JSON file to create our pipeline in Vertex AI Pipelines.

In [49]:
compiler.Compiler().compile(
    pipeline_func=basic_pipeline,
    package_path="basic_pipeline.json",
)

Compiling the pipeline produces one output: a JSON file.

Note: in the file browser on the left, you can see that a new JSON file has been created (basic_pipeline.json).

The JSON file is essentially a packaged ("zipped") description of our components and how they are connected.
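
To get a feel for what the compiler wrote, the file can be opened with the standard `json` module. This is a minimal sketch: `top_level_keys` is a hypothetical helper, and no assumption is made here about which keys the file contains.

```python
import json
from pathlib import Path


def top_level_keys(path: str) -> list:
    """Return the sorted top-level keys of a JSON file holding an object."""
    with open(path, "r", encoding="utf-8") as f:
        spec = json.load(f)
    return sorted(spec)


spec_path = Path("basic_pipeline.json")
if spec_path.exists():
    print(top_level_keys(str(spec_path)))
else:
    # The file only exists after the compile cell has been run.
    print(f"{spec_path} not found; run the compile cell first")
```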

## Run

Create the run job using the API.
You can also upload the pipeline JSON file directly in the Vertex AI UI.

Use the JSON file to run the pipeline on Vertex AI.

In [50]:
job = pipeline_jobs.PipelineJob(
    display_name="basic-pipeline",  # name shown for the pipeline run
    template_path="basic_pipeline.json",  # compiled JSON file from the previous step
    parameter_values={"a": "stres", "b": "sed"}  # values for the pipeline's input parameters
)

In [51]:
job.run(sync=False)
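
The keys in `parameter_values` need to match the parameter names of the pipeline function (`a` and `b` here). A minimal sketch of a local check that catches a misspelled key before a job is submitted, assuming a hypothetical helper `check_parameter_values` and an undecorated stand-in function with the same signature as `basic_pipeline`:

```python
import inspect


def check_parameter_values(func, parameter_values: dict) -> None:
    """Raise ValueError if parameter_values names an argument func does not accept."""
    accepted = set(inspect.signature(func).parameters)
    unknown = set(parameter_values) - accepted
    if unknown:
        raise ValueError(f"Unknown pipeline parameters: {sorted(unknown)}")


# Stand-in with the same signature as `basic_pipeline`, without the decorator.
def basic_pipeline_signature(a: str = "stres", b: str = "sed"):
    pass


# Passes silently because both keys are real parameters.
check_parameter_values(basic_pipeline_signature, {"a": "stres", "b": "sed"})
```

This check only compares names; it does not validate the types or values that are passed.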