# Training and serving TensorFlow Estimators
This notebook shows, step by step, how to generate input data, train and evaluate a Convolutional Neural Network model, output results, save models, and make predictions using the high-level [Estimator API](https://www.tensorflow.org/guide/estimators).

In [1]:
import numpy as np
from pathlib import Path

import tensorflow as tf
from tensorflow.contrib import predictor

## Global variables

In [2]:
# Show INFO-level TensorFlow log messages
tf.logging.set_verbosity(tf.logging.INFO)

# --- Batching and schedule ---
# samples per batch
BATCH_SIZE = 100
# upper bound on the number of training steps
MAX_STEPS = 1000
# number of batches consumed by each evaluation run
EVAL_STEPS = 1

# --- Reporting cadence (in training steps) ---
# how often summaries are written
SAVE_SUMMARY_STEPS = 100
# how often checkpoints are written - also corresponds to the evaluation frequency
SAVE_CHECKPOINTS_STEPS = 200
# how often the logging hooks print their tensors
LOGGING_STEPS = 50

# --- Output locations ---
# model checkpoints
MODEL_DIR = '../models/ckpt/'
# exported model for TF serving
SAVE_DIR = '../models/pb/'

## Load data

In [3]:
# Download (or read from the local cache) the MNIST train / eval splits
(train_data, train_labels), (eval_data, eval_labels) = tf.keras.datasets.mnist.load_data()

# Rescale pixel values by 1/255 using a float32 divisor
train_data = np.true_divide(train_data, np.float32(255))
eval_data = np.true_divide(eval_data, np.float32(255))

# Cast labels to int32 (the original author notes this cast is not required)
train_labels = train_labels.astype(np.int32)
eval_labels = eval_labels.astype(np.int32)

print('train data shape =', train_data.shape)
print('train labels shape =', train_labels.shape)
print('evaluation data shape =', eval_data.shape)
print('evaluation labels shape =', eval_labels.shape)

train data shape = (60000, 28, 28)
train labels shape = (60000,)
evaluation data shape = (10000, 28, 28)
evaluation labels shape = (10000,)


## Estimator Input Data
To load data into the Estimator, `input_fn()` is built with the `tf.data.Dataset.from_tensor_slices` API, which allows numpy arrays to be fed into the model function. In this case, training features are returned by `input_fn()` as a dictionary `{'x': features}`, where `features` is a batch of image arrays, instead of passing the array `features` directly to the model. This is necessary because during serving the serialized input must be in the form `{'key': feature}`.

In [4]:
def input_fn(data, labels, repeat=1, batch_size=None):
    """Build the batched `(features, labels)` tensors consumed by the Estimator.

    Args:
        data: array of input images, sliced along its first axis.
        labels: array of labels aligned with `data` along the first axis.
        repeat: number of passes over the data; `None` repeats indefinitely.
        batch_size: number of samples per batch; defaults to the global BATCH_SIZE.

    Returns:
        A tuple `({'x': features}, labels)` of tensors; each evaluation of the
        tensors yields the next batch.
    """
    if batch_size is None:
        batch_size = BATCH_SIZE

    dataset = tf.data.Dataset.from_tensor_slices((data, labels))
    # Repeat before batching, so batches may span the boundary between two passes.
    dataset = dataset.repeat(repeat)
    dataset = dataset.batch(batch_size)
    batch_features, batch_labels = dataset.make_one_shot_iterator().get_next()

    # Features are wrapped in a dict so the key matches the one used at serving time.
    return {'x': batch_features}, batch_labels

# Sanity check: pull five consecutive batches from input_fn and print their shapes
features_batch, labels_batch = input_fn(train_data, train_labels)
with tf.Session() as sess:
    for _ in range(5):
        batch_features, batch_labels = sess.run((features_batch, labels_batch))
        print(batch_features['x'].shape, batch_labels.shape)


(100, 28, 28) (100,)
(100, 28, 28) (100,)
(100, 28, 28) (100,)
(100, 28, 28) (100,)
(100, 28, 28) (100,)


In prediction mode the data is not fed in batches but rather as single images/arrays. `predict_input_fn()` assumes that a single array of size 28x28 is to be classified, which needs to be reshaped into the same format returned by `input_fn()`.

In [5]:
def predict_input_fn(predict_data):
    """Build the features dict used by the Estimator in prediction mode.

    Args:
        predict_data: image data reshapeable to `[-1, 28, 28]`, e.g. a single
            28x28 array or a stack of such arrays.

    Returns:
        `{'x': features}` where `features` yields one image at a time with
        shape `[1, 28, 28]`.
    """
    predict_data = tf.reshape(predict_data, [-1, 28, 28])
    dataset = tf.data.Dataset.from_tensor_slices(predict_data)
    # No repeat: the dataset must be finite so that Estimator.predict reaches
    # end-of-input and its generator terminates after the last image.
    dataset = dataset.batch(1)
    features = dataset.make_one_shot_iterator().get_next()

    return {'x': features}

# Sanity check: feed one evaluation image through predict_input_fn and print the resulting shape
predict_data = eval_data[0]        # image to classify
features_pred = predict_input_fn(predict_data)

with tf.Session() as sess:
    feature_pred = sess.run(features_pred)

print(feature_pred['x'].shape)

## Estimator Custom CNN Function

In [None]:
def cnn_model_fn(features, labels, mode, params):
    """Model function for a two-convolution image classifier used by `tf.estimator.Estimator`.

    Args:
        features: dict with key 'x' holding the image batch; it is reshaped to
            `[-1, 28, 28, 1]` before the first convolution.
        labels: class labels for the batch; only read in TRAIN and EVAL modes.
        mode: a `tf.estimator.ModeKeys` value.
        params: dict providing 'dropout_rate' and 'learning_rate'.

    Returns:
        A `tf.estimator.EstimatorSpec` carrying only the fields relevant to `mode`.
    """
    is_training = mode == tf.estimator.ModeKeys.TRAIN

    # --- Network: conv -> pool -> conv -> pool -> dense -> dropout -> logits ---
    # Layers are created in this fixed order so auto-generated variable names stay stable.
    input_layer = tf.reshape(features['x'], [-1, 28, 28, 1])

    conv1 = tf.layers.conv2d(
        inputs=input_layer,
        filters=32,
        kernel_size=[5, 5],
        padding="same",
        activation=tf.nn.relu)

    pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)

    conv2 = tf.layers.conv2d(
        inputs=pool1,
        filters=64,
        kernel_size=[5, 5],
        padding="same",
        activation=tf.nn.relu)

    pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)

    # Two stride-2 poolings on a 28x28 input with "same" padding leave a 7x7 map with 64 channels.
    pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64])
    dense = tf.layers.dense(inputs=pool2_flat, units=1024, activation=tf.nn.relu)
    # Dropout is only active while training.
    dropout = tf.layers.dropout(
        inputs=dense, rate=params['dropout_rate'], training=is_training)

    logits = tf.layers.dense(inputs=dropout, units=10)
    predictions = {"classes": tf.argmax(input=logits, axis=1), "probabilities": tf.nn.softmax(logits)}

    # --- PREDICT: no labels available, so return before any loss/metric op is built ---
    if mode == tf.estimator.ModeKeys.PREDICT:
        prediction_logging_hook = tf.train.LoggingTensorHook(
            {'predictions': predictions['classes']}, every_n_iter=LOGGING_STEPS)
        return tf.estimator.EstimatorSpec(
            mode=mode, predictions=predictions, prediction_hooks=[prediction_logging_hook])

    # --- TRAIN / EVAL: loss and accuracy ---
    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)
    tf.summary.scalar('loss', loss)

    # tf.metrics.accuracy returns (value_op, update_op); the update op is what gets summarized/logged.
    accuracy = tf.metrics.accuracy(labels=labels, predictions=predictions["classes"])
    tf.summary.scalar('accuracy', accuracy[1])
    eval_metric_ops = {"accuracy": accuracy}

    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(
            mode=mode, predictions=predictions, loss=loss, eval_metric_ops=eval_metric_ops)

    # --- TRAIN: the optimizer and the training logging hook are only needed here ---
    optimizer = tf.train.GradientDescentOptimizer(learning_rate=params['learning_rate'])
    train_op = optimizer.minimize(loss=loss, global_step=tf.train.get_global_step())

    training_logging_hook = tf.train.LoggingTensorHook(
        {'loss': loss, 'accuracy': accuracy[1]}, every_n_iter=LOGGING_STEPS)

    return tf.estimator.EstimatorSpec(
        mode=mode, predictions=predictions, loss=loss, train_op=train_op,
        eval_metric_ops=eval_metric_ops, training_hooks=[training_logging_hook])

## Configure Estimator

In [None]:
# Run configuration: where checkpoints go and how often summaries / checkpoints are written
training_config = tf.estimator.RunConfig(
    save_checkpoints_steps=SAVE_CHECKPOINTS_STEPS,
    save_summary_steps=SAVE_SUMMARY_STEPS,
    model_dir=MODEL_DIR,
)

# Estimator wrapping cnn_model_fn; `params` is forwarded to the model function
mnist_classifier = tf.estimator.Estimator(
    model_fn=cnn_model_fn,
    model_dir=MODEL_DIR,
    config=training_config,
    params=dict(dropout_rate=0.4, learning_rate=0.001),
)

## Train and Evaluate

In [None]:
# Number of passes over the training set needed to supply MAX_STEPS batches.
# input_fn repeats the data before batching, so MAX_STEPS * BATCH_SIZE samples are
# required in total; round UP so the input is not exhausted before max_steps is reached.
repeat = int(np.ceil(MAX_STEPS * BATCH_SIZE / len(train_data)))

train_spec = tf.estimator.TrainSpec(
    # repeat must be forwarded explicitly, otherwise input_fn falls back to a single pass
    input_fn=lambda: input_fn(train_data, train_labels, repeat=repeat),
    max_steps=MAX_STEPS)

# Each evaluation during training consumes EVAL_STEPS batches of the evaluation split
eval_spec = tf.estimator.EvalSpec(
    input_fn=lambda: input_fn(eval_data, eval_labels),
    steps=EVAL_STEPS,
    name='validation',
    start_delay_secs=10,
    throttle_secs=20)

tf.logging.info('start experiment...')
tf.estimator.train_and_evaluate(mnist_classifier, train_spec, eval_spec)

## Evaluate Model

In [None]:
# Evaluate on the evaluation split; no `steps` argument is passed here
eval_results = mnist_classifier.evaluate(
    input_fn=lambda: input_fn(eval_data, eval_labels),
)
print(eval_results)

## Model Prediction

In [None]:
# predict() returns a generator; nothing is computed until it is iterated
pred_results = mnist_classifier.predict(
    input_fn=lambda: predict_input_fn(eval_data[0]),
)

In [None]:
# First prediction dict, followed by the true label of the same image
print(next(pred_results), eval_labels[0], sep='\n')

## Model Save and Predict

In [None]:
def serving_receiver_input_fn():
    """Describe the inputs accepted by the exported model.

    The serving request supplies, under key 'x', a float32 4-D tensor of shape
    [batch, height, width, 1]. Images are resized to 28x28 before being handed
    to the model function, also under key 'x'.

    Returns:
        A `tf.estimator.export.ServingInputReceiver` pairing the model features
        with the receiver placeholder.
    """
    # Placeholder fed at serving time; the trailing channel dimension is required by resize_images
    raw_images = tf.placeholder(shape=[None, None, None, 1], dtype=tf.float32)
    resized_images = tf.image.resize_images(raw_images, [28, 28])

    return tf.estimator.export.ServingInputReceiver(
        features={'x': resized_images},
        receiver_tensors={'x': raw_images})
    

In [None]:
# Export the trained model under SAVE_DIR using the serving input signature defined above
mnist_classifier.export_savedmodel(
    export_dir_base=SAVE_DIR,
    serving_input_receiver_fn=serving_receiver_input_fn)

In [None]:
# Collect the export subdirectories of SAVE_DIR, skipping any whose *name* contains 'temp'.
# The check is on the directory name only, so a SAVE_DIR path that itself contains 'temp'
# does not filter out every export.
subdirs = [x for x in Path(SAVE_DIR).iterdir() if x.is_dir() and 'temp' not in x.name]
if not subdirs:
    raise FileNotFoundError('no exported model found under ' + str(SAVE_DIR))

# All candidates share the same parent, so the greatest path is the greatest directory name
latest_dir = str(max(subdirs))
predict_fn = predictor.from_saved_model(latest_dir)

In [None]:
# The exported model expects [batch, height, width, channels]; append the channel axis to two eval images
serving_examples = np.expand_dims(eval_data[:2], axis=-1)
predictions = predict_fn({'x': serving_examples})

print('predictions', predictions['classes'])
print('true labels', eval_labels[:2])