Text Analytics I HWS 22/23

# Home Assignment 2 (25 pts)



Submit your solution via Ilias until 23.59pm on Tuesday, October 25th. Late submissions are accepted until 10:15am on the following day (start of the exercise), with 1/4 of the total possible points deducted from the score.

Submit your solutions in teams of 3-4 students. Unless explicitly agreed otherwise in advance, **submissions from teams with more or less members will NOT be graded**.
List all members of the team with their student ID and full name in the cell below, and submit only one notebook per team. Only submit a notebook, do not submit the dataset(s) you used or image files that you have created - these have to be created from your notebook. Also, do NOT compress/zip your submission!

You may use the code from the exercises and basic functionalities that are explained in the official documentation of Python packages without citing, __all other sources must be cited__. In case of plagiarism (copying solutions from other teams or from the internet) ALL team members will be expelled from the course without warning.

#### General guidelines:
* Make sure that your code is executable, any task for which the code does not directly run on our machine will be graded with 0 points.
* Use only packages that are automatically installed along with Anaconda, plus some additional packages that have been introduced in the context of this class.
* Ensure that the notebook does not rely on the current notebook or system state!
  * Use `Kernel --> Restart & Run All` to see if you are using any definitions, variables etc. that 
    are not in scope anymore.
  * Do not rename any of the datasets you use, and load it from the same directory that your ipynb-notebook is located in, i.e., your working directory.
* Make sure you clean up your code before submission, e.g., properly align your code, and delete every line of code that you do not need anymore, even if you may have experimented with it. Minimize usage of global variables. Do not reuse variable names multiple times!
* Ensure your code/notebook terminates in reasonable time.
* Feel free to use comments in the code. While we do not require them to get full marks, they may help us in case your code has minor errors.
* For questions that require a textual answer, please do not write the answer as a comment in a code cell, but in a Markdown cell below the code. Always remember to provide sufficient justification for all answers.
* You may create as many additional cells as you want, just make sure that the solutions to the individual tasks can be found near the corresponding assignment.
* If you have any general question regarding the understanding of some task, do not hesitate to post in the student forum in Ilias, so we can clear up such questions for all students in the course.

In [1]:
# credentials of all team members
team_members = [
    {
        'first_name': 'Qi',
        'last_name': 'Jiang',
        'student_id': 1820722
    },
    {
        'first_name': 'Jiatong',
        'last_name': 'Yu',
        'student_id': 1908029
    },
    {
        'first_name': 'Yongyi',
        'last_name': 'Zheng',
        'student_id': 1823267
    },
    {
        'first_name': 'Zihang',
        'last_name': 'Chen',
        'student_id': 1819574
    }
]

In [3]:
from typing import List, Union, Dict, Set, Tuple, Sequence

### Task 1: WordNet word similarity (9 points)

In this task, we want to implement the similarity between two words in WordNet (https://www.nltk.org/api/nltk.corpus.reader.wordnet.html) using the NLTK package. The word similarity between two words is given by
$$
\frac{1}{1+d}
$$
where $d$ is the distance of the shortest path in the hypernym/hyponym hierarchy tree in WordNet between any pair of synsets that are associated with the two input words.

From NLTK you should __only__ use the `synsets`, `hypernyms` and `instance_hpyernyms` functions.

The following subtasks build on each other, i.e. the functions of the preceding subtasks can be used for the current subtask.

_Note: the distance of a synset to itself is 0, the distance to a direct hypernym is 1, ..._

In [19]:
from nltk.corpus import wordnet as wn
from nltk.corpus.reader.wordnet import Synset

__a)__ Write a function ``shortest_paths_to`` that takes a synset as input and computes the shortest paths to all nodes on the way to the root in the hypernym/hyponym hierarchy tree of WordNet. The function should return a dictionary that matches all visited hypernyms on the way(s) to the root to the distance of the shortest path from the input synset. Consider that a synset might have multiple paths to the root and that some nodes might appear in multiple paths. However, we only want to store the shortest distances. Moreover, keep in mind that the input synset might be an instance. __(3 pts)__

Use the signature in the cell below.

__Example:__ _Calling_ ``shortest_paths_to(s)`` _on the synset_ ``s = wn.synset('calculator.n.01')`` _should yield the following result:_

``
{Synset('calculator.n.01'): 0,
 Synset('expert.n.01'): 1,
 Synset('person.n.01'): 2,
 Synset('causal_agent.n.01'): 3,
 Synset('organism.n.01'): 3,
 Synset('physical_entity.n.01'): 4,
 Synset('living_thing.n.01'): 4,
 Synset('entity.n.01'): 5,
 Synset('whole.n.02'): 5,
 Synset('object.n.01'): 6}
``

In [47]:
def shortest_paths_to(start_syn: Synset) -> Dict[Synset, int]:
    """Compute the shortest distance to all nodes on paths to the root.
    :param start_syn: synset to which we want to compute the shortest distances
    :return: dict that matches all visited hypernyms to their distance to the input synset  
    """ 
    # your code here
    res = {start_syn: 0}
    for ss in start_syn.hypernyms() + start_syn.instance_hypernyms(): # iterates through all hypernyms of start_syn when start_syn is not the root, otherwise returns entity: 0
        shortest_paths_from_ss = {k: v+1 for k, v in shortest_paths_to(ss).items()} # calculates all shortest paths from start_syn and through the current hypernym ss
        res = merge_paths(res, shortest_paths_from_ss) # merges shortest paths through ss with the list of paths through iterated hypernyms
    return res

#before testing, please run the next code block first, to have merge_paths() defined

__b)__ Write a function ``merge_paths`` that gets two dictionaries that map synsets to shortest distances (you can assume they were created by the function from __a)__) and merges them. The function shold return a dictionary that includes all synsets and distances that appear in any of the input dictionaries. If a synset appears in both input dictionaries, we want to keep the shorter distance. Leave the input dictionaries unaltered. __(1.5 pts)__

Use the signature in the cell below.

In [49]:
def merge_paths(p1: Dict[Synset, int], p2: Dict[Synset, int]) -> Dict[Synset, int]:
    """Merge two paths keeping the shorter distance for synsets that appear more than once.
    :param p1: first dict that maps synsets to their shortest distances
    :param p2: second dict that maps synsets to their shortest distances
    :return: merged dict
    """
    # your code here
    res = {}
    for ss, distance in p1.items():
        if ss not in p2.keys(): # if item exists only in p1, simply put the item into the result dict
            res[ss] = distance
        else:
            res[ss] = min([distance, p2[ss]]) # if item exists in both dicts, choose the smaller value
    for ss, distance in p2.items(): # go through remaining items in p2
        if ss not in res.keys():
            res[ss] = distance
    return res

__c)__ Write a function ``all_hypernym_paths`` that gets a word as input and returns a dictionary that maps all hypernyms that are reachable from the set of synsets associated with the word to the shortest distance leading there. __(1.5 pts)__

Use the signature in the cell below.

In [55]:
def all_hypernym_paths(word: str) -> Dict[Synset, int]:
    """Get all hypernyms of all synsets associated with the input word and compute the shortest distance leading there.
    :param word: input word
    :return: dict that matches all reachable hypernyms to their shortest distance 
    """
    # your code here
    res = {}
    for ss in wn.synsets(word):
        res = merge_paths(res, shortest_paths_to(ss))
    return res

In [58]:
# Test
s = wn.synset('calculator.n.01')
print(sorted(shortest_paths_to(s).items(), key=lambda x: x[1]))
print()
print(sorted(all_hypernym_paths('calculator').items(), key=lambda x: x[1]))

[(Synset('calculator.n.01'), 0), (Synset('expert.n.01'), 1), (Synset('person.n.01'), 2), (Synset('causal_agent.n.01'), 3), (Synset('organism.n.01'), 3), (Synset('physical_entity.n.01'), 4), (Synset('living_thing.n.01'), 4), (Synset('entity.n.01'), 5), (Synset('whole.n.02'), 5), (Synset('object.n.01'), 6)]

[(Synset('calculator.n.01'), 0), (Synset('calculator.n.02'), 0), (Synset('expert.n.01'), 1), (Synset('machine.n.01'), 1), (Synset('person.n.01'), 2), (Synset('device.n.01'), 2), (Synset('causal_agent.n.01'), 3), (Synset('organism.n.01'), 3), (Synset('instrumentality.n.03'), 3), (Synset('physical_entity.n.01'), 4), (Synset('living_thing.n.01'), 4), (Synset('artifact.n.01'), 4), (Synset('entity.n.01'), 5), (Synset('whole.n.02'), 5), (Synset('object.n.01'), 6)]


__d)__  Write a function ``get_dist`` that returns the word similarity between two input words, according to the formula given in the task description at the beginning.  __(3 pts)__

Use the signature in the cell below.

In [59]:
def get_dist(w1 : str, w2 : str) -> float:
    """Compute the similarity between two input words in the WordNet hierarchy tree.
    :param w1: first input word
    :param w2: second input word
    :return: word similarity
    """
    # your code here
    common_hyper_paths = {}
    d1 = all_hypernym_paths(w1)
    d2 = all_hypernym_paths(w2)
    for ss, distance in d1.items():
        if ss in d2.keys():
            common_hyper_paths[ss] = d1[ss] + d2[ss]
    return 1/(1+min(common_hyper_paths.values()))

In [63]:
get_dist("human","artifact")

0.08333333333333333

### Task 2: Lesk algorithm (4 points)

In this task we want to implement a simple version of the Lesk algorithm, a knowledge-based method for word sense disambiguation. Given a target word $w$ and a context, the algorithm finds the word sense that is most fitting in the context. To achieve this, the Lesk algorithm computes the number of overlapping words between the context sentence and the definitions of the WordNet synsets, associated with $w$.

Write a function ``lesk`` that takes a word and a context string (e.g. a sentence) and returns the most fitting sense from the synsets associated with the word and the corresponding context overlap. The most fitting sense is the one whose definition shares the most words with the context string. Before matching tokens, make sure to 

* only include valid tokens (cf. HA 1, task 2a)
* remove stopwords
* ony match stems of words (e.g. consider the ``PorterStemmer`` from ``nltk``)

When computing the context overlap, count each stemmed word only once, even if they occur multiple times. If there is no fitting synset, i.e. the context overlap between the context and the synset definitions is 0, return None instead.

Use the signature in the cell below.

In [4]:
# HA 1, task 2a)
from nltk.corpus.reader.util import StreamBackedCorpusView 
from nltk.corpus import stopwords
import re
import string
from nltk.stem import PorterStemmer
from typing import List, Union, Dict, Set, Tuple
from nltk.corpus import wordnet as wn
from nltk.corpus.reader.wordnet import Synset


def get_valid_tokens(tokens: Union[List[str], StreamBackedCorpusView], remove_stopwords: bool=False) -> List[str]:
    """
    :param tokens: list of tokens that should be cleaned
    :param remove_stopwords: bool indicating if stopwords should be removed 
                             False by default
    :return: list of valid tokens
    """
    valid = []
    punct = string.punctuation
    stop = stopwords.words('english')
    digit = re.compile(r"\d+")
    
    for t in tokens:
        if t in punct:
            continue
        if remove_stopwords and t.lower() in stop:
            continue
        if re.fullmatch(digit, t):
            continue
        valid.append(t.lower())
    return valid

In [5]:
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
from nltk.corpus import wordnet as wn

def lesk(word: str, context: str) -> (Synset, int):
    '''
    Compute the most probable sense of a word in the given context.
    :param word: ambiguous word
    :param context: context in which the word appears
    :returns: 
        - synset with the most likely word sense
        - context overlap of synset and context          
    '''
    context_tokens=word_tokenize(context)#list
    valid_context_tokens=get_valid_tokens(context_tokens,True)#list

   ##get stemmer of text
    ps = PorterStemmer()
    stem_list1=[]
    for word in valid_context_tokens:
        stem_list1.append(ps.stem(word))
    #print(stem_list1)   
    list3=[]   ## use for record the number of common stem word
    #find all definition of word
    for ss in wn.synsets(word):## go through all the synsets
        #print(ss.definition())
        definition_tokens=get_valid_tokens(word_tokenize(ss.definition()),True)  ## get cleaned tokens
        #print( definition_tokens)
        stem_list2=[]  ## get stem word of definition list
        for words in definition_tokens:   
            stem_list2.append(ps.stem(words))
        #print(stem_list2)
        #print(len(set(stem_list1) & set(stem_list2)))
        list3.append(len(set(stem_list1) & set(stem_list2)))
    
    max_number=max(list3)
#     print(list3)
#     print(max_number)
    if max_number==0:
        return None,0 
    index=list3.index(max_number)
#     print(index)
#     print(wn.synsets(word))
#     print(len(wn.synsets(word)))
    res=wn.synsets(word)[index]
    print(res.definition())
    return res,max_number
    
    

In [6]:
test_word='happy'
text='  happy is a kind of feeling '
lesk(test_word,text)

have a feeling or perception about oneself in reaction to someone's behavior or attitude


(Synset('feel.v.05'), 1)

### Task 3: Markov chains (12 points)

In this task we want to create a language model by using the independence assumption af Markov. We therefore assume that the probability of a word is only dependent on a fixed number of preceding words. E.g. for a language model of ``order`` $n$ we can approximate the probability of a word $w_{i+1}$ following a sequence of words $w_1, ..., w_i$ by:

$P(w_{i+1}|w_1, ..., w_i) \approx P(w_{i+1}|w_{i-n}, ..., w_i)$

We will first train our model on a given corpus and then use it to automatically generate text.

Throughout this task we will define a single class with different functions. If you're unsure how to access class methods and attributes, take a look at the documentation (https://docs.python.org/3/tutorial/classes.html).

__a) Collecting the counts (3 pts)__

Write a function `process_corpus` that takes a corpus of text (as an iterator of tokens) as input and counts how often a n-gram of length $n$ (``order=n``) is followed by a certain word. The function should return a dictionary that maps every n-gram that is observed in the corpus to an inner dictionary. The inner dictionary maps each word to a number, that indicates how often the word succeeds the n-gram in the given corpus. We will need these counts to compute the conditional probabilities $P(w_{i+1}|w_{i-n}, ..., w_i)$.
Additionally, also return the entire vocabulary $V$ (i.e. the set of all unique tokens that appear in the corpus).

Make sure your implementation is efficient by using e.g. ``Counter`` and ``defaultdict`` from the package ``collections``.

__b) Conditional probabilities (3 pts)__

Write a function `transition_prob` that takes a n-gram $(w_{i-n}, ..., w_i)$ and a word $w_{i+1}$ of the vocabulary $V$ as input and returns the conditional probability that the given n-gram is followed by the input word $w_{i+1}$. Recall that this conditional probability can be computed as follows:

$P(w_{i+1}|w_{i-n}, ..., w_i) = \frac{\text{Count}(w_{i-n}, ..., w_i, w_{i+1})}{\sum_{w \in V}\text{Count}(w_{i-n}, ..., w_i, w)}$

If the n-gram has never been observed, return $\frac{1}{|V|}$.

__c) Most likely word (3 pts)__

Write a function `most_likely_word` that gets a n-gram as input and returns the word that is most likely to succeed the given n-gram. In case there are multiple words that are equally likely to follow the given n-gram, return all of them. 
Note that you should **not** loop over the **entire** vocabulary to obtain the most likely word.
In case the given n-gram has never been observed, return the entire vocabulary.

__d) Generating text (2 pts)__

Write a function `generate_text` that generates a text sequence of length `k`, given a starting sequence of words (`ngram`). The function should choose the most likely next word in every step; in case there are multiple equally likely words, randomly choose one. You should return a list of 'k' words, that includes the starting sequence and is the most probable continuation. 


Please do not implement other functions for the MarkovModel class.

Use the function signatures in the cell below.

In [4]:
from collections import defaultdict, Counter
from nltk.util import ngrams

from random import choice


class MarkovModel():
    '''Markov model for generating text.'''
    
    def __init__(self, tokens: Sequence[str], order: int):
        '''
        :param tokens: text corpus on which the model is trained on as an iterator of tokens
        :param order: length of the n-gram 
        '''
        self.order = order 
        self.counts, self.v = self.process_corpus(tokens)
        
    def process_corpus(self, tokens: Sequence[str]) -> (Dict[str, Dict[str, int]], Set):
        '''Training method of the model, counts the occurences of each word after each observed n-gram.
        :param tokens: text corpus on which the model is trained on as an iterator of tokens
        :returns: 
            - nested dict that maps each n-gram to the counts of the words succeeding it
            - the whole vocabulary as a set
        '''
        # your code here 
        voc = set()
        tmp = defaultdict(list)
        count = {}
        for item in tokens:
            voc.add(item)
        
        for i in range(len(tokens)-self.order):
            ngram = []
            for j in range(self.order):
                ngram += [tokens[i+j]]
            tmp[tuple(ngram)] += [tokens[i+self.order]]
        
        for key, value in tmp.items():
            count[key]=Counter(value)
        return count, voc
        
    def transition_prob(self, ngram: Tuple[str, ...], word: str) -> float:
        '''Compute the conditional probability that the input word follows the given n-gram.
        :param ngram: string tuple that represents an n-gram
        :param word: input word
        :return: probability that the n-gram is followed by the input word
        '''
        # your code here
        if ngram not in self.counts.keys():
            return 1/len(self.v)
        sum = 0
        for k,v in self.counts[ngram].items():
            sum += v
        return (self.counts[ngram][word]/sum)
    
    def most_likely_word(self, ngram) -> Set[str]:
        '''Computes which word is most likely to follow a given n-gram.
        :param ngram: n-gram we are interested in
        return: set of words that are most likely to follow the n-gram
        '''
        # your code here
        if ngram not in self.counts.keys():
            return self.v
        max = 0
        l = set()
        for k,v in self.counts[ngram].items():
            if v>max:
                max = v
        for k,v in self.counts[ngram].items():
            if v == max:
                l.add(k)
        return l
    
    def generate_text(self, ngram: Tuple[str, ...], k: int) -> List[str]:
        '''Generates a text sequence of length k, given a starting sequence.
        :param ngram: starting sequence
        :param k: total number of words in the generated sequence
        :return: sequence of generated words, including the starting sequence
        '''
        # your code here
        target = list(ngram)
        while len(target)<k:
            new_word = choice(list(self.most_likely_word(ngram)))
            target.append(new_word)
            ngram = tuple(list(ngram)[1:]+[new_word])
        return target

__e) Apply the model to a corpus (2 pts)__

Finally, we want to apply our functions to the King James Bible (`'bible-kjv.txt'`) that is part of the ``gutenberg`` corpus. Use the function from HA 1, task 2a) to obtain a list of valid tokens (do not remove stopwords) from the King Jame Bible.

Initialize the MarkovModel with the list of valid tokens and ``order=3`` and answer the following subtasks:

i) What is the probability that the word ``babylon`` follows the sequence ``the king of``? 

ii) What are the most likely words to follow the sequence ``the world is``? 

iii) Generate a sequence of length 20 that starts with ``mother mary was``.


In [6]:
import nltk
import string
from nltk.corpus import stopwords
from nltk.corpus.reader.util import StreamBackedCorpusView 

def get_valid_tokens(tokens: Union[List[str], StreamBackedCorpusView], remove_stopwords: bool=False) -> List[str]:
    """
    :param tokens: list of tokens that should be cleaned
    :param remove_stopwords: bool indicating if stopwords should be removed 
                             False by default
    :return: list of valid tokens
    """
    # your code here
    def judge(x,remove_stopwords=remove_stopwords):
        if x.isdigit() == False:
            if x not in string.punctuation:
                return x
    
    l_result = [judge(i) for i in tokens if judge(i)!=None]
    return l_result

In [7]:
nltk.download("gutenberg")
from nltk.corpus import gutenberg
tokens = gutenberg.words("bible-kjv.txt")
tokens = get_valid_tokens(tokens)
tokens

[nltk_data] Downloading package gutenberg to
[nltk_data]     C:\Users\czjia\AppData\Roaming\nltk_data...
[nltk_data]   Package gutenberg is already up-to-date!


['The',
 'King',
 'James',
 'Bible',
 'The',
 'Old',
 'Testament',
 'of',
 'the',
 'King',
 'James',
 'Bible',
 'The',
 'First',
 'Book',
 'of',
 'Moses',
 'Called',
 'Genesis',
 'In',
 'the',
 'beginning',
 'God',
 'created',
 'the',
 'heaven',
 'and',
 'the',
 'earth',
 'And',
 'the',
 'earth',
 'was',
 'without',
 'form',
 'and',
 'void',
 'and',
 'darkness',
 'was',
 'upon',
 'the',
 'face',
 'of',
 'the',
 'deep',
 'And',
 'the',
 'Spirit',
 'of',
 'God',
 'moved',
 'upon',
 'the',
 'face',
 'of',
 'the',
 'waters',
 'And',
 'God',
 'said',
 'Let',
 'there',
 'be',
 'light',
 'and',
 'there',
 'was',
 'light',
 'And',
 'God',
 'saw',
 'the',
 'light',
 'that',
 'it',
 'was',
 'good',
 'and',
 'God',
 'divided',
 'the',
 'light',
 'from',
 'the',
 'darkness',
 'And',
 'God',
 'called',
 'the',
 'light',
 'Day',
 'and',
 'the',
 'darkness',
 'he',
 'called',
 'Night',
 'And',
 'the',
 'evening',
 'and',
 'the',
 'morning',
 'were',
 'the',
 'first',
 'day',
 'And',
 'God',
 'said',


In [8]:
m = MarkovModel(tokens, 3)

In [10]:
m.transition_prob(('the', 'king', 'of'), 'Babylon')

0.20422535211267606

In [11]:
m.most_likely_word(('the', 'world', 'is'))

{'crucified', 'enmity', 'gone', 'mine', 'the'}

In [12]:
m.generate_text(('mother', 'Mary' ,'was'), 20)

['mother',
 'Mary',
 'was',
 'espoused',
 'to',
 'Joseph',
 'before',
 'they',
 'came',
 'together',
 'she',
 'was',
 'found',
 'with',
 'child',
 'of',
 'the',
 'devil',
 'And',
 'when']