What is the overall responsibility of the following Python code, and why is it important? Give a one-line summary of the whole block of code. For each function, explain what it does and why it matters.


# **tiktoken/<span style='color:orange'> **tests** </span>**

- \_\_init\_\_.py
- test_encoding.py
- test_helpers.py
- test_misc.py
- test_offsets.py
- test_pickle.py
- test_simple_public.py

## **tiktoken/tests/<span style='color:orange'> **test_encoding.py** </span>**

**Why these tests matter (intent & responsibilities)**<br>
1. Correctness: ensure encode and decode are inverses where appropriate (ordinary text) and consistent for single tokens.<br>
2. Unicode safety: correct behavior for emojis, surrogate pairs, and malformed Unicode.<br>
3. Binary handling: stable encoding/decoding of arbitrary bytes.<br>
4. Special-token safety: correct and safe handling of reserved tokens (prevents accidental injection of control tokens).<br>
5. Robustness & limits: tests for very large or repetitive inputs, ensuring the library behaves deterministically or fails safely.<br>
6. Regression protection: many of these tests pin expected token ids (deterministic assertions), so future changes that break token assignments or regex tokenization rules get flagged.<br>
7. API contract: batch APIs, allowed/disallowed flags, and encode_ordinary behavior. The test suite documents and enforces the intended API semantics.

- @pytest.mark.parametrize("make_enc", ENCODING_FACTORIES) runs the same test for each encoding factory, so the library is tested across several encoder configurations.
- hypothesis decorators (@given, @settings) generate many random examples to catch edge cases not covered by fixed examples. deadline=None disables Hypothesis's per-example time limit; max_examples=MAX_EXAMPLES caps the number of generated examples.
- with pytest.raises(ValueError): asserts that the enclosed call raises ValueError for disallowed conditions.
- assert statements validate expectations; if any fail, pytest reports the failing test.

**Examples of bugs these tests could catch**

- A change in token-to-string mapping that makes encode("hello world") yield different tokens → test_simple fails.
- A Unicode bug that mishandles surrogate pairs → test_encode_surrogate_pairs fails.
- An unchecked special token injection that should raise → test_special_token exposes it.
- A buffer overflow or OOM on huge inputs (ValueError not raised, or memory blows up) → test_large_repeated fails or reveals slow behavior.
- Batch encode returning wrong order or mutated inputs → test_batch_encode or Hypothesis batch tests fail.

**Summary (one-liner)**

- This file is a comprehensive unit + property-test suite that verifies correctness, robustness, Unicode/bytes safety, special-token semantics, and batch behavior of the tiktoken encoding implementation across multiple tokenizer configurations.


Smoke test for the most basic encoding/decoding functionality across known encodings.

In [3]:
def test_simple():
    enc = tiktoken.get_encoding("gpt2")
    assert enc.encode("hello world") == [31373, 995]
    assert enc.decode([31373, 995]) == "hello world"
    assert enc.encode("hello <|endoftext|>", allowed_special="all") == [31373, 220, 50256]

    enc = tiktoken.get_encoding("cl100k_base")
    assert enc.encode("hello world") == [15339, 1917]
    assert enc.decode([15339, 1917]) == "hello world"
    assert enc.encode("hello <|endoftext|>", allowed_special="all") == [15339, 220, 100257]

    for enc_name in tiktoken.list_encoding_names():
        enc = tiktoken.get_encoding(enc_name)
        for token in range(min(10_000, enc.max_token_value - 1)):
            assert enc.encode_single_token(enc.decode_single_token_bytes(token)) == token

Check how the tokenizer handles repeated identical characters (here "0" repeated many times). This catches regressions in tokenization logic for repetitive sequences.

In [2]:
def test_simple_repeated():
    enc = tiktoken.get_encoding("gpt2")
    assert enc.encode("0") == [15]
    assert enc.encode("00") == [405]
    assert enc.encode("000") == [830]
    assert enc.encode("0000") == [2388]
    assert enc.encode("00000") == [20483]
    assert enc.encode("000000") == [10535]
    assert enc.encode("0000000") == [24598]
    assert enc.encode("00000000") == [8269]
    assert enc.encode("000000000") == [10535, 830]
    assert enc.encode("0000000000") == [8269, 405]
    assert enc.encode("00000000000") == [8269, 830]
    assert enc.encode("000000000000") == [8269, 2388]
    assert enc.encode("0000000000000") == [8269, 20483]
    assert enc.encode("00000000000000") == [8269, 10535]
    assert enc.encode("000000000000000") == [8269, 24598]
    assert enc.encode("0000000000000000") == [25645]
    assert enc.encode("00000000000000000") == [8269, 10535, 830]

Ensure extremely long inputs are either handled safely or rejected. It fetches an encoding and asserts that encoding a million-character string raises ValueError. Tests boundary behavior / limits / defensive checks (prevents OOM [Out Of Memory] or runaway behavior).

In [1]:
def test_large_repeated():
    enc = tiktoken.get_encoding("o200k_base")

    with pytest.raises(ValueError):
        enc.encode("x" * 1_000_000)

Check tokenizer behavior for a few tricky strings with punctuation, newlines, whitespace patterns. Ensures tokenization rules (which often use regexes) are producing consistent token ids.

In [None]:
def test_simple_regex():
    enc = tiktoken.get_encoding("cl100k_base")
    assert enc.encode("rer") == [38149]
    assert enc.encode("'rer") == [2351, 81]
    assert enc.encode("today\n ") == [31213, 198, 220]
    assert enc.encode("today\n \n") == [31213, 27907]
    assert enc.encode("today\n  \n") == [31213, 14211]

Test basic encode behavior for several prebuilt encodings; check that certain byte sequences map to expected tokens.

In [None]:
def test_basic_encode():
    enc = tiktoken.get_encoding("r50k_base") # >>> r <<< 50k_base
    assert enc.encode("hello world") == [31373, 995]

    enc = tiktoken.get_encoding("p50k_base") # >>> p <<< 50k_base
    assert enc.encode("hello world") == [31373, 995]

    enc = tiktoken.get_encoding("cl100k_base")
    assert enc.encode("hello world") == [15339, 1917]
    assert enc.encode(" \x850") == [220, 126, 227, 15]

Encoding an empty string should return an empty list [].

In [None]:
def test_encode_empty():
    enc = tiktoken.get_encoding("r50k_base")
    assert enc.encode("") == []

Test the lower-level bytes-encoding functions. test_encode_bytes pins one known byte sequence and round-trips short runs of the byte \x80, which is not valid UTF-8 on its own. The Hypothesis-powered test_hyp_encode_bytes uses @given(bytestring=st.binary()) to check, for many random byte sequences, that _encode_bytes → decode_bytes is a true roundtrip. This helps find corner cases in binary handling.

In [None]:
def test_encode_bytes():
    enc = tiktoken.get_encoding("cl100k_base")
    assert enc._encode_bytes(b" \xec\x8b\xa4\xed") == [62085]
    for i in range(10):
        bytestring = b"\x80" * i
        assert enc.decode_bytes(enc._encode_bytes(bytestring)) == bytestring


# The parametrize/Hypothesis decorators belong to the property test, which takes
# make_enc and bytestring as arguments; test_encode_bytes above takes none.
@pytest.mark.parametrize("make_enc", ENCODING_FACTORIES)
@hypothesis.given(bytestring=st.binary())
@hypothesis.settings(deadline=None, max_examples=MAX_EXAMPLES)
def test_hyp_encode_bytes(make_enc: Callable[[], tiktoken.Encoding], bytestring: bytes):
    enc = make_enc()
    assert enc.decode_bytes(enc._encode_bytes(bytestring)) == bytestring

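Unicode correctness. In a Python str, an emoji can appear either as a single code point or as a UTF-16 surrogate pair; both forms must encode to the same tokens, and a lone surrogate is replaced with U+FFFD (the replacement character):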

In [None]:
def test_encode_surrogate_pairs():
    enc = tiktoken.get_encoding("cl100k_base")

    assert enc.encode("👍") == [9468, 239, 235]
    # surrogate pair gets converted to codepoint
    assert enc.encode("\ud83d\udc4d") == [9468, 239, 235]

    # lone surrogate just gets replaced
    assert enc.encode("\ud83d") == enc.encode("ÔøΩ")
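
The surrogate behaviour asserted above can be reproduced with the standard library alone. The sketch below assumes (this notebook does not show the library's internals) that the fixup amounts to a UTF-16 round trip with the `surrogatepass` error handler on encode and `replace` on decode; `fix_surrogates` is a hypothetical helper name, not part of tiktoken.

```python
def fix_surrogates(text: str) -> str:
    """Join surrogate pairs into one code point; turn lone surrogates into U+FFFD.

    Hypothetical helper for illustration only; not a tiktoken API.
    """
    # "surrogatepass" lets surrogates through the encoder as raw UTF-16 code units.
    # Decoding then pairs them up, and "replace" substitutes U+FFFD for anything unpaired.
    return text.encode("utf-16", "surrogatepass").decode("utf-16", "replace")


paired = fix_surrogates("\ud83d\udc4d")  # high + low surrogate
lone = fix_surrogates("\ud83d")          # high surrogate with no partner

print(paired == "\U0001F44D")  # the thumbs-up emoji as a single code point
print(lone == "\ufffd")        # the replacement character
```

Because the pair collapses to one code point before tokenization, both spellings of the emoji can yield identical token ids, which is what the test asserts.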

More robustness tests, this time for long repetitive strings (10,000 repetitions of a single character or short sequence). Ensures the encode/decode roundtrip holds for these strings and for slight variations (leading space, trailing newline). Checks the encoder can handle long repetitive input without corrupting it.


In [None]:
@pytest.mark.parametrize("make_enc", ENCODING_FACTORIES)
def test_catastrophically_repetitive(make_enc: Callable[[], tiktoken.Encoding]):
    enc = make_enc()
    for c in ["^", "0", "a", "'s", " ", "\n"]:
        big_value = c * 10_000
        assert big_value == enc.decode(enc.encode(big_value))

        big_value = " " + big_value
        assert big_value == enc.decode(enc.encode(big_value))

        big_value = big_value + "\n"
        assert big_value == enc.decode(enc.encode(big_value))

**Roundtrip tests**

*These tests validate that encode followed by decode returns the original input.*

For a fixed list of strings (including the non-ASCII "请考试我的软件！12345"), confirms enc.decode(enc.encode(value)) == value. Also checks that encode_ordinary (which encodes while ignoring special tokens, treating them as plain text) round-trips the same ordinary text.

In [None]:
@pytest.mark.parametrize("make_enc", ENCODING_FACTORIES)
def test_basic_roundtrip(make_enc):
    enc = make_enc()
    for value in (
        "hello",
        "hello ",
        "hello  ",
        " hello",
        " hello ",
        " hello  ",
        "hello world",
        "请考试我的软件！12345",
    ):
        assert value == enc.decode(enc.encode(value))
        assert value == enc.decode(enc.encode_ordinary(value))


Hypothesis-based: @given(text=st.text()) generates many random Unicode strings; confirms encode->decode roundtrip property across lots of inputs and for every encoding factory. This is a powerful property test to catch many edge cases.


In [None]:
@pytest.mark.parametrize("make_enc", ENCODING_FACTORIES)
@hypothesis.given(text=st.text())
@hypothesis.settings(deadline=None, max_examples=MAX_EXAMPLES)
def test_hyp_roundtrip(make_enc: Callable[[], tiktoken.Encoding], text):
    enc = make_enc()
    assert text == enc.decode(enc.encode(text))
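
To make the roundtrip property concrete without installing Hypothesis or tiktoken, here is a minimal standard-library sketch. It assumes a toy codec (UTF-8 bytes standing in for tokens) and a hand-rolled random text generator; `toy_encode`, `toy_decode`, `random_text`, and `check_roundtrip` are illustrative names, and Hypothesis itself does far more (shrinking, smarter example generation).

```python
import random


def toy_encode(text: str) -> list[int]:
    """Toy stand-in for enc.encode: one 'token' per UTF-8 byte."""
    return list(text.encode("utf-8"))


def toy_decode(tokens: list[int]) -> str:
    """Toy stand-in for enc.decode."""
    return bytes(tokens).decode("utf-8")


def random_text(rng: random.Random, max_len: int = 20) -> str:
    """Random string of Unicode code points, skipping surrogates (not encodable as UTF-8)."""
    chars = []
    for _ in range(rng.randint(0, max_len)):
        cp = rng.randint(0, 0x10FFFF)
        while 0xD800 <= cp <= 0xDFFF:
            cp = rng.randint(0, 0x10FFFF)
        chars.append(chr(cp))
    return "".join(chars)


def check_roundtrip(n_examples: int = 200, seed: int = 0) -> int:
    """Assert decode(encode(text)) == text for many random inputs; return how many were checked."""
    rng = random.Random(seed)
    for _ in range(n_examples):
        text = random_text(rng)
        assert toy_decode(toy_encode(text)) == text
    return n_examples


checked = check_roundtrip()
```

The property is stated once and exercised on inputs nobody wrote by hand, which is the idea behind the Hypothesis-based test above.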

For every token id in range(enc.n_vocab):
- Try to get token_bytes = enc.decode_single_token_bytes(token) (some ids may raise KeyError if invalid; those are skipped).
- Assert that enc.encode_single_token(token_bytes) == token.

→ Ensures each individual token's encode/decode pair is consistent across the vocabulary.

In [None]:
@pytest.mark.parametrize("make_enc", ENCODING_FACTORIES)
def test_single_token_roundtrip(make_enc: Callable[[], tiktoken.Encoding]):
    enc = make_enc()

    for token in range(enc.n_vocab):
        try:
            token_bytes = enc.decode_single_token_bytes(token)
        except KeyError:
            continue
        assert enc.encode_single_token(token_bytes) == token

**Special tokens tests**

This whole block ensures strict and predictable API semantics around special tokens, which is crucial when special tokens carry model control semantics.

In [None]:
def test_special_token():
    enc = tiktoken.get_encoding("cl100k_base")

    eot = enc.encode_single_token("<|endoftext|>")
    assert eot == enc.eot_token
    fip = enc.encode_single_token("<|fim_prefix|>")
    fim = enc.encode_single_token("<|fim_middle|>")

    text = "<|endoftext|> hello <|fim_prefix|>"
    assert eot not in enc.encode(text, disallowed_special=())
    with pytest.raises(ValueError):
        enc.encode(text)
    with pytest.raises(ValueError):
        enc.encode(text, disallowed_special="all")
    with pytest.raises(ValueError):
        enc.encode(text, disallowed_special={"<|endoftext|>"})
    with pytest.raises(ValueError):
        enc.encode(text, disallowed_special={"<|fim_prefix|>"})

    text = "<|endoftext|> hello <|fim_prefix|> there <|fim_middle|>"
    tokens = enc.encode(text, disallowed_special=())
    assert eot not in tokens
    assert fip not in tokens
    assert fim not in tokens

    tokens = enc.encode(text, allowed_special="all", disallowed_special=())
    assert eot in tokens
    assert fip in tokens
    assert fim in tokens

    tokens = enc.encode(text, allowed_special="all", disallowed_special="all")
    assert eot in tokens
    assert fip in tokens
    assert fim in tokens

    tokens = enc.encode(text, allowed_special={"<|fim_prefix|>"}, disallowed_special=())
    assert eot not in tokens
    assert fip in tokens
    assert fim not in tokens

    tokens = enc.encode(text, allowed_special={"<|endoftext|>"}, disallowed_special=())
    assert eot in tokens
    assert fip not in tokens
    assert fim not in tokens

    tokens = enc.encode(text, allowed_special={"<|fim_middle|>"}, disallowed_special=())
    assert eot not in tokens
    assert fip not in tokens
    assert fim in tokens


# The parametrize/Hypothesis decorators belong to the property test, which takes
# make_enc and text as arguments; test_special_token above takes none.
@pytest.mark.parametrize("make_enc", ENCODING_FACTORIES)
@hypothesis.given(text=st.text())
@hypothesis.settings(deadline=None, max_examples=MAX_EXAMPLES)
def test_hyp_special_ordinary(make_enc, text: str):
    enc = make_enc()
    assert enc.encode_ordinary(text) == enc.encode(text, disallowed_special=())
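
The allowed/disallowed semantics that these assertions pin down can be summarised in a toy model. This is a sketch inferred from the test expectations above, not the library's implementation; `SPECIAL_TOKENS` and `check_special` are hypothetical names.

```python
# Toy model of the special-token rules implied by the assertions above.
SPECIAL_TOKENS = frozenset({"<|endoftext|>", "<|fim_prefix|>", "<|fim_middle|>"})


def check_special(text, allowed_special=frozenset(), disallowed_special="all"):
    """Return the special tokens that would be encoded as special tokens.

    Raises ValueError if the text contains a disallowed special token.
    """
    if allowed_special == "all":
        allowed_special = SPECIAL_TOKENS
    if disallowed_special == "all":
        # "all" means every special token that was not explicitly allowed.
        disallowed_special = SPECIAL_TOKENS - set(allowed_special)
    for token in sorted(disallowed_special):
        if token in text:
            raise ValueError(f"disallowed special token in text: {token!r}")
    # Anything neither allowed nor disallowed is treated as ordinary text.
    return {token for token in allowed_special if token in text}


text = "<|endoftext|> hello <|fim_prefix|> there <|fim_middle|>"
as_plain_text = check_special(text, disallowed_special=())
only_fim_prefix = check_special(text, allowed_special={"<|fim_prefix|>"}, disallowed_special=())
```

Under this model the default call raises, which matches the safe-by-default behaviour the test expects: special tokens in untrusted text are rejected unless the caller opts in.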

**Batch encoding tests**

The Hypothesis batch test generates lists of random strings and checks encode_batch/decode_batch roundtrip for whole batches.

In [None]:
@pytest.mark.parametrize("make_enc", ENCODING_FACTORIES)
def test_batch_encode(make_enc: Callable[[], tiktoken.Encoding]):
    enc = make_enc()
    text1 = "hello world"
    text2 = "goodbye world"

    assert enc.encode_batch([text1]) == [enc.encode(text1)]
    assert enc.encode_batch([text1, text2]) == [enc.encode(text1), enc.encode(text2)]

    assert enc.encode_ordinary_batch([text1]) == [enc.encode_ordinary(text1)]
    assert enc.encode_ordinary_batch([text1, text2]) == [
        enc.encode_ordinary(text1),
        enc.encode_ordinary(text2),
    ]

@pytest.mark.parametrize("make_enc", ENCODING_FACTORIES)
@hypothesis.given(batch=st.lists(st.text()))
@hypothesis.settings(deadline=None, max_examples=MAX_EXAMPLES)
def test_hyp_batch_roundtrip(make_enc: Callable[[], tiktoken.Encoding], batch):
    enc = make_enc()

    encoded = enc.encode_batch(batch, allowed_special="all")
    assert encoded == [enc.encode(t, allowed_special="all") for t in batch]
    decoded = enc.decode_batch(encoded)
    assert decoded == batch
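
The batch contract checked above (results equal per-item encoding, in input order) is easy to get wrong when work is spread across threads. The sketch below assumes a thread pool is used for batching, which this notebook does not show, and uses a toy encoder; `toy_encode` and `toy_encode_batch` are illustrative names.

```python
from concurrent.futures import ThreadPoolExecutor


def toy_encode(text: str) -> list[int]:
    """Toy stand-in for enc.encode: one 'token' per UTF-8 byte."""
    return list(text.encode("utf-8"))


def toy_encode_batch(texts: list[str], num_threads: int = 4) -> list[list[int]]:
    """Encode many texts concurrently while keeping results in input order."""
    with ThreadPoolExecutor(num_threads) as pool:
        # Executor.map yields results in the order of its inputs,
        # regardless of which worker finishes first.
        return list(pool.map(toy_encode, texts))


batch = ["hello world", "goodbye world", ""]
encoded = toy_encode_batch(batch)
```

Comparing `encoded` against a plain list comprehension over `toy_encode` is the same shape of assertion as in test_batch_encode.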

## **tiktoken/tests/<span style='color:orange'> **test_helpers.py** </span>**

This code sets up configurable, named, lazy factories for multiple tokenizer encodings (and a configurable test-size limit) so pytest tests can run the same test logic cleanly and repeatedly across different encodings with clear test IDs.


In [None]:
import bisect
import functools
import os

import pytest

import tiktoken

MAX_EXAMPLES: int = int(os.environ.get("TIKTOKEN_MAX_EXAMPLES", "100"))

ENCODINGS = ["r50k_base", "cl100k_base"]
SOME_ENCODINGS = ["cl100k_base"]


ENCODING_FACTORIES = [
    pytest.param(functools.partial(tiktoken.get_encoding, name), id=name) for name in ENCODINGS
]
SOME_ENCODING_FACTORIES = [
    pytest.param(functools.partial(tiktoken.get_encoding, name), id=name) for name in SOME_ENCODINGS
]
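
Why `functools.partial` makes these factories lazy can be shown with a stand-in loader. In this sketch `load_encoding` is a hypothetical substitute for `tiktoken.get_encoding`, used so the example runs without downloading any vocabulary files.

```python
import functools

load_calls = []  # records every time the (pretend) expensive loader runs


def load_encoding(name: str) -> str:
    """Hypothetical stand-in for an expensive loader such as tiktoken.get_encoding."""
    load_calls.append(name)
    return f"<encoding {name}>"


# Building the factories binds the name but does not call the loader.
factories = [functools.partial(load_encoding, name) for name in ["r50k_base", "cl100k_base"]]
calls_before = list(load_calls)

# The loader only runs when a test actually calls its factory.
enc = factories[1]()
calls_after = list(load_calls)
```

Here `calls_before` is empty and `calls_after` contains only `"cl100k_base"`, so collecting the parametrized tests costs nothing until a test body calls `make_enc()`.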

## **tiktoken/tests/<span style='color:orange'> **test_misc.py** </span>**

**one-liner summary:** <br><br>
These tests ensure that tiktoken resolves correct model-to-encoding mappings and stays lightweight by not importing optional dependencies unless explicitly needed.

Verifies that each OpenAI model name maps to its intended tokenizer encoding, preventing regressions in token counting behavior.

In [None]:
import subprocess
import sys

import tiktoken


def test_encoding_for_model():
    enc = tiktoken.encoding_for_model("gpt2")
    assert enc.name == "gpt2"
    enc = tiktoken.encoding_for_model("text-davinci-003")
    assert enc.name == "p50k_base"
    enc = tiktoken.encoding_for_model("text-davinci-edit-001")
    assert enc.name == "p50k_edit"
    enc = tiktoken.encoding_for_model("gpt-3.5-turbo-0301")
    assert enc.name == "cl100k_base"
    enc = tiktoken.encoding_for_model("gpt-4")
    assert enc.name == "cl100k_base"
    enc = tiktoken.encoding_for_model("gpt-4o")
    assert enc.name == "o200k_base"
    enc = tiktoken.encoding_for_model("gpt-oss-120b")
    assert enc.name == "o200k_harmony"

Ensures tiktoken does not automatically import optional heavy dependencies like blobfile.

In [None]:
def test_optional_blobfile_dependency():
    prog = """
import tiktoken
import sys
assert "blobfile" not in sys.modules
"""
    subprocess.check_call([sys.executable, "-c", prog])

## **tiktoken/tests/<span style='color:orange'> **test_offsets.py** </span>**

**one-liner summary:** <br><br>
This file validates, by a mixture of property-based and hand-written tests, that tiktoken.Encoding.decode_with_offsets returns both the exact decoded text and the correct per-token character start offsets, which is crucial for mapping tokens back to text across languages, special tokens, and tricky UTF-8 boundaries.


**What the function does** <br>
- Returns the number of leading characters that a and b share (length of their common prefix).

**Why this matters in the test** <br>
- This function is the basic comparison primitive used to compute an offset by comparing a full decoded text to a prefix-decoded text; it tells you how many characters the prefix covers in the full text.

In [None]:
from typing import Callable

import hypothesis
import pytest
from hypothesis import strategies as st

import tiktoken

from .test_helpers import MAX_EXAMPLES, SOME_ENCODING_FACTORIES


def _common_prefix_len(a, b):
    i = 0
    while i < len(a) and i < len(b) and a[i] == b[i]:
        i += 1
    return i


**What the function does** <br>
- Constructs a reference list of character offsets for each token in tokens by decoding the token sequence and computing where each token starts in the decoded string using the naive prefix method.

**Why this matters in the test** <br>
- This function provides the ground-truth / reference offsets to compare against enc.decode_with_offsets. Because it's simple and straightforward (decode prefixes and measure), it's unlikely to have the same subtle bugs that an optimized library implementation might have.

In [None]:
def _token_offsets_reference(enc, tokens):
    text = enc.decode(tokens, errors="strict")
    res = []
    for i in range(len(tokens)):
        prefix = enc.decode(tokens[:i], errors="ignore")
        res.append(_common_prefix_len(text, prefix))
    return res
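
The prefix method is easiest to see on a toy example where one character is split across two tokens. The sketch below assumes tokens are plain byte strings and replaces enc.decode with a hypothetical `toy_decode`; the offset logic mirrors the two helpers above.

```python
def common_prefix_len(a: str, b: str) -> int:
    """Number of leading characters shared by a and b."""
    i = 0
    while i < len(a) and i < len(b) and a[i] == b[i]:
        i += 1
    return i


def toy_decode(tokens: list[bytes], errors: str) -> str:
    """Toy stand-in for enc.decode: tokens are raw byte strings."""
    return b"".join(tokens).decode("utf-8", errors=errors)


def offsets_reference(tokens: list[bytes]) -> list[int]:
    """Character offset at which each token starts, via the naive prefix method."""
    text = toy_decode(tokens, errors="strict")
    return [
        # Decode everything before token i, dropping any incomplete trailing
        # character, and count how many characters of the full text it covers.
        common_prefix_len(text, toy_decode(tokens[:i], errors="ignore"))
        for i in range(len(tokens))
    ]


# "é" is two bytes in UTF-8 (0xC3 0xA9); here it is split across two tokens.
tokens = [b"a", b"\xc3", b"\xa9", b"b"]
offsets = offsets_reference(tokens)  # → [0, 1, 1, 2]
```

Both halves of "é" report offset 1 because the character is not complete until the second half arrives. This is why repeated offsets show up for multi-byte scripts in the hand-written offset tests.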


**What the function does** <br>
- Property-based test that compares the offsets produced by enc.decode_with_offsets(tokens) to the reference offsets produced by _token_offsets_reference(enc, tokens) for many randomly generated valid token sequences and across multiple encoding factories.

**Why this matters in the test** <br>
- Provides broad coverage and confidence that decode_with_offsets matches a straightforward, correct algorithm across many encodings and input shapes.


In [None]:
@pytest.mark.parametrize("make_enc", SOME_ENCODING_FACTORIES)
@hypothesis.given(data=st.data())
@hypothesis.settings(deadline=None, max_examples=MAX_EXAMPLES)
def test_hyp_offsets(make_enc: Callable[[], tiktoken.Encoding], data):
    enc = make_enc()

    tokens_st = st.lists(
        st.integers(0, enc.n_vocab - 1).filter(
            lambda x: x in enc._special_tokens.values() or x in enc._mergeable_ranks.values()
        ),
        min_size=1,
        max_size=20,
    )
    tokens = data.draw(tokens_st)

    # This is a dumb hack to make sure that our tokens are a valid UTF-8 string
    # We could potentially drop this, see the TODO in decode_with_offsets
    tokens = enc.encode(enc.decode(tokens, errors="ignore"), allowed_special="all")
    assert enc.decode_with_offsets(tokens)[1] == _token_offsets_reference(enc, tokens)

**What the function does** <br>
- A set of deterministic unit tests using explicit strings to check enc.decode_with_offsets in a few specific, important scenarios (simple ASCII, special tokens, Chinese, Tamil, and examples with tricky UTF-8 byte sequences).

**Why this matters in the test** <br>
- Provides quick, readable confirmation of correctness for known tricky cases; easier to debug than a failed Hypothesis case without context.

In [None]:
def test_basic_offsets():
    enc = tiktoken.get_encoding("cl100k_base")

    prompt = "hello world"
    p, o = enc.decode_with_offsets(enc.encode(prompt))
    assert p == prompt
    assert o == [0, 5]

    prompt = "hello world<|endoftext|> green cow"
    p, o = enc.decode_with_offsets(enc.encode(prompt, allowed_special="all"))
    assert p == prompt
    assert o == [0, 5, 11, 24, 30]

    prompt = "我非常渴望与人工智能一起工作"
    p, o = enc.decode_with_offsets(enc.encode(prompt))
    assert p == prompt
    assert o == [0, 1, 2, 3, 3, 4, 4, 5, 6, 7, 8, 8, 9, 10, 11, 12, 13]

    # contains the interesting tokens b'\xe0\xae\xbf\xe0\xae' and b'\xe0\xaf\x8d\xe0\xae'
    # in which \xe0 is the start of a 3-byte UTF-8 character
    prompt = "நடிகர் சூர்யா"
    p, o = enc.decode_with_offsets(enc.encode(prompt))
    assert p == prompt
    assert o == [0, 0, 1, 1, 2, 3, 4, 4, 5, 6, 7, 8, 8, 9, 9, 10, 11, 12, 12]

    # contains the interesting token b'\xa0\xe9\x99\xa4'
    # in which \xe9 is the start of a 3-byte UTF-8 character and \xa0 is a continuation byte
    prompt = " Ġ除"
    p, o = enc.decode_with_offsets(enc.encode(prompt))
    assert p == prompt
    assert o == [0, 1]

## **tiktoken/tests/<span style='color:orange'> **test_pickle.py** </span>**

**one-liner summary:** <br><br>
This code verifies that a tiktoken Encoding object, built-in or custom, can be serialized and deserialized with Python's pickle module without changing its tokenization behavior. This matters because reliable serialization is essential for saving, loading, transferring, caching, or distributing tokenizers across processes or machines.

In [None]:
import tiktoken


def test_pickle():
    import pickle

    enc_old = tiktoken.get_encoding("r50k_base")
    enc_new = pickle.loads(pickle.dumps(enc_old))
    assert enc_old.encode("hello world") == enc_new.encode("hello world")

    enc_old = tiktoken.Encoding(
        name="custom_enc",
        pat_str=enc_old._pat_str,
        mergeable_ranks=enc_old._mergeable_ranks,
        special_tokens={"<|pickle|>": 100_000},
    )
    enc_new = pickle.loads(pickle.dumps(enc_old))
    assert enc_old.encode("hello world") == enc_new.encode("hello world")
    assert (
        enc_old.encode("<|pickle|>", allowed_special="all")
        == enc_new.encode("<|pickle|>", allowed_special="all")
        == [100_000]
    )

## **tiktoken/tests/<span style='color:orange'> **test_simple_public.py** </span>**

**one-liner summary:** <br>

This test module verifies that tiktoken encodings correctly encode/decode text (including special tokens), map models to the right encodings, and avoid importing an optional dependency at import time, ensuring correctness, compatibility, and lightweight behavior.<br><br>

**Why this matters:**<br>
- Correct tokenization is foundational: wrong token IDs or decoding breaks models, causes incorrect token counts (affects batching, truncation, costs), and corrupts inputs/outputs.
- Model→encoder mapping must be stable so code that chooses encoders based on a model name behaves predictably.
- Avoiding accidental optional imports keeps installs lightweight, prevents surprising side effects, and reduces dependency/packaging problems in downstream projects.

Together these tests protect reliability and portability for systems that rely on tiktoken.

**What it does (summary):**
- Loads specific encodings ("gpt2" and "cl100k_base"), asserts known encode/decode results for "hello world" and for a string containing the special token <|endoftext|> (using allowed_special="all").
- Iterates every publicly available encoding name from tiktoken.list_encoding_names() and, for tokens 0..9999, checks a single-token round-trip: decode the single-token bytes then re-encode them, asserting the token ID is preserved.

**Why it matters:**
- The concrete assertions for "hello world" and the <|endoftext|> case ensure canonical, expected mappings for well-known encoders; a regression here would indicate the tokenizer or its vocab changed unexpectedly.
- The single-token round-trip loop verifies every encoding's basic encode/decode invariants for many token IDs: it catches issues where decode_single_token_bytes and encode_single_token are not inverse operations (which would break token-level manipulation, detokenization, byte-level operations).
- Overall, it enforces deterministic, reversible behavior of tokenizers, which is essential for correctness when token-level operations are used (e.g., token counting, embeddings, model I/O, byte-level protocols).

In [None]:
import subprocess
import sys

import tiktoken


def test_simple():
    # Note that there are more actual tests, they're just not currently public :-)
    enc = tiktoken.get_encoding("gpt2")
    assert enc.encode("hello world") == [31373, 995]
    assert enc.decode([31373, 995]) == "hello world"
    assert enc.encode("hello <|endoftext|>", allowed_special="all") == [31373, 220, 50256]

    enc = tiktoken.get_encoding("cl100k_base")
    assert enc.encode("hello world") == [15339, 1917]
    assert enc.decode([15339, 1917]) == "hello world"
    assert enc.encode("hello <|endoftext|>", allowed_special="all") == [15339, 220, 100257]

    for enc_name in tiktoken.list_encoding_names():
        enc = tiktoken.get_encoding(enc_name)
        for token in range(10_000):
            assert enc.encode_single_token(enc.decode_single_token_bytes(token)) == token

**What it does (summary):**
- Calls tiktoken.encoding_for_model(...) for several model identifiers and asserts the returned encoder names match the expected canonical encodings (e.g., "gpt2" → "gpt2", "text-davinci-003" → "p50k_base", "gpt-3.5-turbo-0301" → "cl100k_base").

**Why it matters:**
- Many codepaths pick an encoder based on the model name. This test ensures the library maps models to the correct tokenizer. If mappings drift, tokenization will be inconsistent with how the model was trained, leading to incorrect token IDs, mismatched inputs, different token counts, or degraded model behavior.
- This is also crucial for billing/limits (accurate token counting) and for any logic that depends on encoder-specific behavior (special tokens, tokenization granularity).

In [None]:
def test_encoding_for_model():
    enc = tiktoken.encoding_for_model("gpt2")
    assert enc.name == "gpt2"
    enc = tiktoken.encoding_for_model("text-davinci-003")
    assert enc.name == "p50k_base"
    enc = tiktoken.encoding_for_model("text-davinci-edit-001")
    assert enc.name == "p50k_edit"
    enc = tiktoken.encoding_for_model("gpt-3.5-turbo-0301")
    assert enc.name == "cl100k_base"
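
A model-to-encoding lookup of the kind these tests exercise can be sketched as an exact-match table with a prefix fallback for versioned names. The table contents below come only from the assertions in this notebook; the split into exact names and prefixes, and the names `EXACT`, `PREFIXES`, and `toy_encoding_name_for_model`, are assumptions for illustration rather than the library's actual data.

```python
# Exact model names, taken from the assertions in this notebook.
EXACT = {
    "gpt2": "gpt2",
    "text-davinci-003": "p50k_base",
    "text-davinci-edit-001": "p50k_edit",
    "gpt-4": "cl100k_base",
    "gpt-4o": "o200k_base",
}

# Assumed prefix rules so that versioned names resolve without their own entry.
PREFIXES = {
    "gpt-3.5-turbo-": "cl100k_base",
    "gpt-oss-": "o200k_harmony",
}


def toy_encoding_name_for_model(model: str) -> str:
    """Resolve a model name to an encoding name: exact match first, then prefix."""
    if model in EXACT:
        return EXACT[model]
    for prefix, encoding_name in PREFIXES.items():
        if model.startswith(prefix):
            return encoding_name
    raise KeyError(f"no encoding known for model {model!r}")


resolved = toy_encoding_name_for_model("gpt-3.5-turbo-0301")
```

Checking exact names before prefixes keeps "gpt-4o" from being captured by a broader rule, which is the kind of drift the mapping tests guard against.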

**What it does (summary):**
- Builds a short Python snippet that imports tiktoken and asserts "blobfile" is not present in sys.modules. It then runs that snippet in a fresh subprocess (i.e., a clean Python process) to ensure importing tiktoken does not automatically import the optional blobfile package.

**Why it matters:**
- Some packages include optional dependencies for extended features; if those optional modules are imported unconditionally at top-level, they force users to install extra packages or cause import-time failures. This test ensures tiktoken import is lightweight and side-effect free with respect to blobfile.
- That prevents surprise dependency bloat, avoids packaging/CI problems, and reduces risk of import-time errors in environments where optional libs are absent.

In [None]:
def test_optional_blobfile_dependency():
    prog = """
import tiktoken
import sys
assert "blobfile" not in sys.modules
"""
    subprocess.check_call([sys.executable, "-c", prog])
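
The fresh-subprocess technique generalises to any "importing X must not import Y" check. The sketch below uses only standard-library modules (`json` and `colorsys`) so it runs without tiktoken; `imports_module` is a hypothetical helper, and the exit code 3 is an arbitrary marker chosen here.

```python
import subprocess
import sys


def imports_module(statement: str, module: str) -> bool:
    """Run `statement` in a fresh interpreter and report whether `module` got imported."""
    prog = (
        f"{statement}\n"
        "import sys\n"
        # Exit with a distinctive code so an unrelated crash is not mistaken for a hit.
        f"raise SystemExit(3 if {module!r} in sys.modules else 0)\n"
    )
    result = subprocess.run([sys.executable, "-c", prog])
    return result.returncode == 3


direct = imports_module("import colorsys", "colorsys")
indirect = imports_module("import json", "colorsys")
```

A new process is needed because `sys.modules` in the test runner may already contain the module from an earlier import, which would make an in-process check meaningless.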

# **tiktoken/<span style='color:black'> **tiktoken** </span>**

- \_\_init\_\_.py
- _educational.py
- core.py
- load.py
- model.py
- py.typed
- registry.py

## **tiktoken/tiktoken/<span style='color:black'> **\_\_init\_\_.py** </span>**


In [None]:
# This is the public API of tiktoken
from .core import Encoding as Encoding
from .model import encoding_for_model as encoding_for_model
from .model import encoding_name_for_model as encoding_name_for_model
from .registry import get_encoding as get_encoding
from .registry import list_encoding_names as list_encoding_names

__version__ = "0.12.0"

## **tiktoken/tiktoken/<span style='color:black'> **_educational.py** </span>**

The educational implementation of the byte pair encoding algorithm.
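
Before reading the full class, it may help to see the core merge loop on a tiny vocabulary. This is a minimal sketch of greedy byte pair encoding with hand-picked ranks; `toy_bpe_encode` and the `ranks` table are illustrative, and the visualisation options of the code below are left out.

```python
def toy_bpe_encode(ranks: dict[bytes, int], data: bytes) -> list[int]:
    """Greedy BPE: repeatedly merge the adjacent pair with the lowest rank."""
    parts = [bytes([b]) for b in data]  # start from individual bytes
    while True:
        best = None  # (rank, index) of the best mergeable adjacent pair so far
        for i in range(len(parts) - 1):
            rank = ranks.get(parts[i] + parts[i + 1])
            if rank is not None and (best is None or rank < best[0]):
                best = (rank, i)
        if best is None:
            break  # no adjacent pair is in the vocabulary: done
        i = best[1]
        parts[i : i + 2] = [parts[i] + parts[i + 1]]  # merge that pair in place
    return [ranks[part] for part in parts]


# Lower rank = learned earlier = merged first.
ranks = {b"a": 0, b"b": 1, b"c": 2, b"ab": 3, b"abc": 4}
tokens = toy_bpe_encode(ranks, b"abcab")  # → [4, 3]
```

Tracing the input b"abcab": the first "ab" merges (rank 3), then the second "ab" merges before "abc" because rank 3 beats rank 4, and finally "ab" + "c" merges into "abc", leaving the parts `[b"abc", b"ab"]`.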

In [1]:
"""This is an educational implementation of the byte pair encoding algorithm."""

from __future__ import annotations

import collections

import regex

import tiktoken


class SimpleBytePairEncoding:
    def __init__(self, *, pat_str: str, mergeable_ranks: dict[bytes, int]) -> None:
        """Creates an Encoding object."""
        # A regex pattern string that is used to split the input text
        self.pat_str = pat_str
        # A dictionary mapping token bytes to their ranks. The ranks correspond to merge priority
        self.mergeable_ranks = mergeable_ranks

        self._decoder = {token: token_bytes for token_bytes, token in mergeable_ranks.items()}
        self._pat = regex.compile(pat_str)

    def encode(self, text: str, visualise: str | None = "colour") -> list[int]:
        """Encodes a string into tokens.

        >>> enc.encode("hello world")
        [388, 372]
        """
        # Use the regex to split the text into (approximately) words
        words = self._pat.findall(text)
        tokens = []
        for word in words:
            # Turn each word into tokens, using the byte pair encoding algorithm
            word_bytes = word.encode("utf-8")
            word_tokens = bpe_encode(self.mergeable_ranks, word_bytes, visualise=visualise)
            tokens.extend(word_tokens)
        return tokens

    def decode_bytes(self, tokens: list[int]) -> bytes:
        """Decodes a list of tokens into bytes.

        >>> enc.decode_bytes([388, 372])
        b'hello world'
        """
        return b"".join(self._decoder[token] for token in tokens)

    def decode(self, tokens: list[int]) -> str:
        """Decodes a list of tokens into a string.

        Decoded bytes are not guaranteed to be valid UTF-8. In that case, we replace
        the invalid bytes with the replacement character "ÔøΩ".

        >>> enc.decode([388, 372])
        'hello world'
        """
        return self.decode_bytes(tokens).decode("utf-8", errors="replace")

    def decode_tokens_bytes(self, tokens: list[int]) -> list[bytes]:
        """Decodes a list of tokens into a list of bytes.

        Useful for visualising how a string is tokenised.

        >>> enc.decode_tokens_bytes([388, 372])
        [b'hello', b' world']
        """
        return [self._decoder[token] for token in tokens]

    @staticmethod
    def train(training_data: str, vocab_size: int, pat_str: str):
        """Train a BPE tokeniser on some data!"""
        mergeable_ranks = bpe_train(data=training_data, vocab_size=vocab_size, pat_str=pat_str)
        return SimpleBytePairEncoding(pat_str=pat_str, mergeable_ranks=mergeable_ranks)

    @staticmethod
    def from_tiktoken(encoding):
        if isinstance(encoding, str):
            encoding = tiktoken.get_encoding(encoding)
        return SimpleBytePairEncoding(
            pat_str=encoding._pat_str, mergeable_ranks=encoding._mergeable_ranks
        )


def bpe_encode(
    mergeable_ranks: dict[bytes, int], input: bytes, visualise: str | None = "colour"
) -> list[int]:
    parts = [bytes([b]) for b in input]
    while True:
        # See the intermediate merges play out!
        if visualise:
            if visualise in ["colour", "color"]:
                visualise_tokens(parts)
            elif visualise == "simple":
                print(parts)

        # Iterate over all pairs and find the pair we want to merge the most
        min_idx = None
        min_rank = None
        for i, pair in enumerate(zip(parts[:-1], parts[1:])):
            rank = mergeable_ranks.get(pair[0] + pair[1])
            if rank is not None and (min_rank is None or rank < min_rank):
                min_idx = i
                min_rank = rank

        # If there were no pairs we could merge, we're done!
        if min_rank is None:
            break
        assert min_idx is not None

        # Otherwise, merge that pair and leave the rest unchanged. Then repeat.
        parts = parts[:min_idx] + [parts[min_idx] + parts[min_idx + 1]] + parts[min_idx + 2 :]

    if visualise:
        print()

    tokens = [mergeable_ranks[part] for part in parts]
    return tokens


def bpe_train(
    data: str, vocab_size: int, pat_str: str, visualise: str | None = "colour"
) -> dict[bytes, int]:
    # First, add tokens for each individual byte value
    if vocab_size < 2**8:
        raise ValueError("vocab_size must be at least 256, so we can encode all bytes")
    ranks = {}
    for i in range(2**8):
        ranks[bytes([i])] = i

    # Splinter up our data into lists of bytes
    # data = "Hello world"
    # words = [
    #     [b'H', b'e', b'l', b'l', b'o'],
    #     [b' ', b'w', b'o', b'r', b'l', b'd']
    # ]
    words: list[list[bytes]] = [
        [bytes([b]) for b in word.encode("utf-8")] for word in regex.findall(pat_str, data)
    ]

    # Now, use our data to figure out which merges we should make
    while len(ranks) < vocab_size:
        # Find the most common pair. This will become our next token
        stats = collections.Counter()
        for piece in words:
            for pair in zip(piece[:-1], piece[1:]):
                stats[pair] += 1

        most_common_pair = max(stats, key=lambda x: stats[x])
        token_bytes = most_common_pair[0] + most_common_pair[1]
        token = len(ranks)
        # Add the new token!
        ranks[token_bytes] = token

        # Now merge that most common pair in all the words. That is, update our training data
        # to reflect our decision to make that pair into a new token.
        new_words = []
        for word in words:
            new_word = []
            i = 0
            while i < len(word) - 1:
                if (word[i], word[i + 1]) == most_common_pair:
                    # We found our pair! Merge it
                    new_word.append(token_bytes)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            if i == len(word) - 1:
                new_word.append(word[i])
            new_words.append(new_word)
        words = new_words

        # See the intermediate merges play out!
        if visualise:
            print(f"The current most common pair is {most_common_pair[0]} + {most_common_pair[1]}")
            print(f"So we made {token_bytes} our {len(ranks)}th token")
            if visualise in ["colour", "color"]:
                print("Now the first fifty words in our training data look like:")
                visualise_tokens([token for word in words[:50] for token in word])
            elif visualise == "simple":
                print("Now the first twenty words in our training data look like:")
                for word in words[:20]:
                    print(word)
            print("\n")

    return ranks


def visualise_tokens(token_values: list[bytes]) -> None:
    background = [f"\u001b[48;5;{i}m" for i in [167, 179, 185, 77, 80, 68, 134]]
    # If token boundaries do not occur at unicode character boundaries, it's unclear how best to
    # visualise the token. Here, we'll just use the unicode replacement character to represent some
    # fraction of a character.
    unicode_token_values = [x.decode("utf-8", errors="replace") for x in token_values]

    running_length = 0
    last_color = None
    for token in unicode_token_values:
        color = background[running_length % len(background)]
        if color == last_color:
            color = background[(running_length + 1) % len(background)]
            assert color != last_color
        last_color = color
        running_length += len(token)
        print(color + token, end="")
    print("\u001b[0m")


def train_simple_encoding():
    gpt2_pattern = (
        r"""'s|'t|'re|'ve|'m|'ll|'d| ?[\p{L}]+| ?[\p{N}]+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""
    )
    with open(__file__) as f:
        data = f.read()

    enc = SimpleBytePairEncoding.train(data, vocab_size=600, pat_str=gpt2_pattern)

    print("This is the sequence of merges performed in order to encode 'hello world':")
    tokens = enc.encode("hello world")
    assert enc.decode(tokens) == "hello world"
    assert enc.decode_bytes(tokens) == b"hello world"
    assert enc.decode_tokens_bytes(tokens) == [b"hello", b" world"]

    return enc
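
To make the merge loop in `bpe_encode` concrete, here is a minimal sketch that runs the same greedy "merge the lowest-ranked adjacent pair" rule on a hand-written rank table. The helper `toy_bpe_merge` and the table `toy_ranks` are invented for illustration and are not part of tiktoken.

```python
def toy_bpe_merge(ranks: dict[bytes, int], data: bytes) -> list[bytes]:
    """Greedily merge the adjacent pair with the lowest rank until none is left."""
    parts = [bytes([b]) for b in data]
    while True:
        best_idx, best_rank = None, None
        for i in range(len(parts) - 1):
            rank = ranks.get(parts[i] + parts[i + 1])
            if rank is not None and (best_rank is None or rank < best_rank):
                best_idx, best_rank = i, rank
        if best_idx is None:
            break  # no mergeable pair remains
        merged = parts[best_idx] + parts[best_idx + 1]
        parts = parts[:best_idx] + [merged] + parts[best_idx + 2:]
    return parts


# Hypothetical rank table: all 256 single bytes, plus three merges.
toy_ranks = {bytes([i]): i for i in range(256)}
toy_ranks[b"ab"] = 256
toy_ranks[b"abc"] = 257
toy_ranks[b"bc"] = 258

pieces = toy_bpe_merge(toy_ranks, b"abcbc")
token_ids = [toy_ranks[p] for p in pieces]
print(pieces, token_ids)
```

Because `ab` has a lower rank than `bc`, it is merged first, which is why the first piece grows into `abc` instead of the input splitting as `a` + `bc`.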

## **tiktoken/tiktoken/<span style='color:black'> **core.py** </span>**

**ONE-LINER SUMMARY:**<br>

This file defines `Encoding`, the Python wrapper around tiktoken's Rust BPE engine: it converts text ↔ tokens, guards against accidentally encoding special tokens, and offers threaded batch encoding and decoding for LLM training and inference.<br><br>


**HIGH-LEVEL RESPONSIBILITY OF THE CODE:**<br>

This file wraps a fast Rust BPE engine (CoreBPE) and provides Python-friendly methods to:

1. Encode text into tokens (strings → token IDs)
2. Decode tokens back into text or bytes
3. Handle special tokens safely (e.g., <|endoftext|>, FIM tokens)
4. Run tokenization over batches using a thread pool
5. Check vocabulary size consistency and support pickling (registered encodings are pickled by name)
6. Provide a stable interface for ML model training and inference

It is the public-facing implementation behind OpenAI's tiktoken library.

In [None]:
from __future__ import annotations

import functools
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, AbstractSet, Collection, Literal, NoReturn, Sequence

from tiktoken import _tiktoken

if TYPE_CHECKING:
    import re

    import numpy as np
    import numpy.typing as npt

*class Encoding*<br><br>

**Overall Purpose**<br>

- Represents a specific tokenizer configuration *(regex rules, merges, and special tokens)*, and exposes all encode/decode operations.

**Why It Matters**<br>

- This is the central object in any application that uses tiktoken. Without it, code could not reliably convert between text and the token IDs a model expects.

*"class Encoding"*<br><br><span style='color:GREEN'> **BEGINS**</span>


**What it does**
- Stores regex, BPE ranks, and special token mappings.
- Ensures vocabulary size is consistent.
- Builds the underlying fast Rust CoreBPE engine.

**Why it matters**
- Misconfigured tokenizers break models. When `explicit_n_vocab` is given, the assertions catch a vocabulary whose size or largest token ID does not match.
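
The consistency check can be sketched in isolation. The function name `check_vocab` and the tiny vocabulary below are assumptions made for illustration; the real constructor uses `assert` statements, while this sketch raises `ValueError` so the failure is easy to see.

```python
def check_vocab(mergeable_ranks, special_tokens, explicit_n_vocab=None):
    """Return the largest token ID, validating against explicit_n_vocab if given."""
    max_token_value = max(
        max(mergeable_ranks.values()), max(special_tokens.values(), default=0)
    )
    if explicit_n_vocab:
        if len(mergeable_ranks) + len(special_tokens) != explicit_n_vocab:
            raise ValueError("token count does not match explicit_n_vocab")
        if max_token_value != explicit_n_vocab - 1:
            raise ValueError("largest token ID is not explicit_n_vocab - 1")
    return max_token_value


# Hypothetical four-token vocabulary: three mergeable tokens and one special token.
ranks = {b"a": 0, b"b": 1, b"ab": 2}
special = {"<|endoftext|>": 3}

print(check_vocab(ranks, special, explicit_n_vocab=4))

try:
    check_vocab(ranks, special, explicit_n_vocab=5)
except ValueError as exc:
    print("rejected:", exc)
```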

In [None]:
def __init__(
        self,
        name: str,
        *,
        pat_str: str,
        mergeable_ranks: dict[bytes, int],
        special_tokens: dict[str, int],
        explicit_n_vocab: int | None = None,
    ):
        """Creates an Encoding object.

        See openai_public.py for examples of how to construct an Encoding object.

        Args:
            name: The name of the encoding. It should be clear from the name of the encoding
                what behaviour to expect, in particular, encodings with different special tokens
                should have different names.
            pat_str: A regex pattern string that is used to split the input text.
            mergeable_ranks: A dictionary mapping mergeable token bytes to their ranks. The ranks
                must correspond to merge priority.
            special_tokens: A dictionary mapping special token strings to their token values.
            explicit_n_vocab: The number of tokens in the vocabulary. If provided, it is checked
                that the number of mergeable tokens and special tokens is equal to this number.
        """
        self.name = name

        self._pat_str = pat_str
        self._mergeable_ranks = mergeable_ranks
        self._special_tokens = special_tokens

        self.max_token_value = max(
            max(mergeable_ranks.values()), max(special_tokens.values(), default=0)
        )
        if explicit_n_vocab:
            assert len(mergeable_ranks) + len(special_tokens) == explicit_n_vocab
            assert self.max_token_value == explicit_n_vocab - 1

        self._core_bpe = _tiktoken.CoreBPE(mergeable_ranks, special_tokens, pat_str)

def __repr__(self) -> str:
        return f"<Encoding {self.name!r}>"

    # ====================
    # Encoding
    # ====================

**What**
- Encodes text into tokens, treating any text that looks like a special token as ordinary text.

**Why**
- Slightly faster and simpler path when no special-token handling is needed.

In [None]:
def encode_ordinary(self, text: str) -> list[int]:
        """Encodes a string into tokens, ignoring special tokens.

        This is equivalent to `encode(text, disallowed_special=())` (but slightly faster).

        ```
        >>> enc.encode_ordinary("hello world")
        [31373, 995]
        ```
        """
        try:
            return self._core_bpe.encode_ordinary(text)
        except UnicodeEncodeError:
            # See comment in encode
            text = text.encode("utf-16", "surrogatepass").decode("utf-16", "replace")
            return self._core_bpe.encode_ordinary(text)
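
The `UnicodeEncodeError` fallback relies on a UTF-16 round trip. A small standalone sketch of that trick, using only the standard library (the helper name `fix_surrogates` is made up here):

```python
def fix_surrogates(text: str) -> str:
    """Join valid surrogate pairs and replace lone surrogates with U+FFFD."""
    return text.encode("utf-16", "surrogatepass").decode("utf-16", "replace")


paired = "\ud83d\udc4d"  # two surrogate code points that together form one emoji
lone = "\ud83d"          # a high surrogate with no partner

# Neither string can be encoded as UTF-8 directly.
for candidate in (paired, lone):
    try:
        candidate.encode("utf-8")
    except UnicodeEncodeError:
        print("cannot encode as UTF-8:", ascii(candidate))

fixed_pair = fix_surrogates(paired)
fixed_lone = fix_surrogates(lone)
print(ascii(fixed_pair), ascii(fixed_lone))
```

After the fixup both strings encode to UTF-8 without error, at the cost that the lone surrogate no longer round-trips to the original Python string.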

**What**

Main secure encoder:
- Detects special tokens in text
- Allows or blocks them
- Throws errors if disallowed tokens are present
- Uses Rust BPE for speed

**Why**
- Helps prevent prompt injection and accidental triggering of model modes (such as FIM or end-of-text tokens) when encoding untrusted text. This is critical for safety.

In [None]:
def encode(
        self,
        text: str,
        *,
        allowed_special: Literal["all"] | AbstractSet[str] = set(),  # noqa: B006
        disallowed_special: Literal["all"] | Collection[str] = "all",
    ) -> list[int]:
        """Encodes a string into tokens.

        Special tokens are artificial tokens used to unlock capabilities from a model,
        such as fill-in-the-middle. So we want to be careful about accidentally encoding special
        tokens, since they can be used to trick a model into doing something we don't want it to do.

        Hence, by default, encode will raise an error if it encounters text that corresponds
        to a special token. This can be controlled on a per-token level using the `allowed_special`
        and `disallowed_special` parameters. In particular:
        - Setting `disallowed_special` to () will prevent this function from raising errors and
          cause all text corresponding to special tokens to be encoded as natural text.
        - Setting `allowed_special` to "all" will cause this function to treat all text
          corresponding to special tokens to be encoded as special tokens.

        ```
        >>> enc.encode("hello world")
        [31373, 995]
        >>> enc.encode("<|endoftext|>", allowed_special={"<|endoftext|>"})
        [50256]
        >>> enc.encode("<|endoftext|>", allowed_special="all")
        [50256]
        >>> enc.encode("<|endoftext|>")
        # Raises ValueError
        >>> enc.encode("<|endoftext|>", disallowed_special=())
        [27, 91, 437, 1659, 5239, 91, 29]
        ```
        """
        if allowed_special == "all":
            allowed_special = self.special_tokens_set
        if disallowed_special == "all":
            disallowed_special = self.special_tokens_set - allowed_special
        if disallowed_special:
            if not isinstance(disallowed_special, frozenset):
                disallowed_special = frozenset(disallowed_special)
            if match := _special_token_regex(disallowed_special).search(text):
                raise_disallowed_special_token(match.group())

        try:
            return self._core_bpe.encode(text, allowed_special)
        except UnicodeEncodeError:
            # BPE operates on bytes, but the regex operates on unicode. If we pass a str that is
            # invalid UTF-8 to Rust, it will rightfully complain. Here we do a quick and dirty
            # fixup for any surrogate pairs that may have sneaked their way into the text.
            # Technically, this introduces a place where encode + decode doesn't roundtrip a Python
            # string, but given that this is input we want to support, maybe that's okay.
            # Also we use errors="replace" to handle weird things like lone surrogates.
            text = text.encode("utf-16", "surrogatepass").decode("utf-16", "replace")
            return self._core_bpe.encode(text, allowed_special)
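
The allow/disallow logic at the top of `encode` can be tried without the Rust core. The sketch below is an approximation under stated assumptions: `SPECIAL_TOKENS` is a made-up token set, `check_special` is a hypothetical helper, and it uses the standard `re` module instead of the cached `_special_token_regex`.

```python
import re

# Hypothetical special-token set for the sketch.
SPECIAL_TOKENS = {"<|endoftext|>", "<|fim_prefix|>"}


def check_special(text, allowed_special=frozenset(), disallowed_special="all"):
    """Raise if text contains a disallowed special token; return the allowed set."""
    if allowed_special == "all":
        allowed_special = SPECIAL_TOKENS
    if disallowed_special == "all":
        disallowed_special = SPECIAL_TOKENS - set(allowed_special)
    if disallowed_special:
        pattern = re.compile("|".join(re.escape(t) for t in disallowed_special))
        match = pattern.search(text)
        if match:
            raise ValueError(f"disallowed special token {match.group()!r}")
    return set(allowed_special)


print(check_special("hello world"))
print(check_special("<|endoftext|>", allowed_special={"<|endoftext|>"}))
print(check_special("<|endoftext|>", disallowed_special=()))

try:
    check_special("<|endoftext|>")
except ValueError as exc:
    print("rejected:", exc)
```

The default is the strict one: anything not explicitly allowed is disallowed, so untrusted text containing a special token fails loudly instead of being encoded as a control token.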

**What**

- Same as encode but returns a NumPy array instead of list.

**Why**

- Avoids copying the token buffer into a Python list, which reduces overhead when encoding large volumes of text.

In [None]:
def encode_to_numpy(
        self,
        text: str,
        *,
        allowed_special: Literal["all"] | AbstractSet[str] = set(),  # noqa: B006
        disallowed_special: Literal["all"] | Collection[str] = "all",
    ) -> npt.NDArray[np.uint32]:
        """Encodes a string into tokens, returning a numpy array.

        Avoids the overhead of copying the token buffer into a Python list.
        """
        if allowed_special == "all":
            allowed_special = self.special_tokens_set
        if disallowed_special == "all":
            disallowed_special = self.special_tokens_set - allowed_special
        if disallowed_special:
            if not isinstance(disallowed_special, frozenset):
                disallowed_special = frozenset(disallowed_special)
            if match := _special_token_regex(disallowed_special).search(text):
                raise_disallowed_special_token(match.group())

        import numpy as np

        buffer = self._core_bpe.encode_to_tiktoken_buffer(text, allowed_special)
        return np.frombuffer(buffer, dtype=np.uint32)
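
`np.frombuffer` reinterprets an existing byte buffer as an array without copying it. The same idea can be shown with only the standard library; in this sketch the buffer is fabricated with `struct` rather than produced by tiktoken, and `memoryview.cast` stands in for `np.frombuffer`.

```python
import struct

token_ids = [31373, 995, 50256]

# Pack the IDs as native-endian unsigned 32-bit integers, 4 bytes each.
buffer = struct.pack(f"={len(token_ids)}I", *token_ids)

# Reinterpret the same bytes as unsigned ints; no per-element Python objects
# are created until the view is indexed or converted.
view = memoryview(buffer).cast("I")

print(len(buffer), view.itemsize, list(view))
```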

**What**

- Run tokenization in parallel threads.

**Why**

- Speeds up encoding of many prompts, since each string is tokenized independently on a worker thread.

In [None]:
def encode_ordinary_batch(self, text: list[str], *, num_threads: int = 8) -> list[list[int]]:
        """Encodes a list of strings into tokens, in parallel, ignoring special tokens.

        This is equivalent to `encode_batch(text, disallowed_special=())` (but slightly faster).

        ```
        >>> enc.encode_ordinary_batch(["hello world", "goodbye world"])
        [[31373, 995], [11274, 16390, 995]]
        ```
        """
        encoder = functools.partial(self.encode_ordinary)
        with ThreadPoolExecutor(num_threads) as e:
            return list(e.map(encoder, text))

def encode_batch(
        self,
        text: list[str],
        *,
        num_threads: int = 8,
        allowed_special: Literal["all"] | AbstractSet[str] = set(),  # noqa: B006
        disallowed_special: Literal["all"] | Collection[str] = "all",
    ) -> list[list[int]]:
        """Encodes a list of strings into tokens, in parallel.

        See `encode` for more details on `allowed_special` and `disallowed_special`.

        ```
        >>> enc.encode_batch(["hello world", "goodbye world"])
        [[31373, 995], [11274, 16390, 995]]
        ```
        """
        if allowed_special == "all":
            allowed_special = self.special_tokens_set
        if disallowed_special == "all":
            disallowed_special = self.special_tokens_set - allowed_special
        if not isinstance(disallowed_special, frozenset):
            disallowed_special = frozenset(disallowed_special)

        encoder = functools.partial(
            self.encode, allowed_special=allowed_special, disallowed_special=disallowed_special
        )
        with ThreadPoolExecutor(num_threads) as e:
            return list(e.map(encoder, text))
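
The batch methods all follow one pattern: map a single-item function over the inputs with a `ThreadPoolExecutor`. A minimal sketch with a stand-in encoder follows; `toy_encode` simply returns UTF-8 byte values and is not a real tokenizer.

```python
from concurrent.futures import ThreadPoolExecutor


def toy_encode(text: str) -> list[int]:
    """Stand-in for Encoding.encode: one 'token' per UTF-8 byte."""
    return list(text.encode("utf-8"))


def toy_encode_batch(texts: list[str], num_threads: int = 8) -> list[list[int]]:
    # Executor.map yields results in input order, regardless of which
    # worker finishes first.
    with ThreadPoolExecutor(num_threads) as pool:
        return list(pool.map(toy_encode, texts))


batch = ["hi", "yo!"]
result = toy_encode_batch(batch)
print(result)
```

Threads only give a real speedup when the per-item work releases the GIL; the pure-Python `toy_encode` here does not, so this sketch shows the structure and the ordering guarantee, not the performance.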

**What**

Returns:
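- Stable tokens: a prefix of the tokenization that stays the same however the text is continued
- Completions: the possible token sequences that can follow the stable prefix and still cover the rest of `text`

**Why**

- Useful when `text` is an incomplete prefix, because the tokens at its end may merge differently once more text is appended. The docstring marks this API itself as unstable.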

In [None]:
def encode_with_unstable(
        self,
        text: str,
        *,
        allowed_special: Literal["all"] | AbstractSet[str] = set(),  # noqa: B006
        disallowed_special: Literal["all"] | Collection[str] = "all",
    ) -> tuple[list[int], list[list[int]]]:
        """Encodes a string into stable tokens and possible completion sequences.

        Note that the stable tokens will only represent a substring of `text`.

        See `encode` for more details on `allowed_special` and `disallowed_special`.

        This API should itself be considered unstable.

        ```
        >>> enc.encode_with_unstable("hello fanta")
        ([31373], [(277, 4910), (5113, 265), ..., (8842,)])

        >>> text = "..."
        >>> stable_tokens, completions = enc.encode_with_unstable(text)
        >>> assert text.encode().startswith(enc.decode_bytes(stable_tokens))
        >>> assert all(enc.decode_bytes(stable_tokens + seq).startswith(text.encode()) for seq in completions)
        ```
        """
        if allowed_special == "all":
            allowed_special = self.special_tokens_set
        if disallowed_special == "all":
            disallowed_special = self.special_tokens_set - allowed_special
        if disallowed_special:
            if not isinstance(disallowed_special, frozenset):
                disallowed_special = frozenset(disallowed_special)
            if match := _special_token_regex(disallowed_special).search(text):
                raise_disallowed_special_token(match.group())

        return self._core_bpe.encode_with_unstable(text, allowed_special)

**What**

- Convert exactly one token (text or bytes) → its ID.

**Why**

- Used for debugging, tokenizer introspection, or low-level BPE analysis.

In [None]:
def encode_single_token(self, text_or_bytes: str | bytes) -> int:
        """Encodes text corresponding to a single token to its token value.

        NOTE: this will encode all special tokens.

        Raises `KeyError` if the token is not in the vocabulary.

        ```
        >>> enc.encode_single_token("hello")
        31373
        ```
        """
        if isinstance(text_or_bytes, str):
            text_or_bytes = text_or_bytes.encode("utf-8")
        return self._core_bpe.encode_single_token(text_or_bytes)

    # ====================
    # Decoding
    # ====================

**What** → bytes

- Fastest decoding path.

**Why**

- Needed for LLM streaming output or non-UTF8 content.

In [None]:
def decode_bytes(self, tokens: Sequence[int]) -> bytes:
        """Decodes a list of tokens into bytes.

        ```
        >>> enc.decode_bytes([31373, 995])
        b'hello world'
        ```
        """
        return self._core_bpe.decode_bytes(tokens)

**What**

- Full decode to UTF-8 string.

**Why**

- This is how tokens become human-readable model output.

In [None]:
def decode(self, tokens: Sequence[int], errors: str = "replace") -> str:
        """Decodes a list of tokens into a string.

        WARNING: the default behavior of this function is lossy, since decoded bytes are not
        guaranteed to be valid UTF-8. You can control this behavior using the `errors` parameter,
        for instance, setting `errors=strict`.

        ```
        >>> enc.decode([31373, 995])
        'hello world'
        ```
        """
        return self._core_bpe.decode_bytes(tokens).decode("utf-8", errors=errors)
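
The lossy default matters because token boundaries need not fall on character boundaries. A small sketch with hand-written token bytes (not real tiktoken output) shows the difference between `errors="replace"` and `errors="strict"`:

```python
# "café" where the two-byte character "é" (0xC3 0xA9) is split across tokens.
token_bytes = [b"caf", b"\xc3", b"\xa9"]

full = b"".join(token_bytes)
partial = b"".join(token_bytes[:2])  # e.g. a stream cut off mid-character

decoded_full = full.decode("utf-8", errors="replace")
decoded_partial = partial.decode("utf-8", errors="replace")
print(ascii(decoded_full), ascii(decoded_partial))

try:
    partial.decode("utf-8", errors="strict")
    strict_failed = False
except UnicodeDecodeError:
    strict_failed = True
print("strict decoding failed:", strict_failed)
```

When streaming model output, one common approach is to buffer bytes with `decode_bytes` and decode only once a full character is available.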

**What**

- Look up bytes for a single token.

**Why**

- Used to inspect exactly what a token represents.

In [None]:
def decode_single_token_bytes(self, token: int) -> bytes:
        """Decodes a token into bytes.

        NOTE: this will decode all special tokens.

        Raises `KeyError` if the token is not in the vocabulary.

        ```
        >>> enc.decode_single_token_bytes(31373)
        b'hello'
        ```
        """
        return self._core_bpe.decode_single_token_bytes(token)

**What**

- List each token's raw bytes.

**Why**

- Great for visualizing token boundaries.

In [None]:
def decode_tokens_bytes(self, tokens: Sequence[int]) -> list[bytes]:
        """Decodes a list of tokens into a list of bytes.

        Useful for visualising tokenisation.
        >>> enc.decode_tokens_bytes([31373, 995])
        [b'hello', b' world']
        """
        return [self.decode_single_token_bytes(token) for token in tokens]

**What**

- Returns text + index offsets where each token begins.

**Why**

- Vital for aligning model predictions to text (for highlighting, instruction alignment, token-level editing, etc.).

In [None]:
def decode_with_offsets(self, tokens: Sequence[int]) -> tuple[str, list[int]]:
        """Decodes a list of tokens into a string and a list of offsets.

        Each offset is the index into text corresponding to the start of each token.
        If UTF-8 character boundaries do not line up with token boundaries, the offset is the index
        of the first character that contains bytes from the token.

        This will currently raise if given tokens that decode to invalid UTF-8; this behaviour may
        change in the future to be more permissive.

        >>> enc.decode_with_offsets([31373, 995])
        ('hello world', [0, 5])
        """
        token_bytes = self.decode_tokens_bytes(tokens)

        text_len = 0
        offsets = []
        for token in token_bytes:
            offsets.append(max(0, text_len - (0x80 <= token[0] < 0xC0)))
            text_len += sum(1 for c in token if not 0x80 <= c < 0xC0)

        # TODO: assess correctness for errors="ignore" and errors="replace"
        text = b"".join(token_bytes).decode("utf-8", errors="strict")
        return text, offsets
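
The offset arithmetic counts characters by skipping UTF-8 continuation bytes (`0x80` to `0xBF`). The sketch below repeats that logic on plain byte strings so it can be run without an encoder; `offsets_for` is a hypothetical name and the token bytes are hand-written.

```python
def offsets_for(token_bytes: list[bytes]) -> tuple[str, list[int]]:
    text_len = 0
    offsets = []
    for token in token_bytes:
        # A token that starts with a continuation byte begins inside a
        # character, so it points back at that character's index.
        starts_mid_char = 0x80 <= token[0] < 0xC0
        offsets.append(max(0, text_len - starts_mid_char))
        # Every non-continuation byte starts a new character.
        text_len += sum(1 for c in token if not 0x80 <= c < 0xC0)
    text = b"".join(token_bytes).decode("utf-8", errors="strict")
    return text, offsets


ascii_case = offsets_for([b"hello", b" world"])
# "aéb" with "é" (0xC3 0xA9) split between the two tokens.
split_case = offsets_for([b"a\xc3", b"\xa9b"])
print(ascii_case, split_case)
```

In the split case the second token starts in the middle of "é", so its offset is 1, the index of the character that contains its first byte.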

**What**

- Parallel decoding.

**Why**

- Speed for multi-request workloads.

In [None]:
def decode_batch(
        self, batch: Sequence[Sequence[int]], *, errors: str = "replace", num_threads: int = 8
    ) -> list[str]:
        """Decodes a batch (list of lists of tokens) into a list of strings."""
        decoder = functools.partial(self.decode, errors=errors)
        with ThreadPoolExecutor(num_threads) as e:
            return list(e.map(decoder, batch))

def decode_bytes_batch(
        self, batch: Sequence[Sequence[int]], *, num_threads: int = 8
    ) -> list[bytes]:
        """Decodes a batch (list of lists of tokens) into a list of bytes."""
        with ThreadPoolExecutor(num_threads) as e:
            return list(e.map(self.decode_bytes, batch))

    # ====================
    # Miscellaneous
    # ====================

Returns raw byte values for each token in vocab. Useful for debugging & introspection.

In [None]:
def token_byte_values(self) -> list[bytes]:
        """Returns the list of all token byte values."""
        return self._core_bpe.token_byte_values()

Returns <|endoftext|> token ID. Often used in training loops.

In [None]:
@property
def eot_token(self) -> int:
        return self._special_tokens["<|endoftext|>"]

Cached set of special token strings. Used by encoders for fast checking.

In [None]:
@functools.cached_property
def special_tokens_set(self) -> set[str]:
        return set(self._special_tokens.keys())

Tests whether a token ID is special.

In [None]:
def is_special_token(self, token: int) -> bool:
        assert isinstance(token, int)
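        # `_special_token_values` is never set in the `__init__` shown above,
        # so look the ID up in the special-token mapping instead.
        return token in self._special_tokens.values()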

Returns vocabulary size.

In [None]:
@property
def n_vocab(self) -> int:
        """For backwards compatibility. Prefer to use `enc.max_token_value + 1`."""
        return self.max_token_value + 1

    # ====================
    # Private
    # ====================

Encode text without regex splitting. Handles inner BPE operations.

In [None]:
def _encode_single_piece(self, text_or_bytes: str | bytes) -> list[int]:
        """Encodes text corresponding to bytes without a regex split.

        NOTE: this will not encode any special tokens.

        ```
        >>> enc.encode_single_piece("helloqqqq")
        [31373, 38227, 38227]
        ```
        """
        if isinstance(text_or_bytes, str):
            text_or_bytes = text_or_bytes.encode("utf-8")
        return self._core_bpe.encode_single_piece(text_or_bytes)

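Does the regex splitting in Python (using the `regex` package) and hands each piece to the Rust BPE for merging. It is not a fast path; it is useful for checking that the Python-side split agrees with the Rust one.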

In [None]:
def _encode_only_native_bpe(self, text: str) -> list[int]:
        """Encodes a string into tokens, but do regex splitting in Python."""
        # We need specifically `regex` in order to compile pat_str due to e.g. \p
        import regex

        _unused_pat = regex.compile(self._pat_str)
        ret = []
        for piece in regex.findall(_unused_pat, text):
            ret.extend(self._core_bpe.encode_single_piece(piece.encode("utf-8")))
        return ret

Passes raw bytes straight to the Rust encoder, skipping the `str` layer. A low-level private helper.

In [None]:
def _encode_bytes(self, text: bytes) -> list[int]:
        return self._core_bpe._encode_bytes(text)

Pickling support. Allows tokenizers to be cached, saved, and restored.

In [None]:
def __getstate__(self) -> object:
        import tiktoken.registry

        # As an optimisation, pickle registered encodings by reference
        if self is tiktoken.registry.ENCODINGS.get(self.name):
            return self.name
        return {
            "name": self.name,
            "pat_str": self._pat_str,
            "mergeable_ranks": self._mergeable_ranks,
            "special_tokens": self._special_tokens,
        }

def __setstate__(self, value: object) -> None:
        import tiktoken.registry

        if isinstance(value, str):
            self.__dict__ = tiktoken.registry.get_encoding(value).__dict__
            return
        self.__init__(**value)
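
The "pickle by reference" optimisation can be illustrated with a toy class. Everything here is a stand-in: `ToyEncoding` and `REGISTRY` are hypothetical, and the hooks are called directly (as `pickle` would call them) so the sketch does not depend on how the class is imported.

```python
REGISTRY = {}  # hypothetical stand-in for tiktoken.registry.ENCODINGS


class ToyEncoding:
    def __init__(self, name, ranks):
        self.name = name
        self.ranks = ranks

    def __getstate__(self):
        # Registered instances are saved as just their name.
        if self is REGISTRY.get(self.name):
            return self.name
        return {"name": self.name, "ranks": self.ranks}

    def __setstate__(self, value):
        if isinstance(value, str):
            # Share state with the registered instance instead of rebuilding.
            self.__dict__ = REGISTRY[value].__dict__
            return
        self.__init__(**value)


def roundtrip(enc):
    """Mimic what pickle does: capture state, then restore into a blank object."""
    state = enc.__getstate__()
    clone = ToyEncoding.__new__(ToyEncoding)
    clone.__setstate__(state)
    return state, clone


registered = ToyEncoding("toy", {b"a": 0})
REGISTRY["toy"] = registered
custom = ToyEncoding("custom", {b"b": 0})

state_reg, clone_reg = roundtrip(registered)
state_custom, clone_custom = roundtrip(custom)
print(state_reg, state_custom)
```

The registered encoding serialises to a short string, while the unregistered one carries its whole rank table.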

*"class Encoding"*<br><br><span style='color:RED'> **ENDS**</span>

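Builds a regex that matches any of the given special tokens, cached with `lru_cache` so repeated calls with the same set are cheap. Used for fast detection of injection attempts or forbidden tokens in input text.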

In [None]:
@functools.lru_cache(maxsize=128)
def _special_token_regex(tokens: frozenset[str]) -> re.Pattern[str]:
    try:
        import regex as re
    except ImportError:
        import re
    inner = "|".join(re.escape(token) for token in tokens)
    return re.compile(f"({inner})")

Raises a `ValueError` that names the offending token and explains how to allow it or disable the check. This keeps the safety check clear and hard to misuse.

In [None]:
def raise_disallowed_special_token(token: str) -> NoReturn:
    raise ValueError(
        f"Encountered text corresponding to disallowed special token {token!r}.\n"
        "If you want this text to be encoded as a special token, "
        f"pass it to `allowed_special`, e.g. `allowed_special={{{token!r}, ...}}`.\n"
        f"If you want this text to be encoded as normal text, disable the check for this token "
        f"by passing `disallowed_special=(enc.special_tokens_set - {{{token!r}}})`.\n"
        "To disable this check for all special tokens, pass `disallowed_special=()`.\n"
    )

## **tiktoken/tiktoken/<span style='color:black'> **load.py** </span>**

This module provides robust file I/O helpers (local/HTTP/cloud) with optional caching and integrity checks, plus converters to read/produce mergeable BPE rank tables used by tiktoken-style tokenizers (and to serialize/deserialize those tables).

**One-liner**:<br>

- Utilities to safely fetch/cache files and convert between DataGym-style BPE/encoder files and tiktoken-compatible mergeable-BPE rank files (and read/write those).

- Unifies reading from local disk, public HTTP URLs, and cloud-backed storage (GCS/S3 through blobfile) behind one function.
- Avoids unexpected MFA/auth prompts by preferring plain HTTP for public URLs.
- Reduces duplicated code and centralizes error messaging for missing dependencies.

In [None]:
def read_file(blobpath: str) -> bytes:
    if "://" not in blobpath:
        with open(blobpath, "rb", buffering=0) as f:
            return f.read()

    if blobpath.startswith(("http://", "https://")):
        # avoiding blobfile for public files helps avoid auth issues, like MFA prompts.
        import requests

        resp = requests.get(blobpath)
        resp.raise_for_status()
        return resp.content

    try:
        import blobfile
    except ImportError as e:
        raise ImportError(
            "blobfile is not installed. Please install it by running `pip install blobfile`."
        ) from e
    with blobfile.BlobFile(blobpath, "rb") as f:
        return f.read()

**What it does**
- Computes the SHA-256 hex digest of data and returns whether it equals expected_hash.

**How it works**
- hashlib.sha256(data).hexdigest() then equality check.

**Why it matters**
- Simple data integrity check to detect corrupted downloads or tampering.
- Used by other functions to decide whether to trust cached files or raise an error.

In [None]:
def check_hash(data: bytes, expected_hash: str) -> bool:
    actual_hash = hashlib.sha256(data).hexdigest()
    return actual_hash == expected_hash
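
A quick way to see `check_hash` in action, restated here with its `hashlib` import so the snippet runs on its own (the payload is arbitrary example data):

```python
import hashlib


def check_hash(data: bytes, expected_hash: str) -> bool:
    actual_hash = hashlib.sha256(data).hexdigest()
    return actual_hash == expected_hash


payload = b"hello"
known_good = hashlib.sha256(payload).hexdigest()

matches = check_hash(payload, known_good)
tampered = check_hash(payload + b"!", known_good)
print(matches, tampered)
```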

**What it does**
- Fetches blobpath but uses a local cache directory to avoid re-downloading; validates cache with expected_hash if provided; writes fetched results into the cache (if writable).

**How it works (key steps)**
1. Determine cache_dir from the environment variables TIKTOKEN_CACHE_DIR or DATA_GYM_CACHE_DIR; otherwise use a data-gym-cache directory under the system temp directory (and track whether the user set it explicitly).
2. If cache_dir is an empty string, caching is disabled and read_file is called directly.
3. Compute the cache key as the SHA-1 hex digest of blobpath and derive cache_path from it.
4. If a cached file exists, read it and return it if expected_hash is None or matches; otherwise delete the cache entry and re-fetch.
5. If there is no valid cache entry, call read_file(blobpath) to fetch the contents. If expected_hash is provided and the fetched content fails the hash check, raise ValueError.
6. Attempt an atomic write: write to a tmp file, then os.rename it to cache_path. If the write fails and the cache directory was user-specified, re-raise; if it was the default temp directory, silently ignore the failure.

**Why it matters**
- Avoids repeated expensive network reads (saves bandwidth/time).
- Ensures cached files are trusted via hash validation (prevents using corrupted or tampered cached data).
- Uses safe write pattern (tmp file + rename) to avoid partial/half-written cache files.
- Respects user opt-out (empty cache path) and tolerates unwritable default caches but fails loudly for user-specified caches (good UX).

In [None]:
def read_file_cached(blobpath: str, expected_hash: str | None = None) -> bytes:
    user_specified_cache = True
    if "TIKTOKEN_CACHE_DIR" in os.environ:
        cache_dir = os.environ["TIKTOKEN_CACHE_DIR"]
    elif "DATA_GYM_CACHE_DIR" in os.environ:
        cache_dir = os.environ["DATA_GYM_CACHE_DIR"]
    else:
        import tempfile

        cache_dir = os.path.join(tempfile.gettempdir(), "data-gym-cache")
        user_specified_cache = False

    if cache_dir == "":
        # disable caching
        return read_file(blobpath)

    cache_key = hashlib.sha1(blobpath.encode()).hexdigest()

    cache_path = os.path.join(cache_dir, cache_key)
    if os.path.exists(cache_path):
        with open(cache_path, "rb", buffering=0) as f:
            data = f.read()
        if expected_hash is None or check_hash(data, expected_hash):
            return data

        # the cached file does not match the hash, remove it and re-fetch
        try:
            os.remove(cache_path)
        except OSError:
            pass

    contents = read_file(blobpath)
    if expected_hash and not check_hash(contents, expected_hash):
        raise ValueError(
            f"Hash mismatch for data downloaded from {blobpath} (expected {expected_hash}). "
            f"This may indicate a corrupted download. Please try again."
        )

    import uuid

    try:
        os.makedirs(cache_dir, exist_ok=True)
        tmp_filename = cache_path + "." + str(uuid.uuid4()) + ".tmp"
        with open(tmp_filename, "wb") as f:
            f.write(contents)
        os.rename(tmp_filename, cache_path)
    except OSError:
        # don't raise if we can't write to the default cache, e.g. issue #75
        if user_specified_cache:
            raise

    return contents
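
The caching behaviour above can be tried out in isolation. The following is a minimal sketch rather than the library's code: `fetch_with_cache` and `fake_fetch` are hypothetical names, and SHA-256 is assumed as the content hash (the listing only shows a call to `check_hash`). It illustrates the same three ideas: validate the cached bytes, discard a bad entry, and write through a temporary file followed by a rename.

```python
import hashlib
import os
import tempfile
import uuid


def fetch_with_cache(key, fetch, cache_dir, expected_sha256=None):
    """Return bytes for `key`, using `cache_dir` as a hash-validated cache."""
    # The cache file name is derived from the key, not from the content.
    cache_path = os.path.join(cache_dir, hashlib.sha1(key.encode()).hexdigest())

    def ok(data):
        # Assumption: the content hash is SHA-256 (hex digest).
        return expected_sha256 is None or hashlib.sha256(data).hexdigest() == expected_sha256

    if os.path.exists(cache_path):
        with open(cache_path, "rb") as f:
            data = f.read()
        if ok(data):
            return data
        os.remove(cache_path)  # stale or corrupted entry: drop it and re-fetch

    data = fetch(key)
    if not ok(data):
        raise ValueError(f"Hash mismatch for {key}")

    # Write to a unique temp name, then rename, so readers never see a partial file.
    os.makedirs(cache_dir, exist_ok=True)
    tmp_path = f"{cache_path}.{uuid.uuid4()}.tmp"
    with open(tmp_path, "wb") as f:
        f.write(data)
    os.rename(tmp_path, cache_path)
    return data


calls = []


def fake_fetch(key):
    # Hypothetical stand-in for a network download; records each call.
    calls.append(key)
    return b"payload"


with tempfile.TemporaryDirectory() as d:
    digest = hashlib.sha256(b"payload").hexdigest()
    first = fetch_with_cache("demo", fake_fetch, d, digest)
    second = fetch_with_cache("demo", fake_fetch, d, digest)  # served from the cache
```

Unlike `read_file_cached`, this sketch does not distinguish a user-specified cache directory from the default one, so any write failure simply propagates.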

**What it does**
- Converts a DataGym style BPE merges file plus encoder JSON into a single mapping token_bytes -> rank that matches the ordering/format tiktoken expects for mergeable BPE ranks.

**How it works (overview)**
- Builds a mapping of printable single-byte tokens in a deterministic rank order and then appends the other byte values so there are 256 single-byte tokens. (The data_gym_byte_to_byte / rank_to_intbyte logic does this remapping.)
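- Reads vocab_bpe_file (cached) and parses the merge pairs, skipping the first line (a version header in GPT-2's vocab.bpe) and the trailing empty line. Each pair names two tokens that merge into a new token, and the line order is the merge priority.
- Constructs initial bpe_ranks for the 256 single-byte tokens, then assigns each merged token the next rank (256, 257, ...) in the order the merges appear in the file.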
- Loads encoder_json_file (cached), decodes keys into bytes, removes special non-mergeable tokens (e.g. <|endoftext|>), and optionally clobbers single-byte tokens with values from encoder JSON if clobber_one_byte_tokens is True.
- Asserts that the constructed bpe_ranks equals the encoder JSON mapping. This is an important sanity check guaranteeing that both inputs are consistent.

**Why it matters**
- Tokenizers (especially tiktoken-like BPE) rely on a specific mapping from token bytes to ranks where rank order equals merge priority. Converting between different tokenizer repo formats is essential to reuse models, vocabularies, or tooling.
- The function enforces consistency between merges and encoder definitions. Catching mismatched files early prevents subtle tokenization bugs that can silently change model inputs/outputs.
- The clobber_one_byte_tokens flag gives controlled flexibility when encoders provide a particular ordering for single-byte tokens.

**Practical use**
- If you have a merges file and an encoder JSON from another tokenizer/tooling (DataGym), this function produces the exact rank table tiktoken expects so you can instantiate or interoperate with tiktoken-style tokenizers.

In [None]:
def data_gym_to_mergeable_bpe_ranks(
    vocab_bpe_file: str,
    encoder_json_file: str,
    vocab_bpe_hash: str | None = None,
    encoder_json_hash: str | None = None,
    clobber_one_byte_tokens: bool = False,
) -> dict[bytes, int]:
    # NB: do not add caching to this function
    rank_to_intbyte = [b for b in range(2**8) if chr(b).isprintable() and chr(b) != " "]

    data_gym_byte_to_byte = {chr(b): b for b in rank_to_intbyte}
    n = 0
    for b in range(2**8):
        if b not in rank_to_intbyte:
            rank_to_intbyte.append(b)
            data_gym_byte_to_byte[chr(2**8 + n)] = b
            n += 1
    assert len(rank_to_intbyte) == 2**8

    # vocab_bpe contains the merges along with associated ranks
    vocab_bpe_contents = read_file_cached(vocab_bpe_file, vocab_bpe_hash).decode()
    bpe_merges = [tuple(merge_str.split()) for merge_str in vocab_bpe_contents.split("\n")[1:-1]]

    def decode_data_gym(value: str) -> bytes:
        return bytes(data_gym_byte_to_byte[b] for b in value)

    # add the single byte tokens
    # if clobber_one_byte_tokens is True, we'll replace these with ones from the encoder json
    bpe_ranks = {bytes([b]): i for i, b in enumerate(rank_to_intbyte)}
    del rank_to_intbyte

    # add the merged tokens
    n = len(bpe_ranks)
    for first, second in bpe_merges:
        bpe_ranks[decode_data_gym(first) + decode_data_gym(second)] = n
        n += 1

    import json

    # check that the encoder file matches the merges file
    # this sanity check is important since tiktoken assumes that ranks are ordered the same
    # as merge priority
    encoder_json = json.loads(read_file_cached(encoder_json_file, encoder_json_hash))
    encoder_json_loaded = {decode_data_gym(k): v for k, v in encoder_json.items()}
    # drop these two special tokens if present, since they're not mergeable bpe tokens
    encoder_json_loaded.pop(b"<|endoftext|>", None)
    encoder_json_loaded.pop(b"<|startoftext|>", None)

    if clobber_one_byte_tokens:
        for k in encoder_json_loaded:
            if len(k) == 1:
                bpe_ranks[k] = encoder_json_loaded[k]

    assert bpe_ranks == encoder_json_loaded

    return bpe_ranks
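
The byte remapping is the least obvious part of this function, so here is a small standalone sketch of it. It rebuilds the same character-to-byte table using the construction from the listing (the name `char_to_byte` is illustrative) and decodes two DataGym-style strings. No files are read and no merges are applied.

```python
# Printable, non-space byte values are represented by their own character.
printable = [b for b in range(256) if chr(b).isprintable() and chr(b) != " "]
char_to_byte = {chr(b): b for b in printable}

# Every remaining byte is represented by a code point starting at U+0100,
# assigned in increasing byte order.
printable_set = set(printable)
n = 0
for b in range(256):
    if b not in printable_set:
        char_to_byte[chr(256 + n)] = b
        n += 1


def decode_data_gym(value: str) -> bytes:
    # Map each stand-in character back to the raw byte it represents.
    return bytes(char_to_byte[ch] for ch in value)


space_token = decode_data_gym("\u0120hello")  # "\u0120" is the stand-in for the space byte
newline_token = decode_data_gym("\u010a")     # "\u010a" is the stand-in for b"\n"
```

This is why GPT-2 vocabulary files show characters such as U+0120 in front of words: they are printable stand-ins for bytes (here, the space) that would otherwise be awkward to store in a text file.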

**What it does**
- Writes bpe_ranks to tiktoken_bpe_file in a simple text format: each line is base64(token), a space, and the rank, followed by a newline, with lines ordered by ascending rank.

**How it works**
- Ensures blobfile is installed, opens blobfile.BlobFile(..., "wb"), sorts bpe_ranks by rank, and writes base64.b64encode(token) + b" " + str(rank).encode() + b"\n" for each.

**Why it matters**
- Produces a portable serialized BPE rank file that other code (including load_tiktoken_bpe or other tools) can read.
- Using base64 ensures arbitrary byte tokens are safely stored in a text-like file, avoiding issues with binary data in text files.
- Writing via blobfile enables writing to cloud-backed paths as well as local disk.

In [None]:
def dump_tiktoken_bpe(bpe_ranks: dict[bytes, int], tiktoken_bpe_file: str) -> None:
    try:
        import blobfile
    except ImportError as e:
        raise ImportError(
            "blobfile is not installed. Please install it by running `pip install blobfile`."
        ) from e
    with blobfile.BlobFile(tiktoken_bpe_file, "wb") as f:
        for token, rank in sorted(bpe_ranks.items(), key=lambda x: x[1]):
            f.write(base64.b64encode(token) + b" " + str(rank).encode() + b"\n")

**What it does**
- Reads the file format produced by dump_tiktoken_bpe and returns dict[bytes, int] mapping token bytes to rank (int). Optionally validates via expected_hash on the raw file.

**How it works**
- Calls read_file_cached with optional expected_hash to get the file contents.
- Splits into lines and for each non-empty line splits the token and rank. Decodes the base64 token and converts the rank to int. On parse failure raises a ValueError with context.

**Why it matters**
- Loads a stable, portable BPE rank table into memory for tokenizer construction/inspection.
- Validates format and surfaces parse errors clearly (helps debug mismatched versions or corrupted files).
- Works with cached files and supports cloud paths (via read_file_cached internals).

In [None]:
def load_tiktoken_bpe(tiktoken_bpe_file: str, expected_hash: str | None = None) -> dict[bytes, int]:
    # NB: do not add caching to this function
    contents = read_file_cached(tiktoken_bpe_file, expected_hash)
    ret = {}
    for line in contents.splitlines():
        if not line:
            continue
        try:
            token, rank = line.split()
            ret[base64.b64decode(token)] = int(rank)
        except Exception as e:
            raise ValueError(f"Error parsing line {line!r} in {tiktoken_bpe_file}") from e
    return ret
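
To make the file format concrete, the sketch below round-trips a tiny rank table through the same line format, entirely in memory. `dumps_ranks` and `loads_ranks` are hypothetical helpers written for this illustration. They mirror the per-line logic of dump_tiktoken_bpe and load_tiktoken_bpe but skip blobfile, caching, and hash checks.

```python
import base64


def dumps_ranks(bpe_ranks: dict[bytes, int]) -> bytes:
    # One line per token: base64(token), a space, the rank; sorted by rank.
    lines = [
        base64.b64encode(token) + b" " + str(rank).encode()
        for token, rank in sorted(bpe_ranks.items(), key=lambda kv: kv[1])
    ]
    return b"\n".join(lines) + b"\n"


def loads_ranks(contents: bytes) -> dict[bytes, int]:
    ranks = {}
    for line in contents.splitlines():
        if not line:
            continue
        token, rank = line.split()
        ranks[base64.b64decode(token)] = int(rank)
    return ranks


# Tokens are arbitrary bytes, including whitespace and non-UTF-8 sequences.
ranks = {b"a": 0, b" ": 1, b"\xff\x00": 2}
serialized = dumps_ranks(ranks)
restored = loads_ranks(serialized)
```

Because base64 output never contains whitespace, splitting each line on whitespace is unambiguous even for tokens that are themselves spaces or newlines.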

## **tiktoken/tiktoken/<span style='color:black'> **model.py** </span>**

This module maps model names (or model-name prefixes) to the correct tokenizer/encoding and provides helpers that return either the encoding name or the Encoding object for a given model, including versioned model names.

Tokenization must match the model's expected encoding. Using the wrong encoding produces wrong token counts, broken prompt handling, misaligned special tokens, and wrong truncation, and it can break generation or billing/usage calculations. This module centralizes and automates that mapping so callers can reliably obtain the exact encoding the model expects, including for versioned model names, without a manual lookup every time.

**One-liner**: Maps model names (including versioned names via prefixes) to the exact tokenizer encoding and returns the Encoding object, ensuring text is tokenized the way the model expects.


In [None]:
from __future__ import annotations

from .core import Encoding
from .registry import get_encoding

# TODO: these will likely be replaced by an API endpoint
MODEL_PREFIX_TO_ENCODING: dict[str, str] = {
    "o1-": "o200k_base",
    "o3-": "o200k_base",
    "o4-mini-": "o200k_base",
    # chat
    "gpt-5-": "o200k_base",
    "gpt-4.5-": "o200k_base",
    "gpt-4.1-": "o200k_base",
    "chatgpt-4o-": "o200k_base",
    "gpt-4o-": "o200k_base",  # e.g., gpt-4o-2024-05-13
    "gpt-4-": "cl100k_base",  # e.g., gpt-4-0314, etc., plus gpt-4-32k
    "gpt-3.5-turbo-": "cl100k_base",  # e.g, gpt-3.5-turbo-0301, -0401, etc.
    "gpt-35-turbo-": "cl100k_base",  # Azure deployment name
    "gpt-oss-": "o200k_harmony",
    # fine-tuned
    "ft:gpt-4o": "o200k_base",
    "ft:gpt-4": "cl100k_base",
    "ft:gpt-3.5-turbo": "cl100k_base",
    "ft:davinci-002": "cl100k_base",
    "ft:babbage-002": "cl100k_base",
}

MODEL_TO_ENCODING: dict[str, str] = {
    # reasoning
    "o1": "o200k_base",
    "o3": "o200k_base",
    "o4-mini": "o200k_base",
    # chat
    "gpt-5": "o200k_base",
    "gpt-4.1": "o200k_base",
    "gpt-4o": "o200k_base",
    "gpt-4": "cl100k_base",
    "gpt-3.5-turbo": "cl100k_base",
    "gpt-3.5": "cl100k_base",  # Common shorthand
    "gpt-35-turbo": "cl100k_base",  # Azure deployment name
    # base
    "davinci-002": "cl100k_base",
    "babbage-002": "cl100k_base",
    # embeddings
    "text-embedding-ada-002": "cl100k_base",
    "text-embedding-3-small": "cl100k_base",
    "text-embedding-3-large": "cl100k_base",
    # DEPRECATED MODELS
    # text (DEPRECATED)
    "text-davinci-003": "p50k_base",
    "text-davinci-002": "p50k_base",
    "text-davinci-001": "r50k_base",
    "text-curie-001": "r50k_base",
    "text-babbage-001": "r50k_base",
    "text-ada-001": "r50k_base",
    "davinci": "r50k_base",
    "curie": "r50k_base",
    "babbage": "r50k_base",
    "ada": "r50k_base",
    # code (DEPRECATED)
    "code-davinci-002": "p50k_base",
    "code-davinci-001": "p50k_base",
    "code-cushman-002": "p50k_base",
    "code-cushman-001": "p50k_base",
    "davinci-codex": "p50k_base",
    "cushman-codex": "p50k_base",
    # edit (DEPRECATED)
    "text-davinci-edit-001": "p50k_edit",
    "code-davinci-edit-001": "p50k_edit",
    # old embeddings (DEPRECATED)
    "text-similarity-davinci-001": "r50k_base",
    "text-similarity-curie-001": "r50k_base",
    "text-similarity-babbage-001": "r50k_base",
    "text-similarity-ada-001": "r50k_base",
    "text-search-davinci-doc-001": "r50k_base",
    "text-search-curie-doc-001": "r50k_base",
    "text-search-babbage-doc-001": "r50k_base",
    "text-search-ada-doc-001": "r50k_base",
    "code-search-babbage-code-001": "r50k_base",
    "code-search-ada-code-001": "r50k_base",
    # open source
    "gpt2": "gpt2",
    "gpt-2": "gpt2",  # Maintains consistency with gpt-4
}


def encoding_name_for_model(model_name: str) -> str:
    """Returns the name of the encoding used by a model.

    Raises a KeyError if the model name is not recognised.
    """
    encoding_name = None
    if model_name in MODEL_TO_ENCODING:
        encoding_name = MODEL_TO_ENCODING[model_name]
    else:
        # Check if the model matches a known prefix
        # Prefix matching avoids needing library updates for every model version release
        # Note that this can match on non-existent models (e.g., gpt-3.5-turbo-FAKE)
        for model_prefix, model_encoding_name in MODEL_PREFIX_TO_ENCODING.items():
            if model_name.startswith(model_prefix):
                return model_encoding_name

    if encoding_name is None:
        raise KeyError(
            f"Could not automatically map {model_name} to a tokeniser. "
            "Please use `tiktoken.get_encoding` to explicitly get the tokeniser you expect."
        ) from None

    return encoding_name


def encoding_for_model(model_name: str) -> Encoding:
    """Returns the encoding used by a model.

    Raises a KeyError if the model name is not recognised.
    """
    return get_encoding(encoding_name_for_model(model_name))
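
The exact-match-then-prefix lookup can be sketched with two trimmed-down tables. `EXACT`, `PREFIXES`, and `lookup` are hypothetical names for this illustration. The entries are copied from the listing above, but the tables are deliberately incomplete.

```python
# Hypothetical, trimmed-down copies of the two tables, for illustration only.
EXACT = {"gpt-4o": "o200k_base", "gpt-4": "cl100k_base", "gpt2": "gpt2"}
PREFIXES = {"gpt-4o-": "o200k_base", "gpt-4-": "cl100k_base"}


def lookup(model_name: str) -> str:
    # 1) An exact model name wins.
    if model_name in EXACT:
        return EXACT[model_name]
    # 2) Otherwise fall back to the first matching prefix (dict order).
    for prefix, encoding_name in PREFIXES.items():
        if model_name.startswith(prefix):
            return encoding_name
    raise KeyError(f"Could not map {model_name} to an encoding")


exact_hit = lookup("gpt-4")
dated_4o = lookup("gpt-4o-2024-05-13")
dated_4 = lookup("gpt-4-0314")
fake = lookup("gpt-4-FAKE")  # prefix matching also accepts non-existent models
```

The trailing hyphen in each prefix matters: "gpt-4o-2024-05-13" does not start with "gpt-4-", so the two model families cannot be confused by the prefix check.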

## **tiktoken/tiktoken/<span style='color:black'> **py.typed** </span>**

## **tiktoken/tiktoken/<span style='color:black'> **registry.py** </span>**

This module discovers encoding plugins, constructs Encoding instances on demand, caches them, and exposes functions to get or list available encodings in a thread-safe, lazy, and idempotent way.

**One-liner**: "Discover encoding plugins, build and cache Encoding objects on demand, and provide safe lookups for callers."

**Why this matters**

- Keeps the core tokenizer library extensible: third-party or optional encoding implementations can be packaged as plugins (under tiktoken_ext) and automatically discovered without modifying core code.
- Saves work and memory by constructing each Encoding only once (lazy instantiation + caching).
- Safe in multi-threaded programs: a lock plus a double-checked cache lookup avoids race conditions and duplicate construction.
- Provides clear, helpful errors when plugins are missing, duplicated, or malformed so users can diagnose install/config problems quickly.

**Important design choices & edge cases**

- Lazy + cached discovery and construction: avoids importing all plugins or building every Encoding at startup; constructs only what is used.
- Double-checked locking: reduces lock contention for the common fast path (ENCODINGS cache hit) while remaining safe when initializing resources.
- Idempotent failure handling: if plugin discovery fails, ENCODING_CONSTRUCTORS is reset to None and the exception is propagated. This avoids leaving a half-initialized broken state and allows retries.
- Duplicate name detection: prevents two plugins from claiming the same encoding name, which would create ambiguous behavior at runtime.
- Helpful errors: get_encoding gives the list of found plugins and tiktoken version to help debugging installation/version mismatches.

In [None]:
from __future__ import annotations

import functools
import importlib
import pkgutil
import threading
from typing import Any, Callable, Sequence

import tiktoken_ext

import tiktoken
from tiktoken.core import Encoding

_lock = threading.RLock()
ENCODINGS: dict[str, Encoding] = {}
ENCODING_CONSTRUCTORS: dict[str, Callable[[], dict[str, Any]]] | None = None

**What it does**
- Uses pkgutil.iter_modules over tiktoken_ext.__path__ to list submodule names inside the tiktoken_ext namespace package.
- Returns a list of full module names (e.g. tiktoken_ext.some_plugin).

**Why it matters**
- Fast, safe discovery of installed plugin modules without importing all of them immediately.
- Keeps plugin discovery efficient (namespace packages + pkgutil are lightweight and work well with editable installs).

**Notes**
- Decorated with functools.lru_cache, so it runs discovery once and caches the list for repeated calls.

In [None]:
@functools.lru_cache
def _available_plugin_modules() -> Sequence[str]:
    # tiktoken_ext is a namespace package
    # submodules inside tiktoken_ext will be inspected for ENCODING_CONSTRUCTORS attributes
    # - we use namespace package pattern so `pkgutil.iter_modules` is fast
    # - it's a separate top-level package because namespace subpackages of non-namespace
    #   packages don't quite do what you want with editable installs
    mods = []
    plugin_mods = pkgutil.iter_modules(tiktoken_ext.__path__, tiktoken_ext.__name__ + ".")
    for _, mod_name, _ in plugin_mods:
        mods.append(mod_name)
    return mods
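
As a quick illustration of what pkgutil.iter_modules returns, the sketch below lists the submodules of the standard-library json package instead of tiktoken_ext. This is an assumption made so the example runs without tiktoken installed. json is a regular package rather than a namespace package, but the call has the same shape.

```python
import json
import pkgutil

# List (without importing them) the submodules found under a package's __path__.
# The second argument is a prefix that is prepended to every reported name.
submodules = [
    name for _, name, _ in pkgutil.iter_modules(json.__path__, json.__name__ + ".")
]
```

The resulting names are fully qualified (for example "json.decoder"), so they can be passed directly to importlib.import_module later, as _find_constructors does with plugin module names.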

**What it does**
- Thread-safely initializes ENCODING_CONSTRUCTORS if not already initialized.
- Iterates all plugin modules from _available_plugin_modules().
- Imports each plugin module and expects the module to define ENCODING_CONSTRUCTORS.
- Validates no duplicate encoding names across plugins; stores constructors in ENCODING_CONSTRUCTORS.
- On any exception during discovery, resets ENCODING_CONSTRUCTORS to None and re-raises (idempotent error behavior).

**Why it matters**
- Centralizes and validates plugin discovery logic.
- Prevents ambiguous constructors (duplicate names) which would cause unpredictable behavior later.
- The idempotent failure (reset to None) ensures subsequent calls can retry discovery rather than remaining in a broken partial state.

**Concurrency pattern**
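- Takes _lock and then checks whether ENCODING_CONSTRUCTORS is already set, so only one thread performs discovery and the others wait and then see the result. Because _lock is a reentrant RLock, get_encoding() and list_encoding_names() can call this function while already holding the lock.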

In [None]:

def _find_constructors() -> None:
    global ENCODING_CONSTRUCTORS
    with _lock:
        if ENCODING_CONSTRUCTORS is not None:
            return
        ENCODING_CONSTRUCTORS = {}

        try:
            for mod_name in _available_plugin_modules():
                mod = importlib.import_module(mod_name)
                try:
                    constructors = mod.ENCODING_CONSTRUCTORS
                except AttributeError as e:
                    raise ValueError(
                        f"tiktoken plugin {mod_name} does not define ENCODING_CONSTRUCTORS"
                    ) from e
                for enc_name, constructor in constructors.items():
                    if enc_name in ENCODING_CONSTRUCTORS:
                        raise ValueError(
                            f"Duplicate encoding name {enc_name} in tiktoken plugin {mod_name}"
                        )
                    ENCODING_CONSTRUCTORS[enc_name] = constructor
        except Exception:
            # Ensure we idempotently raise errors
            ENCODING_CONSTRUCTORS = None
            raise
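
The reset-on-failure behaviour is easy to verify with a stripped-down model. In the sketch below, `REGISTRY`, `discover`, and the `loaders` argument are hypothetical stand-ins for ENCODING_CONSTRUCTORS, _find_constructors, and the imported plugin modules. The point is only that a failed discovery leaves no partial state behind.

```python
import threading

_lock = threading.RLock()
REGISTRY = None  # None means "discovery has not succeeded yet"


def discover(loaders):
    """Populate REGISTRY from `loaders`, a list of callables that return dicts."""
    global REGISTRY
    with _lock:
        if REGISTRY is not None:
            return
        REGISTRY = {}
        try:
            for load in loaders:
                for name, constructor in load().items():
                    if name in REGISTRY:
                        raise ValueError(f"Duplicate encoding name {name}")
                    REGISTRY[name] = constructor
        except Exception:
            REGISTRY = None  # roll back so a later call can retry from scratch
            raise


try:
    discover([lambda: {"a": dict}, lambda: {"a": dict}])  # two plugins claim "a"
except ValueError:
    state_after_failure = REGISTRY

discover([lambda: {"a": dict}, lambda: {"b": dict}])  # the retry succeeds
names = sorted(REGISTRY)
```

Without the rollback, the first call would leave a half-filled dictionary behind, and the `is not None` check would make every later call return early with incomplete data.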

**What it does**
1. Validates that encoding_name is a str.
2. Checks the ENCODINGS cache; if the encoding is present, returns it immediately (fast path, no lock).
3. Acquires _lock and re-checks the cache (to avoid a race).
4. If constructors are not yet discovered, calls _find_constructors().
5. Looks up encoding_name in ENCODING_CONSTRUCTORS. If not found, raises ValueError with helpful diagnostics (which plugins were found and what tiktoken version is installed).
6. Calls the constructor to get kwargs, constructs Encoding(**kwargs), caches it in ENCODINGS, and returns it.

**Why it matters**
- The main public accessor for getting a usable Encoding. Consumers call this to get an object with .encode()/.decode() etc.
- Combines lazy discovery + lazy construction + caching for efficiency.
- Provides clear errors when an encoding isn't available or when plugin/config issues exist.
- Thread-safe: two parallel requests for the same encoding won't construct two different Encoding instances.

**Behavioral guarantees**
- Returns the same Encoding instance for repeated calls with the same name (identity preserved).
- Raises early and informatively when input is wrong type or encoding unavailable.

In [None]:
def get_encoding(encoding_name: str) -> Encoding:
    if not isinstance(encoding_name, str):
        raise ValueError(f"Expected a string in get_encoding, got {type(encoding_name)}")

    if encoding_name in ENCODINGS:
        return ENCODINGS[encoding_name]

    with _lock:
        if encoding_name in ENCODINGS:
            return ENCODINGS[encoding_name]

        if ENCODING_CONSTRUCTORS is None:
            _find_constructors()
            assert ENCODING_CONSTRUCTORS is not None

        if encoding_name not in ENCODING_CONSTRUCTORS:
            raise ValueError(
                f"Unknown encoding {encoding_name}.\n"
                f"Plugins found: {_available_plugin_modules()}\n"
                f"tiktoken version: {tiktoken.__version__} (are you on latest?)"
            )

        constructor = ENCODING_CONSTRUCTORS[encoding_name]
        enc = Encoding(**constructor())
        ENCODINGS[encoding_name] = enc
        return enc
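
The double-checked locking in steps 2 and 3 can be exercised with a few threads. This is a minimal sketch under stated assumptions: `CACHE`, `build`, and `get` are hypothetical stand-ins for ENCODINGS, the Encoding constructor call, and get_encoding, and the plugin lookup is omitted.

```python
import threading

_lock = threading.RLock()
CACHE = {}
build_count = 0


def build(name):
    # Stand-in for an expensive constructor such as Encoding(**constructor()).
    global build_count
    build_count += 1  # only ever executed while holding _lock
    return object()


def get(name):
    if name in CACHE:      # fast path: no lock on a cache hit
        return CACHE[name]
    with _lock:
        if name in CACHE:  # re-check: another thread may have built it meanwhile
            return CACHE[name]
        CACHE[name] = build(name)
        return CACHE[name]


results = []
threads = [threading.Thread(target=lambda: results.append(get("demo"))) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

All eight threads end up with the same object, and `build` runs once: threads that lose the race block on the lock and then hit the second cache check.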

**What it does**
- Ensures constructors are discovered (via _find_constructors()), then returns a list of available encoding names (keys of ENCODING_CONSTRUCTORS).

**Why it matters**
- Provides a programmatic way to inspect which encodings are available (helpful for diagnostics, autocompletion, or UI lists).
- Respects lazy discovery: if discovery hasn't run yet, it runs now and populates the constructors.

In [None]:
def list_encoding_names() -> list[str]:
    with _lock:
        if ENCODING_CONSTRUCTORS is None:
            _find_constructors()
            assert ENCODING_CONSTRUCTORS is not None
        return list(ENCODING_CONSTRUCTORS)

# **tiktoken/<span style='color: #0e90b1ff;'> **tiktoken_ext** </span>**

**Overall responsibility (short)**

- This module declares several named token-encoding constructors (**GPT-2, r50k, p50k, cl100k, o200k** variants). Each returns the data (pat_str, mergeable BPE ranks, special-token id map, and in some cases an explicit vocabulary size) needed to create a tokenizer compatible with a particular model/encoding.

**Why that matters**

- Tokenization is the input interface to language models. Precise regexes, BPE merge ranks, and reserved token IDs ensure deterministic encoding/decoding, cross-implementation compatibility, correct model inputs, and security (hash checks). Without these configurations models would get different token streams and produce incorrect or irreproducible outputs.

**one-liner summary**

- A registry of ready-to-use tokenizer configurations (regex patterns, BPE ranks, and special token mappings) for multiple model encodings so tiktoken can build deterministic, model-compatible tokenizers.

In [1]:
from tiktoken.load import data_gym_to_mergeable_bpe_ranks, load_tiktoken_bpe

ENDOFTEXT = "<|endoftext|>"
FIM_PREFIX = "<|fim_prefix|>"
FIM_MIDDLE = "<|fim_middle|>"
FIM_SUFFIX = "<|fim_suffix|>"
ENDOFPROMPT = "<|endofprompt|>"

# The pattern in the original GPT-2 release is:
# r"""'s|'t|'re|'ve|'m|'ll|'d| ?[\p{L}]+| ?[\p{N}]+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""
# This is equivalent, but executes faster:
r50k_pat_str = (
    r"""'(?:[sdmt]|ll|ve|re)| ?\p{L}++| ?\p{N}++| ?[^\s\p{L}\p{N}]++|\s++$|\s+(?!\S)|\s"""
)


def gpt2():
    mergeable_ranks = data_gym_to_mergeable_bpe_ranks(
        vocab_bpe_file="https://openaipublic.blob.core.windows.net/gpt-2/encodings/main/vocab.bpe",
        encoder_json_file="https://openaipublic.blob.core.windows.net/gpt-2/encodings/main/encoder.json",
        vocab_bpe_hash="1ce1664773c50f3e0cc8842619a93edc4624525b728b188a9e0be33b7726adc5",
        encoder_json_hash="196139668be63f3b5d6574427317ae82f612a97c5d1cdaf36ed2256dbf636783",
    )
    return {
        "name": "gpt2",
        "explicit_n_vocab": 50257,
        "pat_str": r50k_pat_str,
        "mergeable_ranks": mergeable_ranks,
        "special_tokens": {ENDOFTEXT: 50256},
    }


def r50k_base():
    mergeable_ranks = load_tiktoken_bpe(
        "https://openaipublic.blob.core.windows.net/encodings/r50k_base.tiktoken",
        expected_hash="306cd27f03c1a714eca7108e03d66b7dc042abe8c258b44c199a7ed9838dd930",
    )
    return {
        "name": "r50k_base",
        "explicit_n_vocab": 50257,
        "pat_str": r50k_pat_str,
        "mergeable_ranks": mergeable_ranks,
        "special_tokens": {ENDOFTEXT: 50256},
    }


def p50k_base():
    mergeable_ranks = load_tiktoken_bpe(
        "https://openaipublic.blob.core.windows.net/encodings/p50k_base.tiktoken",
        expected_hash="94b5ca7dff4d00767bc256fdd1b27e5b17361d7b8a5f968547f9f23eb70d2069",
    )
    return {
        "name": "p50k_base",
        "explicit_n_vocab": 50281,
        "pat_str": r50k_pat_str,
        "mergeable_ranks": mergeable_ranks,
        "special_tokens": {ENDOFTEXT: 50256},
    }


def p50k_edit():
    mergeable_ranks = load_tiktoken_bpe(
        "https://openaipublic.blob.core.windows.net/encodings/p50k_base.tiktoken",
        expected_hash="94b5ca7dff4d00767bc256fdd1b27e5b17361d7b8a5f968547f9f23eb70d2069",
    )
    special_tokens = {ENDOFTEXT: 50256, FIM_PREFIX: 50281, FIM_MIDDLE: 50282, FIM_SUFFIX: 50283}
    return {
        "name": "p50k_edit",
        "pat_str": r50k_pat_str,
        "mergeable_ranks": mergeable_ranks,
        "special_tokens": special_tokens,
    }


def cl100k_base():
    mergeable_ranks = load_tiktoken_bpe(
        "https://openaipublic.blob.core.windows.net/encodings/cl100k_base.tiktoken",
        expected_hash="223921b76ee99bde995b7ff738513eef100fb51d18c93597a113bcffe865b2a7",
    )
    special_tokens = {
        ENDOFTEXT: 100257,
        FIM_PREFIX: 100258,
        FIM_MIDDLE: 100259,
        FIM_SUFFIX: 100260,
        ENDOFPROMPT: 100276,
    }
    return {
        "name": "cl100k_base",
        "pat_str": r"""'(?i:[sdmt]|ll|ve|re)|[^\r\n\p{L}\p{N}]?+\p{L}++|\p{N}{1,3}+| ?[^\s\p{L}\p{N}]++[\r\n]*+|\s++$|\s*[\r\n]|\s+(?!\S)|\s""",
        "mergeable_ranks": mergeable_ranks,
        "special_tokens": special_tokens,
    }


def o200k_base():
    mergeable_ranks = load_tiktoken_bpe(
        "https://openaipublic.blob.core.windows.net/encodings/o200k_base.tiktoken",
        expected_hash="446a9538cb6c348e3516120d7c08b09f57c36495e2acfffe59a5bf8b0cfb1a2d",
    )
    special_tokens = {ENDOFTEXT: 199999, ENDOFPROMPT: 200018}
    # This regex could be made more efficient. If I was the one working on this encoding, I would
    # have done a few other things differently too, e.g. I think you can allocate tokens more
    # efficiently across languages.
    pat_str = "|".join(
        [
            r"""[^\r\n\p{L}\p{N}]?[\p{Lu}\p{Lt}\p{Lm}\p{Lo}\p{M}]*[\p{Ll}\p{Lm}\p{Lo}\p{M}]+(?i:'s|'t|'re|'ve|'m|'ll|'d)?""",
            r"""[^\r\n\p{L}\p{N}]?[\p{Lu}\p{Lt}\p{Lm}\p{Lo}\p{M}]+[\p{Ll}\p{Lm}\p{Lo}\p{M}]*(?i:'s|'t|'re|'ve|'m|'ll|'d)?""",
            r"""\p{N}{1,3}""",
            r""" ?[^\s\p{L}\p{N}]+[\r\n/]*""",
            r"""\s*[\r\n]+""",
            r"""\s+(?!\S)""",
            r"""\s+""",
        ]
    )
    return {
        "name": "o200k_base",
        "pat_str": pat_str,
        "mergeable_ranks": mergeable_ranks,
        "special_tokens": special_tokens,
    }


def o200k_harmony():
    base_enc = o200k_base()
    name = "o200k_harmony"
    pat_str = base_enc["pat_str"]
    mergeable_ranks = base_enc["mergeable_ranks"]
    special_tokens = {
        **base_enc["special_tokens"],
        "<|startoftext|>": 199998,
        "<|endoftext|>": 199999,
        "<|reserved_200000|>": 200000,
        "<|reserved_200001|>": 200001,
        "<|return|>": 200002,
        "<|constrain|>": 200003,
        "<|reserved_200004|>": 200004,
        "<|channel|>": 200005,
        "<|start|>": 200006,
        "<|end|>": 200007,
        "<|message|>": 200008,
        "<|reserved_200009|>": 200009,
        "<|reserved_200010|>": 200010,
        "<|reserved_200011|>": 200011,
        "<|call|>": 200012,
    } | {f"<|reserved_{i}|>": i for i in range(200013, 201088)}
    return {
        "name": name,
        "pat_str": pat_str,
        "mergeable_ranks": mergeable_ranks,
        "special_tokens": special_tokens,
    }


ENCODING_CONSTRUCTORS = {
    "gpt2": gpt2,
    "r50k_base": r50k_base,
    "p50k_base": p50k_base,
    "p50k_edit": p50k_edit,
    "cl100k_base": cl100k_base,
    "o200k_base": o200k_base,
    "o200k_harmony": o200k_harmony,
}
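
The special-token table of o200k_harmony is assembled by merging dictionaries, which can be reproduced without downloading any rank files. The sketch below copies the literal entries from the listing above (`base_special_tokens` and `named` are illustrative names) and then inspects the result. It shows only what the listing implies and makes no claim about intent.

```python
from collections import Counter

# Special tokens inherited from o200k_base, as listed above.
base_special_tokens = {"<|endoftext|>": 199999, "<|endofprompt|>": 200018}

# Explicitly named harmony tokens, copied from the o200k_harmony constructor.
named = {
    "<|startoftext|>": 199998,
    "<|endoftext|>": 199999,
    "<|reserved_200000|>": 200000,
    "<|reserved_200001|>": 200001,
    "<|return|>": 200002,
    "<|constrain|>": 200003,
    "<|reserved_200004|>": 200004,
    "<|channel|>": 200005,
    "<|start|>": 200006,
    "<|end|>": 200007,
    "<|message|>": 200008,
    "<|reserved_200009|>": 200009,
    "<|reserved_200010|>": 200010,
    "<|reserved_200011|>": 200011,
    "<|call|>": 200012,
}

# Same merge shape as the constructor: unpack the base, add named tokens,
# then union with the generated block of reserved tokens.
special_tokens = {**base_special_tokens, **named} | {
    f"<|reserved_{i}|>": i for i in range(200013, 201088)
}

# Find ids that are reachable under more than one token string.
id_counts = Counter(special_tokens.values())
shared_ids = [token_id for token_id, count in id_counts.items() if count > 1]
```

With the entries as listed, id 200018 is reachable under two names, `<|endofprompt|>` (inherited from o200k_base) and `<|reserved_200018|>` (from the generated range). Whether that is intentional is not stated in the source.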