# Continuous training with TFX and Google Cloud AI Platform

## Learning Objectives

1.  Use the TFX CLI to build a TFX pipeline.
2.  Deploy a TFX pipeline version without tuning to a hosted AI Platform Pipelines instance.
3.  Create and monitor a TFX pipeline run using the TFX CLI.
4.  Deploy a new TFX pipeline version with tuning enabled to a hosted AI Platform Pipelines instance.
5.  Create and monitor another TFX pipeline run directly in the KFP UI.

In this lab, you use the following tools and services to deploy and run a TFX pipeline on Google Cloud that automates the development and deployment of a TensorFlow 2.3 WideDeep classifier that predicts passenger survival on the Titanic:

* The [**TFX CLI**](https://www.tensorflow.org/tfx/guide/cli) utility to build and deploy a TFX pipeline.
* A hosted [**AI Platform Pipeline instance (Kubeflow Pipelines)**](https://www.tensorflow.org/tfx/guide/kubeflow) for TFX pipeline orchestration.
* [**Dataflow**](https://cloud.google.com/dataflow) jobs for scalable, distributed data processing for TFX components.
* An [**AI Platform Training**](https://cloud.google.com/ai-platform/) job for model training and flock management for parallel tuning trials.
* [**AI Platform Prediction**](https://cloud.google.com/ai-platform/) as a model server destination for blessed pipeline model versions.
* [**CloudTuner**](https://www.tensorflow.org/tfx/guide/tuner#tuning_on_google_cloud_platform_gcp) and [**AI Platform Vizier**](https://cloud.google.com/ai-platform/optimizer/docs/overview) for advanced model hyperparameter tuning using the Vizier algorithm.

You will then create and monitor pipeline runs using the TFX CLI as well as the KFP UI.

In [1]:
%load_ext autoreload
%autoreload 2

### Setup

#### Update lab environment PATH to include TFX CLI and skaffold

In [2]:
import yaml

# Set `PATH` to include the directory containing TFX CLI and skaffold.
PATH=%env PATH
HOME=%env HOME

%env PATH={HOME}/.local/bin:{PATH}

env: PATH=/home/michal/.local/bin:/home/michal/venv/ML-3.8/bin:/home/michal/google-cloud-sdk/bin:/home/michal/anaconda3/bin:/home/michal/anaconda3/condabin:/home/michal/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin
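To confirm that the updated `PATH` actually exposes the CLIs, a small check with Python's `shutil.which` can help. This is a minimal sketch: the helper name `find_cli_tools` is hypothetical, and it assumes the executables are named `tfx` and `skaffold`.

```python
import shutil


def find_cli_tools(tools=("tfx", "skaffold")):
    """Map each tool name to its resolved executable path, or None if not on PATH."""
    return {tool: shutil.which(tool) for tool in tools}


# Report any tool that the current PATH does not resolve.
missing = [name for name, path in find_cli_tools().items() if path is None]
if missing:
    print("Not found on PATH:", ", ".join(missing))
```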


#### Validate lab package version installation

In [3]:
!python -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
!python -c "import kfp; print('KFP version: {}'.format(kfp.__version__))"

TFX version: 0.25.0
KFP version: 1.0.4


**Note**: this lab was built and tested with the following package versions:

`TFX version: 0.25.0`  
`KFP version: 1.0.4`

## Set up local paths to the data, train, and test folders

In [4]:
import os
from pathlib import Path

notebook_path=os.getcwd()
local_data_dirpath = os.path.join(notebook_path, 'data')

local_train_dirpath = os.path.join(local_data_dirpath, "train")
local_train_filepath = os.path.join(local_train_dirpath, "train.csv")
local_test_dirpath = os.path.join(local_data_dirpath, "test")
local_test_filepath = os.path.join(local_test_dirpath, "test.csv")
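The cell above only builds path strings; it does not create the folders, so later copy steps fail if `data/train` and `data/test` are missing. A minimal sketch with `pathlib`, assuming the same `train`/`test` layout (the helper `ensure_data_dirs` is hypothetical):

```python
from pathlib import Path


def ensure_data_dirs(data_dir):
    """Create <data_dir>/train and <data_dir>/test and return the CSV target paths."""
    data_dir = Path(data_dir)
    train_csv = data_dir / "train" / "train.csv"
    test_csv = data_dir / "test" / "test.csv"
    for csv_path in (train_csv, test_csv):
        # parents=True also creates data_dir; exist_ok=True makes reruns safe.
        csv_path.parent.mkdir(parents=True, exist_ok=True)
    return train_csv, test_csv
```

In the notebook it could be called as `ensure_data_dirs(local_data_dirpath)`; because of `exist_ok=True`, re-running the cell is harmless.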


## Install and configure the Kaggle CLI

In [None]:
!pip install -q kaggle
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!rm kaggle.json
!chmod 600 ~/.kaggle/kaggle.json
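The Kaggle CLI reads its API token from `~/.kaggle/kaggle.json`, a JSON file with `username` and `key` fields, and warns if the file is readable by other users, which is why the cell runs `chmod 600`. If you prefer not to shell out, the same setup can be sketched in Python; `write_kaggle_credentials` is a hypothetical helper, and the values you pass are your own Kaggle credentials:

```python
import json
import os
from pathlib import Path


def write_kaggle_credentials(username, key, kaggle_dir=None):
    """Write kaggle.json with owner-only permissions (the equivalent of chmod 600)."""
    kaggle_dir = Path(kaggle_dir or Path.home() / ".kaggle")
    kaggle_dir.mkdir(parents=True, exist_ok=True)
    cred_path = kaggle_dir / "kaggle.json"
    cred_path.write_text(json.dumps({"username": username, "key": key}))
    # Restrict the token to the current user so the CLI does not warn.
    os.chmod(cred_path, 0o600)
    return cred_path
```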


## Download the data from Kaggle, unzip it, and copy it to the data folder

In [None]:
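# Make sure the target folders exist before copying into them
!mkdir -p {local_train_dirpath} {local_test_dirpath}
!kaggle competitions download -c titanic -p {local_data_dirpath} --force
!unzip -o {local_data_dirpath}/titanic.zip -d {local_data_dirpath}
!cp {local_data_dirpath}/train.csv {local_train_filepath}
!cp {local_data_dirpath}/test.csv {local_test_filepath}

# Clean up the downloaded archive and the extracted top-level CSVs
!rm {local_data_dirpath}/*.csv {local_data_dirpath}/*.zip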
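The unzip-and-copy steps can also be done without shell tools using `zipfile` and `shutil`. This sketch assumes the competition archive stores `train.csv` and `test.csv` at its top level and writes them straight into the train/test folders, so there are no intermediate CSVs to clean up; `extract_competition_csvs` is a hypothetical name:

```python
import shutil
import zipfile
from pathlib import Path


def extract_competition_csvs(zip_path, data_dir):
    """Extract train.csv and test.csv from zip_path into data_dir/train and data_dir/test."""
    data_dir = Path(data_dir)
    targets = {
        "train.csv": data_dir / "train" / "train.csv",
        "test.csv": data_dir / "test" / "test.csv",
    }
    with zipfile.ZipFile(zip_path) as zf:
        for member, target in targets.items():
            target.parent.mkdir(parents=True, exist_ok=True)
            # Stream the archive member directly to its final location.
            with zf.open(member) as src, open(target, "wb") as dst:
                shutil.copyfileobj(src, dst)
    return targets
```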

## Copy the training data to Cloud Storage

In [None]:
!gsutil cp data/train/train.csv gs://cloud-training-281409-kubeflowpipelines-default/tfx-template/data/titanic/data.csv


In [5]:
PIPELINE_NAME = 'tfx-titanic-training'
MODEL_NAME = 'tfx_titanic_classifier'
DATA_ROOT_URI = local_train_dirpath
RUNTIME_VERSION = '2.3'
PYTHON_VERSION = '3.7'
ENABLE_TUNING=False
ENABLE_CACHE=True


In [6]:
%env PIPELINE_NAME={PIPELINE_NAME}
%env MODEL_NAME={MODEL_NAME}
%env DATA_ROOT_URI={DATA_ROOT_URI}
%env RUNTIME_VERSION={RUNTIME_VERSION}
%env PYTHON_VERSION={PYTHON_VERSION}
%env ENABLE_TUNING={ENABLE_TUNING}
%env ENABLE_CACHE={ENABLE_CACHE}


env: PIPELINE_NAME=tfx-titanic-training
env: MODEL_NAME=tfx_titanic_classifier
env: DATA_ROOT_URI=/home/michal/PycharmProjects/ml-gcp-pipeline/tfx_titanic_pipeline/data/train
env: RUNTIME_VERSION=2.3
env: PYTHON_VERSION=3.7
env: ENABLE_TUNING=False
env: ENABLE_CACHE=True
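`%env` exports every value as a string, so `ENABLE_TUNING` arrives in the environment as the text `"False"`, and `bool("False")` is `True` in Python. How the pipeline code reads these variables is not shown here; if it needs a real boolean, a parser along these lines avoids the pitfall (`env_flag` is a hypothetical helper):

```python
import os


def env_flag(name, default=False):
    """Parse a boolean environment variable such as ENABLE_TUNING or ENABLE_CACHE.

    Compares the text instead of calling bool(), which is True for any
    non-empty string, including "False".
    """
    value = os.environ.get(name)
    if value is None:
        return default
    return value.strip().lower() in ("1", "true", "yes", "y", "on")
```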


## Local pipeline run

In [7]:
%cd {notebook_path}/pipeline

/home/michal/PycharmProjects/ml-gcp-pipeline/tfx_titanic_pipeline/pipeline


In [8]:
#import local_runner


!python local_runner.py

2021-04-08 23:14:01.412088: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
INFO:absl:PIPELINE_ROOT=/home/michal/artifact-store/tfx-titanic-training/20210408_231408
INFO:absl:Cleaning local log folder : /tmp/logs
INFO:absl:train_steps for training: 30000
INFO:absl:tuner_steps for tuning: 2000
INFO:absl:data_root_uri for training: /home/michal/PycharmProjects/ml-gcp-pipeline/tfx_titanic_pipeline/data/train
INFO:absl:eval_steps for evaluating: 1000
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running driver for CsvExampleGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:Running executor for CsvExampleGen
INF

INFO:absl:Feature Embarked has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Ticket has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Sex has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Name has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Cabin has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Age has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Fare has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Parch has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature PassengerId has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Pclass has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature SibSp has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Survived has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Embarked has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Ticket has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Sex has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Name has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Cabin has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Age has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Fare has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Parch has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature PassengerId has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Pclass has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature SibSp has no shape. Setting to VarLenSparseTensor.
INFO:absl:Feature Survived has no shape. Setting to VarLenSparseTensor.
value: "\n\013\n\tConst_3:0\022-vocab_compute_and_apply_vocabulary_vocabulary"

value: "\n\013\n\tConst_3:0\022-vocab_compute_and_apply_vocabulary_vocabulary"

value: "\n\013\n\tConst_5:0\022/vocab_compute_and_apply_vocabulary_1_vocabulary"

value: "\n\013\n\tConst_5:0\022/vocab_compute_and_apply_vocabula

INFO:absl:Feature Age_xf has a shape . Setting to DenseTensor.
INFO:absl:Feature Embarked_xf has a shape . Setting to DenseTensor.
INFO:absl:Feature Fare_xf has a shape . Setting to DenseTensor.
INFO:absl:Feature Parch_xf has a shape . Setting to DenseTensor.
INFO:absl:Feature Pclass_xf has a shape . Setting to DenseTensor.
INFO:absl:Feature Sex_xf has a shape . Setting to DenseTensor.
INFO:absl:Feature SibSp_xf has a shape . Setting to DenseTensor.
INFO:absl:Feature Survived_xf has a shape . Setting to DenseTensor.
INFO:absl:HyperParameters for training: {'space': [{'class_name': 'Float', 'config': {'name': 'learning_rate', 'default': 0.0009167702421017742, 'conditions': [], 'min_value': 0.0001, 'max_value': 0.01, 'step': None, 'sampling': 'log'}}, {'class_name': 'Int', 'config': {'name': 'n_layers', 'default': 2, 'conditions': [], 'min_value': 1, 'max_value': 2, 'step': 1, 'sampling': None}}, {'class_name': 'Int', 'config': {'name': 'n_units_1', 'default': 72, 'conditions': [{'class_

3000/3000 - 12s - loss: 0.4900 - tp: 40974.0000 - fp: 11485.0000 - tn: 109672.0000 - fn: 29869.0000 - binary_accuracy: 0.7846 - precision: 0.7811 - recall: 0.5784 - auc: 0.8219 - val_loss: 0.4260 - val_tp: 21841.0000 - val_fp: 5649.0000 - val_tn: 29734.0000 - val_fn: 6776.0000 - val_binary_accuracy: 0.8059 - val_precision: 0.7945 - val_recall: 0.7632 - val_auc: 0.8888
Epoch 2/10
3000/3000 - 14s - loss: 0.4575 - tp: 47365.0000 - fp: 15980.0000 - tn: 105188.0000 - fn: 23467.0000 - binary_accuracy: 0.7945 - precision: 0.7477 - recall: 0.6687 - auc: 0.8365 - val_loss: 0.4246 - val_tp: 21461.0000 - val_fp: 6022.0000 - val_tn: 29365.0000 - val_fn: 7152.0000 - val_binary_accuracy: 0.7942 - val_precision: 0.7809 - val_recall: 0.7500 - val_auc: 0.8856
Epoch 3/10
3000/3000 - 18s - loss: 0.4571 - tp: 47396.0000 - fp: 15989.0000 - tn: 105182.0000 - fn: 23433.0000 - binary_accuracy: 0.7947 - precision: 0.7477 - recall: 0.6692 - auc: 0.8373 - val_loss: 0.4236 - val_tp: 21461.0000 - val_fp: 6024.0000

INFO:absl:Evaluation complete. Results written to /home/michal/artifact-store/tfx-titanic-training/20210408_231408/Evaluator/evaluation/9.
INFO:absl:Checking validation results.
INFO:absl:Blessing result True written to /home/michal/artifact-store/tfx-titanic-training/20210408_231408/Evaluator/blessing/9.
INFO:absl:Running publisher for Evaluator
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Evaluator is finished.
INFO:absl:Component InfraValidator is running.
INFO:absl:Running driver for InfraValidator
INFO:absl:MetadataStore with DB connection initialized
2021-04-08 23:17:08.201744: W ml_metadata/metadata_store/rdbms_metadata_access_object.cc:588] No property is defined for the Type
INFO:absl:Running executor for InfraValidator
INFO:absl:InfraValidator will be run in LOAD_AND_QUERY mode.
INFO:absl:tag_set is not given. Using {'serve'} instead.
INFO:absl:signature_names are not given. Using ['serving_default'] instead.
INFO:absl:Creating temp directory at 

Run a TensorFlow Serving Docker container for the pushed model

In [None]:
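# Serve the model pushed by the Pusher component; point `source` at the
# Pusher/pushed_model directory of your own pipeline run.
# -d runs the container in the background so the notebook kernel is not blocked.
!docker run -d --rm -p 8500:8500 -p 8501:8501 \
 --mount type=bind,source=/home/michal/artifact-store/tfx-titanic-training/20210329_231949/Pusher/pushed_model/,target=/models/tfx_titanic_classifier \
 -e MODEL_NAME=tfx_titanic_classifier tensorflow/serving:latest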
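Because the container runs in the background, it helps to confirm that the model has loaded before sending requests. TensorFlow Serving exposes a model status endpoint on its REST port (`GET /v1/models/<name>`) whose JSON response lists each version's `state`. A minimal sketch using only the standard library, assuming the REST port 8501 mapped above; the helper names are hypothetical:

```python
import json
import urllib.request


def model_status_url(model_name, host="localhost", port=8501):
    """URL of TF Serving's REST model status endpoint."""
    return f"http://{host}:{port}/v1/models/{model_name}"


def is_model_available(status):
    """True if any version in a model status response reports state AVAILABLE."""
    return any(v.get("state") == "AVAILABLE"
               for v in status.get("model_version_status", []))


def check_model(model_name, host="localhost", port=8501, timeout=5):
    """Query the running server and report whether the model is ready to serve."""
    with urllib.request.urlopen(model_status_url(model_name, host, port),
                                timeout=timeout) as resp:
        return is_model_available(json.load(resp))
```

For example, `check_model("tfx_titanic_classifier")` should return `True` once a model version reports `AVAILABLE`; while the server is still starting, the request may raise a connection error instead.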
                

Helper functions for serializing raw records to `tf.train.Example` protos

In [None]:
import importlib

import pandas as pd
import tensorflow as tf

from pipeline import features
importlib.reload(features)


def _bytes_feature(value):
  """Returns a bytes_list from a string / byte."""
  if isinstance(value, type(tf.constant(0))):
    value = value.numpy()  # BytesList won't unpack a string from an EagerTensor.
  if isinstance(value, str):
    value = value.encode('utf-8')  # BytesList needs bytes, not str.
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
  """Returns a float_list from a float / double."""
  return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
  """Returns an int64_list from a bool / enum / int / uint."""
  return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# Maps each raw Titanic feature to its tf.train.Feature encoder.
# Defined after the helper functions so that the names already exist.
feature_tf_example_mapping = {
        'Embarked': _bytes_feature,
        'Ticket': _bytes_feature,
        'Sex': _bytes_feature,
        'Name': _bytes_feature,
        'Cabin': _bytes_feature,
        'Age': _float_feature,
        'Fare': _float_feature,
        'Parch': _int64_feature,
        'PassengerId': _int64_feature,
        'Pclass': _int64_feature,
        'SibSp': _int64_feature
    }

def serialize_example(data):
  """
  Creates a serialized tf.train.Example message for a single record.
  data : dict or pd.Series
            one row of features in key: value format (without the label)
  """
  if isinstance(data, pd.Series):
    data = data.to_dict()

  # Map each feature name to its tf.train.Example-compatible value.
  feature = {key: feature_tf_example_mapping[key](value) for key, value in data.items()}

  # Wrap the features in a tf.train.Example and serialize it to bytes.
  example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
  return example_proto.SerializeToString()
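The hardcoded `feature_tf_example_mapping` has to agree with the value types each record actually carries. A hypothetical cross-check helper can infer the `tf.train.Feature` list type from a value; it only reflects how pandas parsed a given row, so the hardcoded mapping (which must match the pipeline's schema) stays authoritative. This is a sketch, not part of the pipeline:

```python
import numbers


def infer_feature_kind(value):
    """Name the tf.train.Feature list type a Python/NumPy scalar maps to."""
    if isinstance(value, (str, bytes)):
        return "bytes_list"
    if isinstance(value, numbers.Integral):  # int, bool, NumPy integer types
        return "int64_list"
    if isinstance(value, numbers.Real):  # float (including NaN), NumPy floats
        return "float_list"
    raise TypeError(f"unsupported feature value: {value!r}")


def infer_feature_kinds(record):
    """Map every key of one record dict to its inferred feature list type."""
    return {key: infer_feature_kind(value) for key, value in record.items()}
```

Comparing `infer_feature_kinds(titanic_train_df.iloc[0].to_dict())` against the encoders in the mapping is one way to catch a column that pandas parsed with an unexpected type.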


In [None]:
import grpc
import tensorflow as tf

from tensorflow.core.framework import types_pb2
from tensorflow.core.framework import tensor_pb2
from tensorflow.core.framework import tensor_shape_pb2
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc


def predict_titanic(request_data, server='localhost:8500'):
    """Sends a batch of records to TF Serving over gRPC and returns the PredictResponse."""
    # One serialized tf.train.Example per record.
    serialized_examples_array = [serialize_example(row) for row in request_data]

    # A 1-D string tensor of shape [batch_size] holding the serialized examples.
    dims = [tensor_shape_pb2.TensorShapeProto.Dim(size=len(serialized_examples_array))]
    tensor_shape_proto = tensor_shape_pb2.TensorShapeProto(dim=dims)
    tensor_proto = tensor_pb2.TensorProto(
                dtype=types_pb2.DT_STRING,
                tensor_shape=tensor_shape_proto,
                string_val=serialized_examples_array)

    request = predict_pb2.PredictRequest()
    request.model_spec.name = "tfx_titanic_classifier"
    request.model_spec.signature_name = 'serving_default'
    request.inputs['examples'].CopyFrom(tensor_proto)

    with grpc.insecure_channel(server) as channel:
        stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
        # Blocking call with a 30-second timeout; returns a PredictResponse.
        response = stub.Predict(request, 30.0)

    return response

def parse_prediction_result(prediction_result):
    """Converts the 'output_0' tensor of a PredictResponse into a NumPy array."""
    # The output name depends on the model's serving signature.
    outputs_tensor_proto = prediction_result.outputs["output_0"]
    return tf.make_ndarray(outputs_tensor_proto)
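The same serialized examples can be sent over TensorFlow Serving's REST API (port 8501) instead of gRPC. In the REST request format, binary strings must be base64-encoded and wrapped in `{"b64": ...}`. The sketch below only builds the JSON body; it assumes the signature input is named `examples`, as in `predict_titanic`, and `build_rest_predict_body` is a hypothetical helper:

```python
import base64
import json


def build_rest_predict_body(serialized_examples, signature_name="serving_default"):
    """JSON body for TF Serving's REST :predict endpoint with serialized tf.Examples."""
    # Row ("instances") format: one dict per example, keyed by input name.
    instances = [{"examples": {"b64": base64.b64encode(ex).decode("ascii")}}
                 for ex in serialized_examples]
    return json.dumps({"signature_name": signature_name, "instances": instances})
```

The body would be POSTed to `http://localhost:8501/v1/models/tfx_titanic_classifier:predict`, and the response JSON holds the scores under `predictions`.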

In [None]:
import numpy as np
import pandas as pd

# Expected dtypes of the raw Titanic columns (kept for reference).
titanic_types = {
    'PassengerId': np.int32,
    'Pclass': np.int32,
    'Name': object,
    'Sex': object,
    'Age': np.float32,
    'SibSp': np.int32,
    'Parch': np.int32,
    'Ticket': object,
    'Fare': np.float32,
    'Cabin': object,
    'Embarked': object,
    'Survived': np.int32,
}
# Read string columns as str so that missing values become '' rather than NaN.
converters = {'Cabin': str, 'Name': str, 'Ticket': str, 'Sex': str, 'Embarked': str}

titanic_test_df = pd.read_csv(local_test_filepath, converters=converters)
titanic_train_df = pd.read_csv(local_train_filepath, converters=converters)

titanic_train_df_survived = titanic_train_df[titanic_train_df['Survived'] == 1]
titanic_train_df_dead = titanic_train_df[titanic_train_df['Survived'] == 0]

# .copy() avoids SettingWithCopyWarning when the prediction column is added below.
train_survived_examples_df = titanic_train_df_survived.head(10).copy()
train_dead_examples_df = titanic_train_df_dead.head(10).copy()

train_survived_examples_data = train_survived_examples_df.to_dict(orient='records')
train_dead_examples_data = train_dead_examples_df.to_dict(orient='records')

# Remove the Survived label: feature_tf_example_mapping has no entry for it.
for example in train_survived_examples_data + train_dead_examples_data:
    example.pop('Survived', None)

prediction_result_for_survived = predict_titanic(train_survived_examples_data)
prediction_result_for_dead = predict_titanic(train_dead_examples_data)

parsed_prediction_results_survived = parse_prediction_result(prediction_result_for_survived)
parsed_prediction_results_dead = parse_prediction_result(prediction_result_for_dead)

# Flatten the model output (e.g. shape [batch, 1]) so it fits a single column.
train_survived_examples_df['Survived_prediction'] = parsed_prediction_results_survived.ravel()
train_dead_examples_df['Survived_prediction'] = parsed_prediction_results_dead.ravel()

display(train_survived_examples_df)
display(train_dead_examples_df)
print(parsed_prediction_results_survived)
print(parsed_prediction_results_dead)
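To summarize the two batches with a single number, the predicted scores can be thresholded and compared with the known labels. This sketch assumes that `output_0` is a survival probability in [0, 1] and uses a 0.5 cutoff; both are assumptions about this model, and `survival_accuracy` is a hypothetical helper:

```python
def survival_accuracy(probabilities, labels, threshold=0.5):
    """Fraction of rows where (probability >= threshold) matches the 0/1 label."""
    if len(probabilities) != len(labels):
        raise ValueError("probabilities and labels must have the same length")
    if len(labels) == 0:
        raise ValueError("need at least one example")
    correct = sum(int(p >= threshold) == int(y) for p, y in zip(probabilities, labels))
    return correct / len(labels)
```

For example, `survival_accuracy(parsed_prediction_results_survived.ravel().tolist(), [1] * 10)` scores the survived batch, and the same call with `[0] * 10` scores the other batch.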


In [None]:
# Predict on the first 10 rows of the test set, which has no Survived column.
first10 = titanic_test_df.iloc[:10].to_dict(orient='records')
prediction_result = predict_titanic(first10)
print(type(prediction_result))

outputs_tensor_proto = prediction_result.outputs["output_0"]
print(outputs_tensor_proto)

# Convert the raw TensorProto into a NumPy array of scores.
outputs = parse_prediction_result(prediction_result)
print(outputs)