In [3]:
from datasets import load_dataset
from tqdm import tqdm
import os, sys
sys.path.append(os.path.abspath(".."))
from src.tokenizer import Tokenizer

class TinyStoriesPreparer:
    """Stream, clean, and save the TinyStories dataset.

    Each split is streamed with ``load_dataset(..., streaming=True)``,
    cleaned example by example, and written to a UTF-8 text file.
    """

    def __init__(self, lowercase: bool = True):
        """Configure the preparer.

        Args:
            lowercase: When true, every example is lowercased while cleaning.
        """
        self.lowercase = lowercase
        # NOTE(review): a later notebook cell assigns a function to the class
        # attribute of the same name; this instance attribute takes precedence
        # over it on lookup. Confirm which of the two is intended.
        self._define_vocab = None

    def _clean_text(self, text: str) -> str:
        """Apply lightweight cleaning.

        Optionally lowercases, drops every non-ASCII character, turns tabs
        into spaces, removes carriage returns, and collapses runs of spaces
        into a single space. Newlines are left untouched.

        Args:
            text: Raw example text.

        Returns:
            The cleaned text.
        """
        if self.lowercase:
            text = text.lower()
        text = text.encode("ascii", "ignore").decode()       # drop non-ASCII
        text = text.replace("\t", " ").replace("\r", "")     # tidy whitespace
        # A single replace pass only halves a run of spaces, so repeat until
        # no double space is left.
        while "  " in text:
            text = text.replace("  ", " ")
        return text

    def _write_split(self, split, out_path: str):
        """Stream through a dataset split and write the cleaned examples.

        Every example is cleaned with ``_clean_text`` and written followed by
        a newline. If ``out_path`` already exists nothing is written.

        Args:
            split: Iterable of examples, each subscriptable with ``"text"``.
            out_path: Destination file path.
        """
        if os.path.exists(out_path):
            print(f"[skip] {out_path} already exists — skipping.")
            return
        # Write to a temporary sibling and rename on success: an interrupted
        # run must not leave a partial file that the check above would treat
        # as a finished split. A stale temporary is simply overwritten on the
        # next attempt.
        tmp_path = f"{out_path}.tmp"
        kept = 0
        total_chars = 0
        with open(tmp_path, "w", encoding="utf-8") as f, tqdm(unit="ex") as pbar:
            for ex in split:
                line = self._clean_text(ex["text"]) + "\n"
                f.write(line)
                # Count characters directly; tell() on a text file is an
                # opaque position cookie, not a character count.
                total_chars += len(line)
                kept += 1
                pbar.update(1)
                if kept % 5000 == 0:
                    avg_len = total_chars / kept
                    pbar.set_postfix({"avg_len": f"{avg_len:.0f} chars"})
        os.replace(tmp_path, out_path)
        print(f"[{out_path}] written {kept:,} examples")

    def prepare(self, out_train: str = "trainingText.clean.txt",
                        out_val: str = "validationText.clean.txt"):
        """Download and clean both splits.

        Args:
            out_train: Output path for the ``train`` split.
            out_val: Output path for the ``validation`` split.
        """
        ds = load_dataset("roneneldan/TinyStories", streaming=True)
        self._write_split(ds["train"], out_train)
        self._write_split(ds["validation"], out_val)



In [4]:
def define_vocab(path="trainingText.clean.txt", chunk_size=8 * 1024 * 1024):
    """Collect the sorted set of distinct characters found in a text file.

    The file is read in chunks so that a large corpus never has to be held
    in memory at once.

    Args:
        path: Text file to scan; decoded as UTF-8.
        chunk_size: Number of characters to read per chunk; must be positive.

    Returns:
        A list of the unique characters in the file, in sorted order. An
        empty file yields an empty list.
    """
    vocab_set = set()
    with open(path, "r", encoding="utf-8") as f:
        # read() returns "" only at end of file, which ends the iteration.
        for chunk in iter(lambda: f.read(chunk_size), ""):
            vocab_set.update(chunk)
    return sorted(vocab_set)
# Attach the helper to the class as an attribute; `# type:ignore` silences the
# type checker for this assignment.
# NOTE(review): TinyStoriesPreparer.__init__ sets an instance attribute with
# the same name to None, which takes precedence over this class attribute
# when looked up on an instance.
TinyStoriesPreparer._define_vocab = define_vocab # type:ignore

# Build the vocabulary from the default cleaned training file and display it.
vocab_list = define_vocab()
print(vocab_list)



['\t', '\n', ' ', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '<', '=', '>', '?', '@', '[', '\\', ']', '_', '`', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '{', '|', '}', '~']


In [None]:
import sys, os
sys.path.append(os.path.abspath(".."))
from src.tokenizer import Tokenizer
import array

def _encode_text(vocab: list, output_path, input_path, chunk_size = 8*1024*1024):
    """Encode a text file into a binary file of unsigned 16-bit ids.

    The input is read as UTF-8 in chunks of ``chunk_size`` characters. Each
    character is looked up in the mapping returned by
    ``Tokenizer(vocab).define_mapping_stoi()`` and the resulting ids are
    appended to the output as ``array`` typecode ``"H"`` items.

    Args:
        vocab: Vocabulary handed to ``Tokenizer``.
        output_path: Binary file to create (overwritten if present).
        input_path: Text file to encode.
        chunk_size: Characters read per chunk.
    """
    char_to_id = Tokenizer(vocab).define_mapping_stoi()
    with open(input_path, mode="r", encoding="utf-8") as src, \
         open(output_path, mode="wb") as dst:
        # read() yields "" only at end of file, which stops the iteration.
        for piece in iter(lambda: src.read(chunk_size), ""):
            ids = [char_to_id[symbol] for symbol in piece]
            array.array("H", ids).tofile(dst)



# Encode the cleaned training text into "encoded_text.bin" using vocab_list.
_encode_text(vocab_list, "encoded_text.bin", "trainingText.clean.txt")

In [10]:
def _decode_text(path, itos):
    arr = array.array("H")
    with open(path, "rb") as f:
        arr.fromfile(f, os.path.getsize(path) // 2)
    return "".join([itos[ch] for ch in arr])

# Round-trip check: decode the encoded file with vocab_list as the id -> char
# lookup and print the recovered text.
decoded_text = _decode_text("encoded_text.bin", vocab_list)
print(decoded_text)

aa bb cc dd
