# Loading and Preprocessing Data with TensorFlow

In [14]:
import tensorflow as tf
from tensorflow import keras

## The Data API

The Data API revolves around the concept of a *dataset*, which represents a sequence of data items. Usually, you use datasets that gradually read data from disk

Create a dataset entirely in RAM using `tf.data.Dataset.from_tensor_slices()`

In [15]:
X = tf.range(10) # any data tensor
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset

<TensorSliceDataset shapes: (), types: tf.int32>

This function takes a tensor and creates a `tf.data.Dataset` whose elements are all the slices of `X` (along the first dimension). We would have obtained the same items using `tf.data.Dataset.range(10)`, except that its elements are `int64` rather than `int32`

In [16]:
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


### Chaining Transformations

Apply transformations by using transformation methods. Each method returns a new dataset

In [17]:
dataset = dataset.repeat(3).batch(7)
for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


Can also transform items using `map()` method

In [18]:
dataset = dataset.map(lambda x: x*2) # Items: [0, 2, 4, 6, 8, 10, 12]

Transform dataset as a whole using `apply()` method

In [19]:
dataset = dataset.apply(tf.data.experimental.unbatch()) # Items: 0, 2, 4, ...

Instructions for updating:
Use `tf.data.Dataset.unbatch()`.


Filter dataset using `filter()`

In [20]:
dataset = dataset.filter(lambda x: x < 10) # Items: 0 2 4 6 8 0 2 4 6 ...

To look at a few items in the dataset use `take()`

In [21]:
for item in dataset.take(3):
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)


### Shuffling Data

Use `shuffle()` method. Works by filling up a buffer with the first items of the source dataset. Then, whenever it is asked for an item, it will pull one out randomly from the buffer and replace it with a fresh one from the source dataset until it has iterated entirely through the source dataset. It then continues to pull out items from the buffer randomly until it is empty. Must specify the buffer size, and it is important to make it large enough, or else shuffling will not be effective. But don't exceed the amount of RAM you have. Provide a random seed as well

In [None]:
dataset = tf.data.Dataset.range(10).repeat(3)
dataset = dataset.shuffle(buffer_size=5, seed=42).batch(7)
for item in dataset:
    print(item)
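
The buffer mechanism described above can be sketched in plain Python. This is only an illustration of the idea, not TensorFlow's implementation; the name `buffered_shuffle` is made up for this sketch.

```python
import random

def buffered_shuffle(items, buffer_size, seed=None):
    """Yield items in a pseudo-random order using a fixed-size buffer."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    rng = random.Random(seed)
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            # Still filling the buffer with the first items of the source.
            buffer.append(item)
            continue
        # Buffer is full: pull one item out at random and
        # replace it with the fresh item from the source.
        idx = rng.randrange(len(buffer))
        yield buffer[idx]
        buffer[idx] = item
    # Source exhausted: keep pulling random items until the buffer is empty.
    while buffer:
        idx = rng.randrange(len(buffer))
        buffer[idx], buffer[-1] = buffer[-1], buffer[idx]
        yield buffer.pop()

shuffled = list(buffered_shuffle(range(30), buffer_size=5, seed=42))
```

With `buffer_size=1` the items come out in their original order, and the first item produced always comes from the first `buffer_size` items of the source. This is why a buffer that is too small shuffles poorly.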

#### Interleaving lines from multiple files

Suppose you've loaded the California housing dataset, shuffled it, and split it into a training, validation, and test set. Then you split each set into many CSV files. Also suppose `train_filepaths` contains the list of training file paths (and you also have `valid_filepaths` and `test_filepaths`). Alternatively, you could use file patterns, for example `train_filepaths = "datasets/housing/my_train_*.csv"`. Now create a dataset containing only these file paths:

In [None]:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)

By default, the `list_files()` function returns a dataset that shuffles the file paths. You can set `shuffle=False` if needed

Next, call `interleave()` method to read from five files at a time and interleave their lines (skipping the first line of each file, which is the header row, using the `skip()` method)

In [None]:
n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=n_readers
)

`interleave()` will create a dataset that will pull five file paths from the `filepath_dataset`, and for each one it will call the function you gave it (a lambda in this example) to create a new dataset (in this case a `TextLineDataset`). At this stage there will be seven datasets in all: the filepath dataset, the interleave dataset, and the five `TextLineDataset`s created internally by the interleave dataset. When we iterate over the interleave dataset, it will cycle through these five `TextLineDataset`s, reading one line at a time from each until all datasets are out of items. Then it will get the next five file paths from the `filepath_dataset` and interleave them the same way, and so on until it runs out of file paths.
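
The cycling behaviour described above can be modelled with plain Python iterators. This is a simplified sketch that follows the description in these notes, not TensorFlow's actual implementation (which may refill readers differently); the `interleave` helper and the in-memory `files` dictionary are made up for illustration.

```python
from itertools import islice

def interleave(sources, make_iterable, cycle_length):
    """Read round-robin from `cycle_length` sources at a time."""
    sources = iter(sources)
    while True:
        # Pull the next group of sources and open one reader per source.
        active = [iter(make_iterable(s)) for s in islice(sources, cycle_length)]
        if not active:
            return  # no sources left
        while active:
            still_active = []
            for reader in active:
                try:
                    item = next(reader)
                except StopIteration:
                    continue  # this reader is exhausted, drop it
                yield item
                still_active.append(reader)
            active = still_active

# Hypothetical "files", each one a list of lines.
files = {"a": ["a1", "a2", "a3"], "b": ["b1"], "c": ["c1", "c2"]}
lines = list(interleave(["a", "b", "c"], lambda name: files[name], cycle_length=2))
```

Here the first two sources are interleaved until both are exhausted, and only then is the third one opened.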

To read files in parallel set `num_parallel_calls` argument to the number of threads you want (note `map()` also has this argument). Can even set it to `tf.data.experimental.AUTOTUNE` to make TensorFlow choose the right number of threads dynamically based on the available CPU

### Preprocessing the Data

In [None]:
X_mean, X_std = [...] # mean and scale of each feature in the training set
n_inputs = 8

def preprocess(line):
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1])
    y = tf.stack(fields[-1:])
    return (x - X_mean) / X_std, y

### Putting Everything Together

In [None]:
def csv_reader_dataset(filepaths, repeat=1, n_readers=5, n_read_threads=None,
                       shuffle_buffer_size=10000, n_parse_threads=5,
                       batch_size=32):
    dataset = tf.data.Dataset.list_files(filepaths)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers, num_parallel_calls=n_read_threads
    )
    dataset = dataset.shuffle(shuffle_buffer_size).repeat(repeat)
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    return dataset.batch(batch_size).prefetch(1)

### Prefetching

By calling `prefetch(1)` at the end, the dataset stays one batch ahead: while the training algorithm is working on one batch, the dataset will already be working in parallel on getting the next batch ready (e.g., reading data from disk and preprocessing it). This can improve performance dramatically. Also, by ensuring that loading and preprocessing are multithreaded (setting `num_parallel_calls` when calling `interleave()` and `map()`), we can exploit multiple cores on the CPU and hopefully make preparing one batch of data shorter than running a training step on the GPU.
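
To make the idea concrete, here is a rough plain-Python model of prefetching: a background thread prepares items and places them in a bounded queue while the consumer works on the previous one. The `prefetch` helper is hypothetical and only mimics the behaviour; it is not how `tf.data` is implemented, and it does not propagate errors raised by the producer.

```python
import queue
import threading

def prefetch(iterable, buffer_size=1):
    """Yield items from `iterable`, preparing up to `buffer_size` items ahead."""
    buffer = queue.Queue(maxsize=buffer_size)
    done = object()  # sentinel marking the end of the stream

    def producer():
        for item in iterable:
            buffer.put(item)  # blocks while the buffer is full
        buffer.put(done)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buffer.get()  # blocks until the next item is ready
        if item is done:
            return
        yield item

batches = list(prefetch(range(5), buffer_size=1))
```

While the consumer is busy with one item, the producer thread is free to prepare the next one, which is the overlap that `prefetch(1)` provides.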

If dataset fits in memory, significantly speed up training by using the dataset's `cache()` method to cache content to RAM. Do this after loading and preprocessing the data, but before shuffling, repeating, batching, and prefetching. 
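
A minimal sketch of that ordering, using a toy in-memory dataset in place of the CSV pipeline (the `map()` call stands in for loading and preprocessing):

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(5)
dataset = dataset.map(lambda x: x * 2)             # stand-in for loading + preprocessing
dataset = dataset.cache()                          # contents are cached during the first full pass
dataset = dataset.shuffle(buffer_size=5, seed=42)  # shuffling happens after the cache
dataset = dataset.repeat(2).batch(2).prefetch(1)

batches = [batch.numpy().tolist() for batch in dataset]
```

Because `cache()` comes before `shuffle()`, the preprocessing only runs during the first epoch, while each epoch can still be shuffled differently.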

### Using the Dataset with tf.keras

In [None]:
train_set = csv_reader_dataset(train_filepaths)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)

In [None]:
model = keras.models.Sequential([...])
model.compile([...])
model.fit(train_set, epochs=10, validation_data=valid_set)

In [None]:
model.evaluate(test_set)
new_set = test_set.take(3).map(lambda X, y: X) # pretend we have 3 new instances
model.predict(new_set) # a dataset containing new instances

To create custom training loop

In [None]:
for X_batch, y_batch in train_set:
    [...] # perform one Gradient Descent step

Create a TF Function that performs the whole training loop

In [None]:
@tf.function
def train(model, optimizer, loss_fn, n_epochs, [...]):
    train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, [...])
    for X_batch, y_batch in train_set:
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

## The TFRecord Format

Simple binary format that contains a sequence of binary records of varying sizes (each record is comprised of a length, a CRC checksum to check that the length was not corrupted, then the actual data, and finally a CRC checksum for the data).
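
The record layout can be sketched in plain Python. The notes above only state the order of the four fields; the details used here (a little-endian 64-bit length, 32-bit CRC-32C checksums, and the rotate-and-add masking constant) are assumptions to verify against the TensorFlow documentation, and the helper names are made up for this sketch.

```python
import struct

def crc32c(data: bytes) -> int:
    """Bitwise CRC-32C (Castagnoli), reflected polynomial 0x82F63B78."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ (0x82F63B78 if crc & 1 else 0)
    return crc ^ 0xFFFFFFFF

def masked_crc(data: bytes) -> int:
    """Assumed masking step: rotate right by 15 bits, then add a constant."""
    crc = crc32c(data)
    rotated = ((crc >> 15) | (crc << 17)) & 0xFFFFFFFF
    return (rotated + 0xA282EAD8) & 0xFFFFFFFF

def frame_record(data: bytes) -> bytes:
    """length | CRC of length | data | CRC of data"""
    length = struct.pack("<Q", len(data))
    return (length + struct.pack("<I", masked_crc(length))
            + data + struct.pack("<I", masked_crc(data)))

def read_record(buf: bytes) -> bytes:
    """Parse one framed record, checking both checksums."""
    length_bytes = buf[:8]
    (length_crc,) = struct.unpack("<I", buf[8:12])
    if masked_crc(length_bytes) != length_crc:
        raise ValueError("corrupted length")
    (length,) = struct.unpack("<Q", length_bytes)
    data = buf[12:12 + length]
    (data_crc,) = struct.unpack("<I", buf[12 + length:16 + length])
    if masked_crc(data) != data_crc:
        raise ValueError("corrupted data")
    return data

framed = frame_record(b"This is the first record")
```

Under these assumptions every record adds 16 bytes of overhead to its payload.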

In [None]:
with tf.io.TFRecordWriter('my_data.tfrecord') as f:
    f.write(b'This is the first record')
    f.write(b'And this is the second record')

Use `tf.data.TFRecordDataset` to read one or more TFRecord files

In [None]:
filepaths = ['my_data.tfrecord']
dataset = tf.data.TFRecordDataset(filepaths)
for item in dataset:
    print(item)

### Compressed TFRecord Files

Create a compressed TFRecord file by setting the `options` argument

In [None]:
options = tf.io.TFRecordOptions(compression_type='GZIP')
with tf.io.TFRecordWriter('my_compressed.tfrecord', options) as f:
    [...]

When reading a compressed TFRecord file, specify the compression type:

In [None]:
dataset = tf.data.TFRecordDataset(['my_compressed.tfrecord'], compression_type='GZIP')

### A Brief Introduction to Protocol Buffers

TFRecord files usually contain serialized protocol buffers (*protobufs*)

Once you have a definition in a *.proto* file, you can compile it. This requires `protoc`, the protobuf compiler, to generate access classes in Python

After creating and modifying the instance, serialize it using the `SerializeToString()` method. This is the binary data that is ready to be saved or transmitted over the network. When reading or receiving this binary data, we can parse it using the `ParseFromString()` method, and we get a copy of the object that was serialized.

### TensorFlow Protobufs

The main protobuf typically used in a TFRecord file is the `Example` protobuf, which represents one instance in a dataset. It contains a list of named features, where each feature can either be a list of byte strings, a list of floats, or a list of integers.

In [None]:
syntax = "proto3";
message BytesList { repeated bytes value = 1; }
message FloatList { repeated float value = 1 [packed = true]; }
message Int64List { repeated int64 value = 1 [packed = true]; }
message Feature {
    oneof kind {
        BytesList bytes_list = 1;
        FloatList float_list = 2;
        Int64List int64_list = 3;
    }
};

message Features { map<string, Feature> feature = 1; };
message Example { Features features = 1; };

`[packed = true]` is used for repeated numerical fields, for a more efficient encoding. A `Feature` contains either a `BytesList`, a `FloatList`, or an `Int64List`. A `Features` contains a dictionary that maps a feature name to the corresponding feature value. Finally, an `Example` contains only a `Features` object.

In [None]:
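# Aliases are used because `from tensorflow.train import ...` fails in some TF 2.x releases
BytesList, FloatList, Int64List = tf.train.BytesList, tf.train.FloatList, tf.train.Int64List
Feature, Features, Example = tf.train.Feature, tf.train.Features, tf.train.Example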

person_example = Example(
    features = Features(
        feature = {
            'name': Feature(bytes_list=BytesList(value=[b'Alice'])),
            'id': Feature(int64_list=Int64List(value=[123])),
            'emails': Feature(bytes_list=BytesList(value=[b'a@b.com', b'c@d.com']))
        }
    )
)

In [None]:
with tf.io.TFRecordWriter('my_contracts.tfrecord') as f:
    f.write(person_example.SerializeToString())

Typically, you would create a conversion script that reads from your current format (say, CSV), creates an `Example` protobuf for each instance, serializes them, and saves them to several TFRecord files, ideally shuffling them in the process. 
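
Going the other way, a serialized `Example` can be parsed back into tensors with `tf.io.parse_single_example()`. The sketch below rebuilds the same person record in memory so that it is self-contained; the `feature_description` dictionary and its default values are choices made for this illustration.

```python
import tensorflow as tf

Example, Features, Feature = tf.train.Example, tf.train.Features, tf.train.Feature
BytesList, Int64List = tf.train.BytesList, tf.train.Int64List

person_example = Example(features=Features(feature={
    "name": Feature(bytes_list=BytesList(value=[b"Alice"])),
    "id": Feature(int64_list=Int64List(value=[123])),
    "emails": Feature(bytes_list=BytesList(value=[b"a@b.com", b"c@d.com"])),
}))
serialized = person_example.SerializeToString()

# Describe each feature: fixed-length features need a shape and a type,
# variable-length features only need a type and are parsed as sparse tensors.
feature_description = {
    "name": tf.io.FixedLenFeature([], tf.string, default_value=""),
    "id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
    "emails": tf.io.VarLenFeature(tf.string),
}
parsed = tf.io.parse_single_example(serialized, feature_description)
emails = tf.sparse.to_dense(parsed["emails"], default_value=b"")
```

In a real pipeline the same parsing function would typically be applied with `map()` to a `TFRecordDataset`.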

## Preprocessing the Input Features

Implements a standardization layer using a `Lambda` layer. For each feature, it subtracts the mean and divides by its standard deviation (plus a smoothing term)

In [None]:
import numpy as np

means = np.mean(X_train, axis=0, keepdims=True)
stds = np.std(X_train, axis=0, keepdims=True)
eps = keras.backend.epsilon()
model = keras.models.Sequential([
    keras.layers.Lambda(lambda inputs: (inputs - means) / (stds + eps)),
    [...] # other layers
])

Self contained custom layer (like Scikit-Learn's `StandardScaler`)

In [None]:
class Standardization(keras.layers.Layer):
    def adapt(self, data_sample):
        self.means_ = np.mean(data_sample, axis=0, keepdims=True)
        self.stds_ = np.std(data_sample, axis=0, keepdims=True)
    def call(self, inputs):
        return (inputs - self.means_) / (self.stds_ + keras.backend.epsilon())

Before using this layer, adapt it to your dataset by calling `adapt()` method and passing a data sample

In [None]:
std_layer = Standardization()
std_layer.adapt(data_sample)

Sample must be large enough to be representative of your dataset, but does not have to be the full training set: in general, a few hundred randomly selected instances will suffice. 
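
The arithmetic behind `adapt()` and `call()` can be sketched without Keras, using only the standard library. The function names and the tiny sample are made up for illustration; `EPSILON` is set to `1e-7`, which matches the default value of `keras.backend.epsilon()`.

```python
from statistics import fmean, pstdev

EPSILON = 1e-7  # smoothing term that avoids division by zero

def adapt(sample_rows):
    """Compute per-feature mean and (population) standard deviation."""
    columns = list(zip(*sample_rows))
    means = [fmean(col) for col in columns]
    stds = [pstdev(col) for col in columns]
    return means, stds

def standardize(row, means, stds):
    """Subtract the mean and divide by the standard deviation plus epsilon."""
    return [(x - m) / (s + EPSILON) for x, m, s in zip(row, means, stds)]

# Hypothetical sample with three features; the last one is constant.
sample = [[1.0, 10.0, 5.0], [2.0, 20.0, 5.0], [3.0, 30.0, 5.0]]
means, stds = adapt(sample)
scaled = [standardize(row, means, stds) for row in sample]
```

The constant third feature has a standard deviation of zero, and the smoothing term is what keeps its standardized value at 0 instead of raising a division error.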

In [None]:
model = keras.Sequential()
model.add(std_layer)
[...] # create the rest of the model
model.compile([...])
model.fit([...])

Keras has a built in standardization layer: `keras.layers.Normalization`. Works very similarly to the layer above

### Encoding Categorical Features Using One-Hot Vectors

Consider the `ocean_proximity` feature in the California housing dataset

In [None]:
vocab = ['<1H OCEAN', 'INLAND', 'NEAR OCEAN', 'NEAR BAY', 'ISLAND']
indices = tf.range(len(vocab), dtype=tf.int64)
table_init = tf.lookup.KeyValueTensorInitializer(vocab, indices)
num_oov_buckets = 2
table = tf.lookup.StaticVocabularyTable(table_init, num_oov_buckets)

Create an initializer for the lookup table, passing it the list of categories and their corresponding indices. In this example, we already have this data, so we used a `KeyValueTensorInitializer`; but if the categories were listed in a text file (with one category per line), we would use a `TextFileInitializer` instead

The last two lines create the lookup table, giving it the initializer and specifying the number of *out-of-vocabulary* (oov) buckets. If we look up a category that does not exist in the vocabulary, the lookup table will compute a hash of this category and use it to assign the unknown category to one of the oov buckets. Their indices start after the known categories, so in this example the indices of the two oov buckets are 5 and 6.
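
The oov mechanism can be imitated with a dictionary and a stable hash. In this sketch `zlib.crc32` is only a stand-in for whatever hash TensorFlow uses internally, so the bucket chosen for a given unknown category will not necessarily match the one picked by `StaticVocabularyTable`.

```python
import zlib

vocab = ["<1H OCEAN", "INLAND", "NEAR OCEAN", "NEAR BAY", "ISLAND"]
num_oov_buckets = 2
index_of = {category: index for index, category in enumerate(vocab)}

def lookup(category):
    """Return the vocabulary index, or an oov bucket index for unknown categories."""
    if category in index_of:
        return index_of[category]
    # Unknown category: hash it to pick one of the oov buckets,
    # whose indices start right after the known categories.
    bucket = zlib.crc32(category.encode("utf-8")) % num_oov_buckets
    return len(vocab) + bucket

indices = [lookup(c) for c in ["NEAR BAY", "DESERT", "INLAND", "INLAND"]]
```

Because the hash is deterministic, the same unknown category always lands in the same bucket, but two different unknown categories may collide when there are few buckets.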

In [None]:
categories = tf.constant(['NEAR BAY', 'DESERT', 'INLAND', 'INLAND'])
cat_indices = table.lookup(categories)
cat_indices

<tf.Tensor: shape=(4,), dtype=int64, numpy=array([3, 5, 1, 1])>

In [None]:
cat_one_hot = tf.one_hot(cat_indices, depth=len(vocab) + num_oov_buckets)
cat_one_hot

<tf.Tensor: shape=(4, 7), dtype=float32, numpy=
array([[0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 0., 1., 0.],
       [0., 1., 0., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0., 0., 0.]], dtype=float32)>

Keras has a similar layer called `keras.layers.TextVectorization`

If the vocabulary is large, it is more efficient to encode the categories using *embeddings* instead

### Encoding Categorical Features Using Embeddings

An embedding is a trainable dense vector that represents a category. By default, embeddings are initialized randomly (e.g., [0.131, 0.890] for the "NEAR BAY" category and [0.631, 0.791] for the "NEAR OCEAN" category). The number of dimensions of the vector is a hyperparameter that can be tweaked. Since embeddings are trainable, they gradually improve during training; and because "NEAR BAY" and "NEAR OCEAN" represent fairly similar categories, Gradient Descent will end up pushing their embeddings closer together, while moving them away from the embedding of the "INLAND" category, for instance. This is called *representation learning*.

Implement embeddings manually by creating an *embedding matrix* containing each category's embedding, initialized randomly; it will have one row per category and per oov bucket, and one column per embedding dimension:

In [None]:
embedding_dim = 2
embed_init = tf.random.uniform([len(vocab) + num_oov_buckets, embedding_dim])
embedding_matrix = tf.Variable(embed_init)
embedding_matrix

<tf.Variable 'Variable:0' shape=(7, 2) dtype=float32, numpy=
array([[0.36249554, 0.9753537 ],
       [0.6676022 , 0.09119081],
       [0.7744031 , 0.8374994 ],
       [0.03056931, 0.3748523 ],
       [0.9260483 , 0.00841868],
       [0.8329289 , 0.5473795 ],
       [0.05378199, 0.2471453 ]], dtype=float32)>

Encode the same batch of categorical features as earlier

In [None]:
categories = tf.constant(['NEAR BAY', 'DESERT', 'INLAND', 'INLAND'])
cat_indices = table.lookup(categories)
cat_indices

<tf.Tensor: shape=(4,), dtype=int64, numpy=array([3, 5, 1, 1])>

In [None]:
tf.nn.embedding_lookup(embedding_matrix, cat_indices)

<tf.Tensor: shape=(4, 2), dtype=float32, numpy=
array([[0.03056931, 0.3748523 ],
       [0.8329289 , 0.5473795 ],
       [0.6676022 , 0.09119081],
       [0.6676022 , 0.09119081]], dtype=float32)>
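
An embedding lookup simply selects rows of the embedding matrix, which gives the same result as multiplying one-hot vectors by that matrix while skipping the multiplications. A small plain-Python check, reusing the values printed above (the helper functions are made up for this sketch):

```python
embedding_matrix = [
    [0.36249554, 0.9753537],
    [0.6676022, 0.09119081],
    [0.7744031, 0.8374994],
    [0.03056931, 0.3748523],
    [0.9260483, 0.00841868],
    [0.8329289, 0.5473795],
    [0.05378199, 0.2471453],
]
cat_indices = [3, 5, 1, 1]

# Lookup: pick one row per category index.
looked_up = [embedding_matrix[i] for i in cat_indices]

def one_hot(index, depth):
    return [1.0 if j == index else 0.0 for j in range(depth)]

def vector_times_matrix(vector, matrix):
    n_cols = len(matrix[0])
    return [sum(v * row[col] for v, row in zip(vector, matrix)) for col in range(n_cols)]

# Same result, computed the expensive way: one-hot vector times matrix.
via_one_hot = [
    vector_times_matrix(one_hot(i, len(embedding_matrix)), embedding_matrix)
    for i in cat_indices
]
```

Both computations produce the same four vectors, and the lookup does so without building any one-hot vectors.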

Keras provides a `keras.layers.Embedding` layer that handles the embedding matrix (trainable, by default)

In [None]:
embedding = keras.layers.Embedding(input_dim=len(vocab) + num_oov_buckets, output_dim=embedding_dim)

Create a Keras model that can process categorical features (along with numerical features) and learn an embedding for each category (as well as for each oov bucket)

In [None]:
regular_inputs = keras.layers.Input(shape=[8])
categories = keras.layers.Input(shape=[], dtype=tf.string)
cat_indices = keras.layers.Lambda(lambda cats: table.lookup(cats))(categories)
cat_embed = keras.layers.Embedding(input_dim=7, output_dim=2)(cat_indices) # 5 categories + 2 oov buckets
encoded_inputs = keras.layers.concatenate([regular_inputs, cat_embed])
outputs = keras.layers.Dense(1)(encoded_inputs)
model = keras.models.Model(inputs=[regular_inputs, categories], outputs=[outputs])

### Keras Preprocessing Layers

Refer to API docs for new standard preprocessing layers being added to Keras. 

Includes `Normalization` and `TextVectorization` layers mentioned earlier. 

Also includes `Discretization` layer that will chop continuous data into different bins and encode each bin as a one-hot vector. Can only be used at the start of the model because the layer is not differentiable
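
A plain-Python sketch of what such binning followed by one-hot encoding does. The bin boundaries are hypothetical, and sending a value that equals a boundary to the bin above it is a choice made for this sketch.

```python
from bisect import bisect_right

bin_boundaries = [0.0, 1.0, 2.0]  # hypothetical boundaries: 3 boundaries give 4 bins

def discretize(value):
    """Return the index of the bin that `value` falls into."""
    return bisect_right(bin_boundaries, value)

def one_hot(index, depth):
    return [1.0 if j == index else 0.0 for j in range(depth)]

values = [-0.5, 0.0, 1.5, 3.0]
bins = [discretize(v) for v in values]
encoded = [one_hot(b, len(bin_boundaries) + 1) for b in bins]
```

The output is piecewise constant in the input, so it provides no useful gradient, which is why this kind of layer is described above as usable only at the start of a model.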

Possible to chain multiple preprocessing layers using `PreprocessingStage` class. Works like creating a Scikit-Learn pipeline. After adapting this pipeline to a data sample, you can use it like a regular layer in your models (only at the start of the model).

In [None]:
normalization = keras.layers.Normalization()
discretization = keras.layers.Discretization([...])
pipeline = keras.layers.PreprocessingStage([normalization, discretization])
pipeline.adapt(data_sample)

`TextVectorization` will have an option to output word-count vectors instead of word indices. This representation is called a *bag of words*, since it completely loses the order of the words. 
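
A bag of words can be illustrated with `collections.Counter`. The vocabulary and the whitespace tokenization are simplifications chosen for this sketch; they are not the behaviour of `TextVectorization`.

```python
from collections import Counter

vocabulary = ["the", "cat", "sat", "mat"]  # hypothetical fixed vocabulary

def bag_of_words(text):
    """Return one count per vocabulary word, ignoring word order."""
    counts = Counter(text.lower().split())
    return [counts[word] for word in vocabulary]

first = bag_of_words("The cat sat on the mat")
second = bag_of_words("the mat sat on the cat")
```

The two sentences contain the same words in a different order, so they get exactly the same vector.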

## TF Transform

Define preprocessing operations just once (instead of having to define them multiple times for different applications like a mobile app, or web browser implementation, etc.).

In [22]:
import tensorflow_transform as tft

def preprocess(inputs): # inputs = a batch of input features
    median_age = inputs['housing_median_age']
    ocean_proximity = inputs['ocean_proximity']
    standardized_age = tft.scale_to_z_score(median_age)
    ocean_proximity_id = tft.compute_and_apply_vocabulary(ocean_proximity)
    return {
        'standardized_median_age': standardized_age,
        'ocean_proximity_id': ocean_proximity_id
    }


TF Transform lets you apply this `preprocess()` function to the whole training set using Apache Beam (it provides an `AnalyzeAndTransformDataset` class that you can use for this purpose in your Apache Beam pipeline)

TF Transform also generates an equivalent TensorFlow Function that you can plug into the model you deploy. 

## The TensorFlow Datasets (TFDS) Project

TensorFlow Datasets makes it easy to download common datasets

In [None]:
import tensorflow_datasets as tfds

dataset = tfds.load(name='mnist')
mnist_train, mnist_test = dataset['train'], dataset['test']

In [None]:
mnist_train = mnist_train.shuffle(10000).batch(32).prefetch(2)
for item in mnist_train:
    images = item['image']
    labels = item['label']
    [...]

Note that each item in the dataset is a dictionary containing both the features and the labels, whereas Keras expects each item to be a tuple containing two elements (the features and the labels). Starting again from the dataset returned by `tfds.load()`, you could transform it like this:

In [None]:
mnist_train = mnist_train.shuffle(10000).batch(32)
mnist_train = mnist_train.map(lambda items: (items['image'], items['label']))
mnist_train = mnist_train.prefetch(1)

In [None]:
dataset = tfds.load(name='mnist', batch_size=32, as_supervised=True)
mnist_train = dataset['train'].prefetch(1)
model = keras.models.Sequential([...])
model.compile(loss='sparse_categorical_crossentropy', optimizer='sgd')
model.fit(mnist_train, epochs=5)