<h1> 2c. Loading large datasets progressively with the tf.data.Dataset </h1>

In this notebook, we continue reading the same small dataset, but refactor our ML pipeline in two small, but significant, ways:
<ol>
<li> Refactor the input to read data from disk progressively.
<li> Refactor the feature creation so that it is not one-to-one with inputs.
</ol>
<br/>
The Pandas function in the previous notebook first read the entire dataset into memory; on a large dataset, that won't be an option.

In [1]:
# import datalab.bigquery as bq
from google.cloud import bigquery as bq
import tensorflow as tf
import numpy as np
import shutil
print('tensorflow version:', tf.__version__)
print('bigquery version:', bq.__version__)

tensorflow version: 1.10.0
bigquery version: 1.5.0


<h2> 1. Refactor the input </h2>

Read data created in Lab1a, but this time make it more general, so that we can later handle large datasets. We use the Dataset API for this. It ensures that, as data gets delivered to the model in mini-batches, it is loaded from disk only when needed.
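
As a rough analogy for what "loaded from disk only when needed" means, here is a minimal plain-Python sketch. It is not how tf.data is implemented. The helper `iter_batches` and the demo file are hypothetical and only illustrate reading a CSV file one mini-batch at a time instead of all at once.

```python
import csv
import itertools
import os
import tempfile

def iter_batches(path, batch_size):
    """Yield lists of parsed CSV rows, pulling lines from disk lazily."""
    with open(path, newline='') as f:
        reader = csv.reader(f)  # iterates over the file line by line
        while True:
            # Take at most batch_size rows; nothing beyond them is read yet.
            batch = list(itertools.islice(reader, batch_size))
            if not batch:
                return
            yield batch

# Hypothetical demo file with 10 rows (not the lab's taxi data).
demo_path = os.path.join(tempfile.mkdtemp(), 'demo.csv')
with open(demo_path, 'w', newline='') as f:
    csv.writer(f).writerows([[i, i * 2] for i in range(10)])

batch_sizes = [len(batch) for batch in iter_batches(demo_path, batch_size=4)]
print(batch_sizes)  # [4, 4, 2]
```

Only one batch of rows is held in memory at a time, which is the property the Dataset API provides for the model's input pipeline.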

In [2]:
CSV_COLUMNS = ['fare_amount', 'pickuplon','pickuplat','dropofflon','dropofflat','passengers', 'key']
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]

def read_dataset(filename, mode, batch_size = 512):
    '''This function is important; make sure you understand it thoroughly!'''
    def _input_fn():
        def decode_csv(value_column):
            columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
            features = dict(zip(CSV_COLUMNS, columns))
            label = features.pop(LABEL_COLUMN)
            return features, label

        # Create list of file names that match "glob" pattern (i.e. data_file_*.csv)
        filenames_dataset = tf.data.Dataset.list_files(filename)
        # Read lines from text files
        textlines_dataset = filenames_dataset.flat_map(tf.data.TextLineDataset)
        # Parse text lines as comma-separated values (CSV)
        dataset = textlines_dataset.map(decode_csv)

        # Note:
        # use tf.data.Dataset.flat_map to apply one to many transformations (here: filename -> text lines)
        # (Annotator's note: I don't quite agree with "one to many" above; shouldn't it be many to one?
        #  The function passed to flat_map maps a nested structure of tensors to a dataset.)
        # use tf.data.Dataset.map      to apply one to one  transformations (here: text line -> feature list)

        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None # indefinitely
            dataset = dataset.shuffle(buffer_size = 10 * batch_size)
        else:
            num_epochs = 1 # end-of-input after this

        dataset = dataset.repeat(num_epochs).batch(batch_size)

        return dataset.make_one_shot_iterator().get_next()
    return _input_fn
    

# Don't forget: the filename is already hard-coded in the functions below.
def get_train():
    return read_dataset('./taxi-train.csv', mode = tf.estimator.ModeKeys.TRAIN)

def get_valid():
    return read_dataset('./taxi-valid.csv', mode = tf.estimator.ModeKeys.EVAL)

def get_test():
    return read_dataset('./taxi-test.csv', mode = tf.estimator.ModeKeys.EVAL)
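
To make the parsing step inside `decode_csv` concrete, here is a plain-Python sketch of roughly what it does to a single text line: each entry in `DEFAULTS` fixes the column's type and supplies the value used when a field is empty. The helper `decode_csv_line` and the sample line are hypothetical; the real pipeline uses `tf.decode_csv` on tensors.

```python
import csv

CSV_COLUMNS = ['fare_amount', 'pickuplon', 'pickuplat', 'dropofflon',
               'dropofflat', 'passengers', 'key']
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]

def decode_csv_line(line):
    """Plain-Python analogue of decode_csv for one text line."""
    fields = next(csv.reader([line]))
    columns = []
    for raw, default in zip(fields, DEFAULTS):
        kind = type(default[0])  # float or str, taken from the default
        # An empty field falls back to the column's default value.
        columns.append(kind(raw) if raw != '' else default[0])
    features = dict(zip(CSV_COLUMNS, columns))
    label = features.pop(LABEL_COLUMN)  # the label is split off from the features
    return features, label

# Hypothetical line with an empty dropofflon field.
features, label = decode_csv_line('12.5,-73.98,40.75,,40.76,2,row1')
print(label)                   # 12.5
print(features['dropofflon'])  # -74.0
```

The returned pair has the same shape as what the input function hands to the estimator: a dictionary of named features plus a separate label.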

In [7]:
filename='./taxi-train.csv'
filenames_dataset = tf.data.Dataset.list_files(filename)
# Read lines from text files
textlines_dataset = filenames_dataset.flat_map(tf.data.TextLineDataset)
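
The difference between `map` and `flat_map` can be sketched with ordinary Python iterables. This is only an analogy: the in-memory `files` dictionary and `read_lines` are hypothetical stand-ins for files on disk and `tf.data.TextLineDataset`.

```python
from itertools import chain

# Hypothetical in-memory "files": name -> list of text lines.
files = {'a.csv': ['1,2', '3,4'], 'b.csv': ['5,6']}

def read_lines(name):
    """Stand-in for TextLineDataset: one file name yields many lines."""
    return files[name]

names = sorted(files)

# map keeps one output per input, so the result stays nested per file.
mapped = list(map(read_lines, names))

# flat_map maps each input to a sequence and flattens the sequences into one stream.
flat_mapped = list(chain.from_iterable(map(read_lines, names)))

# A one-to-one map over the flattened lines (text line -> list of fields).
parsed = [line.split(',') for line in flat_mapped]

print(mapped)       # [['1,2', '3,4'], ['5,6']]
print(flat_mapped)  # ['1,2', '3,4', '5,6']
```

Each file name produces several lines (one to many), and flattening merges those per-file sequences into the single stream of lines that the later `map(decode_csv)` step consumes.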

<h2> 2. Refactor the way features are created. </h2>

For now, pass these through (same as previous lab).  However, refactoring this way will enable us to break the one-to-one relationship between inputs and features.

In [3]:
INPUT_COLUMNS = [
    tf.feature_column.numeric_column('pickuplon'),
    tf.feature_column.numeric_column('pickuplat'),
    tf.feature_column.numeric_column('dropofflat'),
    tf.feature_column.numeric_column('dropofflon'),
    tf.feature_column.numeric_column('passengers'),
]

def add_more_features(feats):
  # Nothing to add (yet!)
  return feats

feature_cols = add_more_features(INPUT_COLUMNS)

In [4]:
feature_cols

[_NumericColumn(key='pickuplon', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None),
 _NumericColumn(key='pickuplat', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None),
 _NumericColumn(key='dropofflat', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None),
 _NumericColumn(key='dropofflon', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None),
 _NumericColumn(key='passengers', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None)]
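
To illustrate what breaking the one-to-one relationship between inputs and features could look like, here is a plain-Python sketch on a dictionary of raw values. The function `add_derived_features` and the derived names `latdiff` and `londiff` are hypothetical and are not defined in this notebook; with feature columns the same idea would be expressed through `add_more_features`.

```python
def add_derived_features(raw):
    """Return the raw inputs plus features computed from several inputs."""
    feats = dict(raw)  # pass the original inputs through unchanged
    # Hypothetical derived features: each combines two raw inputs.
    feats['latdiff'] = raw['pickuplat'] - raw['dropofflat']
    feats['londiff'] = raw['pickuplon'] - raw['dropofflon']
    return feats

# Hypothetical example values.
raw = {'pickuplon': -74.0, 'pickuplat': 40.5,
       'dropofflon': -73.5, 'dropofflat': 40.75, 'passengers': 1.0}
feats = add_derived_features(raw)
print(len(raw), len(feats))  # 5 7
```

Five inputs become seven features, so the model's feature list no longer has to mirror the CSV columns one for one.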

<h2> Create and train the model </h2>

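Note that training processes num_steps * batch_size examples: here, 1000 steps at the default batch size of 512, or 512,000 examples (drawn with repetition, since the training dataset repeats indefinitely).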

In [5]:
tf.logging.set_verbosity(tf.logging.INFO)
OUTDIR = 'taxi_trained'
shutil.rmtree(OUTDIR, ignore_errors = True) # delete existing directory and thus start fresh each time
model = tf.estimator.LinearRegressor(
      feature_columns = feature_cols, model_dir = OUTDIR)
model.train(input_fn = get_train(), steps = 1000)

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': 'taxi_trained', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f59040ca7b8>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 0 into taxi_trained/model.ckpt.
INFO:tensorflow:

<tensorflow.python.estimator.canned.linear.LinearRegressor at 0x7f59040b5898>

<h3> Evaluate model </h3>

As before, evaluate on the validation data.  We'll do the third refactoring (to move the evaluation into the training loop) in the next lab.

In [6]:
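def print_rmse(model, name, input_fn):
    # steps = 1 evaluates a single batch (512 examples with the default batch_size)
    metrics = model.evaluate(input_fn = input_fn, steps = 1)
    # average_loss is the mean squared error, so its square root is the RMSE
    print('RMSE on {} dataset = {}'.format(name, np.sqrt(metrics['average_loss'])))
print_rmse(model, 'validation', get_valid())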

INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2018-08-15-14:35:46
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from taxi_trained/model.ckpt-1000
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/1]
INFO:tensorflow:Finished evaluation at 2018-08-15-14:35:47
INFO:tensorflow:Saving dict for global step 1000: average_loss = 126.819214, global_step = 1000, label/mean = 12.282267, loss = 64931.438, prediction/mean = 11.733331
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 1000: taxi_trained/model.ckpt-1000
RMSE on validation dataset = 11.261404037475586
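
The numbers in the log above can be cross-checked with a few lines of arithmetic. This sketch assumes that the reported `loss` is the squared error summed over the evaluated batch; the logged values are consistent with that reading, but it is an inference from the numbers.

```python
import math

# Values copied from the evaluation log above.
average_loss = 126.819214
loss = 64931.438

rmse = math.sqrt(average_loss)            # RMSE is the square root of the mean squared error
examples_evaluated = loss / average_loss  # summed loss / mean loss = number of examples

print(round(rmse, 4))             # 11.2614
print(round(examples_evaluated))  # 512
```

The ratio comes out at 512, the default batch size, which matches `steps = 1`: the reported RMSE is computed on a single batch of the validation file.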


Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License