In [1]:
import numpy as np
import pandas as pd
import json
import functools as fc
from sklearn.metrics import accuracy_score

## Task 1: Vocabulary Creation

In [None]:
#train = pd.read_csv('hw3/data/train', sep='\t', names=['index', 'word', 'POS'])
train = pd.read_csv('data/train', sep='\t', names=['index', 'word', 'POS'])
train.head()

In [None]:
word = train['word'].values.tolist()
index = train['index'].values.tolist()
pos = train['POS'].values.tolist()

In [None]:
vocab = {}

for i in range(len(word)):
    if word[i] in vocab:
        vocab[word[i]] += 1
    else:
        vocab[word[i]] = 1

In [None]:
# replace rare words with <unk> (threshold = 3)
vocab2 = {}
num_unk = 0

for w in vocab:
    if vocab[w] >= 3:
        vocab2[w] = vocab[w]
    else:
        num_unk += vocab[w]

In [None]:
# sort the vocabulary by occurrences of words
vocab_sorted = sorted(vocab.items(), key=lambda item: item[1], reverse=True)

In [None]:
# write the sorted vocabulary to vocab file
#with open('recap/vocab.txt', 'w') as vocab_file:
with open('output/vocab.txt', 'w') as vocab_file:
    # the format of the vocab is word index occurrence
    # we add <unk> to the top of the vocabulary manually
    vocab_file.write('<unk>' + '\t' + str(0) + '\t' + str(num_unk) + '\n')
    for i in range(len(vocab_sorted)):
        vocab_file.write(vocab_sorted[i][0] + '\t' + str(i+1) + '\t' + str(vocab_sorted[i][1]) + '\n')

In [None]:
print(f'The total size of my vocabulary is {len(vocab_sorted)}\n')
print(f'The total occurrences of <unk> is {num_unk}\n')

## Task 2: Model Learning

In [None]:
# build a vocabulary list with only frequent words (i.e. occur no less than 3 times)
vocab_ls = list(vocab2.keys())

# write the frequent words into a json file
#with open('recap/vocab_frequent.txt', 'w') as output:
with open('output/vocab_frequent.txt', 'w') as output:
    for word in vocab_ls:
        output.write(word + '\n')

# replace non-frequent words in word with <unk>
for i in range(len(word)):
    if word[i] not in vocab_ls:
        word[i] = '<unk>'

# count (s, s') and (s, x) pairs
ss = {}
sx = {}
for i in range(len(word)-1):
    # make sure the index of the current word is less than the next
    # ss = {pos[i+1]|pos[i]: count}
    # we are not using the format {(pos[i], pos[i+1]): count} because
    # json doesn't support tuple
    if index[i] < index[i+1]:
        if str(pos[i+1]) + '|' + str(pos[i]) in ss:
            ss[str(pos[i+1]) + '|' + str(pos[i])] +=1
        else:
            ss[str(pos[i+1]) + '|' + str(pos[i])] = 1
            
        if str(word[i]) + '|' + str(pos[i]) in sx:
            sx[str(word[i]) + '|' + str(pos[i])] +=1
        else:
            sx[str(word[i]) + '|' + str(pos[i])] = 1
            
# for ss, we need to count the times that a pos tag occurs at the beginning
# of a sequence (i.e. (s|<s>))
for i in range(len(word)):
    if index[i] == 1:
        if str(pos[i]) + '|' + '<s>' in ss:
            ss[str(pos[i]) + '|' + '<s>'] += 1
        else:
            ss[str(pos[i]) + '|' + '<s>'] = 1

In [None]:
# build an emission and a transition dictionaries
emission = {}
transition = {}

# count occurrences of pos tags
count_pos = {}

for p in pos:
    if p in count_pos:
        count_pos[p] += 1
    else:
        count_pos[p] = 1
        
# don't forget to count the occurrences of <start>
count_pos['<s>'] = 0
for i in index:
    if i == 1:
        count_pos['<s>'] += 1

# emission dictionary {(s, x): count(s, x) / count(s)}
for sx_pair in sx:
    emission[sx_pair] = sx[sx_pair] / count_pos[sx_pair.split('|')[1]]

# transition dictionary {(s, s'): count(s, s') / count(s)}
for ss_pair in ss:
    transition[ss_pair] = ss[ss_pair] / count_pos[ss_pair.split('|')[1]]

In [None]:
print(f'There are {len(transition)} transition parameters in my HMM\n')
print(f'There are {len(emission)} emission parameters in my HMM\n')

In [None]:
# write the emission and transition dictionaries into a json file
emission_transition = [emission, transition]
#with open('recap/hmm.json', 'w') as output:
with open('output/hmm.json', 'w') as output:
    json.dump(emission_transition, output)

In [None]:
# build a list of distinct pos
pos_distinct = list(count_pos.keys())

# write the pos_distinct into a txt file
#with open('recap/pos.txt', 'w') as pos_output:
with open('output/pos.txt', 'w') as pos_output:
    for _, pos in enumerate(pos_distinct):
        pos_output.write(pos + '\n')

## Task 3: Greedy Decoding with HMM

In [2]:
# load txt file vocab
vocab_frequent = []
#with open('recap/vocab_frequent.txt', 'r') as vocab_txt:
with open('output/vocab_frequent.txt', 'r') as vocab_txt:
    for word in vocab_txt:
        word = word.strip('\n')
        vocab_frequent.append(word)

In [3]:
vocab_frequent

['Pierre',
 ',',
 '61',
 'years',
 'old',
 'will',
 'join',
 'the',
 'board',
 'as',
 'a',
 'nonexecutive',
 'director',
 'Nov.',
 '29',
 '.',
 'Mr.',
 'is',
 'chairman',
 'of',
 'N.V.',
 'Dutch',
 'publishing',
 'group',
 'Rudolph',
 'Agnew',
 '55',
 'and',
 'former',
 'Consolidated',
 'Gold',
 'Fields',
 'PLC',
 'was',
 'named',
 'this',
 'British',
 'industrial',
 'conglomerate',
 'A',
 'form',
 'asbestos',
 'once',
 'used',
 'to',
 'make',
 'Kent',
 'cigarette',
 'filters',
 'has',
 'caused',
 'high',
 'percentage',
 'cancer',
 'deaths',
 'among',
 'workers',
 'exposed',
 'it',
 'more',
 'than',
 '30',
 'ago',
 'researchers',
 'reported',
 'The',
 'fiber',
 'crocidolite',
 'unusually',
 'resilient',
 'enters',
 'lungs',
 'with',
 'even',
 'brief',
 'exposures',
 'causing',
 'symptoms',
 'that',
 'show',
 'up',
 'decades',
 'later',
 'said',
 'Lorillard',
 'Inc.',
 'unit',
 'New',
 'York-based',
 'Loews',
 'Corp.',
 'makes',
 'cigarettes',
 'stopped',
 'using',
 'in',
 'its',
 '1956

In [4]:
# load txt file pos
pos_distinct = []

#with open('recap/pos.txt', 'r') as pos_txt:
with open('output/pos.txt', 'r') as pos_txt:
    for pos in pos_txt:
        pos = pos.strip('\n')
        pos_distinct.append(pos)

In [5]:
# load json file hmm                
#with open('recap/hmm.json', 'r') as hmm:
with open('output/hmm.json', 'r') as hmm:
    json_data = json.load(hmm)

emission, transition = json_data[0], json_data[1]

In [6]:
#dev = pd.read_csv('hw3/data/dev', sep='\t', names=['index', 'word', 'POS'])
dev = pd.read_csv('data/dev', sep='\t', names=['index', 'word', 'POS'])
dev.head()

Unnamed: 0,index,word,POS
0,1,The,DT
1,2,Arizona,NNP
2,3,Corporations,NNP
3,4,Commission,NNP
4,5,authorized,VBD


In [7]:
index_dev = dev.loc[:, 'index'].values.tolist()
word_dev = dev.loc[:, 'word'].values.tolist()
pos_dev = dev.loc[:, 'POS'].values.tolist()

In [8]:
# split dev lists (index, word and pos) to individual samples (list --> list of sublists)
word_dev2 = []
pos_dev2 = []
word_sample = []
pos_sample = []
for i in range(len(dev)-1):
    if index_dev[i] < index_dev[i+1]:
        word_sample.append(word_dev[i])
        pos_sample.append(pos_dev[i])
    else:
        word_sample.append(word_dev[i])
        word_dev2.append(word_sample)
        word_sample = []
        
        pos_sample.append(pos_dev[i])
        pos_dev2.append(pos_sample)
        pos_sample = []

In [9]:
def greedy(sentence):
    # initialize a dictionary to keep track of the pos for each position
    pos = []
    
    # predict the pos of the first word in the sentence
    
    # we need to make sure the first word is in the vocabulary. If not, replace
    # with <unk>
    if sentence[0] not in vocab_frequent:
        sentence[0] = '<unk>'
    # predict pos based on the product of the emission and transition
    max_prob = 0
    p0 = 'UNK'
    
    for p in pos_distinct:
        try:
            temp = emission[sentence[0] + '|' + p] * transition[p + '|' + '<s>']
            if temp > max_prob:
                max_prob = temp
                p0 = p
        except:
            pass
        
    pos.append(p0)
    
    # predict the pos of the remaining words
    
    
    for i in range(1, len(sentence)):
        # again, we need to check the existence of the word in the vocabulary.
        if sentence[i] not in vocab_frequent:
            sentence[i] = '<unk>'
        
        max_prob = 0
        pi = 'UNK'
        
        for p in pos_distinct:
            try:
                temp = emission[sentence[i] + '|' + p] * transition[p + '|' + pos[-1]]
                if temp > max_prob:
                    max_prob = temp
                    pi = p
            except:
                pass
            
        pos.append(pi)
    
    return pos  

In [10]:
pos_greedy = [greedy(s) for s in word_dev2]

In [11]:
# concatenate the list of sublists into one single list
pos_greedy = fc.reduce(lambda a, b: a + b, pos_greedy)
pos_dev = fc.reduce(lambda a, b: a + b, pos_dev2)

In [12]:
acc = accuracy_score(pos_dev, pos_greedy)
print('The prediction accuracy on the dev data is {:.2f}%'.format(acc * 100))

The prediction accuracy on the dev data is 92.67%


## Task 4: Viterbi Decoding with HMM

In [23]:
# load txt file vocab
vocab_frequent = []
#with open('recap/vocab_frequent.txt', 'r') as vocab_txt:
with open('output/vocab_frequent.txt', 'r') as vocab_txt:
    for word in vocab_txt:
        word = word.strip('\n')
        vocab_frequent.append(word)

In [24]:
# load txt file pos
pos_distinct = []

#with open('recap/pos.txt', 'r') as pos_txt:
with open('output/pos.txt', 'r') as pos_txt:
    for pos in pos_txt:
        pos = pos.strip('\n')
        pos_distinct.append(pos)

In [25]:
# load json file hmm                
#with open('recap/hmm.json', 'r') as hmm:
with open('output/hmm.json', 'r') as hmm:
    json_data = json.load(hmm)

emission, transition = json_data[0], json_data[1]

In [26]:
#dev = pd.read_csv('hw3/data/dev', sep='\t', names=['index', 'word', 'POS'])
dev = pd.read_csv('data/dev', sep='\t', names=['index', 'word', 'POS'])
dev.head()

Unnamed: 0,index,word,POS
0,1,The,DT
1,2,Arizona,NNP
2,3,Corporations,NNP
3,4,Commission,NNP
4,5,authorized,VBD


In [27]:
index_dev = dev.loc[:, 'index'].values.tolist()
word_dev = dev.loc[:, 'word'].values.tolist()
pos_dev = dev.loc[:, 'POS'].values.tolist()

In [28]:
# split dev lists (index, word and pos) to individual samples (list --> list of sublists)
word_dev2 = []
pos_dev2 = []
word_sample = []
pos_sample = []
for i in range(len(dev)-1):
    if index_dev[i] < index_dev[i+1]:
        word_sample.append(word_dev[i])
        pos_sample.append(pos_dev[i])
    else:
        word_sample.append(word_dev[i])
        word_dev2.append(word_sample)
        word_sample = []
        
        pos_sample.append(pos_dev[i])
        pos_dev2.append(pos_sample)
        pos_sample = []

In [29]:
# define a function to predict the pos for an input sentence
def viterbi(sentence):
    # initialize a dictionary that keeps track of the highest cumulative probability of each possible
    # pos at each position of the input sentence
    seq = {i:{} for i in range(len(sentence))}
    # also initialize a dictionary that keeps track of the pos of the previous pos that leads to the
    # highest cumulative probability of each possible pos at each position of the input sentence
    # for instance, for a pos of NNP at position i, we want to know which pos of position i-1 leads to
    # the highest cumulative probability of NNP at position i.
    pre_pos = {i:{} for i in range(len(sentence))}    
    
    # for the first position, the highest cumulative probability of each possible pos would be
    # emission[sentence[0]|pos] * transition[pos|<s>]
    
    # check if the first word is in the vocabualry. If not, replace with '<unk>'
    if sentence[0] not in vocab_frequent:
        sentence[0] = '<unk>'
        
    for p in pos_distinct:
        if p + '|' + '<s>' in transition:
            try:
                seq[0][p] = transition[p + '|' + '<s>'] * \
                            emission[sentence[0] + '|' + p]
            except:
                seq[0][p] = 0
    # set <s> as the previous pos of each possible pos at the first position
    for p in seq[0].keys():
        pre_pos[0][p] = '<s>'
    
    # for position i > 0, the highest cumulative probability of each possible pos would be
    # emission[sentence[i]|pos[i]] * transition[pos[i]|pos[i-1]] * seq[i-1][pos]
    for i in range(1, len(sentence)):
        # still, check if the word is in the vocabulary
        if sentence[i] not in vocab_frequent:
            sentence[i] = '<unk>'
            
        for p in seq[i-1].keys():
            for p_prime in pos_distinct:
                if p_prime + '|' + p in transition:
                    if p_prime in seq[i]:
                        try:
                            temp = seq[i-1][p] * \
                                   transition[p_prime + '|' + p] * \
                                   emission[sentence[i] + '|' + p_prime]
                            if  temp > seq[i][p_prime]:
                                seq[i][p_prime] = temp
                                pre_pos[i][p_prime] = p
                        except:
                            pass
                    else:
                        try:
                            seq[i][p_prime] = seq[i-1][p] * \
                                              transition[p_prime + '|' + p] * \
                                              emission[sentence[i] + '|' + p_prime]
                            pre_pos[i][p_prime] = p
                        except:
                            seq[i][p_prime] = 0
    # after we get the maximum probability for every possible pos at every position of a sentence,
    # we can trace backward to find out our prediction on the pos for the sentence.
    seq_predict = []
    
    # The pos of the last word in the sentence is the one with the highest probability
    # after predicting the pos of the last word in the sentence, we can iterate through pre_pos to predict
    # the pos of the remaining words in the input sentence in the reverse order
    
    # the highest probability
    prob_max = max(seq[len(sentence)-1].values())
    # the index of the highest probability
    index_max = list(seq[len(sentence)-1].values()).index(prob_max)
    # the pos of the highest probability
    pos_max = list(seq[len(sentence)-1].keys())[index_max]
    seq_predict.append(pos_max)
    
    # iterate through pre_pos
    for i in range(len(sentence)-1, 0, -1):
        # for some rare ss or sx pairs, there is no corresponding key in the
        # transition or emission dictionary. In this case, we need to set manually
        # the pos to 'UNK' at those positions
        try:
            pos_max = pre_pos[i][pos_max]
            seq_predict.append(pos_max)
        except:
            seq_predict.append('UNK')
        
    # The final seq_predict should be the reverse of the original
    seq_predict = [seq_predict[i] for i in range(len(seq_predict)-1, -1, -1)]
    return seq_predict

In [30]:
# use viterbi to predict pos for dev
pos_viterbi = [viterbi(s) for s in word_dev2]

In [31]:
# merge the list of sublists to a single list
pos_viterbi = fc.reduce(lambda a, b: a + b, pos_viterbi)
pos_dev = fc.reduce(lambda a, b: a + b, pos_dev2)

In [32]:
acc = accuracy_score(pos_dev, pos_viterbi)
print('The prediction accuracy on the dev data is {:.2f}%'.format(acc * 100))

The prediction accuracy on the dev data is 94.36%
