# Lab 2 — Streaming LM Pipeline

**Changes from template:**
- Dataset: `wikitext` (`wikitext-103-v1` config, streaming)
- Tokenizer: `gpt2` (GPT-2 tokenizer)
- Block size: **256** tokens (was 128)

This notebook demonstrates a true streaming data pipeline using Hugging Face Datasets
and PyTorch `IterableDataset`. The pipeline tokenizes on the fly, concatenates tokens
with a rolling buffer, and yields fixed-length chunks for language model training.

Run the cells in order. This notebook is self-contained and intended as the final
deliverable for the assignment (ready to submit).


In [1]:
# Install required packages if necessary (uncomment the line below when needed)
# %pip install datasets transformers torch
print("Notebook: Streaming LM Pipeline (modified)\n")

Notebook: Streaming LM Pipeline (modified)



In [2]:
from datasets import load_dataset
from transformers import AutoTokenizer
from torch.utils.data import IterableDataset, DataLoader
import torch
import time




## 1) Load the dataset in **streaming** mode

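We load `wikitext` (`wikitext-103-v1` config) with `streaming=True`, so records are fetched and processed on the fly as we iterate, instead of the full corpus being downloaded and prepared locally before training starts.
This mirrors real-world training pipelines, where corpora can be too large to download or store in full.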
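To make the lazy behavior concrete, here is a minimal pure-Python sketch. The generator `fake_text_stream` is a hypothetical stand-in for the streaming dataset (it is not part of `datasets`); like a real stream, it produces records only when iterated, so taking the first few never touches the rest.

```python
import itertools


def fake_text_stream(n_docs):
    """Hypothetical stand-in for a streaming corpus: yields one record at a time."""
    for i in range(n_docs):
        # Each record mimics a wikitext row: a dict with a "text" field.
        yield {"text": f"document {i}"}


stream = fake_text_stream(10**12)  # nothing is generated yet
first_three = list(itertools.islice(stream, 3))  # only 3 records are produced
print(first_three)
print(next(stream))  # iteration resumes where it stopped: {'text': 'document 3'}
```

The real streaming dataset supports the same "peek at the first few" pattern through `IterableDataset.take(n)`.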


In [3]:
stream_dataset = load_dataset(
    "wikitext",
    "wikitext-103-v1",
    split="train",
    streaming=True
)

print("stream_dataset created (streaming=True)")

stream_dataset created (streaming=True)


## 2) Initialize the tokenizer

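We use GPT-2's tokenizer. GPT-2 has no pad token by default, so we set `pad_token` to `eos_token`. In practice no padding is applied in this pipeline, because every training block is exactly `block_size` tokens; the pad token only matters for the optional padded final block in section 4.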
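For illustration, a minimal pure-Python sketch of what padding with the EOS token looks like. The helper `pad_batch` is hypothetical; `50256` is the id of GPT-2's `<|endoftext|>` token, which is what `tokenizer.eos_token_id` returns for `gpt2`.

```python
GPT2_EOS_ID = 50256  # id of "<|endoftext|>" in the GPT-2 vocabulary, reused as the pad id


def pad_batch(seqs, pad_id=GPT2_EOS_ID):
    """Right-pad token-id lists to a common length and build matching attention masks."""
    max_len = max(len(s) for s in seqs)
    input_ids, attention_mask = [], []
    for s in seqs:
        n_pad = max_len - len(s)
        input_ids.append(s + [pad_id] * n_pad)
        attention_mask.append([1] * len(s) + [0] * n_pad)  # 0 marks padding
    return input_ids, attention_mask


ids, mask = pad_batch([[10, 11, 12], [20]])
print(ids)   # [[10, 11, 12], [20, 50256, 50256]]
print(mask)  # [[1, 1, 1], [1, 0, 0]]
```

Because the pad id equals the EOS id, a training loop that does pad would typically also set padded label positions to `-100` (the index PyTorch's cross-entropy loss ignores by default), so the model is not trained to predict padding.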


In [4]:
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token
print("Tokenizer initialized:", tokenizer.__class__)


Tokenizer initialized: <class 'transformers.models.gpt2.tokenization_gpt2_fast.GPT2TokenizerFast'>


## 3) Tokenization (lazy / streaming)

We map tokenization lazily over the streaming dataset. We do not pad/truncate here —
we want raw token sequences so they can be concatenated across documents.
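With `batched=True`, the mapped function receives a dict of lists (one list per column) rather than a single example, and returns a dict of lists as well. A minimal sketch of that contract, using a hypothetical word-level `toy_tokenize` in place of the GPT-2 tokenizer:

```python
def toy_tokenize(text, vocab):
    """Hypothetical word-level tokenizer: assigns each new word the next free id."""
    return [vocab.setdefault(word, len(vocab)) for word in text.split()]


def tokenize_batch(examples, vocab):
    # batched=True: `examples` is a dict of lists, e.g. {"text": ["...", "..."]}
    return {"input_ids": [toy_tokenize(t, vocab) for t in examples["text"]]}


vocab = {}
batch = {"text": ["the cat sat", "", "the dog sat"]}
out = tokenize_batch(batch, vocab)
print(out)  # {'input_ids': [[0, 1, 2], [], [0, 3, 2]]}
```

Empty lines, which are common in wikitext, become empty lists; the rolling buffer in the next section absorbs them without special handling.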


In [5]:
def tokenize_function(examples):
    # With batched=True, `examples` is a dict of lists, e.g. {"text": ["line 1", "line 2", ...]},
    # so we tokenize the whole list of strings in one call (no padding or truncation).
    return tokenizer(examples["text"])

# `remove_columns` drops the raw text so downstream examples only carry token fields.
tokenized_stream = stream_dataset.map(tokenize_function, batched=True, remove_columns=["text"])
print("Tokenization mapping created (lazy, streaming)")

Tokenization mapping created (lazy, streaming)


## 4) Rolling buffer to group tokens into fixed-length blocks

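A streaming dataset is an iterator, so we cannot index into it or look ahead. Instead, a rolling buffer accumulates token IDs across document boundaries and emits a `block_size` chunk whenever it holds enough tokens; the remainder carries over to the next chunk.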
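A small trace of the rolling-buffer idea in plain Python, with a toy `block_size` of 4. The helper `group_stream` is a simplified, hypothetical version of the generator in the next cell that yields bare lists instead of dicts. Blocks span document boundaries, and tokens left over at the end are dropped.

```python
def group_stream(token_lists, block_size):
    """Concatenate token-id lists and yield fixed-size blocks; drop the final remainder."""
    buffer = []
    for ids in token_lists:
        buffer.extend(ids)
        while len(buffer) >= block_size:
            yield buffer[:block_size]
            buffer = buffer[block_size:]


docs = [[1, 2, 3], [4, 5], [6, 7, 8, 9, 10], [11]]
blocks = list(group_stream(docs, 4))
print(blocks)  # [[1, 2, 3, 4], [5, 6, 7, 8]]  (tokens 9, 10, 11 never fill a block)
```

Slicing `buffer[block_size:]` copies the remainder for every block; at `block_size = 256` that cost is small, but a moving start index would avoid the copies if it ever mattered.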


In [6]:
block_size = 256  # increased from 128 in the original lab

def group_texts_streaming(dataset_iter, block_size):
    """Generator that yields fixed-length token chunks from a streaming tokenized dataset.

    Each yielded item is a dict with 'input_ids' and 'attention_mask'.
    Tokens left over at the end of the stream (fewer than block_size) are dropped.
    """
    buffer = []
    for example in dataset_iter:
        # Skip anything that is not a tokenized example (defensive; not expected after map()).
        ids = example.get("input_ids") if isinstance(example, dict) else None
        if ids is None:
            continue
        buffer.extend(ids)
        while len(buffer) >= block_size:
            chunk = buffer[:block_size]
            buffer = buffer[block_size:]
            yield {"input_ids": chunk, "attention_mask": [1] * block_size}
    # Optional: yield a final shorter chunk padded to block_size (disabled by default).
    # if buffer:
    #     pad_len = block_size - len(buffer)
    #     yield {
    #         "input_ids": buffer + [tokenizer.pad_token_id] * pad_len,
    #         "attention_mask": [1] * len(buffer) + [0] * pad_len,
    #     }


## 5) Wrap generator in a PyTorch `IterableDataset`

This allows us to use PyTorch's `DataLoader` with an iterable dataset and a custom collate.
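One caveat when scaling this up: with `DataLoader(num_workers > 0)`, PyTorch replicates the `IterableDataset` in each worker process, so every worker would replay the same blocks. A common remedy is to shard inside `__iter__` using `torch.utils.data.get_worker_info()`, which exposes `id` and `num_workers` (and returns `None` in the main process). The round-robin split itself is sketched below in plain Python; `shard_for_worker` is a hypothetical helper, not part of PyTorch.

```python
import itertools


def shard_for_worker(iterable, worker_id, num_workers):
    """Round-robin sharding: keep items worker_id, worker_id + num_workers, ..."""
    return itertools.islice(iterable, worker_id, None, num_workers)


blocks = [f"block{i}" for i in range(7)]
per_worker = [list(shard_for_worker(blocks, w, 3)) for w in range(3)]
print(per_worker)
# [['block0', 'block3', 'block6'], ['block1', 'block4'], ['block2', 'block5']]
```

With this scheme each worker still reads the whole stream and discards the other workers' items; sharding at the source avoids that extra work. This notebook uses the default `num_workers=0`, so no sharding is needed here.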


In [7]:
class StreamingLMIterableDataset(IterableDataset):
    def __init__(self, hf_iterable_dataset, block_size):
        self.dataset = hf_iterable_dataset
        self.block_size = block_size

    def __iter__(self):
        # The HF iterable returns examples lazily; pass it to the grouping generator
        return group_texts_streaming(self.dataset, self.block_size)

grouped_iterable_dataset = StreamingLMIterableDataset(tokenized_stream, block_size)
print("Wrapped grouped generator into StreamingLMIterableDataset")


Wrapped grouped generator into StreamingLMIterableDataset


## 6) Collate function and DataLoader

We produce tensors for `input_ids`, `attention_mask`, and `labels` (copy of `input_ids`).
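`labels` can be an unshifted copy because Hugging Face causal LM models such as `GPT2LMHeadModel` shift them internally when computing the loss: the prediction at position t is scored against the token at position t + 1. A plain-Python sketch of the resulting (input, target) pairs for one short block (the token ids are arbitrary):

```python
def next_token_pairs(input_ids):
    """Pair each token with the token the model must predict after it."""
    inputs = input_ids[:-1]   # positions whose prediction is scored
    targets = input_ids[1:]   # labels shifted left by one
    return list(zip(inputs, targets))


pairs = next_token_pairs([464, 3797, 3332, 319])
print(pairs)  # [(464, 3797), (3797, 3332), (3332, 319)]
```

A block of `block_size` tokens therefore yields `block_size - 1` scored predictions.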


In [8]:
def collate_fn(batch):
    input_ids = torch.tensor([ex["input_ids"] for ex in batch], dtype=torch.long)
    attention_mask = torch.tensor([ex["attention_mask"] for ex in batch], dtype=torch.long)
    return {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "labels": input_ids.clone()
    }

train_loader = DataLoader(grouped_iterable_dataset, batch_size=8, collate_fn=collate_fn)
print("DataLoader created (streaming, batch_size=8)")


DataLoader created (streaming, batch_size=8)


## 7) Inspect a few streaming batches and measure throughput

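We iterate over a few batches and report their shapes and throughput (tokens/sec). The timing includes opening the remote stream, so a three-batch run gives only a rough, likely pessimistic, estimate.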
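The throughput figure is simple arithmetic: tokens processed = batches × batch size × block size. A minimal sketch, plugging in this notebook's values (3 batches of 8 blocks of 256 tokens) and the rounded elapsed time from the recorded run:

```python
def tokens_per_second(num_batches, batch_size, block_size, elapsed_s):
    """Return (tokens processed, tokens per second) for fixed-size LM batches."""
    tokens = num_batches * batch_size * block_size
    return tokens, tokens / elapsed_s


tokens, rate = tokens_per_second(3, 8, 256, 1.42)
print(tokens, round(rate, 1))  # 6144 4326.8
```

The notebook's printed rate (4321.74) differs slightly because it divides by the unrounded elapsed time rather than by 1.42.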


In [9]:
num_batches = 3
processed = 0  # tokens seen so far
seen = 0       # batches seen so far
t0 = time.perf_counter()
for i, batch in enumerate(train_loader):
    print(f"Batch {i} -> input_ids shape: {batch['input_ids'].shape}")
    processed += batch["input_ids"].numel()  # batch_size * block_size for a full batch
    seen += 1
    if seen >= num_batches:
        break
elapsed = time.perf_counter() - t0
if seen == 0:
    print("No batches produced")
elif elapsed > 0:
    print(f"Elapsed: {elapsed:.2f}s — approx processed tokens: {processed} — tokens/sec: {processed/elapsed:.2f}")
else:
    print("Elapsed time too small to measure throughput")


Batch 0 -> input_ids shape: torch.Size([8, 256])
Batch 1 -> input_ids shape: torch.Size([8, 256])
Batch 2 -> input_ids shape: torch.Size([8, 256])
Elapsed: 1.42s — approx processed tokens: 6144 — tokens/sec: 4321.74


## 8) Small utilities and token statistics

We compute quick statistics (mean and median tokens per example, plus examples/sec) over the first `n_examples` examples of the tokenized stream.
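Python's `statistics` module computes the same summaries. Note that `statistics.median` averages the two middle values when the count is even, whereas `sample_stats` in the next cell returns the upper middle value. The counts below are made up for illustration; the zeros stand in for the empty lines common in wikitext, which pull the mean down.

```python
import statistics

token_counts = [0, 12, 7, 30, 0, 45]  # hypothetical per-example token counts

mean = statistics.mean(token_counts)
median = statistics.median(token_counts)                      # (7 + 12) / 2
upper_median = sorted(token_counts)[len(token_counts) // 2]   # what sample_stats reports
print(round(mean, 2), median, upper_median)  # 15.67 9.5 12
```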


In [10]:
def sample_stats(hf_iter, n_examples=1000):
    cnt = 0
    token_counts = []
    t0 = time.time()
    for ex in hf_iter:
        ids = ex.get("input_ids") if isinstance(ex, dict) else None
        if ids is None:
            continue
        token_counts.append(len(ids))
        cnt += 1
        if cnt >= n_examples:
            break
    t1 = time.time()
    if token_counts:
        return {
            "examples": cnt,
            "avg_tokens_per_example": sum(token_counts)/len(token_counts),
            "median_tokens_per_example": sorted(token_counts)[len(token_counts)//2],
            "time_sec": t1-t0,
            "examples_per_sec": cnt/(t1-t0) if (t1-t0)>0 else float('inf')
        }
    return {}

# WARNING: sampling can take time on very large datasets — use a small n when running locally
# stats = sample_stats(tokenized_stream, n_examples=200)
# print(stats)


## 9) Notes for submission

- This lab modifies the original Lab 2 by using `wikitext` (`wikitext-103-v1`, streaming), switching to the
  `gpt2` tokenizer, and using a block size of 256 tokens.
- The pipeline remains streaming and memory-efficient.
- A grader that runs the notebook needs `datasets`, `transformers`, and `torch` installed, plus network access to the Hugging Face Hub for streaming.

### How to run
1. Run all cells in order in Colab / Jupyter / VS Code.
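2. If streaming fails (for example, because of Hub rate limits or network errors), retry later, or switch to a smaller config such as `wikitext-2-v1` and load it without `streaming=True`, so it is downloaded once and then read from the local Hugging Face cache.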


In [11]:
# Notebook metadata footer
print('Lab 2 (modified) notebook ready. Save this file and submit as your lab deliverable.')


Lab 2 (modified) notebook ready. Save this file and submit as your lab deliverable.
