# Tutoriel Andrej Karpathy - révision

[![Youtube video cover](assets/img/yt_tokenizer_cover.webp)](https://www.youtube.com/watch?v=zduSFxRajkE)

In [51]:
# Read the lyrics explicitly as UTF-8: the text contains accented French
# characters and is re-encoded as UTF-8 later on, so relying on the platform
# default encoding could raise or silently produce mojibake.
with open("data/musiques/nekfeu/niquelesclones.txt", "r", encoding="utf-8") as f:
    texte = f.read()

In [52]:
# Preview the first 100 characters of the lyrics.
sample = texte[:100]
print(sample)

[Paroles de "Nique les clones Part. II"]

[Intro]
Je ne vois plus que des clones, ça a commencé à l'


In [53]:
# Length in characters (Unicode code points), not in bytes.
len(sample)

100

In [54]:
# UTF-8 byte values of the sample. Each non-ASCII character encodes to more
# than one byte, so there can be more bytes than characters.
sample_tokens = list(sample.encode("utf-8"))
len(sample_tokens)

103

In [55]:
from collections import Counter

def count_pairs(tokens: list):
    """Count how often each pair of adjacent tokens occurs.

    Args:
        tokens: Sequence of tokens (must support slicing).

    Returns:
        A ``Counter`` mapping ``(left, right)`` pairs to their number of
        occurrences; empty when ``tokens`` has fewer than two elements.
    """
    pair_counts = Counter()
    # Walk the sequence alongside a copy shifted by one position.
    for left, right in zip(tokens, tokens[1:]):
        pair_counts[(left, right)] += 1
    return pair_counts

# Most frequent pair of adjacent byte values in the sample.
counts = count_pairs(sample_tokens)
counts.most_common(1)

[((115, 32), 6)]

In [56]:
def merge(tokens: list, pair: tuple[int, int], ind: int):
    """Replace every occurrence of an adjacent pair with a single token.

    The scan goes left to right and does not overlap matches: once a pair has
    been replaced, scanning resumes after its second element.

    Args:
        tokens: Input token sequence (left untouched).
        pair: The two consecutive tokens to look for.
        ind: Token written in place of each matched pair.

    Returns:
        A new list with the replacements applied.
    """
    merged = []
    last = len(tokens) - 1

    pos = 0
    while pos <= last:
        # A pair can only start strictly before the last position.
        if pos < last and (tokens[pos], tokens[pos + 1]) == pair:
            merged.append(ind)
            pos += 2
            continue

        merged.append(tokens[pos])
        pos += 1

    return merged


# 1 and 3 are never adjacent in the input, so nothing is merged.
merge([1, 2, 3], (1, 3), 8)

[1, 2, 3]

In [63]:
class BPETokenizer:
    """Byte-level byte-pair-encoding (BPE) tokenizer.

    Token ids 0-255 are the raw byte values. Every learned merge adds one
    token id (256, 257, ...) that stands for the concatenation of two
    existing tokens.
    """

    def __init__(self):
        # Maps a pair of token ids to the id of the token replacing it.
        # Insertion order is the order in which the merges were learned.
        self.merges: dict[tuple[int, int], int] = {}
        # Cached result of `vocab`; None until first built or after training.
        self._vocab_cache = None

    @property
    def vocab_size(self) -> int:
        """Number of token ids: the 256 byte values plus one per merge."""
        return 256 + len(self.merges)

    @property
    def vocab(self) -> dict[int, bytes]:
        """Mapping from token id to the byte string it represents.

        The mapping is cached and rebuilt only when its size no longer
        matches ``vocab_size`` (i.e. merges were added since the last build).
        """
        cached = getattr(self, "_vocab_cache", None)
        if cached is not None and len(cached) == self.vocab_size:
            return cached

        vocab = {i: bytes([i]) for i in range(256)}
        # Merges are stored in learning order, so both halves of a merge
        # are already present in `vocab` when it is expanded.
        for (left, right), new_id in self.merges.items():
            vocab[new_id] = vocab[left] + vocab[right]

        self._vocab_cache = vocab
        return vocab

    @staticmethod
    def get_top_pair(tokens: list[int]):
        """Return the most frequent adjacent pair, or None if there is none.

        Ties are resolved in favour of the pair seen first in ``tokens``.
        """
        if len(tokens) < 2:
            return None

        counts = Counter(zip(tokens, tokens[1:]))

        pair, _ = counts.most_common(1)[0]
        return pair

    @staticmethod
    def merge(tokens: list[int], pair: tuple[int, int], ind: int):
        """Return a copy of ``tokens`` with each occurrence of ``pair``
        replaced by ``ind`` (left to right, non-overlapping)."""
        compressed_tokens = []

        i = 0
        while i < len(tokens):
            if i < (len(tokens) - 1) and (tokens[i], tokens[i + 1]) == pair:
                compressed_tokens.append(ind)
                i += 2
            else:
                compressed_tokens.append(tokens[i])
                i += 1

        return compressed_tokens

    def train(self, text: str, vocab_size: int):
        """Learn merges from ``text`` until ``vocab_size`` tokens exist.

        Training stops early when the token sequence has fewer than two
        tokens left. Calling ``train`` again continues from the merges
        already learned instead of overwriting them.

        Args:
            text: Training corpus.
            vocab_size: Target vocabulary size (256 means "no merges").
        """
        # Apply existing merges first. On a fresh tokenizer this is just the
        # UTF-8 bytes; on a trained one it guarantees that the pairs counted
        # below are not already in `merges`, so every new id is unique.
        tokens = self.encode(text)

        while self.vocab_size < vocab_size:
            pair = self.get_top_pair(tokens)

            if pair is None:
                break

            new_id = self.vocab_size
            tokens = self.merge(tokens, pair, new_id)
            self.merges[pair] = new_id

        # The id -> bytes table must be rebuilt to include the new merges.
        self._vocab_cache = None

    def encode(self, text: str):
        """Encode ``text`` into token ids.

        Starts from the UTF-8 bytes and repeatedly applies, among the pairs
        present, the one that was learned earliest (lowest merge id).
        """
        tokens = list(text.encode("utf-8"))
        merges = self.merges

        while len(tokens) >= 2:
            # Pairs without a merge rank as infinity and therefore lose.
            pair = min(
                zip(tokens, tokens[1:]), key=lambda p: merges.get(p, float("inf"))
            )

            if pair not in merges:
                break

            tokens = self.merge(tokens, pair, merges[pair])

        return tokens

    def decode(self, tokens: list[int]):
        """Decode token ids back into text.

        Byte sequences that are not valid UTF-8 are rendered as U+FFFD.

        Raises:
            KeyError: If a token id is not in the vocabulary.
        """
        # Fetch the table once rather than once per token.
        vocab = self.vocab
        btext = b"".join([vocab[i] for i in tokens])
        return btext.decode("utf-8", errors="replace")


# Learn merges on the lyrics until the vocabulary holds 500 tokens
# (the 256 byte values plus up to 244 merges).
tokenizer = BPETokenizer()
tokenizer.train(texte, 500)

In [64]:
# Round trip: encode a sentence to token ids, then decode it back.
tokens = tokenizer.encode("Bonjour comment ça va ? ")
print(tokens)
print(tokenizer.decode(tokens))


[66, 260, 326, 268, 300, 109, 321, 356, 272, 118, 272, 63, 32]
Bonjour comment ça va ? 


In [61]:
# Token 128 is the single byte 0x80, which is not valid UTF-8 on its own;
# decode() uses errors="replace", so it comes back as U+FFFD.
tokenizer.decode([128])

'�'

In [70]:
import random

# random.choice indexes into its argument, so a set is rejected (TypeError).
random.choice({1, 2})

TypeError: 'set' object is not subscriptable

In [143]:
from typing import Iterable, Hashable
from itertools import chain, islice
from collections import Counter
from random import Random


class MarkovLM:
    def __init__(self, C: int, seed: int | None = None):
        self.C = C
        self.stats = {}
        self.vocab: list = []

        self._seed = seed
        self._random = Random(seed)

    def to_state_seq(self, seq: Iterable[Hashable]):
        offsets = []
        for i in range(self.C):
            offsets.append(chain.from_iterable([[None] * (self.C - i), seq]))

        return zip(zip(*offsets), seq)

    def fit(self, seq: Iterable[Hashable]):
        state_seq = self.to_state_seq(seq)

        self.stats = Counter(state_seq)
        self.vocab = list(set(seq).union(self.vocab))

    def generate(self, seq: list[Hashable], max_tokens: int):
        new_seq = []

        if len(seq) < self.C:
            current_state = tuple([*[None] * (self.C - len(seq)), *seq[-self.C :]])
        else:
            current_state = tuple(seq[-self.C :])

        for _ in range(max_tokens):

            probs = {}
            for (state, t), count in self.stats.items():
                if state == current_state:
                    probs[t] = probs.get(t, 0) + count

            if probs:
                probs_keys = list(probs.keys())
                probs_weights = list(probs.values())

                next_token = self._random.choices(probs_keys, weights=probs_weights)[0]
            else:
                next_token = self._random.choices(self.vocab)[0]

            new_seq.append(next_token)
            current_state = current_state[1:] + (next_token,)

        return new_seq


# Order-3 model fitted on a tiny corpus, then 10 tokens sampled after "a".
lm = MarkovLM(3)
lm.fit("   ab ac ad")

lm.generate("a", 10)


['c', 'c', ' ', 'd', 'c', ' ', 'c', 'b', 'c', 'b']

In [113]:
# random.choices rejects weights that sum to zero (ValueError).
random.choices(["a", "b"], weights=[0, 0])

ValueError: Total of weights must be greater than zero

In [3]:
from collections import Counter, defaultdict

# Looking up a missing key creates and stores an empty Counter for it.
x = defaultdict(Counter)
x["a"]

Counter()

In [28]:
from typing import Hashable
from itertools import chain
from collections import Counter, defaultdict
from random import Random


class MarkovLM2:
    def __init__(self, C: int, vocab: str | list[Hashable], seed: int | None = None):
        self.C = C
        self.stats: dict[tuple, dict] = defaultdict(Counter)
        self.vocab = vocab

        self._seed = seed
        self._random = Random(seed)

    def to_state_seq(self, seq: str | list[Hashable]):
        offsets = []
        for i in range(self.C):
            offsets.append(chain.from_iterable([[None] * (self.C - i), seq]))

        return zip(zip(*offsets), seq)

    def fit(self, seq: str | list[Hashable]):
        for state, next_token in self.to_state_seq(seq):
            self.stats[state][next_token] += 1

    def generate(self, seq: str | list[Hashable], max_tokens: int):
        new_seq = []

        if len(seq) < self.C:
            current_state = tuple([*[None] * (self.C - len(seq)), *seq[-self.C :]])
        else:
            current_state = tuple(seq[-self.C :])

        for _ in range(max_tokens):
            if current_state in self.stats:
                counts = self.stats[current_state]
                next_token = self._random.choices(
                    list(counts.keys()), weights=list(counts.values()), k=1
                )[0]

            else:
                next_token = self._random.choice(self.vocab)

            new_seq.append(next_token)
            current_state = current_state[1:] + (next_token,)

        return new_seq


# Order-10 model over the vocabulary " abcd", sampled from an empty context.
lm = MarkovLM2(10, " abcd")
lm.fit("   ab ac ad")
lm.generate("", 10)


[' ', ' ', ' ', 'a', 'b', ' ', 'a', 'c', ' ', 'a']