## This notebook describes how to implement distributed TensorFlow code.

The contents of this notebook are listed below.

1. Prepare CIFAR-10 Dataset (TFRecords Format)
2. Define Parameters
3. Define Data Input Function
4. Define Features
5. Instantiate an Estimator
6. Define Serving Function
7. Train, Evaluate and Export an ML Model
8. Evaluate with Estimator
9. Prediction with Exported Model
10. Distributed Training with Cloud ML Engine

## 1. Prepare CIFAR-10 Dataset (TFRecords Format)

In [None]:
import cPickle
import os
import re
import shutil
import tarfile
import tensorflow as tf

print(tf.__version__)

In [None]:
# Name of the CIFAR-10 archive; it is appended to the download URL below and
# joined onto the data directory when the archive is extracted.
CIFAR_FILENAME = 'cifar-10-python.tar.gz'
# Full URL the archive is downloaded from.
CIFAR_DOWNLOAD_URL = 'http://www.cs.toronto.edu/~kriz/' + CIFAR_FILENAME
# Sub-folder of the data directory from which the pickled batch files are
# read (see create_tfrecords_files).
CIFAR_LOCAL_FOLDER = 'cifar-10-batches-py'

In [None]:
def _download_and_extract(data_dir):
  """Downloads the CIFAR-10 archive into `data_dir` and unpacks it there.

  Args:
    data_dir: Directory that receives both the downloaded archive and the
      extracted files.
  """
  tf.contrib.learn.datasets.base.maybe_download(
      CIFAR_FILENAME, data_dir, CIFAR_DOWNLOAD_URL)
  archive_path = os.path.join(data_dir, CIFAR_FILENAME)
  # Use a `with` block so the archive's file handle is always closed, even
  # if extraction fails part-way through.
  with tarfile.open(archive_path, 'r:gz') as archive:
    archive.extractall(data_dir)

In [None]:
def _get_file_names():
  """Returns the file names expected to exist in the input_dir."""
  file_names = {}
  file_names['train'] = ['data_batch_%d' % i for i in xrange(1, 5)]
  file_names['validation'] = ['data_batch_5']
  file_names['eval'] = ['test_batch']
  return file_names

In [None]:
def _read_pickle_from_file(filename):
  """Loads and returns the object pickled in `filename`.

  Args:
    filename: Path of the pickle file; it is opened through `tf.gfile`.

  Returns:
    The unpickled object.
  """
  # Pickle data is binary, so open the file in 'rb' rather than text mode.
  # NOTE(review): unpickling can execute arbitrary code; only use this on
  # files that come from a trusted source.
  with tf.gfile.Open(filename, 'rb') as f:
    data_dict = cPickle.load(f)
  return data_dict

In [None]:
def _convert_to_tfrecord(input_files, output_file):
  """Writes every example of the pickled batches into one TFRecords file.

  Args:
    input_files: Paths of the pickled batch files to read.
    output_file: Path of the TFRecords file to create.
  """
  print('Generating %s' % output_file)
  with tf.python_io.TFRecordWriter(output_file) as writer:
    for batch_path in input_files:
      batch = _read_pickle_from_file(batch_path)
      images = batch['data']
      labels = batch['labels']
      for index, label in enumerate(labels):
        # One Example per entry: the raw image bytes plus its integer label.
        feature_map = {
            'image': _bytes_feature(images[index].tobytes()),
            'label': _int64_feature(label),
        }
        features = tf.train.Features(feature=feature_map)
        example = tf.train.Example(features=features)
        writer.write(example.SerializeToString())

In [None]:
def _int64_feature(value):
  """Wraps a single integer in a `tf.train.Feature` holding an Int64List."""
  int64_list = tf.train.Int64List(value=[value])
  return tf.train.Feature(int64_list=int64_list)

In [None]:
def _bytes_feature(value):
  """Wraps a single value in a `tf.train.Feature` holding a BytesList.

  Args:
    value: The payload. Byte strings are stored unchanged; any other value
      is converted with `str()` first.

  Returns:
    A `tf.train.Feature` whose `bytes_list` contains the one payload.
  """
  # Calling str() on a bytes object under Python 3 yields its repr
  # ("b'...'") and would corrupt the payload, so bytes pass through as-is.
  # Under Python 2 `bytes` is `str`, so the behavior there is unchanged.
  if not isinstance(value, bytes):
    value = str(value)
    if not isinstance(value, bytes):
      # Python 3 text: encode it, because BytesList only accepts bytes.
      value = value.encode('utf-8')
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

In [None]:
def create_tfrecords_files(data_dir='cifar-10'):
  """Downloads CIFAR-10 and writes one TFRecords file per dataset split.

  Args:
    data_dir: Directory used for the download, the extracted batches and
      the generated `<split>.tfrecords` files.
  """
  _download_and_extract(data_dir)
  batch_dir = os.path.join(data_dir, CIFAR_LOCAL_FOLDER)

  for split, batch_names in _get_file_names().items():
    batch_paths = [os.path.join(batch_dir, name) for name in batch_names]
    tfrecords_path = os.path.join(data_dir, split + '.tfrecords')
    # Drop any output left over from a previous run; a failure to remove
    # (e.g. the file does not exist yet) is deliberately ignored.
    try:
      os.remove(tfrecords_path)
    except OSError:
      pass
    # Convert to tf.train.Example and write to TFRecords.
    _convert_to_tfrecord(batch_paths, tfrecords_path)

In [None]:
# Download CIFAR-10 and generate the TFRecords files under the default data
# directory ('cifar-10').
create_tfrecords_files()

## 2. Define Parameters

In [None]:
# Plain namespace class holding the run-time settings of this notebook as
# class attributes.
class FLAGS():
  # Input pipeline, training-loop and checkpoint settings.
  batch_size = 200
  max_steps = 1000
  eval_steps = 100
  save_checkpoints_steps = 100
  tf_random_seed = 19851211
  model_name = 'cnn-model-02'

  # We use a weight decay of 0.0002, which performs better than the 0.0001
  # that was originally suggested.
  weight_decay = 2e-4
  momentum = 0.9

  # If a model is trained with multiple GPUs, prefix all Op names with
  # tower_name to differentiate the operations. Note that this prefix is
  # removed from the names of the summaries when visualizing a model.
  tower_name = 'tower'
  use_checkpoint = False

In [None]:
# Process images of this size, which is the original CIFAR image size of
# 32 x 32 (with 3 channels). If one alters these numbers, then the entire
# model architecture will change and any model would need to be retrained.
IMAGE_HEIGHT = 32
IMAGE_WIDTH = 32
IMAGE_DEPTH = 3
# Number of label classes; used as the one-hot depth and the logits width.
NUM_CLASSES = 10

# Global constants describing the CIFAR-10 data set.
# NOTE(review): the 'train' split written by this notebook uses only
# data_batch_1..4 -- confirm 50000 is the intended training-set size.
NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 50000
NUM_EXAMPLES_PER_EPOCH_FOR_EVAL = 10000

## 3. Define Data Input Function

In [None]:
def parse_record(serialized_example):
  """Decodes one serialized `tf.train.Example` into an image/label pair.

  Args:
    serialized_example: Serialized Example with an 'image' (string) and a
      'label' (int64) feature.

  Returns:
    A tuple `(image, label)`: a float32 image tensor of layout
    [height, width, depth] and a one-hot label vector of length NUM_CLASSES.
  """
  feature_spec = {
      'image': tf.FixedLenFeature([], tf.string),
      'label': tf.FixedLenFeature([], tf.int64),
  }
  parsed = tf.parse_single_example(serialized_example, features=feature_spec)

  # The flat pixel vector is reshaped as [depth, height, width] and then
  # transposed so that the depth axis comes last.
  pixels = tf.decode_raw(parsed['image'], tf.uint8)
  pixels.set_shape([IMAGE_DEPTH * IMAGE_HEIGHT * IMAGE_WIDTH])
  depth_major = tf.reshape(pixels, [IMAGE_DEPTH, IMAGE_HEIGHT, IMAGE_WIDTH])
  image = tf.cast(tf.transpose(depth_major, [1, 2, 0]), tf.float32)

  class_index = tf.cast(parsed['label'], tf.int32)
  label = tf.one_hot(class_index, NUM_CLASSES)

  return image, label

In [None]:
def preprocess_image(image, is_training=False):
  """Standardizes one image, with random augmentation when training.

  Args:
    image: A single image tensor of layout [height, width, depth].
    is_training: If true, a random crop and a random horizontal flip are
      applied before standardization.

  Returns:
    The image after `tf.image.per_image_standardization`.
  """
  if is_training:
    # Pad to (height + 8, width + 8), i.e. four extra pixels on each side.
    padded = tf.image.resize_image_with_crop_or_pad(
        image, IMAGE_HEIGHT + 8, IMAGE_WIDTH + 8)

    # Take a random crop of the original size out of the padded image.
    cropped = tf.random_crop(padded, [IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH])

    # Mirror the crop left-to-right at random.
    image = tf.image.random_flip_left_right(cropped)

  # Normalize each image on its own statistics.
  return tf.image.per_image_standardization(image)

In [None]:
def generate_input_fn(file_names, mode=tf.estimator.ModeKeys.EVAL, batch_size=1):
  """Builds an Estimator `input_fn` that reads the given TFRecords files.

  Args:
    file_names: TFRecords file name(s) passed to `tf.data.TFRecordDataset`.
    mode: A `tf.estimator.ModeKeys` value; shuffling and image augmentation
      are only enabled for `TRAIN`.
    batch_size: Number of examples per batch.

  Returns:
    A zero-argument function returning `(features, labels)`, where
    `features` is `{'images': <batched image tensor>}`.
  """
  def _input_fn():
    dataset = tf.data.TFRecordDataset(filenames=file_names)

    is_training = (mode == tf.estimator.ModeKeys.TRAIN)
    if is_training:
      buffer_size = batch_size * 2 + 1
      dataset = dataset.shuffle(buffer_size=buffer_size)

    # Transformation
    dataset = dataset.map(parse_record)
    dataset = dataset.map(
      lambda image, label: (preprocess_image(image, is_training), label))

    # The dataset repeats indefinitely, so callers bound the run through
    # their own step counts.
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size)
    # prefetch() comes after batch(), so its buffer size counts whole
    # batches. Buffering 2 * batch_size batches would hold hundreds of
    # batches in memory; two batches are enough to overlap input processing
    # with the training step.
    dataset = dataset.prefetch(2)

    images, labels = dataset.make_one_shot_iterator().get_next()

    features = {'images': images}
    return features, labels

  return _input_fn

## 4. Define Features

In [None]:
def get_feature_columns():
  """Returns the model's feature columns keyed by feature name.

  Returns:
    A dict with the single entry 'images': a numeric column of shape
    (IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH).
  """
  image_shape = (IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH)
  images_column = tf.feature_column.numeric_column('images', image_shape)
  return {'images': images_column}

In [None]:
# Build the feature columns once and print them as a quick sanity check.
feature_columns = get_feature_columns()
print("Feature Columns: {}".format(feature_columns))

## 5. Instantiate an Estimator

In [None]:
def _activation_summary(x):
  """Records summaries for an activation tensor.

  Adds a histogram of the activations and a scalar holding the fraction of
  zero entries (the sparsity) of the activations.

  Args:
    x: Tensor
  Returns:
    nothing
  """
  # Strip a 'tower_[0-9]/' prefix from the op name so that summaries from a
  # multi-GPU training session share one name on tensorboard.
  tower_prefix = '%s_[0-9]*/' % FLAGS.tower_name
  summary_name = re.sub(tower_prefix, '', x.op.name)
  tf.summary.histogram(summary_name + '/activations', x)
  sparsity = tf.nn.zero_fraction(x)
  tf.summary.scalar(summary_name + '/sparsity', sparsity)
  
def _variable_on_cpu(name, shape, initializer):
  """Creates (or fetches) a float32 variable placed on '/cpu:0'.

  Args:
    name: name of the variable
    shape: list of ints
    initializer: initializer for Variable
  Returns:
    Variable Tensor
  """
  with tf.device('/cpu:0'):
    return tf.get_variable(
        name, shape, initializer=initializer, dtype=tf.float32)

def _variable_with_weight_decay(name, shape, stddev, wd):
  """Creates a truncated-normal initialized variable with optional decay.

  When `wd` is given, the L2 loss of the variable scaled by `wd` is added to
  the 'losses' collection under the op name 'weight_loss'.

  Args:
    name: name of the variable
    shape: list of ints
    stddev: standard deviation of a truncated Gaussian
    wd: add L2Loss weight decay multiplied by this float. If None, weight
        decay is not added for this Variable.
  Returns:
    Variable Tensor
  """
  initializer = tf.truncated_normal_initializer(
      stddev=stddev, dtype=tf.float32)
  var = _variable_on_cpu(name, shape, initializer)
  if wd is None:
    return var
  decay_term = tf.multiply(tf.nn.l2_loss(var), wd, name='weight_loss')
  tf.add_to_collection('losses', decay_term)
  return var

In [None]:
def inference(images):
  """Builds the CNN and returns the unnormalized class logits.

  Layer order: conv1 -> pool1 -> norm1 -> conv2 -> norm2 -> pool2 ->
  local3 -> local4 -> softmax_linear.

  Args:
    images: Batch of images fed to the first convolution, whose kernel
      expects 3 input channels. The caller in this notebook reshapes them
      to [batch, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH].

  Returns:
    The output tensor of the final linear layer, with NUM_CLASSES columns.
  """
  # conv1: 5x5 convolution from 3 to 64 channels, stride 1, then ReLU.
  # wd=0.0 is not None, so a zero-valued 'weight_loss' term is still added
  # to the 'losses' collection by _variable_with_weight_decay.
  with tf.variable_scope('conv1') as scope:
    kernel = _variable_with_weight_decay('weights', shape=[5, 5, 3, 64], stddev=5e-2, wd=0.0)
    conv = tf.nn.conv2d(images, kernel, [1, 1, 1, 1], padding='SAME')
    biases = _variable_on_cpu('biases', [64], tf.constant_initializer(0.0))
    pre_activation = tf.nn.bias_add(conv, biases)
    conv1 = tf.nn.relu(pre_activation, name=scope.name)
    _activation_summary(conv1)
    
  # 3x3 max pooling with stride 2, followed by local response normalization.
  pool1 = tf.nn.max_pool(conv1, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1], padding='SAME', name='pool1')
  norm1 = tf.nn.lrn(pool1, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name='norm1')
  
  # conv2: 5x5 convolution from 64 to 64 channels, stride 1, then ReLU.
  with tf.variable_scope('conv2') as scope:
    kernel = _variable_with_weight_decay('weights', shape=[5, 5, 64, 64], stddev=5e-2, wd=0.0)
    conv = tf.nn.conv2d(norm1, kernel, [1, 1, 1, 1], padding='SAME')
    biases = _variable_on_cpu('biases', [64], tf.constant_initializer(0.1))
    pre_activation = tf.nn.bias_add(conv, biases)
    conv2 = tf.nn.relu(pre_activation, name=scope.name)
    _activation_summary(conv2)

  # Unlike the first stage, normalization comes before pooling here.
  norm2 = tf.nn.lrn(conv2, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name='norm2')
  pool2 = tf.nn.max_pool(norm2, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1], padding='SAME', name='pool2')
    
  # local3: flatten pool2 to [batch, dim] and apply a 384-unit ReLU layer.
  # `dim` is taken from the static shape of pool2 (all axes but the first).
  with tf.variable_scope('local3') as scope:
    pool2_shape = pool2.get_shape()
    dim = pool2_shape[1] * pool2_shape[2] * pool2_shape[3]
    reshape = tf.reshape(pool2, [-1, dim])
    weights = _variable_with_weight_decay('weights', shape=[dim, 384], stddev=0.04, wd=0.004)
    biases = _variable_on_cpu('biases', [384], tf.constant_initializer(0.1))
    local3 = tf.nn.relu(tf.matmul(reshape, weights) + biases, name=scope.name)
    _activation_summary(local3)

  # local4: 192-unit ReLU layer on top of local3.
  with tf.variable_scope('local4') as scope:
    weights = _variable_with_weight_decay('weights', shape=[384, 192], stddev=0.04, wd=0.004)
    biases = _variable_on_cpu('biases', [192], tf.constant_initializer(0.1))
    local4 = tf.nn.relu(tf.matmul(local3, weights) + biases, name=scope.name)
    _activation_summary(local4)

  # Final linear layer; no softmax is applied here, the caller does that.
  with tf.variable_scope('softmax_linear') as scope:
    weights = _variable_with_weight_decay('weights', [192, NUM_CLASSES], stddev=1/192.0, wd=0.0)
    biases = _variable_on_cpu('biases', [NUM_CLASSES], tf.constant_initializer(0.0))
    logits = tf.add(tf.matmul(local4, weights), biases, name=scope.name)
    _activation_summary(logits)

  return logits

In [None]:
def get_loss(logits, labels):
  """Returns softmax cross entropy plus an L2 penalty on trainable variables.

  Args:
    logits: Unnormalized class scores.
    labels: One-hot encoded labels.

  Returns:
    Scalar loss tensor: cross entropy + FLAGS.weight_decay * sum of the L2
    losses of all trainable variables.
  """
  cross_entropy = tf.losses.softmax_cross_entropy(
      onehot_labels=labels, logits=logits)

  # Expose the cross entropy under a fixed tensor name and as a summary for
  # logging purposes.
  tf.identity(cross_entropy, name='cross_entropy')
  tf.summary.scalar('cross_entropy', cross_entropy)

  # Weight decay term over every trainable variable.
  l2_terms = []
  for variable in tf.trainable_variables():
    l2_terms.append(tf.nn.l2_loss(variable))
  l2_penalty = FLAGS.weight_decay * tf.add_n(l2_terms)

  return cross_entropy + l2_penalty

In [None]:
def get_train_op(loss, params, mode):
  """Returns the training op for TRAIN mode, or None for any other mode.

  Args:
    loss: Scalar loss tensor to minimize.
    params: Hyper-parameters; only `params.batch_size` is read here.
    mode: A `tf.estimator.ModeKeys` value.

  Returns:
    The op minimizing `loss` with a momentum optimizer and a piecewise
    constant learning rate, or None when `mode` is not TRAIN.
  """
  if mode != tf.estimator.ModeKeys.TRAIN:
    return None

  # Scale the learning rate linearly with the batch size. When the batch
  # size is 128, the learning rate should be 0.1.
  base_learning_rate = 0.1 * params.batch_size / 128
  steps_per_epoch = NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN / params.batch_size
  global_step = tf.train.get_or_create_global_step()

  # Multiply the learning rate by 0.1 at 100, 150, and 200 epochs.
  decay_epochs = [100, 150, 200]
  decay_factors = [1, 0.1, 0.01, 0.001]
  boundaries = [int(steps_per_epoch * epoch) for epoch in decay_epochs]
  values = [base_learning_rate * factor for factor in decay_factors]
  learning_rate = tf.train.piecewise_constant(
      tf.cast(global_step, tf.int32), boundaries, values)

  # Expose the learning rate under a fixed tensor name and as a summary for
  # logging purposes.
  tf.identity(learning_rate, name='learning_rate')
  tf.summary.scalar('learning_rate', learning_rate)

  optimizer = tf.train.MomentumOptimizer(
      learning_rate=learning_rate,
      momentum=FLAGS.momentum)

  # Run everything registered in UPDATE_OPS before each training step.
  pending_updates = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
  with tf.control_dependencies(pending_updates):
    return optimizer.minimize(loss, global_step)

In [None]:
def get_metrics(predictions, labels):
  """Builds the evaluation metrics for the model.

  Args:
    predictions: Dict holding the predicted class indices under 'classes'.
    labels: One-hot encoded labels; converted to class indices with argmax.

  Returns:
    A dict `{'accuracy': (value_op, update_op)}` as produced by
    `tf.metrics.accuracy`.
  """
  # tf.metrics.accuracy takes `labels` first and `predictions` second; pass
  # both by keyword so they cannot be swapped by accident.
  label_classes = tf.argmax(labels, axis=1)
  accuracy = tf.metrics.accuracy(labels=label_classes,
                                 predictions=predictions['classes'])

  # Create a tensor named train_accuracy for logging purposes
  tf.identity(accuracy[1], name='train_accuracy')
  tf.summary.scalar('train_accuracy', accuracy[1])

  return {'accuracy': accuracy}

In [None]:
def get_estimator(run_config, hparams):
  """Creates the `tf.estimator.Estimator` that wraps the CNN model.

  Args:
    run_config: Passed to the Estimator as `config`.
    hparams: Passed to the Estimator as `params` and from there on to
      `get_train_op`.

  Returns:
    A `tf.estimator.Estimator` built around the model function below.
  """
  def _model_fn(features, labels, mode, params):
    """Builds the graph and returns the `EstimatorSpec` for `mode`."""
    # Turn the feature dict into a dense tensor and restore the image layout.
    columns = list(get_feature_columns().values())
    dense_input = tf.feature_column.input_layer(
        features=features, feature_columns=columns)
    images = tf.reshape(
        dense_input, shape=(-1, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH))

    # Calculate logits through CNN
    logits = inference(images)

    predicted_classes = tf.argmax(logits, axis=1)
    probabilities = tf.nn.softmax(logits, name='softmax_tensor')
    predictions = {
        'classes': predicted_classes,
        'probabilities': probabilities,
    }

    # PREDICT needs neither loss nor metrics, only the export signature.
    if mode == tf.estimator.ModeKeys.PREDICT:
      return tf.estimator.EstimatorSpec(
          mode=mode,
          predictions=predictions,
          export_outputs={
              'predictions': tf.estimator.export.PredictOutput(predictions),
          })

    loss = get_loss(logits=logits, labels=labels)
    train_op = get_train_op(loss=loss, mode=mode, params=params)
    eval_metric_ops = get_metrics(predictions=predictions, labels=labels)

    return tf.estimator.EstimatorSpec(
        mode=mode,
        predictions=predictions,
        loss=loss,
        train_op=train_op,
        eval_metric_ops=eval_metric_ops)

  return tf.estimator.Estimator(
      model_fn=_model_fn, params=hparams, config=run_config)

## 6. Define Serving Function

In [None]:
def serving_input_fn():
  """Builds the serving input receiver for the exported model.

  Returns:
    A `ServingInputReceiver` that accepts a float32 'images' batch of shape
    [None, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH] and feeds the model each
    image after `preprocess_image`.
  """
  raw_images = tf.placeholder(
      shape=[None, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH], dtype=tf.float32)
  receiver_tensors = {'images': raw_images}
  # Apply the same per-image preprocessing that the input pipeline uses.
  features = {'images': tf.map_fn(preprocess_image, raw_images)}
  return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)

## 7. Train, Evaluate and Export an ML Model

In [None]:
# Directory that receives checkpoints, summaries and exported models.
model_dir = 'trained_models/{}'.format(FLAGS.model_name)

# Location of the TFRecords files. It defaults to the local 'cifar-10'
# folder written by create_tfrecords_files(). To read from somewhere else
# (for example 'gs://<your-bucket>/cifar-10'), set the CIFAR10_DATA_DIR
# environment variable instead of hard-coding a bucket in the notebook.
tfrecords_dir = os.environ.get('CIFAR10_DATA_DIR', 'cifar-10').rstrip('/')
train_data_files = ['{}/train.tfrecords'.format(tfrecords_dir)]
valid_data_files = ['{}/validation.tfrecords'.format(tfrecords_dir)]
test_data_files = ['{}/eval.tfrecords'.format(tfrecords_dir)]

In [None]:
# Hyper-parameters handed to the model function as `params`.
hparams = tf.contrib.training.HParams(
  batch_size=FLAGS.batch_size,
  max_steps=FLAGS.max_steps,
)

# Checkpoint frequency, random seed and output directory for the Estimator.
run_config = tf.estimator.RunConfig(
  save_checkpoints_steps=FLAGS.save_checkpoints_steps,
  tf_random_seed=FLAGS.tf_random_seed,
  model_dir=model_dir
)

estimator = get_estimator(run_config, hparams)

# Exporter attached to the EvalSpec below; it is named 'Servo', uses
# serving_input_fn and keeps at most 5 exports.
# There is another Exporter named FinalExporter
exporter = tf.estimator.LatestExporter(
  name='Servo',
  serving_input_receiver_fn=serving_input_fn,
  assets_extra=None,
  as_text=False,
  exports_to_keep=5)


# Training reads the train split and stops after FLAGS.max_steps steps.
train_spec = tf.estimator.TrainSpec(
  input_fn=generate_input_fn(file_names=train_data_files,
                             mode=tf.estimator.ModeKeys.TRAIN,
                             batch_size=hparams.batch_size),
  max_steps=FLAGS.max_steps)

# Each evaluation runs FLAGS.eval_steps batches of the validation split and
# is given the exporter defined above.
eval_spec = tf.estimator.EvalSpec(
  input_fn=generate_input_fn(file_names=valid_data_files,
                             mode=tf.estimator.ModeKeys.EVAL,
                             batch_size=hparams.batch_size),
  steps=FLAGS.eval_steps, exporters=exporter)

In [None]:
# Unless resuming from a checkpoint is requested, delete the whole model
# directory first (errors while deleting are ignored).
if not FLAGS.use_checkpoint:
  print("Removing previous artifacts...")
  shutil.rmtree(model_dir, ignore_errors=True)

# Run training and evaluation as configured by train_spec and eval_spec.
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

## 8. Evaluate with Estimator

In [None]:
# Evaluate on the test split. With batch_size=1000 and steps=1 this scores a
# single batch of 1000 examples.
test_input_fn = generate_input_fn(file_names=test_data_files,
                                  mode=tf.estimator.ModeKeys.EVAL,
                                  batch_size=1000)
# Re-create the Estimator from the same run_config and hparams.
# NOTE(review): presumably evaluate() picks up the latest checkpoint in
# model_dir -- confirm.
estimator = get_estimator(run_config, hparams)
print(estimator.evaluate(input_fn=test_input_fn, steps=1))

## 9. Prediction with Exported Model

In [None]:
# Directory into which the 'Servo' exporter writes its SavedModels.
export_dir = model_dir + '/export/Servo/'
# os.listdir() returns entries in arbitrary order, so the newest export must
# be selected explicitly. Exports live in sub-directories named by a numeric
# timestamp; keep only those names and take the largest one.
export_versions = [d for d in os.listdir(export_dir) if d.isdigit()]
saved_model_dir = os.path.join(export_dir, max(export_versions, key=int))

# Load the 'predictions' signature of the newest export as a callable.
predictor_fn = tf.contrib.predictor.from_saved_model(
  export_dir=saved_model_dir,
  signature_def_key='predictions')

In [None]:
import numpy

# Load the raw test batch from the local extraction folder.
data_dict = _read_pickle_from_file('cifar-10/cifar-10-batches-py/test_batch')

# Number of test images to score.
N = 1000
# Rebuild [N, depth, height, width] from the flat rows, then move the depth
# axis last so the layout matches the exported model's 'images' input.
raw_pixels = data_dict['data'][:N]
images = raw_pixels.reshape([N, 3, 32, 32]).transpose([0, 2, 3, 1])
labels = data_dict['labels'][:N]

output = predictor_fn({'images': images})
# Mark every position where the predicted class equals the true label.
matches = [ans == ret for ans, ret in zip(labels, output['classes'])]
accuracy = numpy.sum(matches) / float(N)

print(accuracy)

## 10. Distributed Training with Cloud ML Engine

### a. Set environment variables

In [None]:
# `os` is already imported in the first code cell; repeating the import here
# lets this section run on its own.
import os

# Placeholder values -- replace all three before running the cells below.
PROJECT = 'project-id' # REPLACE WITH YOUR PROJECT ID
BUCKET = 'bucket-name' # REPLACE WITH YOUR BUCKET NAME
REGION = 'bucket-region' # REPLACE WITH YOUR BUCKET REGION e.g. us-central1

# Export the values so that the %%bash cells can read them as $PROJECT,
# $BUCKET and $REGION.
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION

In [None]:
%%bash
# Point the gcloud CLI at the project and region exported by the Python cell
# above.
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

### b. Grant the Cloud ML service account access to BUCKET (NOTE: create the bucket beforehand)

In [None]:
%%bash

PROJECT_ID=$PROJECT
AUTH_TOKEN=$(gcloud auth print-access-token)

# Ask the Cloud ML Engine API for this project's config and extract the
# service account from the JSON response. print(...) is written with
# parentheses so the one-liner works under both Python 2 and Python 3.
SVC_ACCOUNT=$(curl -X GET -H "Content-Type: application/json" \
    -H "Authorization: Bearer $AUTH_TOKEN" \
    "https://ml.googleapis.com/v1/projects/${PROJECT_ID}:getConfig" \
    | python -c "import json; import sys; response = json.load(sys.stdin); \
    print(response['serviceAccount'])")

echo "Authorizing the Cloud ML Service account $SVC_ACCOUNT to access files in $BUCKET"
# Default object ACL (read), existing objects (read), and the bucket itself
# (write). Expansions are quoted so unexpected characters cannot split words.
gsutil -m defacl ch -u "$SVC_ACCOUNT:R" "gs://$BUCKET"
gsutil -m acl ch -u "$SVC_ACCOUNT:R" -r "gs://$BUCKET"  # error message (if bucket is empty) can be ignored
gsutil -m acl ch -u "$SVC_ACCOUNT:W" "gs://$BUCKET"

### c. Copy TFRecords files to GCS BUCKET

In [None]:
%%bash

# Show the target bucket, remove any previous copy of the data set from it,
# then upload the local TFRecords files.
echo ${BUCKET}
gsutil -m rm -rf gs://${BUCKET}/cifar-10
gsutil -m cp cifar-10/*.tfrecords gs://${BUCKET}/cifar-10

### d. Run distributed training with Cloud MLE

In [None]:
%%bash
# Output location in the bucket and a job name made unique by a UTC timestamp.
OUTDIR=gs://$BUCKET/trained_models
JOBNAME=sm_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME

# Remove the output of any previous job, then submit the training job.
# Arguments after the bare "--" are passed to the trainer module itself;
# note that it is given the eval*.tfrecords files as evaluation data.
gsutil -m rm -rf $OUTDIR
gcloud ml-engine jobs submit training $JOBNAME \
   --region=$REGION \
   --module-name=cnn-model-02.task \
   --package-path="$(pwd)/trainer/cnn-model-02" \
   --job-dir=$OUTDIR \
   --staging-bucket=gs://$BUCKET \
   --scale-tier=STANDARD_1 \
   --runtime-version=1.4 \
   -- \
   --bucket_name=$BUCKET \
   --train_data_pattern=cifar-10/train*.tfrecords \
   --eval_data_pattern=cifar-10/eval*.tfrecords  \
   --output_dir=$OUTDIR