This code accompanies my blog post, where I go into more detail about the decisions I've made, along with any other notes.

Here is the link to the [blog post](https://ianqs.github.io/blog/2019/01/05/TF-dataset-madness).

The dataset is the [UCI Covertype dataset](https://archive.ics.uci.edu/ml/datasets/covertype).

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import tensorflow as tf
print(tf.__version__)

#tf.enable_eager_execution()
import numpy as np
import os
import datetime
import tqdm
import sys
import pprint
import glob

1.12.0


# 1) Training Pipeline

## 1.1) Producer:

- Ideally takes arbitrary datasets (NumPy, CSV, .data)

- $\lambda$: (x) -> TFRecord files

- In this tutorial, loads from the `unprocessed_data` folder and writes to the `processed_data` folder

## 1.2) Provider:

- loads from the `processed_data` folder

- processes the data (so that the processing is part of the computation graph)

- reads the TFRecord files and feeds them directly into TensorFlow, avoiding `feed_dict`, which is [slow](https://www.tensorflow.org/guide/performance/overview#input_pipeline)

# Producer:

## Method 1

1: Load the data

2: Convert each row of data into a format compatible with TensorFlow (`tf.train.Example`)

3: Save it as a TFRecord file
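Step 3 produces a TFRecord file, which is just a sequence of length-prefixed, checksummed records. The stdlib-only sketch below illustrates that framing: an 8-byte little-endian length, a masked CRC-32C of the length, the payload bytes, then a masked CRC-32C of the payload. It illustrates the on-disk layout and is not TensorFlow's implementation; in this notebook `tf.python_io.TFRecordWriter` does the writing, and each payload is a serialized `tf.train.Example`. The helper names (`crc32c`, `masked_crc`, `write_record`, `read_records`) are hypothetical.

```python
import struct


def _make_crc32c_table():
    # CRC-32C (Castagnoli), reflected polynomial 0x82F63B78, the checksum TFRecord uses
    table = []
    for i in range(256):
        crc = i
        for _ in range(8):
            crc = (crc >> 1) ^ 0x82F63B78 if crc & 1 else crc >> 1
        table.append(crc)
    return table


_CRC32C_TABLE = _make_crc32c_table()


def crc32c(data: bytes) -> int:
    crc = 0xFFFFFFFF
    for byte in data:
        crc = _CRC32C_TABLE[(crc ^ byte) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF


def masked_crc(data: bytes) -> int:
    # TFRecord stores a "masked" CRC: rotate right by 15 bits, then add a constant (mod 2**32)
    crc = crc32c(data)
    return (((crc >> 15) | (crc << 17)) + 0xA282EAD8) & 0xFFFFFFFF


def write_record(stream, payload: bytes) -> None:
    header = struct.pack('<Q', len(payload))              # uint64 length, little-endian
    stream.write(header)
    stream.write(struct.pack('<I', masked_crc(header)))   # uint32 masked CRC of the length
    stream.write(payload)
    stream.write(struct.pack('<I', masked_crc(payload)))  # uint32 masked CRC of the data


def read_records(stream):
    while True:
        header = stream.read(8)
        if not header:
            return  # clean end of file
        (length,) = struct.unpack('<Q', header)
        (length_crc,) = struct.unpack('<I', stream.read(4))
        if length_crc != masked_crc(header):
            raise ValueError('corrupt length header')
        payload = stream.read(length)
        (data_crc,) = struct.unpack('<I', stream.read(4))
        if data_crc != masked_crc(payload):
            raise ValueError('corrupt record payload')
        yield payload
```

The length prefix is what lets a reader stream records one at a time without loading the whole file, and the checksums let it detect corruption record by record.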

## Method 2 (not shown here)

You can load the data as a `tf.data.Dataset`, then use `tf.data.experimental.TFRecordWriter` to write it out as a TFRecord file. A good resource for this is the [official docs](https://www.tensorflow.org/tutorials/load_data/tf-records#tfexample).


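```
# features_dataset and tf_serialize_example are defined as in the official docs
serialized_features_dataset = features_dataset.map(tf_serialize_example)
writer = tf.data.experimental.TFRecordWriter(filename)
write_op = writer.write(serialized_features_dataset)
with tf.Session() as sess:  # graph mode (TF 1.x): nothing is written until the op runs
    sess.run(write_op)
```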


# Dataset Information

| Name | Data type | Measurement | Description |
|---|---|---|---|
| Elevation | quantitative | meters | Elevation in meters |
| Aspect | quantitative | azimuth | Aspect in degrees azimuth |
| Slope | quantitative | degrees | Slope in degrees |
| Horizontal_Distance_To_Hydrology | quantitative | meters | Horizontal distance to nearest surface water features |
| Vertical_Distance_To_Hydrology | quantitative | meters | Vertical distance to nearest surface water features |
| Horizontal_Distance_To_Roadways | quantitative | meters | Horizontal distance to nearest roadway |
| Hillshade_9am | quantitative | 0 to 255 index | Hillshade index at 9am, summer solstice |
| Hillshade_Noon | quantitative | 0 to 255 index | Hillshade index at noon, summer solstice |
| Hillshade_3pm | quantitative | 0 to 255 index | Hillshade index at 3pm, summer solstice |
| Horizontal_Distance_To_Fire_Points | quantitative | meters | Horizontal distance to nearest wildfire ignition points |
| Wilderness_Area (4 binary columns) | qualitative | 0 (absence) or 1 (presence) | Wilderness area designation |
| Soil_Type (40 binary columns) | qualitative | 0 (absence) or 1 (presence) | Soil type designation |
| Cover_Type (7 types) | integer | 1 to 7 | Forest cover type designation |
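The column counts in the table add up to 55 comma-separated integers per line of `covtype.data` (10 quantitative columns, 4 + 40 binary columns, and the label). A minimal stdlib sketch, assuming that column order, of splitting one raw line into named groups with a running offset, which is the same idea `FeatureProto.dataset_creation` uses; `COLUMNS` and `split_row` are hypothetical names.

```python
# (name, number of columns) in the order they appear in covtype.data
COLUMNS = [
    ('Elevation', 1), ('Aspect', 1), ('Slope', 1),
    ('Horizontal_Distance_To_Hydrology', 1), ('Vertical_Distance_To_Hydrology', 1),
    ('Horizontal_Distance_To_Roadways', 1), ('Hillshade_9am', 1),
    ('Hillshade_Noon', 1), ('Hillshade_3pm', 1),
    ('Horizontal_Distance_To_Fire_Points', 1),
    ('Wilderness_Area', 4), ('Soil_Type', 40), ('Cover_Type', 1),
]
ROW_WIDTH = sum(width for _, width in COLUMNS)  # 10 + 4 + 40 + 1 = 55


def split_row(line: str) -> dict:
    """Split one comma-separated covtype.data line into named groups."""
    values = [int(v) for v in line.strip().split(',')]
    if len(values) != ROW_WIDTH:
        raise ValueError('expected {} columns, got {}'.format(ROW_WIDTH, len(values)))
    row, offset = {}, 0
    for name, width in COLUMNS:
        chunk = values[offset:offset + width]
        row[name] = chunk[0] if width == 1 else chunk  # scalars for single-column features
        offset += width
    return row
```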
    

# Producer Pipeline: 

In [3]:
def _bytes_feature(value):
    """Returns a bytes_list from a list of strings / bytes."""
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))


def _float_feature(value):
    """Returns a float_list from a list of floats / doubles."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))


def _int64_feature(value):
    """Returns an int64_list from a list of bools / enums / ints / uints."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))

from collections import namedtuple

# Prototype class: makes feature definitions easier to read and to refactor
class FeatureProto(object):
    
    proto = namedtuple('prototype', ['name', 'dtype', 'shape'])
    
    # Ordering the features for parsing
    features = [
        proto(name='Elevation', dtype=tf.float32, shape=1),
        proto(name='Aspect', dtype=tf.float32, shape=1),
        proto(name='Slope', dtype=tf.float32, shape=1),
        proto(name='Horizontal_Distance_To_Hydrology', dtype=tf.float32, shape=1),
        proto(name='Vertical_Distance_To_Hydrology', dtype=tf.float32, shape=1),
        proto(name='Horizontal_Distance_To_Roadways', dtype=tf.float32, shape=1),
        proto(name='Hillshade_9am', dtype=tf.float32, shape=1),
        proto(name='Hillshade_Noon', dtype=tf.float32, shape=1),
        proto(name='Hillshade_3pm', dtype=tf.float32, shape=1),
        proto(name='Horizontal_Distance_To_Fire_Points', dtype=tf.float32, shape=1),
        proto(name='Wilderness_Area', dtype=tf.float32, shape=4),
        proto(name='Soil_Type', dtype=tf.float32, shape=40),
        proto(name='Cover_Type', dtype=tf.float32, shape=1),
    ]
    
    @property
    def size(self):
        size = 0
        for prototype in self.features:
            size += prototype.shape
        return size
    
    
    def dataset_creation(self, data):
        """ Used in the producer assuming you're creating tfRecords"""
        idx = 0
        collection = {}
        for prototype in self.features:
            datum = [data[idx]] if prototype.shape == 1 else data[idx:idx + prototype.shape]
            encoded_feature = self._generate_feature(
                prototype.dtype, datum, idx
            )
            collection[prototype.name] = encoded_feature
            idx += prototype.shape
        return collection
    
    def _generate_feature(self, dtype, data, idx):
        """Used in tandem with dataset_creation to define the features to write to the dataset"""
        if dtype == tf.float16 or dtype == tf.float32 or dtype == tf.float64:
            encoded_feature = _float_feature(data)
        elif dtype == tf.int16 or dtype == tf.int32 or dtype == tf.int64:
            encoded_feature = _int64_feature(data)
        elif dtype == tf.string:
            encoded_feature = _bytes_feature(data)
        else:
            raise NotImplementedError('Unmatched type while generating feature in FeatureProto')
        return encoded_feature
    
    def unpack(self, example_proto):
        """Used in dataset.map to unpack each record as it is read. This is for the TFRecord data"""
        features = self._dataset_parsing()
        parsed_features = tf.parse_single_example(example_proto, features)
        labels = parsed_features['Cover_Type']
        parsed_features.pop('Cover_Type')
        # Keep Soil_Type as a length-40 tensor and collapse the one-hot Wilderness_Area into a single index
        parsed_features['Soil_Type'] = tf.convert_to_tensor(parsed_features['Soil_Type'])
        parsed_features['Wilderness_Area'] = tf.cast(tf.argmax(parsed_features['Wilderness_Area'], axis=0), dtype=tf.float32)
        labels = tf.cast(labels, dtype=tf.int32)
        #labels = tf.one_hot(tf.cast(labels, dtype=tf.uint8), 8, on_value=1, off_value=0, axis=-1)
        return parsed_features, labels
            
        
    def unpack_csv(self, example_proto):
        """Used in dataset.map where we unpack the dataset as we read it. This is for the csv data"""
        # 1) Split the CSV line on commas and convert each field from a string to an integer
        vals = tf.string_split([example_proto], delimiter=',').values
        vals_cvt = tf.string_to_number(vals, out_type=tf.int32)
        
        parsed_features = {}
        idx = 0
        for prototype in self.features:
            datum = vals_cvt[idx] if prototype.shape == 1 else vals_cvt[idx:idx + prototype.shape]
            parsed_features[prototype.name] = datum
            idx += prototype.shape
            
        labels = parsed_features['Cover_Type']
        parsed_features.pop('Cover_Type')
        # Keep Soil_Type as a length-40 tensor and collapse the one-hot Wilderness_Area into a single index
        parsed_features['Soil_Type'] = tf.convert_to_tensor(parsed_features['Soil_Type'])
        parsed_features['Wilderness_Area'] = tf.cast(tf.argmax(parsed_features['Wilderness_Area'], axis=0), dtype=tf.float32)
        labels = tf.cast(labels, dtype=tf.int32)
        
        return parsed_features, labels
        
    def _dataset_parsing(self):
        """Used in unpack to group the values into tf.FixedLenFeatures"""
        if hasattr(self, 'parser_proto'):
            return self.parser_proto
        else:
            parser_proto = {}
            for prototype in self.features:
                feat_name = prototype.name
                dtype = prototype.dtype
                shape = prototype.shape
                parser_proto[feat_name] = tf.FixedLenFeature([] if shape == 1 else [shape], dtype)
            self.parser_proto = parser_proto
            return self.parser_proto


feature_proto = FeatureProto()
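Both `unpack` methods collapse the 4-column one-hot `Wilderness_Area` into a single index with `tf.argmax`, and the commented-out label line uses `tf.one_hot` with depth 8 because `Cover_Type` runs from 1 to 7, leaving index 0 unused. A plain-Python sketch of both conversions, with hypothetical helper names, to make the index arithmetic concrete:

```python
from typing import List, Sequence


def one_hot_to_index(one_hot: Sequence[int]) -> int:
    """Plain-Python analog of tf.argmax(..., axis=0) on a valid one-hot vector."""
    # A valid one-hot vector has exactly one maximum, so there are no ties to break
    return max(range(len(one_hot)), key=lambda i: one_hot[i])


def label_to_one_hot(label: int, depth: int) -> List[int]:
    """Plain-Python analog of tf.one_hot for one integer label (out-of-range gives all zeros)."""
    return [1 if i == label else 0 for i in range(depth)]
```

Shifting the labels down by one (`label - 1`) would let a depth of 7 cover all seven cover types; that is a design alternative, not what the notebook does.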

# Load the data, then write it out

In [4]:
def load_data():
    loaded = np.loadtxt('unprocessed_data/covtype.data', delimiter=',', dtype=np.int64)  # Avoid tf.contrib since we want to get our hands dirty
    print(loaded.shape)
    all_ind = np.arange(0, len(loaded))
    np.random.shuffle(all_ind)
    train_ind = all_ind[: int(len(loaded) * 0.8)]
    test_ind = all_ind[int(len(loaded) * 0.8): ]
    
    return loaded, all_ind, train_ind, test_ind


load_data_run = False
if load_data_run:
    loaded, all_ind, train_ind, test_ind = load_data()
else:
    print('Set load_data_run = True to load the data from the unprocessed_data folder')

Set load_data_run = True to load the data from the unprocessed_data folder
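`load_data` shuffles without a seed, so each run produces a different train/test split (and a new set of timestamped files). A minimal sketch of a seeded split, swapping `np.random.shuffle` for the standard library's `random.Random` so it runs without NumPy; `split_indices` is a hypothetical name. With NumPy, `np.random.RandomState(seed).shuffle(indices)` plays the same role.

```python
import random
from typing import List, Tuple


def split_indices(n: int, train_fraction: float = 0.8, seed: int = 0) -> Tuple[List[int], List[int]]:
    """Shuffle range(n) with a fixed seed, then cut it into train and test index lists."""
    indices = list(range(n))
    random.Random(seed).shuffle(indices)  # seeded, so reruns give the same split
    cut = int(n * train_fraction)         # same cut point as load_data: int(len(loaded) * 0.8)
    return indices[:cut], indices[cut:]
```

For the 581,012 rows of the full dataset, the 0.8 cut gives 464,809 training rows and 116,203 test rows.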


In [5]:
def write(filename, indices, feature_proto, loaded):
    with tf.python_io.TFRecordWriter(filename) as writer:
        for i in tqdm.tqdm_notebook(indices):
            datum = loaded[i, :]
            feature = feature_proto.dataset_creation(datum)
            example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
            writer.write(example_proto.SerializeToString())

def generate_samples(feature_proto, loaded, record_size=100000):
    os.makedirs('processed_data', exist_ok=True)

    # Round the timestamp down to the hour; it is appended to every filename written in this run
    time = str(datetime.datetime.now().replace(microsecond=0,second=0,minute=0)).replace(' ', '_')
    for mode, indices in [('train', train_ind), ('test', test_ind)]:
        if mode == 'test':
            filename = 'processed_data/covtype_test_{}.tfrecord'.format(time)
            write(filename, indices, feature_proto, loaded)
        else:  # Training data, split into shards of at most record_size rows
            # Ceiling division, so the leftover rows get their own shard instead of
            # overwriting the last full one
            num_shards = -(-len(indices) // record_size)
            for i in tqdm.tqdm(range(num_shards)):
                filename = 'processed_data/covtype_train_{}_{}.tfrecord'.format(i, time)
                start = i * record_size
                end = (i + 1) * record_size
                write(filename, indices[start:end], feature_proto, loaded)
    # If you use a storage system (S3 or another file-hosting service),
    # add the export step here
        
generate_run = False

if generate_run:
    generate_samples(feature_proto, loaded)
else:
    print('Set generate_run = True to generate the TFRecord files')

Set generate_run = True to generate the TFRecord files
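Splitting the training indices into shards of at most `record_size` rows is a ceiling division: every full chunk gets a shard, and a partial final chunk gets its own file rather than reusing an earlier shard's filename (a `TFRecordWriter` opened on an existing path overwrites it). A small sketch of the boundary arithmetic, with a hypothetical `shard_bounds` helper:

```python
from typing import List, Tuple


def shard_bounds(num_items: int, record_size: int) -> List[Tuple[int, int]]:
    """(start, end) slice bounds covering num_items in chunks of at most record_size."""
    num_shards = -(-num_items // record_size)  # ceiling division: a partial last chunk gets its own shard
    return [(i * record_size, min((i + 1) * record_size, num_items)) for i in range(num_shards)]
```

With 464,809 training rows (an 80/20 split of the full dataset) and `record_size=100000`, this yields five shards, the last holding 64,809 rows.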


# Provider

Cell 1: Initialize the loader

- Even though both the train and test data are stored as TFRecords, we pretend that they come from two different runs of our pre-processor

- The proto_wrap function is unnecessary here, but I left it in for clarity. We will remove it in the next tutorial, where I show you how to use the loaded data

Cell 2: Provide

- Returns the next element of an iterator over your dataset, built from either 1) stored TFRecords or 2) a `tf.data.Dataset` over NumPy arrays or CSVs

Cell 3: Iteration

In [6]:
from typing import Tuple

def dataset_config(repeat=False, batch_size=32, num_cpus=None,
                   # Used with TFRecordDataset (mapper is also used for CSVs)
                   filenames: list = None, mapper=None,
                   # Used with from_tensor_slices
                   initializable: Tuple = False, sess=None, feed_dict=None,
                   # Used with TextLineDataset (e.g. CSVs)
                   csv_filenames: list = None):
    """
    Supports 3 modes: from_tensor_slices, TFRecordDataset and TextLineDataset (e.g. CSVs).
    Returns the next-element op of the dataset's iterator.
    """
    tf_record = mapper is not None and filenames is not None
    tensor_slices = initializable is not None and sess is not None and feed_dict is not None
    use_csv = csv_filenames is not None
    
    if tf_record:
        dataset = tf.data.TFRecordDataset(filenames)
        dataset = dataset.map(mapper, num_parallel_calls=num_cpus)
    elif tensor_slices:
        assert initializable != False, 'initializable should be an iterable with placeholders'
        dataset = tf.data.Dataset.from_tensor_slices(initializable)
    elif use_csv:
        dataset = tf.data.TextLineDataset(csv_filenames)
        dataset = dataset.map(mapper, num_parallel_calls=num_cpus)
    else:
        raise ValueError('If loading from TFRecordDatasets, fill in filenames and mapper. '
                         'If loading CSVs, fill in csv_filenames and mapper. '
                         'If using from_tensor_slices, feed in an initializable (placeholder iterable), session, and feed_dict')
        
    if repeat:
        dataset = dataset.repeat()
        
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(buffer_size=batch_size)
    
    if tensor_slices:
        iterator = dataset.make_initializable_iterator()
        sess.run(iterator.initializer, feed_dict=feed_dict)
    else:
        iterator = dataset.make_one_shot_iterator()
    
    next_element = iterator.get_next()
    return next_element
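`dataset_config` calls `repeat()` before `batch()`, so with `repeat=True` a batch can straddle the end of one pass over the data and the start of the next; with `repeat=False` the final batch is simply shorter. The plain-Python analog below is an illustration, not `tf.data` itself, and shows how the two orderings differ; the helper names are hypothetical. `tf.data`'s `batch` also accepts `drop_remainder=True` if every batch must be full.

```python
from itertools import chain, islice
from typing import Iterable, Iterator, List


def batch(items: Iterable, batch_size: int) -> Iterator[List]:
    """Group items into lists of batch_size; the last list may be shorter."""
    it = iter(items)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        yield chunk


def repeat_then_batch(data: List, batch_size: int, epochs: int) -> List[List]:
    # Analog of dataset.repeat(epochs).batch(batch_size): batches can cross epoch boundaries
    return list(batch(chain.from_iterable([data] * epochs), batch_size))


def batch_then_repeat(data: List, batch_size: int, epochs: int) -> List[List]:
    # Analog of dataset.batch(batch_size).repeat(epochs): each epoch ends with its own partial batch
    return list(chain.from_iterable(batch(data, batch_size) for _ in range(epochs)))
```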

In [7]:
configuration = 'tf'  # Options: tf, csv, np

# Option 1: reading tf.data.TFRecordDataset
#     - requires that you generate it first
if configuration == 'tf':
    filename_list = []
    for root, dirs, files in os.walk('processed_data/'):
        for f in files:
            if "tfrecord" in f and "train" in f:
                filename_list.append(os.path.join(root, f))
    print(filename_list)
    num_cpus = os.cpu_count()
    training_dataset_next = dataset_config(filenames=filename_list, mapper=feature_proto.unpack, num_cpus=num_cpus)

    
# Option 2: reading as a CSV
# Note: later tutorials won't include this in utils.py; I'm including it here mostly because
# people kept asking for it
elif configuration == 'csv':
    filename_list = []
    for root, dirs, files in os.walk('unprocessed_data/'):
        for f in files:
            if ".csv" in f:
                filename_list.append(os.path.join(root, f))
    print(filename_list)
    num_cpus = os.cpu_count()
    
    training_dataset_next = dataset_config(csv_filenames=filename_list, mapper=feature_proto.unpack_csv, num_cpus=num_cpus)


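# Option 3: reading NumPy arrays. Note that a serialized graph has a 2GB limit, so large arrays
# must not be embedded as constants; they are fed through placeholders instead.
# I'd still avoid this option, and it isn't exercised in this tutorial.
else:
    # Assumes an .npz archive holding 'features' and 'labels' arrays
    # (np.load on a plain .npy file returns a single array, not a dict-like archive)
    with np.load('unprocessed_data/covtype.npz') as data:
        features = data["features"]
        labels = data["labels"]

    features_placeholder = tf.placeholder(features.dtype, features.shape)
    labels_placeholder = tf.placeholder(labels.dtype, labels.shape)

    # The iterator is initialized in this session, so reuse `sess` when iterating
    sess = tf.Session()
    training_dataset_next = dataset_config(
        initializable=(features_placeholder, labels_placeholder), sess=sess,
        feed_dict={features_placeholder: features, labels_placeholder: labels})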

['processed_data/covtype_train_1_2019-01-06_02:00:00.tfrecord', 'processed_data/covtype_train_3_2019-01-06_02:00:00.tfrecord', 'processed_data/covtype_train_0_2019-01-06_02:00:00.tfrecord', 'processed_data/covtype_train_2_2019-01-06_02:00:00.tfrecord']
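`os.walk` returns files in whatever order the filesystem lists them, which is why the shards above came back as 1, 3, 0, 2. The already-imported `glob` module plus `sorted` gives a deterministic order. A minimal sketch, assuming the `covtype_train_*.tfrecord` naming from `generate_samples` and a flat (non-recursive) `processed_data` folder, with a hypothetical `find_train_records` helper:

```python
import glob
import os
from typing import List


def find_train_records(directory: str = 'processed_data') -> List[str]:
    """Sorted paths of the training TFRecord shards directly inside directory."""
    # Assumes shards are named like covtype_train_<i>_<timestamp>.tfrecord
    pattern = os.path.join(directory, '*train*.tfrecord')
    return sorted(glob.glob(pattern))
```

The sort is lexicographic, so a shard numbered 10 would sort before shard 2; zero-padding the shard index in the filename would avoid that.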


In [8]:
# Graph (lazy) execution: nothing runs until sess.run is called

init = tf.global_variables_initializer()
with tf.Session() as sess:
    sess.run(init)
    for i in range(2):
        features, label = sess.run(training_dataset_next)
        pprint.pprint(features)
    
# Eager execution
#features, label = training_dataset_next

{'Aspect': array([290.,  99., 123.,  50.,  11., 297., 255., 114., 161., 143., 358.,
        60., 135., 292., 164.,  28.,  15.,  78.,  21., 121.,  16., 324.,
       319., 308., 307., 121., 164., 237., 144., 158.,  16., 197.],
      dtype=float32),
 'Elevation': array([2524., 3014., 2888., 2919., 2975., 3042., 2995., 2915., 3331.,
       2975., 2952., 3235., 2942., 2903., 2692., 2783., 3816., 3007.,
       2935., 2969., 3044., 2949., 2922., 3184., 2532., 2910., 2886.,
       3227., 3251., 3272., 3293., 3193.], dtype=float32),
 'Hillshade_3pm': array([176.,  85., 100., 135., 142., 169., 195., 131., 142., 114., 151.,
        96., 153., 242., 149., 125., 139.,  49., 139., 135., 143., 185.,
       199., 171., 207., 111., 146., 199., 133., 150., 138., 164.],
      dtype=float32),
 'Hillshade_9am': array([202., 247., 247., 224., 205., 208., 190., 233., 231., 242., 191.,
       231., 221.,  50., 227., 212., 208., 243., 212., 232., 211., 171.,
       137., 204., 148., 243., 229., 188., 235., 225

# Done! 

And with that, we're done! We've:

1) taken a non-trivial dataset,
2) converted it into TFRecord files, and
3) shown how to load it back and read from it