# ML Pipeline with TensorFlow

## Introduction

This notebook sets up a production-ready ML pipeline with TFX, built from the following components:
1. BigQueryExampleGen: ingest data from BigQuery
2. Apache Beam: data processing
3. SchemaGen: generate a schema for the data
4. Validator: validate the data
5. Transform: transform the data into usable input for training
6. Trainer: train a regression model
7. Evaluator: evaluate model performance metrics on the eval set
8. Pusher: export the model for serving
9. Kubeflow Pipelines: backend that orchestrates the pipeline
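
The update log later in this notebook reports the upstream dependencies between some of these components. As a rough illustration of how an execution order can be derived from them, the sketch below feeds those dependencies into Python's standard `graphlib` module (Python 3.9 or later). The `UPSTREAM` dictionary and the `execution_order` helper are assumptions made for this example. The dictionary is transcribed from that log and is only a partial view: components whose dependencies are not visible there, such as the evaluator, are left out. This is not how TFX or Kubeflow schedule work internally.

```python
from graphlib import TopologicalSorter

# Upstream dependencies as reported by `tfx pipeline update` in this notebook.
# Partial on purpose: only components whose dependencies appear in the log.
UPSTREAM = {
    "bigqueryexamplegen": set(),
    "statisticsgen": {"bigqueryexamplegen"},
    "schemagen": {"statisticsgen"},
    "transform": {"schemagen", "bigqueryexamplegen"},
    "trainer": {"transform", "schemagen"},
}


def execution_order(upstream):
    """Return one valid order in which every component runs after its inputs."""
    return list(TopologicalSorter(upstream).static_order())


order = execution_order(UPSTREAM)
print(" -> ".join(order))
```

Because each component here depends on the one before it, only one order is valid for this subset.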

## Setup environment

In [1]:
import sys
# Use the latest version of pip.
!pip install --upgrade pip
# Install tfx and kfp Python packages.
!pip install --upgrade "tfx[kfp]<2"



In [2]:
!python3 -c "from tfx import version ; print('TFX version: {}'.format(version.__version__))"

TFX version: 1.7.1


In [3]:
# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
print("GCP project ID:" + GOOGLE_CLOUD_PROJECT)

env: GOOGLE_CLOUD_PROJECT=ml-pipelines-347413
GCP project ID:ml-pipelines-347413
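
The cell above indexes `shell_output[0]` directly, which fails with an `IndexError` when no project is configured in gcloud. A minimal sketch of a more defensive lookup follows. `resolve_project_id` is a hypothetical helper, not part of the TFX template, and falling back to the `GOOGLE_CLOUD_PROJECT` environment variable is an assumption about what you would want in that case.

```python
import os


def resolve_project_id(gcloud_lines, environ=None):
    """Pick a project ID from captured gcloud output lines.

    Falls back to the GOOGLE_CLOUD_PROJECT environment variable and raises
    if neither source provides a value. (Hypothetical helper.)
    """
    environ = os.environ if environ is None else environ
    for line in gcloud_lines:
        candidate = line.strip()
        if candidate:
            return candidate
    fallback = environ.get("GOOGLE_CLOUD_PROJECT", "").strip()
    if fallback:
        return fallback
    raise RuntimeError("No GCP project configured; set one with gcloud first.")


print(resolve_project_id(["ml-pipelines-347413"], environ={}))
```

In the notebook this could be called as `resolve_project_id(shell_output)` in place of `shell_output[0]`.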


In [20]:
# Set KFP cluster endpoint
ENDPOINT='7e86f3226a3ad238-dot-us-central1.pipelines.googleusercontent.com'
if not ENDPOINT:
    from absl import logging
    logging.error('Set your ENDPOINT in this cell.')
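
The check above only logs an error, so later cells still run with an empty endpoint. The sketch below shows one way to fail early and to reduce a pasted URL to the bare host form used in this notebook. `normalize_endpoint` is a hypothetical helper, and the assumption that you want the bare host rather than a full URL is mine, not a requirement stated by the TFX CLI.

```python
from urllib.parse import urlparse


def normalize_endpoint(endpoint):
    """Return the bare host of a KFP endpoint, or raise if it is empty."""
    endpoint = (endpoint or "").strip()
    if not endpoint:
        raise ValueError("Set your ENDPOINT before creating the pipeline.")
    # urlparse only recognises the host when a scheme separator is present,
    # so prefix "//" when the value was given without a scheme.
    parsed = urlparse(endpoint if "://" in endpoint else "//" + endpoint)
    return parsed.netloc


print(normalize_endpoint(
    "https://7e86f3226a3ad238-dot-us-central1.pipelines.googleusercontent.com/"
))
```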

In [5]:
# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

In [6]:
import os
PIPELINE_NAME="my_pipeline"    # or use a custom name
PROJECT_DIR=os.path.join(os.path.expanduser("~"),"imported",PIPELINE_NAME)

In [None]:
# Copy the taxi template into the project directory.
!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=taxi

Change the working directory context in this notebook to the project directory.

In [8]:
%cd {PROJECT_DIR}

/home/jupyter/imported/my_pipeline


In [None]:
# Run the template's unit tests.
!{sys.executable} -m models.features_test
!{sys.executable} -m models.keras.model_test


## Create the pipeline

In [10]:
!gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/taxi/data.csv

Copying file://data/data.csv [Content-Type=text/csv]...
/ [1 files][  1.8 MiB/  1.8 MiB]                                                
Operation completed over 1 objects/1.8 MiB.                                      
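
The destination path in the `gsutil cp` command above is assembled from the project ID and a fixed bucket suffix. As a small illustration, the same URI can be built in Python so that it can be reused in other cells. `template_data_uri` is a hypothetical helper, and the bucket naming pattern is simply copied from the command above rather than taken from any official convention.

```python
def template_data_uri(project_id, filename="data.csv"):
    """Build the GCS URI used in this notebook for the taxi template data."""
    bucket = f"{project_id}-kubeflowpipelines-default"
    return f"gs://{bucket}/tfx-template/data/taxi/{filename}"


print(template_data_uri("ml-pipelines-347413"))
```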


In [11]:
# Create the pipeline in Kubeflow Pipelines and build its container image.
!tfx pipeline create --pipeline-path=kubeflow_runner.py --endpoint={ENDPOINT} \
--build-image

CLI
Creating pipeline
Detected Kubeflow.
Use --engine flag if you intend to use a different orchestrator.
  ' Defaults to "https".' % host)
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
[Docker] Step 1/4 : FROM tensorflow/tfx:1.7.1[Docker] 
[Docker]  ---> 4ca6f224d609
[Docker] Step 2/4 : WORKDIR /pipeline[Docker] 
[Docker]  ---> Using cache
[Docker]  ---> cae1d571fc05
[Docker] Step 3/4 : COPY ./ ./[Docker] 
[Docker]  ---> cb8403c64534
[Docker] Step 4/4 : ENV PYTHONPATH="/pipeline:${PYTHONPATH}"[Docker] 
[Docker]  ---> Running in 5f5053a25a0a
[Docker] Removing intermediate container 5f5053a25a0a
[Docker]  ---> 99288fe7f56e
[Docker] Successfully built 99288fe7f56e
[Docker] Successfully tagged gcr.io/ml-pipelines-347413/my_pipeline:latest
[Docker] The push refers to repository [gcr.io/ml-pipelines-347413/my_pipeline]
[Docker] Preparing
[Docker] Waiting
[Docker] Preparing
[Docker] Waiting
[Docker] Preparing

In [12]:
!tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}

CLI
Creating a run for pipeline: my_pipeline
Detected Kubeflow.
Use --engine flag if you intend to use a different orchestrator.
  ' Defaults to "https".' % host)
Run created for pipeline: my_pipeline
| pipeline_name | run_id                               | status | created_at                | link                                                                                                                        |
| my_pipeline   | a40b6ae1-0cc0-470e-aab2-e6be575908b5 | None   | 2022-04-20T01:58:53+00:00 | http://7e86f3226a3ad238-dot-us-central1.pipelines.googleusercontent.com/#/runs/details/a40b6ae1-0cc0-470e-aab2-e6be575908b5 |
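
The run details are printed as a pipe-delimited table. If you capture that output, for example to open the run link or to record the run ID, a small parser is enough. The sketch below assumes the layout shown above (a header row followed by data rows, each wrapped in `|`); `parse_cli_table` is a hypothetical helper, not part of the TFX CLI.

```python
def parse_cli_table(text):
    """Parse a pipe-delimited table (header row first) into a list of dicts."""
    rows = []
    for line in text.splitlines():
        line = line.strip()
        if not (line.startswith("|") and line.endswith("|")):
            continue  # skip log lines and anything that is not a table row
        rows.append([cell.strip() for cell in line.strip("|").split("|")])
    if not rows:
        return []
    header, *body = rows
    return [dict(zip(header, row)) for row in body]


# Sample text copied from the output above.
sample = """
Run created for pipeline: my_pipeline
| pipeline_name | run_id                               | status | created_at                | link |
| my_pipeline   | a40b6ae1-0cc0-470e-aab2-e6be575908b5 | None   | 2022-04-20T01:58:53+00:00 | http://7e86f3226a3ad238-dot-us-central1.pipelines.googleusercontent.com/#/runs/details/a40b6ae1-0cc0-470e-aab2-e6be575908b5 |
"""

runs = parse_cli_table(sample)
print(runs[0]["run_id"])
print(runs[0]["link"])
```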



In [22]:
# Update the pipeline
!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}

CLI
Updating pipeline
Detected Kubeflow.
Use --engine flag if you intend to use a different orchestrator.
  ' Defaults to "https".' % host)
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Adding upstream dependencies for component bigqueryexamplegen
INFO:absl:Adding upstream dependencies for component latest-blessed-model-resolver
INFO:absl:Adding upstream dependencies for component statisticsgen
INFO:absl:   ->  Component: bigqueryexamplegen
INFO:absl:Adding upstream dependencies for component schemagen
INFO:absl:   ->  Component: statisticsgen
INFO:absl:Adding upstream dependencies for component transform
INFO:absl:   ->  Component: schemagen
INFO:absl:   ->  Component: bigqueryexamplegen
INFO:absl:Adding upstream dependencies for component trainer
INFO:absl:   ->  Component: transform
INFO:absl:   ->  Component: schemagen
INFO:absl:Adding upstream dependencies for component evaluator
INFO:abs