# Scheduling Vertex Pipelines with Cloud Scheduler

## Setup

Pip install Kubeflow Pipelines SDK ([kfp](https://pypi.org/project/kfp/#history)) version v2.0.0b1 or higher ([required by Artifact Registry](https://cloud.google.com/vertex-ai/docs/pipelines/create-pipeline-template)) along with the Vertex AI SDK (aiplatform) and other required packages:

In [4]:
%pip install kfp==2.1.2 google-cloud-aiplatform==1.28.1

Collecting kfp==2.1.2
  Downloading kfp-2.1.2.tar.gz (280 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m280.4/280.4 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting google-cloud-aiplatform==1.28.1
  Obtaining dependency information for google-cloud-aiplatform==1.28.1 from https://files.pythonhosted.org/packages/8f/3f/979e65dc13c11b4c53241d105d41d4b937b131e56b68397cc279f372be96/google_cloud_aiplatform-1.28.1-py2.py3-none-any.whl.metadata
  Using cached google_cloud_aiplatform-1.28.1-py2.py3-none-any.whl.metadata (24 kB)
Collecting click<9,>=8.0.0 (from kfp==2.1.2)
  Obtaining dependency information for click<9,>=8.0.0 from https://files.pythonhosted.org/packages/1a/70/e63223f8116931d365993d4a6b7ef653a4d920b41d03de7c59499962821f/click-8.1.6-py3-none-any.whl.metadata
  Downloading click-8.1.6-py3-none-any.whl.metadata (3.0 kB)
Collecting docstring-parser<1,>=0.7.3 (from kfp==2.1.2)
  Downloadi

Restart kernel:

In [5]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Verify your Kubeflow Pipelines SDK ([kfp](https://pypi.org/project/kfp/#history)) is version v2.0.0b1 or higher ([required by Artifact Registry](https://cloud.google.com/vertex-ai/docs/pipelines/create-pipeline-template)):

In [1]:
import kfp
kfp.__version__

'2.1.2'

Either manually set your PROJECT_ID or use the below `gcloud` command to retrieve it:

In [2]:
shell_output = ! gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID=shell_output[0]

PROJECT_ID

'axel-argolis-1'

Set the region or leave it as `us-central1`.

Note: If you change the region, make sure your network is configured to run in that region.

In [3]:
REGION="us-central1"

Set your GCS bucket name:

In [4]:
GCS_BUCKET_NAME="axel-argolis-usc1-bucket/vkfp"
GCS_BUCKET_URI=f"gs://{GCS_BUCKET_NAME}"

**Only if your bucket doesn't already exist:** Run the following cell to create your Cloud Storage bucket

In [None]:
! gsutil mb -l $REGION $GCS_BUCKET_URI

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [6]:
! gsutil ls -al $BUCKET_URI

gs://test-training-pipeline-mlops/
gs://wmt-mlp-p-demoorg-vvl19-export-bucket/


## Define and Compile Pipeline

Imports:

In [5]:
from kfp import compiler
from kfp.dsl import pipeline, component, Artifact, Dataset, Model, Input, Output, OutputPath, InputPath
from typing import NamedTuple

Set pipeline inputs or use suggested values set below:

In [6]:
# PIPELINE_NETWORK=
PIPELINE_NAME="hello-world-pipeline"
PIPELINE_ROOT=f"{GCS_BUCKET_URI}/pipeline-root/{PIPELINE_NAME}"
PIPELINE_YAML="hello_world_pipeline.yaml"
PIPELINE_PARAMS={"text": "Hello World!"}

Hello world component:

In [7]:
@component
def hello_world(text: str):
    print(text)

Pipeline definition:

In [8]:
@pipeline(
    name=PIPELINE_NAME,
    description="Hello world example pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(text: str = "Hello world!"):
    hello_world(text=text)

Compile pipeline into YAML file:

In [9]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path=PIPELINE_YAML
)

Take a look at the contents of the pipeline definition YAML:

In [10]:
! cat $PIPELINE_YAML

# PIPELINE DEFINITION
# Name: hello-world-pipeline
# Description: Hello world example pipeline
# Inputs:
#    text: str [Default: 'Hello world!']
components:
  comp-hello-world:
    executorLabel: exec-hello-world
    inputDefinitions:
      parameters:
        text:
          parameterType: STRING
defaultPipelineRoot: gs://axel-argolis-usc1-bucket/vkfp/pipeline-root/hello-world-pipeline
deploymentSpec:
  executors:
    exec-hello-world:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - hello_world
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet     --no-warn-script-location 'kfp-dsl==2.1.2'\
          \ && \"$0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)

   

## Create kfp Artifact Registry and Upload Pipeline Template

Set the name for your kfp Artifact Registry or use sugggested value below:

In [11]:
KFP_REG_NAME="kfp-registry"

Create a kfp Artifact Registry if you don't already have one:

In [14]:
! gcloud artifacts repositories create $KFP_REG_NAME \
    --location=$REGION \
    --repository-format=KFP

Create request issued for: [kfp-registry]
Waiting for operation [projects/wmt-mlp-p-demoorg-vvl19/locations/us-central1/o
perations/ace2d8f0-8637-424f-9edb-a069a3b5346f] to complete...done.            
Created repository [kfp-registry].


Connect to registry via client:

In [12]:
from kfp.registry import RegistryClient

client = RegistryClient(host=f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{KFP_REG_NAME}")

Set pipeline template tags (like version) and generate template URL path from other inputs:

In [13]:
TEMPLATE_TAGS=["2.1.2", "latest"]
TEMPLATE_PATH=f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{KFP_REG_NAME}/{PIPELINE_NAME}/{TEMPLATE_TAGS[0]}"

Upload pipeline template to the registry with extra headers like a description:

In [14]:
templateName, versionName = client.upload_pipeline(
    file_name=PIPELINE_YAML,
    tags=TEMPLATE_TAGS,
    extra_headers={"description":"This is an example hello world pipeline template."})

## Run the Vertex Pipeline via SDK and CURL

Import the Vertex SDK (aiplatform) and run the Vertex pipeline with kfp Artifact Registry template path:

In [15]:
import google.cloud.aiplatform as vertex


job = vertex.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=TEMPLATE_PATH,
    project=PROJECT_ID,
    location=REGION,
    parameter_values=PIPELINE_PARAMS)

# job.submit(network=PIPELINE_NETWORK)
job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/1023019892523/locations/us-central1/pipelineJobs/hello-world-pipeline-20230801044241
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/1023019892523/locations/us-central1/pipelineJobs/hello-world-pipeline-20230801044241')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/hello-world-pipeline-20230801044241?project=1023019892523


Set the Vertex AI endpoint and auth token, and construct the URL and JSON body for the CURL command:

In [19]:
ENDPOINT=f"https://{REGION}-aiplatform.googleapis.com/v1"
shell_output=!gcloud auth application-default print-access-token
AUTH_TOKEN=shell_output[0]
URL=f"{ENDPOINT}/projects/{PROJECT_ID}/locations/{REGION}/pipelineJobs"

RUNTIME_BODY={
    "displayName": PIPELINE_NAME,
    "runtimeConfig": {
            "parameterValues": PIPELINE_PARAMS,
            "gcsOutputDirectory": PIPELINE_ROOT,
    },
    "network": PIPELINE_NETWORK,
    "templateUri": TEMPLATE_PATH,
}

RUNTIME_BODY

{'displayName': 'hello-world-pipeline',
 'runtimeConfig': {'parameterValues': {'text': 'Hello World!'},
  'gcsOutputDirectory': 'gs://test-training-pipeline-mlops/pipeline-root/hello-world-pipeline'},
 'network': 'projects/12856960411/global/networks/vpcnet-private-svc-access-usc1',
 'templateUri': 'https://us-central1-kfp.pkg.dev/wmt-mlp-p-demoorg-vvl19/kfp-registry/hello-world-pipeline/v1'}

Run the Vertex pipeline via the below CURL command:

In [20]:
! curl -X POST $URL?pipelineJobId=$PIPELINE_NAME-$(date +%Y%m%d%H%M%S) -d "$RUNTIME_BODY" \
 -H "Content-Type: application/json" \
 -H "Authorization: Bearer $AUTH_TOKEN" -v

Note: Unnecessary use of -X or --request, POST is already inferred.
* Expire in 0 ms for 6 (transfer 0x55e5ec7900f0)
* Expire in 1 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 0 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 1 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 0 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 0 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 1 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 0 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 0 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 1 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 0 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 0 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 1 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 0 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 0 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 1 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 0 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 0 ms for 1 (transfer 0x55e5ec7900f0)
* Expire in 1 ms for 1 (transfer 0x55e5ec7900f0)
*

## Create Cloud Scheduler Job to Run Vertex Pipeline

Set Cloud Scheduler specific inputs like name, cron schedule, time zone, and service account (below command sets the default compute engine service account):

In [21]:
SCHEDULE_NAME=f"{PIPELINE_NAME}-http-schedule"
SCHEDULE_CRON="0 */3 * * *"
SCHEDULE_TIME_ZONE="PST"

shell_output = ! gcloud projects describe $PROJECT_ID
project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
SCHEDULE_SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

SCHEDULE_SERVICE_ACCOUNT

'122477604599-compute@developer.gserviceaccount.com'

Create Cloud Scheduler job (see [documentation](https://cloud.google.com/sdk/gcloud/reference/scheduler/jobs/create/http) for details on the arguments):

In [22]:
! gcloud scheduler jobs create http $SCHEDULE_NAME \
    --schedule="$SCHEDULE_CRON" \
    --time-zone=$SCHEDULE_TIME_ZONE \
    --uri=$URL \
    --http-method=POST \
    --oauth-service-account-email=$SCHEDULE_SERVICE_ACCOUNT \
    --headers=Content-Type=application/json,User-Agent=Google-Cloud-Scheduler \
    --max-retry-attempts=2 \
    --message-body="$RUNTIME_BODY" \
    --location=$REGION

Manually trigger Cloud Scheduler job:

In [23]:
! gcloud scheduler jobs run $SCHEDULE_NAME \
    --location $REGION \
    --quiet

Describe the Cloud Scheduler job:

In [24]:
! gcloud scheduler jobs describe $SCHEDULE_NAME \
    --location $REGION

attemptDeadline: 180s
httpTarget:
  body: eydkaXNwbGF5TmFtZSc6ICdoZWxsby13b3JsZC1waXBlbGluZScsICdydW50aW1lQ29uZmlnJzogeydwYXJhbWV0ZXJWYWx1ZXMnOiB7J3RleHQnOiAnSGVsbG8gV29ybGQhJ30sICdnY3NPdXRwdXREaXJlY3RvcnknOiAnZ3M6Ly90ZXN0LXRyYWluaW5nLXBpcGVsaW5lLW1sb3BzL3BpcGVsaW5lLXJvb3QvaGVsbG8td29ybGQtcGlwZWxpbmUnfSwgJ25ldHdvcmsnOiAncHJvamVjdHMvMTI4NTY5NjA0MTEvZ2xvYmFsL25ldHdvcmtzL3ZwY25ldC1wcml2YXRlLXN2Yy1hY2Nlc3MtdXNjMScsICd0ZW1wbGF0ZVVyaSc6ICdodHRwczovL3VzLWNlbnRyYWwxLWtmcC5wa2cuZGV2L3dtdC1tbHAtcC1kZW1vb3JnLXZ2bDE5L2tmcC1yZWdpc3RyeS9oZWxsby13b3JsZC1waXBlbGluZS92MSd9
  headers:
    Content-Type: application/json
    User-Agent: Google-Cloud-Scheduler
  httpMethod: POST
  oauthToken:
    scope: https://www.googleapis.com/auth/cloud-platform
    serviceAccountEmail: 122477604599-compute@developer.gserviceaccount.com
  uri: https://us-central1-aiplatform.googleapis.com/v1/projects/wmt-mlp-p-demoorg-vvl19/locations/us-central1/pipelineJobs
lastAttemptTime: '2022-11-30T06:19:24.884253Z'
name: projects

## Cleanup

Delete Cloud Scheduler job:

In [24]:
! gcloud scheduler jobs delete $SCHEDULE_NAME \
    --location $REGION \
    --quiet

Deleted job [hello-world-pipeline-http-schedule].


**Only delete if you'd like to delete the entire kfp Artifact Registry that was listed above:**

In [25]:
! gcloud artifacts repositories delete $KFP_REG_NAME \
    --location $REGION \
    --quiet

Delete request issued for: [kfp-registry]
Waiting for operation [projects/wmt-mlp-p-demoorg-vvl19/locations/us-central1/o
perations/e9997143-5335-4ba6-9c5a-80bd4b9b4bdd] to complete...done.            
Deleted repository [kfp-registry].
