In [None]:
import nltk
from nltk.corpus import brown
nltk.download('brown')
import numpy as np
from numba import njit

# Hidden Markov Model
|||
|---|---|
| $Q=q_1q_2\dots q_N$ | a set of N states|
| $A=a_{11}\dots a_{ij}\dots a_{NN}$ | a transition probability matrix $A$, each $a_{ij}$ representing the probability of moving from state $i$ to state $j$, s.t. $\sum_{j=1}^N a_{ij} = 1 \ \forall i$|
| $B=b_i(o_t)$ | a sequence of observation likelihoods, also called emission probabilities, each expressing the probability of an observation $o_t$ (drawn from a vocabulary $V = v_1, v_2, \dots, v_V$) being generated from a state $q_i$|
|$\pi = \pi_1, \pi_2, \dots, \pi_N$| an initial probability distribution over states. $\pi_i$ is the probability that the Markov chain will start in state $i$. Some states $j$ may have $\pi_j = 0$, meaning that they cannot be initial states. Also, $\sum_{i=1}^N \pi_i = 1$|



# Calculating prior probabilities

In [None]:
words = list(set(brown.words()))
pos_tags = ['PRON', 'VERB', 'CONJ', '.', 'X', 'DET', 'NUM', 'ADJ', 'ADV', 'ADP', 'PRT', 'NOUN'] 
# pos_tags = list(set(pair[1] for pair in brown.tagged_words(tagset='universal')))
tagged_sents = list(brown.tagged_sents(tagset='universal'))


In [None]:
def to_numpy(XY:list[list[tuple[str,str]]], X_name:list, Y_name:list, to_sort=False):
    X_id_map = {v: i for i, v in enumerate(X_name)}
    Y_id_map = {v: i for i, v in enumerate(Y_name)}
    if to_sort: XY = sorted(XY,key=len)
        
    n = len(XY)
    m = max(len(s) for s in XY)
    ST = np.zeros((n, m, 2), dtype=np.int32)-1

    for i, s in enumerate(XY):
        for j, w in enumerate(s):
            ST[i, j] = np.array([X_id_map[w[0]], Y_id_map[w[1]]])

    return ST

ST = to_numpy(tagged_sents, words, pos_tags)
# print(ST)

In [None]:
@njit
def get_prob(stB:np.ndarray, no_words:int, no_tags:int, eps:float=0.00001):
    A = np.zeros((no_tags,no_tags), dtype=np.float64)
    B = np.zeros((no_tags,no_words), dtype=np.float64)
    Pi = np.zeros(no_tags, dtype=np.float64)

    m, n, _ = stB.shape
    
    for i in range(m):
        if stB[i,0,0] == -1 : break
        Pi[stB[i,0,1]] += 1
        for j in range(n-1):
            wnth = stB[i,j,0]
            nth, nnth = stB[i,j:j+2,1]
            B[nth, wnth] += 1
            if nnth == -1 : break
            A[nth,nnth] += 1
        if nnth != -1 : B[stB[i,n-1,1], stB[i,n-1,0]] += 1
    A /= np.sum(A,axis=1).reshape(-1,1) + eps
    B /= np.sum(B,axis=1).reshape(-1,1) + eps
    Pi /= np.sum(Pi) + eps
    return A, B, Pi

A, B, Pi = get_prob(ST, len(words), len(pos_tags))
# print(A)
# print(B)
# print(Pi)

# Viterbi parameters
|||
|---|---|
 |$v_{t-1}(i)$ | the previous Viterbi path probability from the previous time step|
 |$a_{ij}$ | the transition probability from previous state $q_i$ to current state $q_j$|
 |$b_j(o_t)$| the state observation likelihood of the observation symbol $o_t$ given the current state $j$|

In [None]:
from viterbi import viterbi_log

# ST = to_numpy(tagged_sents, words, pos_tags)
# A, B, Pi = get_prob(ST, len(words), len(pos_tags))

r = 1028              # sample number to test
test_sent = []
test_tags = []
for word, tag in tagged_sents[r]:
    test_sent.append(word)
    test_tags.append(tag)
O = [words.index(word) for word in test_sent] # producing observation states (words) in terms of its word index
# correct_tag_seq = np.array([pos_tags.index(pair[1]) for pair in brown.tagged_sents()[r]])

optim_state_seq, log_prob_trellis, backtrack_matrix = viterbi_log(A,Pi,B,O)

optim_state_seq = [pos_tags[t] for t in optim_state_seq]

# The following was to check if it was working or not. These parts need to be better done.
print('Observation sequence:   O  = ', test_sent)
print('Correct state sequence: S* = ', test_tags)
print('Optimal state sequence: S  = ', optim_state_seq)
print("Do they match ? : ", test_tags==optim_state_seq)

from pandas import DataFrame
test_result_df = DataFrame(index=test_sent,columns=['Correct','Guessed'],data=zip(test_tags,optim_state_seq)).T
test_result_df.iloc[:,(test_result_df.nunique()!=1).values]

Observation sequence:   O  =  ['Catcher', 'Frank', "House's", 'throw', 'in', 'an', 'effort', 'to', 'nab', 'Throneberry', 'was', 'wide', 'and', 'in', 'the', 'dirt', '.']
Correct state sequence: S* =  ['NOUN', 'NOUN', 'NOUN', 'NOUN', 'ADP', 'DET', 'NOUN', 'PRT', 'VERB', 'NOUN', 'VERB', 'ADV', 'CONJ', 'ADP', 'DET', 'NOUN', '.']
Optimal state sequence: S  =  ['NOUN', 'NOUN', 'NOUN', 'VERB', 'ADP', 'DET', 'NOUN', 'PRT', 'VERB', 'NOUN', 'VERB', 'ADJ', 'CONJ', 'ADP', 'DET', 'NOUN', '.']
Do they match ? :  False


Unnamed: 0,throw,wide
Correct,NOUN,ADV
Guessed,VERB,ADJ
