# Event-triggered Kubeflow Pipeline runs, and using TFDV to detect data drift






## Introduction

With ML workflows, it is often insufficient to train and deploy a given model just once.  Even if the model initially has the desired accuracy, this can change if the data used for prediction requests becomes, perhaps over time, sufficiently different from the data originally used to train the model.

This notebook shows how to build a [Kubeflow Pipeline](https://www.kubeflow.org/docs/pipelines/) that checks for statistical *drift* across successive versions of a dataset and uses that information to make a decision on whether to (re)train a model; and how to configure event-driven deployment of pipeline jobs when new data arrives.

The example builds on a [previous example](https://github.com/amygdala/code-snippets/blob/master/ml/kubeflow-pipelines/keras_tuner/README.md) and introduces two primary new concepts:

- it demonstrates use of the [TensorFlow Data Validation (TFDV)](https://www.tensorflow.org/tfx/guide/tfdv) library to build pipeline *components* that derive dataset **statistics** and detect **drift** between older and newer datasets, and shows how to use drift information to decide whether to retrain a model on newer data.
- it shows how to support **event-triggered** launch of KFP Pipelines runs from a [**Cloud Functions**](https://cloud.google.com/functions/docs/) (GCF) function, where the Function run is triggered by addition of a file to a given [Cloud Storage](https://cloud.google.com/storage) (GCS) bucket.

Familiarity with the previous example is not necessary for this tutorial.


The machine learning task uses a tabular dataset that joins London bike rental information with weather data, and trains a Keras model to predict rental `duration`. 
See [this](https://amygdala.github.io/gcp_blog/ml/kfp/kubeflow/keras/tensorflow/hp_tuning/2020/10/19/keras_tuner.html) and [this](https://amygdala.github.io/gcp_blog/ml/kfp/mlops/keras/hp_tuning/2020/10/26/metrics_eval_component.html) blog post, and the associated [README](https://github.com/amygdala/code-snippets/blob/master/ml/kubeflow-pipelines/keras_tuner/README.md), for more background on the dataset and model architecture.



## Setup


This notebook is intended to be run on either one of:
* [AI Platform Notebooks](https://cloud.google.com/ai-platform-notebooks). See the "AI Platform Notebooks" section in the Experimental [User Guide](https://docs.google.com/document/d/1JXtowHwppgyghnj1N1CT73hwD1caKtWkLcm2_0qGBoI/edit?usp=sharing) for more detail on creating a notebook server instance.
* [Google Colab](https://colab.research.google.com/notebooks/intro.ipynb)


This example requires a [Google Cloud Platform (GCP)](https://cloud.google.com/) account and project, ideally with quota for using GPUs.

The example also **requires you to have installed AI Platform Pipelines (Hosted Kubeflow Pipelines)** as done from this panel of the Cloud Console: https://console.cloud.google.com/ai-platform/pipelines/, with a few additional configurations once installation is complete.   
We'll do that first in the next section, then return to the notebook to install some libraries and set up some variables.


### AI Platform Pipelines (Hosted Kubeflow Pipelines) installation and setup

(**See [this README](https://github.com/amygdala/code-snippets/blob/master/ml/kubeflow-pipelines/keras_tuner/README.md#1-create-a-cloud-ai-platform-pipelines-installation) for a more detailed walkthrough of this setup process**.)

Install AI Platform Pipelines (Hosted KFP) as described in the [documentation](https://cloud.google.com/ai-platform/pipelines/docs). Visit this panel in the Cloud Console to do the installation: https://console.cloud.google.com/ai-platform/pipelines/.     
During the installation process, select "**Create a new cluster**", and **check the box** that says "Allow access to the following Cloud APIs: https://www.googleapis.com/auth/cloud-platform".

The installation spins up a [Google Kubernetes Engine (GKE)](https://cloud.google.com/kubernetes-engine) cluster and installs Kubeflow Pipelines onto that cluster.  
For this example to run correctly, **some additional configuration needs to be done after the installation has completed**.  

First, set up `kubectl` to use the credentials of your new GKE cluster. An easy way to do this is to click on the name of the cluster from the [Pipelines listing in the Cloud Console](https://console.cloud.google.com/ai-platform/pipelines/), then from the cluster details page, click the "Connect" button at the top of the page.  This will display a command that you can run to configure `kubectl` command-line access. It will look like:
```
gcloud container clusters get-credentials <cluster-name> --zone <zone> --project <project-id>
```

You should be able to run that command and the following ones in the notebook Terminal, or from the [Cloud Shell](https://cloud.google.com/shell/), or locally if you have the [`gcloud` SDK](https://cloud.google.com/sdk/docs/install) installed.

Once you've connected to your new cluster, run:

```sh
kubectl create clusterrolebinding sa-admin --clusterrole=cluster-admin --serviceaccount=kubeflow:pipeline-runner
```

(This gives the pipeline steps permissions to create new cluster resources).

Next, define a `daemonset` to install the nvidia driver on any GPU-enabled cluster nodes:

```sh
kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded.yaml
```

Finally, set up a **GPU node pool**, which is used (by default) for the pipeline's training step.  **Edit the following to use your values**:

```sh
gcloud container node-pools create gpu-pool2 \
    --cluster=<your-cluster-name> \
    --zone <your-cluster-zone> \
    --enable-autoscaling --max-nodes=3 --min-nodes=0 \
    --machine-type n1-highmem-8 \
    --scopes cloud-platform --verbosity error \
    --accelerator=type=nvidia-tesla-k80,count=2
```

You may need to increase your K80 GPU quota before setting up the node pool. (If you don't want to use a GPU node pool, then later in the notebook you can alter the pipeline definition so that it is not required.)



### Set some variables and install the KFP SDK

Now we'll return to the notebook config.


Set `gcloud` to use your project.  **Edit the following cell before running it**.

In [None]:
PROJECT_ID = 'your-project-id'  # <---CHANGE THIS

In [None]:
!gcloud config set project {PROJECT_ID}

If you're running this notebook on Colab, authenticate with your user account:

In [None]:
import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Then, install the KFP SDK and (on AI Platform Notebooks) restart the kernel.

In [None]:
import sys
if 'google.colab' in sys.modules:
  USER_FLAG = ''
else:
  USER_FLAG = '--user'

In [None]:
!python3 -m pip install {USER_FLAG} -U kfp


In [None]:
if 'google.colab' not in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

The KFP version should be >= 1.4



In [None]:
# Check the KFP version
!python3 -c "import kfp; print('KFP version: {}'.format(kfp.__version__))"
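
If you'd rather check the version programmatically, here is a minimal sketch. `kfp_version_ok` is a hypothetical helper, not part of the KFP SDK. The upper bound of 2.0 is an assumption: the notebook relies on v1 SDK calls such as `func_to_container_op`, which are not expected to work unchanged on KFP 2.x.

```python
import re


def kfp_version_ok(version, minimum=(1, 4), below=(2, 0)):
    """Return True if minimum <= (major, minor) of `version` < below."""
    found = []
    for part in version.split('.')[:2]:
        # Keep only the leading digits, so that '4rc1' is read as 4.
        match = re.match(r'\d+', part)
        if match is None:
            break
        found.append(int(match.group()))
    # Pad so that a bare '1' is treated as (1, 0).
    while len(found) < 2:
        found.append(0)
    return tuple(minimum) <= tuple(found) < tuple(below)


for v in ['1.3.0', '1.4.0', '1.8.22', '2.0.1']:
    print(v, kfp_version_ok(v))
```

In the notebook you would pass `kfp.__version__` to the helper after the install cell has run.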

### Create and change to a new subdirectory

We'll work in a `gcf` subdirectory for this example.

In [None]:
!mkdir -p gcf

In [None]:
%cd gcf

### Set some variables

**Before you run the next cell**, **edit it** to set variables for your project.  

For `WORKING_DIR`, enter the name of a Cloud Storage (GCS) path in your project. Include the `gs://` prefix. **The bucket must already exist**.  You can create a new bucket via the [Cloud Console](https://console.cloud.google.com/storage/browser) (or via the `gsutil mb` command-line utility).
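
As a quick sanity check on the `WORKING_DIR` format, the following sketch splits a `gs://` path into its bucket and prefix. `split_gcs_path` is a hypothetical helper, not part of any Google Cloud library, and it only checks the string format. It does not verify that the bucket exists.

```python
def split_gcs_path(path):
    """Split 'gs://bucket/some/prefix' into ('bucket', 'some/prefix')."""
    scheme = 'gs://'
    if not path.startswith(scheme):
        raise ValueError("expected a path starting with 'gs://', got: %r" % path)
    bucket, _, obj_prefix = path[len(scheme):].partition('/')
    if not bucket:
        raise ValueError('missing bucket name in: %r' % path)
    # Drop any trailing slash so callers can append '/subdir' consistently.
    return bucket, obj_prefix.rstrip('/')


print(split_gcs_path('gs://your-gcs-path/tfdv_expers'))
```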

In [None]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

# Required Parameters
WORKING_DIR = 'gs://your-gcs-path'  # <---CHANGE THIS

PROJECT_ID = 'your-project-id'  # <---CHANGE THIS
REGION = 'us-central1' # <-----CHANGE THIS AS NECESSARY

## Define and compile a pipeline

Now we're set up to define, compile, and run a pipeline.  We'll use as a starting point the pipeline described [here](https://amygdala.github.io/gcp_blog/ml/kfp/mlops/keras/hp_tuning/2020/10/26/metrics_eval_component.html), but— for simplicity— without the Keras tuner part.

That is, we'll use the training, eval, and serving components from that pipeline, and add two [TFDV](https://www.tensorflow.org/tfx/guide/tfdv)-related components— one to derive dataset statistics, and the other to detect drift between older and newer datasets using the derived statistical information.



### Create the TFDV pipeline components

We'll define both TFDV pipeline *components* as ['lightweight' Python-function-based components](https://www.kubeflow.org/docs/pipelines/sdk/python-function-components/). For each component, we define a function, then call `kfp.components.func_to_container_op()` on that function to build a reusable component in `.yaml` format. 

For these components, we need to specify a base container image that will run the function.  We'll use one that has the TFDV libraries installed (its Dockerfile is [here](https://github.com/amygdala/code-snippets/blob/keras_tuner3/ml/kubeflow-pipelines/keras_tuner/components/tfdv/Dockerfile)).

#### Component to generate stats on the dataset

This component uses TFDV to generate statistics on a given dataset.

> Note: For this example, our training data is in GCS, in CSV-formatted files.  So, we can take advantage of TFDV’s ability to process CSV files.  The TFDV libraries can also process files in `TFRecords` format. 

It uses a [Beam](https://beam.apache.org/) pipeline— not to be confused with KFP Pipelines— to do this. Depending upon configuration, the component can use either the Direct (local) runner or the [Dataflow](https://cloud.google.com/dataflow#section-5) runner. 

Running the Beam pipeline on Dataflow rather than locally can make sense with large datasets. (If you use Dataflow, first ensure that the Dataflow API is enabled for your GCP project.)

We'll first define the python function:

In [None]:
from typing import NamedTuple

def generate_tfdv_stats(input_data: str, output_path: str, job_name: str, use_dataflow: str,
                        project_id: str, region:str, gcs_temp_location: str, gcs_staging_location: str,
                        whl_location: str = '', requirements_file: str = 'requirements.txt'
) -> NamedTuple('Outputs', [('stats_path', str)]):

  import logging
  import time

  import tensorflow_data_validation as tfdv
  import tensorflow_data_validation.statistics.stats_impl
  from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions

  logging.getLogger().setLevel(logging.INFO)
  logging.info("output path: %s", output_path)
  logging.info("Building pipeline options")
  # Create and set your PipelineOptions.
  options = PipelineOptions()

  if use_dataflow == 'true':
    logging.info("using Dataflow")
    if not whl_location:
      logging.error('tfdv whl file required with dataflow runner.')
      raise ValueError('whl_location must be set when use_dataflow is true')
    # For Cloud execution, set the Cloud Platform project, job_name,
    # staging location, temp_location and specify DataflowRunner.
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = project_id
    google_cloud_options.job_name = '{}-{}'.format(job_name, str(int(time.time())))
    google_cloud_options.staging_location = gcs_staging_location
    google_cloud_options.temp_location = gcs_temp_location
    google_cloud_options.region = region
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    setup_options = options.view_as(SetupOptions)
    # whl_location should point to the downloaded tfdv wheel file.
    setup_options.extra_packages = [whl_location]
    # Use the requirements file passed to the component rather than a hardcoded name.
    setup_options.requirements_file = requirements_file

  tfdv.generate_statistics_from_csv(
    data_location=input_data, output_path=output_path,
    pipeline_options=options)

  return (output_path, )

This component outputs the path to the generated stats file. 

When we define the pipeline later in the lab, we'll see this output used as one of the inputs to the 'drift detection' component below.
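
The stats function above appends a Unix timestamp to `job_name` so that repeated runs get distinct Dataflow job names. Dataflow restricts job names to lowercase letters, digits, and hyphens, starting with a letter and ending with a letter or digit, so a prefix such as `BW_Stats` would be rejected. Here is a minimal sketch of a helper that normalizes the prefix first. `make_job_name` is hypothetical and not part of Beam or TFDV.

```python
import re
import time

# Lowercase letters, digits, and hyphens only; must start with a letter
# and end with a letter or digit.
_JOB_NAME_RE = re.compile(r'[a-z]([-a-z0-9]*[a-z0-9])?')


def make_job_name(prefix, now=None):
    """Return '<normalized prefix>-<unix timestamp>' or raise ValueError."""
    ts = int(time.time() if now is None else now)
    # Lowercase the prefix and replace disallowed characters with hyphens.
    cleaned = re.sub(r'[^-a-z0-9]', '-', prefix.lower()).strip('-')
    name = '{}-{}'.format(cleaned, ts)
    if not _JOB_NAME_RE.fullmatch(name):
        raise ValueError('invalid Dataflow job name: %r' % name)
    return name


print(make_job_name('testx-1'))
```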

Next, we'll create the pipeline component from the function definition:

In [None]:
import kfp
kfp.components.func_to_container_op(generate_tfdv_stats,
    output_component_file='tfdv_component.yaml', base_image='gcr.io/google-samples/tfdv-tests:v1')

#### Detect dataset 'drift'

Detect drift on the `duration` field between two sets of derived stats from two datasets.  See the [TFDV docs](https://www.tensorflow.org/tfx/data_validation/get_started#checking_data_skew_and_drift) for more detail.  

The component outputs a string indicating whether drift was detected on the dataset (it also returns 'true' if no comparison stats file was provided).  

When we define the pipeline later in the lab, we'll see this output used in a *conditional* expression, to determine whether or not model training should occur.
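
The component in the next cell sets a Jensen-Shannon divergence threshold of 0.01 on `duration`. For intuition, here is the textbook definition of that divergence for two discrete distributions, in plain Python. This is only an illustration: TFDV computes the measure from the feature histograms in the stats files, and its binning and log base may differ from this sketch.

```python
import math


def kl_divergence(p, q):
    """Kullback-Leibler divergence KL(p || q) in bits; terms with p_i == 0 contribute 0."""
    return sum(pi * math.log2(pi / qi) for pi, qi in zip(p, q) if pi > 0)


def js_divergence(p, q):
    """Jensen-Shannon divergence of two discrete distributions (base 2, so in [0, 1])."""
    if len(p) != len(q):
        raise ValueError('distributions must have the same number of bins')
    # m is the pointwise average of the two distributions.
    m = [(pi + qi) / 2 for pi, qi in zip(p, q)]
    return 0.5 * kl_divergence(p, m) + 0.5 * kl_divergence(q, m)


# Hypothetical normalized histograms for an 'older' and a 'newer' dataset.
older = [0.25, 0.5, 0.25]
newer = [0.2, 0.5, 0.3]
value = js_divergence(older, newer)
print('divergence: %.5f, exceeds 0.01 threshold: %s' % (value, value >= 0.01))
```

Identical distributions give 0 and distributions with no overlap give 1, so a threshold of 0.01 flags fairly small shifts.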


In [None]:
from typing import NamedTuple

def tfdv_detect_drift(
    stats_older_path: str, stats_new_path: str
) -> NamedTuple('Outputs', [('drift', str)]):

  import logging
  import time

  import tensorflow_data_validation as tfdv
  import tensorflow_data_validation.statistics.stats_impl

  logging.getLogger().setLevel(logging.INFO)
  logging.info('stats_older_path: %s', stats_older_path)
  logging.info('stats_new_path: %s', stats_new_path)

  # if there are no older stats to compare with, just return 'true'
  if stats_older_path == 'none':
    return ('true', )

  stats1 = tfdv.load_statistics(stats_older_path)
  stats2 = tfdv.load_statistics(stats_new_path)

  schema1 = tfdv.infer_schema(statistics=stats1)
  tfdv.get_feature(schema1, 'duration').drift_comparator.jensen_shannon_divergence.threshold = 0.01
  drift_anomalies = tfdv.validate_statistics(
      statistics=stats2, schema=schema1, previous_statistics=stats1)
  logging.info('drift analysis results: %s', drift_anomalies.drift_skew_info)

  from google.protobuf.json_format import MessageToDict
  d = MessageToDict(drift_anomalies)
  val = d['driftSkewInfo'][0]['driftMeasurements'][0]['value']
  thresh = d['driftSkewInfo'][0]['driftMeasurements'][0]['threshold']
  logging.info('value %s and threshold %s', val, thresh)
  res = 'true'
  if val < thresh:
    res = 'false'
  logging.info('train decision: %s', res)
  return (res, )
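
The function above indexes directly into the dict produced by `MessageToDict`, so it raises if the expected entries are missing. A more defensive variant of the same decision is sketched below. `train_decision` is a hypothetical helper, and defaulting to 'true' when no measurement is found is an assumption chosen to mirror the component's behavior when no older stats are provided.

```python
def train_decision(anomalies_dict, default='true'):
    """Return 'false' if the first drift measurement is below its threshold, else 'true'.

    Falls back to `default` when the measurement cannot be read.
    """
    try:
        measurement = anomalies_dict['driftSkewInfo'][0]['driftMeasurements'][0]
        value = float(measurement['value'])
        threshold = float(measurement['threshold'])
    except (KeyError, IndexError, TypeError, ValueError):
        return default
    return 'false' if value < threshold else 'true'


example = {'driftSkewInfo': [{'driftMeasurements': [{'value': 0.02, 'threshold': 0.01}]}]}
print(train_decision(example))
```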


Create the component from the function definition:

In [None]:
import kfp
kfp.components.func_to_container_op(tfdv_detect_drift,
    output_component_file='tfdv_drift_component.yaml', base_image='gcr.io/google-samples/tfdv-tests:v1')

### Define the pipeline

Now we've defined all the components we need, and are ready to define the pipeline, using the Kubeflow Pipelines (KFP) SDK. 




#### Define the pipeline ops from components

We'll first define the pipeline ops we need from component definitions.

In addition to the TFDV-based components that we just built, we'll use some other components that have already been created. These components train a model; perform a simple evaluation using model metrics; and deploy a model using TF-Serving.   
See [this example](https://github.com/amygdala/code-snippets/blob/master/ml/kubeflow-pipelines/keras_tuner/README.md) for more information on the non-TFDV components.

In [None]:
import kfp.dsl as dsl
import kfp.components as comp

# pre-existing components
train_op = comp.load_component_from_url(
  'https://raw.githubusercontent.com/amygdala/code-snippets/master/ml/kubeflow-pipelines/keras_tuner/components/train_component.yaml'
  )
serve_op = comp.load_component_from_url(
  'https://raw.githubusercontent.com/amygdala/code-snippets/master/ml/kubeflow-pipelines/keras_tuner/components/serve_component.yaml'
  )

tb_op = comp.load_component_from_url(
  'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/tensorflow/tensorboard/prepare_tensorboard/component.yaml'
  )

eval_metrics_op = comp.load_component_from_url(
  'https://raw.githubusercontent.com/amygdala/code-snippets/master/ml/kubeflow-pipelines/keras_tuner/components/eval_metrics_component.yaml'
)

# our new TFDV components
tfdv_op = comp.load_component_from_file(
  'tfdv_component.yaml'
  )
tfdv_drift_op = comp.load_component_from_file(
  'tfdv_drift_component.yaml'
  )    


#### Define the pipeline using the defined ops

Next, we'll define the pipeline itself, using the ops above.

We'll specify two steps based on the TFDV stats-generation op— one run on the training data, and one run on the test data. The drift detection step consumes the stats generated for the training data. (The stats for the test data are not used by the downstream steps).

Training is conditional on the results of the drift detection.

Serving is conditional on the results of the model metrics at the end of training.

**If you did not set up a GPU node pool on your GKE cluster, comment out the following line** in the definition below.  If you leave it uncommented, the 'train' step must be placed on a GPU-enabled node with at least two available GPUs (and will show status "Pending" until it can be placed— so if there are no GPU-enabled nodes in the cluster, it will wait 'forever').  
```
    train.set_gpu_limit(2)
```

In [None]:
@dsl.pipeline(
  name='bikes_weather',
  description='Model bike rental duration given weather'
)
def bikes_weather_tfdv( 
  train_epochs: int = 3,
  working_dir: str = 'gs://YOUR/GCS/PATH',  
  data_dir: str = 'gs://aju-dev-demos-codelabs/bikes_weather/', # currently, requires trailing slash
  steps_per_epoch: int = -1 ,  # if -1, don't override normal calcs based on dataset size
  hptune_params: str = '[{"num_hidden_layers": %s, "learning_rate": %s, "hidden_size": %s}]' % (3, 1e-2, 64),
  thresholds: str = '{"root_mean_squared_error": 2000}',
  # tfdv-related
  project_id: str = 'YOUR-PROJECT-ID',
  region: str = 'us-central1',
  requirements_file: str = 'requirements.txt',
  job_name: str = 'testx',
  whl_location: str = 'tensorflow_data_validation-0.26.0-cp37-cp37m-manylinux2010_x86_64.whl',
  use_dataflow: str = '',
  stats_older_path: str = 'gs://aju-dev-demos-codelabs/bikes_weather_chronological/evaltrain1.pb'
  ):


  # create TensorBoard viz for the parent directory of all training runs, so that we can
  # compare them.
  tb_viz = tb_op(
    log_dir_uri='%s/%s' % (working_dir, dsl.RUN_ID_PLACEHOLDER)
  )

  tfdv1 = tfdv_op(  # TFDV stats for the test data
    input_data='%stest-*.csv' % (data_dir,),
    output_path='%s/tfdv_expers/%s/eval/evaltest.pb' % (working_dir, dsl.RUN_ID_PLACEHOLDER),
    job_name='%s-1' % (job_name,),
    use_dataflow=use_dataflow,
    project_id=project_id, region=region,
    gcs_temp_location='%s/tfdv_expers/tmp' % (working_dir,), 
    gcs_staging_location='%s/tfdv_expers' % (working_dir,), 
    whl_location=whl_location, requirements_file=requirements_file
    )
  tfdv2 = tfdv_op(  # TFDV stats for the training data
    input_data='%strain-*.csv' % (data_dir,),
    # output_path='%s/%s/eval/evaltrain.pb' % (output_path, dsl.RUN_ID_PLACEHOLDER),
    output_path='%s/tfdv_expers/%s/eval/evaltrain.pb' % (working_dir, dsl.RUN_ID_PLACEHOLDER),
    job_name='%s-2' % (job_name,),
    use_dataflow=use_dataflow,
    project_id=project_id, region=region,
    gcs_temp_location='%s/tfdv_expers/tmp' % (working_dir,), 
    gcs_staging_location='%s/tfdv_expers' % (working_dir,), 
    whl_location=whl_location, requirements_file=requirements_file
    )

  # compare generated training data stats with stats from a previous version
  # of the training data set.
  tfdv_drift = tfdv_drift_op(stats_older_path, tfdv2.outputs['stats_path'])
  
  # proceed with training if drift is detected (or if no previous stats were provided)
  with dsl.Condition(tfdv_drift.outputs['drift'] == 'true'):  

    train = train_op(
      data_dir=data_dir,
      workdir='%s/%s' % (tb_viz.outputs['log_dir_uri'], 0),
      tb_dir=tb_viz.outputs['log_dir_uri'],
      epochs=train_epochs, steps_per_epoch=steps_per_epoch,
      hp_idx=0,
      hptune_results=hptune_params
      )

    eval_metrics = eval_metrics_op(
      thresholds=thresholds,
      metrics=train.outputs['metrics_output_path'],
      )

    with dsl.Condition(eval_metrics.outputs['deploy'] == 'deploy'):
      serve = serve_op(
        model_path=train.outputs['train_output_path'],
        model_name='bikesw',
        namespace='default'
        )
    train.set_gpu_limit(2)  # comment out this line if you did not set up a GPU node pool


In the pipeline definition, you can see that the output of the `tfdv2` step (which generates stats on the training dataset) is consumed as an input by the `tfdv_drift` step.  Then, the output of that drift detection step is used in a conditional expression to decide whether or not model (re)training is necessary.

(See [this example](https://github.com/amygdala/code-snippets/blob/master/ml/kubeflow-pipelines/keras_tuner/README.md) for more information on the non-TFDV parts of the pipeline.  You'll notice a second conditional expression, related to deciding whether or not a trained model should be deployed, based on its eval metrics).
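
The two nested conditions can be summarized in plain Python, as a sketch of the control flow only. The step names and the `plan_run` helper are hypothetical. The deploy rule (every thresholded metric must be at or below its limit) and the 'no_deploy' string are assumptions about the eval-metrics component, not its actual code.

```python
import json


def should_train(drift):
    """Mirror of the outer dsl.Condition: train only when drift == 'true'."""
    return drift == 'true'


def deploy_decision(metrics, thresholds_json):
    """Return 'deploy' only if every thresholded metric is at or below its limit (assumed rule)."""
    thresholds = json.loads(thresholds_json)
    for name, limit in thresholds.items():
        if name not in metrics or metrics[name] > limit:
            return 'no_deploy'
    return 'deploy'


def plan_run(drift, metrics, thresholds_json='{"root_mean_squared_error": 2000}'):
    """List the steps that would execute for a given drift result and metrics."""
    steps = ['tfdv_stats', 'drift_detection']
    if should_train(drift):
        steps += ['train', 'eval_metrics']
        if deploy_decision(metrics, thresholds_json) == 'deploy':
            steps.append('serve')
    return steps


print(plan_run('true', {'root_mean_squared_error': 1500}))
print(plan_run('false', {}))
```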

### Compile the pipeline and (optionally) try a test run

Now we're ready to compile the pipeline, and try a test run of the pipeline to make sure that it is working correctly, before we set up the Cloud Function-based event triggering.   



In [None]:
PIPELINE_SPEC = 'bw_train_tfdv.tar.gz'

Compile the pipeline:

In [None]:
import kfp.compiler as compiler
compiler.Compiler().compile(bikes_weather_tfdv, PIPELINE_SPEC)

To run the compiled pipeline from a notebook, we need to create a client object to connect to the pipelines installation on the GKE cluster.  To do this, we need to know the host URI. 

The host URI string can be obtained by clicking on the 'settings' gear for the relevant installation in the Cloud Console [dashboard](https://console.cloud.google.com/ai-platform/pipelines/clusters) that lists the pipelines installations, then copying the client connection info from the popup window.

<a href="https://storage.googleapis.com/amy-jo/images/kf-pls/kfp_host_uri.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/kf-pls/kfp_host_uri.png" width="80%"/></a>

**Edit the following cell with your host URI before running it**.

In [None]:
import kfp
PIPELINE_HOST = 'your-host-uri.pipelines.googleusercontent.com'  # CHANGE THIS

In [None]:
client = kfp.Client(host=PIPELINE_HOST)

Test the client connection:

In [None]:
client.list_experiments()

Create a KFP 'Experiment' under which to organize the pipeline run:

In [None]:
exp = client.create_experiment(name='bw_expers')
EXP_ID = exp.id
print(EXP_ID)

Upload the pipeline and get its ID:

In [None]:
import time
ts = str(int(time.time()))
res = client.upload_pipeline(pipeline_package_path=PIPELINE_SPEC, 
                                     pipeline_name='bw_metrics_tfdv_{}'.format(ts))

In [None]:
PIPELINE_ID = res.id
print(PIPELINE_ID)

If you view the uploaded pipeline details in the Kubeflow Pipelines dashboard, it should look like the following. You can see that the model training has a dependency on the training set drift analysis  (whereas TFDV stats for the test set are generated but not used further in this pipeline).

<a href="https://storage.googleapis.com/amy-jo/images/kf-pls/bw_tfdv_pipeline.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/kf-pls/bw_tfdv_pipeline.png" width="50%"/></a>

Optionally, run the pipeline directly to test that it's working correctly.  In the cell below we're passing the pipeline ID, but we could alternatively pass the compiled pipeline archive file.

For the `data_dir` pipeline argument we're using the second half (chronologically) of the "bike rental" dataset, and as the `stats_older_path` argument we're using a statistics file previously generated from the first half of the dataset.

In [None]:
run = client.run_pipeline(EXP_ID, 'bw_tfdv_train', pipeline_id=res.id,
                          params={'working_dir': WORKING_DIR,
                                  'project_id': PROJECT_ID,
                                  'use_dataflow': 'true', 
                                  'data_dir': 'gs://aju-dev-demos-codelabs/bikes_weather_chronological/ds2/',
                                  'stats_older_path': 'gs://aju-dev-demos-codelabs/bikes_weather_chronological/evaltrain1.pb'})

View the running pipeline in the Console by clicking on the generated link above.  It will look something like this. (In this screenshot, the TFDV analysis steps have completed and the training step is in progress).

<a href="https://storage.googleapis.com/amy-jo/images/kf-pls/bw_tfdv_pipeline_run.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/kf-pls/bw_tfdv_pipeline_run.png" width="70%"/></a>

This pipeline will take a while to run. In the meantime, proceed to the next section below.

## Define and deploy a Cloud Function to launch a pipeline run

Now we're ready to define and deploy a  [**Cloud Functions**](https://cloud.google.com/functions/docs/) (GCF) function that launches a run of this pipeline when new training data becomes available. 

In most cases, you don't want to launch a new pipeline run for every new file added to a dataset, since a dataset typically consists of a collection of files that you add or update in batches. So the 'trigger bucket' should not be the dataset bucket (if the data lives on GCS), as that would trigger unwanted pipeline runs.
Instead, we’ll trigger a pipeline run after the upload of a *batch* of new data has completed.

To do this, we'll use an approach where the 'trigger' bucket is different from the bucket used to store dataset files. 'Trigger files' uploaded to that bucket are expected to contain the path of the updated dataset as well as the path to the data stats file generated for the last model trained. 
A trigger file is uploaded once the new data upload has completed. That upload triggers a run of the GCF function, which in turn reads info on the new data path from the trigger file and launches the pipeline job.


### Define the Cloud Function

First ensure that the Cloud Functions and Cloud Build APIs are enabled for your project:

In [None]:
!gcloud services enable cloudfunctions.googleapis.com
!gcloud services enable cloudbuild.googleapis.com


Next, we'll write out a file that specifies the library installations required by the GCF function.

In [None]:
%%writefile requirements.txt
kfp==1.4

Now we'll define the GCF function code in `main.py`.  

Note that the function is grabbing some values from environment variables.  We'll show how those are set below.

The `gcs_update` function will be called on the addition (or modification) of a file in the specified trigger bucket.

In [None]:
%%writefile main.py

import logging
import os

import kfp
from kfp import dsl
from kfp import compiler
from kfp import components

from google.cloud import storage

PIPELINE_PROJECT_ID = os.getenv('PIPELINE_PROJECT_ID')
REGION = 'us-central1'
WORKING_DIR = os.getenv('WORKING_DIR')
PIPELINE_SPEC = os.getenv('PIPELINE_SPEC')
PIPELINE_ID = os.getenv('PIPELINE_ID')
PIPELINE_HOST = os.getenv('PIPELINE_HOST')
USE_DATAFLOW = os.getenv('USE_DATAFLOW')
EXP_ID = os.getenv('EXP_ID')


def read_trigger_file(data, context, storage_client):
    """Read the contents of the trigger file and return as string.
    """
    print('Event ID: {}'.format(context.event_id))
    print('Event type: {}'.format(context.event_type))
    print('Data: {}'.format(data))
    print('Bucket: {}'.format(data['bucket']))
    print('File: {}'.format(data['name']))
    print('Metageneration: {}'.format(data['metageneration']))
    print('Created: {}'.format(data['timeCreated']))
    print('Updated: {}'.format(data['updated']))

    bucket = storage_client.get_bucket(data['bucket'])
    blob = bucket.get_blob(data['name'])
    trigger_file_string = blob.download_as_string().strip()
    logging.info('trigger file contents: {}'.format(trigger_file_string))
    return trigger_file_string.decode('UTF-8')


def gcs_update(data, context):
    """Background Cloud Function to be triggered by Cloud Storage.
    """
    logging.getLogger().setLevel(logging.INFO)

    storage_client = storage.Client()
    # get the contents of the trigger file
    trigger_file_string = read_trigger_file(data, context, storage_client)
    trigger_file_info = trigger_file_string.strip().split('\n')  # TODO: add error-checking
    logging.info('trigger file info: %s', trigger_file_info)
    # then run the pipeline using the given job spec, passing the trigger file contents
    # as parameter values.  
    logging.info('running pipeline with id %s...', PIPELINE_ID)
    # create the client object
    client = kfp.Client(host=PIPELINE_HOST)    
    # deploy the pipeline run
    run = client.run_pipeline(EXP_ID, 'bw_tfdv_gcf', pipeline_id=PIPELINE_ID,
                          params={'working_dir': WORKING_DIR,
                                  'project_id': PIPELINE_PROJECT_ID,
                                  'use_dataflow': USE_DATAFLOW,
                                  'data_dir': trigger_file_info[0],
                                  'stats_older_path': trigger_file_info[1]})     

    logging.info('job response: %s', run)


Next, identify a GCS bucket in your project to use for the 'trigger bucket'.  This bucket must already exist.  You can create a new bucket via the [Cloud Console](https://console.cloud.google.com/storage/browser) (or via the `gsutil mb` command-line utility). **This bucket must be different from your `WORKING_DIR` bucket**.

**Edit the following cell before running it.**

In [None]:
# Change this to your bucket name.  Do not include the 'gs://' prefix.
TRIGGER_BUCKET = 'your-bucket-name'

In [None]:
# confirm that you're in the gcf subdirectory
!pwd

Now we're ready to deploy the GCF function code.  We specify `gcs_update` as the entry point (defined in `main.py`, which is implicit), and use the value of the `TRIGGER_BUCKET` var as the trigger bucket.  The trigger event is specified to be the addition or modification of a GCS file in that bucket.

Note that we're setting several environment variables as part of the deployment, to which the function code will have access.  These include the pipeline ID and host URI, the experiment ID, and the name of the pipeline job spec file. So, ensure that all these vars are set properly, which they should be if you've run all the notebook cells above.

The deployment will include the files in the current directory (`gcf`). The GCF function will have read-only access to the directory contents.


In [None]:
# You can also try setting to 'false', but this means that a Beam job runs locally on the GKE node
# running the TFDV stats pipeline step (instead of launching a Dataflow job). 
# In that case, if your GKE node is not large enough, you may see an 'out of memory error'
# when processing the training dataset.
USE_DATAFLOW = 'true'

Check that all the necessary vars are set:

In [None]:
print('PROJECT_ID {}, WORKING_DIR {}, PIPELINE_SPEC {}, PIPELINE_ID {}, \nPIPELINE_HOST {}, EXP_ID {}, USE_DATAFLOW {}, TRIGGER_BUCKET {}'.format(
    PROJECT_ID, WORKING_DIR, PIPELINE_SPEC, PIPELINE_ID, PIPELINE_HOST, EXP_ID, USE_DATAFLOW, TRIGGER_BUCKET
))

Then, deploy the GCF function.  Note how we're setting the environment vars as part of the deployment.


In [None]:
!gcloud functions deploy gcs_update --set-env-vars \
  PIPELINE_PROJECT_ID={PROJECT_ID},WORKING_DIR={WORKING_DIR},PIPELINE_SPEC={PIPELINE_SPEC},PIPELINE_ID={PIPELINE_ID},PIPELINE_HOST={PIPELINE_HOST},EXP_ID={EXP_ID},USE_DATAFLOW={USE_DATAFLOW} \
  --runtime python37 --trigger-resource {TRIGGER_BUCKET} --trigger-event google.storage.object.finalize


You can see your deployed Cloud Function in the Console: https://console.cloud.google.com/functions/list.

### Trigger the Cloud Function to run the pipeline


Once deployment has completed, we're ready to test triggering the GCF function.  To do this, we'll upload a new file to the `TRIGGER_BUCKET`.

We'll create a file that contains as its first line a path to the new dataset to be processed.  
Its second line contains either the path to TFDV stats for the dataset used for the previously-trained model (which we'll compare with the stats for the new data), or 'none'.

Uploading this file will trigger a run of the GCF function we defined.

The GCF function will read the contents of the trigger file, and kick off a pipeline run, passing the contents of the 'trigger file' as parameters to the pipeline. 
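The two-line trigger file format described above can be parsed along these lines. This is a minimal sketch under stated assumptions, not the function's actual code: `parse_trigger_file` is a hypothetical helper, and mapping the literal `none` to Python's `None` is an illustrative choice (the real function may pass the string through unchanged).

```python
def parse_trigger_file(contents):
    """Split trigger-file text into (data_path, stats_path).

    Line 1 is the path to the new dataset. Line 2 is either the path to the
    older TFDV stats or the literal 'none', which this sketch returns as None.
    """
    lines = [line.strip() for line in contents.splitlines() if line.strip()]
    if len(lines) < 2:
        raise ValueError('Expected two non-empty lines in the trigger file')
    data_path, stats_path = lines[0], lines[1]
    if stats_path.lower() == 'none':
        stats_path = None
    return data_path, stats_path
```

The returned pair would then be supplied as the two pipeline parameters when the run is launched.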


For this simple example, we're using the two halves, chronologically, of our 'bike rental' dataset, for the 'old' and 'new' datasets.

`gs://aju-dev-demos-codelabs/bikes_weather_chronological/ds1/` is the first half, chronologically, of the 'bike rental' dataset.  If we wanted to trigger a pipeline run on that data, we could do so using the commented-out cells below.  



In [None]:
# %%writefile temp1.txt
# gs://aju-dev-demos-codelabs/bikes_weather_chronological/ds1/
# none

In [None]:
#!gsutil cp temp1.txt gs://{TRIGGER_BUCKET}

Instead, we'll skip that and jump ahead to processing the second dataset, comparing it to the stats from the first dataset to detect data drift.

In the following, `gs://aju-dev-demos-codelabs/bikes_weather_chronological/ds2` is the second half, chronologically, of the 'bikes and weather' data. 

The `.pb` file in the 2nd line points to the stats generated on the training set from the first half of the data.


In [None]:
%%writefile temp2.txt
gs://aju-dev-demos-codelabs/bikes_weather_chronological/ds2/
gs://aju-dev-demos-codelabs/bikes_weather_chronological/evaltrain1.pb

Upload `temp2.txt` to your 'trigger bucket' to trigger a run of the GCF function.  

In [None]:
!gsutil cp temp2.txt gs://{TRIGGER_BUCKET}

The upload will trigger a run of the GCF function, which in turn will trigger a pipeline run.  From the 'trigger file' the GCF function will obtain the path to the new input data 
(`gs://aju-dev-demos-codelabs/bikes_weather_chronological/ds2/`) as well as the path to the previously-generated stats.

That is, the pipeline will compare the stats on the new training data with those in `gs://aju-dev-demos-codelabs/bikes_weather_chronological/evaltrain1.pb`, which were generated from the `gs://aju-dev-demos-codelabs/bikes_weather_chronological/ds1/` dataset.  If drift is detected, model retraining will be initiated.
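To give a feel for what such a comparison involves, the sketch below computes the L-infinity distance between two categorical value distributions, which is the measure TFDV's drift comparator uses for categorical features. This is a plain-Python illustration, not the pipeline component's code: the helper names, the example counts, and the threshold are all made up, and it assumes both count dictionaries are non-empty.

```python
def linf_distance(old_counts, new_counts):
    """L-infinity distance between two categorical value distributions.

    Each argument maps a feature value to its count. Counts are normalized
    to frequencies, and the largest per-value absolute difference is returned.
    """
    old_total = sum(old_counts.values())
    new_total = sum(new_counts.values())
    values = set(old_counts) | set(new_counts)
    return max(
        abs(old_counts.get(v, 0) / old_total - new_counts.get(v, 0) / new_total)
        for v in values
    )


def drift_detected(old_counts, new_counts, threshold):
    """Flag drift when the distance exceeds the (illustrative) threshold."""
    return linf_distance(old_counts, new_counts) > threshold


# Made-up example counts for a hypothetical categorical feature.
old = {'weekday': 80, 'weekend': 20}
new = {'weekday': 60, 'weekend': 40}
print(drift_detected(old, new, threshold=0.1))
```

Here the weekday share moves from 0.8 to 0.6, a distance of roughly 0.2, so a threshold of 0.1 flags drift.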


You should be able to see the newly triggered pipeline job by visiting the Kubeflow Pipelines dashboard.  If you don't already have a tab open, it can be reached by clicking the "**OPEN PIPELINES DASHBOARD**" link next to your KFP installation in the [Cloud Console](https://console.cloud.google.com/ai-platform/pipelines/).

If the pipeline doesn't seem to be running correctly, or if you want more info about how the GCF function was executed, you can 
view the GCF logs via [Cloud Logging](https://console.cloud.google.com/logs/viewer?resource=cloud_function) in the Console.  Select "Cloud Function" as the resource to view.
In the log output of your function, you should see an indication that the trigger file was read and that the pipeline run was then initiated.


## Appendix: Using your trained and deployed model(s) for prediction

(Optional).

Each time a new model is trained, it is deployed to the cluster using TF-Serving.  Once deployed, you can send prediction requests to the serving endpoint.  See [this example](https://github.com/amygdala/code-snippets/blob/master/ml/kubeflow-pipelines/keras_tuner/README.md#5-make-predictions-against-your-trained-model) for more detail on how to do that.

-----------------------------
Copyright 2021 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.