In [1]:
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

In [2]:
import tensorflow as tf

In [3]:
%load_ext autoreload
%autoreload 2

In [4]:
from tools import make_src_dumper

# Target package directory for the functions exported from this notebook.
PACKAGE = "./train"

# write_py(fn) reports "<fn> written to ./train/<fn>.py." (see the cell outputs below).
write_py = make_src_dumper(PACKAGE)

---
# ```tf.data``` Input Functions

#### These input functions read from any number of files containing pre-processed data
In our parlance, this data is at the **training stage**. It has been fetched from a BigQuery table and pre-processed. It's not what we expect to come in at prediction time! In our [pre-processing pipeline](./02_Preprocessing_Pipeline.ipynb), we applied a transform function to the **signature stage** data. The reason is simply that we want to avoid having to do that transformation again and again during our model development.

During the pre-processing, the pre-processing function has been stored in a metadata directory that is available to us to treat data at prediction time (**signature stage**) exactly the same way that our training data has been treated.

We support reading from CSV files, which is sometimes preferred in earlier exploration phases, because results in CSV files can more easily be inspected with standard tools. But we also support reading from TFRecord files, a format that is highly optimized for performant distributed computing.

Any of these functions can be passed to the estimator, such that the estimator can call it in its own session/graph context to create a particularly useful input tensor. That input tensor will return the next batch of input records whenever it is evaluated. 

```make_input_fns()``` returns a dict with those two input functions, keyed by file type (`'csv'` and `'tfr'`). We chose this design so that we can select the file type at any point in time, e.g. by supplying a corresponding command-line parameter to the training script.

In [5]:
def make_input_fns():
    """Return the available input-function factories, keyed by file type.

    Returns:
        dict: ``'csv'`` maps to ``make_csv_input_fn`` and ``'tfr'`` maps to
        ``make_tfr_input_fn``. Both factories are called as
        ``factory(filename_pattern, batch_size, options)``.
    """
    # NOTE(review): the imports are presumably kept function-local so that the
    # source written out by write_py is self-contained - confirm before moving.
    from train.make_csv_input_fn import make_csv_input_fn
    from train.make_tfr_input_fn import make_tfr_input_fn

    factories_by_file_type = dict(csv=make_csv_input_fn, tfr=make_tfr_input_fn)
    return factories_by_file_type


# Export the function to the training package.
write_py(make_input_fns)

'make_input_fns written to ./train/make_input_fns.py.'

----
#### Read from CSV
Not so good for production, but may come in handy for exploration.

In [6]:
def make_csv_input_fn(filename_pattern, batch_size, options):
    """Create an input function that reads pre-processed records from CSV files.

    Args:
        filename_pattern: Glob pattern of the CSV files; it is expanded with
            ``tf.gfile.Glob`` each time the returned function is called.
        batch_size: Number of records per batch.
        options: Dict of settings. Keys read here:
            ``'shuffle_buffer_size'`` - shuffle buffer size for the raw text
            lines, or ``None`` to skip shuffling;
            ``'distribute'`` - if truthy the ``tf.data`` dataset itself is
            returned, otherwise the ``get_next()`` result of a one-shot
            iterator over it.

    Returns:
        A zero-argument function. It builds a dataset of
        ``(features, target)`` batches that repeats indefinitely, where
        ``features`` is a dict keyed by column name (without the label column).
    """
    import tensorflow as tf
    from train.model_config import ORDERED_TRAINING_DEFAULTS
    from train.model_config import ORDERED_TRAINING_COLUMNS
    from train.model_config import LABEL_COLUMN

    def _parse_line(line):
        # Decode one CSV line and pair each field with its column name.
        fields = tf.decode_csv(line, record_defaults=ORDERED_TRAINING_DEFAULTS)
        return dict(zip(ORDERED_TRAINING_COLUMNS, fields))

    def _split_label(record):
        # Remove the label from the record so it does not remain a feature.
        label = record.pop(LABEL_COLUMN)
        return record, label

    def _input_fn():
        lines = tf.data.TextLineDataset(tf.gfile.Glob(filename_pattern))

        # Shuffling is optional and is applied to the raw lines, before repeat().
        buffer_size = options['shuffle_buffer_size']
        if buffer_size is not None:
            lines = lines.shuffle(buffer_size=buffer_size)

        examples = lines.repeat().map(_parse_line).map(_split_label)
        batches = examples.batch(batch_size)

        if not options['distribute']:
            return batches.make_one_shot_iterator().get_next()
        return batches

    return _input_fn


# Export the function to the training package.
write_py(make_csv_input_fn)

'make_csv_input_fn written to ./train/make_csv_input_fn.py.'

#### Verify the behaviour

In [7]:
# Smoke test: pull NUM_BATCHES batches from the CSV input function.
make_input_fn = make_input_fns()['csv']
NUM_BATCHES = 1
with tf.Session() as sess:
    train_input_fn = make_input_fn(
        "./testdata/test.csv",
        batch_size=2,
        options={'shuffle_buffer_size': 10, 'distribute': False})
    # Named `next_batch` rather than `input`, which would shadow the Python
    # builtin input() for every later cell in this kernel session.
    next_batch = train_input_fn()
    res = [sess.run(next_batch) for i in range(NUM_BATCHES)]
res

[({'AIRLINE': array([1, 1], dtype=int32),
   'ARR': array([48, 48], dtype=int32),
   'ARR_LAT': array([36.89, 36.89], dtype=float32),
   'ARR_LON': array([-76.2, -76.2], dtype=float32),
   'DEP_DELAY': array([0.09733124, 0.18210362], dtype=float32),
   'DEP_DOW': array([6, 6], dtype=int32),
   'DEP_HOD': array([15, 22], dtype=int32),
   'DEP_LAT': array([33.63, 33.63], dtype=float32),
   'DEP_LON': array([-84.42, -84.42], dtype=float32),
   'DIFF_LAT': array([0.44145387, 0.44145387], dtype=float32),
   'DIFF_LON': array([0.87757736, 0.87757736], dtype=float32),
   'DISTANCE': array([0.09876987, 0.09876987], dtype=float32),
   'MEAN_TEMP_ARR': array([0.5162454 , 0.30505416], dtype=float32),
   'MEAN_TEMP_DEP': array([0.84873927, 0.394958  ], dtype=float32),
   'MEAN_VIS_ARR': array([0.45698923, 0.4892473 ], dtype=float32),
   'MEAN_VIS_DEP': array([1.        , 0.98437494], dtype=float32),
   'WND_SPD_ARR': array([0.00540054, 0.00820082], dtype=float32),
   'WND_SPD_DEP': array([0.372093

---
#### Read from TFRecords File

In [8]:
def make_tfr_input_fn(filename_pattern, batch_size, options):
    """Create an input function that reads pre-processed records from TFRecord files.

    The feature spec is derived from ``TRAINING_METADATA.schema`` once, when
    this factory is called, and reused by every call of the returned function.

    Args:
        filename_pattern: File pattern handed to
            ``tf.data.experimental.make_batched_features_dataset``.
        batch_size: Number of records per batch.
        options: Dict of settings. ``'shuffle_buffer_size'``,
            ``'prefetch_buffer_size'``, ``'reader_num_threads'``,
            ``'parser_num_threads'`` and ``'sloppy_ordering'`` are forwarded
            as the keyword arguments of the same name. ``'distribute'``
            selects the return type: if truthy the dataset itself, otherwise
            the ``get_next()`` result of a one-shot iterator over it.

    Returns:
        A zero-argument function producing batches in which ``LABEL_COLUMN``
        is used as the label key.
    """
    import tensorflow as tf
    from train.model_config import LABEL_COLUMN
    from train.model_config import TRAINING_METADATA

    feature_spec = TRAINING_METADATA.schema.as_feature_spec()

    # Option keys passed through unchanged to make_batched_features_dataset.
    forwarded_keys = (
        'shuffle_buffer_size',
        'prefetch_buffer_size',
        'reader_num_threads',
        'parser_num_threads',
        'sloppy_ordering',
    )

    def _input_fn():
        tuning = {key: options[key] for key in forwarded_keys}
        batches = tf.data.experimental.make_batched_features_dataset(
            file_pattern=filename_pattern,
            batch_size=batch_size,
            features=feature_spec,
            label_key=LABEL_COLUMN,
            **tuning)

        if not options['distribute']:
            return batches.make_one_shot_iterator().get_next()
        return batches

    return _input_fn


# Export the function to the training package.
write_py(make_tfr_input_fn)

'make_tfr_input_fn written to ./train/make_tfr_input_fn.py.'

#### Verify the behaviour

In [9]:
# Smoke test: pull NUM_BATCHES batches from the TFRecord input function.
make_input_fn = make_input_fns()['tfr']
NUM_BATCHES = 1
with tf.Session() as sess:
    train_input_fn = make_input_fn(
        './testdata/test.tfr',
        batch_size=2,
        options={'shuffle_buffer_size': 10,
                 'prefetch_buffer_size': 10,
                 'reader_num_threads': 2,
                 'parser_num_threads': 2,
                 'sloppy_ordering': True,
                 'distribute': False})
    # Named `next_batch` rather than `input`, which would shadow the Python
    # builtin input() for every later cell in this kernel session.
    next_batch = train_input_fn()
    res = [sess.run(next_batch) for i in range(NUM_BATCHES)]
res

[({'AIRLINE': array([1, 1]),
   'ARR': array([48, 87]),
   'ARR_LAT': array([36.89, 41.3 ], dtype=float32),
   'ARR_LON': array([-76.2 , -95.89], dtype=float32),
   'DEP_DELAY': array([0.09733124, 0.09733124], dtype=float32),
   'DEP_DOW': array([6, 1]),
   'DEP_HOD': array([15, 16]),
   'DEP_LAT': array([33.63, 33.63], dtype=float32),
   'DEP_LON': array([-84.42, -84.42], dtype=float32),
   'DIFF_LAT': array([0.44145387, 0.5429031 ], dtype=float32),
   'DIFF_LON': array([0.87757736, 0.6661297 ], dtype=float32),
   'DISTANCE': array([0.09876987, 0.16803624], dtype=float32),
   'MEAN_TEMP_ARR': array([0.5162454 , 0.48916963], dtype=float32),
   'MEAN_TEMP_DEP': array([0.84873927, 0.36974797], dtype=float32),
   'MEAN_VIS_ARR': array([0.45698923, 0.4892473 ], dtype=float32),
   'MEAN_VIS_DEP': array([1.        , 0.96875006], dtype=float32),
   'WND_SPD_ARR': array([0.00540054, 0.00860086], dtype=float32),
   'WND_SPD_DEP': array([0.37209302, 0.3875969 ], dtype=float32)},
  array([22., 21