# Distributed Keras Neural Net

Now we're going to do a little Datalab-fu to get things running on CMLE. The essence is that once you reach a working, production-grade model, you flip from Datalab to writing Python script files, because CMLE requires this: you need to supply a Python package and module to the gcloud CLI. Personally, once I have the code from the previous notebook running in Datalab, I download it as a Python script file and switch to my editor of choice (usually vi or emacs, since at that point there isn't much left to do).

However, what I'll do here is create the required files via Datalab:

1. Some cleanup (in case you are running this notebook multiple times): delete the local directories for the Python package and the model training output.
2. Create a directory for our Python package (called `trainer`).
3. Create an empty `__init__.py` file in the `trainer` directory; it serves to designate the directory as a package.
4. Take the exact same code as the last notebook (except that we now read all the arguments from the command line, whereas in the last notebook we hardcoded some defaults) and save it as `task.py` in the `trainer` directory.
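The notebook cells further down do steps 1 to 3 with `rm`, `mkdir` and `touch`. As a rough Python equivalent (a sketch only; the helper name `build_trainer_package` and the throwaway `task.py` contents are hypothetical), the same layout can be built in a scratch directory and then imported under the dotted name that `--module-name trainer.task` refers to:

```python
import importlib
import shutil
import sys
import tempfile
from pathlib import Path


def build_trainer_package(root):
    """Recreate the trainer/ package layout under `root` (steps 1-3)."""
    root = Path(root)
    # Step 1: remove leftovers from earlier runs, if any.
    for name in ("trainer", "abalone_output"):
        shutil.rmtree(root / name, ignore_errors=True)
    # Step 2: create the package directory.
    pkg = root / "trainer"
    pkg.mkdir(parents=True)
    # Step 3: an empty __init__.py marks the directory as a regular package.
    (pkg / "__init__.py").touch()
    return pkg


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    pkg = build_trainer_package(workdir)
    # Hypothetical stand-in for the real task.py written in step 4.
    (pkg / "task.py").write_text("MODULE_OK = True\n")
    sys.path.insert(0, workdir)
    # Same dotted name that gcloud's --module-name flag points at.
    task = importlib.import_module("trainer.task")
    print(task.MODULE_OK)
```

Calling the helper a second time on the same directory wipes and recreates `trainer/`, which is what makes re-running the notebook safe.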

Then we can train and evaluate the model both locally and on CMLE proper.

To run locally, we execute the following:

```
gcloud ml-engine local train --package-path trainer \
   --module-name trainer.task \
   -- \
   --train-files gs://smiling-beaming-abalone/abalone_train.csv \
   --eval-files gs://smiling-beaming-abalone/abalone_test.csv \
   --job-dir abalone_output \
   --train-steps 5000 \
   --eval-steps 100
```

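This is fairly self-explanatory: `--package-path` and `--module-name` tell gcloud where the trainer package lives and which module to run, and everything after the bare `--` is passed through to `trainer/task.py` as its own command-line arguments. To run on CMLE, we execute the following: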

```
gcloud ml-engine jobs submit training abalone_$(date -u +%y%m%d_%H%M%S) \
  --stream-logs \
  --scale-tier STANDARD_1 \
  --runtime-version 1.2 \
  --job-dir gs://smiling-beaming-abalone/abalone_$(date -u +%y%m%d_%H%M%S) \
  --module-name trainer.task \
  --package-path trainer \
  --region us-central1 \
  -- \
  --train-files gs://smiling-beaming-abalone/abalone_train.csv \
  --eval-files gs://smiling-beaming-abalone/abalone_test.csv \
  --train-steps 5000 \
  --eval-steps 100
```

The main difference in this case is that we submit the job to the CMLE managed service, and we choose a scale tier to indicate how much compute horsepower we want to throw at the problem. The job directory also moves to Cloud Storage, and gcloud passes `--job-dir` through to the trainer itself, which is why it no longer appears after the `--`. Here we're using STANDARD_1, which gives us a master, three workers and three parameter servers (all CPU-based); it is the smallest standard footprint that gives us distributed training.
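Each node in a distributed job learns its role from the `TF_CONFIG` environment variable, a JSON document with a `cluster` map (role to list of addresses) and a `task` entry (this node's role and index); `tf.contrib.learn.RunConfig` reads it, which is where fields such as `_environment`, `_num_ps_replicas` and `_num_worker_replicas` in the training log come from. The sketch below parses a hypothetical `TF_CONFIG`: the role counts mirror the description above, while the host names and the node's own role are made up.

```python
import json

# Hypothetical TF_CONFIG for one node; host names are invented for illustration.
EXAMPLE_TF_CONFIG = json.dumps({
    "cluster": {
        "master": ["master-0:2222"],
        "worker": ["worker-0:2222", "worker-1:2222", "worker-2:2222"],
        "ps": ["ps-0:2222", "ps-1:2222", "ps-2:2222"],
    },
    "task": {"type": "worker", "index": 1},
    "environment": "cloud",
})


def describe_node(tf_config_json):
    """Return (replica counts per role, this node's role, this node's index)."""
    cfg = json.loads(tf_config_json)
    counts = {role: len(addrs) for role, addrs in cfg["cluster"].items()}
    task = cfg["task"]
    return counts, task["type"], task["index"]


if __name__ == "__main__":
    counts, role, index = describe_node(EXAMPLE_TF_CONFIG)
    print(counts)
    print("this node: %s #%d" % (role, index))
```

On a real CMLE node you would read the string with `os.environ.get("TF_CONFIG")` instead of using the hardcoded example.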

First, some housekeeping: delete the local package directory and the output folder left over from earlier runs (in case you run this notebook multiple times).

In [1]:
!rm -rf trainer && rm -rf abalone_output

In [2]:
!mkdir -p trainer

In [3]:
!touch trainer/__init__.py

In [4]:
%%writefile trainer/task.py
#  Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""DNNRegressor with custom estimator for abalone dataset."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import multiprocessing

import six

import tensorflow as tf
from tensorflow.python.estimator.model_fn import ModeKeys as Modes
from tensorflow.contrib.keras.python.keras.layers import Dense

from tensorflow.contrib.learn import learn_runner
from tensorflow.contrib.learn.python.learn.utils import (saved_model_export_utils)
from tensorflow.contrib.training.python.training import hparam

tf.logging.set_verbosity(tf.logging.INFO)

CSV_COLUMNS = [
    'length', 'diameter', 'height', 'whole_weight', 'shucked_weight',
    'viscera_weight', 'shell_weight', 'num_rings'
]
CSV_COLUMN_DEFAULTS = [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0]]

PREDICTED_COLUMN = 'num_rings'
INPUT_COLUMNS = [
    tf.feature_column.numeric_column('length'),
    tf.feature_column.numeric_column('diameter'),
    tf.feature_column.numeric_column('height'),
    tf.feature_column.numeric_column('whole_weight'),
    tf.feature_column.numeric_column('shucked_weight'),
    tf.feature_column.numeric_column('viscera_weight'),
    tf.feature_column.numeric_column('shell_weight'),
]

UNUSED_COLUMNS = set(CSV_COLUMNS) - {col.name for col in INPUT_COLUMNS} - {PREDICTED_COLUMN}

def parse_csv(rows_string_tensor):
    columns = tf.decode_csv(rows_string_tensor, record_defaults=CSV_COLUMN_DEFAULTS)
    features = dict(zip(CSV_COLUMNS, columns))

    for col in UNUSED_COLUMNS:
        features.pop(col)

    for key, value in six.iteritems(features):
        features[key] = tf.expand_dims(features[key], -1)
    return features

def generate_input_fn(filenames,
                      num_epochs=None,
                      shuffle=True,
                      skip_header_lines=0,
                      batch_size=64):
  
    def _input_fn():
  
        filename_queue = tf.train.string_input_producer(filenames, num_epochs=num_epochs, shuffle=shuffle)
        reader = tf.TextLineReader(skip_header_lines=skip_header_lines)

        _, rows = reader.read_up_to(filename_queue, num_records=batch_size)

        features = parse_csv(rows)

        if shuffle:
            features = tf.train.shuffle_batch(
                features,
                batch_size,
                min_after_dequeue=2 * batch_size + 1,
                capacity=batch_size * 10,
                num_threads=multiprocessing.cpu_count(),
                enqueue_many=True,
                allow_smaller_final_batch=True
            )
        else:
            features = tf.train.batch(
                features,
                batch_size,
                capacity=batch_size * 10,
                num_threads=multiprocessing.cpu_count(),
                enqueue_many=True,
                allow_smaller_final_batch=True
            )

        # Pop the label out of the dict so it is not also fed in as a feature.
        labels = features.pop(PREDICTED_COLUMN)
        return features, labels
  
    return _input_fn

def generate_model_fn(learning_rate):
    
    def _model_fn(features, labels, mode):

        (length, diameter, height, whole_weight, shucked_weight, viscera_weight, shell_weight) = INPUT_COLUMNS

        transformed_columns = [
            length, diameter, height, whole_weight, shucked_weight, viscera_weight, shell_weight
        ]

        inputs = tf.feature_column.input_layer(features, transformed_columns)

        first_hidden_layer = Dense(10, activation='relu')(inputs)
        second_hidden_layer = Dense(10, activation='relu')(first_hidden_layer)
        output_layer = Dense(1, activation='linear')(second_hidden_layer)

        if mode in (Modes.PREDICT, Modes.EVAL):
            predictions = tf.reshape(output_layer, [-1])
            predictions_dict = {"ages": predictions}

        if mode in (Modes.TRAIN, Modes.EVAL):
            loss = tf.losses.mean_squared_error(labels, output_layer)

        if mode == Modes.TRAIN:
            train_op = tf.contrib.layers.optimize_loss(
                loss=loss,
                global_step=tf.contrib.framework.get_global_step(),
                learning_rate=learning_rate,
                optimizer="SGD")
        
        if mode == Modes.TRAIN:
            return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
        
        if mode == Modes.EVAL:
            eval_metric_ops = {
                "rmse": tf.metrics.root_mean_squared_error(
                    tf.cast(labels, tf.float32), predictions)
            }
            return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=eval_metric_ops)
        
        if mode == Modes.PREDICT:
            export_outputs = {
                'prediction': tf.estimator.export.RegressionOutput(predictions)
            }
            return tf.estimator.EstimatorSpec(
                mode, predictions=predictions_dict, export_outputs=export_outputs)
    
    return _model_fn

def generate_experiment_fn(**experiment_args):  
  
    def _experiment_fn(run_config, hparams):

        train_input = generate_input_fn(
            hparams.train_files,
            num_epochs=hparams.num_epochs,
            batch_size=hparams.train_batch_size,
        )

        test_input = generate_input_fn(
            hparams.eval_files,
            shuffle=False
        )

        return tf.contrib.learn.Experiment(
            tf.estimator.Estimator(
                generate_model_fn(learning_rate=hparams.learning_rate),
                config=run_config
            ),
            train_input_fn=train_input,
            eval_input_fn=test_input,
            **experiment_args
        )

    return _experiment_fn

def example_serving_input_fn():
    example_bytestring = tf.placeholder(
        shape=[None],
        dtype=tf.string,
    )
    features = tf.parse_example(
        example_bytestring,
        tf.feature_column.make_parse_example_spec(INPUT_COLUMNS)
    )
    return tf.estimator.export.ServingInputReceiver(
        features, {'example_proto': example_bytestring})

if __name__ == '__main__':
    
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--train-files',
        nargs='+',
        required=True
    )
    parser.add_argument(
        '--num-epochs',
        type=int
    )
    parser.add_argument(
        '--train-batch-size',
        type=int,
        default=1
    )
    parser.add_argument(
        '--eval-batch-size',
        type=int,
        default=1
    )
    parser.add_argument(
        '--eval-files',
        nargs='+',
        required=True
    )
    parser.add_argument(
        '--learning-rate',
        default=0.001,
        type=float
    )
    parser.add_argument(
        '--job-dir',
        required=True
    )
    parser.add_argument(
        '--eval-delay-secs',
        default=10,
        type=int
    )
    parser.add_argument(
        '--min-eval-frequency',
        default=1,
        type=int
    )
    parser.add_argument(
        '--train-steps',
        type=int
    )
    parser.add_argument(
        '--eval-steps',
        default=None,
        type=int
    )

    args = parser.parse_args()

    learn_runner.run(
        generate_experiment_fn(
            min_eval_frequency=args.min_eval_frequency,
            eval_delay_secs=args.eval_delay_secs,
            train_steps=args.train_steps,
            eval_steps=args.eval_steps,
            export_strategies=[saved_model_export_utils.make_export_strategy(
                example_serving_input_fn,
                exports_to_keep=1
            )]
        ),
        run_config=tf.contrib.learn.RunConfig(model_dir=args.job_dir),
        hparams=hparam.HParams(**args.__dict__)
    )

Writing trainer/task.py
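To see what `parse_csv` and the label split at the end of `_input_fn` do to a single record without starting TensorFlow's queue runners, here is a plain-Python analogue. It is an illustration, not the code `task.py` runs: batching, shuffling and the `expand_dims` reshape are left out, `parse_row` is a hypothetical name, and the sample row is illustrative rather than taken from the dataset files.

```python
import csv
import io

# Mirrors CSV_COLUMNS / CSV_COLUMN_DEFAULTS / PREDICTED_COLUMN in task.py.
CSV_COLUMNS = ['length', 'diameter', 'height', 'whole_weight', 'shucked_weight',
               'viscera_weight', 'shell_weight', 'num_rings']
CSV_COLUMN_DEFAULTS = [0.0] * 7 + [0]
PREDICTED_COLUMN = 'num_rings'


def parse_row(line):
    """Plain-Python analogue of parse_csv(): one CSV line -> (features, label)."""
    values = next(csv.reader(io.StringIO(line)))
    features = {}
    for name, raw, default in zip(CSV_COLUMNS, values, CSV_COLUMN_DEFAULTS):
        # Empty fields fall back to the default; the default's type fixes the
        # column type, as record_defaults does for tf.decode_csv.
        features[name] = type(default)(raw) if raw != '' else default
    # Separate the target (an int) from the seven float inputs.
    label = features.pop(PREDICTED_COLUMN)
    return features, label
```

For example, `parse_row("0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15")` yields a seven-entry float feature dict and the integer label `15`.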


## Test the code locally...
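Before spending a gcloud run on it, you can check that the user arguments parse the way `task.py` expects. Because the parser in `task.py` is built inside its `__main__` block, this sketch re-creates a trimmed copy of it (`build_parser` is a hypothetical name, and only a subset of the flags is included); the point is that argparse turns the dashed flags into underscored attributes, and those become the `HParams` keys the experiment reads.

```python
import argparse


def build_parser():
    """A trimmed copy of the task.py argument parser (subset of its flags)."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-files', nargs='+', required=True)
    parser.add_argument('--eval-files', nargs='+', required=True)
    parser.add_argument('--job-dir', required=True)
    parser.add_argument('--train-steps', type=int)
    parser.add_argument('--eval-steps', type=int, default=None)
    parser.add_argument('--train-batch-size', type=int, default=1)
    parser.add_argument('--learning-rate', type=float, default=0.001)
    return parser


if __name__ == '__main__':
    # The same user arguments as the local gcloud command.
    args = build_parser().parse_args([
        '--train-files', 'gs://smiling-beaming-abalone/abalone_train.csv',
        '--eval-files', 'gs://smiling-beaming-abalone/abalone_test.csv',
        '--job-dir', 'abalone_output',
        '--train-steps', '5000',
        '--eval-steps', '100',
    ])
    # e.g. args.train_files is a list (nargs='+'), args.train_steps an int.
    print(vars(args))
```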

In [None]:
!gcloud ml-engine local train --package-path trainer \
   --module-name trainer.task \
   -- \
   --train-files gs://smiling-beaming-abalone/abalone_train.csv \
   --eval-files gs://smiling-beaming-abalone/abalone_test.csv \
   --job-dir abalone_output \
   --train-steps 5000 \
   --eval-steps 100

INFO:tensorflow:Using config: {'_model_dir': 'abalone_output', '_save_checkpoints_secs': 600, '_num_ps_replicas': 0, '_keep_checkpoint_max': 5, '_session_config': None, '_tf_random_seed': None, '_task_type': None, '_environment': u'cloud', '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f4587df95d0>, '_tf_config': gpu_options {
  per_process_gpu_memory_fraction: 1.0
}
, '_num_worker_replicas': 0, '_task_id': 0, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_evaluation_master': '', '_keep_checkpoint_every_n_hours': 10000, '_master': '', '_log_step_count_steps': 100}
Instructions for updating:
Monitors are deprecated. Please use tf.train.SessionRunHook.
INFO:tensorflow:Create CheckpointSaverHook.
2017-08-21 19:35:55.154728: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2017

## Run the code on CMLE...

In [None]:
!gcloud ml-engine jobs submit training abalone_$(date -u +%y%m%d_%H%M%S) \
  --stream-logs \
  --scale-tier STANDARD_1 \
  --runtime-version 1.2 \
  --job-dir gs://smiling-beaming-abalone/abalone_$(date -u +%y%m%d_%H%M%S) \
  --module-name trainer.task \
  --package-path trainer \
  --region us-central1 \
  -- \
  --train-files gs://smiling-beaming-abalone/abalone_train.csv \
  --eval-files gs://smiling-beaming-abalone/abalone_test.csv \
  --train-steps 5000 \
  --eval-steps 100

Job [abalone_170821_193611] submitted successfully.
INFO	2017-08-21 19:36:13 +0000	service		Validating job requirements...
INFO	2017-08-21 19:36:14 +0000	service		Job creation request has been successfully validated.
INFO	2017-08-21 19:36:14 +0000	service		Job abalone_170821_193611 is queued.
INFO	2017-08-21 19:36:14 +0000	service		Waiting for job to be provisioned.
