# Developing, Training, and Deploying a TensorFlow model on Google Cloud Platform (completely within Jupyter)


In Chapter 9 of [Data Science on the Google Cloud Platform](http://shop.oreilly.com/product/0636920057628.do), I trained a TensorFlow Estimator model to predict flight delays.

In this notebook, we'll modernize the workflow:
* Use eager mode for TensorFlow development
* Use tf.data to write the input pipeline
* Run the notebook as-is on Cloud using Deep Learning VM or Kubeflow Pipelines
* Deploy the trained model to Cloud ML Engine as a web service

The combination of eager mode, tf.data, and Deep Learning VM (DLVM) or Kubeflow Pipelines (KFP) makes this workflow a lot easier.
We don't need to deal with Python packages or Docker containers.

In [1]:
# change these to try this notebook out
# In "production", these will be replaced by the parameters passed to papermill
BUCKET = 'cloud-training-demos-ml'
PROJECT = 'cloud-training-demos'
REGION = 'us-central1'
DEVELOP_MODE = True
NBUCKETS = 5
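
The comment in this cell says that papermill replaces these values in production. A minimal sketch of what such a call might look like, assuming papermill is installed; the notebook filenames in the commented-out call are hypothetical, as are the two helper functions:

```python
def build_parameters(bucket, project, region, develop_mode=False, nbuckets=5):
    """Collect the overrides for the notebook's parameter cell."""
    return {
        'BUCKET': bucket,
        'PROJECT': project,
        'REGION': region,
        'DEVELOP_MODE': develop_mode,
        'NBUCKETS': nbuckets,
    }

def run_notebook(input_path, output_path, parameters):
    # Imported lazily so build_parameters() can be used without papermill installed.
    import papermill as pm
    # papermill writes an executed copy of the notebook with these values injected.
    return pm.execute_notebook(input_path, output_path, parameters=parameters)

params = build_parameters('cloud-training-demos-ml', 'cloud-training-demos', 'us-central1')
print(params)
# run_notebook('flights_model.ipynb', 'flights_model_out.ipynb', params)  # hypothetical paths
```

Note that a production run would set `DEVELOP_MODE` to `False`, which switches the notebook to the full number of training steps.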

In [2]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [3]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

Updated property [core/project].
Updated property [compute/region].


## Creating the input data pipeline

In [4]:
DATA_BUCKET = "gs://cloud-training-demos/flights/chapter8/output/"
TRAIN_DATA_PATTERN = DATA_BUCKET + "train*"
VALID_DATA_PATTERN = DATA_BUCKET + "test*"

In [5]:
!gsutil ls $DATA_BUCKET

gs://cloud-training-demos/flights/chapter8/output/delays.csv
gs://cloud-training-demos/flights/chapter8/output/testFlights-00000-of-00007.csv
gs://cloud-training-demos/flights/chapter8/output/testFlights-00001-of-00007.csv
gs://cloud-training-demos/flights/chapter8/output/testFlights-00002-of-00007.csv
gs://cloud-training-demos/flights/chapter8/output/testFlights-00003-of-00007.csv
gs://cloud-training-demos/flights/chapter8/output/testFlights-00004-of-00007.csv
gs://cloud-training-demos/flights/chapter8/output/testFlights-00005-of-00007.csv
gs://cloud-training-demos/flights/chapter8/output/testFlights-00006-of-00007.csv
gs://cloud-training-demos/flights/chapter8/output/trainFlights-00000-of-00007.csv
gs://cloud-training-demos/flights/chapter8/output/trainFlights-00001-of-00007.csv
gs://cloud-training-demos/flights/chapter8/output/trainFlights-00002-of-00007.csv
gs://cloud-training-demos/flights/chapter8/output/trainFlights-00003-of-00007.csv
gs://cloud-training-demos/flights/chapter8/o
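
The two patterns split the files in this directory by prefix. As a rough local illustration, the sketch below uses Python's `fnmatch` as a stand-in for the glob matching applied to Cloud Storage paths (an assumption that holds for this simple trailing `*` case) and shows which of the listed files each pattern picks up:

```python
from fnmatch import fnmatch

DATA_BUCKET = "gs://cloud-training-demos/flights/chapter8/output/"

# A few of the file names from the listing above
listing = [
    DATA_BUCKET + "delays.csv",
    DATA_BUCKET + "testFlights-00000-of-00007.csv",
    DATA_BUCKET + "testFlights-00001-of-00007.csv",
    DATA_BUCKET + "trainFlights-00000-of-00007.csv",
    DATA_BUCKET + "trainFlights-00001-of-00007.csv",
]

train_files = [f for f in listing if fnmatch(f, DATA_BUCKET + "train*")]
valid_files = [f for f in listing if fnmatch(f, DATA_BUCKET + "test*")]
unused = [f for f in listing if f not in train_files + valid_files]

print(len(train_files), len(valid_files), unused)
```

`delays.csv` matches neither pattern, so it is not read by the training or the evaluation pipeline.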

### Use tf.data to read the CSV files

In [6]:
import os, json, math
import numpy as np
import tensorflow as tf
print("Tensorflow version " + tf.__version__)

Tensorflow version 2.0.0-alpha0


In [7]:
CSV_COLUMNS  = ('ontime,dep_delay,taxiout,distance,avg_dep_delay,avg_arr_delay' + \
                ',carrier,dep_lat,dep_lon,arr_lat,arr_lon,origin,dest').split(',')
LABEL_COLUMN = 'ontime'
DEFAULTS     = [[0.0],[0.0],[0.0],[0.0],[0.0],[0.0],\
                ['na'],[0.0],[0.0],[0.0],[0.0],['na'],['na']]

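def load_dataset(pattern):
  # batch_size=1 yields one row per element; prepare_dataset() re-batches later
  return tf.data.experimental.make_csv_dataset(
      pattern, batch_size=1, column_names=CSV_COLUMNS, column_defaults=DEFAULTS)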
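
The `DEFAULTS` list does double duty: each entry fixes the type of its column and supplies the value to use when a field is empty. A pure-Python sketch of that convention follows. It illustrates the idea only and is not how `make_csv_dataset` is implemented; the `parse_line` helper and the sample row (with two fields blanked out) are made up for the example.

```python
import csv

CSV_COLUMNS = ('ontime,dep_delay,taxiout,distance,avg_dep_delay,avg_arr_delay'
               ',carrier,dep_lat,dep_lon,arr_lat,arr_lon,origin,dest').split(',')
DEFAULTS = [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0],
            ['na'], [0.0], [0.0], [0.0], [0.0], ['na'], ['na']]

def parse_line(line):
    """Convert one CSV line to a dict, typing each field after its default."""
    fields = next(csv.reader([line]))
    row = {}
    for name, default, raw in zip(CSV_COLUMNS, DEFAULTS, fields):
        kind = type(default[0])  # float for numeric columns, str for categorical ones
        row[name] = kind(raw) if raw != '' else default[0]
    return row

# Hypothetical row: avg_arr_delay and dest are left empty on purpose
sample = '1.0,-1.0,20.0,453.0,26.245787,,DL,33.636665,-84.42778,34.729443,-92.224724,ATL,'
row = parse_line(sample)
print(row['avg_arr_delay'], row['dest'])
```

The empty numeric field falls back to `0.0` and the empty string field to `'na'`.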

In [8]:
if DEVELOP_MODE:
    dataset = load_dataset(TRAIN_DATA_PATTERN)
    for n, data in enumerate(dataset):
        numpy_data = {k: v.numpy() for k, v in data.items()} # .numpy() works only in eager mode
        print(numpy_data)
        if n>3: break

{'ontime': array([0.], dtype=float32), 'dep_delay': array([34.], dtype=float32), 'taxiout': array([17.], dtype=float32), 'distance': array([453.], dtype=float32), 'avg_dep_delay': array([26.245787], dtype=float32), 'avg_arr_delay': array([6.], dtype=float32), 'carrier': array([b'DL'], dtype=object), 'dep_lat': array([33.636665], dtype=float32), 'dep_lon': array([-84.42778], dtype=float32), 'arr_lat': array([34.729443], dtype=float32), 'arr_lon': array([-92.224724], dtype=float32), 'origin': array([b'ATL'], dtype=object), 'dest': array([b'LIT'], dtype=object)}
{'ontime': array([1.], dtype=float32), 'dep_delay': array([-1.], dtype=float32), 'taxiout': array([20.], dtype=float32), 'distance': array([453.], dtype=float32), 'avg_dep_delay': array([26.245787], dtype=float32), 'avg_arr_delay': array([-6.], dtype=float32), 'carrier': array([b'DL'], dtype=object), 'dep_lat': array([33.636665], dtype=float32), 'dep_lon': array([-84.42778], dtype=float32), 'arr_lat': array([34.729443], dtype=floa

In [9]:
%%writefile example_input.json
{"dep_delay": 14.0, "taxiout": 13.0, "distance": 319.0, "avg_dep_delay": 25.863039, "avg_arr_delay": 27.0, "carrier": "WN", "dep_lat": 32.84722, "dep_lon": -96.85167, "arr_lat": 31.9425, "arr_lon": -102.20194, "origin": "DAL", "dest": "MAF"}
{"dep_delay": -9.0, "taxiout": 21.0, "distance": 301.0, "avg_dep_delay": 41.050808, "avg_arr_delay": -7.0, "carrier": "EV", "dep_lat": 29.984444, "dep_lon": -95.34139, "arr_lat": 27.544167, "arr_lon": -99.46167, "origin": "IAH", "dest": "LRD"}

Overwriting example_input.json


In [10]:
def features_and_labels(features):
  label = features.pop('ontime') # this is what we will train for
  return features, label

def prepare_dataset(pattern, batch_size, truncate=None, mode=tf.estimator.ModeKeys.TRAIN):
  dataset = load_dataset(pattern)
  dataset = dataset.map(features_and_labels)  # split each row into (features, label)
  dataset = dataset.cache()
  if mode == tf.estimator.ModeKeys.TRAIN:
    dataset = dataset.shuffle(1000)  # shuffle buffer of 1000 elements
    dataset = dataset.repeat()       # loop indefinitely; training stops at max_steps
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(1)
  if truncate is not None:
    dataset = dataset.take(truncate)  # keep only the first `truncate` batches
  return dataset

if DEVELOP_MODE:
    print("Calling prepare")
    one_item = prepare_dataset(TRAIN_DATA_PATTERN, batch_size=5, truncate=1)
    print(list(one_item)) # should print one batch of 5 items

Calling prepare
[(OrderedDict([('dep_delay', <tf.Tensor: id=345, shape=(5, 1), dtype=float32, numpy=
array([[-3.],
       [-6.],
       [-3.],
       [99.],
       [-3.]], dtype=float32)>), ('taxiout', <tf.Tensor: id=351, shape=(5, 1), dtype=float32, numpy=
array([[10.],
       [ 9.],
       [12.],
       [19.],
       [20.]], dtype=float32)>), ('distance', <tf.Tensor: id=349, shape=(5, 1), dtype=float32, numpy=
array([[ 399.],
       [ 519.],
       [ 511.],
       [1437.],
       [1437.]], dtype=float32)>), ('avg_dep_delay', <tf.Tensor: id=343, shape=(5, 1), dtype=float32, numpy=
array([[22.600828],
       [26.191599],
       [24.076136],
       [30.699594],
       [37.17035 ]], dtype=float32)>), ('avg_arr_delay', <tf.Tensor: id=342, shape=(5, 1), dtype=float32, numpy=
array([[-14.5 ],
       [-14.75],
       [ -1.  ],
       [ 23.75],
       [-10.  ]], dtype=float32)>), ('carrier', <tf.Tensor: id=344, shape=(5, 1), dtype=string, numpy=
array([[b'OO'],
       [b'WN'],
       [b'WN'],
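
Note the order of operations in `prepare_dataset`: `take(truncate)` comes after `batch(batch_size)`, so `truncate` counts batches rather than individual rows. A plain-Python sketch of the same ordering, with generators standing in for the tf.data transformations (the helper names are illustrative only):

```python
from itertools import islice

def batch(items, batch_size):
    """Group an iterable into lists of up to batch_size items."""
    it = iter(items)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        yield chunk

def take(items, count):
    """Keep only the first `count` elements of an iterable."""
    return islice(items, count)

rows = range(100)  # stand-in for 100 CSV rows

one_batch = list(take(batch(rows, batch_size=5), 1))
three_batches = list(take(batch(rows, batch_size=5), 3))

print(one_batch)
print(sum(len(b) for b in three_batches))
```

With `batch_size=5` and `truncate=1`, exactly one batch of five rows comes back, which matches the `shape=(5, 1)` tensors printed above.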

## Create TensorFlow wide-and-deep model

We'll create feature columns, and do some discretization and feature engineering.
See the book for details.

In [11]:
import tensorflow.feature_column as fc

real = {
    colname : fc.numeric_column(colname) \
          for colname in \
            ('dep_delay,taxiout,distance,avg_dep_delay,avg_arr_delay' +
             ',dep_lat,dep_lon,arr_lat,arr_lon').split(',')
}
sparse = {
      'carrier': fc.categorical_column_with_vocabulary_list('carrier',
                  vocabulary_list='AS,VX,F9,UA,US,WN,HA,EV,MQ,DL,OO,B6,NK,AA'.split(',')),
      'origin' : fc.categorical_column_with_hash_bucket('origin', hash_bucket_size=1000),
      'dest'   : fc.categorical_column_with_hash_bucket('dest', hash_bucket_size=1000)
}

### Feature engineering

In [12]:
latbuckets = np.linspace(20.0, 50.0, NBUCKETS).tolist()  # USA
lonbuckets = np.linspace(-120.0, -70.0, NBUCKETS).tolist() # USA
disc = {}
disc.update({
       'd_{}'.format(key) : fc.bucketized_column(real[key], latbuckets) \
          for key in ['dep_lat', 'arr_lat']
})
disc.update({
       'd_{}'.format(key) : fc.bucketized_column(real[key], lonbuckets) \
          for key in ['dep_lon', 'arr_lon']
})

# cross columns that make sense in combination
sparse['dep_loc'] = fc.crossed_column([disc['d_dep_lat'], disc['d_dep_lon']], NBUCKETS*NBUCKETS)
sparse['arr_loc'] = fc.crossed_column([disc['d_arr_lat'], disc['d_arr_lon']], NBUCKETS*NBUCKETS)
sparse['dep_arr'] = fc.crossed_column([sparse['dep_loc'], sparse['arr_loc']], NBUCKETS ** 4)
sparse['ori_dest'] = fc.crossed_column(['origin', 'dest'], hash_bucket_size=1000)

# embed all the sparse columns
embed = {
       colname : fc.embedding_column(col, 10) \
          for colname, col in sparse.items()
}
real.update(embed)

if DEVELOP_MODE:
    print(sparse.keys())
    print(real.keys())

dict_keys(['carrier', 'origin', 'dest', 'dep_loc', 'arr_loc', 'dep_arr', 'ori_dest'])
dict_keys(['dep_delay', 'taxiout', 'distance', 'avg_dep_delay', 'avg_arr_delay', 'dep_lat', 'dep_lon', 'arr_lat', 'arr_lon', 'carrier', 'origin', 'dest', 'dep_loc', 'arr_loc', 'dep_arr', 'ori_dest'])
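
To make the discretization concrete, here is a small stdlib-only sketch of how a latitude and longitude map to bucket indices. It assumes the documented `bucketized_column` convention that each bucket includes its left boundary and excludes its right one, so `n` boundaries produce `n + 1` buckets. The `linspace` and `bucket_index` functions are stand-ins written for this example.

```python
from bisect import bisect_right

NBUCKETS = 5

def linspace(start, stop, num):
    """Evenly spaced values from start to stop inclusive, like np.linspace."""
    step = (stop - start) / (num - 1)
    return [start + i * step for i in range(num)]

def bucket_index(value, boundaries):
    # Values below the first boundary fall in bucket 0;
    # values at or above the last boundary fall in bucket len(boundaries).
    return bisect_right(boundaries, value)

latbuckets = linspace(20.0, 50.0, NBUCKETS)
lonbuckets = linspace(-120.0, -70.0, NBUCKETS)

atl = (33.636665, -84.42778)  # ATL coordinates from the sample rows
lat_idx = bucket_index(atl[0], latbuckets)
lon_idx = bucket_index(atl[1], lonbuckets)
print(latbuckets, lonbuckets, lat_idx, lon_idx)
```

Because five boundaries give six buckets per coordinate, a latitude/longitude cross has up to 36 distinct cells, more than the `NBUCKETS*NBUCKETS` = 25 hash buckets requested for `dep_loc` and `arr_loc`, so some cells may share a hash bucket.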


### Serving

This serving input function defines how the deployed model receives prediction requests. Every field listed below is required at prediction time.

In [21]:
from tensorflow.compat.v1 import placeholder
def serving_input_fn():
    feature_placeholders = {
        # All the real-valued columns
        column: placeholder(tf.float32, [None]) \
             for column in ('dep_delay,taxiout,distance,avg_dep_delay,avg_arr_delay' + 
                            ',dep_lat,dep_lon,arr_lat,arr_lon').split(',')
    }
    feature_placeholders.update({
        column: placeholder(tf.string, [None]) for column in ['carrier', 'origin', 'dest']
    })
    features = feature_placeholders # no transformations
    return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)
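
Since every placeholder must be fed, a prediction request needs all twelve fields with the right types. Below is a quick sanity check that could be run on a line of `example_input.json` before sending it. The `check_instance` helper is hypothetical, written for this illustration.

```python
import json

FLOAT_FIELDS = ('dep_delay,taxiout,distance,avg_dep_delay,avg_arr_delay'
                ',dep_lat,dep_lon,arr_lat,arr_lon').split(',')
STRING_FIELDS = ['carrier', 'origin', 'dest']

def check_instance(instance):
    """Return a list of problems; an empty list means the instance looks valid."""
    problems = []
    for name in FLOAT_FIELDS:
        value = instance.get(name)
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            problems.append('{} should be a number'.format(name))
    for name in STRING_FIELDS:
        if not isinstance(instance.get(name), str):
            problems.append('{} should be a string'.format(name))
    for name in instance:
        if name not in FLOAT_FIELDS and name not in STRING_FIELDS:
            problems.append('{} is not a model input'.format(name))
    return problems

# First line of example_input.json
line = ('{"dep_delay": 14.0, "taxiout": 13.0, "distance": 319.0, '
        '"avg_dep_delay": 25.863039, "avg_arr_delay": 27.0, "carrier": "WN", '
        '"dep_lat": 32.84722, "dep_lon": -96.85167, "arr_lat": 31.9425, '
        '"arr_lon": -102.20194, "origin": "DAL", "dest": "MAF"}')
good = json.loads(line)

# A deliberately broken copy: the label is included and carrier is missing
bad = dict(good, ontime=1.0)
del bad['carrier']

print(check_instance(good))
print(check_instance(bad))
```

The label `ontime` is flagged because it is what the model predicts, not something a caller supplies.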

## Train the model and evaluate once in a while

The estimator also writes checkpoints to `model_dir` as it trains.

In [22]:
model_dir='gs://{}/flights/trained_model'.format(BUCKET)
os.environ['OUTDIR'] = model_dir  # needed for deployment
print('Writing trained model to {}'.format(model_dir))

Writing trained model to gs://cloud-training-demos-ml/flights/trained_model


In [23]:
!gsutil -m rm -rf $OUTDIR

Removing gs://cloud-training-demos-ml/flights/trained_model/checkpoint#1553294072663821...
Removing gs://cloud-training-demos-ml/flights/trained_model/eval/events.out.tfevents.1553293993.laktfnightly#1553294081019312...
Removing gs://cloud-training-demos-ml/flights/trained_model/export/#1553294082698379...
Removing gs://cloud-training-demos-ml/flights/trained_model/export/exporter/temp-b'1553294081'/#1553294083651837...
Removing gs://cloud-training-demos-ml/flights/trained_model/export/exporter/#1553294083180243...
Removing gs://cloud-training-demos-ml/flights/trained_model/graph.pbtxt#1553294056478447...
Removing gs://cloud-training-demos-ml/flights/trained_model/model.ckpt-10.index#1553294071477769...
Removing gs://cloud-training-demos-ml/flights/trained_model/model.ckpt-0.data-00001-of-00002#1553294060756692...
Removing gs://cloud-training-demos-ml/flights/trained_model/model.ckpt-10.data-00000-of-00002#1553294070951705...
Removing gs://cloud-training-demos-ml/flights/trained_model/

In [24]:
estimator = tf.compat.v1.estimator.DNNLinearCombinedClassifier(
        model_dir = model_dir,
        linear_feature_columns = sparse.values(),
        dnn_feature_columns = real.values(),
        dnn_hidden_units = [64, 32])

train_batch_size = 64
train_input_fn = lambda: prepare_dataset(TRAIN_DATA_PATTERN, train_batch_size)
eval_batch_size = 100 if DEVELOP_MODE else 10000
eval_input_fn = lambda: prepare_dataset(VALID_DATA_PATTERN, eval_batch_size, eval_batch_size*10, tf.estimator.ModeKeys.EVAL)
num_steps = 10 if DEVELOP_MODE else (1000000 // train_batch_size)

train_spec = tf.estimator.TrainSpec(train_input_fn, max_steps = num_steps)
exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
eval_spec = tf.estimator.EvalSpec(eval_input_fn, steps=10, exporters=exporter)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

W0322 22:37:58.388895 140055668778368 metrics_impl.py:783] Trapezoidal rule is known to produce incorrect PR-AUCs; please switch to "careful_interpolation" instead.
W0322 22:37:58.418073 140055668778368 metrics_impl.py:783] Trapezoidal rule is known to produce incorrect PR-AUCs; please switch to "careful_interpolation" instead.
W0322 22:38:07.437546 140055668778368 deprecation.py:323] From /opt/anaconda3/lib/python3.6/site-packages/tensorflow/python/saved_model/signature_def_utils_impl.py:201: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.
W0322 22:38:07.439567 140055668778368 tf_logging.py:161] Export includes no default signature!


({'accuracy': 0.785,
  'accuracy_baseline': 0.824,
  'auc': 0.84984213,
  'auc_precision_recall': 0.9575325,
  'average_loss': 0.6758764,
  'label/mean': 0.824,
  'loss': 67.58764,
  'precision': 0.93191487,
  'prediction/mean': 0.68610954,
  'recall': 0.7973301,
  'global_step': 10},
 [b'gs://cloud-training-demos-ml/flights/trained_model/export/exporter/1553294283'])
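
A few of these numbers can be cross-checked by hand. The sketch below relies on two assumptions about the canned estimator's metrics: that `accuracy_baseline` is the accuracy of always predicting the majority class, and that `loss` is summed over a batch, so it should equal `average_loss` times the evaluation batch size (100 in `DEVELOP_MODE`).

```python
# Values copied from the evaluation output above
metrics = {
    'accuracy': 0.785,
    'accuracy_baseline': 0.824,
    'average_loss': 0.6758764,
    'label/mean': 0.824,
    'loss': 67.58764,
}
eval_batch_size = 100   # DEVELOP_MODE setting
train_batch_size = 64

# Always predicting the more common class gets max(p, 1 - p) right
majority_accuracy = max(metrics['label/mean'], 1 - metrics['label/mean'])

# Per-batch loss reconstructed from the per-example average
summed_loss = metrics['average_loss'] * eval_batch_size

beats_baseline = metrics['accuracy'] > metrics['accuracy_baseline']

# Number of training steps when DEVELOP_MODE is False
full_run_steps = 1000000 // train_batch_size

print(majority_accuracy, summed_loss, beats_baseline, full_run_steps)
```

After only 10 training steps the model does not yet beat the majority-class baseline, which is unsurprising for a development run; the full run takes 15,625 steps at batch size 64.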

## Deploy the trained model

In [25]:
%%bash
model_dir=$(gsutil ls ${OUTDIR}/export/exporter | tail -1)
echo $model_dir
saved_model_cli show --dir ${model_dir} --all

gs://cloud-training-demos-ml/flights/trained_model/export/exporter/1553294283/

MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['predict']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['arr_lat'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Placeholder_7:0
    inputs['arr_lon'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Placeholder_8:0
    inputs['avg_arr_delay'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Placeholder_4:0
    inputs['avg_dep_delay'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Placeholder_3:0
    inputs['carrier'] tensor_info:
        dtype: DT_STRING
        shape: (-1)
        name: Placeholder_9:0
    inputs['dep_delay'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Placeholder:0
    inputs['dep_lat'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
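
The exporter names each export directory with a numeric timestamp, and the cell above picks the newest with `tail -1`, which depends on the listing being sorted. A sketch that selects the newest export explicitly by comparing the timestamps as integers follows. The second path is a made-up, deliberately shorter timestamp, included only to show where string ordering and numeric ordering disagree.

```python
def latest_export(paths):
    """Return the export directory with the largest numeric timestamp."""
    def timestamp(path):
        # The timestamp is the last path component, e.g. .../exporter/1553294283/
        return int(path.rstrip('/').rsplit('/', 1)[-1])
    return max(paths, key=timestamp)

exports = [
    'gs://cloud-training-demos-ml/flights/trained_model/export/exporter/1553294283/',
    'gs://cloud-training-demos-ml/flights/trained_model/export/exporter/999999999/',  # hypothetical
]

newest = latest_export(exports)
last_by_string_sort = sorted(exports)[-1]
print(newest)
print(last_by_string_sort)
```

In practice all timestamps have the same number of digits, so the two orderings agree; the integer comparison just makes that assumption unnecessary.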
     

In [None]:
%%bash
MODEL_NAME="flights"
MODEL_VERSION="kfp"
TFVERSION="2.0"
MODEL_LOCATION=$(gsutil ls ${OUTDIR}/export/exporter | tail -1)
echo "Run these commands one-by-one (the very first time, you'll create a model and then create a version)"
#yes | gcloud ml-engine versions delete ${MODEL_VERSION} --model ${MODEL_NAME}
#gcloud ml-engine models delete ${MODEL_NAME}
gcloud ml-engine models create ${MODEL_NAME} --regions $REGION
gcloud ml-engine versions create ${MODEL_VERSION} --model ${MODEL_NAME} --origin ${MODEL_LOCATION} --runtime-version $TFVERSION

In [None]:
!gcloud ml-engine predict --model=flights --version=kfp --json-instances=example_input.json
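
`--json-instances` expects one JSON object per line, as in `example_input.json`. When calling the online prediction REST API directly instead of through `gcloud`, the same instances are wrapped in a single `{"instances": [...]}` body. A sketch of that conversion (no request is sent here, and the `to_request_body` helper is illustrative):

```python
import json

def to_request_body(ndjson_text):
    """Wrap newline-delimited JSON instances in a single prediction request body."""
    instances = [json.loads(line) for line in ndjson_text.splitlines() if line.strip()]
    return json.dumps({'instances': instances})

# Contents of example_input.json
example_input = '''{"dep_delay": 14.0, "taxiout": 13.0, "distance": 319.0, "avg_dep_delay": 25.863039, "avg_arr_delay": 27.0, "carrier": "WN", "dep_lat": 32.84722, "dep_lon": -96.85167, "arr_lat": 31.9425, "arr_lon": -102.20194, "origin": "DAL", "dest": "MAF"}
{"dep_delay": -9.0, "taxiout": 21.0, "distance": 301.0, "avg_dep_delay": 41.050808, "avg_arr_delay": -7.0, "carrier": "EV", "dep_lat": 29.984444, "dep_lon": -95.34139, "arr_lat": 27.544167, "arr_lon": -99.46167, "origin": "IAH", "dest": "LRD"}
'''

body = to_request_body(example_input)
parsed = json.loads(body)
print(len(parsed['instances']), parsed['instances'][0]['carrier'])
```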

Copyright 2016 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.