<h1> 2c. Refactoring to add batching and feature-creation </h1>

In this notebook, we continue reading the same small dataset, but refactor our ML pipeline in two small, but significant, ways:
<ol>
<li> Refactor the input to read data in batches.
<li> Refactor the feature creation so that it is not one-to-one with inputs.
</ol>
The Pandas function in the previous notebook also batched the data, but only after reading the entire dataset into memory. On a large dataset, that won't be an option.
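
To make the contrast concrete, here is a minimal pure-Python sketch (not the TensorFlow code used later in this notebook) of reading CSV rows lazily in fixed-size batches, so that only one batch is held in memory at a time. The helper name `iter_batches` and the in-memory sample data are assumptions made for illustration.

```python
import csv
import io
from itertools import islice

def iter_batches(lines, batch_size):
    """Yield lists of parsed CSV rows, at most batch_size rows at a time.

    `lines` can be any iterable of text lines (for example an open file),
    so rows are parsed lazily rather than loaded all at once.
    """
    reader = csv.reader(lines)
    while True:
        batch = list(islice(reader, batch_size))
        if not batch:
            return
        yield batch

# Hypothetical sample standing in for a file on disk
sample = io.StringIO("1,2\n3,4\n5,6\n7,8\n9,10\n")
batch_sizes = [len(b) for b in iter_batches(sample, batch_size=2)]
print(batch_sizes)  # [2, 2, 1]
```

The last batch is smaller than the others because five rows do not divide evenly into batches of two.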

In [1]:
import datalab.bigquery as bq
import tensorflow as tf
import numpy as np
import shutil
print(tf.__version__)

1.8.0


<h2> 1. Refactor the input </h2>

Read data created in Lab1a, but this time make it more general and performant.  Instead of using Pandas, we will use TensorFlow's Dataset API.

In [2]:
CSV_COLUMNS = ['fare_amount', 'pickuplon','pickuplat','dropofflon','dropofflat','passengers', 'key']
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]

def read_dataset(filename, mode, batch_size = 512):
  # Returns an input function that reads the matching CSV files in batches
  def _input_fn():
    def decode_csv(value_column):
      # Parse one CSV line into a dict of features plus the label
      columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
      features = dict(zip(CSV_COLUMNS, columns))
      label = features.pop(LABEL_COLUMN)
      return features, label

    # Create list of files that match pattern
    file_list = tf.gfile.Glob(filename)

    # Create dataset from file list
    dataset = tf.data.TextLineDataset(file_list).map(decode_csv)
    if mode == tf.estimator.ModeKeys.TRAIN:
      num_epochs = None # indefinitely
      dataset = dataset.shuffle(buffer_size = 10 * batch_size)
    else:
      num_epochs = 1 # end-of-input after this

    dataset = dataset.repeat(num_epochs).batch(batch_size)
    return dataset.make_one_shot_iterator().get_next()
  return _input_fn

def get_train():
  return read_dataset('./taxi-train.csv', mode = tf.estimator.ModeKeys.TRAIN)

def get_valid():
  return read_dataset('./taxi-valid.csv', mode = tf.estimator.ModeKeys.EVAL)

def get_test():
  return read_dataset('./taxi-test.csv', mode = tf.estimator.ModeKeys.EVAL)
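
As a rough illustration of what `decode_csv` does to each line, the following pure-Python sketch mimics the parsing step: empty fields fall back to the per-column defaults, values are paired with the column names, and the label is popped out of the feature dict. This is only an analogy for `tf.decode_csv` with `record_defaults`, not its implementation. The helper `decode_line`, the flattened `DEFAULTS` list, and the sample line are hypothetical.

```python
CSV_COLUMNS = ['fare_amount', 'pickuplon', 'pickuplat', 'dropofflon',
               'dropofflat', 'passengers', 'key']
LABEL_COLUMN = 'fare_amount'
# Flattened version of the notebook's DEFAULTS (one plain value per column)
DEFAULTS = [0.0, -74.0, 40.0, -74.0, 40.7, 1.0, 'nokey']

def decode_line(line):
    """Split one CSV line into (features, label), filling in defaults.

    Uses a naive split on commas, so quoted fields are not handled.
    """
    fields = line.rstrip('\n').split(',')
    columns = []
    for raw, default in zip(fields, DEFAULTS):
        if raw == '':
            columns.append(default)             # missing value: use the default
        else:
            columns.append(type(default)(raw))  # cast to the default's type
    features = dict(zip(CSV_COLUMNS, columns))
    label = features.pop(LABEL_COLUMN)
    return features, label

# Hypothetical line with the passengers field left empty
features, label = decode_line('12.5,-73.98,40.75,-73.97,40.76,,abc')
print(label, features['passengers'], features['key'])  # 12.5 1.0 abc
```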

<h2> 2. Refactor the way features are created. </h2>

For now, pass these through (same as previous lab).  However, refactoring this way will enable us to break the one-to-one relationship between inputs and features.

In [3]:
INPUT_COLUMNS = [
    tf.feature_column.numeric_column('pickuplon'),
    tf.feature_column.numeric_column('pickuplat'),
    tf.feature_column.numeric_column('dropofflat'),
    tf.feature_column.numeric_column('dropofflon'),
    tf.feature_column.numeric_column('passengers'),
]

def add_more_features(feats):
  # Nothing to add (yet!)
  return feats

feature_cols = add_more_features(INPUT_COLUMNS)
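
For a sense of what breaking the one-to-one relationship could look like, here is a hedged pure-Python sketch that derives two extra features from the four coordinate inputs. The feature names `latdiff` and `londiff` and the helper `add_engineered` are hypothetical and do not appear in this notebook. The sketch operates on a plain dict of values rather than on TensorFlow feature columns.

```python
def add_engineered(features):
    """Return a copy of `features` with two derived entries added."""
    out = dict(features)  # leave the caller's dict untouched
    out['latdiff'] = features['pickuplat'] - features['dropofflat']
    out['londiff'] = features['pickuplon'] - features['dropofflon']
    return out

# Hypothetical single example
raw = {'pickuplon': -74.0, 'pickuplat': 40.5,
       'dropofflon': -73.5, 'dropofflat': 40.75, 'passengers': 1.0}
engineered = add_engineered(raw)
print(len(raw), len(engineered))  # 5 7
```

Five inputs produce seven features here, which is the kind of many-to-one mapping that a pass-through function cannot express.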

<h2> Create and train the model </h2>

Note that training processes steps * batch_size examples, where steps is passed to train() and batch_size is set in read_dataset().
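
As a quick check of that arithmetic with the values used in this notebook (`steps = 100` in the training call and the default `batch_size = 512` in `read_dataset`), the sketch below computes the number of examples seen. The dataset size used for the epoch estimate is a hypothetical placeholder, since the notebook does not state the number of training rows.

```python
steps = 100        # from model.train(..., steps = 100)
batch_size = 512   # default in read_dataset

examples_seen = steps * batch_size
print(examples_seen)  # 51200

# Hypothetical training-set size, only to illustrate the epoch calculation
num_train_rows = 10000
epochs = examples_seen / num_train_rows
print(epochs)  # 5.12
```

Because the training dataset repeats indefinitely, it is the `steps` argument, not the end of the input, that stops training.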

In [4]:
tf.logging.set_verbosity(tf.logging.INFO)
OUTDIR = 'taxi_trained'
shutil.rmtree(OUTDIR, ignore_errors = True) # start fresh each time
model = tf.estimator.LinearRegressor(
      feature_columns = feature_cols, model_dir = OUTDIR)
model.train(input_fn = get_train(), steps = 100);

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_task_type': 'worker', '_global_id_in_cluster': 0, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f883edda438>, '_log_step_count_steps': 100, '_num_ps_replicas': 0, '_task_id': 0, '_evaluation_master': '', '_save_summary_steps': 100, '_master': '', '_keep_checkpoint_max': 5, '_tf_random_seed': None, '_is_chief': True, '_save_checkpoints_steps': None, '_service': None, '_save_checkpoints_secs': 600, '_train_distribute': None, '_num_worker_replicas': 1, '_keep_checkpoint_every_n_hours': 10000, '_session_config': None, '_model_dir': 'taxi_trained'}
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 1 into taxi_trained/model.ckpt.
INFO:tensorflow:step = 1, loss = 118

<h3> Evaluate model </h3>

As before, evaluate on the validation data.  We'll do the third refactoring (to move the evaluation into the training loop) in the next lab.
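
The RMSE printed in the next cell is the square root of the `average_loss` metric, which for a regressor trained with squared error is the mean squared error over the evaluated examples. A minimal pure-Python sketch of that relationship, using made-up labels and predictions (the helper `rmse` is hypothetical):

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    average_loss = sum(squared_errors) / len(squared_errors)  # mean squared error
    return math.sqrt(average_loss)

# Made-up fares and predictions, for illustration only
labels = [10.0, 12.0, 8.0, 14.0]
predictions = [13.0, 8.0, 8.0, 14.0]
print(rmse(labels, predictions))  # 2.5
```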

In [5]:
def print_rmse(model, name, input_fn):
  metrics = model.evaluate(input_fn = input_fn, steps = 1)
  print('RMSE on {} dataset = {}'.format(name, np.sqrt(metrics['average_loss'])))
print_rmse(model, 'validation', get_valid())

INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2019-06-28-00:35:59
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from taxi_trained/model.ckpt-100


InvalidArgumentError: Unsuccessful TensorSliceReader constructor: Failed to get matching files on taxi_trained/model.ckpt-100: Not found: taxi_trained; No such file or directory
	 [[Node: save/RestoreV2 = RestoreV2[dtypes=[DT_INT64, DT_FLOAT, DT_FLOAT, DT_FLOAT, DT_FLOAT, DT_FLOAT, DT_FLOAT], _device="/job:localhost/replica:0/task:0/device:CPU:0"](_arg_save/Const_0_0, save/RestoreV2/tensor_names, save/RestoreV2/shape_and_slices)]]

Caused by op 'save/RestoreV2', defined at:
  File "/usr/local/envs/py3env/lib/python3.5/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/envs/py3env/lib/python3.5/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/ipykernel/__main__.py", line 3, in <module>
    app.launch_new_instance()
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/traitlets/config/application.py", line 658, in launch_instance
    app.start()
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/ipykernel/kernelapp.py", line 486, in start
    self.io_loop.start()
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tornado/ioloop.py", line 832, in start
    self._run_callback(self._callbacks.popleft())
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tornado/ioloop.py", line 605, in _run_callback
    ret = callback()
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/zmq/eventloop/zmqstream.py", line 536, in <lambda>
    self.io_loop.add_callback(lambda : self._handle_events(self.socket, 0))
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/zmq/eventloop/zmqstream.py", line 450, in _handle_events
    self._handle_recv()
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/zmq/eventloop/zmqstream.py", line 480, in _handle_recv
    self._run_callback(callback, msg)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/zmq/eventloop/zmqstream.py", line 432, in _run_callback
    callback(*args, **kwargs)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/ipykernel/kernelbase.py", line 283, in dispatcher
    return self.dispatch_shell(stream, msg)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/ipykernel/kernelbase.py", line 233, in dispatch_shell
    handler(stream, idents, msg)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/ipykernel/kernelbase.py", line 399, in execute_request
    user_expressions, allow_stdin)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/ipykernel/ipkernel.py", line 208, in do_execute
    res = shell.run_cell(code, store_history=store_history, silent=silent)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/ipykernel/zmqshell.py", line 537, in run_cell
    return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2662, in run_cell
    raw_cell, store_history, silent, shell_futures)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2785, in _run_cell
    interactivity=interactivity, compiler=compiler, result=result)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2907, in run_ast_nodes
    if self.run_code(code, result):
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2961, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-5-9ebe0219e717>", line 4, in <module>
    print_rmse(model, 'validation', get_valid())
  File "<ipython-input-5-9ebe0219e717>", line 2, in print_rmse
    metrics = model.evaluate(input_fn = input_fn, steps = 1)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/estimator/estimator.py", line 425, in evaluate
    name=name)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/estimator/estimator.py", line 1117, in _evaluate_model
    config=self._session_config)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/evaluation.py", line 209, in _evaluate_once
    session_creator=session_creator, hooks=hooks) as session:
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/monitored_session.py", line 816, in __init__
    stop_grace_period_secs=stop_grace_period_secs)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/monitored_session.py", line 539, in __init__
    self._sess = _RecoverableSession(self._coordinated_creator)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/monitored_session.py", line 1002, in __init__
    _WrappedSession.__init__(self, self._create_session())
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/monitored_session.py", line 1007, in _create_session
    return self._sess_creator.create_session()
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/monitored_session.py", line 696, in create_session
    self.tf_sess = self._session_creator.create_session()
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/monitored_session.py", line 458, in create_session
    self._scaffold.finalize()
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/monitored_session.py", line 212, in finalize
    self._saver = training_saver._get_saver_or_default()  # pylint: disable=protected-access
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/saver.py", line 910, in _get_saver_or_default
    saver = Saver(sharded=True, allow_empty=True)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/saver.py", line 1338, in __init__
    self.build()
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/saver.py", line 1347, in build
    self._build(self._filename, build_save=True, build_restore=True)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/saver.py", line 1384, in _build
    build_save=build_save, build_restore=build_restore)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/saver.py", line 829, in _build_internal
    restore_sequentially, reshape)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/saver.py", line 525, in _AddShardedRestoreOps
    name="restore_shard"))
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/saver.py", line 472, in _AddRestoreOps
    restore_sequentially)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/training/saver.py", line 886, in bulk_restore
    return io_ops.restore_v2(filename_tensor, names, slices, dtypes)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/ops/gen_io_ops.py", line 1463, in restore_v2
    shape_and_slices=shape_and_slices, dtypes=dtypes, name=name)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/framework/op_def_library.py", line 787, in _apply_op_helper
    op_def=op_def)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/framework/ops.py", line 3392, in create_op
    op_def=op_def)
  File "/usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow/python/framework/ops.py", line 1718, in __init__
    self._traceback = self._graph._extract_stack()  # pylint: disable=protected-access

InvalidArgumentError (see above for traceback): Unsuccessful TensorSliceReader constructor: Failed to get matching files on taxi_trained/model.ckpt-100: Not found: taxi_trained; No such file or directory
	 [[Node: save/RestoreV2 = RestoreV2[dtypes=[DT_INT64, DT_FLOAT, DT_FLOAT, DT_FLOAT, DT_FLOAT, DT_FLOAT, DT_FLOAT], _device="/job:localhost/replica:0/task:0/device:CPU:0"](_arg_save/Const_0_0, save/RestoreV2/tensor_names, save/RestoreV2/shape_and_slices)]]
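
The error above says that `taxi_trained` could not be found when evaluation tried to restore `model.ckpt-100`. One way to fail earlier with a clearer message is to check the model directory before calling `evaluate`. The sketch below is a stdlib-only assumption about how such a guard might look. `has_checkpoint` is a hypothetical helper, and it relies on TensorFlow's saver writing a state file named `checkpoint` into the model directory.

```python
import os
import tempfile

def has_checkpoint(model_dir):
    """Return True if model_dir exists and contains a `checkpoint` state file."""
    return os.path.isfile(os.path.join(model_dir, 'checkpoint'))

# Demonstration on a temporary directory rather than a real model_dir
with tempfile.TemporaryDirectory() as tmp:
    before = has_checkpoint(tmp)
    with open(os.path.join(tmp, 'checkpoint'), 'w') as f:
        f.write('model_checkpoint_path: "model.ckpt-100"\n')
    after = has_checkpoint(tmp)

# The temporary directory has been removed at this point
missing = has_checkpoint(os.path.join(tmp, 'no_such_dir'))
print(before, after, missing)  # False True False
```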


Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License