# Vertex AI Pipelines’ Schedules API in Private Preview Guide

In [33]:
%pip install kfp==2.0.0b13 google-cloud-aiplatform==1.19.0 google-api-python-client==1.8.0 \
    --user \
    # --index-url https://repository.walmart.com/repository/pypi-proxy/simple/ \
    # --default-timeout 300

Collecting kfp==2.0.0b13
  Downloading kfp-2.0.0-beta.13.tar.gz (362 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m362.1/362.1 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: kfp
  Building wheel for kfp (setup.py) ... [?25ldone
[?25h  Created wheel for kfp: filename=kfp-2.0.0b13-py3-none-any.whl size=511194 sha256=0da4955f5c34fa7ac2a88e82437f196fd46030b26e575d9fa129e75d62d15e82
  Stored in directory: /home/jupyter/.cache/pip/wheels/e5/1f/56/7a5fd687bb2d8da63b26289633cbee057593b00c4cd9ceb722
Successfully built kfp
Installing collected packages: kfp
  Attempting uninstall: kfp
    Found existing installation: kfp 2.0.0b12
    Uninstalling kfp-2.0.0b12:
      Successfully uninstalled kfp-2.0.0b12
[0mSuccessfully installed kfp-2.0.0b13
Note: you may need to restart the kernel to use updated packages.


Restart kernel:

In [34]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Verify your Kubeflow Pipelines SDK ([kfp](https://pypi.org/project/kfp/#history)) is version v2.0.0b1 or higher ([required by Artifact Registry](https://cloud.google.com/vertex-ai/docs/pipelines/create-pipeline-template)):

In [1]:
import kfp
kfp.__version__

'2.0.0-beta.13'

Either manually set your PROJECT_ID or use the below `gcloud` command to retrieve it:

In [None]:
shell_output = ! gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID=shell_output[0]

PROJECT_ID

'axel-argolis-1'

Set the region or leave it as `us-central1`.

Note: If you change the region, make sure your network is configured to run in that region.

In [None]:
REGION="us-central1"

Set your GCS bucket name:

In [None]:
GCS_BUCKET_NAME="axel-argolis-usc1-bucket"
GCS_BUCKET_URI=f"gs://{GCS_BUCKET_NAME}"

**Only if your bucket doesn't already exist:** Run the following cell to create your Cloud Storage bucket

In [None]:
! gsutil mb -l $REGION $GCS_BUCKET_URI

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [None]:
! gsutil ls -al $BUCKET_URI

gs://artifacts.axel-argolis-1.appspot.com/
gs://axel-argolis-1-build-logs/
gs://axel-argolis-1-phs/
gs://axel-argolis-1-us-notebooks/
gs://axel-argolis-1-vertex-pipelines-us-central1/
gs://axel-argolis-1_cloudbuild/
gs://axel-argolis-usc1-bucket/
gs://axel-demo-jupyter/
gs://cloud-ai-platform-a11c0982-1d6c-4b9a-b3d3-4f2169c795ff/
gs://dataproc-staging-us-central1-1023019892523-hnmfmgyk/
gs://dataproc-temp-us-central1-1023019892523-mraxqlra/
gs://gcs-bucket-axel-metastore-166d2b1b-32d0-4593-8366-4c27f8626e9b/
gs://gcs-bucket-dataproc-metastore-ax-192f505a-506f-4b34-bf4a-16d334/
gs://gcs-bucket-demo-metastore-80037dbc-c8fe-4df7-864b-f94ab30808f4/
gs://gcs-bucket-dp-metastore-13ee6dbd-91e7-44bd-a28e-dee0f7e34ddd/
gs://gcs-bucket-hive-metastore-3231f858-8a44-4b6e-93bc-1e55613814af/
gs://vision-ai-retail-0x5a6c9d9e08d887c8/


## Define and Compile Pipeline

Imports:

In [None]:
from kfp import compiler
from kfp.dsl import pipeline, component, Artifact, Dataset, Model, Input, Output, OutputPath, InputPath
from typing import NamedTuple

Set pipeline inputs or use suggested values set below:

In [None]:
# PIPELINE_NETWORK="projects/12856960411/global/networks/vpcnet-private-svc-access-usc1"
PIPELINE_NETWORK="projects/860472322816/global/networks/host-shared-vpc"
PIPELINE_NAME="hello-world-pipeline"
PIPELINE_ROOT=f"{GCS_BUCKET_URI}/pipeline-root/{PIPELINE_NAME}"
PIPELINE_YAML="hello_world_pipeline.yaml"
PIPELINE_PARAMS={"text": "Hello World!!"}

Hello world component:

In [None]:
@component
def hello_world(text: str):
    print(text)

Pipeline definition:

In [None]:
@pipeline(
    name=PIPELINE_NAME,
    description="Hello world example pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(text: str = "Hello world!"):
    hello_world(text=text)

Compile pipeline into YAML file:

In [None]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path=PIPELINE_YAML
)

Take a look at the contents of the pipeline definition YAML:

In [None]:
! cat $PIPELINE_YAML

# PIPELINE DEFINITION
# Name: hello-world-pipeline
# Description: Hello world example pipeline
# Inputs:
#    text: str [Default: 'Hello world!']
components:
  comp-hello-world:
    executorLabel: exec-hello-world
    inputDefinitions:
      parameters:
        text:
          parameterType: STRING
defaultPipelineRoot: gs://axel-argolis-usc1-bucket/pipeline-root/hello-world-pipeline
deploymentSpec:
  executors:
    exec-hello-world:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - hello_world
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet     --no-warn-script-location 'kfp==2.0.0-beta.12'\
          \ && \"$0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)

    

## Create kfp Artifact Registry and Upload Pipeline Template

Set the name for your kfp Artifact Registry or use sugggested value below:

In [None]:
KFP_REG_NAME="kfp-registry"

Create a kfp Artifact Registry if you don't already have one:

In [None]:
! gcloud artifacts repositories create $KFP_REG_NAME \
    --location=$REGION \
    --repository-format=KFP

Connect to registry via client:

In [None]:
from kfp.registry import RegistryClient

client = RegistryClient(host=f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{KFP_REG_NAME}")

Set pipeline template tags (like version) and generate template URL path from other inputs:

In [None]:
TEMPLATE_TAGS=["b12", "v1", "latest"]
TEMPLATE_PATH=f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{KFP_REG_NAME}/{PIPELINE_NAME}/{TEMPLATE_TAGS[0]}"

Upload pipeline template to the registry with extra headers like a description:

In [None]:
templateName, versionName = client.upload_pipeline(
    file_name=PIPELINE_YAML,
    tags=TEMPLATE_TAGS,
    extra_headers={"description":"This is an example hello world pipeline template."})

Setting default compute engine service account for pipeline:

In [None]:
shell_output = ! gcloud projects describe $PROJECT_ID
project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
PIPELINE_SA = f"{project_number}-compute@developer.gserviceaccount.com"

PIPELINE_SA

'1023019892523-compute@developer.gserviceaccount.com'

If you'd like to use a custom service account for the pipeline (rather than the default compute engine service account), then add your custom service account below:

In [None]:
# PIPELINE_SA=""

## Run the Vertex Pipeline via SDK and CURL

Import the Vertex SDK (aiplatform) and run the Vertex pipeline with kfp Artifact Registry template path:

In [None]:
import google.cloud.aiplatform as vertex

In [None]:
job = vertex.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=TEMPLATE_PATH,
    project=PROJECT_ID,
    location=REGION,
    parameter_values=PIPELINE_PARAMS,
    enable_caching=False)

job.submit(network=PIPELINE_NETWORK, service_account=PIPELINE_SA)

Creating PipelineJob
PipelineJob created. Resource name: projects/1023019892523/locations/us-central1/pipelineJobs/hello-world-pipeline-20230321185611
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/1023019892523/locations/us-central1/pipelineJobs/hello-world-pipeline-20230321185611')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/hello-world-pipeline-20230321185611?project=1023019892523


Set the Vertex AI endpoint and auth token, and construct the URL and JSON body for the CURL command (refer to the [pipelineJobs REST API documentation](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/projects.locations.pipelineJobs) for more details):

In [20]:
ENDPOINT=f"https://{REGION}-aiplatform.googleapis.com/v1"
shell_output=!gcloud auth application-default print-access-token
AUTH_TOKEN=shell_output[0]
URL=f"{ENDPOINT}/projects/{PROJECT_ID}/locations/{REGION}/pipelineJobs"

RUNTIME_BODY={
    "displayName": PIPELINE_NAME,
    "runtimeConfig": {
            "parameterValues": PIPELINE_PARAMS,
            "gcsOutputDirectory": PIPELINE_ROOT,
    },
    "network": PIPELINE_NETWORK,
    "templateUri": TEMPLATE_PATH,
    "serviceAccount": PIPELINE_SA
}

RUNTIME_BODY

{'displayName': 'hello-world-pipeline',
 'runtimeConfig': {'parameterValues': {'text': 'Hello World!!'},
  'gcsOutputDirectory': 'gs://axel-argolis-usc1-bucket/pipeline-root/hello-world-pipeline'},
 'network': 'projects/860472322816/global/networks/host-shared-vpc',
 'templateUri': 'https://us-central1-kfp.pkg.dev/axel-argolis-1/kfp-registry/hello-world-pipeline/b12',
 'serviceAccount': '1023019892523-compute@developer.gserviceaccount.com'}

Run the Vertex pipeline via the below CURL command:

In [21]:
! curl -X POST $URL?pipelineJobId=$PIPELINE_NAME-$(date +%Y%m%d%H%M%S) -d "$RUNTIME_BODY" \
 -H "Content-Type: application/json" \
 -H "Authorization: Bearer $AUTH_TOKEN" -v

Note: Unnecessary use of -X or --request, POST is already inferred.
* Expire in 0 ms for 6 (transfer 0x558fc1526110)
* Expire in 1 ms for 1 (transfer 0x558fc1526110)
* Expire in 0 ms for 1 (transfer 0x558fc1526110)
* Expire in 1 ms for 1 (transfer 0x558fc1526110)
* Expire in 0 ms for 1 (transfer 0x558fc1526110)
* Expire in 0 ms for 1 (transfer 0x558fc1526110)
* Expire in 1 ms for 1 (transfer 0x558fc1526110)
* Expire in 0 ms for 1 (transfer 0x558fc1526110)
* Expire in 0 ms for 1 (transfer 0x558fc1526110)
* Expire in 1 ms for 1 (transfer 0x558fc1526110)
* Expire in 0 ms for 1 (transfer 0x558fc1526110)
* Expire in 0 ms for 1 (transfer 0x558fc1526110)
* Expire in 1 ms for 1 (transfer 0x558fc1526110)
* Expire in 0 ms for 1 (transfer 0x558fc1526110)
* Expire in 0 ms for 1 (transfer 0x558fc1526110)
* Expire in 1 ms for 1 (transfer 0x558fc1526110)
* Expire in 0 ms for 1 (transfer 0x558fc1526110)
* Expire in 0 ms for 1 (transfer 0x558fc1526110)
* Expire in 1 ms for 1 (transfer 0x558fc1526110)
*

Note if you'd like to disable caching when running the pipeline via the REST API, you must edit the pipeline spec YAML file directly (set `enableCache` to `false`) and then re-upload the pipeline spec to the kfp artifact registry). See the example below:
```yaml
root:
  dag:
    tasks:
      hello-world:
        cachingOptions:
          enableCache: false
```

## Vertex AI Pipelines’ Schedules API in Private Preview

Before being able to use this new API, please fill out [this form](https://docs.google.com/forms/d/e/1FAIpQLScDxABxIvqjeM_279dwTMmVfFBJD7qmW2leyU_ZBTYutJ62uA/viewform?usp=sf_link) and wait for a confirmation email (may take several days)

In [22]:
SCHEDULE_NAME = f"{PIPELINE_NAME}-schedule-api"
# SCHEDULE_START_TIME = "2023-02-03T00:00:00Z"
SCHEDULE_CRON = "TZ=America/Los_Angeles */5 * * * *"
MAX_CONCURRENT_RUN_COUNT = "1"

### Create schedule

In [23]:
RUNTIME_BODY_VAPS_API = {
  "display_name": SCHEDULE_NAME,
  # "start_time": SCHEDULE_START_TIME,
  "cron": SCHEDULE_CRON,
  "max_concurrent_run_count": MAX_CONCURRENT_RUN_COUNT,
  "create_pipeline_job_request": {
      "parent": "projects/1023019892523/locations/us-central1",
      "pipelineJob": RUNTIME_BODY
  }
}

RUNTIME_BODY_VAPS_API

{'display_name': 'hello-world-pipeline-schedule-api',
 'cron': 'TZ=America/Los_Angeles */5 * * * *',
 'max_concurrent_run_count': '1',
 'create_pipeline_job_request': {'parent': 'projects/1023019892523/locations/us-central1',
  'pipelineJob': {'displayName': 'hello-world-pipeline',
   'runtimeConfig': {'parameterValues': {'text': 'Hello World!!'},
    'gcsOutputDirectory': 'gs://axel-argolis-usc1-bucket/pipeline-root/hello-world-pipeline'},
   'network': 'projects/860472322816/global/networks/host-shared-vpc',
   'templateUri': 'https://us-central1-kfp.pkg.dev/axel-argolis-1/kfp-registry/hello-world-pipeline/b12',
   'serviceAccount': '1023019892523-compute@developer.gserviceaccount.com'}}}

In [24]:
! curl -i -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" \
https://us-central1-aiplatform.googleapis.com/v1beta1/projects/axel-argolis-1/locations/us-central1/schedules/ \
-d "$RUNTIME_BODY_VAPS_API"

HTTP/2 200 
[1mcontent-type[0m: application/json; charset=UTF-8
[1mvary[0m: X-Origin
[1mvary[0m: Referer
[1mvary[0m: Origin,Accept-Encoding
[1mdate[0m: Wed, 22 Mar 2023 15:34:55 GMT
[1mserver[0m: ESF
[1mcache-control[0m: private
[1mx-xss-protection[0m: 0
[1mx-frame-options[0m: SAMEORIGIN
[1mx-content-type-options[0m: nosniff
[1maccept-ranges[0m: none

{
  "name": "projects/1023019892523/locations/us-central1/schedules/3192066973387194368",
  "displayName": "hello-world-pipeline-schedule-api",
  "startTime": "2023-03-22T15:34:55.336593Z",
  "state": "ACTIVE",
  "createTime": "2023-03-22T15:34:55.336593Z",
  "nextRunTime": "2023-03-22T15:35:00Z",
  "cron": "TZ=America/Los_Angeles */5 * * * *",
  "maxConcurrentRunCount": "1",
  "createPipelineJobRequest": {
    "parent": "projects/1023019892523/locations/us-central1",
    "pipelineJob": {
      "displayName": "hello-world-pipeline",
      "runtimeConfig": {
        "gcsOutputDirectory": "gs://axel-argolis-usc1-bucket

### Get a schedule

In [25]:
SCHEDULE_ID="3192066973387194368"

In [26]:
! curl -i -X GET -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" \
https://us-central1-aiplatform.googleapis.com/v1beta1/projects/1023019892523/locations/us-central1/schedules/$SCHEDULE_ID

HTTP/2 200 
[1mcontent-type[0m: application/json; charset=UTF-8
[1mvary[0m: X-Origin
[1mvary[0m: Referer
[1mvary[0m: Origin,Accept-Encoding
[1mdate[0m: Wed, 22 Mar 2023 15:35:39 GMT
[1mserver[0m: ESF
[1mcache-control[0m: private
[1mx-xss-protection[0m: 0
[1mx-frame-options[0m: SAMEORIGIN
[1mx-content-type-options[0m: nosniff
[1maccept-ranges[0m: none

{
  "name": "projects/1023019892523/locations/us-central1/schedules/3192066973387194368",
  "displayName": "hello-world-pipeline-schedule-api",
  "startTime": "2023-03-22T15:34:55.336593Z",
  "state": "ACTIVE",
  "createTime": "2023-03-22T15:34:55.336593Z",
  "nextRunTime": "2023-03-22T15:40:00Z",
  "cron": "TZ=America/Los_Angeles */5 * * * *",
  "maxConcurrentRunCount": "1",
  "createPipelineJobRequest": {
    "parent": "projects/1023019892523/locations/us-central1",
    "pipelineJob": {
      "displayName": "hello-world-pipeline",
      "runtimeConfig": {
        "gcsOutputDirectory": "gs://axel-argolis-usc1-bucket

### List schedules

In [32]:
! curl -i -X GET -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" \
https://us-central1-aiplatform.googleapis.com/v1beta1/projects/1023019892523/locations/us-central1/schedules

HTTP/2 200 
[1mcontent-type[0m: application/json; charset=UTF-8
[1mvary[0m: X-Origin
[1mvary[0m: Referer
[1mvary[0m: Origin,Accept-Encoding
[1mdate[0m: Wed, 22 Mar 2023 16:43:14 GMT
[1mserver[0m: ESF
[1mcache-control[0m: private
[1mx-xss-protection[0m: 0
[1mx-frame-options[0m: SAMEORIGIN
[1mx-content-type-options[0m: nosniff
[1maccept-ranges[0m: none

{}


### Pause a schedule

In [None]:
SCHEDULE_ID="446700788039811072"

In [None]:
! curl -i -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" \
https://us-central1-aiplatform.googleapis.com/v1beta1/projects/1023019892523/locations/us-central1/schedules/$SCHEDULE_ID:pause

HTTP/2 200 
[1mcontent-type[0m: application/json; charset=UTF-8
[1mvary[0m: X-Origin
[1mvary[0m: Referer
[1mvary[0m: Origin,Accept-Encoding
[1mdate[0m: Fri, 24 Feb 2023 18:53:03 GMT
[1mserver[0m: ESF
[1mcache-control[0m: private
[1mx-xss-protection[0m: 0
[1mx-frame-options[0m: SAMEORIGIN
[1mx-content-type-options[0m: nosniff
[1maccept-ranges[0m: none

{}


### Resume a paused schedule

In [None]:
SCHEDULE_ID="516788057240764416"

In [None]:
! curl -i -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" \
https://us-central1-aiplatform.googleapis.com/v1beta1/projects/1023019892523/locations/us-central1/schedules/$SCHEDULE_ID:resume

HTTP/2 200 
[1mcontent-type[0m: application/json; charset=UTF-8
[1mvary[0m: X-Origin
[1mvary[0m: Referer
[1mvary[0m: Origin,Accept-Encoding
[1mdate[0m: Fri, 03 Feb 2023 03:28:22 GMT
[1mserver[0m: ESF
[1mcache-control[0m: private
[1mx-xss-protection[0m: 0
[1mx-frame-options[0m: SAMEORIGIN
[1mx-content-type-options[0m: nosniff
[1maccept-ranges[0m: none

{}


### Delete a schedule

In [30]:
SCHEDULE_ID="3192066973387194368"

In [31]:
! curl -i -X DELETE -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" \
https://us-central1-aiplatform.googleapis.com/v1beta1/projects/1023019892523/locations/us-central1/schedules/$SCHEDULE_ID

HTTP/2 200 
[1mcontent-type[0m: application/json; charset=UTF-8
[1mvary[0m: X-Origin
[1mvary[0m: Referer
[1mvary[0m: Origin,Accept-Encoding
[1mdate[0m: Wed, 22 Mar 2023 16:43:09 GMT
[1mserver[0m: ESF
[1mcache-control[0m: private
[1mx-xss-protection[0m: 0
[1mx-frame-options[0m: SAMEORIGIN
[1mx-content-type-options[0m: nosniff
[1maccept-ranges[0m: none

{
  "name": "projects/1023019892523/locations/us-central1/operations/8571271275471699968",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.aiplatform.v1beta1.DeleteOperationMetadata",
    "genericMetadata": {
      "createTime": "2023-03-22T16:43:09.163867Z",
      "updateTime": "2023-03-22T16:43:09.163867Z"
    }
  },
  "done": true,
  "response": {
    "@type": "type.googleapis.com/google.protobuf.Empty"
  }
}


### List pipeline jobs created by a schedule

In [28]:
SCHEDULE_ID="3192066973387194368"

In [29]:
! curl -i -X GET -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" \
https://us-central1-aiplatform.googleapis.com/v1beta1/projects/1023019892523/locations/us-central1/pipelineJobs?filter=schedule_name=projects/1023019892523/locations/us-central1/schedules/$SCHEDULE_ID

HTTP/2 200 
[1mcontent-type[0m: application/json; charset=UTF-8
[1mvary[0m: X-Origin
[1mvary[0m: Referer
[1mvary[0m: Origin,Accept-Encoding
[1mdate[0m: Wed, 22 Mar 2023 15:37:42 GMT
[1mserver[0m: ESF
[1mcache-control[0m: private
[1mx-xss-protection[0m: 0
[1mx-frame-options[0m: SAMEORIGIN
[1mx-content-type-options[0m: nosniff
[1maccept-ranges[0m: none

{
  "pipelineJobs": [
    {
      "name": "projects/1023019892523/locations/us-central1/pipelineJobs/scheduled-pipeline-3192066973387194368-20230322083500553",
      "displayName": "hello-world-pipeline",
      "createTime": "2023-03-22T15:35:00.826573Z",
      "startTime": "2023-03-22T15:35:01.476431Z",
      "endTime": "2023-03-22T15:35:03.465279Z",
      "updateTime": "2023-03-22T15:35:03.465279Z",
      "pipelineSpec": {
        "deploymentConfig": {
          "@type": "type.googleapis.com/ml_pipelines.PipelineDeploymentConfig",
          "executors": {
            "exec-hello-world": {
              "container":