# **Chapter 13 â€“ Loading and Preprocessing Data with TensorFlow**

# The tf.data API

In [3]:
import tensorflow as tf

In [4]:
X = tf.range(10) # any data tensor
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset

<_TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int32, name=None)>

In [5]:
# or 
tf.data.Dataset.range(10)

<_RangeDataset element_spec=TensorSpec(shape=(), dtype=tf.int64, name=None)>

In [6]:
# can iterate
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


In [7]:
X_nested = {"a": ([1, 2, 3], [4, 5, 6]), "b": [7, 8, 9]}
dataset = tf.data.Dataset.from_tensor_slices(X_nested)
for item in dataset:
    print(item)

{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=1>, <tf.Tensor: shape=(), dtype=int32, numpy=4>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=7>}
{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=2>, <tf.Tensor: shape=(), dtype=int32, numpy=5>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=8>}
{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=3>, <tf.Tensor: shape=(), dtype=int32, numpy=6>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=9>}


## Chaining Transformations

In [8]:
dataset = tf.data.Dataset.from_tensor_slices(tf.range(10))
dataset = dataset.repeat(3).batch(7)
for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


In [9]:
# tranforming items by calling map()
dataset = dataset.map(lambda x: x * 2) # x is a batch, can speed things up by setting num_parallel_calls argument to number of threads to run or to tf.data.AUTOTUNE.
for item in dataset:
    print(item)

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32)
tf.Tensor([16 18], shape=(2,), dtype=int32)


In [10]:
# filter batches whose sum is less than or equal to 50
dataset = dataset.filter(lambda x: tf.reduce_sum(x) > 50)
for item in dataset:
    print(item)

tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32)


In [11]:
# look at first n items in dataset 
for item in dataset.take(2):
    print(item)

tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)


### Shuffling the Data

In [12]:
dataset = tf.data.Dataset.range(10).repeat(2)
dataset = dataset.shuffle(buffer_size=4, seed=42).batch(7)
for item in dataset:
    print(item)

tf.Tensor([1 4 2 3 5 0 6], shape=(7,), dtype=int64)
tf.Tensor([9 8 2 0 3 1 4], shape=(7,), dtype=int64)
tf.Tensor([5 7 9 6 7 8], shape=(6,), dtype=int64)


### Interleaving Lines from Multiple Files

_suppose we have many csv files for the train set and all their paths are store in an array called train_filepaths_

train_filepaths = ["path1.csv", "path2.csv", "path3.csv", ...] or just "path*.csv"

filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)

_next, call the interleave() method to read from 5 files at a time and interleave their lines_

n_reader = 5

dataset = filepath_dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
cycle_length=n_readers)

Can use interleave's num_parallel_call to use more threads

### Preprocessing the Data

In [13]:
import numpy as np

In [14]:
# For the california housing dataset
n_inputs = 8
X_mean, X_std = [np.array([.5] * n_inputs), np.array([2.] * n_inputs)]

def parse_csv_line(line):
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    return tf.stack(fields[:-1]), tf.stack(fields[-1:])

def preprocess(line):
    x, y = parse_csv_line(line)
    return (x - X_mean) / X_std, y

In [15]:
preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 1.8541501e+00,  2.1750000e+01,  2.4116001e+00,  2.0855001e-01,
         4.2275000e+02,  9.1849995e-01,  1.8485001e+01, -6.1349998e+01],
       dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)

Use dataset's map() method to apply the preprocess() function to each sample in the dataset

### Putting Everything Together

In [16]:
def csv_reader_dataset(filepaths, n_readers=5, n_read_threads=None,
                       n_parse_threads=5, shuffle_buffer_size=10_000, seed=42,
                       batch_size=32):
    dataset = tf.data.Dataset.list_files(filepaths, seed=seed)
    
    dataset = dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=n_readers, num_parallel_calls=n_read_threads)
    
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.shuffle(shuffle_buffer_size, seed=seed)
    
    return dataset.batch(batch_size).prefetch(1)

### Using the Dataset with Keras

train_set = csv_reader_dataset(train_filepaths)

valid_set = csv_reader_dataset(valid_filepaths)

test_set = csv_reader_dataset(test_filepaths)

model = tf.keras.Sequential([...])

model.compile(loss="mse", optimizer="sgd")

model.fit(train_set, validation_data=valid_set, epochs=5)

test_mse = model.evaluate(test_set)

new_set = test_set.take(3) # pretend we have 3 new samples

y_pred = model.predict(new_set) # or you could just pass a NumPy array

_If you want to build a custom training loop:_

n_epochs = 5

for epoch in range(epochs):

    for X_batch, y_batch in train_set:
        [...] # perform one gradient descent step

In [17]:
# Can even create a TF function that trains the model for a whole epoch:
@tf.function
def train_one_epoch(model, optimizer, loss_fn, train_set):
    for X_batch, y_batch in train_set:
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main__loss] + model.losses)
            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))

# optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
# loss_fn = tf.keras.losses.MeanSquaredError
# for epoch in range(n_epochs):
#     print("\rEpoch {}/{}".format(epoch + 1, n_epochs), end="")
#     traine_one_epoch(model, optimzier, loss_fn, train_set)

In Keras, the _steps_per_execution_ argument of the compile() method lets you define the number of batches that the fit() method will process during each call to the tf.function it uses for training. The default is just 1, so if you set it ot 50 you will often see a significant performance improvement. However, the on_batch_*() methods of Keras callbacks will only be called every 50 batches.

## The TFRecord Format

In [18]:
with tf.io.TFRecordWriter("my_data.tfrecord") as f:
    f.write(b"This is the first record")
    f.write(b"And this is the second record")

In [19]:
filepaths = ["my_data.tfrecord"]
dataset = tf.data.TFRecordDataset(filepaths)
for item in dataset:
    print(item)

tf.Tensor(b'This is the first record', shape=(), dtype=string)
tf.Tensor(b'And this is the second record', shape=(), dtype=string)


By default, a TFRecordDataset will read files one by one, but you make it read multiple files in parallel and interleave their records by passing the constructor a list of filepaths and setting num_parallel_reads to a number greater than one. Alternatively, you could obtain the same result by using list_files() and interleave() as we did earlier to read multiple CSV files.

### Compresssed TFRecord Files

In [20]:
# compress
options = tf.io.TFRecordOptions(compression_type="GZIP")
with tf.io.TFRecordWriter("my_compressed.tfrecord", options) as f:
    f.write("Compress, compress, compress!")

In [21]:
# decompress
dataset = tf.data.TFRecordDataset(["my_compressed.tfrecord"],
                                 compression_type="GZIP")

for item in dataset:
    print(item)

tf.Tensor(b'Compress, compress, compress!', shape=(), dtype=string)


### A Brief Introduction to Protocol Buffers

see page 454

### TensorFlow Protobufs

In [23]:
 # Create a tf.train.Example representing a person
from tensorflow.train import BytesList, FloatList, Int64List
from tensorflow.train import Feature, Features, Example

person_example = Example(
    features=Features(
        feature={
            "name": Feature(bytes_list=BytesList(value=[b"Alice"])),
            "id": Feature(int64_list=Int64List(value=[123])),
            "emails": Feature(bytes_list=BytesList(value=[b"a@b.com",
                                                          b"c@d.com"]))
        }))

In [30]:
with tf.io.TFRecordWriter("my_contacts.tfrecord") as f:
    for _ in range(5):
        f.write(person_example.SerializeToString())

### Loading and Parsing Examples

In [None]:
feature_description = {
    "name": tf.io.FixedLenFeature([], tf.string, default_value=""),
    "id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
    "emails": tf.io.VarLenFeature(tf.string),
}
