# Data Processing

In this notebook, we will implement the tokenizer and dataset classes to prepare our text for the model.


## Imports


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import json
import os
import re
from collections import Counter
from typing import List, Dict, Optional, Tuple


## Character Tokenizer

First, we build a simple character-level tokenizer.


In [None]:
class CharTokenizer:
    """
    Character-level tokenizer.

    Maps every distinct character to an integer ID. The four special tokens
    (<PAD>, <UNK>, <BOS>, <EOS>) are registered first, so they always occupy
    IDs 0-3; characters discovered by ``build_vocab`` receive the following
    IDs in sorted order.

    Args:
        text: Optional text to build the vocabulary from

    Example:
        >>> tokenizer = CharTokenizer("hello world")
        >>> tokens = tokenizer.encode("hello")
        >>> print(tokens)
        [7, 6, 8, 8, 9]
        >>> print(tokenizer.decode(tokens))
        hello
    """

    # Special tokens
    PAD_TOKEN = "<PAD>"
    UNK_TOKEN = "<UNK>"
    BOS_TOKEN = "<BOS>"
    EOS_TOKEN = "<EOS>"

    def __init__(self, text: Optional[str] = None):
        self.char_to_id: Dict[str, int] = {}
        self.id_to_char: Dict[int, str] = {}

        # Special tokens go in first so they always receive IDs 0-3.
        self._add_special_tokens()

        if text is not None:
            self.build_vocab(text)

    def _add_token(self, token: str) -> None:
        """Register ``token`` under the next free ID unless it is already known."""
        if token in self.char_to_id:
            return
        idx = len(self.char_to_id)
        self.char_to_id[token] = idx
        self.id_to_char[idx] = token

    def _add_special_tokens(self):
        """Add special tokens to vocabulary."""
        for token in (self.PAD_TOKEN, self.UNK_TOKEN, self.BOS_TOKEN, self.EOS_TOKEN):
            self._add_token(token)

    @property
    def pad_token_id(self) -> int:
        """ID of the padding token."""
        return self.char_to_id[self.PAD_TOKEN]

    @property
    def unk_token_id(self) -> int:
        """ID substituted for characters that are not in the vocabulary."""
        return self.char_to_id[self.UNK_TOKEN]

    @property
    def bos_token_id(self) -> int:
        """ID of the beginning-of-sequence token."""
        return self.char_to_id[self.BOS_TOKEN]

    @property
    def eos_token_id(self) -> int:
        """ID of the end-of-sequence token."""
        return self.char_to_id[self.EOS_TOKEN]

    @property
    def vocab_size(self) -> int:
        """Number of entries in the vocabulary, special tokens included."""
        return len(self.char_to_id)

    def build_vocab(self, text: str):
        """
        Build vocabulary from text.

        Characters already present keep their IDs, so calling this repeatedly
        only ever extends the vocabulary.

        Args:
            text: Text to extract characters from
        """
        # Sorting makes the ID assignment deterministic for a given text.
        for char in sorted(set(text)):
            self._add_token(char)

    def encode(
        self,
        text: str,
        add_bos: bool = False,
        add_eos: bool = False
    ) -> List[int]:
        """
        Convert text to token IDs.

        Characters missing from the vocabulary are encoded as ``unk_token_id``.

        Args:
            text: Text to encode
            add_bos: Whether to add beginning-of-sequence token
            add_eos: Whether to add end-of-sequence token

        Returns:
            List of token IDs
        """
        unk_id = self.unk_token_id
        lookup = self.char_to_id

        tokens: List[int] = [self.bos_token_id] if add_bos else []
        tokens.extend(lookup.get(char, unk_id) for char in text)
        if add_eos:
            tokens.append(self.eos_token_id)

        return tokens

    def decode(self, tokens: List[int], skip_special: bool = True) -> str:
        """
        Convert token IDs back to text.

        Args:
            tokens: Iterable of token IDs. Besides plain ints, any scalar that
                supports ``int()`` is accepted (e.g. the elements obtained by
                iterating a 1-D integer tensor).
            skip_special: Whether to skip special tokens in output. Note that
                this also drops ``<UNK>``, not only PAD/BOS/EOS.

        Returns:
            Decoded text. IDs that are not in the vocabulary are rendered as
            the ``<UNK>`` token string.
        """
        special_ids = {self.pad_token_id, self.unk_token_id,
                       self.bos_token_id, self.eos_token_id}

        chars = []
        for token_id in tokens:
            # Coerce to a plain int: scalar wrappers such as 0-d tensors do not
            # hash like ints, so without this they would miss both the
            # special-ID set and the id_to_char lookup and decode to "<UNK>".
            token_id = int(token_id)
            if skip_special and token_id in special_ids:
                continue
            chars.append(self.id_to_char.get(token_id, self.UNK_TOKEN))

        return "".join(chars)

    def save(self, path: str):
        """Save tokenizer vocabulary to file (JSON, UTF-8)."""
        with open(path, 'w', encoding='utf-8') as f:
            json.dump({
                'char_to_id': self.char_to_id,
                'type': 'char'
            }, f, ensure_ascii=False, indent=2)

    @classmethod
    def load(cls, path: str) -> 'CharTokenizer':
        """
        Load tokenizer from file.

        The vocabulary stored in the file replaces the default one entirely;
        the reverse mapping is rebuilt from it.
        """
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        tokenizer = cls()
        tokenizer.char_to_id = {char: int(idx) for char, idx in data['char_to_id'].items()}
        tokenizer.id_to_char = {idx: char for char, idx in tokenizer.char_to_id.items()}
        return tokenizer


## Text Dataset

Next, we define the PyTorch Dataset.


In [None]:
class TextDataset(Dataset):
    """
    PyTorch Dataset for language modeling.

    The whole text is encoded once; sample ``i`` is the window of ``seq_len``
    tokens starting at position ``i`` (stride 1), paired with the same window
    shifted one token to the right as the prediction target.

    Args:
        text: The text to create dataset from
        tokenizer: Tokenizer to use for encoding (only ``encode`` is called)
        seq_len: Sequence length for each sample (must be >= 1)

    Raises:
        ValueError: If ``seq_len`` is smaller than 1.

    Example:
        >>> tokenizer = CharTokenizer("hello world")
        >>> dataset = TextDataset("hello world", tokenizer, seq_len=5)
        >>> x, y = dataset[0]
        >>> print(x.shape, y.shape)  # torch.Size([5]), torch.Size([5])
    """

    def __init__(
        self,
        text: str,
        tokenizer: 'CharTokenizer',
        seq_len: int = 128
    ):
        if seq_len < 1:
            raise ValueError(f"seq_len must be a positive integer, got {seq_len}")

        self.tokenizer = tokenizer
        self.seq_len = seq_len

        # Encode the entire text
        self.tokens = torch.tensor(tokenizer.encode(text), dtype=torch.long)

        # A sample needs seq_len inputs plus one extra token for the last
        # target, so the final valid start position is len(tokens) - seq_len - 1.
        self.n_samples = max(0, len(self.tokens) - seq_len)

        print(f"Dataset created with {len(self.tokens):,} tokens, {self.n_samples:,} samples")

    def __len__(self) -> int:
        return self.n_samples

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Get a training sample.

        Args:
            idx: Sample index; negative values count from the end

        Returns:
            Tuple of (input_ids, target_ids) both of shape (seq_len,)

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        # Normalise negative indices without mutating the caller's object.
        start = idx + self.n_samples if idx < 0 else idx

        # Raising IndexError (instead of returning a short or empty slice) keeps
        # every sample at full length and lets plain `for x, y in dataset`
        # iteration terminate.
        if not 0 <= start < self.n_samples:
            raise IndexError(
                f"index {idx} is out of range for dataset with {self.n_samples} samples"
            )

        # Input: tokens from start to start + seq_len
        # Target: the same window shifted right by one token
        x = self.tokens[start : start + self.seq_len]
        y = self.tokens[start + 1 : start + self.seq_len + 1]

        return x, y


## Create Dataloaders

The functions below download the raw text (TinyStories, with TinyShakespeare as a fallback) and build the train and validation dataloaders.


In [None]:
def download_tinystories(data_dir: str = "data/raw", max_stories: int = 50000) -> str:
    """
    Download the TinyStories dataset from Hugging Face.

    TinyStories is a dataset of short stories written in simple English,
    designed for training small language models. It's much larger than
    TinyShakespeare and produces better results.

    The stories are joined with blank lines and cached as
    ``<data_dir>/tinystories.txt``. If that file already exists it is reused
    as-is, so ``max_stories`` only takes effect on the first download; delete
    the file to rebuild it with a different limit.

    If loading or writing the dataset fails, the function falls back to
    ``download_tiny_shakespeare`` and returns that path instead.

    Args:
        data_dir: Directory to save the file
        max_stories: Maximum number of stories to use (default: 50000)
                    Use -1 for all stories (~2.1M)

    Returns:
        Path to the downloaded file
    """
    os.makedirs(data_dir, exist_ok=True)
    file_path = os.path.join(data_dir, "tinystories.txt")

    if os.path.exists(file_path):
        print(f"Dataset already exists at {file_path}")
        return file_path

    print("Downloading TinyStories dataset...")
    print("This may take a few minutes on first run...")

    try:
        from datasets import load_dataset
    except ImportError:
        print("Installing datasets library...")
        import importlib
        import subprocess
        import sys
        # Run pip through the current interpreter so the package lands in the
        # environment this kernel is using, not whichever `pip` is first on PATH.
        subprocess.check_call([sys.executable, "-m", "pip", "install", "datasets", "-q"])
        # Make the freshly installed package visible to the running process.
        importlib.invalidate_caches()
        from datasets import load_dataset

    # Write to a temporary name and rename at the end, so an interrupted or
    # failed run never leaves a truncated file that later calls would treat
    # as a valid cache.
    tmp_path = file_path + ".tmp"

    try:
        # Load TinyStories dataset from Hugging Face
        dataset = load_dataset("roneneldan/TinyStories", split="train")

        # Limit number of stories if specified
        if max_stories > 0 and len(dataset) > max_stories:
            dataset = dataset.select(range(max_stories))

        print(f"Processing {len(dataset):,} stories...")

        # Combine stories into a single text, skipping empty or
        # whitespace-only entries so they do not produce extra separators.
        stripped = ((item.get("text") or "").strip() for item in dataset)
        full_text = "\n\n".join(story for story in stripped if story)

        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(full_text)
        os.replace(tmp_path, file_path)

        print(f"Saved {len(full_text):,} characters to {file_path}")

    except Exception as e:
        # Best-effort cleanup of the partial file; the fallback below matters more.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        print(f"Error downloading TinyStories: {e}")
        print("Falling back to TinyShakespeare...")
        return download_tiny_shakespeare(data_dir)

    return file_path


In [None]:
# Location of the TinyShakespeare corpus (the copy in Karpathy's char-rnn repository).
TINY_SHAKESPEARE_URL = (
    "https://raw.githubusercontent.com/karpathy/char-rnn/master/"
    "data/tinyshakespeare/input.txt"
)


def download_tiny_shakespeare(data_dir: str = "data/raw") -> str:
    """
    Download the TinyShakespeare dataset (fallback/alternative).

    The file is cached as ``<data_dir>/tiny_shakespeare.txt`` and reused on
    later calls without touching the network.

    Args:
        data_dir: Directory to save the file

    Returns:
        Path to the downloaded file

    Raises:
        Exception: Whatever the download raised is re-raised after printing
            manual-download instructions.
    """
    # Imported here so this cell works even though the notebook's import cell
    # does not pull in urllib.
    import urllib.request

    os.makedirs(data_dir, exist_ok=True)
    file_path = os.path.join(data_dir, "tiny_shakespeare.txt")

    if os.path.exists(file_path):
        print(f"Dataset already exists at {file_path}")
        return file_path

    print("Downloading TinyShakespeare dataset...")

    # Download under a temporary name and rename on success, so a broken
    # transfer cannot leave a partial file that later calls mistake for the cache.
    tmp_path = file_path + ".tmp"
    try:
        urllib.request.urlretrieve(TINY_SHAKESPEARE_URL, tmp_path)
        os.replace(tmp_path, file_path)
        print(f"Downloaded to {file_path}")
    except Exception as e:
        # Best-effort cleanup of the partial download before re-raising.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        print(f"Error downloading dataset: {e}")
        print("Please download manually from:")
        print(TINY_SHAKESPEARE_URL)
        raise

    return file_path


In [None]:
def create_dataloaders(
    data_path: Optional[str] = None,
    dataset_name: str = "tinystories",
    seq_len: int = 128,
    batch_size: int = 16,
    train_split: float = 0.9,
    num_workers: int = 0,
    seed: int = 42,
    max_stories: int = 50000
) -> Tuple[DataLoader, DataLoader, 'CharTokenizer']:
    """
    Create train and validation dataloaders.

    Reads a text file, builds a character tokenizer from it, wraps the text in
    a ``TextDataset`` and randomly splits the samples into train and
    validation subsets.

    NOTE(review): the samples are stride-1 windows over a single token stream,
    so after a random split most validation windows overlap training windows.
    Validation loss is therefore an optimistic estimate; split the raw text
    contiguously if a truly held-out measurement is needed.

    Args:
        data_path: Path to text file (downloads dataset if None)
        dataset_name: Dataset to use ("tinystories" or "shakespeare"); any
            value other than "tinystories" (case-insensitive) selects
            TinyShakespeare
        seq_len: Sequence length for samples
        batch_size: Batch size
        train_split: Fraction of data for training, in (0, 1]
        num_workers: Number of data loading workers
        seed: Random seed for reproducibility (drives both the train/val
            split and the training shuffle order)
        max_stories: Max stories for TinyStories (default: 50000)

    Returns:
        Tuple of (train_loader, val_loader, tokenizer)

    Raises:
        ValueError: If ``train_split`` is outside (0, 1], or if the text is too
            short to yield at least one training sample.

    Example:
        >>> train_loader, val_loader, tokenizer = create_dataloaders(batch_size=32)
        >>> for batch_idx, (x, y) in enumerate(train_loader):
        ...     print(x.shape, y.shape)  # (32, 128), (32, 128)
        ...     break
    """
    # Fail early with a clear message instead of an obscure error from
    # random_split or the sampler further down.
    if not 0.0 < train_split <= 1.0:
        raise ValueError(f"train_split must be in (0, 1], got {train_split}")

    # Download dataset if needed
    if data_path is None:
        if dataset_name.lower() == "tinystories":
            data_path = download_tinystories(max_stories=max_stories)
        else:
            data_path = download_tiny_shakespeare()

    # Load text
    print(f"Loading data from {data_path}...")
    with open(data_path, 'r', encoding='utf-8') as f:
        text = f.read()

    print(f"Loaded {len(text):,} characters")

    # Create tokenizer
    tokenizer = CharTokenizer(text)
    print(f"Vocabulary size: {tokenizer.vocab_size}")

    # Create dataset
    dataset = TextDataset(text, tokenizer, seq_len)

    # Split into train/val
    n_train = int(len(dataset) * train_split)
    n_val = len(dataset) - n_train

    if n_train == 0:
        raise ValueError(
            f"Not enough data for a training sample: {len(text):,} characters "
            f"with seq_len={seq_len} gives {len(dataset):,} samples"
        )

    generator = torch.Generator().manual_seed(seed)
    train_dataset, val_dataset = random_split(
        dataset, [n_train, n_val], generator=generator
    )

    print(f"Train samples: {len(train_dataset):,}, Val samples: {len(val_dataset):,}")

    # Pinned host memory only helps host-to-GPU copies; requesting it without
    # CUDA just produces a warning.
    pin_memory = torch.cuda.is_available()

    # Create dataloaders
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=pin_memory,
        drop_last=True,  # Drop incomplete batches for consistent batch size
        generator=generator  # Makes the shuffle order reproducible for a given seed
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=pin_memory,
        drop_last=False
    )

    print(f"Train batches: {len(train_loader)}, Val batches: {len(val_loader)}")

    return train_loader, val_loader, tokenizer


## Test Data Pipeline


In [None]:
# Quick test: end-to-end smoke check of the data pipeline on a tiny configuration.
# The __main__ guard keeps it from running if this code is imported as a module.
if __name__ == "__main__":
    print("Testing pipeline...")
    # NOTE: data_path is left at its default (None), so create_dataloaders downloads
    # the dataset; pass data_path="..." to read a local text file instead.
    try:
        # Small batch and sequence sizes; max_stories=100 caps the download
        # (it has no effect if a cached tinystories.txt already exists).
        train_loader, val_loader, tokenizer = create_dataloaders(
            batch_size=4, seq_len=32, max_stories=100
        )
        # Pull a single batch to confirm the loader yields (input, target) pairs.
        x, y = next(iter(train_loader))
        print(f"Input shape: {x.shape}")
        print(f"Target shape: {y.shape}")
        # Decode the first input sequence back to text as a readability check.
        print(f"Decoded: {tokenizer.decode(x[0].tolist())[:50]}...")
    except Exception as e:
        # Best-effort: every failure (not only network errors) is printed, not raised.
        print(f"Could not run test (missing internet?): {e}")
