<a href="https://colab.research.google.com/github/iamsusiep/tfx/blob/template_notebook/stub_template.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##### Copyright &copy; 2020 The TensorFlow Authors.

<font size=-1>Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at [https://www.apache.org/licenses/LICENSE-2.0](https://www.apache.org/licenses/LICENSE-2.0)

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.</font>

# TFX Template Pipeline Testing Framework using Stub Executors

## Introduction
**You should complete the [template.ipynb](https://github.com/tensorflow/tfx/blob/master/docs/tutorials/tfx/template.ipynb) tutorial up to *Step 6* before proceeding with this tutorial.**

This document provides instructions for testing a TensorFlow Extended (TFX) pipeline
using `BaseStubExecutor`, which generates fake outputs from previously recorded golden test data. It lets you replace the executors of components you don't want to test, so that you save the time it would take to run those executors for real. The stub executor is provided with the TFX Python package under `tfx.experimental.pipeline_testing.base_stub_executor`.
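
Conceptually, a stub executor skips the real computation and fills each output artifact's URI with the matching recorded output. The sketch below models that idea with plain local files. It is not the TFX implementation: the function name `copy_recorded_outputs` is hypothetical, and the directory layout `<test_data_dir>/<component_id>/<output_key>/<artifact_index>` is an assumption made for illustration.

```python
import os
import shutil
from typing import Dict, List


def copy_recorded_outputs(test_data_dir: str, component_id: str,
                          output_uris: Dict[str, List[str]]) -> None:
    """Fills each output URI with recorded (golden) data instead of computing it.

    ASSUMPTION: recorded outputs are laid out as
    <test_data_dir>/<component_id>/<output_key>/<artifact_index>.
    """
    for output_key, uris in output_uris.items():
        for index, dest_uri in enumerate(uris):
            src = os.path.join(test_data_dir, component_id, output_key, str(index))
            if not os.path.isdir(src):
                raise FileNotFoundError(f"No recorded output found at {src}")
            # Copy the whole recorded directory; dirs_exist_ok needs Python 3.8+.
            shutil.copytree(src, dest_uri, dirs_exist_ok=True)
```

This sketch only handles local paths, whereas the recorded data in this tutorial lives in Google Cloud Storage.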

Many of the instructions are Linux shell commands, which will run on an AI Platform Notebooks instance. Corresponding Jupyter Notebook code cells which invoke those commands using `!` are provided.

This tutorial extends the `template.ipynb` tutorial, so you will also use the [Taxi Trips dataset](https://data.cityofchicago.org/Transportation/Taxi-Trips/wrvz-psew) released by the City of Chicago. We strongly encourage you to try modifying the components before using stub executors.



## Step 7. Set up your environment.

AI Platform Pipelines will prepare a development environment to build a pipeline, and a Kubeflow Pipelines cluster to run the newly built pipeline.

**NOTE:** To select a particular TensorFlow version, or select a GPU instance, create a TensorFlow pre-installed instance in AI Platform Notebooks.

**NOTE:** There might be some errors during package installation. For example:

>"ERROR: some-package 0.some_version.1 has requirement other-package!=2.0.,&lt;3,&gt;=1.15, but you'll have other-package 2.0.0 which is incompatible."

Please ignore these errors for now.


Install `tfx` from the source code.


In [None]:
%cd
!git clone https://github.com/tensorflow/tfx.git
%cd tfx

In [None]:
!python setup.py bdist_wheel
!python setup.py build
!python setup.py install
!pip install -e .

Install `kfp` and `skaffold`, and add the installation path to the `PATH` environment variable.

In [None]:
# Install kfp Python packages.
import sys
import os

!{sys.executable} -m pip install --user --upgrade -q kfp==0.5.1
# Download skaffold and set it executable.
if not os.path.exists('/home/jupyter/.local/bin/skaffold'):
    !curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold && mv skaffold /home/jupyter/.local/bin/

In [None]:
# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
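
Re-running the cell above appends `/home/jupyter/.local/bin` to `PATH` again each time. If you prefer to manage `PATH` from Python, here is a minimal sketch that only appends the directory when it is missing; `extend_path` is a hypothetical helper name, and it relies on `!` shell commands inheriting `os.environ` from the kernel.

```python
import os


def extend_path(path_value: str, new_dir: str) -> str:
    """Returns path_value with new_dir appended, unless it is already listed."""
    entries = path_value.split(os.pathsep) if path_value else []
    if new_dir in entries:
        return path_value
    return os.pathsep.join(entries + [new_dir])


# `!` shell commands inherit os.environ, so this makes skaffold visible to them.
os.environ["PATH"] = extend_path(os.environ.get("PATH", ""), "/home/jupyter/.local/bin")
```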

Let's check the installed version of TFX.

In [None]:
!python3 -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"

In AI Platform Pipelines, TFX runs in a hosted Kubernetes environment using [Kubeflow Pipelines](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/).

Let's set some environment variables to use Kubeflow Pipelines.

First, get your GCP project ID.

In [None]:
# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
print("GCP project ID:" + GOOGLE_CLOUD_PROJECT)

We also need to access your KFP cluster. You can access it in your Google Cloud Console under the "AI Platform > Pipelines" menu. The "endpoint" of the KFP cluster can be found in the URL of the Pipelines dashboard, or in the URL of the Getting Started page where you launched this notebook. Let's set an `ENDPOINT` variable to the KFP cluster endpoint. **ENDPOINT should contain only the hostname part of the URL.** For example, if the URL of the KFP dashboard is `https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start`, the ENDPOINT value is `1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com`.
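
If you copy the full dashboard URL, the standard library can extract the hostname for you. A minimal sketch; `endpoint_from_dashboard_url` is a hypothetical helper name, not part of TFX or KFP.

```python
from urllib.parse import urlparse


def endpoint_from_dashboard_url(url: str) -> str:
    """Returns only the hostname part of a KFP dashboard URL."""
    host = urlparse(url).hostname
    if not host:
        # urlparse only finds a hostname when the URL includes a scheme like https://.
        raise ValueError(f"Could not find a hostname in {url!r}")
    return host


print(endpoint_from_dashboard_url(
    "https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start"))
# prints 1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com
```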

>**NOTE: You MUST set your ENDPOINT value below.**

In [None]:
# This refers to the KFP cluster endpoint
ENDPOINT='' # Enter your ENDPOINT here.
if not ENDPOINT:
    from absl import logging
    logging.error('Set your ENDPOINT in this cell.')

Set the image name as `tfx-stub-pipeline` under the current GCP project.

In [None]:
# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-stub-pipeline'

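### Create a Docker image for TFX packages
Because TFX is installed from the GitHub source code rather than from a released package, build a Docker image that contains this build of TFX and push it to Google Container Registry, so that it can be used as the base image of the pipeline. By default, the Docker image is named `gcr.io/<your-project-id>/tensorflow:latest`.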

In [None]:
DOCKER_IMAGE_REPO='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tensorflow'
DOCKER_IMAGE_TAG='latest'
%env DOCKER_IMAGE_REPO={DOCKER_IMAGE_REPO}
%env DOCKER_IMAGE_TAG={DOCKER_IMAGE_TAG}

In [None]:
!./tfx/tools/docker/build_docker_image.sh

Push the Docker image to Google Container Registry in your GCP project.

In [None]:
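DOCKER_IMAGE="{}:{}".format(DOCKER_IMAGE_REPO, DOCKER_IMAGE_TAG)
# Let Docker authenticate to gcr.io with your gcloud credentials, then push the image.
!gcloud auth configure-docker --quiet
!docker push {DOCKER_IMAGE}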

## Step 8. Record the pipeline outputs in Google Cloud Storage

Because this tutorial assumes that you have completed `template.ipynb` up to Step 6, a successful pipeline run should already be recorded in ML Metadata (MLMD), which can be accessed through its gRPC server.

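Open a Terminal and run the following commands, replacing `<cluster_name>`, `<compute_zone>`, `<project_id>` (your `GOOGLE_CLOUD_PROJECT`), `<namespace>`, and `<port>` (the same value as `PORT` below) with your own values:

> `$ gcloud container clusters get-credentials <cluster_name> --zone <compute_zone> --project <project_id>`

> `$ nohup kubectl port-forward deployment/metadata-grpc-deployment -n <namespace> <port>:8080 &`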

By default, this notebook sets `PORT` to 17333 and `HOST` to "localhost".
You may change `PORT` to any unused port (use the same value in the `kubectl port-forward` command above) and change `HOST` as needed.

In the `template.ipynb` tutorial, the pipeline name is "my_pipeline" by default. If you changed the pipeline name when running that tutorial, set `SAVED_PIPELINE_NAME` accordingly.


In [None]:
PORT=17333 # Default port of the metadata grpc server.
HOST='localhost' # Default hostname of the metadata grpc server.
# Enter name of the pipeline that you created and ran in template.ipynb
# By default, template.ipynb creates a pipeline named "my_pipeline"
SAVED_PIPELINE_NAME='my_pipeline' 
RECORD_DIR="gs://{}-kubeflowpipelines-default/testdata".format(GOOGLE_CLOUD_PROJECT)
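
Before running the recorder, you can confirm that the `kubectl port-forward` started in the Terminal is actually listening. A minimal standard-library sketch; `port_is_open` is a hypothetical helper, and the values below mirror the default `HOST` and `PORT` from the cell above.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Returns True if something accepts TCP connections on host:port."""
    try:
        # create_connection raises OSError (e.g. ConnectionRefusedError) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Same defaults as HOST and PORT in the cell above.
if not port_is_open("localhost", 17333):
    print("Nothing is listening on localhost:17333; is the kubectl port-forward running?")
```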

In [None]:
! python tfx/experimental/pipeline_testing/pipeline_recorder.py \
--output_dir={RECORD_DIR} \
--host={HOST} \
--port={PORT} \
--pipeline_name={SAVED_PIPELINE_NAME}

In [None]:
print("Pipeline outputs are recorded to RECORD_DIR: " + RECORD_DIR)

The golden test data is now saved in `RECORD_DIR` in Google Cloud Storage.
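
To see which components have recorded outputs, and therefore which component IDs you can stub later, you can list `RECORD_DIR`. A minimal sketch, assuming the recorder writes one directory per component ID with one subdirectory per output key; `summarize_record_dir` is a hypothetical helper, and reading `gs://` paths requires passing TensorFlow's `tf.io.gfile` functions as shown in the comment.

```python
import os
from typing import Callable, Dict, List


def summarize_record_dir(record_dir: str,
                         listdir: Callable[[str], List[str]] = os.listdir,
                         isdir: Callable[[str], bool] = os.path.isdir) -> Dict[str, List[str]]:
    """Maps each recorded component ID to the output keys recorded for it.

    ASSUMPTION: the layout is <record_dir>/<component_id>/<output_key>/...
    """
    summary = {}
    for entry in sorted(listdir(record_dir)):
        # Some filesystems (e.g. GCS through tf.io.gfile) may return directory names with a trailing '/'.
        component_id = entry.rstrip("/")
        component_path = os.path.join(record_dir, component_id)
        if isdir(component_path):
            summary[component_id] = sorted(e.rstrip("/") for e in listdir(component_path))
    return summary


# For the gs:// RECORD_DIR used in this tutorial:
#   import tensorflow as tf
#   print(summarize_record_dir(RECORD_DIR, tf.io.gfile.listdir, tf.io.gfile.isdir))
```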

Now we are ready to test a pipeline with stub executors.

---



## Step 9. Copy the predefined template to your project directory.

In this step, we will create a working pipeline project directory and files by copying additional files from a predefined template.

You may give your pipeline a different name by changing the `PIPELINE_NAME` below. This will also become the name of the project directory where your files will be put.

Note that this `PIPELINE_NAME` should be different from the `PIPELINE_NAME` that you used for the `template.ipynb` tutorial.

In [None]:
import os

PIPELINE_NAME="my_stub_pipeline"
PROJECT_DIR=os.path.join(os.path.expanduser("~"),"AIHub",PIPELINE_NAME)

TFX includes the `taxi` template in the TFX Python package. If you are planning to solve a point-wise prediction problem, such as classification or regression, this template can be used as a starting point.

The `tfx template copy` CLI command copies predefined template files into your project directory.

In [None]:
!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=taxi

Change the working directory context in this notebook to the project directory.

In [None]:
%cd {PROJECT_DIR}

>NOTE: Don't forget to change directory in `File Browser` on the left by clicking into the project directory once it is created.

## Step 10. Browse your copied source files

The TFX template provides basic scaffold files to build a pipeline, including Python source code, sample data, and Jupyter Notebooks to analyse the output of the pipeline. The `taxi` template uses the same *Chicago Taxi* dataset and ML model as the [Airflow Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/airflow_workshop).

Here is a brief introduction to each of the Python files.
-   `pipeline` - This directory contains the definition of the pipeline
    -   `configs.py` — defines common constants for pipeline runners
    -   `pipeline.py` — defines TFX components and a pipeline
-   `launcher` - This directory contains the launcher for stub executors
    -   `stub_component_launcher.py` - defines `StubComponentLauncher`, a stub component launcher for the template tutorial
-   `models` - This directory contains ML model definitions.
    -   `features.py`, `features_test.py` — defines features for the model
    -   `preprocessing.py`, `preprocessing_test.py` — defines preprocessing
        jobs using `tf.Transform`
    -   `estimator` - This directory contains an Estimator based model.
        -   `constants.py` — defines constants of the model
        -   `model.py`, `model_test.py` — defines DNN model using TF estimator
    -   `keras` - This directory contains a Keras based model.
        -   `constants.py` — defines constants of the model
        -   `model.py`, `model_test.py` — defines DNN model using Keras
-   `beam_dag_runner.py`, `kubeflow_dag_runner.py` — define runners for each orchestration engine


You might notice that some files have `_test.py` in their names. These are unit tests of the pipeline, and we recommend adding more unit tests as you implement your own pipelines.
You can run a unit test by passing the module name of the test file to the `-m` flag. You can usually get a module name by deleting the `.py` extension and replacing `/` with `.`. For example:

In [None]:
!{sys.executable} -m models.features_test
!{sys.executable} -m models.keras.model_test
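
The path-to-module rule above can be automated so that every `*_test.py` file is found and run. A minimal sketch; the helper names `module_name_from_path`, `find_test_modules`, and `run_test_modules` are hypothetical, and it assumes you run it from the project directory.

```python
import os
import subprocess
import sys
from pathlib import PurePath
from typing import List


def module_name_from_path(path: str) -> str:
    """Converts a relative path like 'models/keras/model_test.py' to 'models.keras.model_test'."""
    p = PurePath(path)
    if p.suffix != ".py":
        raise ValueError(f"Not a Python file: {path}")
    # Drop the '.py' extension and join the path parts with '.'.
    return ".".join(p.with_suffix("").parts)


def find_test_modules(search_dir: str = "models", base_dir: str = ".") -> List[str]:
    """Returns sorted module names (relative to base_dir) of *_test.py files under search_dir."""
    modules = []
    for dirpath, _, filenames in os.walk(search_dir):
        for name in filenames:
            if name.endswith("_test.py"):
                rel = os.path.relpath(os.path.join(dirpath, name), base_dir)
                modules.append(module_name_from_path(rel))
    return sorted(modules)


def run_test_modules(modules: List[str]) -> None:
    for module in modules:
        # Same as `python -m <module>`; check=True stops at the first failing test module.
        subprocess.run([sys.executable, "-m", module], check=True)
```

In the notebook, `run_test_modules(find_test_modules())` would run the same tests as the cell above, plus any other test modules under `models`.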

## Step 11. Create and run your TFX test pipeline

Components in the TFX pipeline will generate outputs for each run as [ML Metadata Artifacts](https://www.tensorflow.org/tfx/guide/mlmd), and they need to be stored somewhere. You can use any storage which the KFP cluster can access, and for this example we will use Google Cloud Storage (GCS). A default GCS bucket should have been created automatically. Its name will be `<your-project-id>-kubeflowpipelines-default`.


Let's upload our sample data to the GCS bucket so that we can use it in our pipeline later.

In [None]:
!gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/data.csv

### Enable Stub Executors in Kubeflow DAG Runner
>**Double-click to open `kubeflow_dag_runner.py`**.
Uncomment the `supported_launcher_classes` argument of `KubeflowDagRunnerConfig` so that stub executors can be launched (Tip: search for comments containing `TODO(step 11):`). Make sure to save `kubeflow_dag_runner.py` after you edit it.



>**Double-click to change directory to `launcher` and double-click again to open `stub_component_launcher.py`.**
Set `test_data_dir` to the value of `RECORD_DIR`, the directory where the KFP outputs were recorded. Customize the list `self.stubbed_component_ids`, which holds the IDs of the components whose executors will be replaced with `BaseStubExecutor`. You may additionally add a custom stub executor to `self.stubbed_component_map`, with the component ID as the key and the custom stub executor class as the value. Each of these settings is marked by a comment containing `TODO(step 11):`. Make sure to save `stub_component_launcher.py` after you edit it.
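
How these two settings combine can be modeled in a few lines of plain Python. This is a hypothetical sketch of the selection logic described above, not the code of `StubComponentLauncher`; all class names are placeholders, and the assumption that an entry in `stubbed_component_map` takes precedence over the default stub is ours.

```python
from typing import Dict, List


class BaseStubExecutorStandIn:
    """Hypothetical placeholder for tfx's BaseStubExecutor (illustration only)."""


class RealExecutorPlaceholder:
    """Hypothetical placeholder for a component's real executor."""


class MyStatisticsGenStub(BaseStubExecutorStandIn):
    """Hypothetical custom stub executor for StatisticsGen."""


def resolve_executor_class(component_id: str,
                           stubbed_component_ids: List[str],
                           stubbed_component_map: Dict[str, type],
                           real_executor_class: type) -> type:
    """Picks which executor class would run for one component (conceptual model).

    Custom stubs in the map win, other listed IDs get the default stub,
    and every remaining component keeps its real executor.
    """
    if component_id in stubbed_component_map:
        return stubbed_component_map[component_id]
    if component_id in stubbed_component_ids:
        return BaseStubExecutorStandIn
    return real_executor_class


stubbed_ids = ["CsvExampleGen", "StatisticsGen"]
custom_stubs = {"StatisticsGen": MyStatisticsGenStub}
for cid in ["CsvExampleGen", "StatisticsGen", "Trainer"]:
    chosen = resolve_executor_class(cid, stubbed_ids, custom_stubs, RealExecutorPlaceholder)
    print(cid, "->", chosen.__name__)
```

Keeping the stubbed IDs in a list and the overrides in a dict means you only touch the map for the few components that need custom fake outputs.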

### Add Components to the Pipeline

Make sure you add components you need for running and testing the pipeline. 

>**Double-click to open `pipeline.py`.**
If you are interested in testing Trainer or Transform, make sure to add StatisticsGen, SchemaGen, ExampleValidator, Transform, Trainer, ResolverNode, Evaluator, and Pusher to the pipeline. Uncomment the lines that add these components to the pipeline (Tip: search for comments containing `TODO(step 5)` and `TODO(step 6)`).





### Create and run TFX Template with Stub Executors


Let's create a TFX pipeline using the `tfx pipeline create` command.

>Note: When creating a pipeline for KFP, we need a container image that will be used to run our pipeline, and `skaffold` will build the image for us. Because skaffold needs to pull the base image, the first build can take 5 to 10 minutes, but later builds take much less time.

In [None]:
!tfx pipeline create  \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT} \
--build-target-image={CUSTOM_TFX_IMAGE} \
--build-base-image={DOCKER_IMAGE} \
--engine=kubeflow

While creating a pipeline, `Dockerfile` and `build.yaml` will be generated to build a Docker image. Don't forget to add these files to the source control system (for example, git) along with other source files.

A pipeline definition file for [Argo](https://argoproj.github.io/argo/) will be generated, too. The name of this file is `${PIPELINE_NAME}.tar.gz`. For example, it will be `my_stub_pipeline.tar.gz` if the name of your pipeline is `my_stub_pipeline`. We recommend NOT adding this pipeline definition file to source control, because it is generated from the other Python files and is updated whenever you update the pipeline. For your convenience, this file is already listed in the automatically generated `.gitignore`.

NOTE: `kubeflow` will be automatically selected as an orchestration engine if `airflow` is not installed and `--engine` is not specified.

Now start an execution run with the newly created pipeline using the `tfx run create` command.

In [None]:
!tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT} --engine=kubeflow

Alternatively, you can run the pipeline from the KFP Dashboard. The new execution run will be listed under Experiments in the KFP Dashboard. Clicking into the experiment lets you monitor progress and visualize the artifacts created during the execution run.

In either case, we recommend visiting the KFP Dashboard. You can access it from the Cloud AI Platform Pipelines menu in Google Cloud Console. There you can find the pipeline and a wealth of information about it.
For example, you can find your runs under the *Experiments* menu, and when you open an execution run under Experiments, you can find all artifacts from the pipeline under the *Artifacts* menu.

>Note: If your pipeline run fails, you can see detailed logs for each TFX component in the Experiments tab in the KFP Dashboard.
    
One of the major sources of failure is permission-related problems. Please make sure your KFP cluster has permissions to access Google Cloud APIs. This can be configured [when you create a KFP cluster in GCP](https://cloud.google.com/ai-platform/pipelines/docs/setting-up), or see [Troubleshooting document in GCP](https://cloud.google.com/ai-platform/pipelines/docs/troubleshooting).

## Step 12. (Optional) Update the pipeline definition of the existing pipeline



You may want to update the existing pipeline with a modified pipeline definition, for example one that replaces the executors of other components with stub executors. Use the `tfx pipeline update` command to update your pipeline, followed by the `tfx run create` command to create a new execution run of your updated pipeline.


In [None]:
# Update the pipeline
!tfx pipeline update \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT} \
--engine=kubeflow
# You can run the pipeline the same way.
!tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT} --engine=kubeflow

### Check pipeline outputs

Visit the KFP dashboard to find pipeline outputs in the page for your pipeline run. Click the *Experiments* tab on the left, and *All runs* in the Experiments page. You should be able to find the latest run under the name of your pipeline.