<div class="alert alert-block alert-info">
    <h1>Natural Language Processing</h1>
    Assignment 01
    <h3>General Information:</h3>
    <p>Please do not add or delete any cells. Answers belong in the corresponding cells (below the question). If a function is given (either as a signature or a full function), you should not change the name, arguments or return value of the function.<br><br> If you encounter empty cells underneath the answer that cannot be edited, please ignore them; they are for testing purposes.<br><br>While you edit an assignment, variables from earlier runs may still be defined in the kernel. To make sure your assignment works, please restart the kernel and run all cells before submitting (e.g. via <i>Kernel -> Restart & Run All</i>).</p>
    <p>Code cells where you are supposed to give your answer often include the line <code>raise NotImplementedError</code>. This makes it easier to grade answers automatically. If you edit the cell, please comment out or delete this line.</p>
    <h3>Submission:</h3>
    <p>Please submit your notebook via the web interface (in the main view -> Assignments -> Submit). The assignments are due on <b>Monday at 15:00</b>.</p>
    <h3>Group Work:</h3>
    <p>You are allowed to work in groups of up to three people. Please enter the UID (your username here) of each member of the group into the next cell. We apply plagiarism checking, so do not submit solutions from other people except your team members. If an assignment has a copied solution, the task will be graded with 0 points for all people with the same solution.</p>
    <h3>Questions about the Assignment:</h3>
    <p>If you have questions about the assignment please post them in the LEA forum before the deadline. Don't wait until the last day to post questions.</p>
    
</div>

In [4]:
'''
Group Work:
Enter the username of each team member into the variables. 
If you work alone or in a pair, please leave the remaining variables empty.
'''
member1 = 'tghane2s'
member2 = 'rmore2s'
member3 = 'psheth2s'

# Byte Pair Encoding

In this assignment we implement Byte Pair Encoding (BPE) step by step.

## Byte Pair Encoding A) [10 points]

First we want to do pre-tokenization using white spaces.

Please complete the function `pretokenize` below. This takes a list of sentences or documents and returns a list of tokenized sentences or documents. Look at the example in the docstring for more information.
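
As a small illustration of why whitespace handling matters here, the snippet below compares `str.split()` with no argument against `str.split(" ")` on a sentence containing a double space. It is only a sketch; the variable names are made up for this example.

```python
text = "This is an  example sentence"  # note the double space before "example"

# With no argument, split() treats any run of whitespace as one separator
default_split = text.split()

# With an explicit " " separator, the double space yields an empty token
single_space_split = text.split(" ")

print(default_split)
print(single_space_split)
```

The first call returns five tokens, while the second contains an empty string between `'an'` and `'example'`.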

In [5]:
from typing import List

def pretokenize(sentences: List[str]) -> List[List[str]]:
    """
    Tokenizes a list of sentences into a list of lists of tokens using white spaces.

    Args:
        sentences (List[str]): List of sentences to be tokenized.

    Returns:
        List[List[str]]: List of lists of tokens, where each inner list represents
                         the tokens of a single sentence.
    Example:
        >>> sentences = ["Hello world", "This is a test!"]
        >>> pretokenize(sentences)
        [['Hello', 'world'], ['This', 'is', 'a', 'test!']]
    """
    tokenized_sentences = [] # storing as list
    for sentence in sentences: # Loop for each sentence
        tokens = sentence.split() # Splitting the sentences
        tokenized_sentences.append(tokens) # Append tokens to list of tokenized sentences
    
    return tokenized_sentences

example_sentences = [
    "This is an  example sentence",
    "Another sentence",
    "The final sentence."
]

tokenized = pretokenize(example_sentences)
"""
Expected Output:
[['This', 'is', 'an', 'example', 'sentence'],
 ['Another', 'sentence'],
 ['The', 'final', 'sentence.']]
"""
tokenized

[['This', 'is', 'an', 'example', 'sentence'],
 ['Another', 'sentence'],
 ['The', 'final', 'sentence.']]

## Byte Pair Encoding B) [10 points]

For BPE we first need an initial vocabulary. The input is a pretokenized list of sentences / documents.

The output should be a set of characters present in this list.

In [9]:
from typing import List, Set

def build_initial_vocabulary(corpus: List[List[str]]) -> Set[str]:
    """
    Build the initial vocabulary from a corpus of tokenized sentences.

    Args:
        corpus (List[List[str]]): A list of tokenized sentences, where each sentence
            is represented as a list of strings (tokens).

    Returns:
        Set[str]: A set containing all unique characters from all tokens in the corpus.

    Example:
        >>> corpus = [['hello', 'world'], ['This', 'is', 'a', 'test']]
        >>> build_initial_vocabulary(corpus)
        {'T', 'a', 'd', 'e', 'h', 'i', 'l', 'o', 'r', 's', 't', 'w'}
        # Note: Order may vary since sets are unordered
    """
    letters = set()
    for sentence in corpus:  # Loop over each tokenized sentence
        for token in sentence:  # Loop over each token in the sentence
            letters.update(token)  # Add every character of the token to the set
    return letters

build_initial_vocabulary(pretokenize(["hello world", "This is a test"]))

{'T', 'a', 'd', 'e', 'h', 'i', 'l', 'o', 'r', 's', 't', 'w'}

## Byte Pair Encoding C) [10 points]


Now we want to build our dictionary for the split tokens. Complete the function `get_splits` below. Look at the example in the docstring!

Make sure to add the end of word symbol (`</w>`) to each token.

In [15]:
from collections import Counter
from typing import Dict, Tuple

def get_splits(corpus: List[List[str]]) -> Dict[Tuple[str], int]:
    """
    Get subword splits of tokens in a corpus.

    Args:
        corpus (List[List[str]]): A list of sentences where each sentence is represented
            as a list of tokens.

    Returns:
        Dict[Tuple[str], int]: A dictionary where keys are tuples representing subword splits
            and values are the counts of occurrences of those splits in the corpus.

    Example:
        >>> corpus = [['apple', 'banana', 'apple'], ['apple']]
        >>> get_splits(corpus)
        {('a', 'p', 'p', 'l', 'e', '</w>'): 3, ('b', 'a', 'n', 'a', 'n', 'a', '</w>'): 1}
    """
    count = Counter() # Creating a counter to store counts of subword splits
    for sentence in corpus: # Loop for each sentence in corpus
        for token in sentence: # Loop for each token in sentence
            token_split = tuple(token) + ('</w>',) # Split into characters and append the end-of-word symbol '</w>'
            count[token_split] += 1 # Count how often this split occurs in the corpus
    return dict(count)
get_splits(pretokenize(["apple banana apple", "apple"])) 

{('a', 'p', 'p', 'l', 'e', '</w>'): 3,
 ('b', 'a', 'n', 'a', 'n', 'a', '</w>'): 1}

## Byte Pair Encoding D) [10 points]

In the next step we want to find the most common pair from a splits dictionary.

Complete the function `find_most_frequent_pair` which returns the most frequent pair alongside its count (e.g. `(('a', 'n'), 2)`)
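
To make the counting rule concrete, here is a minimal sketch of weighted pair counting on the example from the docstring below. The helper name `count_pairs` is hypothetical and not part of the assignment. Each adjacent pair is counted once per position and weighted by the frequency of the word it occurs in.

```python
from collections import Counter
from typing import Dict, Tuple


def count_pairs(splits: Dict[Tuple[str, ...], int]) -> Counter:
    """Count adjacent symbol pairs, weighted by the frequency of each word."""
    pair_counts: Counter = Counter()
    for split, freq in splits.items():
        for left, right in zip(split, split[1:]):
            pair_counts[(left, right)] += freq
    return pair_counts


splits = {
    ('a', 'p', 'p', 'l', 'e', '</w>'): 3,
    ('b', 'a', 'n', 'a', 'n', 'a', '</w>'): 1,
    ('l', 'e', 'a', 'd', '</w>'): 1,
}
pair_counts = count_pairs(splits)
print(pair_counts[('l', 'e')])  # 3 from 'apple' plus 1 from 'lead'
print(pair_counts[('a', 'n')])  # two positions inside the single 'banana'

# max() returns the first maximal key it encounters, and dicts keep insertion
# order, so ties are broken in favour of the pair that was seen first.
best = max(pair_counts, key=pair_counts.get)
print(best)
```

The tie-breaking behaviour matters later: when several pairs share the highest count, the order in which merges are learned depends on which pair was encountered first.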

In [45]:
from typing import Dict, Tuple

def find_most_frequent_pair(splits: Dict[Tuple[str], int]) -> Tuple[Tuple[str, str], int]:
    """
    Find the most frequent pair of characters from a dictionary of split words along with its count.

    Args:
        splits (Dict[Tuple[str], int]): A dictionary where keys are tuples of split words and values 
            are their counts.

    Returns:
        Tuple[Tuple[str, str], int]: A tuple containing the most frequent pair of characters and its count.

    Example:
        >>> splits = {('a', 'p', 'p', 'l', 'e', '</w>'): 3,
                      ('b', 'a', 'n', 'a', 'n', 'a', '</w>'): 1,
                      ('l', 'e', 'a', 'd', '</w>'): 1}
        >>> find_most_frequent_pair(splits)
        (('l', 'e'), 4) 
        # Explanation: 'l' and 'e' appear in 'apple' and 'lead'. 
        # The word 'apple' appears 3x and the word 'lead' 1x.
    """
    pair_counts: Dict[Tuple[str, str], int] = {}  # Maps each adjacent pair to its weighted count

    for split, count in splits.items():  # Iterate over each split word and its frequency
        for i in range(len(split) - 1):  # Loop through all adjacent symbol pairs
            pair = (split[i], split[i + 1])
            pair_counts[pair] = pair_counts.get(pair, 0) + count

    # Pick the pair with the highest count; ties go to the pair seen first
    most_frequent_pair = max(pair_counts, key=pair_counts.get)

    return (most_frequent_pair, pair_counts[most_frequent_pair])

print(find_most_frequent_pair(get_splits(pretokenize(["apple banana apple", "apple lead"]))))
print(find_most_frequent_pair(get_splits(pretokenize(["a", "a b"]))))


(('l', 'e'), 4)
(('a', '</w>'), 2)


## Byte Pair Encoding E) [15 points]

Now write a function that takes a pair and the splits and merges all occurrences of the pair in the splits.
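
One detail worth spelling out is what "all occurrences" means when matches overlap. The sketch below assumes a left-to-right scan that skips past each merged pair, so occurrences never overlap. The helper name `merge_pair` is made up for this illustration.

```python
from typing import Tuple


def merge_pair(split: Tuple[str, ...], pair: Tuple[str, str]) -> Tuple[str, ...]:
    """Merge every non-overlapping occurrence of `pair`, scanning left to right."""
    merged = []
    i = 0
    while i < len(split):
        if split[i:i + 2] == pair:  # the two symbols at i and i + 1 match the pair
            merged.append(pair[0] + pair[1])
            i += 2  # skip both symbols so the next match cannot overlap
        else:
            merged.append(split[i])
            i += 1
    return tuple(merged)


overlapping = merge_pair(('a', 'a', 'a', '</w>'), ('a', 'a'))
banana = merge_pair(('b', 'a', 'n', 'a', 'n', 'a', '</w>'), ('a', 'n'))
print(overlapping)  # ('aa', 'a', '</w>')
print(banana)       # ('b', 'an', 'an', 'a', '</w>')
```

In the first call the third `'a'` stays unmerged, because the first two were already consumed by the merge.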

In [60]:
def merge_split(split: Tuple[str], pair: Tuple[str, str]):
    """
    Merge a split tuple if it contains the given pair.
    This function merges **all occurrences** of the pair in the split.

    Args:
        split (Tuple[str]): The split tuple to merge.
        pair (Tuple[str, str]): The pair to merge.

    Returns:
        Tuple[str]: The merged split tuple.
        
    Example:
        >>> merge_split(split=('b', 'a', 'n', 'a', 'n', 'a', '</w>'), pair=('a', 'n'))
        # Returns:
        ('b', 'an', 'an', 'a', '</w>')
        
        >>> merge_split(split=('b', 'an', 'an', 'a', '</w>'), pair=('an', 'an'))
        # Returns:
        ('b', 'anan', 'a', '</w>')
    """
    merged = []
    i = 0
    while i < len(split):
        if i < len(split) - 1 and split[i] == pair[0] and split[i + 1] == pair[1]:
            merged.append(split[i] + split[i + 1])
            i += 2  
        else:
            merged.append(split[i])
            i += 1
    return tuple(merged)
    
def merge_splits(splits: Dict[Tuple[str], int], pair: Tuple[str, str]):
    """
    Merge all split tuples in a dictionary that contain the given pair.

    Args:
        splits (Dict[Tuple[str], int]): A dictionary of split tuples and their counts.
        pair (Tuple[str, str]): The pair to merge.

    Returns:
        Dict[Tuple[str], int]: A dictionary with merged split tuples and their counts.
        
    Example:
        >>> splits = {
            ('a', 'p', 'p', 'l', 'e', '</w>'): 3,
            ('b', 'a', 'n', 'a', 'n', 'a', '</w>'): 1
        }
        >>> merge_splits(splits, ('a', 'n'))
        # Returns this:
        {
            ('a', 'p', 'p', 'l', 'e', '</w>'): 3,
            ('b', 'an', 'an', 'a', '</w>'): 1
        }
    """
    merged_splits: Dict[Tuple[str], int] = {}
    for split, count in splits.items():
        merged = merge_split(split, pair)
        merged_splits[merged] = merged_splits.get(merged, 0) + count # Keep track of counts for merged pairs
    return merged_splits  
    
splits = get_splits(pretokenize(["apple banana apple", "apple"]))

most_frequent_pair, count = find_most_frequent_pair(splits)

print(splits)
print(most_frequent_pair, count)
print(merge_splits(splits, most_frequent_pair))

{('a', 'p', 'p', 'l', 'e', '</w>'): 3, ('b', 'a', 'n', 'a', 'n', 'a', '</w>'): 1}
('a', 'p') 3
{('ap', 'p', 'l', 'e', '</w>'): 3, ('b', 'a', 'n', 'a', 'n', 'a', '</w>'): 1}


## Byte Pair Encoding F) [40 points]

Now let us put this all together into a single class. Complete the methods `train`, `encode` and `decode`.

- `train` learns the vocabulary and a list of merge pairs to use for encoding / tokenizing.
- `encode` tokenizes a list of strings by applying the learned merge rules in order.
- `decode` takes a BPE-encoded list of lists and merges the subwords back into words.

Look at the examples in the docstrings for more information.
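
The three methods can be sketched as follows. This is one possible reading of the steps, not the reference solution. The class name `SketchBPETokenizer` and the helpers `initial_splits`, `best_pair` and `merge` are hypothetical and are redefined here so that the block runs on its own. It assumes that the initial vocabulary holds only the corpus characters (without `</w>`), that training stops once the vocabulary reaches `max_vocab_size` or no pair is left, and that ties go to the pair seen first.

```python
from collections import Counter
from typing import Dict, List, Optional, Set, Tuple

Split = Tuple[str, ...]
END = "</w>"  # end-of-word symbol, as in the assignment


def initial_splits(corpus: List[str]) -> Dict[Split, int]:
    """Split on whitespace, then break each token into characters plus END."""
    words = (tuple(tok) + (END,) for sent in corpus for tok in sent.split())
    return dict(Counter(words))


def best_pair(splits: Dict[Split, int]) -> Optional[Tuple[str, str]]:
    """Return the most frequent adjacent pair, or None if no pair is left."""
    counts: Dict[Tuple[str, str], int] = {}
    for split, freq in splits.items():
        for pair in zip(split, split[1:]):
            counts[pair] = counts.get(pair, 0) + freq
    # max() keeps the first maximal key, so ties go to the pair seen first
    return max(counts, key=counts.get) if counts else None


def merge(split: Split, pair: Tuple[str, str]) -> Split:
    """Merge all non-overlapping occurrences of `pair`, left to right."""
    out, i = [], 0
    while i < len(split):
        if split[i:i + 2] == pair:
            out.append(pair[0] + pair[1])
            i += 2
        else:
            out.append(split[i])
            i += 1
    return tuple(out)


class SketchBPETokenizer:
    """Hypothetical stand-in for the BPETokenizer class of this exercise."""

    def __init__(self) -> None:
        self.vocab: Set[str] = set()
        self.merge_pairs: List[Tuple[str, str]] = []

    def train(self, corpus: List[str], max_vocab_size: int) -> None:
        splits = initial_splits(corpus)
        # Initial vocabulary: every character in the corpus (END is not included)
        self.vocab = {ch for sent in corpus for tok in sent.split() for ch in tok}
        self.merge_pairs = []
        while len(self.vocab) < max_vocab_size:
            pair = best_pair(splits)
            if pair is None:  # no adjacent pair left to merge
                break
            merged: Dict[Split, int] = {}
            for split, freq in splits.items():
                key = merge(split, pair)
                merged[key] = merged.get(key, 0) + freq
            splits = merged
            self.vocab.add(pair[0] + pair[1])
            self.merge_pairs.append(pair)

    def encode(self, corpus: List[str]) -> List[List[str]]:
        encoded = []
        for sent in corpus:
            tokens: List[str] = []
            for tok in sent.split():
                split = tuple(tok) + (END,)
                for pair in self.merge_pairs:  # replay merges in learned order
                    split = merge(split, pair)
                tokens.extend(split)
            encoded.append(tokens)
        return encoded

    def decode(self, tokenized: List[List[str]]) -> List[List[str]]:
        # Concatenate subwords, then cut at END to recover the words
        return [[w for w in "".join(toks).split(END) if w] for toks in tokenized]


corpus = ["lowest lower newer newest", "low lower new"]
tokenizer = SketchBPETokenizer()
tokenizer.train(corpus, max_vocab_size=20)
encoded = tokenizer.encode(corpus)
decoded = tokenizer.decode(encoded)
print(tokenizer.merge_pairs)
print(encoded)
print(decoded)
```

With eight distinct characters in this corpus and `max_vocab_size=20`, the loop learns twelve merges before it stops. Because `encode` replays the merges in the order they were learned, a word such as `newest`, whose full form was never merged, ends up as the two subwords `newe` and `st</w>`.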

In [None]:
class BPETokenizer:
    """
    Byte-Pair Encoding (BPE) Tokenizer.
    
    This tokenizer learns a vocabulary and encodes/decodes text using the Byte-Pair Encoding algorithm.
    """
    
    def __init__(self):
        """
        Initialize the BPETokenizer.
        """
        self.vocab: set = set() # Vocabulary learned from the corpus. Initially just characters, later also joined characters.
        self.end_of_word: str = "</w>" # End of word symbol
        self.merge_pairs: List[Tuple[str, str]] = [] # List of merge pairs learned from the corpus
        
    def train(self, corpus: List[str], max_vocab_size: int) -> None:
        """
        Train the tokenizer on a given corpus.
        This method pretokenizes the corpus, builds an initial vocabulary, and applies BPE
        to learn merge pairs until the vocabulary reaches the max_vocab_size.
        
        Steps:
        1. Pretokenize
        2. Build initial vocab
        3. Get initial splits
        4. Until we reach the max_vocab_size or do not have a most frequent pair DO:
            4.1 Find most frequent pair
            4.2 Update your splits by using the merge_splits function
            4.3 Update vocab
            4.4 Update merge pairs
            
        Hint:
        Make sure to update correctly every time you find a new most frequent pair:
        Assume your most frequent pair is ("ba", "na"). Then you add "bana" to your self.vocab
        and ("ba", "na") to self.merge_pairs
           

        Args:
            corpus (List[str]): The corpus of text for training.
            max_vocab_size (int): The maximum size of the vocabulary.

        Returns:
            None: In the end your tokenizer will have an updated vocabulary and merge pairs.
            
        Example:
        >>> corpus = [
            "lowest lower newer newest",
            "low lower new"
        ]
        >>> tokenizer.train(corpus, max_vocab_size=20)
        """
        # YOUR CODE HERE
        raise NotImplementedError()
        
    def encode(self, corpus: List[str]) -> List[List[str]]:
        """
        Encode / Tokenize a corpus of text using the learned vocabulary and merge pairs.
        This method applies the learned Byte Pair Encoding (BPE) algorithm to a given corpus.
        It splits each word in the input corpus, adds the special end-of-word token `</w>`,
        and applies the learned merge pairs in order to convert words into subword units.

        Args:
            corpus (List[str]): The corpus of text to be encoded.

        Returns:
            List[List[str]]: The encoded corpus.
        
        Example:
        >>> corpus = [
            "lowest lower newer newest",
            "low lower new"
        ]
        >>> tokenizer.train(corpus, max_vocab_size=20)
        >>> tokenizer.encode(corpus)
        [['lowest</w>', 'lower</w>', 'newer</w>', 'newe', 'st</w>'],
         ['lo', 'w</w>', 'lower</w>', 'ne', 'w</w>']]
        
        """
        # YOUR CODE HERE
        raise NotImplementedError()
        
    def decode(self, tokenized: List[List[str]]) -> List[List[str]]:
        """
        Decode a corpus of tokenized text.

        Args:
            tokenized (List[List[str]]): The tokenized text to be decoded.

        Returns:
            List[List[str]]: The decoded text.
            
        Example:
        >>> corpus = [
            "lowest lower newer newest",
            "low lower new"
        ]
        >>> tokenizer.train(corpus, max_vocab_size=20)
        >>> tokenizer.decode([['lowest</w>', 'lower</w>', 'newer</w>', 'newe', 'st</w>'],
                              ['lo', 'w</w>', 'lower</w>', 'ne', 'w</w>']])
        [['lowest', 'lower', 'newer', 'newest'], ['low', 'lower', 'new']]                              
        """
        # YOUR CODE HERE
        raise NotImplementedError()
        
corpus = [
    "lowest lower newer newest",
    "low lower new"
]
tokenizer = BPETokenizer()
tokenizer.train(corpus, 20)

print("Vocabulary is:")
print(tokenizer.vocab)
"""
Expected vocabulary:
{'e', 'l', 'lo', 'lowe', 'lower</w>', 'lowest</w>', 'n',
 'ne', 'newe', 'newer</w>', 'o', 'r', 'r</w>', 's', 'st',
 'st</w>', 't', 'w', 'w</w>', 'we'}
"""

print("We learned the following merge pairs:")
"""
Expected merge pairs:
[
    ('w', 'e'), ('l', 'o'), ('lo', 'we'), ('r', '</w>'),
    ('n', 'e'), ('s', 't'), ('st', '</w>'), ('lowe', 'r</w>'),
    ('ne', 'we'), ('w', '</w>'), ('lowe', 'st</w>'), ('newe', 'r</w>')
]
"""
print(tokenizer.merge_pairs)

encoded = tokenizer.encode(corpus)
print("The encoded corpus:")
print(encoded)
"""
Expected encoded corpus:
[
    ['lowest</w>', 'lower</w>', 'newer</w>', 'newe', 'st</w>'], 
    ['lo', 'w</w>', 'lower</w>', 'ne', 'w</w>']
]
"""

decoded = tokenizer.decode(encoded)
print("The decoded corpus:")
print(decoded)
"""
Expected decoded corpus:
[
    ['lowest', 'lower', 'newer', 'newest'], 
    ['low', 'lower', 'new']
]
"""


In [None]:
tokenizer.merge_pairs

## Byte Pair Encoding G) [5 points]

Train your BPE tokenizer on sentences from Yelp reviews. Then encode a random sentence with the trained tokenizer and decode it again.

**Note: The `encode` method of the tokenizer expects a list of sentences (a list of strings), so wrap a single sentence in a list.**

Training might take a few minutes.
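
A small sketch of the "random sentence" step, assuming the sentences were read by splitting a file on newlines (which can leave empty strings behind). The helper `pick_sentence` and the sample lines are made up for illustration and are not taken from the Yelp data.

```python
import random
from typing import List


def pick_sentence(sentences: List[str], seed: int = 0) -> str:
    """Pick one non-empty sentence; a fixed seed keeps the choice reproducible."""
    candidates = [s for s in sentences if s.strip()]
    return random.Random(seed).choice(candidates)


# Made-up stand-in for the lines of the review file
raw_lines = "great food\n\nfriendly staff\n".split("\n")

sentence = pick_sentence(raw_lines)
batch = [sentence]  # wrap the single sentence in a list before encoding
print(batch)
```

The resulting `batch` is a one-element list of strings, which is the shape an `encode(corpus: List[str])` method expects.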

In [None]:
with open("/srv/shares/NLP/datasets/yelp/reviews_sents.txt", "r") as f:
    sentences = f.read().split("\n")

tokenizer = BPETokenizer()
# YOUR CODE HERE
raise NotImplementedError()
encoded = ...
print("Encoded:\n", encoded)
decoded = ...
print("Decoded:\n", decoded)