Text Analytics I HWS 23/24

# Home Assignment 3 (30pts)

Submit your solution via Ilias until 23.59h on Thursday, November 9th. Late submissions are **not possible**.

Submit your solutions in teams of 3-4 students. Unless explicitly agreed otherwise in advance, **submissions from teams with more or less members will NOT be graded**.
List all members of the team with their student ID in the cell below, and submit only one notebook per team. Only submit a notebook, do not submit the dataset(s) you used. Also, do NOT compress/zip your submission!

You may use the code from the exercises and basic functionalities that are explained in official documentation of Python packages without citing, __all other sources must be cited__. In case of plagiarism (copying solutions from other teams or from the internet) ALL team members may be expelled from the course without warning.

#### General guidelines:
* Make sure that your code is executable, any task for which the code does not directly run on our machine will be graded with 0 points.
* If you use packages that are not available on the default or conda-forge channel, list them below. Also add a link to installation instructions. 
* Ensure that the notebook does not rely on the current notebook or system state!
  * Use `Kernel --> Restart & Run All` to see if you are using any definitions, variables etc. that 
    are not in scope anymore.
  * Do not rename any of the datasets you use, and load it from the same directory that your ipynb-notebook is located in, i.e., your working directory.
* Make sure you clean up your code before submission, e.g., properly align your code, and delete every line of code that you do not need anymore, even if you may have experimented with it. Minimize usage of global variables. Avoid reusing variable names multiple times!
* Ensure your code/notebook terminates in reasonable time.
* Feel free to use comments in the code. While we do not require them to get full marks, they may help us in case your code has minor errors.
* For questions that require a textual answer, please do not write the answer as a comment in a code cell, but in a Markdown cell below the code. Always remember to provide sufficient justification for all answers.
* You may create as many additional cells as you want, just make sure that the solutions to the individual tasks can be found near the corresponding assignment.
* If you have any general question regarding the understanding of some task, do not hesitate to post in the student forum in Ilias, so we can clear up such questions for all students in the course.

In [1]:
# studentIDs of all team members
team_members = [12345,67899,880800,234242]

Additional packages (if any):
 - Example: `powerlaw`, https://github.com/jeffalstott/powerlaw

In [5]:
from typing import List, Union, Dict, Set, Tuple
from numpy.typing import NDArray
import nltk

### Task 1: POS tagging (6 points)

In this task, we want to explore sentences with similar part of speech (POS) tag structure. For this, we need a corpus of text with tags. We will generate such a corpus by using NLTK’s currently recommended POS tagger to tag a given list of tokens (https://www.nltk.org/api/nltk.tag.html).

In [12]:
# NLTK's off-the-shelf POS tagger
from nltk import pos_tag

__a)__ Given a corpus of text ``corpus`` as a sequence of tokens, we want to collect all words that are tagged with a certain POS tag. Implement a function ``collect_words_for_tag`` that first tags the given corpus using NLTK's off-the-shelf tagger imported in the cell above. Then, for each POS tag, collect all words that were tagged with it. You should return a dictionary that maps each POS tag that was observed to the set of words that were assigned this tag in the given corpus. __(2 pts)__

In [13]:
from nltk import pos_tag as nltk_pos_tag
from nltk.corpus.reader.util import StreamBackedCorpusView 
from typing import List, Dict, Set, Union
import nltk

# Download the NLTK data files for part-of-speech tagging
nltk.download('averaged_perceptron_tagger')

def collect_words_for_tag(corpus: Union[List[str], StreamBackedCorpusView]) -> Dict[str, Set[str]]:
    '''
    :param corpus: sequence of tokens that represents the text corpus
    :return: dict that maps each tag to a set of tokens that were assigned this tag in the corpus
    '''
    # Initialize an empty list to store the tagged tokens
    tagged_corpus = []

    # Iterate through the sentences in the corpus and tag their tokens
    for sentence in corpus:
        tagged_sentence = nltk_pos_tag(sentence)
        tagged_corpus.extend(tagged_sentence)

    # Create a dictionary to store words for each POS tag
    words_by_tag = {}

    # Iterate through the tagged tokens and collect words for each tag
    for token, pos_tag in tagged_corpus:
        if pos_tag not in words_by_tag:
            words_by_tag[pos_tag] = set()
        words_by_tag[pos_tag].add(token)

    return words_by_tag


# # Sample corpus
# sample_corpus = ["This", "is", "a", "sample", "sentence", "for", "testing", "POS", "tagging", "."]

# # Test the function with the sample corpus
# result = collect_words_for_tag(sample_corpus)

# # Print the results
# for pos_tag, words in result.items():
#     print(f"POS Tag: {pos_tag}")
#     print(f"Words: {words}")

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /Users/yinhsuan/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


__b)__ Implement a function ``generate_sentences`` that gets a sentence and a POS dictionary (assume the POS dictionary was generated by your function in __a)__) as input and generates ``n`` sequences of words with the same tag structure. The words in your generated sequence should be randomly taken from the set of words associated with the current tag. 

Additionally, the user should have the option to achieve sentences of ``better_quality``. Thus, if ``better_quality=True``, make sure that the tag structure of the output sentences actually matches the tag structure of the input sentence, as the tags may change depending on the context. 

You can assume that the training corpus is large enough to include all possible POS tags. __(2 pts)__

_Hint: consider the_ ``random`` _module_

In [21]:
import random
from typing import List, Dict, Set
from nltk import pos_tag

def generate_rand(sentence: List[str], pos_dict: Dict[str, Set[str]], n: int, better_quality: bool=False) -> List[List[str]]:
    generated_sentences = []
    input_tags = []  # To store the POS tags of the input sentence.

    # Tokenize the input sentence into words and POS tags.
    for word in sentence:
        for pos_tag, word_set in pos_dict.items():
            if word in word_set:
                input_tags.append(pos_tag)
                break

    if len(input_tags) == 0:
        # If any word in the input sentence doesn't have a corresponding POS tag in the pos_dict, return an empty list.
        return []

    for _ in range(n):
        generated_sentence = []

        for input_word, input_tag in zip(sentence, input_tags):
            if better_quality:
                # If better_quality is True, select a random word from the set associated with the input POS tag.
                generated_word = random.choice(list(pos_dict[input_tag]))
            else:
                # If better_quality is False, select a random word from any set in the pos_dict.
                generated_word = random.choice(list(random.choice(list(pos_dict.values()))))

            generated_sentence.append(generated_word)

        generated_sentences.append(generated_sentence)

    return generated_sentences

# Sample input sentence with POS tags
input_sentence = [
    ("This", "DT"),
    ("is", "VBZ"),
    ("a", "DT"),
    ("sample", "NN"),
    ("sentence", "NN"),
    ("for", "IN"),
    ("testing", "VBG"),
    ("POS", "NNS"),
    ("tagging", "NN"),
    (".", ".")
]

# Create a sample POS dictionary
pos_dict = {
    "DT": {"the", "this", "a"},
    "VBZ": {"is", "am", "are"},
    "NN": {"cat", "dog", "book"},
    "IN": {"for", "on", "with"},
    "VBG": {"running", "playing", "working"},
    "NNS": {"cats", "dogs", "books"},
    ".": {".", "!", "?"}
}

# Extract words from the input sentence
input_words = [word for word, _ in input_sentence]

# Generate 5 random sentences with the same tag structure as the input sentence
generated_sentences = generate_rand(input_words, pos_dict, n=5, better_quality=False)

# Print the generated sentences
for i, sentence in enumerate(generated_sentences, start=1):
    sentence_str = ' '.join([word + "/" + tag for word, (word, tag) in zip(sentence, input_sentence)])
    print(f"Sentence {i}: {sentence_str}")


Sentence 1: This/DT is/VBZ a/DT sample/NN
Sentence 2: This/DT is/VBZ a/DT sample/NN
Sentence 3: This/DT is/VBZ a/DT sample/NN
Sentence 4: This/DT is/VBZ a/DT sample/NN
Sentence 5: This/DT is/VBZ a/DT sample/NN


__c)__ Using the input sentence ``This test is very difficult``, test your implementation to generate 10 sentences based on  

* "Emma" by Jane Austen

* The "King James Bible"

Store your POS dictionary in ``emma_tags``and ``bible_tags``, respectively. Your generated sentences should be stored in ``emma_sent`` and ``bible_sent``. __(2 pts)__

In [15]:
sent = ["This", "test", "is", "very", "difficult"]

In [22]:
# your code here
from nltk.corpus import gutenberg

# Download the "Emma" and "King James Bible" corpora
nltk.download('gutenberg')
# nltk.download('punkt')

# Load the "Emma" and "King James Bible" corpora
# emma_corpus = [sent for sent in nltk.corpus.gutenberg.sents('austen-emma.txt') if not any(word.isdigit() for word in sent)]
# bible_corpus = [sent for sent in nltk.corpus.gutenberg.sents('bible-kjv.txt') if not any(word.isdigit() for word in sent)]
emma_corpus = gutenberg.sents('austen-emma.txt')
bible_corpus = gutenberg.sents('bible-kjv.txt')

# Prepare POS dictionaries for "Emma" and "King James Bible"
emma_tags = collect_words_for_tag(emma_corpus)
bible_tags = collect_words_for_tag(bible_corpus)

# Generate sentences based on "Emma" corpus
emma_sent = generate_rand(sent, emma_tags, n=10, better_quality=False)
print(emma_sent)

# Generate sentences based on "King James Bible" corpus
bible_sent = generate_rand(sent, bible_tags, n=10, better_quality=False)
print(bible_sent)

[nltk_data] Downloading package gutenberg to
[nltk_data]     /Users/yinhsuan/nltk_data...
[nltk_data]   Package gutenberg is already up-to-date!


[['what', 'sooner', 'surprize', 'endeavour'], ['indisputable', 'aloud', 'farther', ')'], ['tottering', 'owned', 'across', 'Do'], [';', 'up', 'how', 'speedy'], ['weather', 'ashamed', 'weather', 'attend'], ['gratification', 'understood', '"', 'flatterer'], ['amid', 'Such', '(', 'Coles'], ['s', 'mis', 'fifty', 'Prince'], ['softer', '(', '?', '('], ['acquire', '.', 'suppose', 'longer']]
[['wherein', 'thee', 'To'], ['speaketh', 'ye', 'set'], [',', 'their', 'reasons'], ['dipped', 'Oh', 'no'], [';)', 'must', 'most'], ['.', 'therein', 'Jasiel'], ['mourneth', ')', ')'], ['!', 'riot', ','], ['manna', 'denied', '('], ['killest', 'behind', 'kin']]


### Task 2: The Viterbi algorithm (12 points)
Implement the Viterbi algorithm as introduced in the lecture (lecture 8, slide 20) and the exercise. The input of your function is a sentence that should be tagged, a dictionary with state transition probabilites and a dictionary with word emission probabilities. You may assume that the _transition probabilities_ are complete, i.e. the dictionary includes every combination of states. In contrast, we assume that all combinations of words and POS tags that are not in the dictionary of _emission probabilities_ have an emission probability of 0.

The function should return a list of POS tags, s.t. that each tag corresponds to a word of the input sentence. Moreover, return the probability of the sequence of POS tags that you found. 

You can test your function on the given example that was discussed in the Pen&Paper exercise. For the sentence ``the fans watch the race`` and the provided probabilities, your function should return the POS tag sequence ``['DT', 'N', 'V', 'DT', 'N']`` and a probability of ``9.720000000000002e-06``.

Additionally, implement beam search in the viterbi algorithm. The beam size is defined by the parameter `beam`. For example for `beam=2` we only keep the best 2 scores per column in each step and discard the rest. You may use the example from the lecture to test your implementation.

In [18]:
# test sentence
sentence = ["the", "fans", "watch", "the", "race"]

# state transition probabilities (complete)
state_trans_prob = {('<s>','DT'):0.8,('<s>','N'):0.2,('<s>','V'):0.0,
                    ('DT','DT'):0.0,('DT','N'):0.9,('DT','V'):0.1,
                    ('N','DT'):0.0,('N','N'):0.5,('N','V'):0.5,
                    ('V','DT'):0.5,('V','N'):0.5,('V','V'):0.0}

# word emission probabilities (not complete, all combinations that are not present have probability 0)
word_emission_prob = {('the','DT'):0.2, ('fans','N'):0.1,('fans','V'):0.2,('watch','N'):0.3,
                      ('watch','V'):0.15,('race','N'):0.1,('race','V'):0.3}

In [20]:
from typing import List, Dict, Tuple

def Viterbi(sentence: List[str], trans_prob: Dict[Tuple[str,str], float], emiss_prob: Dict[Tuple[str,str], float], beam: int=0) -> (List[str], float):
    '''
    :param sentence: sentence that we want to tag
    :param trans_prob: dict with state transition probabilities
    :param emiss_prob: dict with word emission probabilities
    :param beam: beam size for beam search. If 0, don't apply beam search
    :returns: 
        - list with POS tags for each input word
        - float that indicates the probability of the tag sequence
    '''
    # Initialize matrices to store probabilities and backpointers
    n = len(sentence)
    states = list(set(state for state, _ in trans_prob.keys()))
    num_states = len(states)
    probabilities = [[0.0] * num_states for _ in range(n)]
    backpointers = [[-1] * num_states for _ in range(n)]

    # Initialize the first column with initial probabilities
    for j, state in enumerate(states):
        word_state = (sentence[0], state)
        if word_state in emiss_prob:
            probabilities[0][j] = emiss_prob[word_state]
        else:
            probabilities[0][j] = 0.0

    # Viterbi algorithm
    for i in range(1, n):
        for j, state in enumerate(states):
            max_prob = 0.0
            best_prev_state = None

            for k, prev_state in enumerate(states):
                transition_prob = trans_prob.get((prev_state, state), 0.0)
                word_state = (sentence[i], state)
                emiss_prob_value = emiss_prob.get(word_state, 0.0)

                prev_prob = probabilities[i - 1][k]
                current_prob = prev_prob * transition_prob * emiss_prob_value

                if current_prob > max_prob:
                    max_prob = current_prob
                    best_prev_state = k

            probabilities[i][j] = max_prob
            backpointers[i][j] = best_prev_state

    # Trace back to find the best path
    best_path = [0] * n
    max_prob = max(probabilities[-1])
    best_path[-1] = probabilities[-1].index(max_prob)
    for i in range(n - 2, -1, -1):
        best_path[i] = backpointers[i + 1][best_path[i + 1]]

    best_path_tags = [states[state_index] for state_index in best_path]

    # Calculate the probability of the best path
    probability = max_prob

    if beam == 0:
        return best_path_tags, probability
    else:
        return apply_beam_search(probabilities, backpointers, states, beam)

def apply_beam_search(probabilities, backpointers, states, beam):
    n = len(probabilities)
    num_states = len(states)
    best_path = [0] * n
    max_probs = [0.0] * num_states

    for i in range(n - 1, -1, -1):
        sorted_states = sorted(range(num_states), key=lambda j: -probabilities[i][j])
        for j in range(min(beam, num_states)):
            state = sorted_states[j]
            best_path[i] = state
            max_probs[state] = probabilities[i][state]

        # Update backpointers for the next column
        if i > 0:
            for j in range(num_states):
                backpointers[i - 1][j] = best_path[i]

    best_path_tags = [states[state_index] for state_index in best_path]
    probability = max(max_probs)

    return best_path_tags, probability

# Without beam search
result_no_beam = Viterbi(sentence, state_trans_prob, word_emission_prob)
print("Viterbi without beam search:", result_no_beam)

# With beam search (beam size = 2)
result_with_beam = Viterbi(sentence, state_trans_prob, word_emission_prob, beam=2)
print("Viterbi with beam search:", result_with_beam)

Viterbi without beam search: (['DT', 'N', 'V', 'DT', 'N'], 1.215e-05)
Viterbi with beam search: (['V', 'V', 'V', 'V', 'V'], 0.2)


### Task 3: ML Basics - Naive Bayes Classification (10pts)

### Task 3: ML Basics - Naive Bayes Classification (12pts)
In this task, we want to build a Naive Bayes classifier with add-1 smoothing for text classification (pseudocode given below), e.g., to assign a category to a document. Use the class-skeleton provided below for your implementation.

#### Naive Bayes Pseudocode
##### TrainMultiNomialNB($\mathbb C$,$\mathbb D$)  
$V \leftarrow extractVocabulary(\mathbb D)$  
$N \leftarrow countDocs(\mathbb D)$    
for $c \in \mathbb C$:  
&nbsp;&nbsp;&nbsp;&nbsp;$N_c \leftarrow countDocsInClass(\mathbb D, c)$  
&nbsp;&nbsp;&nbsp;&nbsp;$prior[c] \leftarrow \frac{N_c}{N}$  
&nbsp;&nbsp;&nbsp;&nbsp;$text_c \leftarrow concatenateTextOfAllDocsInClass(\mathbb D, c)$   
&nbsp;&nbsp;&nbsp;&nbsp;for $t \in V$:  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$T_{ct} \leftarrow countTokensOfTerm(text_c,t)$  
&nbsp;&nbsp;&nbsp;&nbsp;for $t \in V$:  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$condprob[t][c] \leftarrow \frac{T_{ct} + 1}{\sum_{t'}(T_{ct'} + 1)}$  
return $V,prior,condprob$

##### ApplyMultinomialNB($\mathbb C,V,prior,condprob,d$)
$W \leftarrow extractTokensFromDoc(V,d)$   
for $c \in \mathbb C$:  
&nbsp;&nbsp;&nbsp;&nbsp;$score[c] \leftarrow log(prior[c])$  
&nbsp;&nbsp;&nbsp;&nbsp;for $t \in W$:  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$score[c] += log(condprob[t][c])$  
return $argmax_{c \in \mathbb C} score[c]$

__a) Tokenization (1pt)__  
Implement the function `tokenize` to transform a text document to a list of tokens with the regex pattern `\b\w\w+\b`. Transform all tokens to lowercase.

__b) Naive Bayes "Training" (6pts)__  
Implement the `__init__` function to set up the Naive Bayes Model. Cf. TrainMultiNomialNB($\mathbb C$,$\mathbb D$) in the pseudocode above. Contrary to the pseudocode, the `__init__` function should not return anything, but the vocabulary, priors and conditionals should be stored in class variables. We only want to keep tokens with a frequeny > `min_count` in the vocabulary.

__c) Naive Bayes Classification (3pts)__  
Implement the `classify` function to return the most probable class for the provided document according to your Naive Bayes model.

In [5]:
class NaiveBayesClassifier:
    '''Naive Bayes for text classification.'''
    def __init__(self, docs: List[str], labels: List[int], min_count: int=1):
        '''
        :param docs: list of documents from which to build the model (corpus)
        :param labels: list of classes assigned to the list of documents (labels[i] is the class for docs[i])
        :param min_count: minimum frequency of token in vocabulary (tokens that occur less times are discarded)
        '''
        # your code for Task 3b) here
                
    def tokenize(self, doc: str):
        '''
        :param doc: document to tokenize
        :return: document as a list of tokens
        '''
        # your code for Task 3a) here

    def classify(self, doc: str):
        '''
        :param doc: document to classify
        :return: most probable class
        '''
        # your code for Task 3c) here

__d) Evaluation (2pts)__
Test your implementation on the 20newsgroups dataset. If implemented correctly, with `min_count=1` your Naive Bayes classifier should obtain the same accuracy as the implementation by scikit-learn (see below for comparison).

In [6]:
# your code here

In [4]:
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import accuracy_score

# see https://scikit-learn.org/0.19/datasets/twenty_newsgroups.html for details
from sklearn.datasets import fetch_20newsgroups
train = fetch_20newsgroups(subset='train')
test = fetch_20newsgroups(subset='test')

vectorizer = CountVectorizer()
x = vectorizer.fit_transform(train.data)
clf = MultinomialNB()
clf.fit(x,train.target)

pred = clf.predict(vectorizer.transform(test.data))

accuracy_score(test.target,pred)