In [2]:
import numpy as np
import tensorflow as tf
import pandas as pd

# Loading and Preprocessing Data with TensorFlow

Ingesting a large dataset and preprocessing it efficiently can be tricky to implement with other Deep Learning libraries, but TensorFlow makes it easy thanks to the *Data API*: you just create a dataset object, and tell it where to get the data and how to transform it.

Off the shelf, the Data API can read from text files, binary files with fixed-sized records, and binary files that use TensorFlow's TFRecord format. The Data API also has support for reading from SQL databases.

Reading huge datasets efficiently is not the only difficulty: the data also needs to be preprocessed, usually normalized. Moreover, it is not always composed strictly of convenient numerical fields: there may be text features, categorical features, and so on. These need to be encoded, for example using one-hot encoding, bag-of-words encoding, or _embeddings_ (as we will see, an embedding is a trainable dense vector that represents a category or token). One option to handle all this preprocessing is to write your own custom preprocessing layers. Another is to use the standard preprocessing layers provided by Keras.

## The Data API

In [34]:
X = tf.range(10)  # any data tensor
dataset = tf.data.Dataset.from_tensor_slices(X)

for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


The from_tensor_slices() function takes a tensor and creates a tf.data.Dataset whose elements are all the slices of X (along the first dimension), so this dataset contains 10 items. In this case we would have obtained the same dataset if we had used tf.data.Dataset.range(10). Note that you can iterate over the dataset's items intuitively.
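Slicing along the first dimension also applies to multi-dimensional tensors and to tuples of tensors. The following is a minimal sketch, assuming a small made-up feature matrix and label vector (the names `features` and `labels` are illustrative only):

```python
import numpy as np
import tensorflow as tf

# Hypothetical toy data: 4 instances with 2 features each, plus 4 labels.
features = np.arange(8, dtype=np.float32).reshape(4, 2)
labels = np.array([0, 1, 0, 1], dtype=np.int32)

# Slicing a 2D tensor along its first dimension yields one row per item.
rows = tf.data.Dataset.from_tensor_slices(features)
print(rows.element_spec)  # each item is a 1D tensor holding 2 floats

# Slicing a tuple of tensors yields (features, label) pairs.
pairs = tf.data.Dataset.from_tensor_slices((features, labels))
for x, y in pairs:
    print(x.numpy(), y.numpy())

n_items = int(pairs.cardinality().numpy())
first_x, first_y = next(iter(pairs))
```

Each item of `pairs` is a tuple, which is the shape Keras expects when a dataset is passed to `fit()`.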

### Chaining Transformations

Once you have a dataset, you can apply all sorts of transformations to it by calling its transformation methods. Each method returns a new dataset, so you can chain transformations like the example below.

In [35]:
dataset = dataset.repeat(3).batch(7)
for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


In this example, we first call the repeat() method on the original dataset, and it returns a new dataset that will repeat the items of the original dataset three times. Of course, this will not copy all the data in memory three times! (If you call this method with no arguments, the new dataset will repeat the source dataset forever, so the code that iterates over the dataset will have to decide when to stop.) Then we call the batch() method on this new dataset, and again this creates a new dataset. This one will group the items of the previous dataset in batches of seven items. As you can see, the batch() method had to output a final batch of size two instead of seven, but you can call it with drop_remainder=True if you want it to drop this final batch so that all batches have the exact same size.

The dataset methods do _not_ modify datasets, they create new ones, so make sure to keep a reference to these new datasets (e.g. with dataset = ...) or else nothing will happen.
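As a small sketch of both points (the variable names are illustrative), the code below first calls repeat() without keeping the result, which changes nothing, and then uses drop_remainder=True so that every batch has exactly seven items:

```python
import tensorflow as tf

source = tf.data.Dataset.range(10)

# Calling a method without keeping the result leaves `source` untouched.
source.repeat(3)
n_source = len(list(source))  # still 10 items

# Keeping the returned dataset is what makes the transformation take effect.
batched = source.repeat(3).batch(7, drop_remainder=True)
batch_shapes = [tuple(batch.shape) for batch in batched]
print(n_source, batch_shapes)
```

The 30 repeated items form four full batches; the two leftover items are dropped.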

In [36]:
dataset = dataset.map(lambda x: x * 2)
for item in dataset:
    print(item)

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32)
tf.Tensor([16 18], shape=(2,), dtype=int32)


You can also transform the items by calling the map() method. For example, the code above doubles every value in the batched dataset. This method is the one you will call to apply any preprocessing you want to your data. Sometimes this will include computations that can be quite intensive, such as reshaping or rotating an image, so you will usually want to spawn multiple threads to speed things up: it's as simple as setting the num_parallel_calls argument. Note that the function you pass to the map() method must be convertible to a TF Function.

While the map() method applies a transformation to each item, the apply() method applies a transformation to the dataset as a whole. For example, the following code unbatches the dataset (the commented-out alternative shows the older form, which passes the experimental unbatch() function to apply()). Each item in the new dataset will be a single-integer tensor instead of a batch of seven integers. It is also possible to simply filter the dataset using filter(), or look at just a few items using take().
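Here is a minimal sketch of a parallel map(). The function `scale_and_shift` and the value 4 for num_parallel_calls are arbitrary choices for illustration:

```python
import tensorflow as tf

def scale_and_shift(x):
    # Only TensorFlow ops are used, so this can be converted to a TF Function.
    x = tf.cast(x, tf.float32)
    return x / 10.0 + 1.0

dataset = tf.data.Dataset.range(5)
# Process up to 4 items in parallel; the output order is preserved by default.
dataset = dataset.map(scale_and_shift, num_parallel_calls=4)
values = [float(v.numpy()) for v in dataset]
print(values)
```

For a function this cheap the extra threads bring no benefit; the argument matters when each call does real work, such as decoding or augmenting an image.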

In [37]:
dataset = dataset.unbatch()  # older form: dataset.apply(tf.data.experimental.unbatch())
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, shape=(), dtype=int32)
tf.Tensor(16, shape=(), dtype=int32)
tf.Tensor(18, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, shape=(), dtype=int32)
tf.Tensor(16, shape=(), dtype=int32)
tf.Tensor(18, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, shape=(), dtype=int32)
tf.Tensor(16, shape=(), dtype=int32)
tf.Tensor(18, shape=(), dtype=int32)

In [38]:
dataset = dataset.filter(lambda x: x < 10)
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)


In [39]:
for item in dataset.take(3):
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
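The apply() method accepts any function that takes a dataset and returns a dataset, which is handy for packaging several steps into one reusable transformation. A minimal sketch, where `keep_small_and_pair` is a made-up helper:

```python
import tensorflow as tf

def keep_small_and_pair(ds):
    # Receives the whole dataset and returns a new, transformed dataset.
    return ds.filter(lambda x: x < 6).batch(2)

dataset = tf.data.Dataset.range(10).apply(keep_small_and_pair)
batches = [batch.numpy().tolist() for batch in dataset]
print(batches)  # [[0, 1], [2, 3], [4, 5]]
```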


### Shuffling Data

Gradient Descent works best when the instances in the training set are independent and identically distributed. A simple way to ensure this is to shuffle the instances using the shuffle() method. You must specify the buffer size, and it is important to make it large enough, or else shuffling will not be very effective. Just don't exceed the amount of RAM you have, and even if you have plenty of it, there's no need to go beyond the dataset's size.

For example, the following code creates and displays a dataset containing the integers 0 to 9, repeated 3 times, shuffled using a buffer of size 5 and a random seed of 42, then batched with a batch size of 7.

If you call repeat() on a shuffled dataset, by default it will generate a new order at every iteration. This is generally a good idea, but if you prefer to reuse the same order at each iteration, you can set reshuffle_each_iteration=False.

In [40]:
dataset = tf.data.Dataset.range(10).repeat(3).shuffle(buffer_size=5, seed=42).batch(7)
for item in dataset:
    print(item)

tf.Tensor([0 2 3 6 7 9 4], shape=(7,), dtype=int64)
tf.Tensor([5 0 1 1 8 6 5], shape=(7,), dtype=int64)
tf.Tensor([4 8 7 1 2 3 0], shape=(7,), dtype=int64)
tf.Tensor([5 4 2 7 8 9 9], shape=(7,), dtype=int64)
tf.Tensor([3 6], shape=(2,), dtype=int64)
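To see the effect of reshuffle_each_iteration=False, the sketch below shuffles first and repeats afterwards, so each pass over the source data is one complete epoch. The buffer size and seed are arbitrary:

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(10)
dataset = dataset.shuffle(buffer_size=10, seed=42, reshuffle_each_iteration=False)
dataset = dataset.repeat(2)

values = [int(v.numpy()) for v in dataset]
first_epoch, second_epoch = values[:10], values[10:]
print(first_epoch)
print(second_epoch)  # same order as the first epoch
```

Note the order of the calls: in the cell above, repeat() comes before shuffle(), so items from different epochs can mix inside the buffer, whereas here every epoch contains each integer exactly once.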


For a large dataset that does not fit into memory, this simple shuffling-buffer approach may not be sufficient. One solution is to shuffle the source data itself. Then, to shuffle the instances some more, a common approach is to split the source data into multiple files, then read them in a random order during training. However, instances located in the same file will still end up close to each other. To avoid this you can pick multiple files randomly and read them simultaneously, interleaving their records. Then on top of that you can add a shuffling buffer using the shuffle() method. The Data API makes all of this possible in just a few lines of code.

#### Interleaving lines from multiple files

The following example assumes the California housing dataset has been downloaded, split into train, val, and test sets, and that each set is further broken down into multiple CSV files. I'm not going to do this because it is unnecessary bloat on my computer, so the code is left commented out.

In [None]:
# train_filepaths = ['datasets/housing/my_train_00.csv', 'datasets/housing/my_train_01.csv', ...]
# filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)

# n_readers = 5
# dataset = filepath_dataset.interleave(
#     lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
#     cycle_length=n_readers
# )

# for line in dataset.take(5):
#     print(line.numpy())

Let's suppose train_filepaths contains the list of training file paths. By default, the list_files() function returns a dataset that shuffles the file paths. In general, this is a good thing, but you can set shuffle=False if you do not want that for some reason.

Next, you can call the interleave() method to read from five files (n_readers) at a time and interleave their lines (skipping the first line of each file, which is the header row, using the skip() method).

The interleave() method will create a dataset that will pull five file paths from the filepath_dataset, and for each one it will call the function you feed it (a lambda in this example) to create a new dataset (in this case a TextLineDataset).

For interleaving to work best, it is preferable to have files of identical length; otherwise the ends of the longest files will not be interleaved.

By default, interleave() does not use parallelism; it just reads one line at a time from each file, sequentially. If you want it to actually read files in parallel, you can set the num_parallel_calls argument to the number of threads you want. Alternatively, you can set num_parallel_calls=tf.data.experimental.AUTOTUNE for TensorFlow to choose the right number of threads dynamically based on the available CPU.
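Since the housing files are not available here, the following sketch reproduces the same pipeline on two tiny throwaway files written to a temporary directory. The file names and contents are made up, and shuffle=False is used only to keep the output easy to follow:

```python
import os
import tempfile
import tensorflow as tf

# Write two tiny CSV-like files (hypothetical names and contents).
tmp_dir = tempfile.mkdtemp()
filepaths = []
for name in ("a", "b"):
    path = os.path.join(tmp_dir, f"part_{name}.csv")
    with open(path, "w") as f:
        f.write("header\n")
        for i in range(3):
            f.write(f"{name}{i}\n")
    filepaths.append(path)

# shuffle=False keeps the file order fixed for this demonstration.
filepath_dataset = tf.data.Dataset.list_files(filepaths, shuffle=False)
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),  # skip header
    cycle_length=2,  # read from 2 files at a time
)
lines = [line.numpy().decode("utf-8") for line in dataset]
print(lines)
```

The lines alternate between the two files, and neither header row appears in the output.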

### Preprocessing the Data

In [42]:
# x_mean, x_std = [...] # mean and scale of each feature in the training set
# n_inputs = 8

# def preprocess(line):
#     defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
#     fields = tf.io.decode_csv(line, record_defaults=defs)
#     x = tf.stack(fields[:-1])
#     y = tf.stack(fields[-1:])
#     return (x - x_mean) / x_std, y

Let's walk through the code above:

1. First, the code assumes that we have precomputed the mean and standard deviation of each feature in the training set. x_mean and x_std are just 1D tensors (or NumPy arrays) containing 8 floats, one per input feature.
2. The preprocess() function takes one CSV line and starts by parsing it. For this it uses the tf.io.decode_csv() function, which takes two arguments: the first is the line to parse, and the second is an array containing the default value for each column in the CSV file. This array tells TensorFlow not only the default value for each column, but also the number of columns and their types. In this example, we tell it that all feature columns are floats and that missing values should default to 0, but we provide an empty array of type tf.float32 as the default value for the last column (the target): the array tells TensorFlow that this column contains floats, but that there is no default value, so it will raise an exception if it encounters a missing value.
3. The decode_csv() function returns a list of scalar tensors (one per column), but we need to return 1D tensor arrays. So we call tf.stack() on all tensors except the last one (the target): this will stack these tensors into a 1D array. We then do the same for the target value (this makes it a 1D tensor array with a single value, rather than a scalar tensor).
4. Finally, we scale the input features by subtracting the feature means and then dividing by the feature standard deviations, and we return a tuple containing the scaled features and target.
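The parsing steps above can be tried on a single hand-written line. This is a minimal sketch that assumes only three features plus a target (instead of eight) and leaves out the scaling step:

```python
import tensorflow as tf

n_inputs = 3  # hypothetical: 3 features followed by 1 target
defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]

# The second feature is missing, so it falls back to its default value of 0.
fields = tf.io.decode_csv("1.5,,3.0,4.0", record_defaults=defs)
x = tf.stack(fields[:-1])  # 1D tensor of features
y = tf.stack(fields[-1:])  # 1D tensor holding the single target value
print(x.numpy(), y.numpy())

# The target column has no default, so a missing target raises an error.
try:
    tf.io.decode_csv("1.5,2.0,3.0,", record_defaults=defs)
    missing_target_raised = False
except tf.errors.InvalidArgumentError:
    missing_target_raised = True
print(missing_target_raised)
```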

### Putting Everything Together

To make the code reusable, let's put together everything we have discussed so far into a small helper function:

In [43]:
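def csv_reader_dataset(
    filepaths,
    repeat=1,
    n_readers=5,
    n_read_threads=None,
    shuffle_buffer_size=100_000,  # must be an integer, not a float like 1e5
    n_parse_threads=5,
    batch_size=32
):
    dataset = tf.data.Dataset.list_files(filepaths)
    # Read n_readers files at a time and interleave their lines (headers skipped).
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers,
        num_parallel_calls=n_read_threads
    )
    # Parse and scale each line; requires preprocess() from the cell above to be defined.
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.shuffle(shuffle_buffer_size).repeat(repeat)
    return dataset.batch(batch_size).prefetch(1)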

### Prefetching

By calling prefetch(1) at the end, we are creating a dataset that will do its best to always be one batch ahead. In other words, while our training algorithm is working on one batch, the dataset will already be working in parallel on getting the next batch ready. This can improve performance dramatically.

In general, just prefetching one batch is fine, but in some cases you may want to prefetch a few more. Alternatively, you can let TensorFlow decide automatically by passing tf.data.experimental.AUTOTUNE.

With prefetching, the CPU and the GPU work in parallel: as the GPU works on one batch, the CPU works on the next. If you plan to purchase a GPU, its processing power and its memory size are of course very important (in particular, a large amount of RAM is crucial for computer vision). Just as important to get good performance is its _memory bandwidth_; this is the number of gigabytes of data it can get into or out of its RAM per second.

If the dataset is small enough to fit in memory, you can significantly speed up training by using the dataset's cache() method to cache its content to RAM. This way, each instance will only be read and preprocessed once.
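One way to observe caching is to use a random value as a stand-in for an expensive preprocessing step: once the cache has been filled by a full pass, later passes return the stored values instead of recomputing them. This is only a sketch; it uses tf.data.AUTOTUNE, the non-experimental name available in recent TensorFlow versions:

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(5)
# Stand-in for costly preprocessing: produce a random value for each item.
dataset = dataset.map(lambda x: tf.random.uniform([]))
dataset = dataset.cache()  # keep the preprocessed items in RAM

# The first full pass fills the cache; the second pass reads from it.
first_pass = [float(v.numpy()) for v in dataset]
second_pass = [float(v.numpy()) for v in dataset]
print(first_pass == second_pass)

# Batching and prefetching typically come after the cache.
pipeline = dataset.batch(2).prefetch(tf.data.AUTOTUNE)
batch_sizes = [int(batch.shape[0]) for batch in pipeline]
```

Placing cache() before shuffle() and batch() means the cached content stays the same while the order and batches can still differ from one epoch to the next.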

We have discussed the most common dataset methods, but there are a few more you may want to look at:

1. concatenate()
2. zip()
3. window()
4. reduce()
5. shard()
6. flat_map()
7. padded_batch()

There are also a couple more class methods worth mentioning:

1. from_generator()
2. from_tensors()

which create a new dataset from a Python generator or a list of tensors, respectively. Also note that there are experimental features available in tf.data.experimental.
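The short sketch below exercises several of the methods listed above on tiny integer datasets; all names and values are illustrative:

```python
import tensorflow as tf

a = tf.data.Dataset.range(3)     # 0, 1, 2
b = tf.data.Dataset.range(3, 6)  # 3, 4, 5
ab = a.concatenate(b)            # 0, 1, 2, 3, 4, 5

joined = [int(v.numpy()) for v in ab]
zipped = [(int(x.numpy()), int(y.numpy())) for x, y in tf.data.Dataset.zip((a, b))]
shard0 = [int(v.numpy()) for v in ab.shard(num_shards=2, index=0)]
total = int(ab.reduce(tf.constant(0, dtype=tf.int64), lambda s, x: s + x).numpy())

# window() yields small datasets; flat_map() turns them back into tensors.
windows = tf.data.Dataset.range(5).window(3, shift=1, drop_remainder=True)
windows = [w.numpy().tolist() for w in windows.flat_map(lambda w: w.batch(3))]

# from_generator() wraps a Python generator; padded_batch() pads
# variable-length items with zeros up to the longest item in the batch.
def gen():
    for n in (1, 3, 2):
        yield list(range(1, n + 1))

ragged = tf.data.Dataset.from_generator(
    gen, output_signature=tf.TensorSpec(shape=(None,), dtype=tf.int32))
padded = [batch.numpy().tolist() for batch in ragged.padded_batch(3)]
print(joined, zipped, shard0, total, windows, padded)
```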

### Using the Dataset with tf.keras

In [44]:
# train_set = csv_reader_dataset(train_filepaths)
# val_set = csv_reader_dataset(val_filepaths)
# test_set = csv_reader_dataset(test_filepaths)

# model = tf.keras.models.Sequential([...])
# model.compile([....])
# model.fit(train_set, epochs=10, validation_data=val_set)

# model.evaluate(test_set)
# new_set = test_set.take(3).map(lambda X, y: X) # pretend we have 3 new instances
# model.predict(new_set) # a dataset containing new instances

As the code above shows, we can now use the csv_reader_dataset() function to create datasets for the training, validation, and test sets. Note that we do not need to repeat the training set, as this will be taken care of by tf.keras.

If you want to build your own custom training loop (as in Chapter 12), you can just iterate over the training set, very naturally:
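Because the CSV files are not available here, the sketch below follows the same workflow with synthetic in-memory data. The helper `make_dataset`, the data, and the one-layer model are all stand-ins chosen for illustration:

```python
import numpy as np
import tensorflow as tf

# Synthetic stand-in for the housing data: 256 instances, 8 features, 1 target.
rng = np.random.default_rng(42)
X = rng.normal(size=(256, 8)).astype(np.float32)
y = X.sum(axis=1, keepdims=True).astype(np.float32)

def make_dataset(X, y, batch_size=32):
    # Hypothetical helper playing the role of csv_reader_dataset().
    ds = tf.data.Dataset.from_tensor_slices((X, y))
    return ds.shuffle(len(X)).batch(batch_size).prefetch(1)

train_set = make_dataset(X[:192], y[:192])
val_set = make_dataset(X[192:], y[192:])

model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
model.compile(loss="mse", optimizer="sgd")
# Keras iterates over the dataset once per epoch; no repeat() is needed.
history = model.fit(train_set, epochs=3, validation_data=val_set, verbose=0)

# Drop the targets to mimic a batch of new, unlabeled instances.
new_set = val_set.take(1).map(lambda X, y: X)
predictions = model.predict(new_set, verbose=0)
print(predictions.shape)
```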

In [45]:
# for X_batch, y_batch in train_set:
#     [...] # perform one gradient descent step
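A runnable version of such a loop, again on synthetic data, might look like the following sketch. The data, learning rate, and number of epochs are arbitrary, and the loss is a plain mean squared error:

```python
import numpy as np
import tensorflow as tf

# Synthetic linear data: 256 instances, 3 features, noiseless target.
rng = np.random.default_rng(0)
X = rng.normal(size=(256, 3)).astype(np.float32)
y = X @ np.array([[1.0], [2.0], [3.0]], dtype=np.float32)

train_set = tf.data.Dataset.from_tensor_slices((X, y)).shuffle(256).batch(32)

model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
optimizer = tf.keras.optimizers.SGD(learning_rate=0.05)

epoch_losses = []
for epoch in range(5):
    batch_losses = []
    for X_batch, y_batch in train_set:
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            loss = tf.reduce_mean(tf.square(y_batch - y_pred))
        # One gradient descent step on the current batch.
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        batch_losses.append(float(loss.numpy()))
    epoch_losses.append(sum(batch_losses) / len(batch_losses))
print(epoch_losses)
```

Iterating over `train_set` in the outer loop starts a new, reshuffled epoch each time, so no repeat() call is needed here either.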

In fact, it is even possible to create a TF Function (see Chapter 12) that performs the whole training loop.

In [46]:
# @tf.function
# def train(model, optimizer, loss_fn, n_epochs, **kwargs):
#     train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, **kwargs)
#     for X_batch, y_batch in train_set:
#         with tf.GradientTape() as tape:
#             y_pred = model(X_batch)
#             main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
#             loss = tf.add_n([main_loss] + model.losses)
#         grads = tape.gradient(loss, model.trainable_variables)
#         optimizer.apply_gradients(zip(grads, model.trainable_variables))

Note that CSV files do not support large or complex data structures (such as images or audio) very well. So let's see how to use TFRecords instead. If you are happy with CSV files (or whatever other format you are using), you do not _need_ to use TFRecords. As the saying goes, if it ain't broke, don't fix it! __TFRecords are useful when the bottleneck during training is loading and parsing data.__

## The TFRecord Format

### Compressed TFRecord Files

### A Brief Introduction to Protocol Buffers

### TensorFlow Protobufs

### Loading and Parsing Examples

### Handling Lists of Lists Using the SequenceExample Protobuf

## Preprocessing the Input Features

### Encoding Categorical Features Using One-Hot Vectors

### Encoding Categorical Features Using Embeddings

### Word Embeddings

### Keras Preprocessing Layers

## TF Transform

tf.Transform makes it possible to write a single preprocessing function that can be run in batch mode on your full training set, before training (to speed it up), and then exported to a TF Function and incorporated into your trained model so that once it is deployed in production it can take care of preprocessing new instances on the fly.

## The TensorFlow Datasets (TFDS) Project

TFDS provides a convenient function to download many common datasets of all kinds, including large ones like ImageNet, as well as convenient dataset objects to manipulate them using the Data API.

# Exercises

My Answer:



Book Answer:

My Answer:



Book Answer:

My Answer:



Book Answer:

My Answer:



Book Answer:

My Answer:



Book Answer:

My Answer:



Book Answer:

My Answer:



Book Answer:

My Answer:



Book Answer: