## Load data and split it into train and validation.

In [1]:
import json
import os
import glob
import tiktoken
import numpy as np

def extract_poems(folder_path):
    """Collect every verse line from the ``*.json`` files in ``folder_path``.

    Each file must contain an iterable of poem objects; every poem has a
    ``'body'`` entry holding stanzas, every stanza is an iterable of line
    records, and each record stores the verse under its ``'text'`` key.

    Args:
        folder_path: Directory that is searched (non-recursively) for
            ``*.json`` files.

    Returns:
        A flat list with the ``'text'`` of every line, in file order, then
        poem, stanza and line order. Empty if no file matches.
    """
    poem_lines = []
    # glob returns paths in arbitrary order; sorting keeps the corpus order (and
    # therefore any positional train/validation split) identical between runs.
    for file_path in sorted(glob.glob(os.path.join(folder_path, '*.json'))):
        # UTF-8 is a superset of ASCII, so ASCII-escaped files still load, while
        # files containing raw accented characters no longer fail to decode.
        with open(file_path, 'r', encoding='utf-8') as file:
            poems = json.load(file)
        for poem in poems:
            for stanza in poem['body']:
                for line in stanza:
                    poem_lines.append(line['text'])
    return poem_lines

# Folder holding the corpus JSON files.
folder_path = r'./corpusCzechVerse/ccv'  # Replace with your folder path
data = extract_poems(folder_path)
n = len(data)

# The first 90% of the lines form the training split, the remainder validation.
split_at = int(n*0.9)
text = '\n'.join(data)
train_text = '\n'.join(data[:split_at])
val_text = '\n'.join(data[split_at:])

In [75]:
# Peek at the first 100 characters of the joined corpus.
print("Sample:\n", text[:100])

Sample:
 Rozpřádá večer sítě své 
a teď je noří ve kouzlo;
můj bože! a to srdce mé
zas na ten hrad mi uklouzl


## Character encoding

### Task: Get unique characters from text.

In [5]:
# Iterating a string yields its characters, so the set holds each distinct
# character once; sorting gives a stable order to number them by.
keys = sorted(set(text))

### Task: Create mappings from characters to integers.

In [93]:
import pickle

# Character -> id and id -> character tables; ids follow the order of `keys`.
stoi = {ch: i for i, ch in enumerate(keys)}
itos = {i: ch for ch, i in stoi.items()}
token_map = {'itos':itos, 'stoi':stoi, 'vocab_size':len(stoi)}

# Write through a context manager so the handle is flushed and closed even if
# pickling fails, instead of leaving an anonymous open file behind.
with open("meta.pkl", "wb") as meta_file:
    pickle.dump(token_map, meta_file)

### Create encode/decode

In [94]:
def encode(data: str) -> np.ndarray:
    """Map every character of ``data`` to its integer id.

    Ids are looked up in the module-level ``stoi`` table. Note that the
    byte-pair section further down defines another ``encode`` with a different
    signature, which replaces this one once that cell has run.

    Args:
        data: Text to encode; each character must be a key of ``stoi``.

    Returns:
        A 1-D ``np.uint16`` array with one id per character (empty for ``""``).

    Raises:
        KeyError: If ``data`` contains a character missing from ``stoi``.
    """
    # Fill the array straight from a generator: no intermediate Python list of
    # the whole corpus is built, and `count` lets numpy allocate once.
    return np.fromiter((stoi[c] for c in data), dtype=np.uint16, count=len(data))

def decode(data: np.ndarray) -> str:
    """Turn a sequence of integer ids back into text.

    Ids are looked up in the module-level ``itos`` table. Note that the
    byte-pair section further down defines another ``decode`` with a different
    signature, which replaces this one once that cell has run.

    Args:
        data: Iterable of ids (typically the array returned by ``encode``).

    Returns:
        The concatenation of the characters the ids stand for.

    Raises:
        KeyError: If an id is not a key of ``itos``.
    """
    # A generator is enough for str.join; no temporary list is needed.
    return ''.join(itos[i] for i in data)

In [89]:
# Round-trip check: encode the first 100 training characters, then decode them.
e = encode(train_text[:100])
print(e)
d = decode(e)
print(d)

[ 44  71  82  72 140 101  60 101   1  78  61 126  61  74   1  75 110  76
 131   1  75  78 107   1   0  57   1  76  61 128   1  66  61   1  70  71
 140 110   1  78  61   1  67  71  77  82  68  71  24   0  69 148  66   1
  58  71 151  61   2   1  57   1  76  71   1  75  74  60  59  61   1  69
 107   0  82  57  75   1  70  57   1  76  61  70   1  64  74  57  60   1
  69  65   1  77  67  68  71  77  82  68]
Rozpřádá večer sítě své 
a teď je noří ve kouzlo;
můj bože! a to srdce mé
zas na ten hrad mi uklouzl


In [90]:
# Encode both splits with the character-level `encode`.
# The lookup runs once per character in Python, so expect this cell to take a
# while on a large corpus.
train_ids = encode(train_text)
val_ids = encode(val_text)

Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x7fbf5c418220>>
Traceback (most recent call last):
  File "/home/michal-hlousek/.local/lib/python3.8/site-packages/ipykernel/ipkernel.py", line 770, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(
KeyboardInterrupt: 


KeyboardInterrupt: 

In [None]:
# Dump the encoded splits to disk as flat binary files.
# NOTE(review): `tofile` stores raw values without dtype/shape information, so
# whatever reads these files must already know the dtype (np.uint16 when the
# ids come from `encode` above) — confirm the consumer agrees.
train_ids.tofile('train.bin')
val_ids.tofile('val.bin')

## Byte pair encoding

In [2]:
import re
from collections import Counter, defaultdict


def build_vocab(corpus: str, strip_chars: str = ".,;") -> Counter:
    """Step 1. Build vocab from text corpus

    The corpus is split on whitespace. Each word has the characters in
    ``strip_chars`` removed, is spelled out as space-separated characters and
    gets the end-of-word marker ``</w>`` appended, e.g. ``"ab,"`` becomes
    ``"a b </w>"``.

    Args:
        corpus: Raw text to build the vocabulary from.
        strip_chars: Characters deleted from every word before it is counted.
            The default blacklist keeps tokens such as ``m.`` from being
            learned.

    Returns:
        A ``Counter`` mapping each spelled-out word to its frequency, in order
        of first appearance.
    """
    drop_table = str.maketrans("", "", strip_chars)

    vocab = Counter()
    for word in corpus.split():
        cleaned = word.translate(drop_table)
        if not cleaned:
            # A word made only of blacklisted characters would otherwise be
            # counted as a bare " </w>" entry that contains no symbols at all.
            continue
        vocab[" ".join(cleaned) + " </w>"] += 1

    return vocab


def get_stats(vocab: dict) -> dict:
    """Step 2. Get counts of pairs of consecutive symbols

    Every key of ``vocab`` is a space-separated symbol sequence and its value
    is how often that sequence occurs. Each adjacent symbol pair contributes
    the sequence's frequency to the pair's total.

    Returns:
        A ``defaultdict(int)`` keyed by ``(left, right)`` symbol tuples.
    """
    pair_counts = defaultdict(int)

    for entry, count in vocab.items():
        parts = entry.split()
        # Pairing the sequence with itself shifted by one yields all neighbours.
        for left, right in zip(parts, parts[1:]):
            pair_counts[left, right] += count

    return pair_counts


def merge_vocab(pair: tuple, v_in: dict) -> dict:
    """Step 3. Merge all occurrences of the most frequent pair

    Args:
        pair: The two symbols to fuse, e.g. ``('c', 'h')``.
        v_in: Vocabulary mapping space-separated symbol sequences to counts.

    Returns:
        A new dict in which every standalone occurrence of ``"left right"`` in
        a key has been replaced by ``"leftright"``. Counts are carried over;
        keys that become identical after merging have their counts added.
    """
    merged = ''.join(pair)
    # The lookarounds make sure the pair is matched only as two whole symbols,
    # never as the tail or head of longer neighbouring symbols.
    pattern = re.compile(r'(?<!\S)' + re.escape(' '.join(pair)) + r'(?!\S)')

    v_out = {}
    for word, frequency in v_in.items():
        # A callable replacement is inserted verbatim. A plain string would be
        # parsed as a regex template, so a backslash inside a symbol would be
        # turned into an escape (or raise re.error) instead of being kept.
        w_out = pattern.sub(lambda _match: merged, word)
        v_out[w_out] = v_out.get(w_out, 0) + frequency

    return v_out

In [3]:
num_merges = 500  # Hyperparameter
vocab = build_vocab(text)

# Repeatedly fuse the most frequent adjacent symbol pair, at most `num_merges` times.
for i in range(num_merges):
    # Step 2: recount the pairs on the current segmentation
    pairs = get_stats(vocab)
    if len(pairs) == 0:
        # Every word is a single symbol already; nothing is left to merge.
        break

    # Step 3: merge the winner everywhere
    best = max(pairs, key=pairs.get)
    print(f"Iteration {i} out of {num_merges}. Pair = {best}, Frequency = {pairs.get(best)}")
    vocab = merge_vocab(best, vocab)

Iteration 0 out of 500. Pair = ('e', '</w>'), Frequency = 1375001
Iteration 1 out of 500. Pair = ('a', '</w>'), Frequency = 1125683
Iteration 2 out of 500. Pair = ('u', '</w>'), Frequency = 815594
Iteration 3 out of 500. Pair = ('í', '</w>'), Frequency = 771726
Iteration 4 out of 500. Pair = ('m', '</w>'), Frequency = 746950
Iteration 5 out of 500. Pair = ('i', '</w>'), Frequency = 744478
Iteration 6 out of 500. Pair = ('o', '</w>'), Frequency = 728064
Iteration 7 out of 500. Pair = ('c', 'h'), Frequency = 684931
Iteration 8 out of 500. Pair = ('s', 't'), Frequency = 626058
Iteration 9 out of 500. Pair = ('y', '</w>'), Frequency = 624775
Iteration 10 out of 500. Pair = ('k', '</w>'), Frequency = 450457
Iteration 11 out of 500. Pair = ('l', '</w>'), Frequency = 428481
Iteration 12 out of 500. Pair = ('e', 'n'), Frequency = 425558
Iteration 13 out of 500. Pair = ('v', '</w>'), Frequency = 421120
Iteration 14 out of 500. Pair = ('é', '</w>'), Frequency = 372391
Iteration 15 out of 500. Pa

In [6]:
def extract_unique_tokens(dictionary):
    """Return the set of all tokens present in a merged vocabulary.

    The result starts from the module-level ``keys`` (the individual
    characters) and adds every space-separated symbol found in the keys of
    ``dictionary`` after the ``</w>`` marker text has been removed from them.
    """
    collected = set(keys)
    for entry in dictionary:
        # Drop the end-of-word marker, then split into the merged symbols.
        collected.update(entry.replace("</w>", "").split())
    return collected

d = extract_unique_tokens(vocab)

# Later cells use `mapping` and `inverted_mapping`, but the only place they are
# built is commented-out example code, so a fresh kernel hits a NameError.
# Define them here. Sorting the token set makes the ids deterministic; plain
# set iteration order for strings can differ between interpreter runs.
mapping = {token: i for i, token in enumerate(sorted(d))}
inverted_mapping = {i: token for token, i in mapping.items()}

In [7]:
def encode(mapping, text):
    """Tokenize ``text`` greedily with the token -> id table ``mapping``.

    Starting at the current position, the candidate token is extended one
    character at a time for as long as the extended string is still a key of
    ``mapping``; the last string that was a key is emitted. A longer token is
    therefore only reachable if all of its prefixes are keys as well.

    Args:
        mapping: Dict from token string to integer id.
        text: Text to encode.

    Returns:
        A list of ids (empty for empty ``text``).

    Raises:
        KeyError: If the character at the current position is not a key of
            ``mapping``, i.e. no token can be emitted there.
    """
    encoded_text = []
    pos = 0
    total = len(text)
    # Walk an index through the text instead of re-slicing the remainder after
    # every token, which would copy the rest of the text each time.
    while pos < total:
        end = pos
        while end < total and text[pos:end + 1] in mapping:
            end += 1
        if end == pos:
            raise KeyError(
                f"no token in mapping matches text at position {pos}: {text[pos]!r}"
            )
        encoded_text.append(mapping[text[pos:end]])
        pos = end
    return encoded_text

def decode(inverted_mapping, encoded_text):
    """Look up the token for every id in ``encoded_text``.

    Returns the tokens as a list in input order; they are not joined into a
    single string.
    """
    tokens = []
    for token_id in encoded_text:
        tokens.append(inverted_mapping[token_id])
    return tokens

# Example usage
#mapping = {token: i for i, token in enumerate(extract_unique_tokens(vocab))}
#inverted_mapping = {v: k for k, v in mapping.items()}
#text = train_text
#encoded_text = encode(mapping, text)
#print(decode(inverted_mapping, encoded_text))

In [None]:
# The byte-pair `encode` returns a plain Python list, which has no `.tofile`.
# Convert to a uint16 array first, the same dtype the character-level section
# writes. Guard the dtype so ids can never be truncated silently.
assert len(mapping) <= np.iinfo(np.uint16).max + 1, "vocabulary too large for uint16 ids"

train_ids = np.array(encode(mapping, train_text), dtype=np.uint16)
train_ids.tofile('train.bin')
val_ids = np.array(encode(mapping, val_text), dtype=np.uint16)
val_ids.tofile('val.bin')

In [None]:
import pickle

# `mapping` is what `encode` indexes with token strings (string -> id, i.e.
# stoi) and `inverted_mapping` is what `decode` indexes with ids (id -> string,
# i.e. itos). The two entries were stored under each other's name.
token_map = {'itos':inverted_mapping, 'stoi':mapping, 'vocab_size':len(mapping)}

# Write through a context manager so the file is flushed and closed reliably.
with open("meta.pkl", "wb") as meta_file:
    pickle.dump(token_map, meta_file)