## Util Definitions


In [1]:
import random
import torch

def extract_test_and_vald_set(text, test_size = 0.1, validation_size = 0.1):
    """Split ``text`` into disjoint train, test and validation parts.

    A contiguous test span is cut out at a random position first. A contiguous
    validation span is then cut out of what is left, and the remainder becomes
    the training text. Because the validation span is taken from the text
    *after* the test span was removed, it may straddle the position where the
    test span used to be; it never contains test material.

    Args:
        text: Sliceable, concatenable sequence (typically a ``str``).
        test_size: Fraction of ``text`` used for the test split (0..1).
        validation_size: Fraction of ``text`` used for the validation split (0..1).

    Returns:
        Tuple ``(train_text, test_text, validation_text)``.

    Raises:
        ValueError: If a fraction is outside ``[0, 1]`` or both splits together
            are longer than ``text``.
    """
    if not 0 <= test_size <= 1 or not 0 <= validation_size <= 1:
        raise ValueError("test_size and validation_size must be between 0 and 1.")

    total_length = len(text)
    test_length = int(total_length * test_size)
    val_length = int(total_length * validation_size)
    if test_length + val_length > total_length:
        raise ValueError("test_size + validation_size must not exceed 1.")

    # Cut the test span out of the full text.
    test_start_index = random.randint(0, total_length - test_length)
    test_end_index = test_start_index + test_length
    test_text = text[test_start_index:test_end_index]
    remaining_text = text[:test_start_index] + text[test_end_index:]

    # Cut the validation span out of the test-free remainder, so the indices
    # refer to the same sequence they were drawn for.
    val_start_index = random.randint(0, len(remaining_text) - val_length)
    val_end_index = val_start_index + val_length
    validation_text = remaining_text[val_start_index:val_end_index]

    # Whatever is left belongs to neither test nor validation.
    train_text = remaining_text[:val_start_index] + remaining_text[val_end_index:]

    return train_text, test_text, validation_text

def encode_vocab_to_index(vocabulary):
    """Build an encoder that maps tokens to their position in ``vocabulary``.

    Args:
        vocabulary: Iterable of tokens; a token's id is its enumeration index.

    Returns:
        A function taking an iterable of tokens and returning the list of
        their ids. Tokens missing from ``vocabulary`` raise ``KeyError``.
    """
    token_to_id = {}
    for position, token in enumerate(vocabulary):
        token_to_id[token] = position

    def encode(token_list):
        # Plain dictionary lookup per token; unknown tokens are not handled.
        return list(map(token_to_id.__getitem__, token_list))

    return encode

def decode_index_to_vocab(vocabulary):
    """Build a decoder that maps ids back to the tokens of ``vocabulary``.

    Args:
        vocabulary: Iterable of tokens; a token's id is its enumeration index.

    Returns:
        A function taking an iterable of ids and returning the list of
        corresponding tokens. Unknown ids raise ``KeyError``.
    """
    id_to_token = dict(enumerate(vocabulary))

    def decode(index_list):
        tokens = []
        for index in index_list:
            tokens.append(id_to_token[index])
        return tokens

    return decode

def save_vocabulary_or_merges(vocabulary, file_path):
    """Write every entry of ``vocabulary`` on its own line of a UTF-8 file.

    Entries are formatted with ``str()``; a merge pair given as a tuple is
    therefore written in its tuple notation.

    Args:
        vocabulary: Iterable of tokens or merge pairs.
        file_path: Destination path; an existing file is overwritten.
    """
    with open(file_path, 'w', encoding='utf-8') as handle:
        handle.writelines(f"{entry}\n" for entry in vocabulary)

def save_document(text, file_path):
    """Write ``text`` to ``file_path`` as UTF-8, replacing any existing file.

    Args:
        text: String to store.
        file_path: Destination path.
    """
    with open(file_path, mode='w', encoding='utf-8') as destination:
        destination.write(text)

def open_text_file(file_path):
    """Read a UTF-8 text file and return its whole content as one string.

    Args:
        file_path: Path of the file to read.

    Returns:
        The decoded file content.
    """
    with open(file_path, mode='r', encoding='utf-8') as source:
        contents = source.read()
    return contents

### Hyperparameters

In [5]:
# Number of token windows sampled per batch (see get_batch).
batch_size = 4
# Number of consecutive tokens in each sampled window.
block_size = 8

## Data Preprocessing

### BPE

In [2]:
from collections import defaultdict

class BPE():
    """Minimal byte-pair-encoding tokenizer: a merge learner and a segmenter.

    Text is lower-cased and split on whitespace; each word becomes a list of
    characters followed by the end-of-word marker ``'_'``. ``learner`` and
    ``segmenter`` are static methods, so they can be called on the class
    (``BPE.learner(...)``) as well as on an instance.
    """

    def __init__(self):
        pass

    @staticmethod
    def _split_into_symbols(corpus):
        """Return one ``[char, ..., '_']`` symbol list per whitespace-separated word."""
        return [list(word) + ['_'] for word in corpus.lower().split()]

    @staticmethod
    def _apply_merge(words, pair):
        """Return new word lists in which each adjacent ``pair`` is fused into one token."""
        merged_token = ''.join(pair)
        new_words = []
        for word in words:
            new_word = []
            i = 0
            while i < len(word):
                # Scan left to right; a fused pair consumes two symbols.
                if i < len(word) - 1 and (word[i], word[i+1]) == pair:
                    new_word.append(merged_token)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            new_words.append(new_word)
        return new_words

    @staticmethod
    def learner(corpus, merge_count=10):
        """Learn up to ``merge_count`` merge rules from ``corpus``.

        In every iteration the most frequent adjacent symbol pair (ties are
        resolved in favour of the pair seen first) is merged everywhere in the
        corpus. Learning stops early when no adjacent pair is left, e.g. for
        an empty corpus or when every word has collapsed into a single token.

        Args:
            corpus: Training text.
            merge_count: Maximum number of merges to learn.

        Returns:
            Tuple ``(merges, vocabulary)``: ``merges`` is the ordered list of
            merged pairs (2-tuples of strings), ``vocabulary`` the set of
            tokens present in the corpus after all merges.
        """
        words = BPE._split_into_symbols(corpus)

        merges = []

        for m in range(merge_count):
            pair_counts = defaultdict(int)
            for word in words:
                for pair in zip(word, word[1:]):
                    pair_counts[pair] += 1

            if not pair_counts:
                # Nothing left to merge; max() on an empty mapping would raise.
                break

            most_frequent = max(pair_counts, key=pair_counts.get)
            merges.append(most_frequent)

            # Update the corpus with the new merge before counting again.
            words = BPE._apply_merge(words, most_frequent)

            print(f"Iteration {m+1}: merged {most_frequent}")

        vocabulary = {token for word in words for token in word}

        return merges, vocabulary

    @staticmethod
    def segmenter(corpus, merges):
        """Tokenise ``corpus`` by replaying ``merges`` in order.

        Args:
            corpus: Text to tokenise.
            merges: Ordered iterable of symbol pairs as returned by ``learner``.
                An empty iterable yields plain character tokens.

        Returns:
            Flat list of tokens over all words. Stand-alone end-of-word
            markers (``'_'``) are dropped; markers merged into a longer token
            are kept.
        """
        words = BPE._split_into_symbols(corpus)
        for merge in merges:
            words = BPE._apply_merge(words, merge)
        # Flatten once, after all merges have been applied.
        return [token for word in words for token in word if token != '_']

    #text_path = 'data/shakespeare.txt'
    #corpus = open_text_file(text_path)

    #train_corpus, test_corpus = extract_test_set(corpus, test_size=0.1)
    #save_document(train_corpus, 'data/train_corpus.txt')
    #save_document(test_corpus, 'data/test_corpus.txt')

    #train_corpus = open_text_file('data/train_corpus.txt')

    #new_corpus, merges, tokens = learner(train_corpus, merge_count=200)
    #save_vocabulary(tokens, 'data/vocabulary.txt')



### Splitting the corpus into train, test, and validation sets

In [17]:
# Load the three splits from their own files (presumably prepared beforehand).
# Alternative: carve them out of the raw text with
# extract_test_and_vald_set(open_text_file('data/shakespeare.txt')).
train_corpus, test_corpus, validation_corpus = map(open_text_file, (
    'data/Shakespeare_clean_train.txt',
    'data/Shakespeare_clean_test.txt',
    'data/Shakespeare_clean_valid.txt',
))

### Training

In [None]:

# Peek at the first 100 characters of the raw training text.
print("Training Text Sample:", train_corpus[:100])
# Character inventory of the raw text (case-sensitive, whitespace included).
# BPE.learner lower-cases the corpus, so this is not the tokenizer's start alphabet.
chars = sorted(list(set(train_corpus)))
print("Init Vocabulary:", ''.join(chars))
print("Vocabulary size:", len(chars))

# Learn 15 merge rules on the training split only.
# `merges` is the ordered list of merged pairs, `vocabulary` the set of resulting tokens.
merges, vocabulary = BPE.learner(train_corpus, merge_count=15)

# Persist both, one entry per line (merge pairs are written in tuple notation).
save_vocabulary_or_merges(vocabulary, 'data/vocabulary.txt')
save_vocabulary_or_merges(merges, 'data/merges.txt')

# Tokenise all three splits with the merges learned on the training split.
tokenised_train_corpus = BPE.segmenter(train_corpus, merges)
tokenised_validation_corpus = BPE.segmenter(validation_corpus, merges)
tokenised_test_corpus = BPE.segmenter(test_corpus, merges)

print("Tokenised Training Corpus Sample:", tokenised_train_corpus[:100])  # Print first 100 tokens

# Build the token -> id encoder and the id -> token decoder from the same vocabulary.
# NOTE(review): `vocabulary` is a set, so the id assignment may differ between kernel sessions.
encode_vocab = encode_vocab_to_index(vocabulary)
decode_vocab = decode_index_to_vocab(vocabulary)

# Convert each split's token list into a list of ids.
# NOTE(review): a test/validation token that is missing from `vocabulary` raises KeyError here.
train_ids = encode_vocab(tokenised_train_corpus)
test_ids = encode_vocab(tokenised_test_corpus)
validation_ids = encode_vocab(tokenised_validation_corpus)

print("Encoded Training Corpus Sample:", train_ids[:100]) 
# Round trip: decode the training ids back into tokens.
text =  decode_vocab(train_ids)
print("decoded Training Ids:", text[:100])  # Print first 100 decoded tokens




Training Text Sample: The Tragedy of Antony and Cleopatra Dramatis Personae MARK ANTONY OCTAVIUS CAESAR M. AEMILIUS LEPIDU
Init Vocabulary:  !&',-.12689:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz
Vocabulary size: 67
Iteration 1: merged ('e', '_')
Iteration 2: merged ('t', 'h')
Iteration 3: merged (',', '_')
Iteration 4: merged ('t', '_')
Iteration 5: merged ('s', '_')
Iteration 6: merged ('d', '_')
Iteration 7: merged ('a', 'n')
Iteration 8: merged ('e', 'r')
Iteration 9: merged ('o', 'u')
Iteration 10: merged ('i', 'n')
Iteration 11: merged ('o', '_')
Iteration 12: merged ('y', '_')
Iteration 13: merged ('e', 'n')
Iteration 14: merged ('.', '_')
Iteration 15: merged ('o', 'r')
Tokenised Training Corpus Sample: ['th', 'e_', 't', 'r', 'a', 'g', 'e', 'd', 'y_', 'o', 'f', 'an', 't', 'o', 'n', 'y_', 'an', 'd_', 'c', 'l', 'e', 'o', 'p', 'a', 't', 'r', 'a', 'd', 'r', 'a', 'm', 'a', 't', 'i', 's_', 'p', 'er', 's', 'o', 'n', 'a', 'e_', 'm', 'a', 'r', 'k', 'an', 't', 'o', 'n', 'y_',

In [None]:
# Turn the id list of every split into an int64 tensor.
train_data, test_data, validation_data = (
    torch.tensor(ids, dtype=torch.long)
    for ids in (train_ids, test_ids, validation_ids)
)

# Sanity check: size and dtype of the training tensor plus its first 100 ids.
print(train_data.shape, train_data.dtype)
print("Training Corpus Encoded Sample:", train_data[:100])

torch.Size([633285]) torch.int64
Training Corpus Encoded Sample: tensor([ 0, 10,  9, 12,  5, 32, 34, 13, 48, 53, 38,  1,  9, 53,  3, 48,  1, 40,
        36, 41, 34, 53, 30,  5,  9, 12,  5, 13, 12,  5, 22,  5,  9, 26, 23, 30,
        51,  7, 53,  3,  5, 10, 22,  5, 12, 49,  1,  9, 53,  3, 48, 53, 36,  9,
         5, 16, 26,  6, 23, 36,  5, 34,  7,  5, 12, 22, 43,  5, 34, 22, 26, 41,
        26,  6, 23, 41, 34, 30, 26, 13,  6, 23,  9, 12, 26,  6, 22, 16, 26, 12,
         7, 43,  7, 34, 54,  9,  6, 23, 30, 53])


In [21]:
# Illustrate next-token prediction on the first window of the training data.
# NOTE(review): the name `input` shadows Python's builtin input() in this notebook.
input = train_data[:block_size]
# Targets are the same window shifted one position to the right.
expected_output = train_data[1:block_size+1]

for t in range(block_size):
    # Every prefix of the window is a context; the token right after it is the target.
    context = input[:t+1]
    target = expected_output[t] 
    print(f"Context: {context}, Target: {target}")


Context: tensor([0]), Target: 10
Context: tensor([ 0, 10]), Target: 9
Context: tensor([ 0, 10,  9]), Target: 12
Context: tensor([ 0, 10,  9, 12]), Target: 5
Context: tensor([ 0, 10,  9, 12,  5]), Target: 32
Context: tensor([ 0, 10,  9, 12,  5, 32]), Target: 34
Context: tensor([ 0, 10,  9, 12,  5, 32, 34]), Target: 13
Context: tensor([ 0, 10,  9, 12,  5, 32, 34, 13]), Target: 48


tensor([ 0, 10,  9,  ...,  6,  3, 18])

In [None]:
def get_batch(split):
    """Sample a random batch of input/target token windows from one split.

    Reads the module-level tensors ``train_data``, ``validation_data`` and
    ``test_data`` as well as the hyperparameters ``batch_size`` and
    ``block_size``.

    Args:
        split: ``'train'``, ``'val'`` or ``'test'``.

    Returns:
        Tuple ``(inputs, targets)``. Each stacks ``batch_size`` windows of
        ``block_size`` consecutive entries of the chosen split; ``targets``
        holds the same windows shifted one position to the right.

    Raises:
        ValueError: If ``split`` is unknown or the split has no more than
            ``block_size`` entries (no window with a following target fits).
    """
    if split == 'train':
        data = train_data
    elif split == 'val':
        data = validation_data
    elif split == 'test':
        data = test_data
    else:
        raise ValueError("Invalid split. Choose from 'train', 'val', or 'test'.")

    # Exclusive upper bound for window starts: the target window needs one
    # extra entry after the input window.
    start_limit = len(data) - block_size
    if start_limit <= 0:
        raise ValueError(
            f"Split '{split}' has {len(data)} tokens; more than block_size={block_size} are required."
        )

    starts = torch.randint(start_limit, (batch_size,)).tolist()
    inputs = torch.stack([data[i:i+block_size] for i in starts])
    targets = torch.stack([data[i+1:i+block_size+1] for i in starts])

    return inputs, targets

# Draw one random training batch and show both tensors with their shapes.
# The windows are sampled randomly, so the printed values change from run to run.
inputMatrix, outputMatrix = get_batch('train')
print("Input Matrix Shape:", inputMatrix.shape)  
print("Input Matrix:", inputMatrix)
print("Output Matrix Shape:", outputMatrix.shape)
print("Output Matrix:", outputMatrix) 

### Testing