In [19]:
import string
import math
import pandas as pd
import numpy as np
from collections import Counter, defaultdict
from utils_pos import get_word_tag, preprocess  

In [21]:
a = [0.1,0.3,0.2]
b = [0.9,0.7,0.5]

In [23]:
np.array(list(map(lambda x: math.log(x),a))) + np.array(list(map(lambda x: math.log(x),b)))

array([-2.40794561, -1.56064775, -2.30258509])

In [25]:
math.log(2,51e-08) * 2

-0.09568004859442561

In [2]:
# load in the training corpus
with open("WSJ_02-21.pos", 'r') as f:
    training_corpus = f.readlines()

# load in the test corpus
with open("WSJ_24.pos", 'r') as f:
    testing_corpus = f.readlines()

In [8]:
## Create Vocab
words = [line.split("\t")[0] for line in training_corpus]
words_freq = Counter(words)
vocab_list = [k for k, v in words_freq.items() if (v > 1 and k != '\n')]
## add unknown words in vocab
unk_words_list = ["--unk_digit--","--unk_punct--", "--unk_upper--","--unk_noun--","--unk_verb--","--unk_adj--","--unk_adv--","--unk--","--n--",""]
vocab_list = vocab_list + unk_words_list
vocab_list.sort()
vocab = {k:i for i,k in enumerate(sorted(vocab_list))}
print(f"Vocab Size : {len(vocab)}")

Vocab Size : 23777


In [14]:
def assign_unk(word):
    punct = set(string.punctuation)
    # Suffixes
    noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
    verb_suffix = ["ate", "ify", "ise", "ize"]
    adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
    adv_suffix = ["ward", "wards", "wise"]

    if any(char.isdigit() for char in word):
        return "--unk_digit--"

    elif any(char in punct for char in word):
        return "--unk_punct--"

    elif any(char.isupper() for char in word):
        return "--unk_upper--"

    elif any(word.endswith(suffix) for suffix in noun_suffix):
        return "--unk_noun--"

    elif any(word.endswith(suffix) for suffix in verb_suffix):
        return "--unk_verb--"

    elif any(word.endswith(suffix) for suffix in adj_suffix):
        return "--unk_adj--"

    elif any(word.endswith(suffix) for suffix in adv_suffix):
        return "--unk_adv--"
    
    return "--unk--"


def get_word_tag(line, vocab):

    if not line.split():
        word = "--n--"
        tag = "--s--"
    else:
        word, tag = line.split()
        if word not in vocab:
            word = assign_unk(word)
        
    return word,tag


## Process train and test data
train = [get_word_tag(line, vocab) for line in training_corpus]
test = [get_word_tag(line, vocab) for line in testing_corpus]
_, prep = preprocess(vocab, "test.words")  

In [15]:
def create_dictionaries(training_corpus, vocab):
    ## Transition, Emission, tags count
    transition_counts = defaultdict(int)
    emission_counts = defaultdict(int)
    tag_counts = defaultdict(int)

    prev_tag = "--s--"
    for word, tag in train:
        transition_counts[(prev_tag, tag)] += 1
        emission_counts[(tag, word)] +=1
        tag_counts[tag] +=1
        prev_tag = tag

    return emission_counts, transition_counts, tag_counts

In [16]:
emission_counts, transition_counts, tag_counts = create_dictionaries(training_corpus, vocab)

## Calculate Accuracy

In [7]:
states = sorted(tag_counts.keys())
print(f"Number of pos tags : {len(states)}")

Number of pos tags : 46


In [8]:
print("transition examples: ")
for ex in list(transition_counts.items())[:3]:
    print(ex)
print()

print("emission examples: ")
for ex in list(emission_counts.items())[200:203]:
    print (ex)
print()

print("ambiguous word example: ")
for tup,cnt in emission_counts.items():
    if tup[1] == 'back': print (tup, cnt) 

transition examples: 
(('--s--', 'IN'), 5050)
(('IN', 'DT'), 32364)
(('DT', 'NNP'), 9044)

emission examples: 
(('DT', 'any'), 721)
(('NN', 'decrease'), 7)
(('NN', 'insider-trading'), 5)

ambiguous word example: 
('RB', 'back') 304
('VB', 'back') 20
('RP', 'back') 84
('JJ', 'back') 25
('NN', 'back') 29
('VBP', 'back') 4


In [9]:
def calculate_accuracy(prep, y):

    correct_pred = 0

    for word, y_tup in zip(prep, y):

        y_tup_l = y_tup.split()
        if len(y_tup_l) == 2:
            true_label = y_tup_l[1]
        else:
            continue

        final_counts = 0
        final_tag = ""

        if word in vocab:
            for tag in states:
                key = (tag,word)
                if key in emission_counts:
                    count = emission_counts[key]
                    if count>final_counts:
                        final_tag = tag
                        final_counts = count

        if final_tag == true_label:
            correct_pred +=1

    
    return correct_pred/len(test)

calculate_accuracy(prep, testing_corpus)

0.8888563993099213

## Hidden Markov Model

In [266]:
alpha = 0.001
num_tags = len(states)
num_words = len(vocab)

trasmission_mat = np.zeros((num_tags,num_tags))
emission_mat = np.zeros((num_tags, num_words))

for (prevtag,tag),value in dict(transition_counts).items():
    trasmission_mat[states.index(prevtag), states.index(tag)] = value

total = trasmission_mat.sum(axis=1).reshape(-1,1) + (alpha * num_tags)
trasmission_mat = trasmission_mat + alpha
trasmission_mat = trasmission_mat/total

for (tag,word), value in emission_counts.items():
    emission_mat[states.index(tag), vocab[word]] = value

total = emission_mat.sum(axis=1).reshape(-1,1) + (alpha * num_words)
emission_mat = emission_mat + alpha
emission_mat = emission_mat/total

assert all(trasmission_mat.sum(axis=1)), "Transmission mat is not correct !!" 
assert all(emission_mat.sum(axis=1)), "Emission mat is not correct !!" 

## Viterbi Algo

In [267]:
# def step(mu_prev,emission_probs,transition_probs,observed_state):
#     pre_max = mu_prev * transition_probs.T
#     max_prev_states = np.argmax(pre_max, axis=1)
#     max_vals = pre_max[np.arange(len(max_prev_states)), max_prev_states]
#     mu_new = max_vals * emission_probs[:, observed_state]
#     return mu_new, max_prev_states


# def viterbi(emission_probs,transition_probs,start_probs,observed_states):
#     # Runs the forward pass, storing the most likely previous state.
#     mu = start_probs * emission_probs[:, observed_states[0]]
#     all_prev_states = []
#     for observed_state in observed_states[1:]:
#         mu, prevs = step(mu, emission_probs, transition_probs, observed_state)
#         all_prev_states.append(prevs)
    
#     # Traces backwards to get the maximum likelihood sequence.
#     state = np.argmax(mu)
#     sequence_prob = mu[state]
#     state_sequence = [state]
#     for prev_states in all_prev_states[::-1]:
#         state = prev_states[state]
#         state_sequence.append(state)
    
#     return state_sequence[::-1], sequence_prob


# s_idx = states.index("--s--")
# best_probs = np.zeros((num_tags, len(prep)))
# best_paths = np.zeros((num_tags, len(prep)), dtype=int)
# start_probs = trasmission_mat[s_idx,:]

# ## test
# observed_states = [vocab[w] for w in ['Loss', 'tracks','upward']]
# max_seq, seq_prob = viterbi(emission_mat, trasmission_mat, start_probs, observed_states)
# print([states[seq] for seq in max_seq])

# observed_states = [vocab[w] for w in prep]
# max_seq, seq_prob = viterbi(emission_mat, trasmission_mat, start_probs, observed_states)
# pred = [states[seq] for seq in max_seq]

In [273]:
def initialize(states, tag_counts, A, B, corpus, vocab):
    num_tags = len(tag_counts)
    best_probs = np.zeros((num_tags, len(corpus)))
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
    s_idx = states.index("--s--")
    for i in range(num_tags): 
        if A[s_idx,i] == 0: 
            best_probs[i,0] = float('-inf')
        else:
            best_probs[i,0] = math.log(A[s_idx,i]) + math.log(B[i,vocab[corpus[0]]] )            
    return best_probs, best_paths


def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab):
   
    num_tags = best_probs.shape[0]
    
    for i in range(1, len(test_corpus)): 
        if i % 5000 == 0:
            print("Words processed: {:>8}".format(i))
            
        for j in range(num_tags): # complete this line
            best_prob_i =  float("-inf")
            best_path_i = None
            for k in range(num_tags): # complete this line
                prob = best_probs[k,i-1]+ math.log(A[k,j]) +math.log(B[j,vocab[test_corpus[i]]])
                if prob > best_prob_i: # complete this line
                    best_prob_i = prob
                    best_path_i = k

            best_probs[j,i] = best_prob_i
            best_paths[j,i] = best_path_i

    return best_probs, best_paths

def viterbi_backward(best_probs, best_paths, corpus, states):
  
    m = best_paths.shape[1] 
    z = [None] * m
    num_tags = best_probs.shape[0]
    best_prob_for_last_word = float('-inf')
    pred = [None] * m
    
    for k in range(num_tags): 
        if best_probs[k,-1]>best_prob_for_last_word: 
            best_prob_for_last_word = best_probs[k,-1]
            z[m - 1]=k
    pred[m - 1] = states[k]
    
    for i in range(len(corpus)-1, -1, -1):
        pos_tag_for_word_i = best_paths[np.argmax(best_probs[:,i]),i]
        z[i - 1] = best_paths[pos_tag_for_word_i,i]
        pred[i - 1] = states[pos_tag_for_word_i]
        
    return pred

In [274]:
best_probs, best_paths = initialize(states, tag_counts, trasmission_mat, emission_mat, prep, vocab)
best_probs, best_paths = viterbi_forward(trasmission_mat, emission_mat, prep, best_probs, best_paths, vocab)
pred = viterbi_backward(best_probs, best_paths, prep, states)

Words processed:     5000
Words processed:    10000
Words processed:    15000
Words processed:    20000
Words processed:    25000
Words processed:    30000


In [277]:
def compute_accuracy(pred, y):
    '''
    Input: 
        pred: a list of the predicted parts-of-speech 
        y: a list of lines where each word is separated by a '\t' (i.e. word \t tag)
    Output: 
        
    '''
    num_correct = 0
    total = 0
    
    for prediction, y in zip(pred, y):
        word_tag_tuple = y.split()
        
        if len(word_tag_tuple)!=2: # complete this line
            continue 
        word, tag = word_tag_tuple
        if prediction == tag: # complete this line
            num_correct += 1
        total += 1
    return (num_correct/total)

In [278]:
print(f"Accuracy of the Viterbi algorithm is {compute_accuracy(pred, testing_corpus):.4f}")

Accuracy of the Viterbi algorithm is 0.9528
