# Some experiments on tokenization

## Setup
### Some Helper Functions

I like to load files via paths relative to the project root; these helper functions make that possible inside a Jupyter notebook.

In [2]:
import os
import math
from pathlib import Path
from typing import Mapping, Optional, Iterator, Iterable

from IPython.display import Markdown, display

def get_current_path():
    """Return the directory that contains the currently running notebook.

    The notebook location is read from the ``JPY_SESSION_NAME`` environment
    variable, which is expected to hold the path of the running notebook.

    Raises:
        EnvironmentError: if ``JPY_SESSION_NAME`` is not set.
    """
    session_name = os.getenv("JPY_SESSION_NAME")
    if session_name is not None:
        return Path(session_name).parent
    raise EnvironmentError("JPY_SESSION_NAME is not set")

def get_project_path():
    """Return the project root, located two levels above the notebook directory.

    The candidate directory is accepted only if it contains a ``.git``
    directory.

    Raises:
        ValueError: if the candidate has no ``.git`` directory.
    """
    project_path = get_current_path().parent.parent
    git_marker = project_path / ".git"
    if git_marker.exists() and git_marker.is_dir():
        return project_path
    raise ValueError("Project Base directory not found")


### Load Example Text

The example texts are expected in the `data/examples/` directory. Run the Python script in that directory to download the files before the first run.

In [3]:
# Locate the sample text relative to the project root returned by get_project_path().
example_filename = get_project_path().joinpath("data", "examples", "the-verdict.txt")
example_text = example_filename.read_text(encoding="utf-8")

# Report the text length and preview its first 99 characters.
result_markdown = f"""
#### Metadata
Total number of character: {len(example_text)}

#### First Characters
```
{example_text[:99]}
```
"""
display(Markdown(result_markdown))


#### Metadata
Total number of character: 20479

#### First Characters
```
I HAD always thought Jack Gisburn rather a cheap genius--though a good fellow enough--so it was no 
```


## A simple Tokenizer

In [4]:
import re
# Split on single punctuation characters, a double dash, or any whitespace.
# The capturing group makes re.split keep the separators in its result.
basic_split_regex_string = r'([,.:;?_!"()\']|--|\s)'
basic_split_regex = re.compile(basic_split_regex_string)

def token_generator(text):
    """Lazily yield the non-empty, whitespace-stripped tokens of ``text``.

    The text is split with ``basic_split_regex``; pieces that are empty
    after stripping (including pure-whitespace separators) are skipped.
    """
    stripped_pieces = (piece.strip() for piece in basic_split_regex.split(text))
    yield from (piece for piece in stripped_pieces if piece)

example_tokens = [token for token in token_generator(example_text)]

example_token_set= sorted(set(example_tokens))
example_vocabulary_decode = {token_id:token for token_id, token in enumerate(example_token_set)}
example_vocabulary_encode = {token:token_id for token_id, token in enumerate(example_token_set)}
#print(example_vocabulary)


result_markdown = f"""
#### Metadata
* Total number of Tokens: {len(example_tokens)}
* Size of Vocabulary: {len(example_token_set)}
#### First Tokens
{"\n".join([(lambda token: "".join(["* `", token, "` (", str(example_vocabulary_encode.get(token, "?")), ")"]))(token) for token in example_tokens[:11]])}
"""
display(Markdown(result_markdown))



#### Metadata
* Total number of Tokens: 4690
* Size of Vocabulary: 1130
#### First Tokens
* `I` (53)
* `HAD` (44)
* `always` (149)
* `thought` (1003)
* `Jack` (57)
* `Gisburn` (38)
* `rather` (818)
* `a` (115)
* `cheap` (256)
* `genius` (486)
* `--` (6)


In [5]:

class SimpleTokenizer(object):
    """Regex-based tokenizer backed by a fixed id <-> token vocabulary.

    Text is split with ``split_regex_string``; each resulting token is looked
    up in the vocabulary. Unknown tokens map to ``invalid_token_id``. Besides
    the regular vocabulary the tokenizer knows a set of reserved tokens
    (matching ``reserved_tokens_regex_string``), among them the markers for
    "unknown", "begin of text" and "end of text".

    Args:
        id_to_token_vocab: mapping of token id to token string. Must not
            contain strings that match the reserved-token pattern.
        id_to_reserved_tokens: optional mapping of token id to reserved token
            string. Its ids must not collide with ``id_to_token_vocab``. If
            omitted, ``<|unk|>``, ``<|bot|>`` and ``<|eot|>`` are generated
            with ids starting at the smallest power of two above the largest
            vocabulary id.
        invalid_token_id / begin_of_text_id / end_of_text_id: ids of the
            special markers. All three are required (and must be keys of
            ``id_to_reserved_tokens``) when ``id_to_reserved_tokens`` is
            given. Without ``id_to_reserved_tokens``, ``invalid_token_id``
            must be ``None`` and the other two are not used.
        split_regex_string: pattern used to split text into tokens.
        reserved_tokens_regex_string: pattern a reserved token must match.
        clean_punct_regex_string: pattern whose first group is kept while the
            whitespace before it is dropped when decoding.

    Raises:
        ValueError: on any inconsistency between the arguments listed above.
    """

    def __init__(self, id_to_token_vocab: Mapping[int, str],
                       *,
                       id_to_reserved_tokens: Optional[Mapping[int, str]] = None,
                       invalid_token_id: Optional[int] = None,
                       begin_of_text_id: Optional[int] = None,
                       end_of_text_id: Optional[int] = None,
                       split_regex_string: str = r'([,.:;?_!"()\']|--|\s)',
                       reserved_tokens_regex_string: str = r'^<\|[a-zA-Z0-9_]+\|>$',
                       clean_punct_regex_string: str=r'\s+([,.?!"()\'])'):
        self.split_regex = re.compile(split_regex_string)
        self.reserved_tokens_regex = re.compile(reserved_tokens_regex_string)
        self.clean_punct_regex = re.compile(clean_punct_regex_string)

        if any(self.reserved_tokens_regex.match(token) for token in id_to_token_vocab.values()):
            raise ValueError(f"id_to_token_vocab cannot include reserved tokens according to {reserved_tokens_regex_string}")
        self.id_to_token = dict(id_to_token_vocab)
        token_id_set = set(self.id_to_token)

        if id_to_reserved_tokens is not None:
            # Copy so later changes to the caller's mapping do not affect the tokenizer.
            self.reserved_tokens = dict(id_to_reserved_tokens)
            for reserved_token in self.reserved_tokens.values():
                if not self.reserved_tokens_regex.match(reserved_token):
                    raise ValueError(f"Reserved token '{reserved_token}' is not allowed according to {reserved_tokens_regex_string}")
            reserved_tokens_id_set = set(self.reserved_tokens)
            # Reserved ids and vocabulary ids share one id space, so they must not overlap.
            if not reserved_tokens_id_set.isdisjoint(token_id_set):
                raise ValueError("Reserved tokens IDs must not be included in vocabulary")
            special_ids = (invalid_token_id, begin_of_text_id, end_of_text_id)
            if any(token_id is None for token_id in special_ids):
                raise ValueError("invalid_token_id, begin_of_text_id, end_of_text_id must be provided if reserved_tokens are given")
            if any(token_id not in reserved_tokens_id_set for token_id in special_ids):
                raise ValueError("invalid_token_id, begin_of_text_id, end_of_text_id must be included in id_to_reserved_tokens")
            self.invalid_token_id = invalid_token_id
            self.begin_of_text_id = begin_of_text_id
            self.end_of_text_id = end_of_text_id
        else:
            if invalid_token_id is not None:
                raise ValueError("Invalid token id is specified without provided reserved tokens")
            # Place the generated reserved ids at the smallest power of two above
            # the largest vocabulary id; an empty vocabulary is treated like max id 0.
            largest_id = max(token_id_set, default=0)
            reserved_token_id_start = 2**math.ceil(math.log2(largest_id + 1))
            self.invalid_token_id = reserved_token_id_start
            self.begin_of_text_id = reserved_token_id_start+1
            self.end_of_text_id = reserved_token_id_start+2
            self.reserved_tokens = {
                self.invalid_token_id: "<|unk|>",
                self.begin_of_text_id: "<|bot|>",
                self.end_of_text_id: "<|eot|>",
                }

        self.id_to_token.update(self.reserved_tokens)
        self.token_to_id = {token:token_id for token_id,token in self.id_to_token.items()}

    def _token_generator(self, text) -> Iterator[str]:
        """Yield the non-empty, stripped pieces obtained by splitting ``text``."""
        tokens = self.split_regex.split(text)
        for token in tokens:
            token = token.strip()
            if token:
                yield token

    def encoder(self, text) -> Iterator[int]:
        """Lazily yield the token id for every token in ``text``.

        Unknown tokens yield ``invalid_token_id``.

        Raises:
            ValueError: if a token has the shape of a reserved token but is
                not one of the tokenizer's reserved tokens.
        """
        tokens = self._token_generator(text)
        for token in tokens:
            # Look up in this instance's own table (vocabulary + reserved tokens).
            token_id = self.token_to_id.get(token)
            if token_id is None:
                if self.reserved_tokens_regex.match(token):
                    raise ValueError(f"Could not find reserved token '{token}'")
                token_id = self.invalid_token_id
            yield token_id

    def encode(self, text) -> list[int]:
        """Return the token ids of ``text`` as a list."""
        return list(self.encoder(text))

    def encoder_batch(self, texts: Iterable[str]) -> Iterator[int]:
        """Lazily yield the ids of all ``texts`` as one flat stream.

        Each text is wrapped in ``begin_of_text_id`` ... ``end_of_text_id``.
        """
        for text in texts:
            yield self.begin_of_text_id
            yield from self.encoder(text)
            yield self.end_of_text_id

    def encode_batch(self, texts: Iterable[str]) -> list[int]:
        """Return the flat id stream of ``encoder_batch`` as a list."""
        return list(self.encoder_batch(texts))

    def decoder(self, token_ids: Iterable[int]) -> Iterator[str]:
        """Lazily yield the token string for each id; unknown ids yield the invalid token."""
        invalid_token = self.id_to_token[self.invalid_token_id]
        for token_id in token_ids:
            yield self.id_to_token.get(token_id, invalid_token)

    def decode(self, token_ids: Iterable[int]) -> str:
        """Join the decoded tokens with spaces and remove whitespace before punctuation."""
        decoded_text = " ".join(self.decoder(token_ids))
        cleaned_text = self.clean_punct_regex.sub(r"\1", decoded_text)
        return cleaned_text

# Build a tokenizer from the example vocabulary; no reserved tokens are passed,
# so the tokenizer creates its own.
example_tokenizer = SimpleTokenizer(id_to_token_vocab=example_vocabulary_decode)

# The first text contains a word that is certainly not in the vocabulary.
test_texts = [
    "This is a text with GarantiertUnbekannt! And so on.",
    "The next one!",
]

# Encode both texts into one id stream and decode it again for comparison.
encoded_test = example_tokenizer.encode_batch(test_texts)
roundtrip_test = example_tokenizer.decode(encoded_test)

result_markdown = f"""
#### Metadata
* Size of Vocabulary: {len(example_token_set)}
* Invalid Token ID: {example_tokenizer.invalid_token_id}
* Begin of Text: {example_tokenizer.begin_of_text_id}
* End of Text ID: {example_tokenizer.end_of_text_id}
#### Test String
{test_texts}
#### Encoded Tokens
{encoded_test}
#### Roundtrip
{roundtrip_test}
"""

display(Markdown(result_markdown))


#### Metadata
* Size of Vocabulary: 1130
* Invalid Token ID: 2048
* Begin of Text: 2049
* End of Text ID: 2050
#### Test String
['This is a text with GarantiertUnbekannt! And so on.', 'The next one!']
#### Encoded Tokens
[2049, 97, 584, 115, 2048, 1108, 2048, 0, 14, 908, 727, 7, 2050, 2049, 93, 708, 729, 0, 2050]
#### Roundtrip
<|bot|> This is a <|unk|> with <|unk|>! And so on. <|eot|> <|bot|> The next one! <|eot|>


## A BPE Tokenizer



In [None]:
%pip install tiktoken

In [7]:
from importlib.metadata import version
import tiktoken

print(f"tiktoken version: {version('tiktoken')}")

tokenizer = tiktoken.get_encoding("gpt2")

# Sample containing the special <|endoftext|> marker, which is passed via
# allowed_special below. Note that "terracesof" is written without a space.
text = "Hello, do you like tea? <|endoftext|> In the sunlit terracesof someunknownPlace."

# Encode to token ids, then decode back to text to check the round trip.
integers = tokenizer.encode(text, allowed_special={"<|endoftext|>"})
print(integers)
strings = tokenizer.decode(integers)
print(strings)

tiktoken version: 0.11.0
[15496, 11, 466, 345, 588, 8887, 30, 220, 50256, 554, 262, 4252, 18250, 8812, 2114, 1659, 617, 34680, 27271, 13]
Hello, do you like tea? <|endoftext|> In the sunlit terracesof someunknownPlace.


## Generating Training examples

In [11]:
from itertools import islice, repeat, chain
from collections import deque

def encoding_generator(texts: Iterable[str], *, tokenizer=tiktoken.get_encoding("gpt2")) -> list[int]:
    """Encode all ``texts`` and concatenate them into one flat list of token ids.

    Texts whose encoding is empty are skipped. The id of ``<|endoftext|>`` is
    inserted between consecutive non-empty encodings (not before the first
    and not after the last).

    Args:
        texts: the texts to encode; consumed completely.
        tokenizer: object providing ``encode_batch`` and ``encode_single_token``.

    Returns:
        The concatenated token ids.
    """
    per_text_ids = tokenizer.encode_batch(list(texts), allowed_special={"<|endoftext|>"})
    separator_id = tokenizer.encode_single_token("<|endoftext|>")
    merged: list[int] = []
    for ids in filter(None, per_text_ids):
        # Only separate from a previous text; nothing precedes the first one.
        if merged:
            merged.append(separator_id)
        merged += ids
    return merged

def samples_generator(texts: Iterable[str], *, tokenizer=tiktoken.get_encoding("gpt2"), context_size=50) -> Iterator[tuple[list[int], list[int]]]:
    """Yield (source, target) sliding windows over the encoded ``texts``.

    The token stream is prefixed with ``context_size - 1`` copies of the
    ``<|endoftext|>`` id. Each target window is its source window advanced
    by one token.

    Args:
        texts: texts to encode via ``encoding_generator``.
        tokenizer: tokenizer passed on to ``encoding_generator``.
        context_size: number of token ids per window.
    """
    pad_id = tokenizer.encode_single_token("<|endoftext|>")
    token_ids = encoding_generator(texts, tokenizer=tokenizer)
    token_stream = chain(repeat(pad_id, context_size - 1), token_ids)

    # Pre-fill the window; the bounded deque drops the oldest id on every append.
    window = deque(islice(token_stream, context_size), maxlen=context_size)
    for next_id in token_stream:
        before = list(window)
        window.append(next_id)
        yield before, list(window)

# Print every (source, target) window pair for the two short test texts.
samples = samples_generator(test_texts, context_size=5)

for sample in samples:
    print(*sample, sep=": ")


[50256, 50256, 50256, 50256, 1212]: [50256, 50256, 50256, 1212, 318]
[50256, 50256, 50256, 1212, 318]: [50256, 50256, 1212, 318, 257]
[50256, 50256, 1212, 318, 257]: [50256, 1212, 318, 257, 2420]
[50256, 1212, 318, 257, 2420]: [1212, 318, 257, 2420, 351]
[1212, 318, 257, 2420, 351]: [318, 257, 2420, 351, 402]
[318, 257, 2420, 351, 402]: [257, 2420, 351, 402, 4741]
[257, 2420, 351, 402, 4741]: [2420, 351, 402, 4741, 72]
[2420, 351, 402, 4741, 72]: [351, 402, 4741, 72, 861]
[351, 402, 4741, 72, 861]: [402, 4741, 72, 861, 3118]
[402, 4741, 72, 861, 3118]: [4741, 72, 861, 3118, 47083]
[4741, 72, 861, 3118, 47083]: [72, 861, 3118, 47083, 272]
[72, 861, 3118, 47083, 272]: [861, 3118, 47083, 272, 429]
[861, 3118, 47083, 272, 429]: [3118, 47083, 272, 429, 0]
[3118, 47083, 272, 429, 0]: [47083, 272, 429, 0, 843]
[47083, 272, 429, 0, 843]: [272, 429, 0, 843, 523]
[272, 429, 0, 843, 523]: [429, 0, 843, 523, 319]
[429, 0, 843, 523, 319]: [0, 843, 523, 319, 13]
[0, 843, 523, 319, 13]: [843, 523, 31

## Generate PyTorch Dataset

In [16]:
import torch
from torch.utils.data import Dataset, DataLoader
class GPTDatasetV1(Dataset):
    """Dataset of (input, target) token-id tensors built from a single text.

    The windows come from ``samples_generator``; every ``stride``-th window
    is kept, starting with the first.

    Args:
        txt: the text to turn into training samples.
        tokenizer: tokenizer passed on to ``samples_generator``.
        context_size: number of token ids per window.
        stride: step between kept windows; 1 keeps every window.
    """

    def __init__(self, txt, *, tokenizer=tiktoken.get_encoding("gpt2"), context_size=50, stride=1):
        self.input_ids = []
        self.target_ids = []

        samples = samples_generator([txt], tokenizer=tokenizer, context_size=context_size)
        # Iterate the strided view, not the raw generator, so `stride` takes effect.
        for input_window, target_window in islice(samples, None, None, stride):
            self.input_ids.append(torch.tensor(input_window))
            self.target_ids.append(torch.tensor(target_window))

    def __len__(self):
        """Return the number of stored samples."""
        return len(self.input_ids)

    def __getitem__(self, idx):
        """Return the ``(input_ids, target_ids)`` tensor pair at position ``idx``."""
        return self.input_ids[idx], self.target_ids[idx]


def create_dataloader(txt, *, batch_size=4, max_length=256, stride=128, shuffle=True, drop_last=True, num_workers=0):
    """Build a ``DataLoader`` over a ``GPTDatasetV1`` created from ``txt``.

    Args:
        txt: the text to build the dataset from.
        batch_size, shuffle, drop_last, num_workers: forwarded to ``DataLoader``.
        max_length: forwarded to ``GPTDatasetV1`` as ``context_size``.
        stride: forwarded to ``GPTDatasetV1``.

    Returns:
        The configured ``DataLoader``; the GPT-2 tiktoken encoding is used.
    """
    gpt2_tokenizer = tiktoken.get_encoding("gpt2")
    return DataLoader(
        GPTDatasetV1(txt, tokenizer=gpt2_tokenizer, context_size=max_length, stride=stride),
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers,
    )

# Small, unshuffled configuration so the first batch is easy to inspect.
dataloader = create_dataloader(
    example_text,
    batch_size=1,
    max_length=4,
    stride=1,
    shuffle=False,
)
data_iter = iter(dataloader)
first_batch = next(data_iter)
print(first_batch)

[tensor([[50256, 50256, 50256,    40]]), tensor([[50256, 50256,    40,   367]])]
