In [1]:
import nltk
from collections import Counter
import numpy as np
nltk.download('brown')
nltk.download('universal_tagset')

[nltk_data] Downloading package brown to
[nltk_data]     /Users/erictay1997/nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package universal_tagset to
[nltk_data]     /Users/erictay1997/nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


True

# Q1

In [2]:
sentences = list(nltk.corpus.brown.tagged_sents(tagset="universal")[:1000])
# Change words to lower-case
for sentence in sentences:
    for i in range(len(sentence)):
        sentence[i] = (sentence[i][0].lower(), sentence[i][1])

In [3]:
vocab = Counter([x[0] for sentence in sentences for x in sentence])
print(len(vocab))
oov = set([x for x in vocab if vocab[x] == 1])
print(len(oov))

4272
2367


#### Since a significant portion of vocab words only have count one, we set all words with count one to be out-of-vocabulary words. We do not set any more at risk of losing an even larger chunk of our training vocabulary

In [4]:
# Replace word with 1 count with "oov"
for sentence in sentences:
    for i in range(len(sentence)):
        if sentence[i][0] in oov:
            sentence[i] = ("oov", sentence[i][1])
vocab = set([x[0] for sentence in sentences for x in sentence])

In [5]:
# pos stores parts of speech as keys, and a counter of words as values
pos = {}
for sentence in sentences:
    for i in range(len(sentence)):
        if sentence[i][1] not in pos:
            pos[sentence[i][1]] = Counter()
        if sentence[i][0] not in pos[sentence[i][1]]:
            pos[sentence[i][1]][sentence[i][0]] = 0
        pos[sentence[i][1]][sentence[i][0]] += 1

In [6]:
for key in pos:
    print(key)
# We have 12 POS Tags

DET
NOUN
ADJ
VERB
ADP
.
ADV
CONJ
PRT
PRON
NUM
X


In [7]:
pos_to_index = {}
start = 0
for key in pos:
    pos_to_index[key] = start
    start += 1

In [8]:
# Create a counter of which POS tag starts each sentence
initial = Counter()
for sentence in sentences:
    if sentence[0][1] not in initial:
        initial[sentence[0][1]] = 0
    initial[sentence[0][1]] += 1
initial
# Only 11 POS tags start our training sentences

Counter({'DET': 273,
         '.': 103,
         'PRON': 102,
         'NOUN': 266,
         'ADV': 34,
         'ADP': 71,
         'VERB': 45,
         'NUM': 23,
         'CONJ': 29,
         'ADJ': 38,
         'PRT': 16})

In [9]:
initial["X"] = 0 # Add the last POS tag
for pos_state in initial:
    # Add 0.001 smoothing
    initial[pos_state] += 0.001
total_sum_initial = sum(initial.values())

In [10]:
# initial_state is the initial state probabilities, 
# the probability distribution over 12 POS tags for the first word of a sentence
initial_state = [0]*len(pos)
for key in pos:
    index = pos_to_index[key]
    initial_state[index] = initial[key]/total_sum_initial

In [11]:
# A is a state transition probability matrix
# A[i][j] will represent the probability of transitioning from state i to state j
# Add 0.001 smoothing
A = [[0.001]*len(pos) for _ in range(len(pos))] 
# Store Counts
for sentence in sentences:
    for i in range(len(sentence)-1):
        pos_0 = sentence[i][1]
        pos_1 = sentence[i+1][1]
        index_0 = pos_to_index[pos_0]
        index_1 = pos_to_index[pos_1]
        A[index_0][index_1] += 1
# Normalize Probabilities
for i in range(len(A)):
    total_sum = sum(A[i])
    A[i] = [x/total_sum for x in A[i]]
A = np.matrix(A)

In [12]:
word_to_index = {}
start = 0
for word in vocab:
    word_to_index[word] = start
    start += 1

In [13]:
# B is a observation probability matrix
# B[i][j] will represent the probability of generating word j from state i
# Add 0.001 smoothing
B = [[0.001]*len(vocab) for _ in range(len(pos))] 
# Store Counts
for sentence in sentences:
    for i in range(len(sentence)):
        word = sentence[i][0]
        pos_for_word = sentence[i][1]
        pos_index = pos_to_index[pos_for_word]
        word_index = word_to_index[word]
        B[pos_index][word_index] += 1
# Normalize Probabilities
for i in range(len(B)):
    total_sum = sum(B[i])
    B[i] = [x/total_sum for x in B[i]]
B = np.matrix(B)

In [14]:
# Task one is complete
# print(initial_state) # Initial State Distribution
# print(A) # Transition Matrix
# print(B) # Observation Matrix

# Q2

In [15]:
# Params:
# obs - observations [list of ints]
# pi - the initial state probabilities [list of floats]
# A - the state transition probability matrix [2D numpy array]
# B - the observation probability matrix [2D numpy array]
# returns states - the inferred state sequence [list of ints]

def viterbi(obs, pi, A, B):
    
    # prob_dp[i][j] stores the maximal log probability of a state sequence of length j+1,
    # ending in state i
    # path_dp stores the path of prob_dp[i][j]
    prob_dp = [[-float("Inf")]*len(obs) for _ in range(A.shape[0])]
    path_dp = [[None]*len(obs) for _ in range(A.shape[0])]
    
    for i in range(A.shape[0]):
        prob_dp[i][0] = np.log(pi[i]) + np.log(B[i,obs[0]])
        path_dp[i][0] = [i]
        
    for j in range(1, len(obs)):
        for i in range(A.shape[0]):
            for prev_i in range(A.shape[0]):
                if (prob_dp[prev_i][j-1] + np.log(A[prev_i,i]) + np.log(B[i,obs[j]])) > prob_dp[i][j]:
                    prob_dp[i][j] = prob_dp[prev_i][j-1] + np.log(A[prev_i,i]) + np.log(B[i,obs[j]])
                    path_dp[i][j] = path_dp[prev_i][j-1]+[i]
                    
    maximum = max([x[len(obs)-1] for x in prob_dp])
    for i in range(A.shape[0]):
        if prob_dp[i][len(obs)-1] == maximum:
            return path_dp[i][len(obs)-1]

# Q3

In [16]:
test = list(nltk.corpus.brown.tagged_sents(tagset="universal")[10150:10153])

In [17]:
# Convert words to lower-case, and unknown words to "oov"
# Then convert words to ints, due to requirements of function
for sentence in test:
    for i in range(len(sentence)):
        word = sentence[i][0].lower()
        if word in vocab:
            index = word_to_index[word]
        else:
            index = word_to_index["oov"]
        sentence[i] = (index, sentence[i][1])

In [18]:
test_1 = [x[0] for x in test[0]]
labels_1 = [x[1] for x in test[0]]
test_2 = [x[0] for x in test[1]]
labels_2 = [x[1] for x in test[1]]
test_3 = [x[0] for x in test[2]]
labels_3 = [x[1] for x in test[2]]

In [19]:
index_to_pos = {}
for pos in pos_to_index:
    index_to_pos[pos_to_index[pos]] = pos

In [20]:
# words [list of ints]
# true_labels [list of strings]

def test(words, true_labels):
    predicted_states = viterbi(words, initial_state, A, B)
    predicted_labels = [index_to_pos[x] for x in predicted_states]
    print("Predicted Labels: \n{}".format(predicted_labels))
    print("True Labels: \n{}".format(true_labels))
    print("Accuracy: {}{}\n\n".format(np.mean([predicted_labels[i] == true_labels[i] for i in range(len(true_labels))])*100, "%"))

In [21]:
test(test_1, labels_1)
test(test_2, labels_2)
test(test_3, labels_3)

Predicted Labels: 
['DET', 'VERB', 'ADP', 'ADJ', 'NOUN', 'VERB', 'VERB', 'DET', 'NOUN', 'PRT', 'VERB', 'VERB', '.']
True Labels: 
['DET', 'VERB', 'ADP', 'ADJ', 'NOUN', 'VERB', 'VERB', 'DET', 'NOUN', 'PRT', 'VERB', 'VERB', '.']
Accuracy: 100.0%


Predicted Labels: 
['DET', 'ADJ', 'NOUN', 'VERB', 'DET', 'ADJ', 'NOUN', 'NOUN', 'ADP', 'DET', 'ADJ', 'NOUN', 'VERB', 'VERB', 'ADP', 'NUM', 'DET', '.']
True Labels: 
['DET', 'ADJ', 'NOUN', 'VERB', 'DET', 'ADJ', 'ADJ', 'NOUN', 'ADP', 'DET', 'ADJ', 'NOUN', 'VERB', 'VERB', 'ADP', 'NUM', 'DET', '.']
Accuracy: 94.44444444444444%


Predicted Labels: 
['PRON', 'VERB', 'DET', 'ADJ', 'NOUN', 'ADP', 'DET', 'NOUN', 'ADP', 'DET', 'ADJ', 'NOUN', 'CONJ', 'DET', 'NOUN', '.']
True Labels: 
['PRON', 'VERB', 'DET', 'ADJ', 'NOUN', 'ADP', 'DET', 'NOUN', 'ADP', 'DET', 'ADJ', 'NOUN', 'CONJ', 'DET', 'NOUN', '.']
Accuracy: 100.0%




### Algorithm has close to 100% Accuracy!