# Chapter 13: Loading and Preprocessing Data with TensorFlow

## 1. Chapter Overview
**Goal:** Deep Learning systems often require massive amounts of data that cannot fit into RAM. Furthermore, if the CPU is slow at loading/processing data, the powerful GPU will sit idle, wasting resources. In this chapter, we master the **TensorFlow Data API (`tf.data`)** to build efficient, parallelized input pipelines. We also explore the **TFRecord** format for fast binary storage and **Keras Preprocessing Layers** to include preprocessing logic directly inside the model.

**Key Concepts:**
* **The Data API (`tf.data`):** Creating, chaining, and optimizing dataset transformations.
* **ETL Pipeline:** Extract (read), Transform (map/filter), Load (prefetch/batch).
* **Performance Optimization:** Prefetching, caching, and parallelizing operations (`num_parallel_calls`).
* **TFRecord Format:** A simple binary format for storing sequences of binary records (Protobufs).
* **Protocol Buffers:** Defining structured data schemas for serialization.
* **Keras Preprocessing Layers:** `Normalization`, `TextVectorization`, `CategoryEncoding`.

**Practical Skills:**
* Creating datasets from tensors and CSV files.
* Chaining `shuffle`, `batch`, `map`, and `prefetch` methods.
* Writing and reading **TFRecord** files manually.
* Creating a custom `Example` protobuf message.
* Building a model that accepts raw strings using the `TextVectorization` layer.

In [None]:
# Setup
import sys
assert sys.version_info >= (3, 5)

import sklearn
assert sklearn.__version__ >= "0.20"

import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

import numpy as np
import os
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt

np.random.seed(42)
tf.random.set_seed(42)

## 2. Theoretical Explanation (In-Depth)

### 1. The Data API (`tf.data`)
The Data API allows you to define a pipeline of operations. The core object is `tf.data.Dataset`. 
* **Immutability:** Datasets are immutable. Methods like `.map()` or `.batch()` do not modify the dataset; they return a new one.
* **Lazy Evaluation:** Operations are not executed immediately. The dataset is just a graph of instructions. Data is only processed when you iterate over it (e.g., during training).
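
As a minimal sketch of both properties (the variable names `base` and `doubled` are illustrative only), the snippet below derives a second dataset with `map()` and then iterates over both. The original dataset still yields its original values, and no work happens until iteration starts.

```python
import tensorflow as tf

base = tf.data.Dataset.range(5)
# map() returns a new dataset; `base` itself is left untouched.
doubled = base.map(lambda x: x * 2)

# Up to this point only the pipeline has been described.
# Values are produced when we iterate.
base_values = [int(v) for v in base.as_numpy_iterator()]
doubled_values = [int(v) for v in doubled.as_numpy_iterator()]

print(base_values)
print(doubled_values)
```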

**Key Transformations:**
1.  **`from_tensor_slices()`:** Creates a dataset from a list/array in memory.
2.  **`shuffle(buffer_size)`:** Fills a buffer with items and draws randomly from it. Crucial for SGD, but the buffer must be large relative to the dataset; otherwise items are only shuffled locally.
3.  **`map(function)`:** Applies a transformation to each item (e.g., resizing images, parsing CSVs).
4.  **`batch(size)`:** Groups items into batches.
5.  **`prefetch(buffer_size)`:** Often the most effective performance tool. While the GPU is training on batch $N$, the CPU prepares batch $N+1$ in parallel.
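
The effect of `buffer_size` on `shuffle` is easy to check on a toy dataset. The sketch below (sizes chosen arbitrarily for illustration) compares the two extremes: a buffer of one item, which cannot reorder anything, and a buffer as large as the dataset, which allows a full shuffle.

```python
import tensorflow as tf

n = 10

# buffer_size=1: the buffer only ever holds one item, so the order is unchanged.
unshuffled = [int(v) for v in
              tf.data.Dataset.range(n).shuffle(buffer_size=1).as_numpy_iterator()]

# buffer_size >= dataset size: any item can end up at any position.
shuffled = [int(v) for v in
            tf.data.Dataset.range(n).shuffle(buffer_size=n, seed=42).as_numpy_iterator()]

print(unshuffled)
print(shuffled)
```

Buffer sizes between these extremes give a partial, local shuffle, which is why large datasets are usually also shuffled at the file level.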

### 2. The TFRecord Format
CSV files are human-readable but inefficient (text parsing is slow). **TFRecord** is TensorFlow's preferred binary format. It stores a sequence of binary records. 
Each record is typically a serialized **Protocol Buffer** (protobuf). Protobuf is a portable, efficient, binary format developed by Google.

**Structure:**
* **`tf.train.Example`:** The standard protobuf message used in datasets. It contains a dictionary of **Features**.
* **`tf.train.Features`:** A mapping of feature names to values (BytesList, FloatList, or Int64List).

### 3. Keras Preprocessing Layers
Historically, preprocessing (like scaling or one-hot encoding) was done in NumPy/Pandas before feeding data to the model. This creates a **Training-Serving Skew** risk (if the preprocessing code in production differs slightly from training).
Keras now provides layers that handle preprocessing *inside* the model graph.
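* **`Normalization`:** Standardizes features to zero mean and unit variance, replacing Scikit-Learn's `StandardScaler`.
* **`TextVectorization`:** Handles tokenization and indexing for NLP.
* **`CategoryEncoding`:** Turns integer category indices into one-hot (or multi-hot) vectors.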
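
As a hedged illustration of the `Normalization` layer, the sketch below adapts it to a synthetic feature matrix (the data and the name `norm_layer` are made up for this example) and checks that the output is standardized. It assumes a TensorFlow release where the layer is available as `tf.keras.layers.Normalization`.

```python
import numpy as np
import tensorflow as tf

rng = np.random.default_rng(42)
# Synthetic data: 1,000 rows, 3 features on very different scales.
X = (rng.normal(size=(1000, 3)) * [1.0, 10.0, 100.0]
     + [0.0, 5.0, -50.0]).astype("float32")

norm_layer = tf.keras.layers.Normalization()
norm_layer.adapt(X)                    # learns the per-feature mean and variance
X_scaled = np.asarray(norm_layer(X))   # applies (x - mean) / sqrt(variance)

print(X_scaled.mean(axis=0))  # close to 0 for every feature
print(X_scaled.std(axis=0))   # close to 1 for every feature
```

Because the learned statistics are stored in the layer, they are saved and deployed together with the model.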

## 3. Code Reproduction

### 3.1 Basic `tf.data` Pipeline
We create a dataset from a simple range of numbers and apply a chain of transformations.

In [None]:
# Create a dataset from a list of numbers 0 to 9
dataset = tf.data.Dataset.range(10)

# Chain transformations
dataset = dataset.repeat(3) # Repeat the dataset 3 times
dataset = dataset.shuffle(buffer_size=5, seed=42)
dataset = dataset.batch(7) # Group into batches of 7
dataset = dataset.map(lambda x: x * 2) # Double every value

# Iterate and inspect
for batch in dataset:
    print(batch.numpy())

### 3.2 Splitting the California Housing Dataset
We will load the housing data, split it into multiple CSV files, and then build a pipeline to read them in parallel. This simulates handling a "Big Data" scenario where data is sharded across many files.

In [None]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

# Function to save data into multiple CSV files
def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = os.path.join("datasets", "housing")
    os.makedirs(housing_dir, exist_ok=True)
    path_format = os.path.join(housing_dir, "my_{}_{:02d}.csv")

    filepaths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                # float() avoids NumPy 2.x scalar reprs such as "np.float64(1.0)"
                f.write(",".join([repr(float(col)) for col in data[row_idx]]))
                f.write("\n")
    return filepaths

train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)

train_filepaths = save_to_multiple_csv_files(train_data, "train", header, n_parts=20)
print("Created 20 training CSV files.")

### 3.3 Building the Input Pipeline
We use `list_files` to shuffle filenames, `interleave` to cycle through several files at a time, and `TextLineDataset` to read lines.

In [None]:
# 1. Get file paths
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)

# 2. Interleave: Read from 5 files at a time
n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1), # skip header
    cycle_length=n_readers)

# 3. Parse CSV lines
n_inputs = 8

def preprocess(line):
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1])
    y = tf.stack(fields[-1:])
    return (x - X_train.mean(axis=0)) / X_train.std(axis=0), y # simple scaling

# 4. Apply transformations
batch_size = 32
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(1) # Prefetch for GPU optimization

print("Pipeline ready.")
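
The `record_defaults` argument used in `preprocess` does two jobs: it sets each column's type, and it decides what happens to missing values. The following standalone sketch uses made-up three-column lines to show both behaviors: a field with a scalar default is filled in when empty, while a field whose default is an empty tensor is treated as required.

```python
import tensorflow as tf

# Two fields default to 0.0 when empty; the last one has an empty default,
# which marks it as required.
defaults = [0., 0., tf.constant([], dtype=tf.float32)]

# The first field is empty, so it is filled with its default value.
fields = tf.io.decode_csv(",2.5,3.0", record_defaults=defaults)
values = [float(f) for f in fields]
print(values)

# A missing required field raises an error instead of being silently filled.
try:
    tf.io.decode_csv("1.0,2.5,", record_defaults=defaults)
    required_field_enforced = False
except tf.errors.InvalidArgumentError:
    required_field_enforced = True
print(required_field_enforced)
```

Marking the target column as required, as the pipeline above does, means a row with a missing label fails loudly rather than training on a default value.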

### 3.4 The TFRecord Format
We will demonstrate how to write data to a TFRecord file and read it back.

In [None]:
# Define helper functions to create Protobuf Features
def bytes_feature(value):
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy() 
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def float_feature(value):
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# Create a dictionary representing one data instance (an Example)
def create_example(house_idx, median_income, population):
    feature = {
        "house_idx": int64_feature(house_idx),
        "median_income": float_feature(median_income),
        "population": float_feature(population)
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))

# Write to TFRecord file
with tf.io.TFRecordWriter("my_data.tfrecord") as writer:
    for i in range(5):
        example = create_example(i, np.random.rand(), np.random.rand() * 100)
        writer.write(example.SerializeToString())

print("Written my_data.tfrecord")

### 3.5 Reading and Parsing TFRecords
Now we define a schema to parse the binary data back into tensors.

In [None]:
feature_description = {
    "house_idx": tf.io.FixedLenFeature([], tf.int64, default_value=0),
    "median_income": tf.io.FixedLenFeature([], tf.float32, default_value=0.0),
    "population": tf.io.FixedLenFeature([], tf.float32, default_value=0.0),
}

def parse_example(serialized_example):
    return tf.io.parse_single_example(serialized_example, feature_description)

dataset = tf.data.TFRecordDataset(["my_data.tfrecord"])
dataset = dataset.map(parse_example)

for item in dataset:
    print(item)

### 3.6 Keras Preprocessing Layers: TextVectorization
Preprocessing raw text directly within the model.

In [None]:
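# Requires a recent TensorFlow release; in earlier 2.x versions this layer lives in
# tensorflow.keras.layers.experimental.preprocessing
from tensorflow.keras.layers import TextVectorization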

# Raw text data
training_data = np.array([["This is the 1st sample."], ["And here's the 2nd sample."], ["Stop tokens?"]])

# Create layer: standardize (lower case, remove punctuation), split, and index
vectorizer = TextVectorization(output_mode="int")

# Adapt: Learn the vocabulary from the data
vectorizer.adapt(training_data)

# Inspect vocabulary
print("Vocabulary:", vectorizer.get_vocabulary())

# Transform new data
print("Vectorized:", vectorizer([["This is a sample."]]).numpy())

## 4. Step-by-Step Explanation

### 1. Interleaving Strategy
**Problem:** Reading from one large file one line at a time limits throughput to the speed of a single disk read.
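**Solution:** `interleave` opens `n_readers` (5 in our code) files at once and cycles through them, taking one line from File A, then one from File B, and so on. This mixes records from different files, which adds a layer of shuffling. The reads only run in parallel if you also pass `num_parallel_calls`; without it (as in our code), the lines are fetched sequentially.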
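
The order in which `interleave` emits elements can be seen on a toy example. In this sketch, three small `range` datasets stand in for three files (the helper name `read_lines` and the numbers are illustrative assumptions), and `tf.data.AUTOTUNE` assumes TensorFlow 2.4 or later.

```python
import tensorflow as tf

# Three stand-in "files", each holding three consecutive "lines":
# 0-2, 10-12 and 20-22.
starts = tf.data.Dataset.range(0, 30, 10)

def read_lines(start):
    return tf.data.Dataset.range(start, start + 3)

# Sequential interleave: one element from each "file" in turn.
sequential = starts.interleave(read_lines, cycle_length=3)
sequential_order = [int(v) for v in sequential.as_numpy_iterator()]
print(sequential_order)

# Parallel variant: the sub-datasets are read concurrently.
parallel = starts.interleave(read_lines, cycle_length=3,
                             num_parallel_calls=tf.data.AUTOTUNE)
parallel_values = [int(v) for v in parallel.as_numpy_iterator()]
print(sorted(parallel_values))
```

The sequential version yields the first element of every sub-dataset, then the second, and so on. The parallel version produces the same set of elements while letting TensorFlow overlap the reads.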

### 2. Protobuf Serialization
**Concept:** The `Example` class is just a container. The real data is in `Features`. 
* `BytesList` handles strings and binary data (images).
* `FloatList` stores 32-bit floats; float64 values are down-cast and may lose precision.
* `Int64List` handles integers and booleans.

When we call `.SerializeToString()`, the structured object is converted into a compact byte string, which is what gets written to the disk.
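
A round trip through serialization makes this concrete. The sketch below builds a small `Example` (the `ocean_proximity` feature is a hypothetical addition for illustration), serializes it, and parses the bytes back into a fresh message. It also shows that a `FloatList` value comes back with 32-bit precision.

```python
import numpy as np
import tensorflow as tf

example = tf.train.Example(features=tf.train.Features(feature={
    "median_income": tf.train.Feature(
        float_list=tf.train.FloatList(value=[0.1])),
    "ocean_proximity": tf.train.Feature(  # hypothetical string feature
        bytes_list=tf.train.BytesList(value=[b"INLAND"])),
}))

serialized = example.SerializeToString()  # compact bytes, ready for a TFRecord
print(type(serialized), len(serialized))

# Deserialize into a fresh message and read the values back.
restored = tf.train.Example()
restored.ParseFromString(serialized)
income = restored.features.feature["median_income"].float_list.value[0]
proximity = restored.features.feature["ocean_proximity"].bytes_list.value[0]

print(income == 0.1)                     # False: stored as a 32-bit float
print(income == float(np.float32(0.1)))  # True
print(proximity)
```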

### 3. Parsing TFRecords
Since the TFRecord file is just a stream of bytes, TensorFlow doesn't know what's inside. We must provide a `feature_description` dictionary (a schema) to `tf.io.parse_single_example`. This tells TF: "Expect a field named 'median_income' which is a float."
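
The schema also controls what happens when a record lacks a field. As a small sketch under the same feature names used earlier, the example below serializes a record that only contains `house_idx` and parses it with a description that gives `median_income` a default value.

```python
import tensorflow as tf

# Serialize an Example that only contains "house_idx".
example = tf.train.Example(features=tf.train.Features(feature={
    "house_idx": tf.train.Feature(int64_list=tf.train.Int64List(value=[7])),
}))
serialized = example.SerializeToString()

feature_description = {
    # No default_value: this feature must be present in every record.
    "house_idx": tf.io.FixedLenFeature([], tf.int64),
    # Missing values are replaced by the default.
    "median_income": tf.io.FixedLenFeature([], tf.float32, default_value=0.0),
}

parsed = tf.io.parse_single_example(serialized, feature_description)
house_idx = int(parsed["house_idx"])
median_income = float(parsed["median_income"])
print(house_idx, median_income)
```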

### 4. TextVectorization
* **Adaptation:** The layer scans the training text to build a vocabulary. With `output_mode="int"`, ID 0 is reserved for padding and ID 1 for out-of-vocabulary (OOV) tokens; learned words get IDs from 2 upward, most frequent first.
* **Processing:** During inference, it takes a raw string such as "This is a sample.", converts it to lowercase, strips punctuation, splits on whitespace, and replaces each word with its learned ID. A word never seen during `adapt()` (here, "a") maps to the OOV ID 1. This makes the model portable; you don't need a separate tokenizer script in production.
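
The reserved entries can be checked directly. This sketch reuses the three training sentences from the code above, adapts on a batched `tf.data.Dataset` (one of several accepted input types), and inspects the start of the vocabulary and the ID assigned to an unseen word. It assumes a TensorFlow release that exposes `tf.keras.layers.TextVectorization`.

```python
import numpy as np
import tensorflow as tf

texts = ["This is the 1st sample.",
         "And here's the 2nd sample.",
         "Stop tokens?"]

vectorizer = tf.keras.layers.TextVectorization(output_mode="int")
vectorizer.adapt(tf.data.Dataset.from_tensor_slices(texts).batch(32))

vocab = vectorizer.get_vocabulary()
print(vocab[:2])  # the two reserved entries: padding and OOV

ids = np.asarray(vectorizer(tf.constant([["This is a sample."]])))
print(ids)
oov_id = int(ids[0, 2])  # third token is "a", which was never seen in adapt()
print(oov_id)
```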

## 5. Chapter Summary

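* **Data API:** The key to feeding GPUs efficiently. End your pipeline with `dataset.prefetch(1)` (or pass `tf.data.experimental.AUTOTUNE` to let TensorFlow choose the buffer size) so that data preparation overlaps with training.
* **Parallelism:** Pass `num_parallel_calls=tf.data.experimental.AUTOTUNE` to `.map()` so TensorFlow tunes the number of parallel calls to the available CPU.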
* **TFRecords:** Use this format for large datasets. It is sequential, binary, and efficient.
* **Preprocessing Layers:** Move preprocessing into the model graph (using `adapt()`) to ensure the model handles raw data identically in training and production.
* **One-Hot Encoding:** Use `CategoryEncoding` or `StringLookup` layers instead of pandas `get_dummies`.
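
As a rough sketch of the last point, a single `StringLookup` layer can map string categories straight to one-hot vectors. The category names here are assumptions for illustration, and `output_mode="one_hot"` assumes a recent TensorFlow release.

```python
import numpy as np
import tensorflow as tf

# Illustrative category vocabulary. Output index 0 is reserved for unknown values.
vocab = ["<1H OCEAN", "INLAND", "NEAR BAY"]
lookup = tf.keras.layers.StringLookup(vocabulary=vocab, output_mode="one_hot")

# "ISLAND" is not in the vocabulary, so it falls into the OOV slot.
categories = tf.constant([["INLAND"], ["NEAR BAY"], ["ISLAND"]])
one_hot = np.asarray(lookup(categories))
print(one_hot)
```

Unlike `get_dummies`, the layer keeps a fixed vocabulary, so a category that only shows up at serving time is routed to the OOV slot instead of changing the number of columns.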