In [15]:
import nltk
import numpy as np
import pandas as pd
import random
from sklearn.model_selection import train_test_split
import pprint, time
import math
from itertools import islice


In [16]:
nltk_data = list(nltk.corpus.treebank.tagged_sents(tagset= 'universal'))
print(len(nltk_data))

3914


In [17]:
train_set, test_set = train_test_split(nltk_data, train_size= 0.8, test_size=0.2, random_state=101)

In [18]:
train_words = []
for doc in train_set:
    for word, tag in doc:
        train_words.append([word.lower(), tag])

test_words = [word.lower() for doc in test_set for word, tag in doc]
print(len(train_words))
print(len(test_words))
print(train_words[0])

vocab = set(word[0] for word in train_words)
print(len(vocab))

80310
20366
['drink', 'NOUN']
10164


In [19]:
# Precalculate the number of words in each tag
tag_sets = set(tag for doc in train_set for word, tag in doc)
print(tag_sets)
store_tags = {}   # Store the numbers of word in each tag
store_words = {}    # Store the numbers of words exist in the train set

for tag in tag_sets:
    s = 0
    for doc in train_set:
        for word, check_tag in doc:
            if tag == check_tag:
                s += 1
    store_tags[tag] = s

print(store_tags)
print(sum(store_tags.values()))

for word in vocab:
    store_words[word] = {
        'ADJ': 0, 'DET': 0, 'CONJ': 0, '.': 0, 
        'NOUN': 0, 'PRT': 0, 'ADP': 0, 'VERB': 0, 
        'NUM': 0, 'ADV': 0, 'X': 0, 'PRON': 0
    }

for word in train_words:
    store_words[word[0]][word[1]] += 1

print(len(store_words))
first_three = dict(islice(store_words.items(), 3))
print(first_three)
print(store_words['all'])

{'NOUN', 'ADJ', 'VERB', 'ADP', 'NUM', 'CONJ', 'PRT', 'PRON', 'ADV', 'DET', '.', 'X'}
{'NOUN': 22966, 'ADJ': 5150, 'VERB': 10860, 'ADP': 7902, 'NUM': 2801, 'CONJ': 1822, 'PRT': 2555, 'PRON': 2195, 'ADV': 2578, 'DET': 6957, '.': 9321, 'X': 5203}
80310
10164
{'prolonged': {'ADJ': 0, 'DET': 0, 'CONJ': 0, '.': 0, 'NOUN': 0, 'PRT': 0, 'ADP': 0, 'VERB': 2, 'NUM': 0, 'ADV': 0, 'X': 0, 'PRON': 0}, 'fled': {'ADJ': 0, 'DET': 0, 'CONJ': 0, '.': 0, 'NOUN': 0, 'PRT': 0, 'ADP': 0, 'VERB': 1, 'NUM': 0, 'ADV': 0, 'X': 0, 'PRON': 0}, 'santa': {'ADJ': 0, 'DET': 0, 'CONJ': 0, '.': 0, 'NOUN': 2, 'PRT': 0, 'ADP': 0, 'VERB': 0, 'NUM': 0, 'ADV': 0, 'X': 0, 'PRON': 0}}
{'ADJ': 0, 'DET': 93, 'CONJ': 0, '.': 0, 'NOUN': 0, 'PRT': 0, 'ADP': 0, 'VERB': 0, 'NUM': 0, 'ADV': 3, 'X': 0, 'PRON': 0}


In [20]:
def P(classification, tag): # deal with the unknown words by 
    alpha = 0.35
    if classification == "UNKNOWN_CAPITAL":
        if tag == 'NOUN':
            return alpha
        else:
            return (1 - alpha) / len(tag_sets)
    elif classification == "UNKNOWN_ING":
        if tag == 'VERB':
            return alpha
        else:
            return (1 - alpha) / len(tag_sets)
    elif classification == "UNKNOWN_ED":
        if tag == 'VERB':
            return alpha
        else:
            return (1 - alpha) / len(tag_sets)
    elif classification == "UNKNOWN_LY":
        if tag == 'ADV':
            return alpha
        else:
            return (1 - alpha) / len(tag_sets)
    elif classification == "UNKNOWN_HYPHEN":
        if tag == 'ADJ':
            return alpha
        else:
            return (1 - alpha) / len(tag_sets)
    elif classification == "UNKNOWN_NUMBER":
        if tag == 'NOUN':
            return alpha
        else:
            return (1 - alpha) / len(tag_sets)
    elif classification == "UNKNOWN_SHORT":
        if tag == 'PRT' or tag == 'CONJ':
            return alpha / 2
        else:
            return (1 - alpha) / (len(tag_sets) - 1)
        

Calculate emission probabilities

In [21]:
# From words and tags => return probability
def emission_prob(word, tag):
    if (word.lower() not in store_words):
        if word[0].isupper():
            return P("UNKNOWN_CAPITAL", tag)  # Capitalized word (proper noun)
        elif word.endswith("ing"):
            return P("UNKNOWN_ING", tag)  # Word ends with -ing (verb)
        elif word.endswith("ed"):
            return P("UNKNOWN_ED", tag)  # Word ends with -ed (past tense verb)
        elif word.endswith("ly"):
            return P("UNKNOWN_LY", tag)  # Word ends with -ly (adverb)
        elif "-" in word:
            return P("UNKNOWN_HYPHEN", tag)  # Hyphenated word (compound)
        elif any(char.isdigit() for char in word):
            return P("UNKNOWN_NUMBER", tag)  # Word contains digits (number or date)
        elif len(word) <= 2:
            return P("UNKNOWN_SHORT", tag)
        return store_tags[tag] / len(train_words)
    return store_words[word.lower()][tag] / store_tags[tag]
print(emission_prob('company', 'NOUN'))

0.00875206827484107


Calculate transition probabilities

In [22]:
# Calculate the start and end state probabilities with each tag
store_start = {}
store_end = {}

for tag in tag_sets:
    store_start[tag] = 0
    store_end[tag] = 0

for tag in tag_sets:
    s0, s1 = 0, 0
    for doc in train_set:
        if doc[0][1] == tag:
            s0 += 1
        if doc[-1][1] == tag:
            s1 += 1
    store_start[tag] = s0 / len(train_set)
    store_end[tag] = s1 / len(train_set)
    
print(store_start)
print(store_end)

{'NOUN': 0.2884062599808368, 'ADJ': 0.042159054615138934, 'VERB': 0.01085915043117215, 'ADP': 0.13318428617055253, 'NUM': 0.00830405621207282, 'CONJ': 0.053018205046311086, 'PRT': 0.0015969338869370809, 'PRON': 0.07824976045991695, 'ADV': 0.05269881826892367, 'DET': 0.2312360268284893, '.': 0.07920792079207921, 'X': 0.021079527307569467}
{'NOUN': 0.004471414883423826, 'ADJ': 0.0, 'VERB': 0.0, 'ADP': 0.0009581603321622484, 'NUM': 0.0006387735547748323, 'CONJ': 0.0, 'PRT': 0.0, 'PRON': 0.0, 'ADV': 0.00031938677738741617, 'DET': 0.00031938677738741617, '.': 0.9929734908974769, 'X': 0.00031938677738741617}


In [23]:
# This calculate tag2 given tag1
def transition_prob(tag2, tag1):
    cnt = 0
    for doc in train_set:
        for i in range(1, len(doc)):
            if doc[i - 1][1] == tag1 and doc[i][1] == tag2:
                cnt += 1
    return cnt / store_tags[tag1]

In [24]:
# Build the transition table 
tags_matrix = np.zeros((len(tag_sets), len(tag_sets)), dtype='float32')
for i, t1 in enumerate(list(tag_sets)):
    for j, t2 in enumerate(list(tag_sets)):
        tags_matrix[i][j] = transition_prob(t2, t1)

In [25]:
tags_df = pd.DataFrame(tags_matrix, columns= list(tag_sets), index= list(tag_sets))
display(tags_df)

Unnamed: 0,NOUN,ADJ,VERB,ADP,NUM,CONJ,PRT,PRON,ADV,DET,.,X
NOUN,0.26217,0.01254,0.149134,0.17674,0.009144,0.042411,0.043935,0.004616,0.016895,0.012976,0.240007,0.028825
ADJ,0.696893,0.063301,0.011456,0.080583,0.021748,0.016893,0.011456,0.000194,0.005243,0.005243,0.066019,0.020971
VERB,0.110589,0.06639,0.167956,0.092357,0.022836,0.005433,0.030663,0.035543,0.083886,0.13361,0.034807,0.21593
ADP,0.323462,0.107062,0.008479,0.016958,0.063275,0.000886,0.001266,0.069476,0.014553,0.320931,0.038724,0.034548
NUM,0.351303,0.035345,0.020707,0.037487,0.18422,0.014281,0.026062,0.001428,0.00357,0.00357,0.118886,0.202428
CONJ,0.349067,0.113611,0.150384,0.055982,0.040615,0.000549,0.004391,0.060373,0.05708,0.123491,0.035126,0.00933
PRT,0.250489,0.082975,0.401174,0.019569,0.056751,0.002348,0.001174,0.017613,0.009393,0.10137,0.04501,0.012133
PRON,0.212756,0.070615,0.484738,0.022323,0.006834,0.005011,0.014123,0.006834,0.036902,0.009567,0.041913,0.088383
ADV,0.032196,0.130721,0.339022,0.119472,0.029868,0.006982,0.01474,0.012025,0.081458,0.070985,0.139255,0.022886
DET,0.635906,0.206411,0.040247,0.009918,0.022855,0.000431,0.000287,0.003306,0.012074,0.005893,0.017393,0.045134


In [26]:
print(tags_matrix[0][0], tags_matrix[0][1])

0.26217017 0.012540277


In [27]:
def Viterbi(consider_string):
    word = consider_string.split()
    for i in range(len(word)):
        word[i] = word[i]
        
    n = len(word)
    m = len(tag_sets)
    
    dp = [[0] * m for _ in range(n)]    # Get the max probability
    preBestPath = [[] for _ in range(m)]    # Get the path of all best possibilities
    bestPath = [[] for _ in range(m)]  
    
    list_tag_sets = list(tag_sets)
    
    # Initialize the start probabilities
    for i in range(m):
        dp[0][i] = store_start[list_tag_sets[i]] * emission_prob(word[0], list_tag_sets[i]) * 64
        bestPath[i].append(list_tag_sets[i])
    
    # Dynamic programming loop
    for i in range(1, n):
        for j in range(m):
            chosenPath = []  # the best node can be chosen 
            for k in range(m):
                # Calculate the new potential value
                newVal = dp[i - 1][k] * emission_prob(word[i], list_tag_sets[j]) * tags_matrix[k][j] * 512
                
                # Update dp[i][j] with the max probability and corresponding path
                if len(chosenPath) == 0 or newVal > dp[i][j]:
                    dp[i][j] = newVal
                    chosenPath = bestPath[k][:]  # Copy the best path to avoid modifying the original
                
            # Update the path for the current state
            preBestPath[j] = chosenPath
            preBestPath[j].append(list_tag_sets[j])
        
        # After updating paths for the current word, set bestPath = preBestPath for the next word
        bestPath = [path[:] for path in preBestPath]  
    
    # Solve for the end state
    for i in range(m):
        dp[n - 1][i] *= store_end[list_tag_sets[i]] * 8
        
    # for i in range(m):
    #     print(bestPath[i], dp[n - 1][i])

    lastDp = dp[-1]
    #print("probability = ", max(lastDp))
    
    return bestPath[lastDp.index(max(lastDp))]

val = Viterbi("i will never forget you .")
print('The best path is ', val)

The best path is  ['PRON', 'VERB', 'ADV', 'VERB', 'PRON', '.']


In [28]:
# Prepare for the test
test_data = []
used_for_test = test_set
correct_seq = []
total_words = 0

for doc in used_for_test:
    temp = []
    temp2 = []
    for word, tag in doc:
        temp.append(word.lower())
        temp2.append(tag)
        total_words += 1
    test_data.append(temp)
    correct_seq.append(temp2)

#print(correct_seq)
#print(test_data)
check_seq = []
start = time.time()
for doc in test_data:
    tagged_seq = Viterbi(' '.join(doc))
    check_seq.append(tagged_seq)
end = time.time()
difference = end-start
 
print("Time taken in seconds: ", difference)

# accuracy
cnt = 0

incorrect_words = 0
unknown_incorrect_words, unknown_correct_words = 0, 0

for i in range(len(check_seq)):
    for j in range(len(check_seq[i])):
        if check_seq[i][j] == correct_seq[i][j]:
            if test_data[i][j] not in store_words:
                unknown_correct_words += 1
            cnt += 1
        else:
            incorrect_words += 1
            if test_data[i][j] not in store_words:
                unknown_incorrect_words += 1
            
accuracy = cnt / total_words
print('Number of incorrect words : ', incorrect_words)
print('Number of unknown incorrect words : ', unknown_incorrect_words)
print('Number of unknown correct words : ', unknown_correct_words)
# The accuracy after adding start and end state is 94%, without that is 91%
print('Viterbi Algorithm Accuracy: ',accuracy*100)

Time taken in seconds:  1.7086594104766846
Number of incorrect words :  1076
Number of unknown incorrect words :  464
Number of unknown correct words :  851
Viterbi Algorithm Accuracy:  94.71668467052932
