# Supporting event-triggered Managed Pipeline runs via Cloud Functions

## Introduction

This notebook shows an example of how to launch a Managed Pipelines job from a [**Cloud Functions**](https://cloud.google.com/functions/docs/) (GCF) function, where the function is triggered by the addition of a file to a given [Cloud Storage](https://cloud.google.com/storage) (GCS) bucket.

In many cases, you wouldn't want to launch a new pipeline run for every new file added to a GCS bucket (e.g., suppose you were uploading a set of new data files).  Rather, it is often preferable to trigger a pipeline run after the upload of a *batch* of new data has completed.

So, this example uses an approach where the 'trigger' bucket is different from the bucket used to store new files. A 'trigger file' contains the path to the new data (or other new information that needs processing). The trigger file is uploaded only after the new data upload has completed. That upload triggers a run of the GCF function, which in turn launches the pipeline job.

The example uses the [**Secret Manager**](https://cloud.google.com/secret-manager/docs#docs) to store the API key needed to launch the pipeline from the GCF function (it is bad practice to hardwire the key in the GCF definition).  **The use of the API key will not be necessary once Managed Pipelines is in Preview**, so in the future this part of the process won't be required.

The example uses a very simple (non-ML) pipeline in order to focus on the GCF setup; this pipeline does not actually do any data processing.

(A follow-on notebook will show an ML-oriented scenario in which the addition of new data triggers re-validation of the dataset.)

## Setup

Before you run this notebook, ensure that your Google Cloud user account and project are granted access to Managed Pipelines Experimental. To be granted access to Managed Pipelines Experimental, fill out this [form](http://go/cloud-mlpipelines-signup) and let your account representative know you have requested access. 

This notebook is intended to be run on either of the following:
* [AI Platform Notebooks](https://cloud.google.com/ai-platform-notebooks). See the "AI Platform Notebooks" section in the Experimental [User Guide](https://docs.google.com/document/d/1JXtowHwppgyghnj1N1CT73hwD1caKtWkLcm2_0qGBoI/edit?usp=sharing) for more detail on creating a notebook server instance.
* [Google Colab](https://colab.research.google.com/notebooks/intro.ipynb)


**To run this notebook on AI Platform Notebooks**, click on the **File** menu, then select "Download .ipynb".  Then, upload that notebook from your local machine to AI Platform Notebooks. (In the AI Platform Notebooks left panel, look for an icon of an arrow pointing up, to upload).

We'll first install some libraries and set up some variables.


Set `gcloud` to use your project.  **Edit the following cell before running it**.

In [None]:
PROJECT_ID = 'your-project-id'  # <---CHANGE THIS

In [None]:
!gcloud config set project {PROJECT_ID}

If you're running this notebook on Colab, authenticate with your user account:

In [None]:
import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

-----------------

**If you're on AI Platform Notebooks**, authenticate with Google Cloud before running the next section, by running
```sh
gcloud auth login
```
**in the Terminal window** (which you can open via **File** > **New** in the menu). You only need to do this once per notebook instance.

### Install the KFP SDK and AI Platform Pipelines client library

For Managed Pipelines Experimental, you'll need to download special versions of the KFP SDK and the AI Platform Pipelines client library.

In [None]:
!mkdir -p gcf

In [None]:
%cd gcf

In [None]:
!gsutil cp gs://cloud-aiplatform-pipelines/releases/latest/kfp-1.5.0rc5.tar.gz .
!gsutil cp gs://cloud-aiplatform-pipelines/releases/latest/aiplatform_pipelines_client-0.1.0.caip20210415-py3-none-any.whl .

Then, install the libraries and restart the kernel.

In [None]:
if 'google.colab' in sys.modules:
  USER_FLAG = ''
else:
  USER_FLAG = '--user'

In [None]:
!python3 -m pip install {USER_FLAG} kfp-1.5.0rc5.tar.gz --upgrade
!python3 -m pip install {USER_FLAG} aiplatform_pipelines_client-0.1.0.caip20210415-py3-none-any.whl --upgrade
!python3 -m pip install {USER_FLAG} google-cloud-secret-manager==2.0.0


In [None]:
# Automatically restart kernel after installs 
# (for this notebook, necessary for colab too. Ignore the pop-up warning that results.)
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

The KFP version should be >= 1.5.



In [None]:
# Check the KFP version
!python3 -c "import kfp; print('KFP version: {}'.format(kfp.__version__))"
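If you'd rather have the notebook fail loudly than eyeball the printed version, a small check like the one below can be run after the restart. It is a sketch, and the helper name `meets_minimum_version` is hypothetical. It compares only the major and minor components, so a pre-release build such as `1.5.0rc5` counts as 1.5.

```python
import re


def meets_minimum_version(version_string, minimum=(1, 5)):
    """Return True if the MAJOR.MINOR part of version_string is >= minimum.

    Only the leading 'MAJOR.MINOR' digits are compared, so pre-release
    suffixes (e.g. '1.5.0rc5') are treated as their base release.
    """
    match = re.match(r'(\d+)\.(\d+)', version_string)
    if match is None:
        raise ValueError(f'Unrecognized version string: {version_string!r}')
    major_minor = (int(match.group(1)), int(match.group(2)))
    return major_minor >= tuple(minimum)


# In the notebook (after the kernel restart), something like:
#   import kfp
#   assert meets_minimum_version(kfp.__version__), kfp.__version__
print(meets_minimum_version('1.5.0rc5'))  # True
print(meets_minimum_version('1.4.0'))     # False
```

A full PEP 440 comparison (for example with the `packaging` library) would rank `1.5.0rc5` *below* `1.5`, which is why this sketch deliberately ignores the suffix.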

If you're on Colab, re-authorize after the kernel restart. **Edit the following cell for your project ID before running it.**

In [None]:
import sys
if 'google.colab' in sys.modules:
  PROJECT_ID = 'your-project-id'  # <---CHANGE THIS
  !gcloud config set project {PROJECT_ID}
  from google.colab import auth
  auth.authenticate_user()
  USER_FLAG = ''

In [None]:
%cd gcf

### Set some variables

**Before you run the next cell**, **edit it** to set variables for your project.  See the "Before you begin" section of the User Guide for information on creating your API key.  For `BUCKET_NAME`, enter the name of a Cloud Storage (GCS) bucket in your project.  Don't include the `gs://` prefix.

In [None]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

# Required Parameters
USER = 'your-user-name' # <---CHANGE THIS
BUCKET_NAME = 'your-bucket-name'  # <---CHANGE THIS
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(BUCKET_NAME, USER)

PROJECT_ID = 'your-project-id'  # <---CHANGE THIS
REGION = 'us-central1'
API_KEY = 'your-api-key'  # <---CHANGE THIS

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
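Because both `BUCKET_NAME` here and the trigger bucket later must be given *without* the `gs://` prefix, a small guard can catch a pasted URI before it ends up in `PIPELINE_ROOT` as `gs://gs://...`. This is a sketch; `normalize_bucket_name` is a hypothetical helper, not part of any library used in this notebook.

```python
def normalize_bucket_name(name):
    """Return a bare GCS bucket name, stripping an accidental 'gs://' prefix.

    Raises ValueError if anything other than a bucket name remains
    (for example an object path like 'my-bucket/some/dir').
    """
    bare = name.strip()
    if bare.startswith('gs://'):
        bare = bare[len('gs://'):]
    bare = bare.rstrip('/')
    if not bare or '/' in bare:
        raise ValueError(f'Expected a bucket name, got {name!r}')
    return bare


# Hypothetical usage in the cell above:
#   BUCKET_NAME = normalize_bucket_name(BUCKET_NAME)
#   PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(BUCKET_NAME, USER)
print(normalize_bucket_name('gs://my-bucket/'))  # my-bucket
```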

### Give service account access to the Secret Manager

As mentioned above, in this example we'll use the Secret Manager to store the API key, which is needed to launch a pipeline run from the GCF function.   
Note: The use of the API key will not be necessary once Managed Pipelines is in Preview, so in the future this part of the process won't be required.

#### GCF service account

We'll need to give the service account used by the GCF runtime access to defined secrets.  

In the Console's "IAM & Admin" panel, look for a service account named `<your-project-id>@appspot.gserviceaccount.com` (the App Engine default service account, which GCF uses as its runtime identity by default). Edit the permissions for that account to add the "**Secret Manager Secret Accessor**" role.

<a href="https://storage.googleapis.com/amy-jo/images/kfp-deploy/secret_role.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/kfp-deploy/secret_role.png" width="50%"/></a>
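If you prefer the command line to the Console, the same grant can be made with `gcloud projects add-iam-policy-binding`. The sketch below only assembles that command as an argument list (the helper name `iam_binding_command` is hypothetical); `roles/secretmanager.secretAccessor` is the role ID for "Secret Manager Secret Accessor". For the notebook service account described in the next subsection, you would pass that account's email and `roles/secretmanager.admin` instead.

```python
def iam_binding_command(project_id, service_account_email, role):
    """Build a `gcloud projects add-iam-policy-binding` command as an argv list."""
    return [
        'gcloud', 'projects', 'add-iam-policy-binding', project_id,
        f'--member=serviceAccount:{service_account_email}',
        f'--role={role}',
    ]


project_id = 'your-project-id'  # placeholder, as elsewhere in this notebook
gcf_service_account = f'{project_id}@appspot.gserviceaccount.com'
cmd = iam_binding_command(project_id, gcf_service_account,
                          'roles/secretmanager.secretAccessor')
print(' '.join(cmd))
# To apply it (requires permission to change the project's IAM policy):
#   import subprocess
#   subprocess.run(cmd, check=True)
```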

#### AI Notebook service account

If you're running this example on an AI Platform Notebooks instance, you'll also need to give the notebook instance's service account the "**Secret Manager Admin**" role, so that you can create secrets from the notebook environment. To find that service account, visit the Notebooks dashboard in the Cloud Console and hover over "Service account" for the notebook instance you're using.

<a href="https://storage.googleapis.com/amy-jo/images/kfp-deploy/secret_role2.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/kfp-deploy/secret_role2.png" width="50%"/></a>

(If you're running in Colab and are a project Owner, you should be able to create secrets without any additional setup. With a different project role, you may first need to be granted the Secret Manager Admin role.)



## Define a simple pipeline and create its job spec

Now we'll define a very simple pipeline. It takes one input parameter and has one step.

### Create a function-based pipeline component

We'll first create a component based on a very simple Python function. It takes a string input parameter and returns that value as output.

In [None]:
from kfp import components

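# The return annotation tells func_to_container_op to declare a string output.
def hello_world(text: str) -> str:
    print(text)
    return text

# Write the component definition to hw.yaml so the pipeline can load it below.
components.func_to_container_op(hello_world,
                                output_component_file='hw.yaml')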


Next, we'll define a one-step pipeline that uses that component.
The pipeline takes an input parameter, and passes that parameter as an argument to the pipeline step.

In [None]:
from kfp.v2 import dsl
from kfp.v2 import compiler
from kfp import components


# Create a pipeline op from the component we defined above.
hw_op = components.load_component_from_file('./hw.yaml') # you can also use load_component_from_url

@dsl.pipeline(
  name='remote-deploy-v2',
  description='A simple intro pipeline'
)
def pipeline_parameter_to_consumer(text: str='hi there'):
    '''Pipeline that passes small pipeline parameter string to consumer op'''
    consume_task = hw_op(text) # Passing pipeline parameter as argument to consumer op


Compile the pipeline:

In [None]:
JOB_SPEC = 'pipeline_job.json'

In [None]:
compiler.Compiler().compile(pipeline_func=pipeline_parameter_to_consumer, 
                            output_path=JOB_SPEC)

## Do a test submission of the pipeline job

Before defining a GCF function to deploy the pipeline job spec, let's first test it directly.

Here, we'll create an API client using the API key you generated.    
Then, we'll submit the pipeline job by passing the compiled spec to the `create_run_from_job_spec()` method. Note that we're passing a `parameter_values` dict that specifies the pipeline input parameters we want to use.

In [None]:
from aiplatform.pipelines import client

api_client = client.Client(project_id=PROJECT_ID, region=REGION, api_key=API_KEY)

response = api_client.create_run_from_job_spec(
          job_spec_path=JOB_SPEC,
          pipeline_root=PIPELINE_ROOT, 
          parameter_values={'text': 'Hello world!'})
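Since a misspelled key in `parameter_values` is easy to miss (the same dict shape is passed again later from the GCF function), one possible guard is to compare its keys against the pipeline function's signature. This is a sketch with a hypothetical helper, `check_parameter_values`, and it assumes the `@dsl.pipeline` decorator leaves the function's signature intact.

```python
import inspect


def check_parameter_values(pipeline_func, parameter_values):
    """Raise ValueError if parameter_values names a parameter the pipeline lacks."""
    expected = set(inspect.signature(pipeline_func).parameters)
    unknown = set(parameter_values) - expected
    if unknown:
        raise ValueError(f'Unknown pipeline parameter(s): {sorted(unknown)}; '
                         f'expected one of {sorted(expected)}')
    return True


# A stand-in with the same signature as pipeline_parameter_to_consumer:
def example_pipeline(text: str = 'hi there'):
    pass


print(check_parameter_values(example_pipeline, {'text': 'Hello world!'}))  # True
# In the notebook: check_parameter_values(pipeline_parameter_to_consumer, {'text': 'Hello world!'})
```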

View the running pipeline in the Console by clicking on the generated link above.

## Create a Secret Manager key for your `API_KEY`

Next, we'll use the Secret Manager to store your API key, so that we can access it in our GCF function.

If you're using an AI Platform Notebooks instance, then before you run this section, check that you've updated the notebook's service account roles to include "Secret Manager Admin", as indicated in the "Setup" section.

Make sure that the `secretmanager` API has been enabled for your project:

In [None]:
!gcloud services enable secretmanager.googleapis.com

First, define a key name (you can change this string from `apk1` if you like).

In [None]:
SECRET_KEY = 'apk1'

Next, we'll define some utility functions to create and access a secret.

In [None]:
from google.cloud import secretmanager

def create_secret(client, secret_id):
    # Build the resource name of the parent project.
    parent = f"projects/{PROJECT_ID}"
    # Build a dict of settings for the secret
    secret = {'replication': {'automatic': {}}}
    # Create the secret
    response = client.create_secret(secret_id=secret_id, parent=parent, secret=secret)
    # Print the new secret name.
    print(f'Created secret: {response.name}') 

In [None]:
def add_secret_version(client, secret_id, payload):
    # Build the resource name of the parent secret.
    parent = f"projects/{PROJECT_ID}/secrets/{secret_id}"
    # Convert the string payload into a bytes. This step can be omitted if you
    # pass in bytes instead of a str for the payload argument.
    payload = payload.encode('UTF-8')
    # Add the secret version.
    response = client.add_secret_version(parent=parent, payload={'data': payload})
    # Print the new secret version name.
    print(f'Added secret version: {response.name}')   

In [None]:
def access_secret_version(client, secret_id, version_id="latest"):
    # Build the resource name of the secret version.
    name = f"projects/{PROJECT_ID}/secrets/{secret_id}/versions/{version_id}"
    # Access the secret version.
    response = client.access_secret_version(name=name)
    # Return the decoded payload.
    return response.payload.data.decode('UTF-8')

Create the secret manager client:

In [None]:
# create the Secret Manager client.
sm_client = secretmanager.SecretManagerServiceClient()

Create the key:

In [None]:
# if you've already created the key, you don't need to run this again.
# If you do, you can ignore the 'already exists' error.
create_secret(sm_client, SECRET_KEY)

... and set the key to a value, in this case your `API_KEY` string:

In [None]:
add_secret_version(sm_client, SECRET_KEY, API_KEY)

Test accessing the secret's value, to make sure it looks correct:

In [None]:
access_secret_version(sm_client, SECRET_KEY)

## Define and deploy the Cloud Function

Now we're ready to define and deploy the cloud function.  
First ensure that the Cloud Functions and Cloud Build APIs are enabled for your project:

In [None]:
!gcloud services enable cloudfunctions.googleapis.com
!gcloud services enable cloudbuild.googleapis.com


Next, we'll write out a `requirements.txt` file listing the Python packages the function needs. Note that one of the lines installs a local file (the `aiplatform_pipelines_client` .whl downloaded earlier); that file will be uploaded as part of the GCF deployment.

In [None]:
%%writefile requirements.txt
kfp==1.4
kfp-pipeline-spec==0.1.3.1
google-cloud-secret-manager==2.0.0
aiplatform_pipelines_client-0.1.0.caip20210415-py3-none-any.whl


Now we'll define the GCF function code in `main.py`.  

Note that the function is grabbing some values from environment variables.  We'll show how those are set below.  One of the env vars holds the Secret key name (from which we will grab the API key).  Another holds the name of the pipeline job spec file (which we created above when we compiled the pipeline). This file will be uploaded to read-only storage as part of the GCF function deployment, and the code will be able to access it.

The `gcs_update` function will be called whenever a file is added to the specified trigger bucket (or an existing file there is overwritten).
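`main.py` reads its configuration with `os.getenv`, which silently returns `None` when a variable is missing, so a typo in the deploy command only surfaces later as a confusing API error. A possible fail-fast alternative is sketched below with a hypothetical `require_env` helper that checks all required variables up front. It is not part of the function code deployed in this notebook.

```python
import os


def require_env(*names):
    """Return {name: value} for each required env var, or raise listing all missing ones."""
    missing = [name for name in names if not os.getenv(name)]
    if missing:
        raise RuntimeError(f'Missing required environment variable(s): {", ".join(missing)}')
    return {name: os.environ[name] for name in names}


# Hypothetical use near the top of main.py:
#   config = require_env('PIPELINE_PROJECT_ID', 'SECRET_KEY', 'PIPELINE_ROOT', 'JOB_SPEC')
#   PIPELINE_JOB_SPEC = config['JOB_SPEC']
```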

In [None]:
%%writefile main.py

import logging
import os

from kfp.v2 import dsl
from kfp.v2 import compiler
from kfp import components

from aiplatform.pipelines import client

from google.cloud import secretmanager
from google.cloud import storage

PIPELINE_PROJECT_ID = os.getenv('PIPELINE_PROJECT_ID')
REGION = 'us-central1'
SECRET_KEY = os.getenv('SECRET_KEY')
PIPELINE_ROOT = os.getenv('PIPELINE_ROOT') # bucket under PIPELINE_PROJECT_ID
PIPELINE_JOB_SPEC = os.getenv('JOB_SPEC')


def access_secret_version(secret_id, version_id="latest"):
    """Access a Secret's value, and return as string.
    """
    # Create the Secret Manager client.
    client = secretmanager.SecretManagerServiceClient()
    # Build the resource name of the secret version.
    name = f"projects/{PIPELINE_PROJECT_ID}/secrets/{secret_id}/versions/{version_id}"
    # Access the secret version.
    response = client.access_secret_version(name=name)
    # Return the decoded payload.
    return response.payload.data.decode('UTF-8')


def read_trigger_file(data, context, storage_client):
    """Read the contents of the trigger file and return as string.
    """
    print('Event ID: {}'.format(context.event_id))
    print('Event type: {}'.format(context.event_type))
    print('Data: {}'.format(data))
    print('Bucket: {}'.format(data['bucket']))
    print('File: {}'.format(data['name']))
    print('Metageneration: {}'.format(data['metageneration']))
    print('Created: {}'.format(data['timeCreated']))
    print('Updated: {}'.format(data['updated']))

    bucket = storage_client.get_bucket(data['bucket'])
    blob = bucket.get_blob(data['name'])
    trigger_file_string = blob.download_as_string().strip()
    logging.info('trigger file contents: {}'.format(trigger_file_string))
    return trigger_file_string.decode('UTF-8')


def gcs_update(data, context):
    """Background Cloud Function to be triggered by Cloud Storage.
    """
    logging.getLogger().setLevel(logging.INFO)

    storage_client = storage.Client()
    # get the contents of the trigger file
    trigger_file_string = read_trigger_file(data, context, storage_client)

    # then run the pipeline using the given job spec, passing the trigger file contents
    # as a parameter value.
    logging.info('running pipeline...')
    API_KEY = access_secret_version(SECRET_KEY)  # grab the API_KEY from the Secrets Manager
    # create the client object
    api_client = client.Client(project_id=PIPELINE_PROJECT_ID, region=REGION, api_key=API_KEY)
    # deploy the pipeline run
    response = api_client.create_run_from_job_spec(
              job_spec_path=PIPELINE_JOB_SPEC,
              # pipeline_root=PIPELINE_ROOT,  # optional- use if want to override compile-time value
              # example of passing info on the blob that triggered the GCF as a pipeline param...
              parameter_values={'text': trigger_file_string})
    logging.info('job response: %s', response)


Next, identify a GCS bucket in your project to use for the 'trigger bucket'.  This bucket must already exist.  You can create a new bucket via the [Cloud Console](https://console.cloud.google.com/storage/browser) (or via the `gsutil mb` command-line utility).

**Edit the following cell before running it.**

In [None]:
# Change this to your bucket name.  Do not include the 'gs://' prefix.
TRIGGER_BUCKET = 'your-bucket-name'

In [None]:
# check that you're in the gcf subdirectory
!pwd

Now we're ready to deploy the GCF function code. The deploy command names the `gcs_update` function (GCF looks for it in `main.py` by default) and uses the value of the `TRIGGER_BUCKET` variable as the trigger bucket. The trigger event, `google.storage.object.finalize`, fires when a new object is created in that bucket, including when an existing object is overwritten.

Note that we're setting several environment variables as part of the deployment, which the function code can read. These include the name of the pipeline job spec file and the name of the Secret Manager secret that holds the API key. If you've run all the notebook cells above, these variables should already be set correctly, but it's worth checking them before you deploy.

The deployment will include the files in the current directory (`gcf`), including the pipeline job spec file and the .whl file.  The GCF function will have read-only access to the directory contents.
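`--set-env-vars` takes `KEY=VALUE` pairs separated by commas, so an empty value or a value that itself contains a comma will silently produce the wrong configuration. The sketch below, using a hypothetical `build_set_env_vars` helper, assembles the flag value from a dict and rejects both cases. (gcloud has its own escaping syntax for values with commas, described under `gcloud topic escaping`; this sketch simply refuses them.)

```python
def build_set_env_vars(env):
    """Join env vars into the KEY=VALUE,KEY=VALUE form used by --set-env-vars.

    Raises ValueError for empty values or values containing a comma, since a
    bare comma would be read as a separator between pairs.
    """
    pairs = []
    for key, value in env.items():
        value = str(value)
        if not value:
            raise ValueError(f'{key} is empty')
        if ',' in value:
            raise ValueError(f'{key} contains a comma: {value!r}')
        pairs.append(f'{key}={value}')
    return ','.join(pairs)


# Hypothetical use, with the notebook's variables:
#   ENV_VARS = build_set_env_vars({'SECRET_KEY': SECRET_KEY, 'PIPELINE_PROJECT_ID': PROJECT_ID,
#                                  'PIPELINE_ROOT': PIPELINE_ROOT, 'JOB_SPEC': JOB_SPEC})
#   !gcloud functions deploy gcs_update --set-env-vars {ENV_VARS} ...
print(build_set_env_vars({'SECRET_KEY': 'apk1', 'JOB_SPEC': 'pipeline_job.json'}))
# SECRET_KEY=apk1,JOB_SPEC=pipeline_job.json
```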


In [None]:
!gcloud functions deploy gcs_update --set-env-vars \
  SECRET_KEY={SECRET_KEY},PIPELINE_PROJECT_ID={PROJECT_ID},PIPELINE_ROOT={PIPELINE_ROOT},JOB_SPEC={JOB_SPEC} \
  --runtime python37 --trigger-resource {TRIGGER_BUCKET} --trigger-event google.storage.object.finalize


Once deployment has completed, we're ready to test triggering the GCF function.  To do this, we'll upload a new file to the `TRIGGER_BUCKET`.

We'll create a file that contains a (dummy) path to 'new data' to be processed, and upload it.  The intent is that for a real pipeline, this file would contain information about the location of some new data to process.

Uploading this file will trigger a run of the GCF function we defined.

The GCF function will read the contents of the trigger file and kick off a pipeline run, passing that string (the contents of the trigger file) as a parameter to the pipeline. 
(Our simple example pipeline won't do anything but print that information, of course.)
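For a real pipeline, the producer side of this pattern might look like the sketch below: upload every file in the batch to the data bucket first, and write the trigger file last, so the function fires only once the batch is complete. The helper `upload_batch_then_trigger`, the prefix layout, and the `trigger.txt` name are all assumptions for illustration. It relies on the `google-cloud-storage` calls `Client.bucket()`, `Bucket.blob()`, `Blob.upload_from_filename()`, and `Blob.upload_from_string()`. Keep `trigger_bucket` distinct from `data_bucket`, as the introduction recommends; otherwise each data upload would also fire the function.

```python
import os


def upload_batch_then_trigger(storage_client, data_bucket, data_prefix,
                              local_paths, trigger_bucket,
                              trigger_name='trigger.txt'):
    """Upload a batch of local files, then write a trigger file pointing at them.

    Uploads run sequentially and any failure raises before the trigger file
    is written, so a partial batch never starts a pipeline run.
    Returns the GCS prefix written into the trigger file.
    """
    bucket = storage_client.bucket(data_bucket)
    for path in local_paths:
        blob = bucket.blob(f'{data_prefix}/{os.path.basename(path)}')
        blob.upload_from_filename(path)
    new_data_uri = f'gs://{data_bucket}/{data_prefix}'
    trigger_blob = storage_client.bucket(trigger_bucket).blob(trigger_name)
    trigger_blob.upload_from_string(new_data_uri + '\n')
    return new_data_uri


# Hypothetical use with the real client (prefix and file names are placeholders):
#   from google.cloud import storage
#   upload_batch_then_trigger(storage.Client(), BUCKET_NAME, 'newdata/batch-001',
#                             ['part-0.csv', 'part-1.csv'], TRIGGER_BUCKET)
```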



In [None]:
!echo {PIPELINE_ROOT}/newdata > temp.txt

Upload the newly created file to the `TRIGGER_BUCKET`:

In [None]:
!gsutil cp temp.txt gs://{TRIGGER_BUCKET}

The upload will trigger a run of the GCF function, which in turn will trigger a pipeline run.  

First, view the GCF logs via [Cloud Logging](https://console.cloud.google.com/logs/viewer?resource=cloud_function) in the Console.
You should see log entries showing the trigger file being read, and then the Managed Pipeline run being launched.
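If you'd rather query the logs from the notebook, `gcloud functions logs read gcs_update` is one option. Another is a Cloud Logging filter on the `cloud_function` resource type, sketched below with a hypothetical `function_log_filter` helper. The commented usage assumes the `google-cloud-logging` library, which this notebook does not install.

```python
def function_log_filter(function_name, region='us-central1'):
    """Build a Cloud Logging filter for a (1st gen) Cloud Function's logs."""
    return ('resource.type="cloud_function" '
            f'AND resource.labels.function_name="{function_name}" '
            f'AND resource.labels.region="{region}"')


# Hypothetical use with google-cloud-logging (pip install google-cloud-logging):
#   from google.cloud import logging as cloud_logging
#   client = cloud_logging.Client(project=PROJECT_ID)
#   for entry in client.list_entries(filter_=function_log_filter('gcs_update'),
#                                    order_by=cloud_logging.DESCENDING, max_results=20):
#       print(entry.timestamp, entry.payload)
print(function_log_filter('gcs_update'))
```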

Then, you can view the triggered Managed Pipeline run itself in the Console as well.


-----------------------------
Copyright 2020 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.