In [115]:
import re
import numpy as np
from collections import Counter
from collections import defaultdict
import pandas as pd
import math
import string

In [116]:
# Useful functions for handling oov and getting tags & words
# Punctuation characters
punct = set(string.punctuation)

# Morphology rules used to assign unknown word tokens
noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
verb_suffix = ["ate", "ify", "ise", "ize"]
adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
adv_suffix = ["ward", "wards", "wise"]


def get_word_tag(line, vocab): 
    if not line.split():
        word = "--n--"
        tag = "--s--"
        return word, tag
    else:
        word, tag = line.split()
        if word not in vocab: 
            # Handle unknown words
            word = assign_unk(word)
        return word, tag
    return None 


def preprocess(vocab, data_fp):
    """
    Preprocess data
    """
    orig = []
    prep = []

    # Read data
    with open(data_fp, "r") as data_file:

        for cnt, word in enumerate(data_file):

            # End of sentence
            if not word.split():
                orig.append(word.strip())
                word = "--n--"
                prep.append(word)
                continue

            # Handle unknown words
            elif word.strip() not in vocab:
                orig.append(word.strip())
                word = assign_unk(word)
                prep.append(word)
                continue

            else:
                orig.append(word.strip())
                prep.append(word.strip())

    assert(len(orig) == len(open(data_fp, "r").readlines()))
    assert(len(prep) == len(open(data_fp, "r").readlines()))

    return orig, prep


def assign_unk(tok):
    """
    Assign unknown word tokens
    """
    # Digits
    if any(char.isdigit() for char in tok):
        return "--unk_digit--"

    # Punctuation
    elif any(char in punct for char in tok):
        return "--unk_punct--"

    # Upper-case
    elif any(char.isupper() for char in tok):
        return "--unk_upper--"

    # Nouns
    elif any(tok.endswith(suffix) for suffix in noun_suffix):
        return "--unk_noun--"

    # Verbs
    elif any(tok.endswith(suffix) for suffix in verb_suffix):
        return "--unk_verb--"

    # Adjectives
    elif any(tok.endswith(suffix) for suffix in adj_suffix):
        return "--unk_adj--"

    # Adverbs
    elif any(tok.endswith(suffix) for suffix in adv_suffix):
        return "--unk_adv--"

    return "--unk--"

In [117]:
# load in the training corpus
with open("WSJ_POS_CORPUS_FOR_STUDENTS/WSJ_02-21.pos", 'r') as f:
    training_corpus = f.readlines()

In [118]:
# making hmm vocab
# words used more than twice are included 
# unknown tokens for handling unknown words
vocab = {}
words = []
two_or_more = []

for i in training_corpus:
    items = i.split()
    if not len(items) is 2 :
        continue
    words.append(items[0])
count_words = Counter(words)

for i in count_words.keys():
    if count_words[i] >= 2:
        two_or_more.append(i)

un_token = ['--n--', '--unk--', '--unk_adj--', '--unk_adv--', '--unk_digit--', '--unk_noun--', '--unk_punct--', '--unk_upper--', '--unk_verb--']
new = [i for i in (Counter(un_token+two_or_more)).keys()]

for i, word in list(enumerate(sorted(new))):
    vocab[word] = i     

orig, prep = preprocess(vocab, 'WSJ_POS_CORPUS_FOR_STUDENTS/WSJ_23.words')

In [119]:
def create_dictionaries(training_corpus, vocab):
    # initialize
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)
    
    # default
    prev_tag = '--s--' 
    
    # get words and tags from training corpus
    for word_tag in training_corpus:
        # helper function (imported from utils_pos.py)
        word, tag = get_word_tag(word_tag, vocab)
        # Increment the transition count for the previous word and tag
        transition_counts[(prev_tag, tag)] += 1
        # Increment the emission count for the tag and word
        emission_counts[(tag, word)] += 1
        # Increment the tag count
        tag_counts[(tag)] += 1
        # Set the previous tag to this tag (for the next iteration of the loop)
        prev_tag = tag
        
    return emission_counts, transition_counts, tag_counts

In [120]:
emission_counts, transition_counts, tag_counts = create_dictionaries(training_corpus, vocab)

In [121]:
# get all the POS states
states = sorted(tag_counts.keys())

In [122]:
def create_transition_matrix(alpha, tag_counts, transition_counts):
    
    # POS tags
    all_tags = sorted(tag_counts.keys())
    
    # num of POS tags
    num_tags = len(all_tags)
    
    # Initialize the transition matrix
    trans_matrix = np.zeros((num_tags,num_tags))
    
    # transition tuples (previous POS, current POS)
    trans_keys = set(transition_counts.keys())
    
    # row of the transition matrix 
    for i in range(num_tags):
        
        # column of the transition matrix 
        for j in range(num_tags):

            # Initialize the count of the (prev POS, current POS) to zero
            count = 0
        
            # Define the tuple (prev POS, current POS)
            key = (all_tags[i], all_tags[j])

            # Check if the (prev POS, current POS) tuple 
            # exists in the transition counts dictionary
            if key in trans_keys:
                
                # Get count for the (prev POS, current POS) tuple
                count = transition_counts.get(key)
                
            # Get the count of the previous 
            count_prev_tag = tag_counts.get(all_tags[i])
            
            # Apply smoothing 
            trans_matrix[i,j] = (count + alpha) / (count_prev_tag + alpha * num_tags)
    
    return trans_matrix

In [123]:
alpha = 0.001
trans_matrix = create_transition_matrix(alpha, tag_counts, transition_counts)

print("A subset of transition matrix")
trans_matrix_sub = pd.DataFrame(trans_matrix[20:25,20:25], index=states[20:25], columns = states[20:25] )
print(trans_matrix_sub)

A subset of transition matrix
            NN       NNP      NNPS       NNS           PDT
NN    0.122172  0.009749  0.000090  0.077797  1.505246e-05
NNP   0.058328  0.376807  0.016695  0.024249  1.094395e-05
NNPS  0.038159  0.277212  0.015713  0.011224  3.741050e-07
NNS   0.020817  0.003057  0.000033  0.010525  5.013696e-05
PDT   0.000003  0.000003  0.000003  0.000003  2.702367e-06


In [124]:
def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    
    # num of POS tag
    num_tags = len(tag_counts)
    
    # all POS tags
    all_tags = sorted(tag_counts.keys())
    
    # num of unique words in the vocab
    num_words = len(vocab)
    
    # Initialize the emission matrix 
    emis_matrix = np.zeros((num_tags, num_words))
    
    # keys of the emission_counts dictionary
    emis_keys = set(list(emission_counts.keys()))
    
    # row (POS tags)
    for i in range(num_tags): # complete this line
        
        # column (words)
        for j in range(num_words): # complete this line

            # Initialize the emission count for the (POS tag, word) to zero
            count = 0
                    
            # (POS tag, word) tuple 
            key = (all_tags[i], vocab[j])

            # check if the (POS tag, word) tuple exists as a key in emission counts
            if key in emis_keys: 
        
                # Get the count of (POS tag, word) from the emission_counts d
                count = emission_counts.get(key)
                
            # Get the count of the POS tag
            count_tag = tag_counts.get(all_tags[i])
                
            # Apply smoothing
            emis_matrix[i,j] = (count + alpha) / (count_tag + alpha * num_words)

    return emis_matrix

In [125]:
emis_matrix = create_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))

idx  = ['1987','computer','engineers', 'paid', 'level']

cols = [vocab[a] for a in cidx]

vals =['CD','NN','NNS', 'VB','RB','RP']

rows = [states.index(a) for a in rvals]

# Get the emissions for the sample of words, and the sample of POS tags
emis_matrix_sub = pd.DataFrame(emis_matrix[np.ix_(rows,cols)], index=vals, columns = idx )
print(emis_matrix_sub)

             1987      computer     engineers          paid         level
CD   8.201296e-05  2.732855e-08  2.732855e-08  2.732855e-08  2.732855e-08
NN   7.521128e-09  7.521128e-09  7.521128e-09  7.521128e-09  2.257091e-05
NNS  1.670013e-08  1.670013e-08  4.676203e-04  1.670013e-08  1.670013e-08
VB   3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08
RB   3.226454e-08  6.456135e-05  3.226454e-08  3.226454e-08  3.226454e-08
RP   3.723319e-07  3.723319e-07  3.723319e-07  3.723319e-07  3.723319e-07


In [126]:
def initialize(states, tag_counts, trans_matrix, emis_matrix, corpus, vocab):
    
    # num of unique POS tags
    num_tags = len(tag_counts)
    
    # Initialize best_probs matrix 
    best_probs = np.zeros((num_tags, len(corpus)))
    
    # Initialize best_paths matrix
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
    
    # The start token
    s_idx = states.index("--s--")
   
    #  The POS tags
    for i in range(num_tags): # complete this line
    
        # The special case when the transition from start token to POS tag i is zero
        if trans_matrix[i][0] == 0: # complete this line
            
            # Initialize best_probs at POS tag 'i', column 0, to negative infinity
            best_probs[i,0] = float('-inf')
        
        # For all other cases when transition from start token to POS tag i is non-zero:
        else:
            # Initialize best_probs at POS tag 'i', column 0
            best_probs[i,0] = math.log(trans_matrix[s_idx, i]) + math.log(emis_matrix[i, vocab[corpus[0]]])
                         
    return best_probs, best_paths

In [127]:
best_probs, best_paths = initialize(states, tag_counts, trans_matrix, emis_matrix, prep, vocab)
# Test the function
print(f"best_probs[0,0]: {best_probs[0,0]:.4f}") 
print(f"best_paths[2,5]: {best_paths[2,5]:.4f}")

best_probs[0,0]: -22.6098
best_paths[2,5]: 0.0000


In [128]:
# GRADED FUNCTION: viterbi_forward
def viterbi_forward(trans_matrix, emis_matrix, test_corpus, best_probs, best_paths, vocab):

    # num of unique POS tags
    num_tags = best_probs.shape[0]
    
    # Go through every word in the corpus starting from word 1
    for i in range(1, len(test_corpus)): 
            
        for j in range(num_tags): 
            
            # Initialize best_prob for word i to negative infinity
            best_prob_i = float("-inf")
            
            # Initialize best_path for current word i to None
            best_path_i = None
            
            # list for storing probs of viterbi
            probs = []
            
            for k in range(num_tags): 
            
                # The prob = 
                # best probs of POS tag k, previous word i-1 + 
                # log(prob of transition POS k -> POS j) + 
                # log(prob that emission POS j -> word i)
                prob = best_probs[k, i-1] + math.log(trans_matrix[k,j]) + math.log(emis_matrix[j,vocab[test_corpus[i]]])
                
                # stack every prob
                probs.append(prob)
            
            # find max prob
            best_prob_i = max(probs)
            
            # find index of max prob
            best_path_i = probs.index(best_prob_i)
            
            # Save the best prob for the 
            # given current word's POS tag and the position 
            best_probs[j,i] = best_prob_i
            
            # Save the unique integer ID of the previous POS tag
            best_paths[j,i] = best_path_i

    return best_probs, best_paths

In [129]:
# This will take a minute 
best_probs, best_paths = viterbi_forward(trans_matrix, emis_matrix, prep, best_probs, best_paths, vocab)

In [132]:
def viterbi_backward(best_probs, best_paths, corpus, states):
    
    # num of words in the corpus
    num_words = best_paths.shape[1] # col in best_paths
    
    # Initialize array z, same length as the corpus
    z = [None] * num_words
    
    # Initialize pred array, same length as corpus
    pred = [None] * num_words
    
    # Save the index of the best prob of the last col values 
    z[num_words-1] = np.argmax(best_probs[:,-1])
    
    # Initialize the last data 
    pred[num_words - 1] = states[z[num_words-1]]
    
    # Find the best POS tags by backtracking through the best_paths
    for i in range(num_words-1, 0, -1): # complete this line
        
        # Retrieve the unique integer ID (the word at position 'i' in the corpus)
        pos_tag_for_word_i = z[i]
        
        # go backward thru best_paths and find the index of best prob
        z[i - 1] = best_paths[pos_tag_for_word_i, i]
        
        # Get the previous word's POS tag in string form
        pred[i - 1] = states[z[i-1]]
        
    return pred

In [133]:
pred = viterbi_backward(best_probs, best_paths, prep, states)

In [134]:
# write a file with words and proper tags 
with open('WSJ_POS_CORPUS_FOR_STUDENTS/submission.pos', 'w') as f:
    for i, t in zip(orig, pred):
        if t == "--s--":
            f.write("\n")
        else:
            f.write(f"{i}\t{t}\n")