Text Analytics I HWS 23/24

# Home Assignment 2 (26 pts)

Submit your solution via Ilias until 23.59h on Wednesday, October 25th. Late submissions are accepted until 12:00am on the following day, with 1/4 of the total possible points deducted from the score.

Submit your solutions in teams of 3-4 students. Unless explicitly agreed otherwise in advance, **submissions from teams with more or less members will NOT be graded**.
List all members of the team with their student ID in the cell below, and submit only one notebook per team. Only submit a notebook, do not submit the dataset(s) you used. Also, do NOT compress/zip your submission!

You may use the code from the exercises and basic functionalities that are explained in official documentation of Python packages without citing, __all other sources must be cited__. In case of plagiarism (copying solutions from other teams or from the internet) ALL team members may be expelled from the course without warning.

#### General guidelines:
* Make sure that your code is executable, any task for which the code does not directly run on our machine will be graded with 0 points.
* If you use packages that are not available on the default or conda-forge channel, list them below. Also add a link to installation instructions. 
* Ensure that the notebook does not rely on the current notebook or system state!
  * Use `Kernel --> Restart & Run All` to see if you are using any definitions, variables etc. that 
    are not in scope anymore.
  * Do not rename any of the datasets you use, and load it from the same directory that your ipynb-notebook is located in, i.e., your working directory.
* Make sure you clean up your code before submission, e.g., properly align your code, and delete every line of code that you do not need anymore, even if you may have experimented with it. Minimize usage of global variables. Avoid reusing variable names multiple times!
* Ensure your code/notebook terminates in reasonable time.
* Feel free to use comments in the code. While we do not require them to get full marks, they may help us in case your code has minor errors.
* For questions that require a textual answer, please do not write the answer as a comment in a code cell, but in a Markdown cell below the code. Always remember to provide sufficient justification for all answers.
* You may create as many additional cells as you want, just make sure that the solutions to the individual tasks can be found near the corresponding assignment.
* If you have any general question regarding the understanding of some task, do not hesitate to post in the student forum in Ilias, so we can clear up such questions for all students in the course.

#### Team members and studentIDs:
* Kok Teng Ng, 1936360
* I-Chen Hsieh, 1870180
* Hyewon Kang, 1980634
* Minjeong Lee, 1978925

Additional packages (if any):
 - Example: `powerlaw`, https://github.com/jeffalstott/powerlaw

In [1]:
from typing import List, Union, Dict, Set, Tuple, Sequence

### Task 1: WordNet word similarity (9 points)

In this task, we want to implement the similarity between two words in WordNet (https://www.nltk.org/api/nltk.corpus.reader.wordnet.html) using the NLTK package. The word similarity between two words is given by
$$
\frac{1}{1+d}
$$
where $d$ is the distance of the shortest path in the hypernym/hyponym hierarchy tree in WordNet between any pair of synsets that are associated with the two input words.

From NLTK you should __only__ use the `synsets`, `hypernyms` and `instance_hpyernyms` functions.

The following subtasks build on each other, i.e. the functions of the preceding subtasks can be used for the current subtask.

_Note: the distance of a synset to itself is 0, the distance to a direct hypernym is 1, ..._

In [2]:
from nltk.corpus import wordnet as wn
from nltk.corpus.reader.wordnet import Synset

__a)__ Write a function ``shortest_paths_to`` that takes a synset as input and computes the shortest paths to all nodes on the way to the root in the hypernym hierarchy tree of WordNet. The function should return a dictionary that matches all visited hypernyms on the way(s) to the root to the distance of the shortest path from the input synset. Consider that a synset might have multiple paths to the root and that some nodes might appear in multiple paths. However, we only want to store the shortest distances. Moreover, keep in mind that the input synset might be an instance. __(3 pts)__

Use the signature in the cell below.

__Example:__ _Calling_ ``shortest_paths_to(s)`` _on the synset_ ``s = wn.synset('calculator.n.01')`` _should yield the following result:_

``
{Synset('calculator.n.01'): 0,
 Synset('expert.n.01'): 1,
 Synset('person.n.01'): 2,
 Synset('causal_agent.n.01'): 3,
 Synset('organism.n.01'): 3,
 Synset('physical_entity.n.01'): 4,
 Synset('living_thing.n.01'): 4,
 Synset('entity.n.01'): 5,
 Synset('whole.n.02'): 5,
 Synset('object.n.01'): 6}
``

In [3]:
def shortest_paths_to(start_syn: Synset) -> Dict[Synset, int]:
    """Compute the shortest distance to all nodes on paths to the root.
    :param start_syn: synset to which we want to compute the shortest distances
    :return: dict that matches all visited hypernyms to their distance to the input synset  
    """ 
    # your code here
    
    # initialize visit_node and distances
    visit_node = [(start_syn, 0)]
    distances = {start_syn: 0}
    
    while visit_node: # loop until visit_node is empty
        # delete visited node for visit the next node
        cur_synset, cur_distance = visit_node.pop(0)

        # get all hypernyms and instance hypernyms
        all_hypernyms = cur_synset.hypernyms() + cur_synset.instance_hypernyms()
        
        for hypernym in all_hypernyms:
            # check if the path is shorter than previously found paths
            if hypernym not in distances or cur_distance + 1 > distances[hypernym]:
                distances[hypernym] = cur_distance + 1
                visit_node.append((hypernym, cur_distance + 1))
    
    return distances


In [4]:
# test for a)
test_s = wn.synset('calculator.n.01')
result = shortest_paths_to(test_s)

print(result)

{Synset('calculator.n.01'): 0, Synset('expert.n.01'): 1, Synset('person.n.01'): 2, Synset('causal_agent.n.01'): 3, Synset('organism.n.01'): 3, Synset('physical_entity.n.01'): 7, Synset('living_thing.n.01'): 4, Synset('entity.n.01'): 8, Synset('whole.n.02'): 5, Synset('object.n.01'): 6}


__b)__ Write a function ``merge_paths`` that gets two dictionaries that map synsets to shortest distances (you can assume they were created by the function from __a)__) and merges them. The function shold return a dictionary that includes all synsets and distances that appear in any of the input dictionaries. If a synset appears in both input dictionaries, we want to keep the shorter distance. Leave the input dictionaries unaltered. __(1.5 pts)__

Use the signature in the cell below.

In [5]:
def merge_paths(p1: Dict[Synset, int], p2: Dict[Synset, int]) -> Dict[Synset, int]:
    """Merge two paths keeping the shorter distance for synsets that appear more than once.
    :param p1: first dict that maps synsets to their shortest distances
    :param p2: second dict that maps synsets to their shortest distances
    :return: merged dict
    """
    # your code here
    
    # initialize merged_paths to p1
    merged_paths = p1.copy()
    
    for synset, distance in p2.items():
        # if the synset is not in the merged_paths or the distance in p2 is shorter than current paths, update with the shorter one
        if synset not in merged_paths or distance < merged_paths[synset]:
            merged_paths[synset] = distance
            
    return merged_paths

In [6]:
# test for b)
path1 = {wn.synset('calculator.n.01'): 0, wn.synset('expert.n.01'):1}
path2 = {wn.synset('calculator.n.01'): 1, wn.synset('person.n.01'):2}
result = merge_paths(path1, path2)

print(result)

{Synset('calculator.n.01'): 0, Synset('expert.n.01'): 1, Synset('person.n.01'): 2}


__c)__ Write a function ``all_hypernym_paths`` that gets a word as input and returns a dictionary that maps all hypernyms that are reachable from the set of synsets associated with the word to the shortest distance leading there. __(1.5 pts)__

Use the signature in the cell below.

In [7]:
def all_hypernym_paths(word: str) -> Dict[Synset, int]:
    """Get all hypernyms of all synsets associated with the input word and compute the shortest distance leading there.
    :param word: input word
    :return: dict that matches all reachable hypernyms to their shortest distance 
    """
    # your code here
    
    # get synsets and initialize an empty dictionary 
    synsets = wn.synsets(word)
    paths = {}
    
    for synset in synsets:
        # compute the shortest path
        cur_path = shortest_paths_to(synset)
        # merge the paths
        paths = merge_paths(paths, cur_path)
    return paths

In [8]:
#test for c)
result = all_hypernym_paths('calculator')

print(result)

{Synset('calculator.n.01'): 0, Synset('expert.n.01'): 1, Synset('person.n.01'): 2, Synset('causal_agent.n.01'): 3, Synset('organism.n.01'): 3, Synset('physical_entity.n.01'): 7, Synset('living_thing.n.01'): 4, Synset('entity.n.01'): 8, Synset('whole.n.02'): 5, Synset('object.n.01'): 6, Synset('calculator.n.02'): 0, Synset('machine.n.01'): 1, Synset('device.n.01'): 2, Synset('instrumentality.n.03'): 3, Synset('artifact.n.01'): 4}


__d)__  Write a function ``get_dist`` that returns the word similarity between two input words, according to the formula given in the task description at the beginning.  __(3 pts)__

Use the signature in the cell below.

In [9]:
def get_dist(w1 : str, w2 : str) -> float:
    """Compute the similarity between two input words in the WordNet hierarchy tree.
    :param w1: first input word
    :param w2: second input word
    :return: word similarity
    """
    # your code here
    
    path_w1 = all_hypernym_paths(w1)
    path_w2 = all_hypernym_paths(w2)
    
    hypernyms = set(path_w1.keys()) & set(path_w2.keys())
    
    # if no common hypernyms, similarity = 0
    if not hypernyms:
        return 0.0
    
    shortest_distance = float('inf')
    
    for hypernym in hypernyms:
        distance = path_w1[hypernym] + path_w2[hypernym]
        shortest_distance = min(shortest_distance, distance)
    
    # calculate similarity
    similarity = 1 / (1 + shortest_distance)
    
    return similarity 

In [10]:
# test for d)
w1, w2 = "calculator", "expert"
result = get_dist(w1, w2)

print(result)

0.5


### Task 2: Lesk algorithm (4 points)

In this task we want to implement a simple version of the Lesk algorithm, a thesaurus-based method for word sense disambiguation. Given a target word $w$ and a context, the algorithm finds the word sense that is most fitting in the context. To achieve this, the Lesk algorithm computes the number of overlapping words between the context sentence and the definitions of the WordNet synsets, associated with $w$.

Write a function ``lesk`` that takes a word and a context string (e.g. a sentence) and returns the most fitting sense from the synsets associated with the word and the corresponding context overlap. The most fitting sense is the one whose definition shares the most words with the context string. Before matching tokens, make sure to 

* only include valid tokens (cf. HA 1, task 2a)
* remove stopwords
* only match stems of words (e.g. consider the ``PorterStemmer`` from ``nltk``)

When computing the context overlap, count each stemmed word only once, even if they occur multiple times. If there is no fitting synset, i.e. the context overlap between the context and the synset definitions is 0, return None instead.

Use the signature in the cell below.

In [11]:
# HA 1, task 2a)
from nltk.corpus.reader.util import StreamBackedCorpusView 
from nltk.corpus import stopwords
import re
import string

def get_valid_tokens(tokens: Union[List[str], StreamBackedCorpusView], remove_stopwords: bool=False) -> List[str]:
    """
    :param tokens: list of tokens that should be cleaned
    :param remove_stopwords: bool indicating if stopwords should be removed 
                             False by default
    :return: list of valid tokens
    """
    valid = []
    punct = string.punctuation
    stop = stopwords.words('english')
    digit = re.compile(r"\d+")

    for t in tokens:
        if t in punct:
            continue
        if remove_stopwords and t.lower() in stop:
            continue
        if re.fullmatch(digit, t):
            continue
        valid.append(t.lower())
    return valid

In [12]:
from nltk.corpus import wordnet as wn
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
ps = PorterStemmer()

def lesk(word: str, context: str) -> (Synset, int):
    '''
    Compute the most probable sense of a word in the given context.
    :param word: ambiguous word
    :param context: context in which the word appears
    :returns: 
        - synset with the most likely word sense
        - context overlap of synset and context          
    '''
    def preprocess_token(string:str) -> List[str]:
        '''
        Transform string into valid token list with removing stopwords, stemming, and deduplication.
        :param string: word or context
        :returns: list of valid tokens
        '''
        tokens = word_tokenize(string)
        valid_tokens = get_valid_tokens(tokens, remove_stopwords = True)
        stem_tokens =[]
        for t in valid_tokens:
            stem_tokens.append(ps.stem(t))
        return(set(stem_tokens))

    max_overlap = 0
    preprocess_context = preprocess_token(context)
    for ss in wn.synsets(word):
        preprocess_word = preprocess_token(ss.definition())
        overlap_count = len(preprocess_word.intersection(preprocess_context))
        if overlap_count > max_overlap:
            # ignore same overlapping, since only pick one synset
            most_fitting_synset = ss.name() 
            max_overlap = overlap_count
    
    if max_overlap == 0:
        return(None, 0)
    else:
        return(most_fitting_synset, max_overlap)

In [13]:
## testing
print(lesk('tea', 'I love to have tea and cake as meal'))
print(lesk('tea', 'Tea leaves'))
print(lesk('tea', 'I like apple'))

('tea.n.02', 3)
('tea.n.01', 2)
(None, 0)


In [14]:
## checking
from nltk.corpus import wordnet as wn
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
ps = PorterStemmer()

test = ['tea', 'cake', 'meal']

for ss in wn.synsets('tea'):
    print(ss.definition())
    ss_tokens = word_tokenize(ss.definition())
    print(ss_tokens)
    ss_valid_tokens = get_valid_tokens(ss_tokens, remove_stopwords = True)
    print(ss_valid_tokens)
    stem_tokens =[]
    for w in ss_valid_tokens:
        stem_tokens.append(ps.stem(w))
    print(stem_tokens)
    print(set(stem_tokens))
    print(len(set(stem_tokens).intersection(set(test))))
    print()

a beverage made by steeping tea leaves in water
['a', 'beverage', 'made', 'by', 'steeping', 'tea', 'leaves', 'in', 'water']
['beverage', 'made', 'steeping', 'tea', 'leaves', 'water']
['beverag', 'made', 'steep', 'tea', 'leav', 'water']
{'steep', 'beverag', 'tea', 'made', 'leav', 'water'}
1

a light midafternoon meal of tea and sandwiches or cakes
['a', 'light', 'midafternoon', 'meal', 'of', 'tea', 'and', 'sandwiches', 'or', 'cakes']
['light', 'midafternoon', 'meal', 'tea', 'sandwiches', 'cakes']
['light', 'midafternoon', 'meal', 'tea', 'sandwich', 'cake']
{'midafternoon', 'light', 'meal', 'cake', 'tea', 'sandwich'}
3

a tropical evergreen shrub or small tree extensively cultivated in e.g. China and Japan and India; source of tea leaves
['a', 'tropical', 'evergreen', 'shrub', 'or', 'small', 'tree', 'extensively', 'cultivated', 'in', 'e.g', '.', 'China', 'and', 'Japan', 'and', 'India', ';', 'source', 'of', 'tea', 'leaves']
['tropical', 'evergreen', 'shrub', 'small', 'tree', 'extensively'

### Task 3: Markov chains (13 points)

In this task we want to create a language model by using the independence assumption af Markov. We therefore assume that the probability of a word is only dependent on a fixed number of preceding words. E.g. by restricting the number of preceding words to $n$ we can approximate the probability of a word $w_{i}$ following a sequence of words $w_1, ..., w_{i-1}$ by:

$P(w_{i}|w_1, ..., w_{i-1}) \approx P(w_{i}|w_{i-n}, ..., w_{i-1})$

We will first train our model on a given corpus and then use it to automatically generate text.

Throughout this task we will define a single class with different functions. If you're unsure how to access class methods and attributes, take a look at the documentation (https://docs.python.org/3/tutorial/classes.html).

__a) Collecting the counts (3 pts)__

Write a function `process_corpus` that takes a corpus of text (as a sequence of tokens) as input and counts how often an n-gram of length $n$ (``context_len=n``) is followed by a certain word (the n-grams should __not__ be padded). The function should return a dictionary that maps every n-gram that is observed in the corpus to an inner dictionary. The inner dictionary maps each word to a number, that indicates how often the word succeeds the n-gram in the given corpus. We will need these counts to compute the conditional probabilities $P(w_{i}|w_{i-n}, ..., w_{i-1})$.
Additionally, also return the entire vocabulary $V$ (i.e. the set of all unique tokens that appear in the corpus).

Make sure your implementation is efficient by using e.g. ``Counter`` and ``defaultdict`` from the package ``collections``.   

__b) Conditional probabilities (3 pts)__

Write a function `transition_prob` that takes an n-gram $(w_{i-n}, ..., w_{i-1})$ and a word $w_{i}$ of the vocabulary $V$ as input and returns the conditional probability that the given n-gram is followed by the input word $w_{i}$. Recall that this conditional probability can be computed as follows:

$P(w_{i}|w_{i-n}, ..., w_{i-1}) = \frac{\text{Count}(w_{i-n}, ..., w_{i-1}, w_{i})}{\sum_{w \in V}\text{Count}(w_{i-n}, ..., w_{i-1}, w)}$

If the n-gram has never been observed, return $\frac{1}{|V|}$.

__c) Most likely word (3 pts)__

Write a function `most_likely_word` that gets an n-gram as input and returns the word that is most likely to succeed the given n-gram. In case there are multiple words that are equally likely to follow the given n-gram, return all of them. 
Note that you should **not** loop over the **entire** vocabulary to obtain the most likely word.
In case the given n-gram has never been observed, return the entire vocabulary.

__d) Generating text (2 pts)__

Write a function `generate_text` that generates a text sequence of length `k`, given a starting sequence of words (`ngram`). The function should choose the most likely next word in every step; in case there are multiple equally likely words, randomly choose one. You should return a list of ``k`` words, that includes the starting sequence and is the most probable continuation. 


Please do not implement other functions for the MarkovModel class.

Use the function signatures in the cell below.

In [15]:
from collections import defaultdict, Counter
from nltk.util import ngrams
import random

# combined with hyewon code
class MarkovModel():
    '''Markov model for generating text.'''
    
    def __init__(self, tokens: Sequence[str], context_len: int):
        '''
        :param tokens: text corpus on which the model is trained on as an iterator of tokens
        :param context_len: length of the n-gram (number of preceding words)
        '''
        self.context_len = context_len 
        self.counts, self.vocabulary = self.process_corpus(tokens)
        
    def process_corpus(self, tokens: Sequence[str]) -> (Dict[Tuple[str, ...], Dict[str, int]], Set):
        '''Training method of the model, counts the occurences of each word after each observed n-gram.
        :param tokens: text corpus on which the model is trained on as an iterator of tokens
        :returns: 
            - nested dict that maps each n-gram to the counts of the words succeeding it
            - the whole vocabulary as a set
        '''
        # define the ngram occurrence dictionary
        counts = defaultdict(Counter)
        # to return the vocabulary from those given tokens as unique set
        vocabulary = set(tokens)
        # getting the ngram from the tokens with the following token, don't adding padding to the given tokens
        n_grams = list(ngrams(tokens, self.context_len + 1, pad_left = False, pad_right = False))
        for ngram in n_grams:
            # getting the following token
            next_token = ngram[-1]
            # getting the given conditional of ngram
            context = tuple(ngram[:-1])
            # updating the occurrence counter within the dictionary
            counts[context][next_token] = counts[context][next_token] + 1
        # return occurrence and vocabulary
        return counts, vocabulary
        
    def transition_prob(self, ngram: Tuple[str, ...], word: str) -> float:
        '''Compute the conditional probability that the input word follows the given n-gram.
        :param ngram: string tuple that represents an n-gram
        :param word: input word
        :return: probability that the n-gram is followed by the input word
        '''
        # deal with the assumption for zero frequency of the ngram (returning result as 1/|V|)
        if ngram not in self.counts:
            return 1/(len(self.vocabulary))
        # compute the conditional probability
        else:
            # if the word are occurring belong to the ngrams, then only compute the conditional probability
            if word in self.counts[ngram]:
                # compute the joint counting
                joint_counting = self.counts[ngram][word]
                # to get the evidence occurrence frequency
                prior_evidence = sum(self.counts[ngram].values())
                # compute the conditional probability
                probability = joint_counting / prior_evidence
                return probability
            # if the word doesn't occur belong to the ngrams, then return 0.00 as probability result
            else:
                return 0.0
            
    def most_likely_word(self, ngram: Tuple[str, ...]) -> Set[str]:
        '''Computes which word is most likely to follow a given n-gram.
        :param ngram: n-gram we are interested in
        return: set of words that are most likely to follow the n-gram
        '''
        # deal with the assumption for zero frequency of the ngram (returning the entire vocabulary)
        if ngram not in self.counts:
            return set(self.vocabulary)
        # else find the more likely bi-gram 
        else:
            # define the probability of ngram and token
            probability_of_words = 0.0
            # initial the list to store those most likely words that are more likely following the n-gram
            likely_words = []
            # loop entire vocabulary to compute the probability that the word is likely to follow the given n-gram, then do comparison as select the highest probability word that are following to the according given n-gram
            for word in self.vocabulary:
                # compute the probability that the word are belong after the n-gram
                temp_prob = self.transition_prob(ngram, word)
                # compare the probability if current word are more likely to belong to the previous word, then replace it
                if temp_prob > probability_of_words:
                    # update the probability as the most likely word
                    probability_of_words = temp_prob
                    # clear the entire list because there might have potential as multiple words have equally likely to follow the n-grams
                    likely_words.clear()
                    # add the highest probability word into the list
                    likely_words.append(word)
                # deal with those words might have equally likely to follow the n-grams
                elif temp_prob == probability_of_words:
                    likely_words.append(word)
            # return the set of words that are most likely to follow the n-gram
            return set(likely_words)
    
    def generate_text(self, ngram: Tuple[str, ...], k: int) -> List[str]:
        '''Generates a text sequence of length k, given a starting sequence.
        :param ngram: starting sequence
        :param k: total number of words in the generated sequence
        :return: sequence of generated words, including the starting sequence
        '''
        # initial the list to store sequence of generated words
        sequence_words = []
        # start the looping to generate the word sequentially 
        for sequence in range (0, k):
            # convert the most likely word that belong to the according ngram into list for easily manupulating
            generated_token = list(self.most_likely_word(ngram))
            # randomly select a token from the generated token list 
            random_token = random.choice(generated_token)
            # append the first token into the sequence word from the first element of the generated token
            sequence_words.append(random_token)
            # update the ngram to predict the upcoming words until k
            ngram = tuple([ngram[1:], random_token])
        # return the list
        return sequence_words

__e) Apply the model to a corpus (2 pts)__

Finally, we want to apply our functions to the King James Bible (`'bible-kjv.txt'`) that is part of the ``gutenberg`` corpus. Use the function from HA 1, task 2a) to obtain a list of valid tokens (do not remove stopwords) from the King Jame Bible.

Initialize the MarkovModel with the list of valid tokens and ``context_len=3`` and answer the following subtasks:

i) What is the probability that the word ``babylon`` follows the sequence ``the king of``? 

ii) What are the most likely words to follow the sequence ``the world is``? 

iii) Generate a sequence of length 20 that starts with ``mother mary was``.


In [16]:
import nltk
from nltk.corpus.reader.util import StreamBackedCorpusView 

def get_valid_tokens(tokens: Union[List[str], StreamBackedCorpusView]) -> List[str]:
    """
    :param tokens: list of tokens that should be cleaned
    :return: list of valid tokens
    """
    # Remove punctuation mark and entirely numeric digits, and convert to lower case
    valid_token = [i.lower() for i in tokens if i not in string.punctuation and not re.fullmatch(r'\d+', i)]
    return valid_token

In [17]:
# Download datasets and corpora
nltk.download('gutenberg')

tokens = nltk.corpus.gutenberg.words('bible-kjv.txt')
valid_tokens = get_valid_tokens(tokens)

markov_model = MarkovModel(valid_tokens, 3)

result_i = markov_model.transition_prob(("the", "king", "of"), "babylon")
print(f"The result for question i : {result_i}")

result_ii = markov_model.most_likely_word(("the", "world", "is"))
print(f"The result for question ii : {result_ii}")

result_iii = markov_model.generate_text(("mother", "mary", "was"), 20)
print(f"The result for question iii : {result_iii}")

[nltk_data] Downloading package gutenberg to
[nltk_data]     /Users/ngkokteng/nltk_data...
[nltk_data]   Package gutenberg is already up-to-date!


The result for question i : 0.1906779661016949
The result for question ii : {'enmity', 'gone', 'mine', 'the', 'crucified'}
The result for question iii : ['espoused', 'persis', 'mint', 'midst', 'sharp', 'served', 'slanderously', 'hararite', 'descend', 'hadlai', 'moriah', 'bethphelet', 'sodden', 'rush', 'reward', 'foundations', 'ephod', 'belteshazzar', 'tasteth', 'edify']


# hyewon code

In [None]:
from collections import defaultdict, Counter
from nltk.util import ngrams
import random

class MarkovModel():
    '''Markov model for generating text.'''
    
    def __init__(self, tokens: Sequence[str], context_len: int):
        '''
        :param tokens: text corpus on which the model is trained on as an iterator of tokens
        :param context_len: length of the n-gram (number of preceding words)
        '''
        self.context_len = context_len 
        self.counts, self.v = self.process_corpus(tokens)
        
    def process_corpus(self, tokens: Sequence[str]) -> (Dict[Tuple[str, ...], Dict[str, int]], Set):
        '''Training method of the model, counts the occurences of each word after each observed n-gram.
        :param tokens: text corpus on which the model is trained on as an iterator of tokens
        :returns: 
            - nested dict that maps each n-gram to the counts of the words succeeding it
            - the whole vocabulary as a set
        '''
         # Initialize the n-gram counts and vocabulary
        ngram_counts = defaultdict(Counter)
        vocabulary = set()
        
        # Iterate through the tokens to collect n-gram counts
        for ngram in ngrams(tokens, self.context_len + 1, pad_left=False, pad_right=False):
            context = tuple(ngram[:-1])
            word = ngram[-1]
            ngram_counts[context][word] += 1
            vocabulary.add(word)
        
        return ngram_counts, vocabulary
        
    def transition_prob(self, ngram: Tuple[str, ...], word: str) -> float:
        '''Compute the conditional probability that the input word follows the given n-gram.
        :param ngram: string tuple that represents an n-gram
        :param word: input word
        :return: probability that the n-gram is followed by the input word
        '''
       # Calculate the count of the n-gram followed by the word
        ngram_count = self.counts.get(ngram, Counter())
        count_word = ngram_count[word]
        
        # Calculate the sum of counts for all possible words in the vocabulary
        total_count = sum(ngram_count.values())
        total_vocabulary = len(self.v)
        
        # Compute the conditional probability
        if total_count == 0:
            return 1 / total_vocabulary  # If the n-gram is unseen, return uniform probability
        else:
            return count_word / total_count
    
    def most_likely_word(self, ngram: Tuple[str, ...]) -> Set[str]:
        '''Computes which word is most likely to follow a given n-gram.
        :param ngram: n-gram we are interested in
        return: set of words that are most likely to follow the n-gram
        '''
       # Deal with the assumption for zero frequency of the n-gram (returning the entire vocabulary)
        if ngram not in self.counts:
            return set(self.v)
    
        else:
        # Get the n-gram counts
            ngram_count = self.counts[ngram]
        
        # Find the maximum count (the most likely count)
            max_count = max(ngram_count.values(), default=0)
        
        # Use a list comprehension to find all words with the maximum count (the most likely words)
            most_likely_words = {word for word, count in ngram_count.items() if count == max_count}
        
        return most_likely_words
    
    def generate_text(self, ngram: Tuple[str, ...], k: int) -> List[str]:
        '''Generates a text sequence of length k, given a starting sequence.
        :param ngram: starting sequence
        :param k: total number of words in the generated sequence
        :return: sequence of generated words, including the starting sequence
        '''
        sequence_words = list(ngram)

        for _ in range(k - len(ngram)):
            most_likely_words = self.most_likely_word(ngram)
            next_word = random.choice(list(most_likely_words))
            sequence_words.append(next_word)
            ngram = ngram[1:] + (next_word,)

        return sequence_words

In [None]:
import nltk
from nltk.corpus.reader.util import StreamBackedCorpusView 

def get_valid_tokens(tokens: Union[List[str], StreamBackedCorpusView], remove_stopwords: bool=False) -> List[str]:
    """
    :param tokens: list of tokens that should be cleaned
    :return: list of valid tokens
    """
    # Remove punctuation mark and entirely numeric digits, and convert to lower case
    valid_token = [i.lower() for i in tokens if i not in string.punctuation and not re.fullmatch(r'\d+', i)]
    return valid_token

In [None]:
# Download datasets and corpora
nltk.download('gutenberg')

tokens = nltk.corpus.gutenberg.words('bible-kjv.txt')
valid_tokens = get_valid_tokens(tokens)

markov_model = MarkovModel(valid_tokens, 3)

result_i = markov_model.transition_prob(("the", "king", "of"), "babylon")
print(f"The result for question i : {result_i}")

result_ii = markov_model.most_likely_word(("the", "world", "is"))
print(f"The result for question ii : {result_ii}")

result_iii = markov_model.generate_text(("mother", "mary", "was"), 20)
print(f"The result for question iii : {result_iii}")

### References

References for Task 3

* https://www.kaggle.com/code/alvations/n-gram-language-model-with-nltk
* https://sourajit16-02-93.medium.com/language-model-for-nlp-2fb56d4944aa