In [1]:
from collections import Counter, defaultdict
import string
import math
import numpy as np
import pickle as pk
from nltk.tokenize import word_tokenize

In [2]:
def get_data(path):

    with open(path, 'r') as f:   
        data = f.readlines()
        
    return data

### PREPROCESSING DATA TO HANDLE OOV WORDS AND MAKE A VOCABULARY

In [3]:
def get_words(data):
    
    words = []
    for w in data:
        tup = w.split()
        if len(tup) == 2:
            words.append(tup[0].lower())
            
    return words

def make_vocab(words):
    
    word_count = dict(Counter(words))
    
    #vocab consists of all words which appear atleast twice in the training corpus
    vocab = [w for w, freq in word_count.items() if (freq > 1 and w != '\n')]  
    
    #adding new words to vocab which will be used to classify words which are not seen before
    unk_list = ["--unk--", "--n--"]
    vocab += unk_list
    vocab = sorted(vocab)
    
    return vocab

def get_word_tag(line, vocab):
    
    # If line is empty return placeholders for word and tag
    if not line.split():
        word = "--n--"  #word indicating new line
        tag = "--s--"   #tag used to indicate transition from initial state(starting of sentence) to a particular state in the HMM.
    else:
        word, tag = line.split()
        word = word.lower()
        if word not in vocab: 
            word = "--unk--"  #unknown word tag
            
    return word, tag

In [4]:
data = get_data('WSJ_02-21.pos')
words = get_words(data)
vocab = make_vocab(words)

### MAKING DICTIONARIES DEPICTING COUNTS OF VARIOUS TAG AND WORD COMBINATIONS

In [5]:
def make_dicts(data, vocab):
    
    prev_tag = '--s--'
    transition_counts = defaultdict(int)
    emission_counts = defaultdict(int)
    tags = defaultdict(int)
    
    for line in data:
        w, t = get_word_tag(line, vocab)
        transition_counts[(prev_tag, t)] += 1
        emission_counts[(t, w)] += 1
        tags[t] += 1
        prev_tag = t
    
    return transition_counts, emission_counts, tags

In [6]:
# transition_counts, emission_counts, tags = make_dicts(data, vocab)
# (Takes time to complete, thus, these dictionaries have been calculated and stored in files)

def save_dict(file, filename):
    
    title = filename + '.pkl'
    f = open(title, 'wb')
    pk.dump(file, f)
    f.close()

def load_dict(filename):
    
    title = filename + '.pkl'
    f = open(title, 'rb')
    return pk.load(f)

'''save_dict(transition_counts, 'transition_counts')    #already saved
save_dict(emission_counts, 'emission_counts')
save_dict(tags, 'tags')'''

#loading dictionaries 
transition_counts = load_dict('transition_counts')
emission_counts = load_dict('emission_counts')
tags = load_dict('tags')

In [7]:
def dict_words(vocab):
    word_to_idx = {}
    for i, word in enumerate(vocab):
        word_to_idx[word] = i
    
    return word_to_idx

def dict_tags(tags):
    tag_to_idx = {}
    all_tags = sorted(tags.keys())
    for i, tag in enumerate(all_tags):
        tag_to_idx[i] = tag
        
    return tag_to_idx

### MAKING THE TRANSITION AND EMISSION MATRICES FOR THE HMM MODEL

In [8]:
def make_matrices(transition_counts, emission_counts, tags, vocab):
    
    all_tags = sorted(tags.keys())
    m = len(all_tags) #no. of tags
    n = len(vocab) #no of words
    
    A = np.zeros((m, m))
    B = np.zeros((m, n))
    
    for i in range(len(all_tags)):
        for j in range(len(tags.keys())):
            t1 = all_tags[i]
            t2 = all_tags[j]
            num = 1
            
            if (t1, t2) in transition_counts.keys():
                num = transition_counts[(t1, t2)] + 1
            denom = tags[t1] + m
            
            A[i, j] = num / denom
        
    for i in range(len(all_tags)):
        for j in range(len(vocab)):
            tag = all_tags[i]
            word = vocab[j]
            num = 1
            if (tag, word) in emission_counts.keys():
                num = emission_counts[(tag, word)] + 1
            denom = tags[tag] + n
            
            B[i, j] = num / denom
            
    return A, B

In [9]:
word_to_idx = dict_words(vocab)
idx_to_tag = dict_tags(tags)
transition_matrix, emission_matrix = make_matrices(transition_counts, emission_counts, tags, vocab)

### VITERBI ALGORITHM (FOR PREDICTION OF POS TAGS)

In [10]:
def viterbi_initialization(sentence, A, B, tags):
    
    num_tags = A.shape[0]
    all_tags = sorted(tags.keys())
    k = len(sentence)
    probs = np.zeros((num_tags, k))
    states = np.zeros((num_tags, k), dtype = int)
    start_idx = all_tags.index('--s--')
    
    for i in range(num_tags):
        if A[start_idx, i] == 0:
            probs[i, 0] = float('-inf')  #log0
        else:
            probs[i, 0] = np.log(A[start_idx, i]) + np.log(B[i, word_to_idx[sentence[0]]])
        
        states[i, 0] = start_idx
        
    return probs, states

def viterbi_forward(probs, states, A, B, sentence, word_to_idx):
    
    num_tags = A.shape[0]
    
    for i in range(1, len(sentence)):
        for j in range(num_tags):
            max_prob = float('-inf')
            best_state = None
            for k in range(num_tags):
                prob = probs[k, i - 1] + math.log(A[k, j]) + math.log(B[j, word_to_idx[sentence[i]]])
                if prob > max_prob:
                    max_prob = prob
                    best_state = k
            
            probs[j, i] = max_prob
            states[j, i] = best_state
            

def viterbi_backward(probs, states, sentence, idx_to_tag):
    
    k = len(sentence)
    pos = [None] * k
    idx = [None] * k
    num_tags = probs.shape[0]
    best_prob = float('-inf')
    prev_state = None
    
    for i in range(num_tags):
        prob = probs[i, k - 1]
        if prob > best_prob:
            best_prob = prob
            idx[k - 1] = i
            
    pos[k - 1] = idx_to_tag[idx[k - 1]]
    
    for i in range(k - 1, -1 , -1):
        prev_tag = idx[i]
        idx[i - 1] = states[prev_tag, i]
        pos[i - 1] = idx_to_tag[idx[i - 1]]
        
    return pos

def predictPOS(transition_matrix, emission_matrix, sentence, tags, word_to_idx, idx_to_tag):
    
    probs, states = viterbi_initialization(sentence, transition_matrix, emission_matrix, tags)
    viterbi_forward(probs, states, transition_matrix, emission_matrix, sentence, word_to_idx)
    pred = viterbi_backward(probs, states, sentence, idx_to_tag)
    
    return pred

def accuracy(pred, test_tags):
    
    c = 0
    for i in range(len(pred)):
        if pred[i] == test_tags[i]:
            c += 1
    return (c / len(pred)) * 100

### TESTING

In [11]:
test = get_data('WSJ_24.pos')
test_words = []
test_tags = []

for line in test:
    word, tag = get_word_tag(line, vocab)
    test_words.append(word)
    test_tags.append(tag)

In [12]:
pred = predictPOS(transition_matrix, emission_matrix, test_words, tags, word_to_idx, idx_to_tag)
accuracy(pred, test_tags)

92.18983011199158

### PREDICTING FOR USER-INPUT SENTENCE

In [15]:
s = input('Enter sentence : ')
user_words = word_tokenize(s)
user_tests = []
for word in user_words:
    word = word.lower()
    if word not in vocab:
        user_tests.append('--unk--')
    else:
        user_tests.append(word)

Enter sentence : I believe the predictions made are good. You can try for yourself!


In [16]:
pos_tags = predictPOS(transition_matrix, emission_matrix, user_tests, tags, word_to_idx, idx_to_tag)

for i in range(len(user_words)):
    print(user_words[i] + ' : ' + pos_tags[i])

I : PRP
believe : VBP
the : DT
predictions : NNS
made : VBN
are : VBP
good : JJ
. : .
You : PRP
can : MD
try : VB
for : IN
yourself : PRP
! : --s--
