In [2]:
from datasets import load_dataset
from collections import Counter
from tokenizers import ByteLevelBPETokenizer

# Load the 'de-en' configuration of 'wmt14' and keep handles on two of its splits.
dataset = load_dataset('wmt14', 'de-en')
train_dataset, valid_dataset = dataset['train'], dataset['validation']

# Print the first training record as a quick look at the data layout.
print(train_dataset[0])

{'translation': {'de': 'Wiederaufnahme der Sitzungsperiode', 'en': 'Resumption of the session'}}


In [3]:
class Indexer():
    """
    Bidirectional mapping between hashable objects and consecutive integer ids.

    Ids are handed out in insertion order starting at 0, so the id of a newly
    added object equals the number of objects stored before it.
    """

    def __init__(self):
        # Forward (object -> id) and reverse (id -> object) lookup tables.
        self.objs_to_ints, self.ints_to_objs = {}, {}

    def add_and_get_index(self, object, add=True):
        """
        Look up the id of an object, registering the object first when allowed.
        :param object: object to look up or add
        :param add: when truthy (the default) an unseen object is assigned the next free id;
                    when falsy nothing is stored and the call behaves exactly like index_of
        :return: the id of the object, or -1 if it is unknown and add is falsy
        """
        if add:
            forward = self.objs_to_ints
            if object not in forward:
                next_id = len(forward)
                forward[object] = next_id
                self.ints_to_objs[next_id] = object
            return forward[object]
        return self.index_of(object)

    def index_of(self, object):
        """
        :param object: object to look up
        :return: the id stored for the object, or -1 when the object was never added
        """
        return self.objs_to_ints.get(object, -1)

    def get_object(self, index):
        """
        :param index: integer id to look up
        :return: the object registered under that id, or None when the id is unknown
        """
        return self.ints_to_objs.get(index)

    def __repr__(self):
        # Render the stored objects in id order as a list of their string forms.
        labels = [str(self.get_object(position)) for position in range(len(self.objs_to_ints))]
        return str(labels)

    def __len__(self):
        # Number of distinct objects registered so far.
        return len(self.objs_to_ints)

In [21]:
class BPE():
    """
    Small byte-pair-encoding style vocabulary learner plus a greedy tokenizer.

    The vocabulary is stored in the object passed as `vocab`, which must provide
    add_and_get_index(obj), index_of(obj) (returning -1 for unknown objects),
    get_object(index) and __len__ -- the interface of the Indexer class defined
    earlier in this notebook.
    """

    def __init__(self, vocab):
        self.vocab = vocab

    def get_pairs(self, word):
        """
        Get the distinct adjacent symbol pairs in a word.
        params word: indexable sequence of symbols (a string, or a list of token ids)
        return: set of (left, right) tuples for every pair of neighbouring symbols;
                an empty set when the word has fewer than two symbols.
        """
        # zip stops at the shorter argument, so empty and single-symbol words
        # simply produce no pairs instead of failing on word[0].
        return set(zip(word, word[1:]))

    def count_pairs(self, D):
        """
        Count adjacent symbol pairs (bigrams) over a tokenised corpus.
        params D: list of words, each word a sequence of token ids
        return: Counter mapping each pair to the number of words containing it.
                A pair that repeats inside one word is counted once for that word,
                because get_pairs returns a set.
        """
        pairs = Counter()
        for word in D:
            pairs.update(self.get_pairs(word))
        return pairs

    def merge_vocab(self, pair, D, new_index):
        """
        Replace every occurrence of a bigram with a single new token id.
        params pair: (left, right) bigram to merge
        params D: list of words, each word a sequence of token ids
        params new_index: vocabulary index that replaces the bigram
        return: new list of words with the bigram replaced; D itself is not modified.
                Matches are consumed left to right and do not overlap.
        """
        v_out = []
        for word in D:
            merged = []
            last = len(word) - 1
            i = 0
            while i <= last:
                if i < last and word[i] == pair[0] and word[i + 1] == pair[1]:
                    merged.append(new_index)
                    i += 2
                else:
                    merged.append(word[i])
                    i += 1
            v_out.append(merged)
        return v_out

    def build_vocab(self, corpus, length):
        """
        Grow self.vocab by repeatedly merging the most common bigram.
        params corpus: list of sentences (strings)
        params length: desired vocabulary size
        return: None. self.vocab is extended in place. Merging stops once the
                vocabulary holds `length` entries, or earlier when no bigram
                occurs in more than two words.
        """
        # Build initial vocabulary
        print('Building initial vocab...')
        alphabetic_chars = [chr(i) for i in range(ord('a'), ord('z')+1)] + [chr(i) for i in range(ord('A'), ord('Z')+1)]
        digits = [str(i) for i in range(10)]
        punctuation = ['.', ',', '!', '?', "'", '-', ':', ';', '(', ')', '"', '/', '\\', '@', '#', '$', '%', '^', '&', '*', '_', '+', '=', '~', '`']

        # Fixed symbols first (their indices do not depend on the corpus), then
        # every other character in order of first appearance in the corpus.
        for char in alphabetic_chars + digits + punctuation:
            self.vocab.add_and_get_index(char)
        for sentence in corpus:
            for char in sentence:
                self.vocab.add_and_get_index(char)

        # build initial tokenised D
        print("Length of vocab: ", len(self.vocab))
        print('Building initial D tokens...')
        D = []
        for sentence in corpus:
            words = []
            for char in sentence:
                # A non-alphabetic character (space, digit, punctuation) closes the
                # current word and becomes the first symbol of the next one.
                if not char.isalpha() and words:
                    D.append(words)
                    words = []
                words.append(self.vocab.index_of(char))
            # Skip the trailing flush for empty sentences so D never holds empty words.
            if words:
                D.append(words)

        print('Adding new vocab...')
        while len(self.vocab) < length:
            top = self.count_pairs(D).most_common(1)
            # Stop when there are no bigrams left at all, or the best one is too rare.
            if not top or top[0][1] <= 2:
                print('Not enough bigrams to add new vocab')
                break
            most_common_pair = top[0][0]
            pair = ''.join([self.vocab.get_object(most_common_pair[0]), self.vocab.get_object(most_common_pair[1])])
            self.vocab.add_and_get_index(pair)
            D = self.merge_vocab(most_common_pair, D, self.vocab.index_of(pair))
        print(f"Completed building vocab... Vocab length: {len(self.vocab)}")

    def tokenize_sentence(self, sentence):
        """
        Split a sentence into tokens by greedy longest match against self.vocab.
        params sentence: string to tokenise
        return: list of substrings that concatenate back to the sentence. At each
                position the longest substring known to the vocabulary is taken;
                a character with no match is emitted as a single-character token.
        """
        tokenized_sentence = []
        i = 0
        while i < len(sentence):
            # Try candidate substrings starting at i, longest first.
            for j in range(len(sentence), i, -1):
                token = sentence[i:j]
                if self.vocab.index_of(token) != -1:
                    tokenized_sentence.append(token)
                    i = j
                    break
            else:
                # Nothing starting at i is in the vocabulary: pass the character through.
                tokenized_sentence.append(sentence[i])
                i += 1
        return tokenized_sentence

    def __repr__(self) -> str:
        return f"{self.vocab} \n Length of vocab: {len(self.vocab)}"

In [5]:
# Training text: the 'de' side of the first 5000 training pairs.
train_corpus = [pair['de'] for pair in train_dataset[:5000]['translation']]

# Learn merges on top of a fresh Indexer, up to the requested vocabulary size.
desired_vocab_size = 1000
vocab = Indexer()
bpe = BPE(vocab)
bpe.build_vocab(train_corpus, desired_vocab_size)
print(bpe)

Building initial vocab...
Length of vocab:  107
Building initial D tokens...


Adding new vocab...
Completed building vocab... Vocab length: 1000
['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '.', ',', '!', '?', "'", '-', ':', ';', '(', ')', '"', '/', '\\', '@', '#', '$', '%', '^', '&', '*', '_', '+', '=', '~', '`', ' ', 'ä', 'ü', 'ß', 'ö', 'í', 'á', 'Ä', 'ó', 'Ö', 'Ü', 'º', 'é', '\xad', 'ø', 'ã', 'ô', 'è', 'ç', 'ê', 'en', 'er', ' d', 'ch', 'ei', 'ie', 'un', 'ich', 'st', ' die', 'in', ' w', ' a', 'on', 'ge', 'sch', 'ein', 'ung', ' der', ' un', ' s', 'es', 'an', 'or', ' da', ' v', ' und', 'ti', 'zu', ' ein', 'll', 'ten', ' m', ' b', 'te', 'gen', ' n', 'den', 'ir', 'ra', ' zu', 'it', ' i', ' au', 'ss', 'em', ' S', ' A', ' B', 'eit', 'ar', 'ur', ' in', 'mm', 'lich', 'hr', ' h', ' E', ' f', ' K', 'der', 'al

In [9]:
# Train the reference byte-level BPE tokenizer on the same corpus. The target size is
# taken from desired_vocab_size (set in the custom-BPE training cell) rather than a
# repeated literal, so the two tokenizers stay comparable if that value is changed.
tokenizer = ByteLevelBPETokenizer()
tokenizer.train_from_iterator(train_corpus, vocab_size=desired_vocab_size, min_frequency=1)

In [10]:
# Evaluation text: the 'de' side of the first 1000 validation pairs.
val_corpus = [pair['de'] for pair in valid_dataset[:1000]['translation']]

# Print both tokenizations of every sentence one after the other for comparison.
for i, text in enumerate(val_corpus):
    print(f"Text {i + 1}:")
    encoded = bpe.tokenize_sentence(text)
    print("Custom BPE:", encoded)
    encoded = tokenizer.encode(text)
    print("Tokenizer BPE:", encoded.tokens)
    print()

Text 1:
Custom BPE: ['Ein', 'e', ' re', 'p', 'u', 'bl', 'ik', 'an', 'ische', ' Stra', 'teg', 'ie', ',', ' um', ' der', ' W', 'ie', 'der', 'w', 'ahl', ' von', ' O', 'b', 'am', 'a', ' ent', 'ge', 'gen', 'zu', 't', 'reten']
Tokenizer BPE: ['Ein', 'e', 'Ġre', 'p', 'u', 'bl', 'ik', 'an', 'ische', 'ĠStra', 'te', 'g', 'ie', ',', 'Ġum', 'Ġder', 'ĠW', 'ie', 'der', 'w', 'ahl', 'Ġvon', 'ĠO', 'b', 'am', 'a', 'Ġent', 'ge', 'gen', 'zu', 't', 'reten']

Text 2:
Custom BPE: ['Die', ' F', 'ühr', 'ungs', 'k', 'rä', 'ft', 'e', ' der', ' Re', 'p', 'u', 'bl', 'ik', 'an', 'er', ' re', 'cht', 'fe', 'r', 'tigen', ' ihre', ' P', 'olitik', ' mit', ' der', ' N', 'otwend', 'igkeit', ',', ' den', ' W', 'ahl', 'be', 't', 'ru', 'g', ' zu', ' be', 'k', 'ä', 'mpf', 'en', '.']
Tokenizer BPE: ['Die', 'ĠF', 'Ã¼hr', 'ungs', 'k', 'rÃ¤', 'f', 'te', 'Ġder', 'ĠRe', 'p', 'u', 'bl', 'ik', 'an', 'er', 'Ġre', 'cht', 'f', 'er', 'tigen', 'Ġihre', 'ĠP', 'olitik', 'Ġmit', 'Ġder', 'ĠN', 'o', 'tw', 'end', 'igkeit', ',', 'Ġden', 'ĠW', 'a

Tokenizer BPE: ['W', 'enn', 'Ġwir', 'Ġall', 'er', 'ding', 's', 'Ġbei', 'Ġeinem', 'ĠH', 'ei', 'm', 'sp', 'iel', 'Ġgegen', 'ĠT', 'ro', 'y', 'es', 'Ġ', '4', '-', '0', 'Ġgew', 'innen', ',', 'Ġund', 'Ġman', 'Ġi', 'mm', 'er', 'Ġnoch', 'Ġ', 'et', 'w', 'as', 'Ġf', 'ind', 'et', ',', 'Ġwas', 'Ġman', 'Ġuns', 'Ġvor', 'wer', 'fen', 'Ġkann', ',', 'Ġist', 'Ġdas', 'Ġmit', 'ĠSicherheit', 'Ġ', 'et', 'w', 'as', 'Ġf', 'ru', 'st', 'r', 'ier', 'end', '.']

Text 307:
Custom BPE: ['M', 'an', ' f', 'rag', 't', ' sich', ',', ' was', ' die', ' Le', 'ut', 'e', ' noch', ' er', 'w', 'ar', 'ten', '.']
Tokenizer BPE: ['M', 'an', 'Ġf', 'ra', 'gt', 'Ġsich', ',', 'Ġwas', 'Ġdie', 'ĠL', 'eu', 'te', 'Ġnoch', 'Ġer', 'w', 'ar', 'ten', '.']

Text 308:
Custom BPE: ['Wir', ' kön', 'nt', 'en', ' n', 'ie', 'mal', 's', ' jed', 'es', ' W', 'och', 'en', 'end', 'e', ' mit', ' ', '4', '-', '0', ' gew', 'innen', '.']
Tokenizer BPE: ['Wir', 'ĠkÃ¶n', 'n', 'ten', 'Ġn', 'ie', 'mal', 's', 'Ġjed', 'es', 'ĠW', 'och', 'en', 'en', 'de', 'Ġmit',

In [20]:
# Spot-check the first validation sentence with both tokenizers.
sample = val_corpus[0]
encoded = bpe.tokenize_sentence(sample)
print("Custom BPE:", encoded)
encoded = tokenizer.encode(sample)
print("Tokenizer BPE:", encoded.tokens)
print()

Custom BPE: ['Ein', 'e', ' re', 'p', 'u', 'bl', 'ik', 'an', 'ische', ' Stra', 'teg', 'ie', ',', ' um', ' der', ' W', 'ie', 'der', 'w', 'ahl', ' von', ' O', 'b', 'am', 'a', ' ent', 'ge', 'gen', 'zu', 't', 'reten']
Tokenizer BPE: ['Ein', 'e', 'Ġre', 'p', 'u', 'bl', 'ik', 'an', 'ische', 'ĠStra', 'te', 'g', 'ie', ',', 'Ġum', 'Ġder', 'ĠW', 'ie', 'der', 'w', 'ahl', 'Ġvon', 'ĠO', 'b', 'am', 'a', 'Ġent', 'ge', 'gen', 'zu', 't', 'reten']

