In [15]:
"""
Streaming Language Modeling Data Pipeline with Hugging Face Datasets
--------------------------------------------------------------------
Goal:
    Demonstrate how to build a *true streaming* LM pipeline that:
    - Processes data without loading the entire dataset into RAM.
    - Tokenizes on the fly.
    - Concatenates text and chunks into fixed-length blocks for LM training.
    - Produces batches ready for training in PyTorch.

Key Teaching Points:
    1. Streaming allows us to work with web-scale corpora.
    2. We still can do grouping/chunking in a rolling fashion.
    3. This approach mimics real-world pipelines for large-scale LM training.
"""
print(__doc__)


Streaming Language Modeling Data Pipeline with Hugging Face Datasets
--------------------------------------------------------------------
Goal:
    Demonstrate how to build a *true streaming* LM pipeline that:
    - Processes data without loading the entire dataset into RAM.
    - Tokenizes on the fly.
    - Concatenates text and chunks into fixed-length blocks for LM training.
    - Produces batches ready for training in PyTorch.

Key Teaching Points:
    1. Streaming allows us to work with web-scale corpora.
    2. We still can do grouping/chunking in a rolling fashion.
    3. This approach mimics real-world pipelines for large-scale LM training.



In [16]:
!pip install transformers torch



In [17]:
from itertools import islice

import torch
from datasets import load_dataset
from torch.utils.data import IterableDataset, DataLoader
from transformers import AutoTokenizer

In [18]:
# ============================================================
# 1. Load the dataset in STREAMING mode
# ============================================================
# TinyStories is large enough that materializing the full split is wasteful.
# With streaming=True, examples are produced lazily as we iterate, rather than
# loading the whole split into RAM up front.
dataset_name = "roneneldan/TinyStories"
print(f"Loading {dataset_name} in streaming mode...")

stream_dataset = load_dataset(path=dataset_name, split="train", streaming=True)

Loading roneneldan/TinyStories in streaming mode...


README.md: 0.00B [00:00, ?B/s]

In [19]:
# ============================================================
# 2. Initialize the tokenizer
# ============================================================
# Load the tokenizer that ships with the Pythia checkpoint, so token ids match
# that model's vocabulary.
model_checkpoint = "EleutherAI/pythia-70m"
tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=model_checkpoint)

# If the checkpoint defines no pad token, reuse the EOS token as the pad token.
# NOTE(review): every block built below is exactly block_size tokens long, so
# this pipeline never actually pads. The fallback only matters if the
# tokenizer is later asked to pad.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

tokenizer_config.json:   0%|          | 0.00/396 [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/99.0 [00:00<?, ?B/s]

In [20]:
# ============================================================
# 3. Tokenization step
# ============================================================
# We do NOT pad/truncate here — we want raw token sequences.
# This keeps flexibility to later concatenate across documents.
def tokenize_function(examples):
    """Tokenize a batch of raw documents without padding or truncation.

    Args:
        examples: batch dict passed by ``map(batched=True)``; its ``"text"``
            entry holds the raw documents of the batch.

    Returns:
        The tokenizer output for the batch, including ``"input_ids"``
        (one list of token ids per document).
    """
    # TinyStories exposes its stories under a "text" column.
    return tokenizer(examples["text"])


# Drop the raw "text" column once it is tokenized. Only "input_ids" is read
# downstream, so carrying the original strings through the stream is wasted work.
tokenized_stream = stream_dataset.map(
    tokenize_function,
    batched=True,
    remove_columns=["text"],
)

In [21]:
# ============================================================
# 4. Rolling buffer for grouping into fixed-length blocks
# ============================================================
# Because streaming datasets are iterators, we can't look ahead arbitrarily.
# We keep a buffer holding the tokens left over from previous examples,
# so we can concatenate and chunk consistently across document boundaries.
block_size = 256

def group_texts_streaming(dataset_iter, block_size, sep_token_id=None):
    """Concatenate per-example token ids and yield fixed-length blocks.

    Args:
        dataset_iter: iterable of examples, each a mapping with an
            ``"input_ids"`` list of token ids.
        block_size: number of tokens per yielded block; must be >= 1.
        sep_token_id: optional token id appended after every example
            (e.g. ``tokenizer.eos_token_id``) so document boundaries remain
            visible after concatenation. ``None`` (default) appends nothing.

    Yields:
        dicts with ``"input_ids"`` (exactly ``block_size`` ids) and an
        all-ones ``"attention_mask"`` of the same length.

    Raises:
        ValueError: if ``block_size`` < 1. Because this is a generator,
            the error surfaces on the first iteration.

    Tokens still in the buffer when the stream ends (fewer than
    ``block_size``) are dropped.
    """
    if block_size < 1:
        # Without this guard, block_size <= 0 loops forever yielding empty blocks.
        raise ValueError(f"block_size must be >= 1, got {block_size}")

    buffer = []
    for example in dataset_iter:
        buffer.extend(example["input_ids"])
        if sep_token_id is not None:
            buffer.append(sep_token_id)

        # Emit every complete block by offset, then trim the consumed prefix
        # once. Trimming after each block would re-copy the remainder every
        # time, which is quadratic in the length of a long example.
        consumed = (len(buffer) // block_size) * block_size
        for start in range(0, consumed, block_size):
            yield {
                "input_ids": buffer[start:start + block_size],
                # Every block is full, so all positions are real tokens.
                "attention_mask": [1] * block_size,
            }
        del buffer[:consumed]

In [22]:
# ============================================================
# 5. Wrap generator in an IterableDataset
# ============================================================
class StreamingLMIterableDataset(IterableDataset):
    """PyTorch ``IterableDataset`` view over a tokenized streaming dataset.

    Every call to ``iter()`` starts a fresh ``group_texts_streaming`` pass
    over the wrapped dataset, yielding fixed-length token blocks.

    Args:
        hf_iterable_dataset: iterable of examples carrying ``"input_ids"``.
        block_size: number of tokens per yielded block.
    """

    def __init__(self, hf_iterable_dataset, block_size):
        # Only configuration is stored here; nothing is read until iteration.
        self.dataset, self.block_size = hf_iterable_dataset, block_size

    def __iter__(self):
        yield from group_texts_streaming(self.dataset, self.block_size)


grouped_iterable_dataset = StreamingLMIterableDataset(
    hf_iterable_dataset=tokenized_stream,
    block_size=block_size,
)

In [23]:
# ============================================================
# 6. Collate function for batches
# ============================================================
def collate_fn(batch):
    """Stack a list of equal-length examples into LongTensor batch fields.

    Args:
        batch: list of dicts, each with ``"input_ids"`` and
            ``"attention_mask"`` lists of the same length.

    Returns:
        dict with ``"input_ids"``, ``"attention_mask"`` and ``"labels"``
        tensors. ``labels`` is an independent copy of ``input_ids``, which is
        the usual target for causal LMs such as Pythia/GPT.
    """
    ids = torch.tensor([row["input_ids"] for row in batch], dtype=torch.long)
    mask = torch.tensor([row["attention_mask"] for row in batch], dtype=torch.long)
    return dict(input_ids=ids, attention_mask=mask, labels=ids.clone())

In [24]:
# ============================================================
# 7. DataLoader for streaming data
# ============================================================
# The dataset is iterable, so examples arrive in stream order. No shuffle
# flag or sampler is passed, and collate_fn stacks 8 blocks per batch.
# NOTE(review): num_workers is left at its default. With several workers,
# each one would run its own copy of the generator, so confirm how the
# stream is sharded before raising it.
train_loader = DataLoader(
    dataset=grouped_iterable_dataset,
    batch_size=8,
    collate_fn=collate_fn,
)

In [25]:
# ============================================================
# 8. Iterate over a few batches
# ============================================================
# How many batches to pull from the stream for this sanity check.
num_preview_batches = 3

print(f"\nStream ready. Model: {model_checkpoint} | Dataset: {dataset_name}")
print("Sample streaming batches:")

# islice stops after num_preview_batches batches without requesting another
# one from the (potentially endless) stream.
for i, batch in enumerate(islice(train_loader, num_preview_batches)):
    print(f"Batch {i} -> input_ids shape: {batch['input_ids'].shape}")
    # Check the sequence dimension against block_size instead of only printing it.
    assert batch["input_ids"].shape[1] == block_size, batch["input_ids"].shape


Stream ready. Model: EleutherAI/pythia-70m | Dataset: roneneldan/TinyStories
Sample streaming batches:
Batch 0 -> input_ids shape: torch.Size([8, 256])
Batch 1 -> input_ids shape: torch.Size([8, 256])
Batch 2 -> input_ids shape: torch.Size([8, 256])
