# Data Pipelines in TensorFlow

### What is a “data pipeline” in machine learning?
A data pipeline is the system that feeds data into your model during training and evaluation.
Think of it like a kitchen conveyor belt:
- At one end, you load in raw ingredients (text files, CSVs, TFRecords, images, etc.).
- Along the belt, you wash, cut, and prepare them (decode, normalize, tokenize, batch).
- At the other end, your model gets perfectly prepared “mini-meals” (tensors ready for training).
If this belt is slow, your model waits idle, wasting GPU power.
If it runs far ahead of the model, prepared batches pile up in buffers and waste memory.
So ML engineers tune it for a balance between throughput and memory use.

### Why TensorFlow needs pipelines
TensorFlow models are trained in graphs — computations that run on CPUs, GPUs, or TPUs.
Those devices are fast, but the bottleneck is usually the data loading step:
reading from disk, decoding files, augmenting images, or tokenizing text.

To fix this, TensorFlow provides the tf.data API — a high-performance, graph-integrated data pipeline system.
It lets you:
- Stream data efficiently from disk or memory
- Parallelize operations across CPU cores
- Prefetch batches so the GPU never waits
- Cache preprocessed data to avoid recomputation
- Compose your data transformations like a chain
  
So when we say “TensorFlow pipeline,” we really mean:
A tf.data.Dataset object that describes how to load, process, and batch your data efficiently, often entirely inside the TensorFlow graph.

### What actually happens inside a pipeline
Let’s walk through a typical training example:

```py
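import tensorflow as tf

# parse_line is a user-supplied function that turns one raw line into model inputs
dataset = (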
    tf.data.TextLineDataset("reviews.txt")     # 1️⃣ Read from file(s)
    .map(parse_line)                           # 2️⃣ Parse or tokenize text
    .shuffle(buffer_size=10000)                # 3️⃣ Shuffle samples for randomness
    .batch(32)                                 # 4️⃣ Group into batches
    .prefetch(tf.data.AUTOTUNE)                # 5️⃣ Prepare next batch while GPU trains
)
```
Let’s break it down:

| Step   | Function                          | What It Does                                                   | Why It Matters                                |
|--------|-----------------------------------|----------------------------------------------------------------|----------------------------------------------|
| 1. Read | TextLineDataset, TFRecordDataset, etc. | Loads data efficiently from disk, streaming, or memory.         | Prevents “file I/O bottlenecks”.             |
| 2. Map  | .map(func)                       | Applies a transformation to each element (like parsing JSON, tokenizing text, decoding images). | Lets you preprocess inside TF (parallelizable). |
| 3. Shuffle | .shuffle(buffer_size)          | Randomizes sample order each epoch, drawing from a buffer of `buffer_size` elements. | Breaks up any ordering in the source data so each batch is a more representative sample. |
| 4. Batch | .batch(batch_size)              | Groups samples into mini-batches for training.                 | Allows vectorized GPU operations.            |
| 5. Prefetch | .prefetch(tf.data.AUTOTUNE)   | Loads next batch while GPU is training on current batch.       | Maximizes GPU utilization.                   |

That’s a complete data pipeline — from reading → preprocessing → batching → feeding.
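The `parse_line` function used in the example is not defined in this section. Here is a minimal sketch of what it could look like, assuming each line of `reviews.txt` has the form `<label><TAB><review text>`. That file layout is an assumption made only for illustration, and the in-memory dataset below stands in for `TextLineDataset`.

```python
import tensorflow as tf

def parse_line(line):
    """Split one 'label<TAB>text' line into (text, label).

    The tab-separated layout is an assumption, not something the
    original example specifies.
    """
    parts = tf.strings.split(line, sep="\t", maxsplit=1)  # [label, text]
    label = tf.strings.to_number(parts[0], out_type=tf.int32)
    text = parts[1]
    return text, label

# In-memory stand-in for tf.data.TextLineDataset("reviews.txt").
lines = tf.data.Dataset.from_tensor_slices(["1\tgreat film", "0\tfell asleep"])
parsed = list(lines.map(parse_line).as_numpy_iterator())
```

Each element of `parsed` is a `(text, label)` pair, which is the shape most Keras training code expects.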

### What makes TensorFlow pipelines special
TensorFlow’s tf.data pipelines are not just loops — they are part of the graph, meaning:
- They can run asynchronously from the model.
- They can overlap CPU preprocessing and GPU training.
- They can automatically tune performance using tf.data.AUTOTUNE.
- They can scale across devices (e.g., multiple GPUs or TPUs).
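This is what makes them faster and cleaner than loading and preprocessing every batch by hand in plain Python, like:
``` py
# load_batches_in_python is a hypothetical hand-written generator
for x, y in load_batches_in_python():
    model.train_on_batch(x, y)
```
That’s fine for small demos, but the accelerator sits idle while Python prepares each batch. Real-world ML needs throughput and reproducibility, which tf.data gives you.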

### TF pipeline in context of NLP
When your data is text, the pipeline also does:
1. Reading raw text (from files, CSVs, TFRecords).
2. Tokenizing (turning words → integers).
3. Padding/truncating sequences.
4. Building attention masks or features.
5. Batching for model input.

Example:
```py
dataset = (
    tf.data.TextLineDataset("data.txt")
    .map(tokenize_and_pad, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```

The tokenize_and_pad function might call:
- tf.strings.split() (basic whitespace)
- or a pretrained tokenizer like keras_nlp.tokenizers.WordPieceTokenizer
- or tf.py_function wrapping a Python tokenizer
That’s what “integrating a tokenizer into the pipeline” means — it becomes part of this conveyor belt.
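As a rough illustration of the first option, here is one way `tokenize_and_pad` could be written with only TensorFlow ops. It is a sketch under assumptions: `MAX_LEN` and `NUM_BUCKETS` are made-up values, and hashing tokens to IDs stands in for a real vocabulary lookup.

```python
import tensorflow as tf

MAX_LEN = 8         # assumed fixed sequence length
NUM_BUCKETS = 1000  # assumed size of the hashed "vocabulary"

def tokenize_and_pad(line):
    """Whitespace-tokenize, hash tokens to integer IDs, then truncate and pad."""
    tokens = tf.strings.split(line)  # 1-D string tensor for a scalar input
    # Hash each token into [1, NUM_BUCKETS - 1]; ID 0 is reserved for padding.
    ids = tf.strings.to_hash_bucket_fast(tokens, NUM_BUCKETS - 1) + 1
    ids = ids[:MAX_LEN]                                   # truncate long lines
    ids = tf.pad(ids, [[0, MAX_LEN - tf.shape(ids)[0]]])  # right-pad with 0
    return tf.ensure_shape(ids, [MAX_LEN])

# In-memory stand-in for tf.data.TextLineDataset("data.txt").
lines = tf.data.Dataset.from_tensor_slices([
    "a great movie",
    "one two three four five six seven eight nine ten",
])
ds = lines.map(tokenize_and_pad, num_parallel_calls=tf.data.AUTOTUNE).batch(2)
batch = next(iter(ds)).numpy()  # integer array of shape (2, MAX_LEN)
```

Because every step is a TensorFlow op, the function runs inside the graph and parallelizes cleanly under `map`.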

### Why we care about things like cache(), prefetch(), AUTOTUNE
These are performance tuning knobs for your data loader:

| Function                  | Description                                           | Common Use                                      |
|---------------------------|-------------------------------------------------------|------------------------------------------------|
| cache()                  | Stores the preprocessed dataset in memory or on disk after first epoch. | When dataset fits in RAM or is small.          |
| prefetch()               | Loads the next batch while the model trains on the current one. | Always use with AUTOTUNE.                      |
| AUTOTUNE                 | Lets TF automatically pick parallelism/prefetch settings. | Default best practice.                         |
| map(num_parallel_calls=AUTOTUNE) | Runs preprocessing functions in parallel threads. | Speeds up CPU-bound steps like decoding/tokenizing. |

Together, these let TensorFlow stream data continuously to your GPU.

### How it connects to what you’ll build later
Eventually, your model training loop will look like this:
``` py
for x_batch, y_batch in dataset:
    with tf.GradientTape() as tape:
        preds = model(x_batch, training=True)
        loss = loss_fn(y_batch, preds)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
```
The dataset in this loop is what the pipeline built.
It yields ready-to-train batches for one pass over the data per epoch, or indefinitely if you add `.repeat()`.

That’s why every TensorFlow engineer must master tf.data — it’s how you feed your models at scale.

### Summary: “What are TensorFlow pipelines?”
| Concept         | Intuition                          | Analogy                                      |
|------------------|------------------------------------|----------------------------------------------|
| Pipeline         | The complete data flow from disk → ready tensors | A kitchen conveyor belt for data            |
| tf.data.Dataset  | TensorFlow object representing a pipeline | Recipe for data preparation                 |
| map()            | Transform each data sample        | Chop vegetables on the belt                 |
| batch()          | Group samples together            | Pack boxes of meals                         |
| prefetch()       | Get the next batch ready          | Chef preps next dish while plating current one |
| cache()          | Save processed data               | Store pre-chopped ingredients               |
| AUTOTUNE         | Auto-optimizes performance        | Smart chef who adjusts speed                |

NOTE: in scikit-learn, pipelines chain processing + modeling steps, while in TensorFlow, pipelines mainly handle data loading, preprocessing, and feeding efficiently to the model during training.

### Example: High-Level Text Classification Pipeline
Let’s build a clean pipeline for a text classification task — say, classifying IMDB movie reviews as positive or negative.


```py
%pip install tensorflow tensorflow-datasets
```

```py
import tensorflow as tf
import tensorflow_datasets as tfds

# 1. Load the dataset.
# TensorFlow Datasets (TFDS) gives us ready-to-use data.
train_ds, test_ds = tfds.load(
    "imdb_reviews",
    split=["train", "test"],
    as_supervised=True,  # yield (text, label) pairs for both splits
)

# 2. Tokenization with a TextVectorization layer.
# Tokenizing converts text to integer IDs; TextVectorization is the built-in
# Keras preprocessing layer that does this.
vocab_size = 10000  # keep only the 10,000 most frequent tokens to save memory
seq_length = 250    # pad or truncate every review to 250 tokens

# The layer turns each review into a fixed-length sequence of integers.
# Its vocabulary is learned from the training data.
vectorize_layer = tf.keras.layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=seq_length,
)

# You must "adapt" the layer so it learns the vocabulary from the text.
train_text = train_ds.map(lambda text, label: text)  # keep only the text
# Once the vocabulary is learned, the layer maps text to integer sequences,
# and the model's embeddings are indexed by these IDs.
vectorize_layer.adapt(train_text)

# 3. Preprocessing function for the pipeline
def preprocess_text(text, label):
    text = vectorize_layer(text)  # text -> integer IDs from the learned vocabulary
    return text, label

# 4. Apply preprocessing, shuffle, batch, and prefetch
batch_size = 32

# AUTOTUNE lets TensorFlow choose the number of parallel map calls and the
# number of batches to prefetch.
train_ds = (
    train_ds
    .shuffle(10000)  # shuffle with a 10,000-element buffer
    .map(preprocess_text, num_parallel_calls=tf.data.AUTOTUNE)  # preprocess in parallel
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)  # prepare upcoming batches while the model trains
)

# The test set gets the same preprocessing but no shuffling.
test_ds = (
    test_ds
    .map(preprocess_text, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)

# 5. Build a simple model
model = tf.keras.Sequential([
    # Embedding: maps each of the vocab_size token IDs to a trainable
    # 64-dimensional dense vector, which neural networks handle better than
    # raw integer IDs. The sequence length is inferred from the input.
    tf.keras.layers.Embedding(vocab_size, 64),
    # Global average pooling: averages the 250 token vectors of a review into
    # a single 64-dimensional vector that represents the whole review.
    tf.keras.layers.GlobalAveragePooling1D(),
    # Dense hidden layer with ReLU, applied to the 64-dimensional pooled vector.
    tf.keras.layers.Dense(64, activation="relu"),
    # Output layer for binary classification: one sigmoid unit that outputs
    # the probability (between 0 and 1) that the review is positive.
    tf.keras.layers.Dense(1, activation="sigmoid"),
])

# Compile the model
model.compile(
    optimizer="adam",            # Adam adapts the step size per weight while applying gradients
    loss="binary_crossentropy",  # standard loss for binary classification
    metrics=["accuracy"],        # track accuracy during training
)

# 6. Train the model; the dataset is already optimized
history = model.fit(train_ds, validation_data=test_ds, epochs=3)
```

This example builds a complete TensorFlow data pipeline for text using the IMDB reviews dataset. It loads the data, tokenizes and vectorizes the text with a TextVectorization layer, and applies shuffling, batching, and prefetching to optimize performance. Finally, it builds and trains a simple neural network for sentiment analysis on the preprocessed data.

A data pipeline is the series of steps that process and prepare data for training: loading the data, preprocessing it (tokenization and vectorization for text), batching it into manageable sizes, and optimizing the data flow for performance. In this example the pipeline part is: loading the dataset, applying the TextVectorization layer, shuffling, batching, and prefetching. This ensures that the data is in the right format and is fed efficiently into the model during training.

Example output:
```
Epoch 1/3
782/782 ━━━━━━━━━━━━━━━━━━━━ 13s 16ms/step - accuracy: 0.6535 - loss: 0.5926 - val_accuracy: 0.8550 - val_loss: 0.3496
Epoch 2/3
782/782 ━━━━━━━━━━━━━━━━━━━━ 12s 16ms/step - accuracy: 0.8736 - loss: 0.3008 - val_accuracy: 0.8558 - val_loss: 0.3340
Epoch 3/3
782/782 ━━━━━━━━━━━━━━━━━━━━ 12s 16ms/step - accuracy: 0.9040 - loss: 0.2412 - val_accuracy: 0.8660 - val_loss: 0.3235
```

## Efficient tf.data Pipelines

**Goal**: Keep the accelerators fed. The CPU pipeline should produce batches faster than the device consumes them.

## Map vs map with num_parallel_calls

- `map(func)` applies `func` sequentially. Slow if `func` is CPU-heavy.
- `map(func, num_parallel_calls=N)` runs up to N calls concurrently on CPU threads.
- `num_parallel_calls=tf.data.AUTOTUNE` lets TF choose.
- **Always use parallel map for CPU-bound transforms** (tokenization, augmentation).
```python
ds = ds.map(parse_fn, num_parallel_calls=tf.data.AUTOTUNE)
```

If your map function calls a Python-only tokenizer through `tf.py_function`, `num_parallel_calls` helps less: the Python code still runs under the global interpreter lock, and every call adds overhead for crossing between the graph and Python.
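A small sketch of wrapping a Python-only tokenizer with `tf.py_function` follows. The `python_tokenize` function is a hypothetical stand-in (it maps each word to its length) for whatever Python tokenizer you actually use.

```python
import tensorflow as tf

def python_tokenize(text):
    """Hypothetical Python-only tokenizer: maps each word to its length."""
    words = text.numpy().decode("utf-8").split()
    # Return a tensor (not a Python list) so it is treated as one output.
    return tf.constant([len(w) for w in words], dtype=tf.int32)

def tokenize_map(text):
    ids = tf.py_function(python_tokenize, inp=[text], Tout=tf.int32)
    ids.set_shape([None])  # py_function drops static shape information
    return ids

ds = tf.data.Dataset.from_tensor_slices(["to be or not", "hello world"])
ds = ds.map(tokenize_map, num_parallel_calls=tf.data.AUTOTUNE)
token_ids = [ids.tolist() for ids in ds.as_numpy_iterator()]
```

Calling `set_shape` after `tf.py_function` matters in practice, because later steps such as `padded_batch` need to know the rank of each element.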

## batch() and prefetch()

- `batch(batch_size)` groups items into tensors of shape `[batch_size, ...]`. Use `padded_batch` for variable-length sequences.
- `prefetch(buffer_size)` overlaps data production and model consumption.
- Use `AUTOTUNE` for prefetch to let TF decide.
```python
ds = ds.padded_batch(batch_size, padded_shapes=(...), drop_remainder=True)
ds = ds.prefetch(tf.data.AUTOTUNE)
```

**Notes:**
- `drop_remainder=True` is helpful for multi-device training (synchronous replicas expect equal sized per-replica batches).
- For LLM pretraining with a fixed sequence length, truncating in `map` and then padding with `padded_batch` is typical.
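To make `padded_batch` concrete, here is a toy sketch on four variable-length sequences. The sequences and the fixed length of 4 are illustrative assumptions.

```python
import tensorflow as tf

sequences = [[1], [2, 3], [4, 5, 6], [7]]  # toy variable-length token IDs

def make_ds():
    return tf.data.Dataset.from_generator(
        lambda: iter(sequences),
        output_signature=tf.TensorSpec(shape=[None], dtype=tf.int32),
    )

# Default: pad each batch to the longest sequence in that batch.
dynamic = [b.tolist() for b in make_ds().padded_batch(2).as_numpy_iterator()]

# Fixed: pad every sequence to length 4, regardless of the batch contents.
fixed = [
    b.tolist()
    for b in make_ds().padded_batch(2, padded_shapes=[4]).as_numpy_iterator()
]
```

Dynamic padding wastes less compute, while a fixed padded shape gives every batch the same static shape, which some accelerators and distributed setups prefer.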

## shuffle and buffer size

- `shuffle(buffer_size)` maintains a reservoir of `buffer_size` samples; draws uniformly from it.
- Larger `buffer_size` => better mixing but more memory and startup delay.
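The effect of `buffer_size` is easy to see on a tiny dataset. This sketch uses a ten-element range and arbitrary seeds purely for illustration.

```python
import tensorflow as tf

ds = tf.data.Dataset.range(10)

# buffer_size=1: the buffer only ever holds one element, so order is unchanged.
no_shuffle = [int(v) for v in ds.shuffle(buffer_size=1).as_numpy_iterator()]

# buffer_size >= dataset size: a full uniform shuffle.
full_shuffle = [int(v) for v in ds.shuffle(buffer_size=10, seed=0).as_numpy_iterator()]

# Small buffer: each output can only come from a short look-ahead window,
# so elements never move far ahead of their original position.
buffer_size = 3
partial_shuffle = [
    int(v) for v in ds.shuffle(buffer_size=buffer_size, seed=0).as_numpy_iterator()
]
```

With a buffer of 3, the value emitted at position `i` is always at most `i + 2`, which is why a small buffer on sorted data gives only local mixing.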

**Common strategies:**
- For huge datasets, use a large buffer such as `buffer_size = 1000000` when memory allows. A buffer at least as large as the per-worker dataset gives a full uniform shuffle.
- For very large corpora where full-shuffle is impossible, use file-level shuffle (shuffle list of files) + intra-file partial shuffle.

**Pattern:**
```python
files = tf.data.Dataset.list_files("data/train-*.tfrecord")
files = files.shuffle(buffer_size=1000)
ds = files.interleave(lambda f: tf.data.TFRecordDataset(f), 
                      cycle_length=16, 
                      num_parallel_calls=tf.data.AUTOTUNE)
ds = ds.shuffle(buffer_size=10000)  # local shuffle
```

`interleave` + file shuffle is effective: it mixes records from different shards early.

## cache() — when and when not to use

- `cache()` stores the output of the preceding transforms so that later epochs skip them.
- Use it when the dataset fits in RAM, or use a disk-backed cache (`ds.cache(filename)`).
- **Do not cache huge corpora in memory**: the process will run out of memory (OOM) or slow the whole system down.

**Common pattern:**
- If you have a small validation set, `val_ds = val_ds.cache()` for fast evaluation.
- For pretraining on very large corpora, do not cache entire dataset. Instead rely on sharding, interleave, and parallel map.
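The following sketch shows what `cache()` saves. It counts how often a deliberately "expensive" Python function runs over three epochs. The function and the five-element dataset are illustrative assumptions.

```python
import tensorflow as tf

calls = []  # records every time the "expensive" function actually runs

def expensive(x):
    calls.append(int(x))
    return x * 2

def slow_map(x):
    y = tf.py_function(expensive, inp=[x], Tout=tf.int64)
    y.set_shape([])
    return y

# cache() sits after the expensive map, so the map output is what gets stored.
ds = tf.data.Dataset.range(5).map(slow_map).cache()

epochs = [[int(v) for v in ds.as_numpy_iterator()] for _ in range(3)]
num_calls = len(calls)
```

The function runs only during the first full pass; later epochs read from the cache. Anything placed after `cache()` (for example `shuffle`) still runs every epoch.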

## Sliding window for sequence generation (critical for LLM pretraining)

LLM training often converts long token streams into many overlapping context windows.

**Two approaches:**
1. Pre-chunk offline into fixed windows and save as TFRecords.
2. Use `tf.data` sliding windows on token streams.

**Sliding window example:**
```python
tokens = tf.data.Dataset.from_tensor_slices(token_ids)  # long 1-D stream

seq_length = 2048
# window dataset: shift == size gives back-to-back, non-overlapping windows
windows = tokens.window(size=seq_length, shift=seq_length, drop_remainder=True)
# For overlapping windows, use shift < seq_length

# Convert windows (Dataset of Datasets) to batched tensors
windows = windows.flat_map(lambda w: w.batch(seq_length))
# now windows yields tensors shape (seq_length,)
```

For overlapping windows, set `shift` to a `stride` smaller than `seq_length`:
```python
windows = tokens.window(size=seq_length, shift=stride, drop_remainder=True)
windows = windows.flat_map(lambda w: w.batch(seq_length))
```

**Alternative:** using `tf.signal.frame` (works on tensors, not datasets) or vectorized ops: convert long arrays to 2D matrix of frames.

**Important:** sliding windows produce many overlapping examples — this is expected for causal LM training but increases dataset size. Precompute and shard offline if possible for efficiency.
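To check the windowing logic on something small, this sketch runs both approaches on a ten-token toy stream with `seq_length = 4` and `stride = 2`. These values are chosen only to keep the output readable.

```python
import tensorflow as tf

token_ids = list(range(10))  # toy token stream
seq_length, stride = 4, 2

# Approach 1: tf.data windows over a dataset of single tokens.
tokens = tf.data.Dataset.from_tensor_slices(token_ids)
windows = tokens.window(size=seq_length, shift=stride, drop_remainder=True)
windows = windows.flat_map(lambda w: w.batch(seq_length, drop_remainder=True))
ds_windows = [w.tolist() for w in windows.as_numpy_iterator()]

# Approach 2: tf.signal.frame on the whole tensor at once.
frames = tf.signal.frame(
    tf.constant(token_ids), frame_length=seq_length, frame_step=stride
)
frame_windows = frames.numpy().tolist()
```

Both produce the same four windows here. The tensor-based version needs the whole stream in memory, while the `tf.data` version streams.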

## Example pipeline putting it all together (text -> tokenization -> batching)
```python
import tensorflow as tf

files = tf.io.gfile.glob("gs://bucket/train-*.tfrecord")
files_ds = tf.data.Dataset.from_tensor_slices(files).shuffle(len(files))
ds = files_ds.interleave(lambda f: tf.data.TFRecordDataset(f),
                         cycle_length=16, 
                         num_parallel_calls=tf.data.AUTOTUNE)

ds = ds.map(parse_fn, num_parallel_calls=tf.data.AUTOTUNE)

# tokenization: prefer TF-native tokenizers (tensorflow_text or KerasNLP) inside graph
# if using Python tokenizer use tf.py_function wrapped carefully
def tokenize_map(text, label):
    # assume tokenizer_fn is pure-TF or tf.py_function wrapper
    input_ids = tokenizer_fn(text)  # returns 1D int tensor
    return input_ids, label

ds = ds.map(tokenize_map, num_parallel_calls=tf.data.AUTOTUNE)

# pack into sequences or use padded_batch
ds = ds.padded_batch(batch_size, padded_shapes=([max_len], []), drop_remainder=True)
ds = ds.prefetch(tf.data.AUTOTUNE)
```
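The pipeline above relies on a `parse_fn` that is not defined in this section. Here is a minimal sketch, assuming each record is a `tf.train.Example` with a string `text` feature and an integer `label` feature. The feature names, the temporary files, and the `write_shard` helper are all illustrative assumptions.

```python
import os
import tempfile

import tensorflow as tf

# Assumed record schema: one string feature and one int64 feature.
FEATURES = {
    "text": tf.io.FixedLenFeature([], tf.string),
    "label": tf.io.FixedLenFeature([], tf.int64),
}

def parse_fn(serialized):
    """Decode one serialized tf.train.Example into (text, label)."""
    example = tf.io.parse_single_example(serialized, FEATURES)
    return example["text"], example["label"]

def write_shard(path, rows):
    """Hypothetical helper: write (text, label) rows to one TFRecord shard."""
    with tf.io.TFRecordWriter(path) as writer:
        for text, label in rows:
            feature = {
                "text": tf.train.Feature(
                    bytes_list=tf.train.BytesList(value=[text.encode("utf-8")])),
                "label": tf.train.Feature(
                    int64_list=tf.train.Int64List(value=[label])),
            }
            example = tf.train.Example(features=tf.train.Features(feature=feature))
            writer.write(example.SerializeToString())

tmp_dir = tempfile.mkdtemp()
write_shard(os.path.join(tmp_dir, "train-0.tfrecord"), [("good", 1), ("bad", 0)])
write_shard(os.path.join(tmp_dir, "train-1.tfrecord"), [("fine", 1)])

files = sorted(tf.io.gfile.glob(os.path.join(tmp_dir, "train-*.tfrecord")))
ds = tf.data.TFRecordDataset(files, num_parallel_reads=tf.data.AUTOTUNE)
ds = ds.map(parse_fn, num_parallel_calls=tf.data.AUTOTUNE)
records = sorted((t.decode("utf-8"), int(l)) for t, l in ds.as_numpy_iterator())
```

The records are sorted before inspection because parallel reads interleave the shards, so the raw order is not the file order.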

# Sharding Basics

Scaling across devices and workers needs correct sharding to avoid duplicated training data and maximize parallelism.

## Automatic sharding with tf.distribute

When you use a `tf.distribute.Strategy` (`MirroredStrategy`, `MultiWorkerMirroredStrategy`), TensorFlow can shard the dataset automatically when you hand it to the strategy, either through `model.fit` or through `strategy.experimental_distribute_dataset`.

- For `model.fit(dataset, ...)` with a Strategy active, Keras expects that the dataset yields global batches (size = `batch_per_replica * num_replicas_in_sync`) or that the dataset is already sharded per worker.
- With `MultiWorkerMirroredStrategy`, set environment variable `TF_CONFIG` and rely on automatic sharding when using `dataset = strategy.experimental_distribute_dataset(dataset)`.

**Important:** automatic sharding applies only when the dataset goes through `experimental_distribute_dataset` or `model.fit` under the strategy. A dataset that you iterate directly is not sharded.

## How dataset.batch interacts with strategy

If you call `dataset.batch(global_batch_size)` and then pass the dataset to `model.fit` (or to `strategy.experimental_distribute_dataset` for a custom loop) under a `MirroredStrategy`, TensorFlow splits each batch across replicas.

**Example:**
- `global_batch_size = per_replica_batch * num_replicas`.
- `dataset.batch(global_batch_size)` yields `[global_batch_size, ...]` tensors; TF runtime splits into per-replica sub-batches.

If you instead call `dataset.batch(per_replica_batch)` and then use `strategy.experimental_distribute_dataset`, TF still treats that size as the global batch and splits it across replicas, so each replica trains on a smaller batch than you intended.

**Best practice:** always batch with a global batch size.
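The arithmetic is simple but easy to get backwards, so here is a tiny sketch. The helper names are made up for illustration. In real code the replica count would come from `strategy.num_replicas_in_sync`.

```python
def global_batch_size(per_replica_batch, num_replicas):
    """Batch size to pass to dataset.batch() under a distribution strategy."""
    return per_replica_batch * num_replicas

def per_replica_batch_size(global_batch, num_replicas):
    """Batch size each replica sees after the global batch is split."""
    if global_batch % num_replicas:
        raise ValueError("global batch must be divisible by the number of replicas")
    return global_batch // num_replicas

num_replicas = 4  # in real code: strategy.num_replicas_in_sync
global_batch = global_batch_size(per_replica_batch=32, num_replicas=num_replicas)
per_replica = per_replica_batch_size(global_batch, num_replicas)
```

Learning-rate schedules and `steps_per_epoch` should be reasoned about in terms of `global_batch`, since that is how many samples one optimizer step consumes.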

## Why each worker should get unique shards

If all workers read the same sequence of files without sharding, they train on exactly the same samples. Every worker then computes gradients on identical data, so extra workers contribute no new information per step and convergence slows.

Unique sharding per worker ensures independent samples and correct effective epoch size.

### How to shard:

- Use `tf.data.Dataset.shard(num_shards, index)` when you have manual worker id and worker count:
```python
ds = ds.shard(num_shards=num_workers, index=worker_id)
```

- Or use **file-level sharding**: on worker i, read `files[i::num_workers]` only.
- With `tf.distribute.MultiWorkerMirroredStrategy`, prefer letting TF handle sharding: create a dataset of filenames and rely on the default `tf.data.experimental.AutoShardPolicy.AUTO`, or call `dataset = dataset.shard(num_shards, task_index)` before `interleave`.
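A quick sketch of what the two manual options do, using three hypothetical workers, a ten-element dataset, and made-up file names:

```python
import tensorflow as tf

num_workers = 3  # hypothetical cluster size

# Element-level sharding: worker i keeps every num_workers-th element.
ds = tf.data.Dataset.range(10)
shards = [
    [int(v) for v in ds.shard(num_shards=num_workers, index=i).as_numpy_iterator()]
    for i in range(num_workers)
]

# File-level sharding: worker i reads files[i::num_workers].
files = [f"train-{i:02d}.tfrecord" for i in range(5)]  # made-up file names
file_shards = [files[i::num_workers] for i in range(num_workers)]
```

In both cases the shards are disjoint and together cover everything. File-level sharding is usually cheaper, because `Dataset.shard` applied after reading still makes every worker read all records and discard most of them.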

## Practical multi-worker pipeline template
```python
import tensorflow as tf
strategy = tf.distribute.MultiWorkerMirroredStrategy()

def make_dataset(file_pattern, batch_size, worker_id, num_workers):
    files = tf.io.gfile.glob(file_pattern)
    # file-level sharding: each worker reads a subset of files
    files = sorted(files)
    files = files[worker_id::num_workers]
    ds = tf.data.Dataset.from_tensor_slices(files)
    ds = ds.interleave(lambda f: tf.data.TFRecordDataset(f),
                       cycle_length=16, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.map(parse_fn, num_parallel_calls=tf.data.AUTOTUNE)
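    ds = ds.batch(batch_size, drop_remainder=True)
    # Files are already sharded by hand above, so turn off automatic sharding
    # to avoid splitting the data a second time.
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = (
        tf.data.experimental.AutoShardPolicy.OFF)
    ds = ds.with_options(options)
    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds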

with strategy.scope():
    model = build_model()
    model.compile(...)
    dataset = make_dataset("gs://bucket/train-*.tfrecord", global_batch_size, worker_id, num_workers)
    model.fit(dataset, epochs=..., steps_per_epoch=...)
```

**Notes:**
- `worker_id` and `num_workers` typically come from the cluster manager or `TF_CONFIG`.
- `global_batch_size` equals `per_replica_batch * num_replicas_in_sync`.
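One way to obtain `worker_id` and `num_workers` is to parse `TF_CONFIG` yourself. The sketch below assumes a cluster made only of `worker` tasks; the helper name and host names are illustrative.

```python
import json
import os

def worker_info_from_tf_config(tf_config=None):
    """Return (worker_id, num_workers) parsed from a TF_CONFIG JSON string.

    Assumes a cluster made only of "worker" tasks; clusters with a separate
    "chief" or "ps" job need extra handling that this sketch omits.
    """
    if tf_config is None:
        tf_config = os.environ.get("TF_CONFIG", "")
    if not tf_config:
        return 0, 1  # single-process fallback
    config = json.loads(tf_config)
    workers = config["cluster"]["worker"]
    worker_id = int(config["task"]["index"])
    return worker_id, len(workers)

# Example TF_CONFIG for the second of two workers (host names are made up).
example_tf_config = json.dumps({
    "cluster": {"worker": ["host1:12345", "host2:12345"]},
    "task": {"type": "worker", "index": 1},
})
worker_id, num_workers = worker_info_from_tf_config(example_tf_config)
```

Falling back to `(0, 1)` when `TF_CONFIG` is unset lets the same script run unchanged on a single machine.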

## Practical tips, diagnostics, and benchmarks

### Measure throughput

Simple: iterate N batches on dataset and time it.
```python
import time
it = iter(dataset)
t0 = time.time()
num_samples = 0
for i in range(100):
    x, y = next(it)
    num_samples += x.shape[0]
elapsed = time.time() - t0
print("samples/sec:", num_samples / elapsed)
```

If the measured samples/sec is lower than the rate at which the model can consume samples on the accelerator, the input pipeline is the bottleneck.

### Profiling data pipeline

Use TensorBoard profiler and look at CPU activity, file I/O, and thread usage.

If the pipeline is CPU-bound, increase `num_parallel_calls` and the `cycle_length` of `interleave`.

### Tokenizer placement

Prefer TF-native tokenizers (`tensorflow_text`, `KerasNLP`) to avoid Python overhead. When using Python tokenizers, pre-tokenize to TFRecords or use `tf.py_function` sparingly with parallel map.

## Performance checklist

- Use sharded TFRecord files.
- Use `tf.data.TFRecordDataset(files, num_parallel_reads=AUTOTUNE)`.
- Use `interleave` with `cycle_length` tuned (16 or 32 for large datasets).
- Use `map(..., num_parallel_calls=AUTOTUNE)`.
- Use file-level shuffle plus local sample-level shuffle.
- Use `padded_batch` with `drop_remainder=True` for multi-GPU.
- Use `prefetch(AUTOTUNE)`.
- If tokenization is heavy, pre-tokenize into TFRecords with `input_ids`.
- Monitor samples/sec and system CPU/GPU utilization.

## Common pitfalls and how to fix them

- **Low throughput:** Often due to tokenization in Python. Fix by using TF tokenizers or pre-tokenizing, increasing `num_parallel_calls`, or adding more `interleave` parallelism.
- **OOM during cache:** Do not use `ds.cache()` on datasets that do not fit in memory. Use a disk cache or avoid caching.
- **Data duplication across workers:** Happens when files are not sharded per worker. Use file-level sharding or `Dataset.shard`.
- **Incorrect batch sizes for distributed training:** Always reason in global batch size and compute per-replica batch accordingly.
- **Retracing or slow startup:** Avoid complicated Python logic inside map functions that causes retracing; provide static shapes where possible.