# Home Assignment 3 (30pts)

Submit your solution via Ilias until 23.59h on Tuesday, November 18th. Late submissions are **not possible**.

Submit your solutions in teams of 5 students. Unless explicitly agreed otherwise in advance, submissions from teams with more or less members will NOT be graded (i.e., count as failed).

**Make sure that all team members are part of the submitting group on Ilias.**

You may use the code from the exercises and basic functionalities that are explained in the official documentation of Python packages without citing, __all other sources must be cited__. In case of plagiarism (copying solutions from other teams or from the internet) ALL team members may be expelled from the course without warning.

#### General guidelines:
* Make sure that your code is executable, any task for which the code does not directly run on our machine will be graded with 0 points.
* If you use packages that are not available on the default or conda-forge channel, list them below. Also add a link to installation instructions. 
* Ensure that the notebook does not rely on the current notebook or system state!
  * Use `Kernel --> Restart & Run All` to see if you are using any definitions, variables etc. that 
    are not in scope anymore.
  * Do not rename any of the datasets you use, and load it from the same directory that your ipynb-notebook is located in, i.e., your working directory.
* Make sure you clean up your code before submission, e.g., properly align your code, and delete every line of code that you do not need anymore, even if you may have experimented with it. Minimize usage of global variables. Avoid reusing variable names multiple times!
* Ensure your code/notebook terminates in reasonable time.
* Feel free to use comments in the code. While we do not require them to get full marks, they may help us in case your code has minor errors.
* For questions that require a textual answer, please do not write the answer as a comment in a code cell, but in a Markdown cell below the code. Always remember to provide sufficient justification for all answers.
* You may create as many additional cells as you want, just make sure that the solutions to the individual tasks can be found near the corresponding assignment.
* If you have any general question regarding the understanding of some task, do not hesitate to post in the student forum in Ilias, so we can clear up such questions for all students in the course.

Additional packages (if any):
 - Example: `powerlaw`, https://github.com/jeffalstott/powerlaw

In [1]:
from typing import List, Union, Dict, Set, Tuple
from numpy.typing import NDArray
import nltk
import numpy as np
import random
from collections import defaultdict

nltk.download('punkt')
nltk.download('averaged_perceptron_tagger_eng')
nltk.download('gutenberg')

[nltk_data] Downloading package punkt to /Users/I569362/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger_eng to
[nltk_data]     /Users/I569362/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger_eng.zip.
[nltk_data] Downloading package gutenberg to
[nltk_data]     /Users/I569362/nltk_data...
[nltk_data]   Package gutenberg is already up-to-date!


True

### Task 1: POS tagging (6 points)

In this task, we want to explore sentences with similar part of speech (POS) tag structure. For this, we need a corpus of text with tags. We will generate such a corpus by using NLTKâ€™s currently recommended POS tagger to tag a given list of tokens (https://www.nltk.org/api/nltk.tag.html).

__a)__ Given a corpus of text ``corpus`` as a sequence of tokens, we want to collect all words that are tagged with a certain POS tag. Implement a function ``collect_words_for_tag`` that first tags the given corpus using NLTK's off-the-shelf tagger imported in the cell above. Then, for each POS tag, collect all words that were tagged with it. You should return a dictionary that maps each POS tag that was observed to the set of words that were assigned this tag in the given corpus. __(2 pts)__

In [2]:
from nltk.corpus.reader.util import StreamBackedCorpusView 
from nltk import pos_tag

def collect_words_for_tag(corpus: Union[List[str], StreamBackedCorpusView]) -> Dict[str, Set[str]]:
    """
    :param corpus: sequence of tokens that represents the text corpus
    :return: dict that maps each tag to a set of tokens that were assigned this tag in the corpus
    """
    tagged_corpus = pos_tag(corpus)
    pos_dict = defaultdict(set)
    for word, tag in tagged_corpus:
        pos_dict[tag].add(word)
    return pos_dict

__b)__ Implement a function ``generate_sentences`` that gets a sentence and a POS dictionary (assume the POS dictionary was generated by your function in __a)__) as input and generates ``n`` sequences of words with the same tag structure. The words in your generated sequence should be randomly taken from the set of words associated with the current tag. 

Additionally, the user should have the option to achieve sentences of ``better_quality``. Thus, if ``better_quality=True``, make sure that the tag structure of the output sentences actually matches the tag structure of the input sentence, as the tags may change depending on the context. 

You can assume that the training corpus is large enough to include all possible POS tags. __(2 pts)__

_Hint: consider the_ ``random`` _module_

In [3]:
def generate_rand(sentence: List[str], pos_dict: Dict[str, Set[str]], n: int, better_quality: bool=False) -> List[List[str]]:
    """
    :param sentence: input sentence that sets the tag pattern
    :param pos_dict: maps each tag to a list of associated words
    :param n: number of sentences that should be generated
    :return: List of sentences with the same tag structure as the input sentence
    """
    original_tags = [tag for word, tag in pos_tag(sentence)]
    generated_sentences = []
    
    for _ in range(n):
        new_sentence = []
        if better_quality:
            while True:
                new_sentence = [random.choice(list(pos_dict[tag])) for tag in original_tags]
                new_tags = [tag for word, tag in pos_tag(new_sentence)]
                if new_tags == original_tags:
                    break
        else:
            new_sentence = [random.choice(list(pos_dict[tag])) for tag in original_tags]
        generated_sentences.append(new_sentence)
        
    return generated_sentences

__c)__ Using the input sentence ``This test is very difficult``, test your implementation to generate 10 sentences based on  

* "Emma" by Jane Austen

* The "King James Bible"

Store your POS dictionary in ``emma_tags``and ``bible_tags``, respectively. Your generated sentences should be stored in ``emma_sent`` and ``bible_sent``. __(2 pts)__

In [4]:
sent = ["This", "test", "is", "very", "difficult"]

In [5]:
from nltk.corpus import gutenberg

# Emma by Jane Austen
emma_corpus = gutenberg.words("austen-emma.txt")
emma_tags = collect_words_for_tag(emma_corpus)
emma_sent = generate_rand(sent, emma_tags, 10)

# King James Bible
bible_corpus = gutenberg.words("bible-kjv.txt")
bible_tags = collect_words_for_tag(bible_corpus)
bible_sent = generate_rand(sent, bible_tags, 10)

print("Sentences from Emma:")
for s in emma_sent:
    print(" ".join(s))

print("\nSentences from King James Bible:")
for s in bible_sent:
    print(" ".join(s))

Sentences from Emma:
some dulness mails improperly wet
neither disorder works needless formal
Another sideboard treats alphabetically necessary
every ballroom follows extremely exhausted
Any pew denotes glibly gratified
an solicitation chuses eyes wine
No interior dances sufficiently suppress
a depend _is_ Fortunately involved
An quaint Yours positively young
A transaction lives unconsciously thorough

Sentences from King James Bible:
Another wealth drieth forgiven pine
This evilfavouredness purify ever else
Each east enrich asketh taught
All bleating saith carefully drink
obey enchanter hearkeneth lovely ringstraked
that revengeth hadst frankly sheddeth
smooth flattereth pillars Nevertheless skin
An scabbard confesseth deliciously commodious
some murmur nameth scorn cockle
broth thirsteth oft woven pleaseth


### Task 2: The Viterbi algorithm (11 points)
Implement the Viterbi algorithm as introduced in the lecture and the exercise. The input of your function is a sentence that should be tagged, a dictionary with state transition probabilites and a dictionary with word emission probabilities. You may assume that the _transition probabilities_ are complete, i.e. the dictionary includes every combination of states. In contrast, we assume that all combinations of words and POS tags that are not in the dictionary of _emission probabilities_ have an emission probability of 0.

The function should return a list of POS tags, s.t. that each tag corresponds to a word of the input sentence. Moreover, return the probability of the sequence of POS tags that you found. 

You can test your function on the given example that was discussed in the Pen&Paper exercise. For the sentence ``the fans watch the race`` and the provided probabilities, your function should return the POS tag sequence ``['DT', 'N', 'V', 'DT', 'N']`` and a probability of ``9.720000000000002e-06``.

Additionally, implement beam search in the viterbi algorithm. The beam size is defined by the parameter `beam`. For example for `beam=2` we only keep the best 2 scores per column in each step and discard the rest. You may use the example from the lecture to test your implementation.

In [6]:
# test sentence
sentence = ["the", "fans", "watch", "the", "race"]

# state transition probabilities (complete)
state_trans_prob = {('<s>','DT'):0.8,('<s>','N'):0.2,('<s>','V'):0.0,
                    ('DT','DT'):0.0,('DT','N'):0.9,('DT','V'):0.1,
                    ('N','DT'):0.0,('N','N'):0.5,('N','V'):0.5,
                    ('V','DT'):0.5,('V','N'):0.5,('V','V'):0.0}

# word emission probabilities (not complete, all combinations that are not present have probability 0)
word_emission_prob = {('the','DT'):0.2, ('fans','N'):0.1,('fans','V'):0.2,('watch','N'):0.3,
                      ('watch','V'):0.15,('race','N'):0.1,('race','V'):0.3}

In [7]:
def Viterbi(sentence: List[str], trans_prob: Dict[Tuple[str,str], float], emiss_prob: Dict[Tuple[str,str], float], beam: int=0) -> (List[str], float):
    """
    :param sentence: sentence that we want to tag
    :param trans_prob: dict with state transition probabilities
    :param emiss_prob: dict with word emission probabilities
    :param beam: beam size for beam search. If 0, dont apply beam search
    :returns: 
        - list with POS tags for each input word
        - float that indicates the probability of the tag sequence
    """
    states = list(set([s[1] for s in trans_prob.keys()]))
    
    viterbi_matrix = [{}]
    path = {}

    # Initialization step
    for state in states:
        viterbi_matrix[0][state] = trans_prob.get(("<s>", state), 0) * emiss_prob.get((sentence[0], state), 0)
        path[state] = [state]

    # Recursion step
    for t in range(1, len(sentence)):
        viterbi_matrix.append({})
        new_path = {}

        for state in states:
            max_prob = 0
            max_state = None
            for prev_state in states:
                prob = viterbi_matrix[t-1].get(prev_state, 0.0) * trans_prob.get((prev_state, state), 0) * emiss_prob.get((sentence[t], state), 0)
                if prob > max_prob:
                    max_prob = prob
                    max_state = prev_state
            
            viterbi_matrix[t][state] = max_prob
            if max_state:
                new_path[state] = path[max_state] + [state]

        path = new_path
        if beam > 0:
            top_k = sorted(viterbi_matrix[t].items(), key=lambda item: item[1], reverse=True)[:beam]
            viterbi_matrix[t] = dict(top_k)


    # Termination step
    max_prob = 0
    best_path = None
    if not viterbi_matrix[-1]: # Handle empty last state
        return [], 0.0

    last_states = viterbi_matrix[-1].keys()
    for state in last_states:
        if viterbi_matrix[-1][state] > max_prob:
            max_prob = viterbi_matrix[-1][state]
            best_path = path[state]
            
    return best_path, max_prob

### Task 1: Term Frequency - Inverse Document Frequency (13 pts)

In this task we want to use the term frequency - inverse document frequency (tf-idf) weighting method to compare documents with each other and to queries. 

In case you need to tokenize any sentences in the following tasks, please use a tokenizer from NLTK and not the ``string.split`` function.

__a)__ To test your implementation throughout this task, you are given an example from the exercise. Start by implementing a function ``process_docs`` that takes the provided dictionary of documents and returns the following data structures. __(4 pts)__

- ``word2index``: a dictionary that maps each word that appears in any document to a unique integer identifier starting at 0 
- ``doc2index``: a dictionary that maps each document name (here given as the dictionary keys) to a unique integer identifier starting at 0
- ``index2doc``: a dictionary that maps each document identifier to the corresponding document name (reverse to ``doc2index``)
- ``doc_word_vectors``: a dictionary that maps each document name to a list of word ids that indicate which words appeared in the document in their order of appearance. Words that appear multiple times must also be included multiple times.

In [8]:
# example from exercise 8
d1 = "cold beer beach"
d2 = "ice cream beer beer"
d3 = "beach cold ice cream"
d4 = "cold beer frozen yogurt frozen beer"
d5 = "frozen ice ice beer ice cream"
d6 = "yogurt ice cream ice cream"

docs = {"d1": d1, "d2": d2, "d3": d3, "d4": d4, "d5": d5, "d6": d6}

In [9]:
from nltk.tokenize import word_tokenize

def process_docs(docs: Dict[str, str]) -> (Dict[str, int], Dict[str, int], Dict[int, str], Dict[str, List[int]]):
    """
    :params docs: dict that maps each document name to the document content
    :returns:
        - word2index: dict that maps each word to a unique id
        - doc2index: dict that maps each document name to a unique id
        - index2doc: dict that maps ids to their associated document name
        - doc_word_vectors: dict that maps each document name to a list of word ids that appear in it
    """
    word2index = {}
    doc2index = {}
    index2doc = {}
    doc_word_vectors = {}
    
    word_id = 0
    doc_id = 0
    
    for doc_name, content in docs.items():
        if doc_name not in doc2index:
            doc2index[doc_name] = doc_id
            index2doc[doc_id] = doc_name
            doc_id += 1
        
        tokens = word_tokenize(content.lower())
        word_ids = []
        for token in tokens:
            if token not in word2index:
                word2index[token] = word_id
                word_id += 1
            word_ids.append(word2index[token])
        doc_word_vectors[doc_name] = word_ids
        
    return word2index, doc2index, index2doc, doc_word_vectors

In [10]:
# The output for the provided example could look like this:

# word2index:
# {'cold': 0, 'beer': 1, 'beach': 2, 'ice': 3, 'cream': 4, 'frozen': 5, 'yogurt': 6}

# doc2index:
# {'d1': 0, 'd2': 1, 'd3': 2, 'd4': 3, 'd5': 4, 'd6': 5}

# index2doc
# {0: 'd1', 1: 'd2', 2: 'd3', 3: 'd4', 4: 'd5', 5: 'd6'}

# doc_word_vectors:
# {'d1': [0, 1, 2],
#  'd2': [3, 4, 1, 1],
#  'd3': [2, 0, 3, 4],
#  'd4': [0, 1, 5, 6, 5, 1],
#  'd5': [5, 3, 3, 1, 3, 4],
#  'd6': [6, 3, 4, 3, 4]}

__b)__ Set up a term-document matrix where each column corresponds to a document and each row corresponds to a word that was observed in any of the documents. The row/column indices should correspond to the word/document ids that are set in the input dicts ``word2index`` and ``doc2index``. Count how often each word appears in each document and fill the term document matrix. __(3 pts)__

_Example: The word "beer" with the word id_ ``1`` _appears two times in the document "d4" that has the document id_ ``3``. _Therefore the the entry at position_ ``[1, 3]`` _in the term-document matrix is_ ``2``.

In [11]:
def term_document_matrix(doc_word_v: Dict[str, List[int]], doc2index: Dict[str, int], word2index: Dict[str, int]) -> NDArray[NDArray[float]]:
    """
    :param doc_word_v: dict that maps each document to the list of word ids that appear in it
    :param doc2index: dict that maps each document name to a unique id
    :param word2index: dict that maps each word to a unique id
    :return: term-document matrix (each word is a row, each document is a column) that indicates the count of each word in each document 
    """
    num_words = len(word2index)
    num_docs = len(doc2index)
    td_matrix = np.zeros((num_words, num_docs))
    
    for doc_name, word_ids in doc_word_v.items():
        doc_id = doc2index[doc_name]
        for word_id in word_ids:
            td_matrix[word_id, doc_id] += 1
            
    return td_matrix

__c)__ Implement the function ``to_tf_idf_matrix`` that takes a term-document matrix and returns the corresponding term frequency (tf) matrix. If the parameter ``idf`` is set to ``True``, the tf-matrix should further be transformed to a tf-idf matrix (i.e. every entry corresponds to the tf-idf value of the associated word and document). Your implementation should leave the input term-document matrix unchanged. __(3 pts)__

Use the following formulas:

\begin{equation}
  tf_{t,d} =
    \begin{cases}
      1+log_{10}\text{count}(t,d) & \text{if count}(t, d) > 0\\
      0 & \text{otherwise}
    \end{cases}       
\end{equation}  

\begin{equation}
  idf_t = log_{10}(\frac{N}{df_t})
\end{equation}

\begin{equation}
  tf\text{-}idf_{t,d} = tf_{t,d} \cdot idf_t
\end{equation}

In [12]:
def to_tf_idf_matrix(td_matrix: NDArray[NDArray[float]], idf: bool=True) -> NDArray[NDArray[float]]:
    """
    :param td_matrix: term-document matrix 
    :param idf: computes the tf-idf matrix if True, otherwise computes only the tf matrix
    :return: matrix with tf(-idf) values for each word-document pair 
    """
    tf_matrix = np.copy(td_matrix)
    
    # Calculate TF
    tf_matrix[tf_matrix > 0] = 1 + np.log10(tf_matrix[tf_matrix > 0])

    if not idf:
        return tf_matrix

    # Calculate IDF
    num_docs = td_matrix.shape[1]
    df = np.count_nonzero(td_matrix, axis=1)
    idf_vector = np.log10(num_docs / df)

    # Calculate TF-IDF
    tf_idf_matrix = tf_matrix * idf_vector[:, np.newaxis]
    
    return tf_idf_matrix

__d)__ We now want to test the implementation on our running example. First, print the tf-idf for each word of the query ``ice beer`` with respect to each document. Second, find the two most similar documents from ``d1, d2, d3`` according to cosine similarity and print all similarity values.  __(3 pts)__

In [13]:
# Process docs
word2index, doc2index, index2doc, doc_word_vectors = process_docs(docs)
td_matrix = term_document_matrix(doc_word_vectors, doc2index, word2index)
tf_idf_matrix = to_tf_idf_matrix(td_matrix)

# Query
query = "ice beer"
query_tokens = word_tokenize(query.lower())

print("TF-IDF for query ice beer:")
for token in query_tokens:
    if token in word2index:
        word_id = word2index[token]
        print(f"\nWord: {token}")
        for doc_id in range(len(doc2index)):
            doc_name = index2doc[doc_id]
            print(f"  {doc_name}: {tf_idf_matrix[word_id, doc_id]}")

# Cosine similarity
from numpy.linalg import norm

def cosine_similarity(v1, v2):
    return np.dot(v1, v2) / (norm(v1) * norm(v2))

print("\nCosine similarities between d1, d2, d3:")
doc_ids_to_compare = [doc2index["d1"], doc2index["d2"], doc2index["d3"]]
for i in range(len(doc_ids_to_compare)):
    for j in range(i + 1, len(doc_ids_to_compare)):
        doc1_id = doc_ids_to_compare[i]
        doc2_id = doc_ids_to_compare[j]
        doc1_name = index2doc[doc1_id]
        doc2_name = index2doc[doc2_id]
        
        sim = cosine_similarity(tf_idf_matrix[:, doc1_id], tf_idf_matrix[:, doc2_id])
        print(f"  sim({doc1_name}, {doc2_name}): {sim}")

TF-IDF for query ice beer:

Word: ice
  d1: 0.0
  d2: 0.17609125905568124
  d3: 0.17609125905568124
  d4: 0.0
  d5: 0.260108141521493
  d6: 0.22910001000567795

Word: beer
  d1: 0.17609125905568124
  d2: 0.22910001000567795
  d3: 0.0
  d4: 0.22910001000567795
  d5: 0.17609125905568124
  d6: 0.0

Cosine similarities between d1, d2, d3:
  sim(d1, d2): 0.20173094133460298
  sim(d1, d3): 0.8732802004950571
  sim(d2, d3): 0.29719757610240577
