Text Analytics I HWS 23/24

# Home Assignment 3 (30pts)

Submit your solution via Ilias until 23.59h on Wednesday, November 8th. Late submissions are accepted until 12:00am on the following day, with 1/4 of the total possible points deducted from the score.

Submit your solutions in teams of 3-4 students. Unless explicitly agreed otherwise in advance, **submissions from teams with more or less members will NOT be graded**.
List all members of the team with their student ID in the cell below, and submit only one notebook per team. Only submit a notebook, do not submit the dataset(s) you used. Also, do NOT compress/zip your submission!

You may use the code from the exercises and basic functionalities that are explained in official documentation of Python packages without citing, __all other sources must be cited__. In case of plagiarism (copying solutions from other teams or from the internet) ALL team members may be expelled from the course without warning.

#### General guidelines:
* Make sure that your code is executable, any task for which the code does not directly run on our machine will be graded with 0 points.
* If you use packages that are not available on the default or conda-forge channel, list them below. Also add a link to installation instructions. 
* Ensure that the notebook does not rely on the current notebook or system state!
  * Use `Kernel --> Restart & Run All` to see if you are using any definitions, variables etc. that 
    are not in scope anymore.
  * Do not rename any of the datasets you use, and load it from the same directory that your ipynb-notebook is located in, i.e., your working directory.
* Make sure you clean up your code before submission, e.g., properly align your code, and delete every line of code that you do not need anymore, even if you may have experimented with it. Minimize usage of global variables. Avoid reusing variable names multiple times!
* Ensure your code/notebook terminates in reasonable time.
* Feel free to use comments in the code. While we do not require them to get full marks, they may help us in case your code has minor errors.
* For questions that require a textual answer, please do not write the answer as a comment in a code cell, but in a Markdown cell below the code. Always remember to provide sufficient justification for all answers.
* You may create as many additional cells as you want, just make sure that the solutions to the individual tasks can be found near the corresponding assignment.
* If you have any general question regarding the understanding of some task, do not hesitate to post in the student forum in Ilias, so we can clear up such questions for all students in the course.

In [1]:
# studentIDs of all team members
team_members = [1966868,1967897, 1968154, 1978986, 1951865]

Additional packages (if any):
 - Example: `powerlaw`, https://github.com/jeffalstott/powerlaw

In [2]:
from typing import List, Union, Dict, Set, Tuple
from numpy.typing import NDArray
import nltk

### Task 1: POS tagging (6 points)

In this task, we want to explore sentences with similar part of speech (POS) tag structure. For this, we need a corpus of text with tags. We will generate such a corpus by using NLTK’s currently recommended POS tagger to tag a given list of tokens (https://www.nltk.org/api/nltk.tag.html).

In [3]:
# NLTK's off-the-shelf POS tagger
from nltk import pos_tag

__a)__ Given a corpus of text ``corpus`` as a sequence of tokens, we want to collect all words that are tagged with a certain POS tag. Implement a function ``collect_words_for_tag`` that first tags the given corpus using NLTK's off-the-shelf tagger imported in the cell above. Then, for each POS tag, collect all words that were tagged with it. You should return a dictionary that maps each POS tag that was observed to the set of words that were assigned this tag in the given corpus. __(2 pts)__

In [4]:
from collections import defaultdict
from nltk.corpus.reader.util import StreamBackedCorpusView 
from nltk.tokenize import word_tokenize
# nltk.download('averaged_perceptron_tagger') requires this download


def collect_words_for_tag(corpus: Union[List[str], StreamBackedCorpusView]) -> Dict[str, Set[str]]:
    '''
    :param corpus: sequence of tokens that represents the text corpus
    :return: dict that maps each tag to a set of tokens that were assigned this tag in the corpus
    '''
    tags = defaultdict(set)
    for token, tag in pos_tag(corpus):
        tags[tag].add(token)
    return tags

test = collect_words_for_tag(word_tokenize("This is a simple test"))
print(test) 

defaultdict(<class 'set'>, {'DT': {'a', 'This'}, 'VBZ': {'is'}, 'JJ': {'simple'}, 'NN': {'test'}})


__b)__ Implement a function ``generate_sentences`` that gets a sentence and a POS dictionary (assume the POS dictionary was generated by your function in __a)__) as input and generates ``n`` sequences of words with the same tag structure. The words in your generated sequence should be randomly taken from the set of words associated with the current tag. 

Additionally, the user should have the option to achieve sentences of ``better_quality``. Thus, if ``better_quality=True``, make sure that the tag structure of the output sentences actually matches the tag structure of the input sentence, as the tags may change depending on the context. 

You can assume that the training corpus is large enough to include all possible POS tags. __(2 pts)__

_Hint: consider the_ ``random`` _module_

In [5]:
import random
def generate_rand(sentence: List[str], pos_dict: Dict[str, Set[str]], n: int, better_quality: bool=False) -> List[List[str]]:
    '''
    :param sentence: input sentence that sets the tag pattern
    :param pos_dict: maps each tag to a list of associated words
    :param n: number of sentences that should be generated
    :return: List of sentences with the same tag structure as the input sentence
    '''
    
    # create a list of tags from the input sentence
    tags = [tag for _, tag in pos_tag(sentence)]

    # create a list of sentences
    sentences = []
    for _ in range(n):
        # create a list of sentences
        new_sentance = [random.choice(list(pos_dict[t])) for t in tags]
        if (better_quality):
            new_tags = [tag for _, tag in pos_tag(new_sentance)]
            while new_tags != tags:
                new_sentance = [random.choice(list(pos_dict[t])) for t in tags]
                new_tags = [tag for _, tag in pos_tag(sentence)]

        sentences.append(new_sentance)

    return sentences

__c)__ Using the input sentence ``This test is very difficult``, test your implementation to generate 10 sentences based on  

* "Emma" by Jane Austen

* The "King James Bible"

Store your POS dictionary in ``emma_tags``and ``bible_tags``, respectively. Your generated sentences should be stored in ``emma_sent`` and ``bible_sent``. __(2 pts)__

In [6]:
sent = ["This", "test", "is", "very", "difficult"]

In [7]:
emma_jane_austen = nltk.corpus.gutenberg.words('austen-emma.txt')
king_james_bible = nltk.corpus.gutenberg.words('bible-kjv.txt')

emma_tags = collect_words_for_tag(emma_jane_austen)
king_tags = collect_words_for_tag(king_james_bible)

In [8]:
emma_sent = generate_rand(sent, emma_tags, 10, better_quality=True)
king_sent = generate_rand(sent, king_tags, 10)

for s in emma_sent:
    print(s)

['either', 'improper', 'changes', 'thoughtfully', 'nephews']
['a', 'carriage', 'means', '"', 'argument']
['every', 'poverty', 'saves', 'accordingly', '_any_']
['this', 'penitence', 'is', 'openly', 'sudden']
['both', 'guinea', 'plays', 'agreeably', 'depressed']
['these', '_purport_', 'sets', 'later', 'destin']
['all', 'eating', 'passes', 'formerly', 'entire']
['loth', 'grandeur', 'awes', 'Absolutely', 'frozen']
['some', 'repugnance', 'sposo', 'composedly', 'honour']
['Every', 'scholar', 'turns', 'inexpressibly', 'untainted']


### Task 2: The Viterbi algorithm (12 points)
Implement the Viterbi algorithm as introduced in the lecture (lecture 8, slide 20) and the exercise. The input of your function is a sentence that should be tagged, a dictionary with state transition probabilites and a dictionary with word emission probabilities. You may assume that the _transition probabilities_ are complete, i.e. the dictionary includes every combination of states. In contrast, we assume that all combinations of words and POS tags that are not in the dictionary of _emission probabilities_ have an emission probability of 0.

The function should return a list of POS tags, s.t. that each tag corresponds to a word of the input sentence. Moreover, return the probability of the sequence of POS tags that you found. 

You can test your function on the given example that was discussed in the Pen&Paper exercise. For the sentence ``the fans watch the race`` and the provided probabilities, your function should return the POS tag sequence ``['DT', 'N', 'V', 'DT', 'N']`` and a probability of ``9.720000000000002e-06``.

Additionally, implement beam search in the viterbi algorithm. The beam size is defined by the parameter `beam`. For example for `beam=2` we only keep the best 2 scores per column in each step and discard the rest. You may use the example from the lecture to test your implementation.

In [9]:
# test sentence
sentence = ["the", "fans", "watch", "the", "race"]

# state transition probabilities (complete)
state_trans_prob = {('<s>','DT'):0.8,('<s>','N'):0.2,('<s>','V'):0.0,
                    ('DT','DT'):0.0,('DT','N'):0.9,('DT','V'):0.1,
                    ('N','DT'):0.0,('N','N'):0.5,('N','V'):0.5,
                    ('V','DT'):0.5,('V','N'):0.5,('V','V'):0.0}

# word emission probabilities (not complete, all combinations that are not present have probability 0)
word_emission_prob = {('the','DT'):0.2, ('fans','N'):0.1,('fans','V'):0.2,('watch','N'):0.3,
                      ('watch','V'):0.15,('race','N'):0.1,('race','V'):0.3}

In [10]:
import time

start_time = time.time()
import numpy as np
def Viterbi(sentence: List[str], trans_prob: Dict[Tuple[str,str], float], emiss_prob: Dict[Tuple[str,str], float], beam: int=0) -> (List[str], float):
    '''
    :param sentence: sentence that we want to tag
    :param trans_prob: dict with state transition probabilities
    :param emiss_prob: dict with word emission probabilities
    :param beam: beam size for beam search. If 0, don't apply beam search
    :returns: 
        - list with POS tags for each input word
        - float that indicates the probability of the tag sequence
    '''
    all_possible_pos_tags = set([tag for _, tag in emiss_prob.keys()])
    viterbi = np.zeros(
        shape=(len(all_possible_pos_tags), len(sentence)), dtype=float)
    backpointer = np.zeros(
        shape=(len(all_possible_pos_tags), len(sentence)), dtype=int)
    
    # initialize first column
    for i, tag in enumerate(all_possible_pos_tags):
        viterbi[i,0] = trans_prob[('<s>', tag)] * emiss_prob.get((sentence[0], tag), 0)
        # backpointer[i,0] = 0

    # Recursion step
    for t in range(1,len(sentence)):
        for i, tag in enumerate(all_possible_pos_tags):
            if beam == 0:
                max_prob = -1
                max_prob_tag = None
                for j, prev_tag in enumerate(all_possible_pos_tags):
                    prob = viterbi[j, t-1] * trans_prob.get((prev_tag, tag), 0) * emiss_prob.get((sentence[t], tag), 0)
                    if prob > max_prob:
                        max_prob = prob
                        max_prob_tag = prev_tag
                viterbi[i, t] = max_prob
                backpointer[i, t] = list(all_possible_pos_tags).index(max_prob_tag)
            else:
                top_k_probs = []
                top_k_tags = []
                for j, prev_tag in enumerate(all_possible_pos_tags):
                    prob = viterbi[j, t-1] * trans_prob.get((prev_tag, tag), 0) * emiss_prob.get((sentence[t], tag), 0)
                    if prob > 0:
                        top_k_probs.append(prob)
                        top_k_tags.append(j)
                if len(top_k_probs) > beam:
                    top_k_probs, top_k_tags = zip(*sorted(zip(top_k_probs, top_k_tags), reverse=True)[:beam])
                for k, prob in enumerate(top_k_probs):
                    viterbi[k, t] = prob
                    backpointer[k, t] = top_k_tags[k]


    # Termination step
    best_path_prob = np.max(viterbi[:, t])
    best_path_pointer = np.argmax(viterbi[:, t])
    best_path = [best_path_pointer]
    for t in range(len(sentence)-1, 0, -1):
        best_path_pointer = backpointer[best_path_pointer, t]
        best_path.append(best_path_pointer)
    best_path.reverse()
    best_path = [list(all_possible_pos_tags)[i] for i in best_path]
    return best_path, best_path_prob


best_path, best_path_prob = Viterbi(sentence, state_trans_prob, word_emission_prob, beam=0)
print(best_path)
print(best_path_prob) 
print("--- %s seconds ---" % (time.time() - start_time))

['DT', 'N', 'V', 'DT', 'N']
9.720000000000002e-06
--- 0.0009989738464355469 seconds ---


### Task 3: ML Basics - Naive Bayes Classification (10pts)

### Task 3: ML Basics - Naive Bayes Classification (12pts)
In this task, we want to build a Naive Bayes classifier with add-1 smoothing for text classification (pseudocode given below), e.g., to assign a category to a document. Use the class-skeleton provided below for your implementation.

#### Naive Bayes Pseudocode
##### TrainMultiNomialNB($\mathbb C$,$\mathbb D$)  
$V \leftarrow extractVocabulary(\mathbb D)$  
$N \leftarrow countDocs(\mathbb D)$    
for $c \in \mathbb C$:  
&nbsp;&nbsp;&nbsp;&nbsp;$N_c \leftarrow countDocsInClass(\mathbb D, c)$  
&nbsp;&nbsp;&nbsp;&nbsp;$prior[c] \leftarrow \frac{N_c}{N}$  
&nbsp;&nbsp;&nbsp;&nbsp;$text_c \leftarrow concatenateTextOfAllDocsInClass(\mathbb D, c)$   
&nbsp;&nbsp;&nbsp;&nbsp;for $t \in V$:  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$T_{ct} \leftarrow countTokensOfTerm(text_c,t)$  
&nbsp;&nbsp;&nbsp;&nbsp;for $t \in V$:  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$condprob[t][c] \leftarrow \frac{T_{ct} + 1}{\sum_{t'}(T_{ct'} + 1)}$  
return $V,prior,condprob$

##### ApplyMultinomialNB($\mathbb C,V,prior,condprob,d$)
$W \leftarrow extractTokensFromDoc(V,d)$   
for $c \in \mathbb C$:  
&nbsp;&nbsp;&nbsp;&nbsp;$score[c] \leftarrow log(prior[c])$  
&nbsp;&nbsp;&nbsp;&nbsp;for $t \in W$:  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$score[c] += log(condprob[t][c])$  
return $argmax_{c \in \mathbb C} score[c]$

__a) Tokenization (1pt)__  
Implement the function `tokenize` to transform a text document to a list of tokens with the regex pattern `\b\w\w+\b`. Transform all tokens to lowercase.

__b) Naive Bayes "Training" (6pts)__  
Implement the `__init__` function to set up the Naive Bayes Model. Cf. TrainMultiNomialNB($\mathbb C$,$\mathbb D$) in the pseudocode above. Contrary to the pseudocode, the `__init__` function should not return anything, but the vocabulary, priors and conditionals should be stored in class variables. We only want to keep tokens with a frequeny > `min_count` in the vocabulary.

__c) Naive Bayes Classification (3pts)__  
Implement the `classify` function to return the most probable class for the provided document according to your Naive Bayes model.

In [11]:
import re
import numpy as np
from collections import Counter, defaultdict

class NaiveBayesClassifier:
    '''Naive Bayes for text classification.'''


    def __init__(self, docs: List[str], labels: List[int], min_count: int=1):
        '''
        :param docs: list of documents from which to build the model (corpus)
        :param labels: list of classes assigned to the list of documents (labels[i] is the class for docs[i])
        :param min_count: minimum frequency of token in vocabulary (tokens that occur less times are discarded)
        '''
        self.docs_tokens = [self.tokenize(doc) for doc in docs]
        self.V = set([token for doc in self.docs_tokens for token in doc])

        # remove tokens that occur less than min_count times
        token_counts = Counter([token for doc in self.docs_tokens for token in doc])
        self.V = set([token for token in self.V if token_counts[token] > min_count])
        # if we include tokens with the mincount we get the same result at sklearns NBClassifier
        
        N = len(self.docs_tokens)
        classes = set(labels)
        # compute prior probabilities
        self.prior = Counter(labels)
        self.conditional = defaultdict(dict)
        for c in classes:
            self.prior[c] /= N

            all_doc_c = [doc for doc, label in zip(self.docs_tokens, labels) if label == c]

            all_text_c_counts = Counter(token for doc in all_doc_c for token in doc)
            sum_all_text_c_counts = sum(all_text_c_counts.values())

            for token in self.V:
                self.conditional[c][token] = (all_text_c_counts[token] +1) / (sum_all_text_c_counts + len(self.V))
                
    def tokenize(self, doc: str):
        '''
        :param doc: document to tokenize
        :return: document as a list of tokens
        '''
        return re.findall(r'\b\w\w+\b', doc.lower())
        

    def classify(self, doc: str):
        '''
        :param doc: document to classify
        :return: most probable class
        '''
        # your code for Task 3c) here
        tokens = self.tokenize(doc)
        tokens = [token for token in tokens if token in self.V]
        probs = {}
        for c in self.prior.keys():
            probs[c] = np.log(self.prior[c])
            for token in tokens:
                probs[c] += np.log(self.conditional[c][token])
        # print(probs)
        return max(probs.items(), key=lambda x: x[1])[0]

__d) Evaluation (2pts)__
Test your implementation on the 20newsgroups dataset. If implemented correctly, with `min_count=1` your Naive Bayes classifier should obtain the same accuracy as the implementation by scikit-learn (see below for comparison).

In [12]:
# see https://scikit-learn.org/0.19/datasets/twenty_newsgroups.html for details
from sklearn.datasets import fetch_20newsgroups
train = fetch_20newsgroups(subset='train')
test = fetch_20newsgroups(subset='test')

In [13]:
# train the model
model = NaiveBayesClassifier(train.data, train.target)
print(model.classify("God is love"))

15


In [14]:
accuracy = 0
for i, doc in enumerate(test.data):
    if model.classify(doc) == test.target[i]:
        accuracy += 1
accuracy /= len(test.data)
print(accuracy)

0.7939458311205523


If we include tokens with the mincount we get the same result at sklearns NBClassifier.

token_counts[token] > min_count results in 0.79
token_counts[token] >= min_count results in 0.77


In [16]:
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import accuracy_score

test = fetch_20newsgroups(subset='test')

vectorizer = CountVectorizer()
x = vectorizer.fit_transform(train.data)
clf = MultinomialNB()
clf.fit(x,train.target)

pred = clf.predict(vectorizer.transform(test.data))

accuracy_score(test.target,pred)

0.7728359001593202