# Chapter 13: Loading and Preprocessing Data with TensorFlow

**Reference:** Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow (Aurélien Géron)

---

So far we have used only datasets that fit in memory, but Deep Learning systems are often trained on very large datasets that will not fit in RAM. Ingesting a large dataset and preprocessing it efficiently can be tricky to implement with other Deep Learning libraries, but TensorFlow makes it easy thanks to the Data API: you just create a dataset object, and tell it where to get the data and how to transform it. TensorFlow takes care of all the implementation details, such as multithreading, queuing, batching, and prefetching. Moreover, the Data API works seamlessly with `tf.keras`!

In this chapter, we will cover the Data API, the TFRecord format, and how to build custom preprocessing layers and use the standard Keras layers. We will also look at two related projects from the TensorFlow ecosystem: TF Transform (`tf.Transform`) and TF Datasets (TFDS).

## 0. Setup

First, let's import the necessary modules and configure the environment for reproducibility.

In [None]:
import sys
import sklearn
import tensorflow as tf
from tensorflow import keras
import numpy as np
import os

# imports for loading, splitting, and scaling the housing data
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# to make this notebook's output stable across runs
np.random.seed(42)
tf.random.set_seed(42)

## 1. The Data API

The whole Data API revolves around the concept of a `Dataset`. This represents a sequence of data items. Usually you will use datasets that gradually read data from disk, but for simplicity let’s create a dataset entirely in RAM using `tf.data.Dataset.from_tensor_slices()`.

In [None]:
X = tf.range(10)  # any data tensor
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset

You can iterate over a dataset’s items like this:

In [None]:
for item in dataset:
    print(item.numpy())

### Chaining Transformations

Once you have a dataset, you can apply all sorts of transformations to it by calling its transformation methods. Each method returns a new dataset, so you can chain transformations.

The following code:
1.  **Repeats** the items 3 times.
2.  **Batches** them into groups of 7.
3.  **Maps** a function (x * 2) over each batch, which doubles every item in it.
4.  **Unbatches** them back into single items.
5.  **Filters** to keep items < 10.
6.  **Takes** just the first 3 items.

In [None]:
dataset = tf.data.Dataset.range(10)
dataset = dataset.repeat(3).batch(7)
dataset = dataset.map(lambda x: x * 2)
dataset = dataset.unbatch().filter(lambda x: x < 10)

for item in dataset.take(3):
    print(item.numpy())
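
To see what each step contributes, the same pipeline can be inspected stage by stage. This is a minimal sketch that materializes the small dataset with `as_numpy_iterator()`; the stage variable names (`batched`, `doubled`, `filtered`) are chosen here for illustration only.

```python
import tensorflow as tf

# Stage 1: 30 items (0..9 repeated 3 times) grouped into batches of 7
batched = tf.data.Dataset.range(10).repeat(3).batch(7)
batches = [b.tolist() for b in batched.as_numpy_iterator()]
print(batches[0])   # first batch
print(batches[-1])  # last batch holds only the 2 leftover items

# Stage 2: map() receives whole batches here, so x * 2 doubles every item in a batch
doubled = batched.map(lambda x: x * 2)

# Stage 3: unbatch() restores single items, filter() keeps those below 10
filtered = doubled.unbatch().filter(lambda x: x < 10)

first_three = [int(x) for x in filtered.take(3).as_numpy_iterator()]
print(first_three)
```

Because 30 items do not divide evenly by 7, the final batch is shorter than the others; passing `drop_remainder=True` to `batch()` would discard it instead.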

### Shuffling the Data

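Gradient Descent works best when the instances in the training set are independent and identically distributed (IID). The `shuffle()` method approximates this with a fixed-size buffer: it first fills the buffer with `buffer_size` items from the source dataset, then each time an item is requested it returns one picked at random from the buffer and refills the slot from the source. The buffer must be large enough relative to the dataset for the shuffling to be effective; the `buffer_size=3` used below is deliberately tiny so the effect is easy to see.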

In [None]:
tf.random.set_seed(42)
dataset = tf.data.Dataset.range(10).repeat(3)
dataset = dataset.shuffle(buffer_size=3, seed=42).batch(7)
for item in dataset:
    print(item.numpy())
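
The limits of a small buffer can be checked directly. The sketch below is an illustration, not part of the original pipeline: it shuffles `range(100)` with a buffer of 3 and measures how far ahead of its original position any item was emitted. The names `n_items` and `max_lookahead` are chosen here for the example.

```python
import tensorflow as tf

buffer_size = 3
n_items = 100

shuffled = tf.data.Dataset.range(n_items).shuffle(buffer_size, seed=42)
order = [int(x) for x in shuffled.as_numpy_iterator()]

# Every item is still present exactly once
same_items = sorted(order) == list(range(n_items))

# The item emitted at position i was drawn from a buffer that had only seen
# the first i + buffer_size source items, so it cannot come from further ahead
max_lookahead = max(value - position for position, value in enumerate(order))
print(same_items, max_lookahead)
```

An item can be delayed for a long time, but it can never jump more than `buffer_size - 1` positions forward, which is why a small buffer leaves the data close to its original order.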

### Interleaving Lines from Multiple Files

For a large dataset that does not fit in memory, a simple solution is to split it into multiple files and read them in parallel. 

First, let's prepare the California Housing dataset and split it into multiple CSV files.

In [None]:
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
scaler.fit(X_train)
X_mean = scaler.mean_
X_std = scaler.scale_

def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = os.path.join("datasets", "housing")
    os.makedirs(housing_dir, exist_ok=True)
    path_format = os.path.join(housing_dir, "my_{}_{:02d}.csv")

    filepaths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([repr(col) for col in data[row_idx]])) 
                f.write("\n")
    return filepaths

train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)

train_filepaths = save_to_multiple_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_multiple_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_multiple_csv_files(test_data, "test", header, n_parts=10)

Now that the files exist, we can use `tf.data.Dataset.list_files()` to create a dataset of file paths (shuffled by default), and then `interleave()` to read lines from several files at once. The `skip(1)` call drops the header row of each file.

In [None]:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)

# Interleave: Read from 5 files at a time, cycle through them
n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=n_readers)
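
The order in which `interleave()` pulls items is easier to see on in-memory data. In this minimal sketch, `fake_file` is a hypothetical stand-in for `TextLineDataset`: "file" `i` yields the three values `10*i`, `10*i + 1`, `10*i + 2`, and two sources are read at a time.

```python
import tensorflow as tf

# Stand-in for a file reader: source i yields 10*i, 10*i + 1, 10*i + 2
def fake_file(i):
    return tf.data.Dataset.range(i * 10, i * 10 + 3)

sources = tf.data.Dataset.range(3)

# Read from 2 sources at a time, taking one item from each in turn
interleaved = sources.interleave(fake_file, cycle_length=2)
lines = [int(x) for x in interleaved.as_numpy_iterator()]
print(lines)
```

The first two sources alternate item by item; the third is opened only once a slot in the cycle frees up. Without `num_parallel_calls`, this reading is sequential and its order is deterministic.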

### Preprocessing and Pipelining

We need a function to parse the CSV lines and apply scaling. We'll wrap this in a reusable function `csv_reader_dataset` that handles the entire pipeline (loading, parsing, shuffling, batching, and prefetching).

In [None]:
n_inputs = 8 # Number of features

def preprocess(line):
    # Definitions for CSV columns: 8 floats for X, 1 float for y
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1])
    y = tf.stack(fields[-1:])
    return (x - X_mean) / X_std, y

def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
                       n_read_threads=None, shuffle_buffer_size=10000,
                       n_parse_threads=5, batch_size=32):
    dataset = tf.data.Dataset.list_files(filepaths)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers, num_parallel_calls=n_read_threads)
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.shuffle(shuffle_buffer_size).repeat(repeat)
    return dataset.batch(batch_size).prefetch(1)
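
The role of `record_defaults` in `preprocess()` can be tried on a single line. This is a small sketch with made-up values and only three columns: a plain float default fills in empty fields, while an empty tensor marks the column as required.

```python
import tensorflow as tf

# Two feature columns that default to 0.0 when empty, plus a required target column
defs = [0.0, 0.0, tf.constant([], dtype=tf.float32)]

fields = tf.io.decode_csv("1.5,,3.0", record_defaults=defs)
x = tf.stack(fields[:-1])   # features, shape (2,)
y = tf.stack(fields[-1:])   # target, shape (1,)
print(x.numpy(), y.numpy())

# A missing value in the required column is rejected instead of silently defaulted
try:
    tf.io.decode_csv("1.5,2.0,", record_defaults=defs)
    missing_target_rejected = False
except tf.errors.InvalidArgumentError:
    missing_target_rejected = True
print(missing_target_rejected)
```

Making the target required is a reasonable choice for training data, since a default label would quietly corrupt the loss.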

**Training with the Dataset:**
We can now simply pass these datasets to Keras's `model.fit()`.

In [None]:
# Create Datasets
train_set = csv_reader_dataset(train_filepaths, batch_size=32)
valid_set = csv_reader_dataset(valid_filepaths, batch_size=32)
test_set = csv_reader_dataset(test_filepaths, batch_size=32)

# Build Model
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="relu", input_shape=X_train.shape[1:]),
    keras.layers.Dense(1),
])
model.compile(loss="mse", optimizer=keras.optimizers.SGD(learning_rate=1e-3))

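# Train: with repeat=1 each dataset makes one pass over its files and is then
# exhausted, so we let Keras iterate it once per epoch instead of setting
# steps_per_epoch (which would run out of data after the first epoch)
history = model.fit(train_set, epochs=5, validation_data=valid_set)

# Evaluate on the whole test set, including the final partial batch
model.evaluate(test_set)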
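
Whether `fit()` needs `steps_per_epoch` depends on whether the dataset ends. A finite dataset is simply iterated until it is exhausted, once per epoch, while an endlessly repeating one has no natural epoch boundary. The sketch below uses a toy `range` dataset, not the CSV pipeline, to show the difference through `cardinality()`.

```python
import tensorflow as tf

batch_size = 32
finite = tf.data.Dataset.range(100).batch(batch_size)
endless = tf.data.Dataset.range(100).repeat().batch(batch_size)

# 100 items in batches of 32 -> 3 full batches plus one partial batch
n_batches = int(finite.cardinality())
is_infinite = bool(endless.cardinality() == tf.data.INFINITE_CARDINALITY)
print(n_batches, is_infinite)
```

With an endless dataset, `steps_per_epoch` tells Keras where one epoch stops. With a finite one it can be omitted.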

## 2. The TFRecord Format

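The TFRecord format is TensorFlow’s preferred format for storing large amounts of data and reading it efficiently. It is a simple binary format that contains a sequence of binary records of varying sizes. Each record is an arbitrary byte string, so the format is often combined with **Protocol Buffers** to give the records a structure.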

In [None]:
# Writing a TFRecord file
with tf.io.TFRecordWriter("my_data.tfrecord") as f:
    f.write(b"This is the first record")
    f.write(b"And this is the second record")

# Reading a TFRecord file
filepaths = ["my_data.tfrecord"]
dataset = tf.data.TFRecordDataset(filepaths)
for item in dataset:
    print(item.numpy())
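
TFRecord files can also be compressed, which the example above does not show. The following is a minimal round-trip sketch, assuming GZIP compression and a temporary directory; the file name `my_compressed.tfrecord` is made up for the example.

```python
import os
import tempfile
import tensorflow as tf

records = [b"This is the first record", b"And this is the second record"]
path = os.path.join(tempfile.mkdtemp(), "my_compressed.tfrecord")

# Write the records with GZIP compression
options = tf.io.TFRecordOptions(compression_type="GZIP")
with tf.io.TFRecordWriter(path, options) as f:
    for record in records:
        f.write(record)

# The reader must be told which compression was used
dataset = tf.data.TFRecordDataset([path], compression_type="GZIP")
restored = [item.numpy() for item in dataset]
print(restored)
```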

### Using Example Protobufs

Typically, we serialize instances as `Example` protocol buffers. This provides a structured format (like a dictionary) inside the binary record.

In [None]:
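# Alias the protobuf classes through tf.train: in some TensorFlow 2.x releases
# `from tensorflow.train import ...` fails with an import error
BytesList = tf.train.BytesList
Int64List = tf.train.Int64List
Feature = tf.train.Feature
Features = tf.train.Features
Example = tf.train.Example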

# 1. Create a structured Example
person_example = Example(
    features=Features(
        feature={
            "name": Feature(bytes_list=BytesList(value=[b"Alice"])),
            "id": Feature(int64_list=Int64List(value=[123])),
            "emails": Feature(bytes_list=BytesList(value=[b"a@b.com", b"c@d.com"]))
        }))

# 2. Serialize and Write
with tf.io.TFRecordWriter("my_contacts.tfrecord") as f:
    f.write(person_example.SerializeToString())

# 3. Read and Parse
feature_description = {
    "name": tf.io.FixedLenFeature([], tf.string, default_value=""),
    "id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
    "emails": tf.io.VarLenFeature(tf.string),
}

def parse(serialized_example):
    return tf.io.parse_single_example(serialized_example, feature_description)

dataset = tf.data.TFRecordDataset(["my_contacts.tfrecord"])
for serialized_example in dataset:
    parsed_example = parse(serialized_example)
    print(parsed_example)
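
Fixed-length features are parsed as regular tensors, but a `VarLenFeature` such as `"emails"` comes back as a sparse tensor. The sketch below rebuilds a smaller version of the example in memory, without writing a file, and shows two ways to get at the values.

```python
import tensorflow as tf

example = tf.train.Example(features=tf.train.Features(feature={
    "id": tf.train.Feature(int64_list=tf.train.Int64List(value=[123])),
    "emails": tf.train.Feature(
        bytes_list=tf.train.BytesList(value=[b"a@b.com", b"c@d.com"])),
}))
serialized = example.SerializeToString()

description = {
    "id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
    "emails": tf.io.VarLenFeature(tf.string),
}
parsed = tf.io.parse_single_example(serialized, description)

# Variable-length features come back as sparse tensors
is_sparse = isinstance(parsed["emails"], tf.SparseTensor)

# Either densify the tensor or read its values directly
emails_dense = tf.sparse.to_dense(parsed["emails"], default_value=b"")
emails = parsed["emails"].values.numpy().tolist()
print(is_sparse, emails, int(parsed["id"]))
```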

## 3. Keras Preprocessing Layers

Handling preprocessing inside the model ensures that the model is portable and avoids training/serving skew. Keras provides specific layers for normalization, text handling, and categorical features.

### One-Hot Encoding & Embeddings
We can use standard TF operations or Keras layers to handle categorical data.

In [None]:
# Example: One-Hot Encoding using a Lookup Table
vocab = ["<1H OCEAN", "INLAND", "NEAR OCEAN", "NEAR BAY", "ISLAND"]
indices = tf.range(len(vocab), dtype=tf.int64)
table_init = tf.lookup.KeyValueTensorInitializer(vocab, indices)
num_oov_buckets = 2
table = tf.lookup.StaticVocabularyTable(table_init, num_oov_buckets)

categories = tf.constant(["NEAR BAY", "DESERT", "INLAND", "INLAND"])
cat_indices = table.lookup(categories)
cat_one_hot = tf.one_hot(cat_indices, depth=len(vocab) + num_oov_buckets)
print("One-Hot Output:\n", cat_one_hot.numpy())
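
The out-of-vocabulary (OOV) buckets deserve a closer look. In this sketch, which rebuilds the same table, known categories keep their vocabulary index, while unknown ones are hashed into one of the extra buckets placed after the vocabulary. `"TUNDRA"` is a made-up category added for illustration.

```python
import tensorflow as tf

vocab = ["<1H OCEAN", "INLAND", "NEAR OCEAN", "NEAR BAY", "ISLAND"]
num_oov_buckets = 2
table = tf.lookup.StaticVocabularyTable(
    tf.lookup.KeyValueTensorInitializer(
        vocab, tf.range(len(vocab), dtype=tf.int64)),
    num_oov_buckets)

# Known categories map to their position in the vocabulary
known = table.lookup(tf.constant(["INLAND", "NEAR BAY"])).numpy().tolist()

# Unknown categories land in bucket 5 or 6; the same string always gets the same bucket
unknown = table.lookup(tf.constant(["DESERT", "DESERT", "TUNDRA"])).numpy().tolist()
print(known, unknown)
```

Two different unknown categories may share a bucket, so the number of OOV buckets trades off table size against collisions.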

In [None]:
# Example: Embeddings using Keras Layer
embedding_dim = 2
embedding_layer = keras.layers.Embedding(input_dim=len(vocab) + num_oov_buckets,
                                         output_dim=embedding_dim)
print("Embedding Output:\n", embedding_layer(cat_indices).numpy())

### Feature Preprocessing Layers

Common layers include `Normalization`, `Discretization`, `StringLookup`, and `Hashing`.

In [None]:
# 1. Normalization
norm_layer = keras.layers.Normalization()
norm_layer.adapt(X_train[:5]) # Adapt to a small sample for demo
print("Normalized sample:", norm_layer(X_train[:1]).numpy())

# 2. Discretization (Binning)
age = tf.constant([[10.], [93.], [57.], [18.], [70.], [5.]])
discretization = keras.layers.Discretization(bin_boundaries=[18., 50.])
print("Discretized Bins:\n", discretization(age).numpy())

# 3. String Lookup
cities = tf.constant(["Auckland", "Paris", "Paris", "San Francisco"])
str_lookup = keras.layers.StringLookup()
str_lookup.adapt(cities)
print("String Indices:", str_lookup([["Paris"], ["Auckland"], ["Montreal"]]).numpy())

# 4. Hashing (for very large vocabularies)
hashing = keras.layers.Hashing(num_bins=10)
print("Hashed Indices:", hashing([["Paris"], ["Tokyo"], ["Auckland"]]).numpy())
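
To connect these layers back to the point about keeping preprocessing inside the model, here is a minimal sketch that places an adapted `Normalization` layer at the front of a small model. It uses synthetic data rather than the housing set, and the layer sizes are arbitrary.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras

# Synthetic features, not the housing data: 1,000 rows, 4 columns, mean 5, std 3
rng = np.random.default_rng(42)
X = rng.normal(loc=5.0, scale=3.0, size=(1000, 4)).astype(np.float32)

norm_layer = keras.layers.Normalization()
norm_layer.adapt(X)  # learns the per-feature mean and variance

# Preprocessing is the model's first layer, so raw features can be fed directly
model = keras.Sequential([norm_layer, keras.layers.Dense(1)])
predictions = model(X[:2])

X_scaled = norm_layer(X).numpy()
print(X_scaled.mean(axis=0), X_scaled.std(axis=0), predictions.shape)
```

Since the scaling statistics are stored in the layer, a saved copy of the model applies the same scaling at serving time.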

## 4. TensorFlow Datasets (TFDS)

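Finally, `tensorflow_datasets` (TFDS) is a library that provides a collection of ready-to-use datasets. `tfds.load()` downloads the requested dataset if needed and returns it as `tf.data` datasets, here a dictionary with one dataset per split.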

In [None]:
import tensorflow_datasets as tfds

datasets = tfds.load(name="mnist", as_supervised=True)
mnist_train, mnist_test = datasets["train"], datasets["test"]

# Shuffle, batch, and prefetch
mnist_train = mnist_train.shuffle(10000).batch(32).prefetch(1)
for image, label in mnist_train.take(1):
    print(f"Image shape: {image.shape}, Label shape: {label.shape}")