# Data Pipelines with PyTorch

Reliable data pipelines keep your models productive. In this notebook we design loaders that scale from quick experiments to production workloads while respecting the top-down workflow introduced earlier.

_Environment note:_ Guidance reflects PyTorch practices current through October 2024.

## Learning Objectives

- Decide when to build a custom `Dataset` versus using built-ins.
- Compose deterministic transforms and stochastic augmentations.
- Configure `DataLoader` workers, shuffling, and memory pinning.
- Prepare tabular, sequential, and vision-friendly batches for the notebooks ahead.

## Pipeline Stages at a Glance

1. **Ingest** raw samples.
2. **Transform** them into tensors (normalize, tokenize, augment).
3. **Batch/collate** to align shapes.
4. **Prefetch & transfer** so computation and I/O overlap.

Keep this map handy when diagnosing bottlenecks.
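
As a rough illustration, the four stages can be mapped onto PyTorch objects as shown here. This is a minimal sketch, not a prescribed layout: the `RawRecords` class and the in-memory `records` list are hypothetical stand-ins for a real data source.

```python
import torch
from torch.utils.data import Dataset, DataLoader

# Hypothetical raw records standing in for files or database rows.
records = [{"value": float(i), "label": i % 2} for i in range(10)]

class RawRecords(Dataset):
    """Stages 1 and 2: ingest a raw record and transform it into tensors."""

    def __len__(self):
        return len(records)

    def __getitem__(self, idx):
        rec = records[idx]                          # 1. ingest
        x = torch.tensor([rec["value"] / 10.0])     # 2. transform (simple scaling)
        y = torch.tensor(rec["label"], dtype=torch.long)
        return x, y

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 3. Batch/collate: the default collate function stacks equally shaped samples.
loader = DataLoader(RawRecords(), batch_size=4, shuffle=False,
                    pin_memory=torch.cuda.is_available())

for xb, yb in loader:
    # 4. Transfer: non_blocking only has an effect with pinned memory and a GPU.
    xb = xb.to(device, non_blocking=True)
    yb = yb.to(device, non_blocking=True)
    print(xb.shape, yb.shape)
```

With `num_workers > 0`, the loader would also prefetch upcoming batches in background processes while the loop body runs.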

In [None]:
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(7, 2.5))
ax.axis("off")

stages = [
    (0.05, 0.6, "Ingest"),
    (0.30, 0.6, "Transform"),
    (0.55, 0.6, "Batch"),
    (0.80, 0.6, "Prefetch"),
]

for x, y, label in stages:
    ax.add_patch(plt.Rectangle((x, y), 0.18, 0.25, fc="#d3e1ff", ec="#3366cc", lw=2))
    ax.text(x + 0.09, y + 0.125, label, ha="center", va="center", fontsize=11)

for (x, _, _), (nx, _, _) in zip(stages[:-1], stages[1:]):
    ax.annotate("", xy=(nx, 0.72), xytext=(x + 0.18, 0.72), arrowprops=dict(arrowstyle="->", lw=2))

ax.text(0.5, 0.22, "A saturated pipeline keeps accelerators busy", ha="center", fontsize=11)
plt.show()


## Custom Dataset Example (Tabular Data)

Convert NumPy arrays to tensors once in `__init__` to avoid unnecessary work in every `__getitem__` call.

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

class TabularDataset(Dataset):
    def __init__(self, features: np.ndarray, targets: np.ndarray):
        self.x = torch.from_numpy(features).float()
        self.y = torch.from_numpy(targets).float().unsqueeze(-1)

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

rng = np.random.default_rng(7)
features = rng.normal(size=(128, 3))
targets = (features @ np.array([0.5, -1.2, 2.0]) + 0.3).astype(np.float32)

dataset = TabularDataset(features, targets)
loader = DataLoader(dataset, batch_size=16, shuffle=True, num_workers=0)

xb, yb = next(iter(loader))
print(xb.shape, yb.shape)
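
For arrays that already fit in memory, the built-in `TensorDataset` yields the same kind of batches without a custom class, which is one way to approach the custom-versus-built-in decision. The sketch below regenerates the same synthetic data so it runs on its own; the names `builtin_ds` and `builtin_loader` are illustrative.

```python
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset

rng = np.random.default_rng(7)
features = rng.normal(size=(128, 3))
targets = features @ np.array([0.5, -1.2, 2.0]) + 0.3

# Convert once up front, exactly as the custom class does in __init__.
x = torch.from_numpy(features).float()
y = torch.from_numpy(targets).float().unsqueeze(-1)

builtin_ds = TensorDataset(x, y)
builtin_loader = DataLoader(builtin_ds, batch_size=16, shuffle=True)

xb, yb = next(iter(builtin_loader))
print(xb.shape, yb.shape)
```

A custom `Dataset` tends to become worthwhile once samples must be loaded lazily from disk or need per-sample transforms.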


### Visual Sanity Checks

Spot outliers or skewed distributions before training begins.

In [None]:
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 3, figsize=(9, 3))
for idx, ax in enumerate(axes):
    ax.hist(features[:, idx], bins=15, color="#4c72b0", alpha=0.75)
    ax.set_title(f"Feature {idx}")
fig.suptitle("Feature Distributions", fontsize=14)
plt.tight_layout()
plt.show()
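
Histograms can be complemented with a quick numeric check. The sketch below counts values more than three standard deviations from the mean for each feature; the threshold of 3 is an arbitrary assumption chosen for illustration, not a rule from this notebook.

```python
import numpy as np

rng = np.random.default_rng(7)
features = rng.normal(size=(128, 3))

mean = features.mean(axis=0)
std = features.std(axis=0)

# Absolute z-score of every value relative to its own feature column.
z = np.abs((features - mean) / std)
outliers_per_feature = (z > 3.0).sum(axis=0)

print("mean:", mean.round(3))
print("std: ", std.round(3))
print("outliers (|z| > 3):", outliers_per_feature)
```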


## Mini Task – Padding Variable-Length Sequences

Sequence models (next notebook) need padded batches. Implement a collate function that returns both padded sequences and their original lengths.

Try the starter cell before revealing the hidden solution.

In [None]:
from torch.utils.data import Dataset, DataLoader
import torch

toy_sequences = [[1, 2, 3], [4, 5], [6]]

class ToyDataset(Dataset):
    def __len__(self):
        return len(toy_sequences)

    def __getitem__(self, idx):
        return torch.tensor(toy_sequences[idx], dtype=torch.long)

# TODO: implement collate_fn that returns (padded_batch, lengths)


In [None]:
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
import torch

toy_sequences = [[1, 2, 3], [4, 5], [6]]

class ToyDataset(Dataset):
    def __len__(self):
        return len(toy_sequences)

    def __getitem__(self, idx):
        return torch.tensor(toy_sequences[idx], dtype=torch.long)

def collate_fn(batch):
    lengths = torch.tensor([item.size(0) for item in batch])
    padded = pad_sequence(batch, batch_first=True, padding_value=0)
    return padded, lengths

loader = DataLoader(ToyDataset(), batch_size=3, collate_fn=collate_fn)
padded, lengths = next(iter(loader))
print(padded)
print(lengths)
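
The returned lengths are what let downstream code ignore padding. As one possible use, the sketch below builds a boolean mask from `lengths` and averages over real tokens only. The padded tensor is rebuilt here so the block runs on its own, and the names `positions` and `mask` are illustrative.

```python
import torch
from torch.nn.utils.rnn import pad_sequence

sequences = [torch.tensor([1, 2, 3]), torch.tensor([4, 5]), torch.tensor([6])]
lengths = torch.tensor([seq.size(0) for seq in sequences])
padded = pad_sequence(sequences, batch_first=True, padding_value=0)

# mask[i, t] is True where position t holds a real token of sequence i.
positions = torch.arange(padded.size(1)).unsqueeze(0)   # shape (1, max_len)
mask = positions < lengths.unsqueeze(1)                 # shape (batch, max_len)

# Mean over real tokens only; padded positions contribute nothing to the sum.
masked_sum = (padded * mask).sum(dim=1)
masked_mean = masked_sum / lengths

print(mask)
print(masked_mean)
```

Recurrent layers can alternatively consume the lengths through `torch.nn.utils.rnn.pack_padded_sequence`, which expects them on the CPU and, unless `enforce_sorted=False` is passed, sorted in descending order.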


## Throughput Tips

- Increase `num_workers` to parallelize CPU preprocessing (benchmark per machine).
- Enable `pin_memory=True` when batches are copied to a CUDA GPU, and pair it with `.to(device, non_blocking=True)` so the copy can overlap with computation.
- Measure time per batch before assuming the model is slow.
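
The tips above can be combined into a small benchmark. This is a hedged sketch: the helper `time_loader` is hypothetical, and only the single-process baseline is run here. On small in-memory datasets like this one, worker processes often add overhead rather than speed, which is why benchmarking per machine matters.

```python
import time
import torch
from torch.utils.data import DataLoader, TensorDataset

def time_loader(dataset, num_workers=0, pin_memory=False, batch_size=32, epochs=2):
    """Return the average seconds per batch for one DataLoader configuration."""
    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=pin_memory,
        # Keep workers alive between epochs; only valid when num_workers > 0.
        persistent_workers=num_workers > 0,
    )
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    start = time.perf_counter()
    steps = 0
    for _ in range(epochs):
        for xb, yb in loader:
            # non_blocking lets the copy overlap with compute when memory is pinned.
            xb = xb.to(device, non_blocking=pin_memory)
            yb = yb.to(device, non_blocking=pin_memory)
            steps += 1
    return (time.perf_counter() - start) / steps

synthetic = TensorDataset(torch.randn(128, 3), torch.randn(128, 1))
use_pinned = torch.cuda.is_available()

# Baseline in the main process; extend the tuple to e.g. (0, 2, 4) on your machine.
for workers in (0,):
    avg = time_loader(synthetic, num_workers=workers, pin_memory=use_pinned)
    print(f"num_workers={workers}: {avg:.6f}s per batch")
```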

In [None]:
import time

loader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=0)
start = time.perf_counter()
for _ in range(5):
    for xb, yb in loader:
        _ = (xb, yb)
elapsed = time.perf_counter() - start
print(f"Average step time: {elapsed / (5 * len(loader)):.6f}s")


## Comprehensive Exercise – Vision Pipeline Blueprint

Generate synthetic RGB images, apply augmentations and normalization, and build train/validation loaders. Explain how you would swap in a real dataset (e.g., CIFAR-10).

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms as T

class SyntheticImages(Dataset):
    def __init__(self, num_images=200):
        self.data = torch.rand(num_images, 3, 32, 32)
        self.targets = torch.randint(0, 10, (num_images,))
        # TODO: define train/eval transforms
        self.training = True

    def train(self, mode=True):
        self.training = mode
        return self

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # TODO: apply transforms based on mode
        raise NotImplementedError

# TODO: split dataset into train/validation loaders with augmentations


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms as T

class SyntheticImages(Dataset):
    def __init__(self, num_images=200):
        self.data = torch.rand(num_images, 3, 32, 32)
        self.targets = torch.randint(0, 10, (num_images,))
        self.train_t = T.Compose([
            T.RandomHorizontalFlip(),
            T.RandomCrop(32, padding=4),
            T.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
        ])
        self.eval_t = T.Compose([
            T.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
        ])
        self.training = True

    def train(self, mode=True):
        self.training = mode
        return self

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        transforms = self.train_t if self.training else self.eval_t
        return transforms(self.data[idx]), self.targets[idx]

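import copy

dataset = SyntheticImages(200)
train_ds, val_ds = random_split(dataset, [160, 40], generator=torch.Generator().manual_seed(21))

# Both subsets wrap the same underlying dataset, so a single `training` flag
# would apply to both loaders. Point the validation subset at a shallow copy
# (same tensors, separate flag) that stays in eval mode.
val_ds.dataset = copy.copy(dataset).train(False)
dataset.train(True)

train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=32)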

print(f"Train batches: {len(train_loader)}, Val batches: {len(val_loader)}")


## Further Reading

- PyTorch Data Loading: https://pytorch.org/docs/stable/data.html
- TorchData for streaming scenarios (its DataPipes API was deprecated in mid-2024, so check the project README for current status)
- NVIDIA DALI for GPU-accelerated preprocessing