# Part-of-Speech (POS) Tagging using a Hidden Markov Model (HMM)

Tags from the [Penn Treebank II tag set](http://relearn.be/2015/training-common-sense/sources/software/pattern-2.6-critical-fork/docs/html/mbsp-tags.html) are used to label the part of speech (POS) of each word.

Two tagged data sets collected from the **Wall Street Journal (WSJ)** are used:

- Training Data: **WSJ_02-21.pos**.
- Test Data: **WSJ_24.pos**.

In [1]:
%reload_ext autoreload
%autoreload 2

In [2]:
# Import required libraries
import numpy as np
import pandas as pd
from utils import create_vocab, get_word_tag, process, build_word_index, build_pos_tag_index
from collections import defaultdict

In [3]:
# Load the training corpus
with open('./WSJ_02-21.pos', 'r') as f:
    training_corpus = f.readlines()

print('Some lines from the training corpus: ', training_corpus[:5])

Some lines from the training corpus:  ['In\tIN\n', 'an\tDT\n', 'Oct.\tNNP\n', '19\tCD\n', 'review\tNN\n']


In [4]:
# Create Vocabulary from training set only considering words that occur more than once.
vocab = create_vocab('./WSJ_02-21.pos')


In [5]:
print(f'Length of the vocabulary: {len(vocab)}')
print(f'Few words in the vocabulary: {vocab[:30]}')

Length of the vocabulary: 23776
Few words in the vocabulary: ['!', '#', '$', '%', '&', "'", "''", "'40s", "'60s", "'70s", "'80s", "'86", "'90s", "'N", "'S", "'d", "'em", "'ll", "'m", "'n'", "'re", "'s", "'til", "'ve", '(', ')', ',', '-', '--', '--n--']


In [6]:
# Load test corpus
with open('./WSJ_24.pos', 'r') as f:
    test_corpus = f.readlines()
print('Some lines from the test corpus: ', test_corpus[:5])

Some lines from the test corpus:  ['The\tDT\n', 'economy\tNN\n', "'s\tPOS\n", 'temperature\tNN\n', 'will\tMD\n']


### Create Transition, Emission, and Tag Counts
For a word sequence $(w_1, w_2, \dots, w_n)$ and its associated POS tags $(t_1, t_2, \dots, t_n)$:

$w_i$: the word at index $i$ in the sequence \
$t_i$: the tag associated with the word at index $i$

$C(t_{i-1}, t_i)$: The number of times the tags $(t_{i-1}, t_i)$ occur in the training corpus in that order. This is the transition count.

$C(t_{i}, w_i)$: The number of times the word $w_i$ is emitted by the POS state $t_i$ in the training corpus. This is the emission count.

$C(t_{i})$: The number of times the tag $t_i$ occurs in the training corpus. This is the tag count.

We will calculate these counts for every token in the training corpus.
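
As a rough illustration of the three counts, the sketch below tallies them on a tiny hand-made corpus. The corpus and the `toy_*` names are hypothetical and only mirror the definitions above; the start tag `'--s--'` follows the convention used in this notebook.

```python
from collections import defaultdict

# Hypothetical toy corpus: one (word, tag) pair per token.
toy_corpus = [("the", "DT"), ("dog", "NN"), ("barks", "VBZ"),
              ("the", "DT"), ("cat", "NN")]

toy_transitions = defaultdict(int)  # C(t_{i-1}, t_i)
toy_emissions = defaultdict(int)    # C(t_i, w_i)
toy_tags = defaultdict(int)         # C(t_i)

prev_tag = "--s--"  # start-of-sequence tag
for word, tag in toy_corpus:
    toy_transitions[(prev_tag, tag)] += 1
    toy_emissions[(tag, word)] += 1
    toy_tags[tag] += 1
    prev_tag = tag

print(dict(toy_transitions))
print(dict(toy_emissions))
print(dict(toy_tags))
```

Here the pair `('DT', 'NN')` is seen twice, so its transition count is 2, while `('DT', 'the')` has an emission count of 2.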

In [7]:
def create_counts(training_corpus, vocab):
    """
    Generate the Transition, Emission and Tag counts dictionary

    Params:
    ----------
    training_corpus: list of str
        The pre-tagged training corpus.
    vocab: list
        The vocabulary being used.

    Returns:
    ----------
    transition_dict: defaultdict
        Dictionary containing the transition counts from the training corpus.
    emission_dict: defaultdict
        Dictionary containing the emission counts from the training corpus.
    tag_dict: defaultdict
        Dictionary containing the tag counts from the training corpus.
    """
    # Start with an initial tag - the start tag
    prev_tag = '--s--'

    # Initialize the dictionaries
    transition_dict, emission_dict, tag_dict = defaultdict(int), defaultdict(int), defaultdict(int)

    for line in training_corpus:
        word, tag = get_word_tag(line, vocab)
        transition_dict[(prev_tag, tag)] += 1
        emission_dict[(tag, word)] += 1
        tag_dict[tag] += 1
        prev_tag = tag
       
    
    return transition_dict, emission_dict, tag_dict

In [8]:
transition_dict, emission_dict, tag_dict = create_counts(training_corpus, vocab)

In [9]:
# view some entries of the generated dictionaries
print(list(transition_dict.items())[:5])
print(list(emission_dict.items())[:5])
print(list(tag_dict.items())[:5])

[(('--s--', 'IN'), 5050), (('IN', 'DT'), 32364), (('DT', 'NNP'), 9044), (('NNP', 'CD'), 1752), (('CD', 'NN'), 7377)]
[(('IN', 'In'), 1735), (('DT', 'an'), 3142), (('NNP', 'Oct.'), 317), (('CD', '19'), 100), (('NN', 'review'), 36)]
[('IN', 98554), ('DT', 81842), ('NNP', 91466), ('CD', 36568), ('NN', 132935)]


In [10]:
# get all the POS states
pos_states = sorted(set(tag_dict.keys()))
print(f"Number of POS tags: {len(pos_states)}")
print("View these POS tags:")
print(pos_states)

Number of POS tags: 46
View these POS tags:
['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']


In [11]:
print("ambiguous word example: ")
for (tag, word), cnt in emission_dict.items():
    if word == 'back':
        print((tag, word), cnt)

ambiguous word example: 
('RB', 'back') 304
('VB', 'back') 20
('RP', 'back') 84
('JJ', 'back') 25
('NN', 'back') 29
('VBP', 'back') 4


### Test using Naive POS Tagger
The naive tagger assigns each word the POS tag that occurs most frequently with that word in the training set, ignoring the surrounding context.

$$\hat{t}_i = \underset{t \ \in \ T}{\arg\max}\ C(t, w_i)$$
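
To make the rule concrete, the short sketch below applies it to the emission counts printed earlier for the word `back`. The dictionary name `back_counts` is only for illustration.

```python
# Emission counts for the word 'back', copied from the output printed earlier.
back_counts = {"RB": 304, "VB": 20, "RP": 84, "JJ": 25, "NN": 29, "VBP": 4}

# The naive tagger picks the tag with the largest count C(t, w).
naive_tag = max(back_counts, key=back_counts.get)
print(naive_tag)  # RB
```

Because the choice depends only on the word, this tagger labels `back` as `RB` in every sentence, even where it is used as a verb.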

In [164]:
def predict_accuracy(test_corpus, emission_dict, vocab, pos_states):
    """
    Compute the accuracy of the model.

    Params: 
    ----------
    test_corpus: list
        The pre-tagged test corpus.
    emission_dict: defaultdict
        Dictionary containing the emission counts from the training corpus.
    vocab: list
        The vocabulary being used.
    pos_states: list
        Sorted list of candidate POS tags.

    Returns:
    ----------
    accuracy: str
        The accuracy of the model as a percentage, formatted to four decimal places.
    """

    accuracy = 0
    tot = len(test_corpus)
    
    for line in test_corpus:
        word, true_pos_tag = get_word_tag(line, vocab)

        # Find the most frequent POS tag observed for the word in the training set.
        max_val = 0
        pred_pos_tag = ''
        for tag in pos_states:
            if emission_dict[(tag, word)] > max_val:
                max_val = emission_dict[(tag, word)]
                pred_pos_tag = tag
        # Check the accuracy
        if pred_pos_tag == true_pos_tag: 
            accuracy += 1
    
    accuracy = accuracy * 100 / tot
    return f'{accuracy:.4f}'

    

In [165]:
print(f'The accuracy of the naive model is: {predict_accuracy(test_corpus, emission_dict, vocab, pos_states)}')

The accuracy of the naive model is: 93.0729


In [162]:
def predict_pos(word_list, emission_dict, vocab, pos_states):
    """
    Print the naive (most frequent tag) POS prediction for each word in word_list.
    """
    pos_tags = []
    for w in word_list:
        # Attach a dummy tag '#' so that get_word_tag can be reused to
        # map the word onto the vocabulary.
        word, _ = get_word_tag(w + '\t' + '#', vocab)

        # Find the most frequent POS tag observed for the word in the training set.
        max_val = 0
        pred_pos_tag = ''
        for tag in pos_states:
            if emission_dict[(tag, word)] > max_val:
                max_val = emission_dict[(tag, word)]
                pred_pos_tag = tag
        pos_tags.append(pred_pos_tag)

    print(word_list)
    print(pos_tags)

In [163]:
word_list = ["Ram", "'s", "book", "was", "taken", "by", "me","."]
print('The word tokens are: ', word_list)
predict_pos(word_list, emission_dict, vocab, pos_states)

The word tokens are:  ['Ram', "'s", 'book', 'was', 'taken', 'by', 'me', '.']
['Ram', "'s", 'book', 'was', 'taken', 'by', 'me', '.']
['NNP', 'POS', 'NN', 'VBD', 'VBN', 'IN', 'PRP', '.']


### POS Tagging using HMM Model

The model finds the most probable POS tag sequence for a given word sequence:

$$
\hat{t}_{1:T} =
\underset{t_1 \dots t_T}{\arg\max}\
P(t_1 \dots t_T \mid w_1 \dots w_T) \approx
\underset{t_1 \dots t_T}{\arg\max}\
\prod_{i=1}^{T}
\overbrace{P(t_i \mid t_{i-1})}^{\text{transition}} \
\overbrace{P(w_i \mid t_i)}^{\text{emission}}
$$

- $T$ is the length of the word sequence to be tagged.
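
The product above can be evaluated for any candidate tag sequence. The sketch below scores two candidates in log space, which turns the product into a sum. All probabilities are made-up values for illustration, and `sequence_log_score` is a hypothetical helper, not part of this notebook.

```python
import math

# Hypothetical toy probabilities, chosen only for illustration.
toy_transition = {("--s--", "NNP"): 0.3, ("NNP", "MD"): 0.2,
                  ("MD", "VB"): 0.8, ("MD", "NN"): 0.01}
toy_emission = {("NNP", "Janet"): 0.001, ("MD", "will"): 0.3,
                ("VB", "back"): 0.002, ("NN", "back"): 0.0005}

def sequence_log_score(words, tags):
    """Sum of log P(t_i | t_{i-1}) + log P(w_i | t_i) over the sequence."""
    score, prev = 0.0, "--s--"
    for w, t in zip(words, tags):
        score += math.log(toy_transition[(prev, t)]) + math.log(toy_emission[(t, w)])
        prev = t
    return score

words = ["Janet", "will", "back"]
score_vb = sequence_log_score(words, ["NNP", "MD", "VB"])
score_nn = sequence_log_score(words, ["NNP", "MD", "NN"])
print(score_vb > score_nn)
```

Scoring every possible sequence this way grows exponentially with the sentence length, which is why a dynamic programming search is used instead of brute force.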


### Create the 'A' transition probabilities matrix using smoothing.

$$ P(t_i | t_{i-1}) = \frac{C(t_{i-1}, t_{i}) + \alpha }{C(t_{i-1}) +\alpha * N}$$

- $N$ is the total number of distinct tags.
- $C(t_{i-1}, t_{i})$ is the count of the tuple (previous POS, current POS) in the `transition_dict` dictionary.
- $C(t_{i-1})$ is the count of the previous POS in the `tag_dict` dictionary.
- $\alpha$ is a smoothing parameter.
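
As a small sketch of the smoothing formula, the example below builds a transition matrix from a hand-made count table using NumPy broadcasting. The tag set and counts are hypothetical, and $C(t_{i-1})$ is assumed to equal the row sum of the count table so that each row normalizes exactly.

```python
import numpy as np

toy_tags = ["--s--", "DT", "NN"]      # hypothetical tag set
# toy_counts[i, j] = C(toy_tags[i], toy_tags[j]); toy numbers for illustration only.
toy_counts = np.array([[0, 3, 1],
                       [0, 0, 4],
                       [1, 1, 0]], dtype=float)
prev_totals = toy_counts.sum(axis=1)  # C(t_{i-1}), assumed equal to the row sums
alpha = 0.001
N = len(toy_tags)

# Add-alpha smoothing applied to every cell at once.
A_toy = (toy_counts + alpha) / (prev_totals[:, None] + alpha * N)
print(A_toy.round(4))
print(A_toy.sum(axis=1))
```

Every entry is strictly positive after smoothing, so taking logarithms later does not produce negative infinity for unseen tag pairs.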

In [76]:
def create_transition_matrix(transition_dict, tag_dict, alpha):
    """
    Computes the transition probabilities matrix given the transition counts.

    Params:
    ----------
    transition_dict: defaultdict
        The transition counts dictionary which provides the number of times 
        the POS sequence (prev_pos, curr_pos) occurs in the training corpus.
    tag_dict: defaultdict
        Dictionary containing the tag counts from the training corpus.
    alpha: float
        The smoothing parameter.

    Returns:
    ----------
    A: numpy array
        The transition probabilities matrix
    """

    pos_states = sorted(set(tag_dict.keys()))
    # Get the dimension of the matrix from the pos_states.
    N = len(pos_states)
    # Initialize the transition matrix.
    A = np.zeros((N, N))

    for i, pre_pos in enumerate(pos_states):
        for j, curr_pos in enumerate(pos_states):
            A[i, j] = (transition_dict[(pre_pos, curr_pos)] + alpha) / (tag_dict[pre_pos] + alpha * N)
    return A


In [77]:
# Display the Transition Matrix
A = create_transition_matrix(transition_dict, tag_dict, 0.001)
df = pd.DataFrame(A, index=pos_states, columns=pos_states)


In [78]:
print('Entries in the Transition Matrix:')
df.iloc[20:25,20:25]

Entries in the Transition Matrix:


| | NN | NNP | NNPS | NNS | PDT |
|---|---|---|---|---|---|
| NN | 0.122172 | 0.009749 | 9e-05 | 0.077797 | 1.505246e-05 |
| NNP | 0.058328 | 0.376807 | 0.016695 | 0.024249 | 1.094395e-05 |
| NNPS | 0.038159 | 0.277212 | 0.015713 | 0.011224 | 3.74105e-07 |
| NNS | 0.020817 | 0.003057 | 3.3e-05 | 0.010525 | 5.013696e-05 |
| PDT | 3e-06 | 3e-06 | 3e-06 | 3e-06 | 2.702367e-06 |


### Create the 'B' emission probabilities matrix using smoothing.

$$P(w_i | t_i) = \frac{C(t_i, w_i)+ \alpha}{C(t_{i}) +\alpha * N}$$

- $C(t_i, w_i)$ is the number of times the word $w_i$ was associated with the tag $t_i$ in the training data (stored in the `emission_dict` dictionary).
- $C(t_i)$ is the number of times the tag $t_i$ occurred in the training data (stored in the `tag_dict` dictionary).
- $N$ is the number of words in the vocabulary.
- $\alpha$ is a smoothing parameter.
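
The sketch below plugs counts printed earlier in this notebook into the formula: the tag count for `IN`, the emission count for `('IN', 'In')`, and the printed vocabulary length. It assumes $N$ equals that printed vocabulary length, which may differ slightly from the value the notebook function derives from the emission dictionary.

```python
alpha = 0.001
vocab_size = 23776      # printed vocabulary length (assumed to be N)
count_in_tag = 98554    # C(IN), from the printed tag counts
count_in_word = 1735    # C(IN, 'In'), from the printed emission counts

denominator = count_in_tag + alpha * vocab_size

# Smoothed probability of a seen (tag, word) pair.
p_seen = (count_in_word + alpha) / denominator
# Smoothed probability of a pair that never occurred in training.
p_unseen = (0 + alpha) / denominator

print(f"P('In' | IN) = {p_seen:.6f}")
print(f"P(unseen word | IN) = {p_unseen:.3e}")
```

An unseen pair keeps a small positive probability instead of zero, which matches the many identical tiny values visible in each row of the emission matrix.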

In [79]:
def create_emission_matrix(emission_dict, tag_dict, alpha):
    """
    Computes the emission probabilities matrix given the emission counts.

    Params:
    ----------
    emission_dict: defaultdict
        The emission counts dictionary which provides the number of times 
        a tag was associated with a word in the training corpus.
    tag_dict: defaultdict
        Dictionary containing the tag counts from the training corpus.
    alpha: float
        The smoothing parameter.

    Returns:
    ----------
    B: numpy array
        The emission probabilities matrix
    """
    # Get the ordered POS tags list being used.
    pos_states = sorted(set(tag_dict.keys()))
    # Get the ordered words list being used.
    vocab = sorted(set(word for _, word in emission_dict.keys()))

    # The dimensions of the matrix
    N = len(vocab)
    row_len = len(pos_states)
    

    # Initialize the Emission Matrix
    B = np.zeros((row_len, N))

    for i, pos in enumerate(pos_states):
        for j, word in enumerate(vocab):
            B[i, j] = (emission_dict[pos, word] + alpha) / (tag_dict[pos] + alpha * N)
    return B

In [80]:
# Display the Emission Matrix
B = create_emission_matrix(emission_dict, tag_dict, 0.001)
df = pd.DataFrame(B, index=pos_states, columns=vocab)

In [81]:
print('Entries in the Emission Matrix:')
df.iloc[30:35,11500:11505]

Entries in the Emission Matrix:


| | citywide | civic | civil | civil-rights | civilian |
|---|---|---|---|---|---|
| RBS | 2.106256e-06 | 2.106256e-06 | 2.106256e-06 | 2.106256e-06 | 2.106256e-06 |
| RP | 3.723319e-07 | 3.723319e-07 | 3.723319e-07 | 3.723319e-07 | 3.723319e-07 |
| SYM | 1.222853e-05 | 1.222853e-05 | 1.222853e-05 | 1.222853e-05 | 1.222853e-05 |
| TO | 4.46812e-08 | 4.46812e-08 | 4.46812e-08 | 4.46812e-08 | 4.46812e-08 |
| UH | 8.279791e-06 | 8.279791e-06 | 8.279791e-06 | 8.279791e-06 | 8.279791e-06 |


### POS Tagging using the Viterbi Algorithm

The steps in the algorithm.

<img src="./viterbi_algorithm.JPG" width=800px/>

Lattice representation of the Viterbi algorithm.

<img src="./viterbi_lattice.JPG" width=800px/>

Visualization of the computation flow.

<img src="./viterbi_algo_flow.JPG" width=800px/>
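
For readers who cannot view the figures, the following is a minimal vectorized sketch of the Viterbi recursion in log space on a toy two-state HMM. The function name `viterbi_log`, the state and observation encoding, and all probabilities are assumptions made for illustration; this is not the tagger defined in this notebook.

```python
import numpy as np

def viterbi_log(log_pi, log_A, log_B, obs):
    """Most likely state path for observation indices `obs` (log space).

    log_pi: (N,) initial log-probabilities
    log_A:  (N, N) log P(state_j | state_i)
    log_B:  (N, V) log P(observation_k | state_j)
    """
    N, T = log_A.shape[0], len(obs)
    score = np.full((N, T), -np.inf)
    back = np.zeros((N, T), dtype=int)

    # Initialization: start probability plus emission of the first observation.
    score[:, 0] = log_pi + log_B[:, obs[0]]
    for t in range(1, T):
        # cand[i, j]: best score ending in state i at t-1, then moving to state j.
        cand = score[:, t - 1][:, None] + log_A
        back[:, t] = cand.argmax(axis=0)
        score[:, t] = cand.max(axis=0) + log_B[:, obs[t]]

    # Backtrace from the best final state.
    path = [int(score[:, -1].argmax())]
    for t in range(T - 1, 0, -1):
        path.append(int(back[path[-1], t]))
    return path[::-1], float(score[:, -1].max())

# Toy HMM with made-up numbers: states 0 = hot, 1 = cold; observations 0, 1, 2.
pi = np.array([0.8, 0.2])
A_toy = np.array([[0.6, 0.4],
                  [0.5, 0.5]])
B_toy = np.array([[0.2, 0.4, 0.4],
                  [0.5, 0.4, 0.1]])

best_path, best_log_p = viterbi_log(np.log(pi), np.log(A_toy), np.log(B_toy), [2, 0, 2])
print(best_path, np.exp(best_log_p))
```

For this toy input the best path is `[0, 1, 0]` with probability 0.0128. Replacing the inner loop over previous states with a single array operation is a design choice for speed; the result is the same as looping over every pair of states.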


In [143]:
def pos_tagger(word_list, transition_proba_matrix, emission_proba_matrix, pos_states, vocab):
    """
    Tags POS for a given sentence using the viterbi algorithm.

    Params:
    ----------
    word_list: list
        The list of words whose POS tagging is required.
    transition_proba_matrix: numpy array
        The transition probability matrix which provides the probability of a tag given a previous tag in the sequence.
    emission_proba_matrix: numpy array
        The emission probability matrix which provides the probability of a word given a tag.
    pos_states: list
        The total list of tags from which to assign to words in a given sequence.
    vocab: list of words
        The vocabulary being used.

    Returns:
    ----------
    pos_tag: list
        The best sequence of assigned POS Tags for the given input word sequence.
    best_path_proba: float
        The log-probability of the best POS tag sequence for the given word sequence among all possible tag sequences.
    best_path_pointer: int
        The point at which the best POS sequence path ends. This is used as a starting point
        to trace back to previous best POS states using back_pointer matrix.
    viterbi: numpy array
        The path probability matrix.
    back_pointer: numpy array
        The back pointer matrix used for backtrace.
    """

    # Ignore POS tagging for all non-word characters.
    # word_tokens = process(sent)

    # Get the word index.
    word_index = build_word_index(vocab)

    # Build the POS tag index.
    pos_tag_index = build_pos_tag_index(pos_states)

    word_tokens = []
    for word in word_list:
        # Append the word with a dummy tag '#' to use the get_word_tag module
        # and assign 'unk' tokens for words not in vocab.
        word, _ = get_word_tag(word+'\t'+'#', vocab)    
        word_tokens.append(word)

    N = len(pos_states) # Total pos states
    T = len(word_tokens)  # Length of the observations(words)


    # Create and initialize the path probability matrix.
    viterbi = np.zeros((N, T))

    # Create and initialize the back pointer matrix used for backtrace.
    back_pointer = np.zeros((N, T), dtype=int)

    # INITIALIZATION STEP:
    # Get the index of the start tag '--s--'.
    start_tag_index = pos_tag_index['--s--']
    first_word_index = word_index[word_tokens[0]]
    for s in range(N):
        # Fill the first column with log(initial transition probability) +
        # log(emission probability) for each state (POS tag). Log probabilities
        # are summed instead of multiplying probabilities to avoid underflow.
        viterbi[s, 0] = (np.log(transition_proba_matrix[start_tag_index, s])
                         + np.log(emission_proba_matrix[s, first_word_index]))
    # The first column of the back pointer matrix keeps its initial zero values.

    # FORWARD PASS:
    for t in range(1, T):
        curr_word_index = word_index[word_tokens[t]]
        # Loop over all POS states.
        for s in range(N):
            # Scores of reaching state s at step t from every previous state.
            scores = [
                viterbi[s_prev, t-1]
                + np.log(transition_proba_matrix[s_prev, s])
                + np.log(emission_proba_matrix[s, curr_word_index])
                for s_prev in range(N)]
            back_pointer[s, t] = np.argmax(scores)
            viterbi[s, t] = scores[back_pointer[s, t]]

    # BACKWARD PASS:
    best_path_proba = max(viterbi[s, T-1] for s in range(N))
    best_path_pointer = np.argmax([viterbi[s, T-1] for s in range(N)])

    # Backtrace to get the POS for the word sequence.
    ptr = best_path_pointer
    pos_tag = []
    for t in range(T-1, -1, -1):
        pos_tag.append(pos_states[ptr])
        ptr = back_pointer[ptr, t]
    
    # Reverse the sequence to get the correct order of POS tags.
    pos_tag = pos_tag[::-1]

    return pos_tag, best_path_proba, best_path_pointer, viterbi, back_pointer


In [160]:
sent = ['Janet', 'will', 'back', 'the', 'bill', '.']
print('The sentence is: ',sent)
pos_tag, *_ = pos_tagger(sent, A, B, pos_states, vocab)
print('The tagged POS sequence is: ', pos_tag)

The sentence is:  ['Janet', 'will', 'back', 'the', 'bill', '.']
The tagged POS sequence is:  ['NNP', 'MD', 'VB', 'DT', 'NN', '.']


### Calculate **Accuracy** of POS Tagger using HMM Model.


In [167]:
def accuracy(test_corpus, transition_proba_matrix, emission_proba_matrix, pos_states, vocab):
    """
    Computes the accuracy of the POS tagger which uses HMM model.

    Params:
    ----------
    test_corpus: list
        The list of lines containing words and their true POS label delimited by tabs or whitespace.
    transition_proba_matrix: numpy array
        The transition probability matrix which provides the probability of a tag given a previous tag in the sequence.
    emission_proba_matrix: numpy array
        The emission probability matrix which provides the probability of a word given a tag.
    pos_states: list
        The total list of tags from which to assign to words in a given sequence.
    vocab: list of words
        The vocabulary being used.

    Returns:
    ----------
    accuracy: str
        The accuracy of the model as a percentage, formatted to four decimal places.
    """


    word_list, true_tag_list = [], []
    for line in test_corpus:
        word, tag = get_word_tag(line, vocab)
        word_list.append(word)
        true_tag_list.append(tag)

    # Predict the POS sequence for the test word sequence.
    pred_tag_list, *_ = pos_tagger(word_list, transition_proba_matrix, emission_proba_matrix, pos_states, vocab)

    assert(len(true_tag_list) == len(pred_tag_list))

    accuracy = 0
    for true_tag, pred_tag in zip(true_tag_list, pred_tag_list):
        if true_tag == pred_tag: accuracy += 1

    accuracy = accuracy * 100 / len(true_tag_list)
    return f'{accuracy:.4f}'   



In [166]:
print('The accuracy of the POS Tagger using HMM Model is:', 
accuracy(test_corpus, A, B, pos_states, vocab))

Words processed:  500
Words processed:  1000
...
Words processed:  21000
...

### References

- ["Speech and Language Processing", Dan Jurafsky and James H. Martin](https://web.stanford.edu/~jurafsky/slp3/)