In [None]:
import os 
from pathlib import Path
import re
from typing import List,Optional,Dict,Callable,Never

In [None]:
with open(Path(os.path.join("assets","songs.txt")),encoding="utf-8") as f:
    raw_song = f.read()

with open(Path(os.path.join("assets","texts.txt")),encoding="utf-8") as f:
    raw_text = f.read()

In [None]:
def vocab_prepare(text,*,extended_tokens:List[str]=["<|endoftext|>", "<|unk|>"],verbose:bool=True)->Dict[str,int]:
    preprocessed = re.split(r'([,.:;?_!"()\']|--|\s)', text)
    preprocessed = [item.strip() for item in preprocessed if item.strip()]
    all_words = sorted(set(preprocessed))
    all_words.extend(extended_tokens)
    vocab_size = len(all_words)
    vocab = {token:integer for integer,token in enumerate(all_words)}
    if verbose:
        print(f"total vocab size is, {vocab_size}")
    return vocab

In [None]:
class SimpleTokenizerV1:
    def __init__(self,vocab:Dict[str,int]) -> None:
        self.str_to_int:Dict = vocab
        self.int_to_str:Dict = {i:s for s,i in vocab.items()}
        
    def encode(self,text:str)->List[int]:
        preprocessed = re.split(pattern=r'''([,.:;?_!"()\']|--|\s)''', string=text) 
        preprocessed = [item.strip() for item in preprocessed if item.strip()]

        preprocessed = [ item  if item in self.str_to_int else "<|unk|>" for item in preprocessed ]

        ids = [self.str_to_int[s] for s in preprocessed]
        return ids
    
    def decode(self,ids:List[int])->str:
        text = " ".join([self.int_to_str[i] for i in ids])
        text = re.sub(r'\s+([,.?!"()\'])', r'\1',text)
        return text

In [None]:
spl_tokens = ["<|endoftext|>", "<|unk|>"]
eng_vocab = vocab_prepare(text=raw_text,extended_tokens=spl_tokens)

In [None]:
tokenizerv1 = SimpleTokenizerV1(vocab=eng_vocab)

In [None]:
tokenizerv1.decode(tokenizerv1.encode("He laughed again, and loved her"))

Some of these special tokens are

- `[BOS]` (beginning of sequence) marks the beginning of text
- `[EOS]` (end of sequence) marks where the text ends (this is usually used to concatenate multiple unrelated texts, e.g., two different Wikipedia articles or two different books, and so on)
- `[PAD]` (padding) if we train LLMs with a batch size greater than 1 (we may include multiple texts with different lengths; with the padding token we pad the shorter texts to the longest length so that all texts have an equal length)
- `[UNK]` to represent words that are not included in the vocabulary

In [None]:
import tiktoken

In [None]:
tokenizerv2 = tiktoken.get_encoding("p50k_base")
print(f"{tokenizerv2._pat_str=}")
print(f"{tokenizerv2._mergeable_ranks=}")
print(f"{tokenizerv2._special_tokens=}")
print(f"{tokenizerv2.special_tokens_set=}")
print(f"{tokenizerv2.n_vocab=}")

In [None]:
tokenizerv2.encode(text="I loved her as Revathi",allowed_special={"<endoftext>"})

In [None]:
tokenizerv2.decode([40, 6151, 607, 355, 5416, 44202])

# Embedding

In [1]:
import torch 
from torch.utils.data.dataset import Dataset
from torch.utils.data.dataloader import DataLoader,default_collate
from tqdm  import tqdm 
import tiktoken
import os 
from pathlib import Path

In [2]:
class GPTDataset(Dataset):
    def __init__(self,txt:str, tokenizer:tiktoken.Encoding, max_length:int,stride:int) -> None:
        self.input_ids = []
        self.target_ids = []

        token_ids = tokenizer.encode(text=txt,allowed_special={"<|endoftext|>"})
        assert len(token_ids) > max_length, "Number of tokenized inputs must be equal to max_lenght+1"

        iterable = range(0,len(token_ids)-max_length,stride)
        for i in tqdm(iterable):
            inp_chunk = token_ids[i:i+max_length]
            oup_chunk = token_ids[i+1:i+max_length+1]
            self.input_ids.append(torch.tensor(inp_chunk))
            self.target_ids.append(torch.tensor(oup_chunk))

    def __len__(self)->int:
        return len(self.input_ids)
    
    def __getitem__(self, index) -> tuple:
        return self.input_ids[index],self.target_ids[index]
    

# An example using stride equal to the context length

In [3]:
with open(Path(os.path.join("assets","texts.txt")),encoding="utf-8") as f:
    raw_text = f.read()

tokenizerv2 = tiktoken.get_encoding('gpt2')
ds = GPTDataset(raw_text,tokenizerv2,512,1)

100%|██████████| 4637/4637 [00:00<00:00, 11114.65it/s]


In [4]:
dl = DataLoader(ds,batch_size=32,shuffle=True,drop_last=False)

In [5]:
single_batch = next(iter(dl))

In [None]:
inpb,outb = single_batch
print(inpb.shape)
print()

torch.Size([32, 512])
<built-in method masked_fill of Tensor object at 0x73a65cc24b90>


In [16]:
inpb

tensor([[  290, 13064,    13,  ...,   673,   531,  2407],
        [  198,     1,  5812,  ...,  2627,   262,  4286],
        [10597,  1115,   812,  ...,  3619,   338, 19992],
        ...,
        [  618,   520,  5493,  ...,   314,   508,  2067],
        [ 5223,   438,     1,  ...,   470,   345,  1683],
        [  475,   314,   836,  ...,   257,   410,  5040]])