In [None]:
from utils_pos import get_word_tag, preprocess
import pandas as pd
from collections import defaultdict
import numpy as np

In [None]:
# load the training corpus
with open('WSJ_02-21.pos','r') as f:
    training_corpus = f.readlines()

print(training_corpus[:5])

['In\tIN\n', 'an\tDT\n', 'Oct.\tNNP\n', '19\tCD\n', 'review\tNN\n']


In [None]:
# read the Hidden Markov Models vocabulary

with open('hmm_vocab.txt','r') as f:
    voc_l = f.read().split('\n')

print(voc_l[-10:])

['zeros', 'zinc', 'zip', 'zombie', 'zone', 'zones', 'zoning', '{', '}', '']


In [None]:
vocab = {}
for i, word in enumerate(sorted(voc_l)):
    vocab[word] = i

In [None]:
# load the test corpus
with open("WSJ_24.pos", 'r') as f:
    y = f.readlines()
y[:5]

['The\tDT\n', 'economy\tNN\n', "'s\tPOS\n", 'temperature\tNN\n', 'will\tMD\n']

In [None]:
# test data after preprocessing
_, preprocessed = preprocess(vocab, "test.words")
print(len(preprocessed))

34199


In [None]:
preprocessed[:5]

['The', 'economy', "'s", 'temperature', 'will']

In [None]:
def create_dictionaries(training_corpus, vocab):

    # used between pos and pos
    transition_count = defaultdict(int)
    # used between pos and word
    emission_count = defaultdict(int)
    tag_count = defaultdict(int)

    prev_tag = '--s--'

    for i, line in enumerate(training_corpus):

        word, tag = get_word_tag(line, vocab)

        transition_count[(prev_tag, tag)] += 1
        emission_count[(tag, word)] += 1
        tag_count[tag] += 1

        prev_tag = tag

    return transition_count, emission_count, tag_count

In [None]:
transition_count, emission_count, tag_count = create_dictionaries(training_corpus, vocab)

In [None]:
len(transition_count), len(emission_count), len(tag_count)

(1421, 31140, 46)

In [None]:
print("transition_count:")
print({k : v for k,v in list(transition_count.items())[:5]})
print()
print("emission_count:")
print({k : v for k,v in list(emission_count.items())[:5]})
print()
print("tag_count:")
print({k : v for k,v in list(tag_count.items())[:5]})

transition_count:
{('--s--', 'IN'): 5050, ('IN', 'DT'): 32364, ('DT', 'NNP'): 9044, ('NNP', 'CD'): 1752, ('CD', 'NN'): 7377}

emission_count:
{('IN', 'In'): 1735, ('DT', 'an'): 3142, ('NNP', 'Oct.'): 317, ('CD', '19'): 100, ('NN', 'review'): 36}

tag_count:
{'IN': 98554, 'DT': 81842, 'NNP': 91466, 'CD': 36568, 'NN': 132935}


In [None]:
states = sorted(tag_count.keys())
print(states[-5:])

['WDT', 'WP', 'WP$', 'WRB', '``']


In [None]:
def predict_pos(preprocessed, y, emission_count, vocab,states):

    words_dict, res = defaultdict(int), []

    for word in preprocessed:
        for tag in states:

            if emission_count.get((tag, word)):
                if word not in words_dict:

                    words_dict[word] = (tag,emission_count[(tag, word)])

                else:
                    if words_dict[word][1] < emission_count[(tag, word)]:
                        words_dict[word] = (tag, emission_count[(tag, word)])

    for i in y:

        word,tag = get_word_tag(i,words_dict)
        if word in words_dict:
            res.append(tag == words_dict[word][0])

    return sum(res) / len(y)

In [None]:
predict_pos(preprocessed,y,emission_count,vocab, states)

0.928185034650136

In [None]:
def create_transition_matrix(alpha, states, transition_count):

    # transition_matrix contains the proba between pos and pos
    num_tags = len(states)
    transition_matrix = np.zeros((num_tags, num_tags))

    for i, tag_row in enumerate(states):
        for j, tag_col in enumerate(states):

            transition_matrix[i,j] = (transition_count[(tag_row, tag_col)] + alpha) / (tag_count[tag_row] + alpha * num_tags)

    return transition_matrix

In [None]:
# A is the transition matrix
A = create_transition_matrix(.001,states,transition_count)
A_df = pd.DataFrame(A, index = states, columns = states)

In [None]:
A_df.iloc[30:35,30:35]

Unnamed: 0,RBS,RP,SYM,TO,UH
RBS,2.217069e-06,2.217069e-06,2.217069e-06,0.00887,2.217069e-06
RP,3.756509e-07,0.0007516775,3.756509e-07,0.051089,3.756509e-07
SYM,1.722772e-05,1.722772e-05,1.722772e-05,1.7e-05,1.722772e-05
TO,4.477336e-05,4.472863e-08,4.472863e-08,9e-05,4.477336e-05
UH,1.030439e-05,1.030439e-05,1.030439e-05,0.061837,0.03092348


In [None]:
def create_emission_matrix(alpha, emission_count, states, vocab):

    num_words = len(vocab)
    num_tags = len(states)
    B = np.zeros((num_tags , num_words))

    for i in range(num_tags):
        for j in range(num_words):

            pair = (states[i], vocab[j])
            B[i, j] = (emission_count[pair] + alpha) / (tag_count[states[i]] + alpha * num_tags)

    return B

In [None]:
B = create_emission_matrix(0.001, emission_count, states, list(vocab))
B_df = pd.DataFrame(B, index=states , columns = list(vocab))

In [None]:
B_df.iloc[35:40,35:40]

Unnamed: 0,--unk_noun--,--unk_punct--,--unk_upper--,--unk_verb--,.
VB,0.001021293,0.000567,0.002572,0.003858114,3.782428e-08
VBD,3.345707e-08,0.000335,0.001104,3.345707e-08,3.345707e-08
VBG,0.004715128,0.001078,0.010373,6.7358e-08,6.7358e-08
VBN,4.993996e-08,0.000949,0.004644,4.993996e-08,4.993996e-08
VBP,0.0008006535,0.0004,0.001201,0.0007205962,8.005735e-08


In [None]:
def initialize(states, tag_count, A, B, corpus, vocab):

    # this func used to fill the first column of best_probs

    num_tags = len(states)
    num_words = len(preprocessed)
    best_probs = np.zeros((num_tags, num_words))
    best_paths = np.zeros((num_tags, num_words), dtype= int)
    s_idx = states.index('--s--')

    for i in range(num_tags):

        word_idx = corpus.index(corpus[0])

        pos_pos = A[(s_idx, i)]
        pos_word = B[(i, word_idx)]

        best_probs[i,0] = np.log(pos_pos) + np.log(pos_word)

    return best_probs, best_paths

In [None]:
best_probs, best_paths = initialize(states, tag_count, A, B, preprocessed, vocab)

In [None]:
# num_tags * num_words
best_probs.shape

(46, 34199)

In [None]:
def viterbi_forward(A,B, best_probs, best_paths, vocab, corpus):

    # this func used to fill all best_probs

    num_tags = A.shape[0] # tags * words

    # for each word, we get the proba for the 46 tags
    for word in range(1, len(corpus)):
        for tag in range(num_tags):

            # the previous word, get max proba in column
            prev_max_proba = best_probs[:, word-1].max()
            prev_idx = best_probs[:, word-1].argmax()

            transition_proba = A[prev_idx, tag]
            emission_proba = B[tag, vocab[preprocessed[word]]]

            best_probs[tag, word] = prev_max_proba + np.log(transition_proba) + np.log(emission_proba)

        cur_idx = best_probs[:, word].argmax()
        best_paths[cur_idx, word] = prev_idx

    return best_probs, best_paths

In [None]:
best_probs_f, best_paths_f = viterbi_forward(A,B, best_probs, best_paths, vocab, preprocessed)

In [None]:
print(best_paths_f[:10])

[[ 0  0  0 ...  0  0  0]
 [ 0  0  0 ...  0  0  0]
 [ 0  0  0 ...  0  0  0]
 ...
 [ 0  0  0 ...  0 26  0]
 [ 0  0  0 ...  0  0  0]
 [ 0  0  0 ...  0  0  0]]


In [None]:
print(best_probs_f[:,4])

[-65.33686954 -66.45342627 -63.38551974 -62.93745037 -62.85029235
 -62.23615147 -67.4771228  -62.07644453 -62.23367702 -62.56956877
 -64.90295236 -65.58340568 -65.06218242 -64.58383033 -62.17195408
 -65.03349759 -64.19423937 -66.85650816 -71.56691683 -47.57162998
 -53.17784413 -65.3310501  -66.4803496  -62.83009473 -66.29434592
 -62.17206387 -64.481085   -66.52764438 -63.61908302 -62.69979056
 -64.7879488  -64.65713732 -63.74905712 -62.52213671 -72.55730602
 -59.12241821 -62.62972187 -63.76016966 -63.71642548 -64.25715582
 -62.39046396 -62.49333539 -63.04297671 -63.02050791 -62.97915487
 -64.18218987]


In [None]:
# Test this function
print(f"best_probs[0,1]: {best_probs_f[0,1]:.4f}")
print(f"best_probs[0,4]: {best_probs_f[0,4]:.4f}")

best_probs[0,1]: -40.5657
best_probs[0,4]: -65.3369


In [None]:
def viterbi_backward(best_probs, best_paths, corpus, states):

    # predict the most likely sequence of pos tags for a specific sequence of words

    res = [None] * best_paths.shape[1]
    res[-1] = states[best_probs[:,-1].argmax()]

    for i in range(best_paths.shape[1]-2,-1,-1):

        res[i] = states[best_paths[:, i].argmax()]

    return res

In [None]:
pred = viterbi_backward(best_probs_f, best_paths_f , preprocessed, states)

In [None]:
for i in range(10):
    print(pred[i],y[i].split()[1])

# DT
NN NN
POS POS
NN NN
MD MD
VB VB
VBN VBN
IN IN
JJ JJ
NNS NN


In [None]:
m=len(pred)
print('The prediction for pred[-7:m-1] is: \n', preprocessed[-7:m-1], "\n", pred[-7:m-1], "\n")

The prediction for pred[-7:m-1] is: 
 ['see', 'them', 'here', 'with', 'us', '.'] 
 ['VB', 'PRP', 'RB', 'IN', 'PRP', '.'] 



In [None]:
print('The third word is:', preprocessed[3])
print('Your prediction is:', pred[3])
print('Your corresponding label y is: ', y[3])

The third word is: temperature
Your prediction is: NN
Your corresponding label y is:  temperature	NN



In [None]:
def compute_accuracy(pred, y):

    num_correct = 0
    total = 0
    for prediction, y in zip(pred, y):
        # Split the label into the word and the POS tag
        word_tag_tuple = y.split()

        # Check that word and tag are not none
        if len(word_tag_tuple)==2:

            word, tag = word_tag_tuple

            if prediction == tag:
                num_correct += 1

            total += 1

    return (num_correct/total)

In [None]:
print(f"Accuracy of the Viterbi algorithm is {compute_accuracy(pred, y):.4f}")

Accuracy of the Viterbi algorithm is 0.9442
