# Parts-of-Speech Tagging (POS)

This is an example of POS: [Link](https://www.freecodecamp.org/news/an-introduction-to-part-of-speech-tagging-and-the-hidden-markov-model-953d45338f24/#:~:text=HMMs%20for%20Part%20of%20Speech%20Tagging&text=The%20states%20in%20an%20HMM,POS%20tags%20for%20the%20words.)

Training data: WSJ-2_21.pos
Test data: WSJ-24.pos

Information about the tags used in the dataset: [Tags](http://relearn.be/2015/training-common-sense/sources/software/pattern-2.6-critical-fork/docs/html/mbsp-tags.html)

In this notebook two approaches are used. First, simply by calculating transition and emission materices of Hidden Markov Model (HMM) and assigning the highest probability at each location/word of the text.

The second approach uses Viterbi algorithm to consider the previous word's tag and probability when assigning the tag for each word.

There are many other techniques and models like LSTMs and BERT that have been used and reported in the literature for the same datasets. [Link1](https://paperswithcode.com/task/part-of-speech-tagging) [Link2](http://nlpprogress.com/english/part-of-speech_tagging.html#:~:text=A%20standard%20dataset%20for%20POS,are%20evaluated%20based%20on%20accuracy.)

In [1]:
import pandas as pd
from collections import defaultdict
import math
import numpy as np

## Reading and visualizing the training data

In [2]:
#Training data
with open("./data/WSJ_02-21.pos", 'r') as f:
    training_corpus = f.readlines()

#Vocabulary already created from the training data
with open("./data/hmm_vocab.txt", 'r') as f:
    voc_l = f.read().split('\n')

#Test data
with open("./data/WSJ_24.pos", 'r') as f:
    y = f.readlines()

Creating a vocabulary dictionary from the words. This will be later used to give the index of each word.

In [3]:
vocab = {}
for i, word in enumerate(sorted(voc_l)): 
    vocab[word] = i

Some preprocessing to improve detecting unknown words

In [4]:
import string

punct = set(string.punctuation)
noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
verb_suffix = ["ate", "ify", "ise", "ize"]
adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
adv_suffix = ["ward", "wards", "wise"]

def assign_unk(tok):
    # Digits
    if any(char.isdigit() for char in tok):
        return "--unk_digit--"

    # Punctuation
    elif any(char in punct for char in tok):
        return "--unk_punct--"

    # Upper-case
    elif any(char.isupper() for char in tok):
        return "--unk_upper--"

    # Nouns
    elif any(tok.endswith(suffix) for suffix in noun_suffix):
        return "--unk_noun--"

    # Verbs
    elif any(tok.endswith(suffix) for suffix in verb_suffix):
        return "--unk_verb--"

    # Adjectives
    elif any(tok.endswith(suffix) for suffix in adj_suffix):
        return "--unk_adj--"

    # Adverbs
    elif any(tok.endswith(suffix) for suffix in adv_suffix):
        return "--unk_adv--"

    return "--unk--"

In [5]:
def get_word_tag(word_tag, vocab):
    if not word_tag.split():
        word = "--n--"
        tag = "--s--"
        return word, tag
    else:
        word, tag = word_tag.split()
        if word not in vocab: 
            
            word = assign_unk(word)
        return word, tag
    return None 

In [6]:
def preprocess(vocab, y):
    words = []
    for line in y:
        word, tag = get_word_tag(line, vocab)
        words.append(word)
    return words

Processing the test data

In [7]:
prep = preprocess(vocab, y)   

# Create Transition and Emission matrices

Transition matrix dictionary gives the number of seen transitions between two tags. Each time the two tags happen after each other we increment the value of the key = (tag1, tag2)

Emission matrix dictionary gives the number of times a word is seen with a tag. Each time the word with a certain tag is seen we increment the value of the key = (tag, word)

These matrices are again be used later for creating the Hidden Markov Model matrices.

In [8]:
def create_dictionaries(training_corpus, vocab, verbose=True):
    
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)
    
    prev_tag = '--s--' 
    
    for word_tag in training_corpus:
        
        word, tag = get_word_tag(word_tag, vocab)
        
        transition_counts[(prev_tag, tag)] += 1
        
        emission_counts[(tag, word)] += 1

        tag_counts[tag] += 1

        prev_tag = tag
        
    return emission_counts, transition_counts, tag_counts

In [9]:
emission_counts, transition_counts, tag_counts = create_dictionaries(training_corpus, vocab)
states = sorted(tag_counts.keys())

# Predictions with approach 1:

We simply check the emission matrix and for each word return the tag that has the maximal probability.

In [10]:
def predict_pos(prep, y, emission_counts, vocab, states):
    
    num_correct = 0
    
    all_words = set(emission_counts.keys())
    
    total = len(y)
    for word, y_tup in zip(prep, y): 

        y_tup_l = y_tup.split()
        
        if len(y_tup_l) == 2:
            
            true_label = y_tup_l[1]

        else:
            continue
    
        count_final = 0
        pos_final = ''
        
        if word in vocab:
            for pos in states:

                key = (pos, word)

                if key in emission_counts:

                    count = emission_counts.get(key)

                    if count > count_final:

                        count_final = count

                        pos_final = pos

            if pos_final == true_label: 
                num_correct += 1
            
    accuracy = num_correct / total
    
    return accuracy

### Reporting the accuracy of approach 1:

In [11]:
accuracy_predict_pos = predict_pos(prep, y, emission_counts, vocab, states)
print(f"Accuracy of prediction using predict_pos is {accuracy_predict_pos:.4f}")

Accuracy of prediction using predict_pos is 0.8914


# Hidden Markov Model:

Two matrices need to be created from the training data:
- Transition Matrix: A(i, j) = probability of state j right after state i
- Emission Matrix: B(i, j) = probaility of the word j happening at state i

First these two matrices are calculated and then Viterbi algorithm is used to find the best probability and tag for each word.

## Creating the transition and emission matrices:

In [12]:
def create_transition_matrix(alpha, tag_counts, transition_counts):
    
    all_tags = sorted(tag_counts.keys())
    
    num_tags = len(all_tags)
    
    A = np.zeros((num_tags,num_tags))
    
    trans_keys = set(transition_counts.keys())
    
    for i in range(num_tags):
        
        count_prev_tag = tag_counts.get(all_tags[i])
        
        for j in range(num_tags):
            count = 0
        
            key = (all_tags[i], all_tags[j]) 

            if key in transition_counts: 
                
                count = transition_counts.get(key)                
            A[i,j] = (count + alpha) / (count_prev_tag + num_tags * alpha)

    return A

In [13]:
def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    
    num_tags = len(tag_counts)
    
    all_tags = sorted(tag_counts.keys())
    
    num_words = len(vocab)
    
    B = np.zeros((num_tags, num_words))
    
    emis_keys = set(list(emission_counts.keys()))
    
    for i in range(num_tags): 
        
        for j in range(num_words): 

            count = 0 
                    
            key = (all_tags[i], vocab[j]) 

            if key in emission_counts: 
        
                count = emission_counts.get(key)
                
            count_tag = tag_counts.get(all_tags[i])
                
            B[i,j] = (count + alpha) / (count_tag + num_words * alpha)

    return B

In [14]:
alpha = 0.001
A = create_transition_matrix(alpha, tag_counts, transition_counts)
B = create_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))

## Viterbi algorithm

Viterbi algorithm ([Link](https://en.wikipedia.org/wiki/Viterbi_algorithm)) is a dynamic programming algorithm for calculating the maximum a posteriori probability estimate of the most likely sequence of hidden states—called the Viterbi path—that results in a sequence of observed events.

The main idea is that, we start from the beginning of the sentence. The sentence is started by a start tag and thus initialized that way. 

For each word, we look at all possible tags and fill the matrix C so that:
C(i, j) = max( C(k, j - 1) * A(k, i) * B (i, index(word[j])) ) over all possible k (tags)

In the above:
- C(k, j - 1) is the max probability for having k as the tag for previous work
- A(k, j) is the transition probability from tag k to tag j
- B(i, index(word[j])) is the emission probability of having word[i] with tag i

This will be done for all possible tags for the word[j] and j is started from 0 in the begging of the sentence to the end length of the sentence. Then the maximum value and the index of the tag that had the highest probability are stored.

D(i, j) = the tag index at which C(:, j) is maximum

The algorithm has three steps:
- Initialization: the first column of the C, and D matrices are filled since we are always going to look at one step before.
- Forward Pass: At each column (meaning each word), we use the already calculated values from the previous column, and using the equations above calculate the values of the column. And find the highest probability tag.
- Backward Pass: When all the columns are found, we move backward from the last word in the sentence to the begining and find the tag for each word based on the highest probability of the column of tags for each word.

In [15]:
def initialize(states, tag_counts, A, B, corpus, vocab):
    
    num_tags = len(states)
    
    best_probs = np.zeros((num_tags, len(corpus)))
    
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
    
    s_idx = states.index("--s--")
    for i in range(num_tags): 
        
        if A[s_idx, i] == 0: 
            
            best_probs[i,0] = float("-inf")
            
        else:
            best_probs[i,0] = math.log(A[s_idx, i]) + math.log(B[i][vocab[corpus[0]]])
    
    return best_probs, best_paths

In [16]:
def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab, verbose=True):
    
    num_tags = best_probs.shape[0]
    
    for i in range(1, len(test_corpus)): 
        
        if i % 10000 == 0 and verbose:
            print("Words processed: {:>8}".format(i))
            
        for j in range(num_tags): 
            
            best_prob_i = float("-inf")
            
            best_path_i = None 

            for k in range(num_tags): 
            
                prob = best_probs[k, i - 1] + math.log(A[k, j]) + math.log(B[j, vocab[test_corpus[i]]])

                if prob > best_prob_i: 
                    
                    best_prob_i = prob
                    best_path_i = k

            best_probs[j,i] = best_prob_i
            
            best_paths[j,i] = best_path_i

    return best_probs, best_paths

In [17]:
def viterbi_backward(best_probs, best_paths, corpus, states):
    
    m = best_paths.shape[1] 
    
    z = [None] * m
    
    num_tags = best_probs.shape[0]
    
    best_prob_for_last_word = float('-inf')
    
    pred = [None] * m
    l = len(corpus)
    
    for k in range(num_tags): 

        if best_probs[k, m - 1] > best_prob_for_last_word: 
            
            best_prob_for_last_word = best_probs[k, m - 1]

            z[m - 1] = k
            
    pred[m - 1] = states[k]
    
    for i in range(l - 1, -1, -1): 
        pos_tag_for_word_i = z[i]
        z[i - 1] = best_paths[pos_tag_for_word_i, i]
        pred[i - 1] = states[z[i - 1]]
        
    return pred

In [18]:
# Initialize
best_probs, best_paths = initialize(states, tag_counts, A, B, prep, vocab)

# Forward pass
best_probs, best_paths = viterbi_forward(A, B, prep, best_probs, best_paths, vocab)

# Backward pass and find the tags
pred = viterbi_backward(best_probs, best_paths, prep, states)

Words processed:    10000
Words processed:    20000
Words processed:    30000


## Calculate the accuracy

We can compare the estimate tag of each word with the labeled true value in the test data and calculate the accuracy of the model.

In [19]:
def compute_accuracy(pred, y):
    
    num_correct = 0
    total = 0
    
    for prediction, y in zip(pred, y):
        word_tag_tuple = y.split()
        
        if len(word_tag_tuple) < 2: 
            continue

        word, tag = word_tag_tuple
        
        if tag == prediction: 
            num_correct += 1
            
        total += 1

    return num_correct/total

In [20]:
print(f"Accuracy of the Viterbi algorithm is {compute_accuracy(pred, y):.4f}")

Accuracy of the Viterbi algorithm is 0.9545


## We can see that the accuracy is improve from 89% to 95.5%

This is great, because we can see from the literature that much more complicated models based on deep-NNs give accuracies in the range of 97-98%

# Tagging an arbitrary and untagged sentence:

The above had processing for a tagged training and test data. What if we want to apply the model on any sentence that is not tagged?

We can write a simple function to create data in the same format. In this case, accuracy cannot be tested and we are only interested in the tags of the tested model.

In [21]:
import re
def process_sentence(sentence):
    words = re.findall( r'\w+|[^\s\w]+', sentence)
    prep_ = []
    for word in words:
        if word not in vocab: 
            word = assign_unk(word)
        prep_.append(word)
    return prep_

You can change the sentence string variable to your desired text.

In [22]:
sentence = "how are you today?"
sentence = sentence.lower()
prep_sentence = process_sentence(sentence)


Applying the tested model to the sentence:

In [23]:
# Initialize
best_probs, best_paths = initialize(states, tag_counts, A, B, prep_sentence, vocab)

# Forward pass
best_probs, best_paths = viterbi_forward(A, B, prep_sentence, best_probs, best_paths, vocab)

# Backward pass and find the tags
pred = viterbi_backward(best_probs, best_paths, prep_sentence, states)

## Checking the predicted tags:

In [24]:
for word, tag in zip(prep_sentence, pred):
    print(word + ":" + tag)

how:WRB
are:VBP
you:PRP
today:NN
?:#
