# Week 2: Turning Words into Tokens

In this notebook, we will explore the process of converting text into tokens, a fundamental step in NLP tasks.

## 0. Setup

We will begin by importing the necessary libraries.

In [100]:
# Import necessary libraries
import re
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from typing import List, Dict, Tuple, Union, Optional

In [2]:
# Import the helper that checks whether TODO markers have been removed
from helpers.check_todo import check_implementation

In [3]:
# Create the src directory if it doesn't exist
import os
os.makedirs('src', exist_ok=True)

## 1. Running Simple Tokenization

This section implements a basic tokenizer with Python's built-in `re` module. The function splits text on whitespace and a fixed set of punctuation marks, keeping each punctuation mark as its own token.

In [4]:
sample_text = "Hello, how are you doing today?"

In [5]:
code_text = """
def calculate_llm_perplexity(model, text, max_length=1024):
    tokens = tokenizer.encode(text, max_length=max_length, truncation=True)
    input_ids = torch.tensor([tokens]).to(device)
    with torch.no_grad():
        outputs = model(input_ids, labels=input_ids)
    loss = outputs.loss
    return math.exp(loss.item())

# Example usage
perplexity = calculate_llm_perplexity(gpt2_model, "Hello, world!")
print(f"Perplexity: {perplexity:.2f}")
"""

In [23]:
import re

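def tokenize(text: str) -> List[str]:
    # Split on punctuation, '--', and whitespace; the capture group keeps the delimiters
    preprocessed = re.split(r'([,.:;?_!"()\']|--|\s)', text.strip())
    # Drop empty strings and whitespace-only pieces
    return [item.strip() for item in preprocessed if item.strip()]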



Let's check whether you have removed the `TODO` marker from the first function we implement together. Once you have finished the implementation and removed `TODO`, no error message will appear.

In [9]:
try:
    check_implementation(tokenize)
except NotImplementedError as e:
    print(e)

'tokenize' has been implemented. No TODO found.


In [24]:
print("Tokenized text:", tokenize(sample_text))

Tokenized text: ['Hello', ',', 'how', 'are', 'you', 'doing', 'today', '?']
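
The pattern passed to `re.split` is wrapped in a capture group, which is why punctuation survives as separate tokens instead of being discarded. A minimal sketch of the difference, using a shortened pattern chosen only for illustration:

```python
import re

text = "Hello, world!"

# Without a capture group, re.split drops the matched delimiters.
without_group = [t for t in re.split(r'[,!]|\s', text) if t]

# With a capture group, re.split also returns the matched delimiters.
with_group = [t.strip() for t in re.split(r'([,!]|\s)', text) if t.strip()]

print(without_group)  # ['Hello', 'world']
print(with_group)     # ['Hello', ',', 'world', '!']
```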


In [25]:
print("Tokenized code:", tokenize(code_text))

Tokenized code: ['def', 'calculate', '_', 'llm', '_', 'perplexity', '(', 'model', ',', 'text', ',', 'max', '_', 'length=1024', ')', ':', 'tokens', '=', 'tokenizer', '.', 'encode', '(', 'text', ',', 'max', '_', 'length=max', '_', 'length', ',', 'truncation=True', ')', 'input', '_', 'ids', '=', 'torch', '.', 'tensor', '(', '[tokens]', ')', '.', 'to', '(', 'device', ')', 'with', 'torch', '.', 'no', '_', 'grad', '(', ')', ':', 'outputs', '=', 'model', '(', 'input', '_', 'ids', ',', 'labels=input', '_', 'ids', ')', 'loss', '=', 'outputs', '.', 'loss', 'return', 'math', '.', 'exp', '(', 'loss', '.', 'item', '(', ')', ')', '#', 'Example', 'usage', 'perplexity', '=', 'calculate', '_', 'llm', '_', 'perplexity', '(', 'gpt2', '_', 'model', ',', '"', 'Hello', ',', 'world', '!', '"', ')', 'print', '(', 'f', '"', 'Perplexity', ':', '{perplexity', ':', '.', '2f}', '"', ')']
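
Note that the output contains tokens such as `'length=1024'`, `'[tokens]'`, and `'{perplexity'`: the pattern does not treat `=`, brackets, or braces as delimiters. One possible way to handle code more finely is to extend the character class. The function `tokenize_code` below is a hypothetical variant for illustration, not part of the exercise:

```python
import re
from typing import List

def tokenize_code(text: str) -> List[str]:
    # Hypothetical variant: same approach as tokenize(), with = [ ] { } #
    # added to the set of delimiters.
    parts = re.split(r'([,.:;?_!"()\'=\[\]{}#]|--|\s)', text.strip())
    return [p.strip() for p in parts if p.strip()]

assignment_tokens = tokenize_code("max_length=1024")
call_tokens = tokenize_code("torch.tensor([tokens])")

print(assignment_tokens)  # ['max', '_', 'length', '=', '1024']
print(call_tokens)        # ['torch', '.', 'tensor', '(', '[', 'tokens', ']', ')']
```

Finer splitting yields a smaller vocabulary of reusable pieces, at the cost of longer token sequences.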


## 2. Creating a Vocabulary

In this section, we will create a function that takes a list of texts and returns a dictionary. Each key is a unique word (or token) from the texts, and its value is a unique integer index. The function also reserves a special token `<UNK>` with index 0 to represent unknown words that may appear in future texts.

In [48]:
import itertools

def build_vocabulary(texts: List[str]) -> Dict[str, int]:
    """Build a word-level vocabulary mapping each unique token to an integer index."""
    # Tokenize every text and collect the unique tokens in sorted order
    tokens = itertools.chain.from_iterable(tokenize(text) for text in texts)
    words = sorted(set(tokens))
    # Index 0 is reserved for unknown tokens, so real tokens start at 1
    return {**{token: i + 1 for i, token in enumerate(words)}, '<UNK>': 0}
    
build_vocabulary(sample_dataset)

{'!': 1,
 ',': 2,
 '.': 3,
 '42': 4,
 '?': 5,
 'Everything': 6,
 'Hello': 7,
 'LLM': 8,
 'Life': 9,
 'Munich': 10,
 'This': 11,
 'Trailblazers': 12,
 'Ultimate': 13,
 'Universe': 14,
 'What': 15,
 'and': 16,
 'another': 17,
 'answer': 18,
 'example': 19,
 'for': 20,
 'in': 21,
 'is': 22,
 'like': 23,
 'of': 24,
 'the': 25,
 'today': 26,
 'weather': 27,
 'world': 28,
 '<UNK>': 0}

In [None]:
try:
    check_implementation(build_vocabulary)
except NotImplementedError as e:
    print(e)

In [49]:
# TODO: Use your examples for a sample dataset
# We won't be checking whether you have removed TODO here
# But using your own sentences is encouraged!

sample_dataset = [
    "42 is the Ultimate answer for Life, the Universe, and Everything.",
    "Hello, world of LLM Trailblazers! This is another example.",
    "What is the weather like today in Munich?"
]

In [50]:
vocab = build_vocabulary(sample_dataset)
print("Vocabulary:", vocab)

Vocabulary: {'!': 1, ',': 2, '.': 3, '42': 4, '?': 5, 'Everything': 6, 'Hello': 7, 'LLM': 8, 'Life': 9, 'Munich': 10, 'This': 11, 'Trailblazers': 12, 'Ultimate': 13, 'Universe': 14, 'What': 15, 'and': 16, 'another': 17, 'answer': 18, 'example': 19, 'for': 20, 'in': 21, 'is': 22, 'like': 23, 'of': 24, 'the': 25, 'today': 26, 'weather': 27, 'world': 28, '<UNK>': 0}
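
To make the role of `<UNK>` concrete, here is a minimal sketch of encoding tokens to IDs and decoding them back. The names `toy_vocab`, `encode`, and `decode` are made up for this illustration; in the notebook the vocabulary would come from `build_vocabulary`:

```python
from typing import Dict, List

# Toy vocabulary for illustration only.
toy_vocab: Dict[str, int] = {'<UNK>': 0, ',': 1, 'Hello': 2, 'world': 3}

def encode(tokens: List[str], vocab: Dict[str, int]) -> List[int]:
    # Tokens missing from the vocabulary fall back to the <UNK> index.
    unk_id = vocab['<UNK>']
    return [vocab.get(tok, unk_id) for tok in tokens]

def decode(ids: List[int], vocab: Dict[str, int]) -> List[str]:
    # Invert the mapping to go from index back to token.
    id_to_token = {i: tok for tok, i in vocab.items()}
    return [id_to_token[i] for i in ids]

ids = encode(['Hello', ',', 'Munich'], toy_vocab)
decoded = decode(ids, toy_vocab)

print(ids)      # [2, 1, 0]
print(decoded)  # ['Hello', ',', '<UNK>']
```

Decoding is lossy for unknown words: `'Munich'` comes back as `'<UNK>'` because the original string was never stored.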


## 3. Implementing a Custom Dataloader

Our texts have different lengths, but a model expects each batch as a single rectangular tensor. To bridge that gap, we'll use two helpers:

1. A `Dataset` class: it tokenizes each text and converts the tokens into a tensor of vocabulary IDs.
2. A `DataLoader`: it feeds the prepared data to the model in batches. With a custom `collate_fn`, it pads every sequence in a batch to the length of the longest one. Sorting batches by length and building a mask that ignores the padding are common extensions that the code cells here leave out.

Together, these two helpers turn raw text into fixed-shape batches that a model can consume during training.

In [79]:


class TextDataset(Dataset):
    def __init__(self, texts: List[str], vocab: Dict[str, int]):
        """
        Initialize the dataset with texts and vocabulary.

        :param texts: A list of text samples.
        :param vocab: A dictionary representing the vocabulary, where keys are tokens and values are their corresponding IDs.
        """
        self.texts = texts
        self.vocab = vocab
    
    def __len__(self) -> int:
        
        return len(self.texts)
    
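    def __getitem__(self, idx: int) -> torch.Tensor:
        # Map each token to its ID; unseen tokens fall back to the <UNK> ID
        unk_id = self.vocab["<UNK>"]
        ids = [self.vocab.get(token, unk_id) for token in self._tokenize(self.texts[idx])]
        return torch.tensor(ids, dtype=torch.long)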
        
    
    def _tokenize(self, text: str) -> List[str]:
        preprocessed = re.split(r'([,.:;?_!"()\']|--|\s)', text.strip())
        return [item.strip() for item in preprocessed if item.strip()]

In [67]:
try:
    check_implementation(TextDataset)
except NotImplementedError as e:
    print(e)

All methods in class 'TextDataset' have been implemented. No TODO found.


In [84]:
# Create a dataset instance
dataset = TextDataset(sample_dataset, vocab)


In [85]:
batch_size = 1
simple_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

In [86]:
# Display a batch of data
for batch in simple_dataloader:
    print("Batch shape:", batch.shape)
    print("Sample batch:", batch)
    break

Batch shape: torch.Size([1, 14])
Sample batch: tensor([[ 4, 22, 25, 13, 18, 20,  9,  2, 25, 14,  2, 16,  6,  3]])


In [88]:
print("Attempting to iterate through the dataloader:")
try:
    for batch in simple_dataloader:
        print("Processed batch:", batch)
        break
except RuntimeError as e:
    print(f"Caught an error: {e}")
    print("\nThis error occurs because we're trying to batch sequences of different lengths.")

Attempting to iterate through the dataloader:
Processed batch: tensor([[ 4, 22, 25, 13, 18, 20,  9,  2, 25, 14,  2, 16,  6,  3]])
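
No error appears above because `batch_size=1` never has to combine two sequences. A minimal sketch of what happens with a larger batch and the default collation, using two made-up ID sequences instead of the notebook's dataset:

```python
import torch
from torch.utils.data import DataLoader

# Two already-encoded sequences of different lengths (IDs are arbitrary).
# A plain list works as a map-style dataset: it supports len() and indexing.
sequences = [torch.tensor([4, 22, 25]), torch.tensor([7, 2, 28, 24, 8])]

loader = DataLoader(sequences, batch_size=2, shuffle=False)

try:
    next(iter(loader))
    failed = False
except RuntimeError as e:
    # The default collate function stacks tensors, which requires equal sizes.
    failed = True
    print("Default collation failed:", e)
```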


With a batch size greater than 1, the default collation cannot stack sequences of different lengths. Let's implement a custom `collate_fn` to handle variable-length sequences.

In [120]:

# TODO: analyse this function and consider how to implement it without pad_sequence()

# Note: 0 is also the index of '<UNK>' in our vocabulary
PAD_VALUE = 0

def pad_sequences(sequences: List[torch.Tensor], pad_value: int) -> torch.Tensor:
    """
    Pad a list of 1-D sequences to the same length (batch first).
    """
    max_len = max(seq.size(0) for seq in sequences)
    padded_sequences = torch.full((len(sequences), max_len), pad_value)

    for i, seq in enumerate(sequences):
        padded_sequences[i, :seq.size(0)] = seq

    return padded_sequences

def collate_fn(
        batch: Union[List[torch.Tensor], List[Tuple[torch.Tensor, torch.Tensor]]]
    ) -> Union[torch.Tensor, Tuple[torch.Tensor, Optional[torch.Tensor]]]:
    
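    print(batch)  # Debug output: shows the raw, unpadded batch

    # Separate the input sequences and targets.
    # The DataLoader passes a list of samples, so inspect the first sample.
    if isinstance(batch[0], tuple):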
        # Case where batch contains both sequences and targets
        sequences, targets = zip(*batch)
        
        
        # Pad the sequences
        padded_sequences = pad_sequences(sequences, pad_value=PAD_VALUE)
        
        # Pad the targets if they are sequences, otherwise just stack them
        if isinstance(targets[0], torch.Tensor) and targets[0].dim() > 0:
            padded_targets = pad_sequences(targets, pad_value=PAD_VALUE)
        else:
            padded_targets = torch.stack(targets)
        
        return padded_sequences, padded_targets
    else:
        return pad_sequences(batch, pad_value=PAD_VALUE)

In [121]:
dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

In [126]:
# Display a batch of data
print("Iterating through the dataloader with custom collate_fn:")
for batch in dataloader:
    print("Batch shape:", batch.shape)
    print("Padded batch (0 marks padding):\n", batch)
    break

# TODO: Experiment with setting DataLoader with shuffle=False

Iterating through the dataloader with custom collate_fn:
[tensor([ 7,  2, 28, 24,  8, 12,  1, 11, 22, 17, 19,  3]), tensor([ 4, 22, 25, 13, 18, 20,  9,  2, 25, 14,  2, 16,  6,  3])]
Batch shape: torch.Size([2, 14])
Padded batch (0 marks padding):
 tensor([[ 7,  2, 28, 24,  8, 12,  1, 11, 22, 17, 19,  3,  0,  0],
        [ 4, 22, 25, 13, 18, 20,  9,  2, 25, 14,  2, 16,  6,  3]])


In [125]:
batch[1]

tensor([15, 22, 25, 27, 23, 26, 21, 10,  5,  0,  0,  0])

The `DataLoader` with our custom `collate_fn` now successfully handles variable-length sequences!
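
The setup cell imports `pad_sequence` from `torch.nn.utils.rnn`, which performs the same padding as the hand-written helper. A sketch of a collate function that uses it and also returns a padding mask; the name `collate_with_mask` and the mask convention (`True` for real tokens) are assumptions made for this example:

```python
import torch
from torch.nn.utils.rnn import pad_sequence

def collate_with_mask(batch, pad_value=0):
    # Pad all sequences to the length of the longest one (batch first).
    padded = pad_sequence(batch, batch_first=True, padding_value=pad_value)
    # Build a boolean mask from the true lengths: True = real token, False = padding.
    lengths = torch.tensor([seq.size(0) for seq in batch])
    mask = torch.arange(padded.size(1)).unsqueeze(0) < lengths.unsqueeze(1)
    return padded, mask

example_batch = [torch.tensor([7, 2, 28]), torch.tensor([4, 22, 25, 13, 18])]
padded, mask = collate_with_mask(example_batch)

print(padded)
print(mask)
```

Deriving the mask from the lengths, rather than from `padded != pad_value`, keeps it correct even when the padding value coincides with a real token ID.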

## 4. Putting It All Together

Time to combine tokenization, vocabulary creation and data preparation in batches. That's where our `TextProcessor` will help.

In [134]:
class TextProcessor:
    def __init__(self):
        self.vocab: Optional[Dict[str, int]] = None
    
    def tokenize(self, text: str) -> List[str]:
        preprocessed = re.split(r'([,.:;?_!"()\']|--|\s)', text.strip())
        return [item.strip() for item in preprocessed if item.strip()]
    
    def build_vocab(self, texts: List[str]) -> Dict[str, int]:
        # Tokenize every text with the class's own tokenizer and collect unique tokens
        tokens = itertools.chain.from_iterable(self.tokenize(text) for text in texts)
        words = sorted(set(tokens))
        # Index 0 is reserved for unknown tokens, so real tokens start at 1
        self.vocab = {**{token: i + 1 for i, token in enumerate(words)}, '<UNK>': 0}
        return self.vocab
        
    def create_dataloader(self, texts: List[str], batch_size: int) -> DataLoader:
    
        self.dataset = TextDataset(texts, self.vocab)
        return DataLoader(self.dataset, batch_size, shuffle=True, collate_fn=self._collate_fn)

    @staticmethod
    def _pad_sequences(sequences: List[torch.Tensor], pad_value: int )->torch.Tensor:
        """
        Pads list of sequences to same length; batch first
        """
        max_len = max(seq.size(0) for seq in sequences)
        padded_sequences = torch.full((len(sequences), max_len), pad_value)

        for i, seq in enumerate(sequences):
            padded_sequences[i, :seq.size(0)] = seq

        return padded_sequences

    @staticmethod
    def _collate_fn(
        batch: Union[List[torch.Tensor], List[Tuple[torch.Tensor, torch.Tensor]]]
    ) -> Union[torch.Tensor, Tuple[torch.Tensor, Optional[torch.Tensor]]]:
        # Separate the input sequences and targets.
        # The DataLoader passes a list of samples, so inspect the first sample.
        if isinstance(batch[0], tuple):
            # Case where each sample is a (sequence, target) pair
            sequences, targets = zip(*batch)
            
            # Pad the sequences
            padded_sequences = TextProcessor._pad_sequences(sequences, pad_value=PAD_VALUE)
            
            # Pad the targets if they are sequences, otherwise just stack them
            if isinstance(targets[0], torch.Tensor) and targets[0].dim() > 0:
                padded_targets = TextProcessor._pad_sequences(targets, pad_value=PAD_VALUE)
            else:
                padded_targets = torch.stack(targets)
            
            return padded_sequences, padded_targets
        else:
            # Case where each sample is a single sequence tensor
            return TextProcessor._pad_sequences(batch, pad_value=PAD_VALUE)

In [None]:
try:
    check_implementation(TextProcessor)
except NotImplementedError as e:
    print(e)

In [136]:
# Test the TextProcessor
processor = TextProcessor()
processor.build_vocab(sample_dataset)
dataloader = processor.create_dataloader(sample_dataset, batch_size=2)

In [137]:
for batch in dataloader:
    print("Processed batch:", batch)
    break

Processed batch: tensor([[15, 22, 25, 27, 23, 26, 21, 10,  5,  0,  0,  0],
        [ 7,  2, 28, 24,  8, 12,  1, 11, 22, 17, 19,  3]])
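
In the batch above, `0` marks padding, but `0` is also the index of `<UNK>`, so the two cannot be told apart from the IDs alone. One common way around this is to reserve a separate padding token. The sketch below uses a hypothetical `build_vocab_with_pad` helper on pre-tokenized input; it is not part of the exercise:

```python
from typing import Dict, List

def build_vocab_with_pad(token_lists: List[List[str]]) -> Dict[str, int]:
    # Hypothetical variant: reserve 0 for padding and 1 for unknown tokens.
    vocab = {'<PAD>': 0, '<UNK>': 1}
    for token in sorted({tok for tokens in token_lists for tok in tokens}):
        vocab[token] = len(vocab)
    return vocab

vocab_with_pad = build_vocab_with_pad([['Hello', ',', 'world'], ['Hello', 'Munich']])

# Encode two token lists (one contains the unseen word 'Berlin') and pad them.
samples = [['Hello', 'Berlin'], ['Hello', ',', 'world']]
encoded = [[vocab_with_pad.get(t, vocab_with_pad['<UNK>']) for t in s] for s in samples]
max_len = max(len(e) for e in encoded)
padded_ids = [e + [vocab_with_pad['<PAD>']] * (max_len - len(e)) for e in encoded]

print(vocab_with_pad)
print(padded_ids)  # [[3, 1, 0], [3, 2, 5]]
```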


#### Congratulations! You've implemented a basic text processing pipeline. This will be useful for handling input data in your LLM projects.

## Extra: Reviewing Tokenization Libraries

We'll use `tiktoken` for tokenization at a later stage, so let's see what it does and compare it with `NLTK`, another tokenization library.

### Using NLTK

In [140]:
import nltk
nltk.download('punkt_tab')
from nltk.tokenize import word_tokenize

[nltk_data] Downloading package punkt_tab to /home/user/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


In [141]:
nltk_tokens = word_tokenize(sample_text)
print("NLTK Tokens:", nltk_tokens)

NLTK Tokens: ['Hello', ',', 'how', 'are', 'you', 'doing', 'today', '?']


In [142]:
nltk_code_tokens = word_tokenize(code_text)
print("NLTK Tokens for Code:")

NLTK Tokens for Code:


### Using Tiktoken

In [143]:
import tiktoken

In [144]:
enc = tiktoken.get_encoding("cl100k_base")
tiktoken_tokens = enc.encode(sample_text)
print("Tiktoken Tokens:", tiktoken_tokens)
print("Decoded Tiktoken Tokens:", enc.decode(tiktoken_tokens))

Tiktoken Tokens: [9906, 11, 1268, 527, 499, 3815, 3432, 30]
Decoded Tiktoken Tokens: Hello, how are you doing today?


In [145]:
print(f"NLTK token count: {len(nltk_tokens)}")
print(f"Tiktoken token count: {len(tiktoken_tokens)}")

NLTK token count: 8
Tiktoken token count: 8
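
Both tokenizers produce 8 tokens for this sentence, but they work differently: `tiktoken` is a byte pair encoding (BPE) tokenizer, so it can split less common strings into subword pieces. The toy sketch below shows the core BPE idea of repeatedly merging the most frequent adjacent pair. It runs on a made-up word-frequency table at the character level and is a simplified illustration, not `tiktoken`'s actual implementation or vocabulary:

```python
from collections import Counter
from typing import Dict, List, Tuple

Word = Tuple[str, ...]

def count_pairs(words: Dict[Word, int]) -> Counter:
    # Count adjacent symbol pairs, weighted by word frequency.
    pairs: Counter = Counter()
    for symbols, freq in words.items():
        for pair in zip(symbols, symbols[1:]):
            pairs[pair] += freq
    return pairs

def merge_pair(words: Dict[Word, int], pair: Tuple[str, str]) -> Dict[Word, int]:
    # Replace every occurrence of the pair with a single merged symbol.
    merged: Dict[Word, int] = {}
    for symbols, freq in words.items():
        out: List[str] = []
        i = 0
        while i < len(symbols):
            if i + 1 < len(symbols) and (symbols[i], symbols[i + 1]) == pair:
                out.append(symbols[i] + symbols[i + 1])
                i += 2
            else:
                out.append(symbols[i])
                i += 1
        key = tuple(out)
        merged[key] = merged.get(key, 0) + freq
    return merged

# Made-up corpus: word -> frequency. Each word starts as a tuple of characters.
corpus = {"low": 5, "lower": 2, "newest": 6, "widest": 3}
words = {tuple(w): f for w, f in corpus.items()}

merges = []
for _ in range(3):
    pairs = count_pairs(words)
    best = max(pairs, key=pairs.get)  # ties resolve to the first pair seen
    merges.append(best)
    words = merge_pair(words, best)

print(merges)       # [('e', 's'), ('es', 't'), ('l', 'o')]
print(list(words))
```

After three merges the frequent suffix `est` has become a single symbol, while rarer character sequences remain split.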


In [146]:
tiktoken_code_tokens = enc.encode(code_text)
print("\nTiktoken Tokens (decoded for readability):")
print(enc.decode_tokens_bytes(tiktoken_code_tokens))
print(f"Tiktoken token count: {len(tiktoken_code_tokens)}")


Tiktoken Tokens (decoded for readability):
[b'\n', b'def', b' calculate', b'_ll', b'm', b'_per', b'plex', b'ity', b'(model', b',', b' text', b',', b' max', b'_length', b'=', b'102', b'4', b'):\n', b'   ', b' tokens', b' =', b' tokenizer', b'.encode', b'(text', b',', b' max', b'_length', b'=max', b'_length', b',', b' trunc', b'ation', b'=True', b')\n', b'   ', b' input', b'_ids', b' =', b' torch', b'.tensor', b'([', b'tokens', b']).', b'to', b'(device', b')\n', b'   ', b' with', b' torch', b'.no', b'_grad', b'():\n', b'       ', b' outputs', b' =', b' model', b'(input', b'_ids', b',', b' labels', b'=input', b'_ids', b')\n', b'   ', b' loss', b' =', b' outputs', b'.loss', b'\n', b'   ', b' return', b' math', b'.exp', b'(loss', b'.item', b'())\n\n', b'#', b' Example', b' usage', b'\n', b'per', b'plex', b'ity', b' =', b' calculate', b'_ll', b'm', b'_per', b'plex', b'ity', b'(g', b'pt', b'2', b'_model', b',', b' "', b'Hello', b',', b' world', b'!")\n', b'print', b'(f', b'"', b'Per', b'plex