# Task 1: Vocabulary Creation
In this task, we will create a vocabulary using the training data. We will replace all words with occurences less than a threshold (default: 3) to a special token \<unk>. Finally, we will export this vocabulary to a file named vocab.txt where each line is in the format of 'word\tindex\toccurences', where index starts at 0 and occurences ordered in descending order. Moreover, the first line should be the \<unk> token with index 0.

In [1]:
# count the word frequencies in the given file without using any threshold
def count_word_frequencies(fname):
    word_frequencies = {}
    with open(fname, 'r') as f:
        lines = f.readlines()
        for line in lines:
            # Read one word of the sentence at a time
            line = line.strip()
            if line:
                arguments = line.split('\t')
                if len(arguments) != 3:
                    print(f'Invalid format on line: {line}')
                else:
                    # arguments: [index, word, pos_tag]
                    word = arguments[1]
                    if word in word_frequencies:
                        word_frequencies[word] += 1
                    else:
                        word_frequencies[word] = 1
    return word_frequencies

# create the vocab using word frequencies of all the words with a cutoff threshold
# any words with frequency lower than the threshold will be omitted and counted as the speical token <unk>
def create_vocab(word_frequencies, threshold=3):
    vocab = {}
    # Add special token <unk> which includes all the removed token (those with occurence less than the threshold)
    unk_frequency = 0
    for word, frequency in word_frequencies.items():
        if frequency >= threshold:
            vocab[word] = frequency
        else:
            unk_frequency += frequency
    return vocab, unk_frequency

# exports the vocabulary and frequency of <unk> tokens into a file where the each line is in the format of
#'word\tindex\toccurences', where index starts at 0 and occurences ordered in descending order
# also, the first line should be the <unk> token with index 0
def export_vocab(fname, vocab, unk_frequency):
    with open(fname, 'w') as f:
        # First line is <unk> special token
        f.write(f'<unk>\t0\t{unk_frequency}\n')
        # sort the vocab in descending order of frequency
        sorted_vocab = sorted(vocab.items(), key=lambda x: x[1], reverse=True)
        # write each word in the format 'word\tindex\toccurences'
        for index, (word, frequency) in enumerate(sorted_vocab):
            f.write(f'{word}\t{index+1}\t{frequency}')
            # Add new line if not on the last entry
            if index < len(sorted_vocab)-1:
                f.write('\n')

In [2]:
# hyper-parameter for word cutoff based on occurences
threshold = 10

word_frequencies = count_word_frequencies('data/train')
vocab, unk_frequency = create_vocab(word_frequencies, threshold=threshold)
export_vocab('vocab.txt', vocab, unk_frequency)

In [3]:
print(f'Total size of vocabulary: {len(vocab)}')
print(f'Occurences of <unk>: {unk_frequency}')

Total size of vocabulary: 7096
Occurences of <unk>: 80313


## Conclusion
I tried experimenting with the threshold hyperparameter and found that the lower the threshold, the higher accuracy I obtain: (0.94 greedy acc with threshold=2 vs 0.932 greedy acc with threshold=4). However, I do not believe a lower threshold would be better if met with a testing set that does not share many of the words. As such, having some recollection of how to deal with \<unk> would probably be better in the long run.

Moreover, I tested text pre-processing such as converting all words to lowercase, but found that this harmed the accuracy aswell. My justification for this is that we are sometimes introducing ambiguity by doing this, for example the NN 'Will' vs MD 'will' will have one record but are entirely different. Will will write his will. This becomes even more confusing and the results reflect that, and that's why I refrained from lowercasing the text.

# Task 2: Model Learning
In this task we will create an HMM from the training data. The closed-form for the emission and transition parameters in HMM are $$t(s*|s) = count(s->s*)/count(s)$$ $$e(x|s) = count(s->x)/count(s)$$ respectively, where $s$ is current tag, $s*$ is next tag, and x is the current word. After calculating these probabilities, we export them as two dictionaries to 'hmm.json'. The first dictionary, transition, contains items with pairs of $(s,s*)$ as key and $t(s*|s)$ as value. The second dictionary, emission, contains items with pairs of $(s,x)$ as key and $e(x|s)$ as value.

Note that when we parse the sentences from the training data, we inject two special tokens, \<start> and \<end>, at the start and end of each sentence respectively. This trick aids us in computing the probabilities for the hmm, specifically t(s1) becomes t(s1|\<start>).

In [4]:
import json

# Parses the given file using the specified vocab
# All words not in the vocab will be replaced with <unk> word
# Returns list of sentences where each sentence has two dictionaries, 'words' and 'tags'
# This can only be used on training/dev files as it expects the line to have the format: index word tag
def read_sentences(fname, vocab):
    sentences = []
    with open(fname, 'r') as f:
        lines = f.readlines()
        words = ['<start>']
        tags = ['<start>']
        # each line represents a single token (word, punct, number, etc) in a sentence
        for line in lines:
            # Read one word of the sentence at a time
            line = line.strip()
            # On non-empty lines, parse line, which is index\tword\ttag
            if line:
                # arguments = [index, word, tag]
                arguments = line.split('\t')
                # Skip on bad lines
                if len(arguments) != 3:
                    print(f'Invalid format on line: {line}')
                    continue
                else:
                    word = arguments[1]
                    tag = arguments[2]
                    # only add the word if its in the vocab
                    if word in vocab:
                        words.append(word)
                        tags.append(tag)
                    # otherwise, add <unk> token
                    else:
                        words.append('<unk>')
                        tags.append(tag) # TODO: is it tag or <unk>?
            # On empty lines, it indicates a new sentence
            else:
                # Add end tag to the end of the sentence
                words.append('<end>')
                tags.append('<end>')
                
                # add words/tags to the sentences
                sentences.append({'words': words, 'tags': tags})
                
                # reset sentence for the next sentence
                words = ['<start>']
                tags = ['<start>']
    return sentences

# Given a list of sentences, this will calculate the occurences of all tags
def calc_tag_frequencies(sentences):
    tag_frequencies = {}
    for sentence in sentences:
        tags = sentence['tags']
        for tag in tags:
            if tag in tag_frequencies:
                tag_frequencies[tag] += 1
            else:
                tag_frequencies[tag] = 1
    return tag_frequencies

# Given a list of sentences and tag frequencies, this will create two dictionaries, transition and emission
def create_hmm(sentences, tag_frequencies):
    transition = {}
    emission = {}
    # Count the occurences for each transition/emission (excluding denominator normalization)
    for sentence in sentences:
        words = sentence['words']
        tags = sentence['tags']
        for i in range(len(words)):
            # Skip the first/last word/tag as it is an injected <start>/<end> token
            if i == 0 or i == len(words) - 1:
                continue
            word = words[i]
            tag = tags[i]
            prev_tag = tags[i-1]
            # transition
            transition_key = (prev_tag, tag)
            if transition_key in transition:
                transition[transition_key] += 1
            else:
                transition[transition_key] = 1
            # emission
            emission_key = (tag, word)
            if emission_key in emission:
                emission[emission_key] += 1
            else:
                emission[emission_key] = 1

    # Normalize the transition into probabilities using the denominator
    for key, value in transition.items():
        # key: (s, s')
        # value: t(s'|s)
        prev_tag, tag = key
        transition[key] = value / tag_frequencies[prev_tag]
        
    # Normalize the emission into probabilities using the denominator
    for key, value in emission.items():
        # key: (s, x)
        # value: e(x|s)
        tag, word = key
        emission[key] = value / tag_frequencies[tag]
    
    return transition, emission

# Exports the transition and emission probabilities to the specified filename as json format
def export_hmm(fname, transition, emission):
    with open(fname, 'w') as f:
        # Convert the tuple key to string key to be able to export as json
        t = {str(k): v for k, v in transition.items()}
        e = {str(k): v for k, v in emission.items()}
        json.dump({'transition': t, 'emission': e}, f)

In [5]:
# Parse in the training data and filter using our earlier vocab
sentences = read_sentences('data/train', vocab)

# Calculate the tag frequencies, i.e. count(s) for all s
tag_frequencies = calc_tag_frequencies(sentences)

# Compute the transition and emission probabilities and then export to the file
transition, emission = create_hmm(sentences, tag_frequencies)
export_hmm('hmm.json', transition, emission)

In [6]:
transition_params = len(transition)
emission_params = len(emission)

print(f'Transition Parameters in HMM: {transition_params}')
print(f'Emission Parameters in HMM: {emission_params}')

Transition Parameters in HMM: 1392
Emission Parameters in HMM: 11118


# Task 3: Greedy Decoding with HMM
Here we implement and evalute the greedy decoding algorithm on the development data and report the accuracy. Then, we predict the pos tags on the test data and export the result into a file with format similar to the training data.

In [7]:
# Parses the given file using the specified vocab
# All words not in the vocab will be replaced with <unk> word
# Returns list of sentences where each sentence has one dictionary, 'words'
# This can only be used on test file as it expects the line to have the format: index word
def read_sentences_test(fname, vocab):
    sentences = []
    with open(fname, 'r') as f:
        lines = f.readlines()
        words = ['<start>']
        # each line represents a single token (word, punct, number, etc) in a sentence
        for line in lines:
            # Read one word of the sentence at a time
            line = line.strip()
            # On non-empty lines, parse line, which is index\tword
            if line:
                # arguments = [index, word]
                arguments = line.split('\t')
                # Skip on bad lines
                if len(arguments) != 2:
                    print(f'Invalid format on line: {line}')
                    continue
                else:
                    word = arguments[1]
                    # only add the word if its in the vocab
                    if word in vocab:
                        words.append(word)
                    # otherwise, add <unk> token
                    else:
                        words.append('<unk>')
            # On empty lines, it indicates a new sentence
            else:
                # Add end tag to the end of the sentence
                words.append('<end>')
                
                # add words/tags to the sentences
                sentences.append({'words': words})
                
                # reset sentence for the next sentence
                words = ['<start>']
    return sentences

In [8]:
# Parse in the development data and filter using our earlier vocab
sentences = read_sentences('data/dev', vocab)

# Keep counters to calculate accuracy
total_correct = 0
total = 0

for sentence in sentences:
    words = sentence['words']
    tags = sentence['tags'] # used only for accuracy measurement
    
    # for each word,
    # find s* that maximizes t(s | s_prev) * e(word | s) where s_prev is the last predicted tag
    # To simplify our model, we skip the first word (<start>) but predict the tag <start> at the beginning
    # this is so the first 'real' tag, i.e. NN Dog, can follow the formula easily instead of having two cases
    # where one case is t(s) and the other is t(s | s_prev)
    predicted_tags = ['<start>']
    for i in range(len(words)):
        # skip the first/last word which is an injected token <start>/<end>
        if i == 0 or i == len(words)-1:
            continue
        
        word = words[i]
        
        # Keep counter of which tag has the best likelihood
        best_tag = None
        best_tag_value = 0
        
        # Enumerate through all possible transitions, and only consider those that fit the formula
        # and those that do, we calculate its likelihood, and if its better than our best, we set it to our current best
        for key, value in transition.items():
            # key: (s, s')
            # value: t(s'|s)
            prev_tag, tag = key
            
            # skip the transitions that are not starting from our last predicted tag
            if prev_tag != predicted_tags[-1]:
                continue
                
            t = value # t(s' | s*) where s* is last predicted tag
            e = 0
            if (tag, word) in emission:
                e = emission[(tag, word)]
            computed = t * e
            
            if best_tag is None or computed > best_tag_value:
                best_tag = tag
                best_tag_value = computed
        
        # After looping through all transitions, we have found the best tag to predict
        predicted_tags.append(best_tag)
    predicted_tags.append('<end>')
    
    # Remove start/end tokens from the tags for a fair unpadded accuracy measurement
    tags = tags[1:-1]
    predicted_tags = predicted_tags[1:-1]
    
    # Compare actual tags with our predicted tags, and calculate the accuracy
    for i in range(len(tags)):
        if tags[i] == predicted_tags[i]:
            total_correct += 1
    total += len(tags)
    
print(f'Greedy Accuracy: {total_correct / total : .4f}')

Greedy Accuracy:  0.9078


In [9]:
# Parse in the test data and filter using our earlier vocab
sentences = read_sentences_test('data/test', vocab)

# A list to contain all of our sentence pos predictions
output = []

for sentence in sentences:
    words = sentence['words']
    
    # for each word,
    # find s* that maximizes t(s | s_prev) * e(word | s) where s_prev is the last predicted tag
    # To simplify our model, we skip the first word (<start>) but predict the tag <start> at the beginning
    # this is so the first 'real' tag, i.e. NN Dog, can follow the formula easily instead of having two cases
    # where one case is t(s) and the other is t(s | s_prev)
    predicted_tags = ['<start>']
    for i in range(len(words)):
        # skip the first/last word which is an injected token <start>/<end>
        if i == 0 or i == len(words)-1:
            continue
        
        word = words[i]
        
        # Keep counter of which tag has the best likelihood
        best_tag = None
        best_tag_value = 0
        
        # Enumerate through all possible transitions, and only consider those that fit the formula
        # and those that do, we calculate its likelihood, and if its better than our best, we set it to our current best
        for key, value in transition.items():
            # key: (s, s')
            # value: t(s'|s)
            prev_tag, tag = key
            
            # skip the transitions that are not starting from our last predicted tag
            if prev_tag != predicted_tags[-1]:
                continue
                
            t = value # t(s' | s*) where s* is last predicted tag
            e = 0
            if (tag, word) in emission:
                e = emission[(tag, word)]
            computed = t * e
            
            if best_tag is None or computed > best_tag_value:
                best_tag = tag
                best_tag_value = computed
        
        # After looping through all transitions, we have found the best tag to predict
        predicted_tags.append(best_tag)
    predicted_tags.append('<end>')
    
    # Remove start/end tokens from the words and tags
    words = words[1:-1]
    predicted_tags = predicted_tags[1:-1]
        
    output.append({'words': words, 'tags': predicted_tags})
    
# Export result
with open('greedy.out', 'w') as f:
    for index, sentence in enumerate(output):
        sentence_size = len(sentence['words'])
        for i in range(sentence_size):
            word = sentence['words'][i]
            tag = sentence['tags'][i]
            f.write(f'{i+1}\t{word}\t{tag}\n')
        # to match formatting, only add new line after each sentence if its not the last sentence
        if index != len(output) - 1:
            f.write('\n')

# Task 4: Viterbi Decoding with HMM
Here we implement and evalute the viterbi decoding algorithm on the development data and report the accuracy. Then, we preidct the pos tags on the test data and export the result into a file with format similar to the training data.

In [10]:
for tag1 in tag_frequencies.keys():
    prob_sum = 0
    for tag2 in tag_frequencies.keys():
        if (tag1, tag2) in transition:
            prob_sum += transition[(tag1, tag2)]
    if prob_sum < 0.99:
        print(tag1 + " = " + str(prob_sum))
        print('freq = ' + str(tag_frequencies[tag1]))
        for tag2 in tag_frequencies.keys():
            if (tag1, tag2) in transition:
                print("\t" + str((tag1, tag2)) + " = " + str(transition[(tag1, tag2)]))

. = 0.06934691938123648
freq = 37882
	('.', 'NNP') = 0.00139908135790085
	('.', ',') = 7.919328440948208e-05
	('.', 'CD') = 0.0001319888073491368
	('.', 'NNS') = 5.279552293965472e-05
	('.', 'JJ') = 0.00010559104587930944
	('.', 'MD') = 2.639776146982736e-05
	('.', 'VB') = 0.0003167731376379283
	('.', 'DT') = 0.00010559104587930944
	('.', 'NN') = 0.0003167731376379283
	('.', 'IN') = 7.919328440948208e-05
	('.', '.') = 0.0001847843302887915
	('.', 'VBZ') = 2.639776146982736e-05
	('.', 'CC') = 2.639776146982736e-05
	('.', 'VBD') = 2.639776146982736e-05
	('.', 'VBN') = 2.639776146982736e-05
	('.', 'RB') = 7.919328440948208e-05
	('.', 'TO') = 2.639776146982736e-05
	('.', 'PRP') = 2.639776146982736e-05
	('.', 'VBP') = 2.639776146982736e-05
	('.', '``') = 0.0005279552293965472
	('.', "''") = 0.058523837178607256
	('.', 'WP') = 2.639776146982736e-05
	('.', ':') = 0.00023757985322844623
	('.', 'WRB') = 2.639776146982736e-05
	('.', 'NNPS') = 0.0001319888073491368
	('.', '-LRB-') = 0.00147827464

In [11]:
# Parse in the development data and filter using our earlier vocab
sentences = read_sentences('data/dev', vocab)

# Keep counters to calculate accuracy
total_correct = 0
total = 0

for sentence in sentences:
    words = sentence['words']
    tags = sentence['tags'] # used only for accuracy measurement
    
    viterbi = {}
    backpointer = {}
    
    for i in range(len(words)):
        # skip the first/last word which is an injected token <start>/<end>
        if i == 0 or i == len(words)-1:
            continue
        
        word = words[i]
        
        # Initialization of viterbi on first word
        if i == 1:
            for tag in tag_frequencies.keys():
                t_key = ('<start>', tag)
                t = 0
                if t_key in transition:
                    t = transition[t_key]
                e_key = (tag, word)
                e = 0
                if e_key in emission:
                    e = emission[e_key]
                viterbi[(1, tag)] = t * e
                
                # also init backpointer
                backpointer[(1, tag)] = tag
        else:
            for tag_cur in tag_frequencies.keys():
                viterbi[(i, tag_cur)] = 0
                for tag_prev in tag_frequencies.keys():
                    table_value = viterbi[(i-1, tag_prev)]
                    t_key = (tag_prev, tag_cur)
                    t = 0
                    if t_key in transition:
                        t = transition[t_key]
                    e_key = (tag_cur, word)
                    e = 0
                    if e_key in emission:
                        e = emission[e_key]
                    viterbi_value = table_value * t * e
                    if viterbi_value >= viterbi[(i, tag_cur)]:
                        viterbi[(i, tag_cur)] = viterbi_value
                        backpointer[(i, tag_cur)] = tag_prev
        
    t = len(words)-2 # -1 to start from last character, another -1 to skip <end>
    p = None
    predicted_tags = []
    while t > 0: # Not >= 0 so we skip the <start>
        p_val = 0
        for x in tag_frequencies.keys():
            x_val = viterbi[(t, x)]
            if p is None or x_val >= p_val:
                p = x
                p_val = x_val
        predicted_tags.append(p)
        t -= 1
   
    # predicted_tags dont include <start> and <end> tokens
    predicted_tags.reverse()
    
    # Remove start/end tokens from the tags for a fair unpadded accuracy measurement
    tags = tags[1:-1]
    
    # Compare actual tags with our predicted tags, and calculate the accuracy
    for i in range(len(tags)):
        if tags[i] == predicted_tags[i]:
            total_correct += 1
    total += len(tags)
    
print(f'Viterbi Accuracy: {total_correct / total : .4f}')

Viterbi Accuracy:  0.9089


In [12]:
# Parse in the test data and filter using our earlier vocab
sentences = read_sentences_test('data/test', vocab)

# A list to contain all of our sentence pos predictions
output = []

for sentence in sentences:
    words = sentence['words']
    
    viterbi = {}
    backpointer = {}
    
    for i in range(len(words)):
        # skip the first/last word which is an injected token <start>/<end>
        if i == 0 or i == len(words)-1:
            continue
        
        word = words[i]
        
        # Initialization of viterbi on first word
        if i == 1:
            for tag in tag_frequencies.keys():
                t_key = ('<start>', tag)
                t = 0
                if t_key in transition:
                    t = transition[t_key]
                e_key = (tag, word)
                e = 0
                if e_key in emission:
                    e = emission[e_key]
                viterbi[(1, tag)] = t * e
                
                # also init backpointer
                backpointer[(1, tag)] = tag
        else:
            for tag_cur in tag_frequencies.keys():
                viterbi[(i, tag_cur)] = 0
                for tag_prev in tag_frequencies.keys():
                    table_value = viterbi[(i-1, tag_prev)]
                    t_key = (tag_prev, tag_cur)
                    t = 0
                    if t_key in transition:
                        t = transition[t_key]
                    e_key = (tag_cur, word)
                    e = 0
                    if e_key in emission:
                        e = emission[e_key]
                    viterbi_value = table_value * t * e
                    if viterbi_value >= viterbi[(i, tag_cur)]:
                        viterbi[(i, tag_cur)] = viterbi_value
                        backpointer[(i, tag_cur)] = tag_prev
        
    t = len(words)-2 # -1 to start from last character, another -1 to skip <end>
    p = None
    predicted_tags = []
    while t > 0: # Not >= 0 so we skip the <start>
        p_val = 0
        for x in tag_frequencies.keys():
            x_val = viterbi[(t, x)]
            if p is None or x_val >= p_val:
                p = x
                p_val = x_val
        predicted_tags.append(p)
        t -= 1
   
    # predicted_tags dont include <start> and <end> tokens
    predicted_tags.reverse()
    
    # Remove start/end tokens from the words
    words = words[1:-1]
        
    output.append({'words': words, 'tags': predicted_tags})
    
# Export result
with open('viterbi.out', 'w') as f:
    for index, sentence in enumerate(output):
        sentence_size = len(sentence['words'])
        for i in range(sentence_size):
            word = sentence['words'][i]
            tag = sentence['tags'][i]
            f.write(f'{i+1}\t{word}\t{tag}\n')
        # to match formatting, only add new line after each sentence if its not the last sentence
        if index != len(output) - 1:
            f.write('\n')