In [1]:
from collections import defaultdict
import numpy as np
import pandas as pd
import string
import math

# 1. Getting Data

In [2]:
punct = set(string.punctuation)
noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
verb_suffix = ["ate", "ify", "ise", "ize"]
adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
adv_suffix = ["ward", "wards", "wise"]

In [3]:
def assign_unknown(word):
    if any(char.isdigit() for char in word):
        return '--unk_digit--'
    elif any(char in punct for char in word):
        return '--unk_punct--'
    elif any(char.isupper() for char in word):
        return '--unk_upper--'
    elif any(word.endswith(suffix) for suffix in noun_suffix):
        return '--unk_noun--'
    elif any(word.endswith(suffix) for suffix in verb_suffix):
        return '--unk_verb--'
    elif any(word.endswith(suffix) for suffix in adj_suffix):
        return '--unk_adj--'
    elif any(word.endswith(suffix) for suffix in adv_suffix):
        return '--unk_adv--'
    return '--unk--'

In [4]:
def preprocess(vocab, path):
    data = []
    file = open(path).readlines()
    
    for index, word in enumerate(file):
        if not word.split():
            word = '--n--'
            data.append(word)
            continue
        elif word.strip() not in vocab:
            word = assign_unknown(word)
            data.append(word)
            continue
        data.append(word.strip())
    return data

In [5]:
vocab = open('vocab.txt').read().split('\n')
print('Number of vocabs:', len(vocab))
vocab[0:5]

Number of vocabs: 23777


['!', '#', '$', '%', '&']

In [6]:
test_words = preprocess(vocab, 'test.txt')
print('Number of test words:', len(test_words))
test_words[0:5]

Number of test words: 34199


['The', 'economy', "'s", 'temperature', 'will']

In [7]:
gold_corpus = open('gold.txt').readlines()
print('Number of gold words:', len(gold_corpus))
gold_corpus[0:5]

Number of gold words: 34199


['The\tDT\n', 'economy\tNN\n', "'s\tPOS\n", 'temperature\tNN\n', 'will\tMD\n']

In [8]:
train_corpus = open('train.txt').readlines()
print('Number of train words:', len(train_corpus))
train_corpus[0:5]

Number of train words: 989860


['In\tIN\n', 'an\tDT\n', 'Oct.\tNNP\n', '19\tCD\n', 'review\tNN\n']

# 2. Parts of Speech Tagging

## Training

In [9]:
def seperate_word_tag(word_tag, vocab): 
    if not word_tag.split():
        word = '--n--'
        tag = '--s--'
        return word, tag
    else:
        word, tag = word_tag.split()
        if word not in vocab: 
            word = assign_unknown(word)
        return word, tag
    return None 

In [10]:
def create_dictionaries(train_corpus, vocab):
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)
    
    prev_tag = '--s--' 
    i = 0 
    
    for word_tag in train_corpus:
        i += 1
        if i % 50000 == 0: print(f'Processed {i} words...')
        word, tag = seperate_word_tag(word_tag, vocab) 
        
        transition_counts[(prev_tag, tag)] += 1
        emission_counts[(tag, word)] += 1
        tag_counts[tag] += 1
        prev_tag = tag
    return emission_counts, transition_counts, tag_counts

In [11]:
emission_counts, transition_counts, tag_counts = create_dictionaries(train_corpus, vocab)

Processed 100000 words...
Processed 200000 words...
Processed 300000 words...
Processed 400000 words...
Processed 500000 words...
Processed 600000 words...
Processed 700000 words...
Processed 800000 words...
Processed 900000 words...


In [12]:
states = sorted(tag_counts.keys())
print('Number of POS tags (States):', len(states))
print(states)

Number of POS tags (States): 46
['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']


In [13]:
print("Transition examples: ")
for example in list(transition_counts.items())[:3]:
    print(example)

Transition examples: 
(('--s--', 'IN'), 5050)
(('IN', 'DT'), 32364)
(('DT', 'NNP'), 9044)


In [14]:
print("Emission examples: ")
for example in list(emission_counts.items())[200:203]:
    print (example)

Emission examples: 
(('DT', 'any'), 721)
(('NN', 'decrease'), 7)
(('NN', 'insider-trading'), 5)


In [15]:
print("Ambiguous word example: ")
for tup, cnt in emission_counts.items():
    if tup[1] == 'back': print (tup, cnt)

Ambiguous word example: 
('RB', 'back') 304
('VB', 'back') 20
('RP', 'back') 84
('JJ', 'back') 25
('NN', 'back') 29
('VBP', 'back') 4


## Testing

In [18]:
def predict_pos(test_words, gold_corpus, emission_counts, vocab, states):
    num_correct = 0
    all_words = set(emission_counts.keys())
    total = len(gold_corpus)
    
    for word, gold_tuple in zip(test_words, gold_corpus): 
        gold_tuple_list = gold_tuple.split()
        if len(gold_tuple_list) != 2: continue
        else: true_label = gold_tuple_list[1]
    
        count_final = 0
        pos_final = ''
        if word not in vocab: continue
        
        for pos in states:
            if (pos, word) not in emission_counts: continue
            count = emission_counts[(pos, word)]
            
            if count > count_final:
                count_final = count
                pos_final = pos
                    
        if pos_final == true_label: num_correct += 1
    accuracy = num_correct / total
    return accuracy

In [19]:
accuracy = predict_pos(test_words, gold_corpus, emission_counts, vocab, states)
print('Accuracy of prediction:', accuracy)

Accuracy of prediction: 0.8888563993099213


# 3. Hidden Markov Models for POS

## Creating the 'A' transition probabilities matrix

In [20]:
def create_transition_matrix(alpha, tag_counts, transition_counts):
    all_tags = sorted(tag_counts.keys())
    num_tags = len(all_tags)
    
    A = np.zeros((num_tags,num_tags))
    trans_keys = set(transition_counts.keys())
    
    for i in range(num_tags):
        for j in range(num_tags):
            count = 0
            key = (all_tags[i],all_tags[j])
            if key in transition_counts: count = transition_counts[key]
                
            count_prev_tag = tag_counts[all_tags[i]]
            A[i, j] = (count + alpha) / (count_prev_tag + alpha * num_tags)
    return A

In [21]:
alpha = 0.001
A = create_transition_matrix(alpha, tag_counts, transition_counts)
df = pd.DataFrame(
    A[30:35, 30:35], 
    index = states[30:35], 
    columns = states[30:35]
)
df.head()

Unnamed: 0,RBS,RP,SYM,TO,UH
RBS,2.217069e-06,2.217069e-06,2.217069e-06,0.00887,2.217069e-06
RP,3.756509e-07,0.0007516775,3.756509e-07,0.051089,3.756509e-07
SYM,1.722772e-05,1.722772e-05,1.722772e-05,1.7e-05,1.722772e-05
TO,4.477336e-05,4.472863e-08,4.472863e-08,9e-05,4.477336e-05
UH,1.030439e-05,1.030439e-05,1.030439e-05,0.061837,0.03092348


## Create the 'B' emission probabilities matrix

In [22]:
def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    all_tags = sorted(tag_counts.keys())
    num_tags = len(tag_counts)
    num_words = len(vocab)
    
    B = np.zeros((num_tags, num_words))
    emis_keys = set(list(emission_counts.keys()))
    
    for i in range(num_tags):
        for j in range(num_words):
            count = 0
            key = (all_tags[i],vocab[j])
            if key in emission_counts.keys(): count = emission_counts[key]
                
            count_tag = tag_counts[all_tags[i]]
            B[i, j] = (count + alpha) / (count_tag + alpha * num_words)
    return B

In [23]:
cidx  = ['725', 'adroitly', 'engineers', 'promoted', 'synergy']
rvals = ['CD', 'NN', 'NNS', 'VB', 'RB', 'RP']
cols = [vocab.index(a) for a in cidx]
rows = [states.index(a) for a in rvals]

B = create_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))
df = pd.DataFrame(B[np.ix_(rows, cols)], index=rvals, columns=cidx)
df.head()

Unnamed: 0,725,adroitly,engineers,promoted,synergy
CD,8.201296e-05,2.732854e-08,2.732854e-08,2.732854e-08,2.732854e-08
NN,7.521128e-09,7.521128e-09,7.521128e-09,7.521128e-09,2.257091e-05
NNS,1.670013e-08,1.670013e-08,0.0004676203,1.670013e-08,1.670013e-08
VB,3.779036e-08,3.779036e-08,3.779036e-08,3.779036e-08,3.779036e-08
RB,3.226454e-08,6.456135e-05,3.226454e-08,3.226454e-08,3.226454e-08


# 4 Viterbi Algorithm and Dynamic Programming 

## Initialization

In [40]:
def viterbi_initialize(states, tag_counts, A, B, corpus, vocab):
    num_tags = len(tag_counts)
    s_idx = states.index('--s--')
    
    best_probs = np.zeros((num_tags, len(corpus)))
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
    
    for i in range(num_tags):
        if A[s_idx,i] == 0: best_probs[i, 0] = float('-inf')
        else: 
            index = vocab.index(corpus[0])
            best_probs[i, 0] = math.log(A[s_idx, i]) + math.log(B[i, index])
    return best_probs, best_paths

In [42]:
best_probs, best_paths = viterbi_initialize(states, tag_counts, A, B, test_words, vocab)
print('best_probs[0, 0]:', best_probs[0, 0]) 
print('best_paths[2, 3]:', best_paths[2, 3])

best_probs[0, 0]: -22.60982633354825
best_paths[2, 3]: 0


## Forward

In [49]:
def viterbi_forward(A, B, corpus, best_probs, best_paths, vocab):
    num_tags = best_probs.shape[0]
    
    for i in range(1, len(corpus)): 
        if i % 5000 == 0: print(f'Processed {i} words...')
            
        for j in range(num_tags):
            best_prob_i = float('-inf')
            best_path_i = None
            
            for k in range(num_tags):
                index = vocab.index(corpus[i])
                prob = best_probs[k, i - 1] + math.log(A[k, j]) + math.log(B[j, index])

                if prob > best_prob_i:
                    best_prob_i = prob
                    best_path_i = k
                    
            best_probs[j, i] = best_prob_i
            best_paths[j, i] = best_path_i
            
    return best_probs, best_paths

In [None]:
best_probs, best_paths = viterbi_forward(A, B, test_words, best_probs, best_paths, vocab)
print('best_probs[0, 1]:', best_probs[0, 1]) 
print('best_probs[0, 4]:', best_probs[0, 4])

## Backward

In [None]:
def viterbi_backward(best_probs, best_paths, corpus, states):
    m = best_paths.shape[1] 
    z = [None] * m
    
    num_tags = best_probs.shape[0]
    best_prob_for_last_word = float('-inf')
    pred = [None] * m
    
    for k in range(num_tags):
        if best_probs[k,-1] > best_prob_for_last_word:
            best_prob_for_last_word = best_probs[k, -1]
            z[m - 1] = k
            
    pred[m - 1] = states[k]
    for i in range(len(corpus) - 1, -1, -1):
        pos_tag = best_paths[np.argmax(best_probs[:, i]), i]
        z[i - 1] = best_paths[pos_tag, i]
        pred[i - 1] = states[pos_tag]
    return pred

In [None]:
pred = viterbi_backward(best_probs, best_paths, prep, states)
m = len(pred)

print(f'The prediction for pred[-7:{m - 1}] is:')
print(test_words[-7:m - 1])
print(pred[-7:m - 1])

print('The prediction for pred[0:7] is:')
print(test_words[0:7])
print(pred[0:7])

## Compute Accuracy

In [None]:
def compute_accuracy(pred, y):
    num_correct = 0
    total = 0
    
    for prediction, y in zip(pred, y):
        word_tag_tuple = y.split()
        if len(word_tag_tuple) != 2: continue 

        word, tag = word_tag_tuple
        if prediction == tag: num_correct += 1
        total += 1
        
    return num_correct / total

In [None]:
print('Accuracy of the Viterbi algorithm:', compute_accuracy(pred, y))

In [None]:
# 4. So sanh