# Load Target Dataset - Penn Treebank PoS Tagging

The Hidden Markov Model (HMM) operates on some observed data ($\{o_t\}_{t=0..T-1}$) with corresponding hidden states ($\{ s_t \}_{t=0..T-1}$), where each state comes from a set $S$.

The Penn Treebank Corpus has labelled sentence words with part-of-speech tags. Consider words as observations and tags as hidden states.

In [3]:
import nltk

# treebank :=  Penn Treebank dataset of tagged corpus
nltk.download('treebank')

# Be careful with parameter 'tagset' because depend on it you get different tag names
treebank_corpus = nltk.corpus.treebank.tagged_sents()

[nltk_data] Downloading package treebank to
[nltk_data]     C:\Users\ilya-\AppData\Roaming\nltk_data...
[nltk_data]   Package treebank is already up-to-date!


In [4]:
sentence = treebank_corpus[0]

for t, (word, tag) in enumerate(sentence):
    print(f"Observation {t}: {word} \t Hidden State {t}: {tag}")

Observation 0: Pierre 	 Hidden State 0: NNP
Observation 1: Vinken 	 Hidden State 1: NNP
Observation 2: , 	 Hidden State 2: ,
Observation 3: 61 	 Hidden State 3: CD
Observation 4: years 	 Hidden State 4: NNS
Observation 5: old 	 Hidden State 5: JJ
Observation 6: , 	 Hidden State 6: ,
Observation 7: will 	 Hidden State 7: MD
Observation 8: join 	 Hidden State 8: VB
Observation 9: the 	 Hidden State 9: DT
Observation 10: board 	 Hidden State 10: NN
Observation 11: as 	 Hidden State 11: IN
Observation 12: a 	 Hidden State 12: DT
Observation 13: nonexecutive 	 Hidden State 13: JJ
Observation 14: director 	 Hidden State 14: NN
Observation 15: Nov. 	 Hidden State 15: NNP
Observation 16: 29 	 Hidden State 16: CD
Observation 17: . 	 Hidden State 17: .


# Hidden Markov Model

In [169]:
import numpy as np
from typing import List, Tuple


class HMM:

    def __init__(self, n_state_types: int, n_observation_types: int):
        self.n_state_types = n_state_types
        self.n_observation_types = n_observation_types
        self.initial_probabilities = np.zeros((n_state_types, ), dtype=np.float64)
        self.transition_probabilities = np.zeros((n_state_types, n_state_types), dtype=np.float64)
        self.emission_probabilities = np.zeros((n_observation_types, n_state_types), dtype=np.float64)

    def train(self, labelled_observations: List[List[Tuple[str, str]]]):

        # Create mapping from states to state id and observations to observation id
        self.st2idx = {}
        self.obs2idx = {}

        for observed_sequence in labelled_observations:
            for observation, state in observed_sequence:
                if observation not in self.obs2idx.keys():
                    self.obs2idx[observation] = len(self.obs2idx)
                if state not in self.st2idx.keys():
                    self.st2idx[state] = len(self.st2idx)
        
        # Inverse mappings
        self.idx2st = {id: state for state, id in self.st2idx.items()}
        self.idx2obs = {id: observation for observation, id in self.obs2idx.items()}

        # Fit initial probabilities
        for observed_sequence in labelled_observations:
            _, initial_state = observed_sequence[0]
            self.initial_probabilities[self.st2idx[initial_state]] += 1
        
        self.initial_probabilities /= self.initial_probabilities.sum()

        # Fit transition probabilities
        for observed_sequence in labelled_observations:
            # Take current state and next state, ignore the last state
            for t in range(len(observed_sequence)-1):
                
                _, current_state = observed_sequence[t] 
                _, next_state = observed_sequence[t+1]

                self.transition_probabilities[ self.st2idx[next_state], self.st2idx[current_state] ] += 1
        
        total_frequencies = self.transition_probabilities.sum(axis=0)
        self.transition_probabilities /= total_frequencies

        # Fit emission probabilities
        for observed_sequence in labelled_observations:
            for observation, state in observed_sequence:
                self.emission_probabilities[ self.obs2idx[observation], self.st2idx[state] ] += 1
        
        total_frequencies = self.emission_probabilities.sum(axis=0)
        self.emission_probabilities /= total_frequencies
    
    def predict(self, observations: List[str]) -> List[str]:
        T = len(observations)
        V = np.zeros((T, self.n_state_types), dtype=np.float64)
        backpointers = np.zeros((T, self.n_state_types), dtype=np.int32)
        
        # Convert observations to indices
        obs_indices = [self.obs2idx[obs] for obs in observations]
        
        # Initialization step (t=0)
        V[0, :] = self.initial_probabilities * self.emission_probabilities[obs_indices[0], :]
        backpointers[0, :] = 0  # No previous state
        
        # Recursion step (t=1 to T-1)
        for t in range(1, T):
            for s in range(self.n_state_types):  # current state
                # Calculate probability for each possible previous state
                # V[t-1, prev] * transition_prob[prev -> s] * emission[obs[t] | s]
                probabilities = V[t-1, :] * self.transition_probabilities[s, :] * self.emission_probabilities[obs_indices[t], s]
                
                # Find the best previous state
                best_prev = np.argmax(probabilities)
                best_prob = probabilities[best_prev]
                
                V[t, s] = best_prob
                backpointers[t, s] = best_prev
        
        # Backtrace
        # Find the best final state
        best_last_state = np.argmax(V[-1, :])
        
        # Reconstruct the path
        path = [best_last_state]
        for t in range(T-1, 0, -1):
            best_last_state = backpointers[t, best_last_state]
            path.append(best_last_state)
        
        # Reverse to get chronological order and convert to state names
        path.reverse()
        return [self.idx2st[state] for state in path]

## Illustrative Example

Let's consider a finite state machine with two states $\{a, b\}$ and two observation types $\{1, 2\}$, such that 

- initial state can be either of the states with probabilities $\frac{7}{8}, \frac{1}{8}$ respectively;
- state $a$ emits $1$ and $2$ with probabilities equal probabilities;
- state $b$ emits $1$ and $2$ with probabilities $\frac{3}{4}, \frac{1}{4}$ respectively;
- after state $a$ follows either of the states with probabilities $\frac{9}{10}, \frac{1}{10}$;
- after state $b$ follows only $b$.

In [170]:
hmm = HMM(2, 2)

In [171]:
np.random.seed(1)

def generation_function(length: int):
    init_state = str(np.random.choice(['a', 'b'], p=[7/8, 1/8]))

    if init_state == 'a':
        init_obs = int(np.random.choice([1, 2], p=[1/2, 1/2]))
    else:
        init_obs = int(np.random.choice([1, 2], p=[3/4, 1/4]))
    
    output = [(init_obs, init_state)]
    for _ in range(length - 1):
        _, prev_state = output[-1]
        
        if prev_state == 'a':
            curr_state = str(np.random.choice(['a', 'b'], p=[9/10, 1/10]))
        else:
            curr_state = 'b'
        
        if curr_state == 'a':
            curr_obs = int(np.random.choice([1, 2], p=[1/2, 1/2]))
        else:
            curr_obs = int(np.random.choice([1, 2], p=[3/4, 1/4]))
        
        output.append((curr_obs, curr_state))
    
    return output

In [172]:
data = [generation_function(10) for _ in range(1000)]

In [173]:
hmm.train(data)

In [174]:
import pandas as pd

# Initial probabilities

initial_probabilities = pd.DataFrame(hmm.initial_probabilities, [hmm.idx2st[i] for i in range(2)])
initial_probabilities

Unnamed: 0,0
a,0.862
b,0.138


In [175]:
transition_probabilities = pd.DataFrame(hmm.transition_probabilities, columns=[hmm.idx2st[i] for i in range(2)], index=[hmm.idx2st[i] for i in range(2)])
transition_probabilities

Unnamed: 0,a,b
a,0.899736,0.0
b,0.100264,1.0


In [176]:
emission_probabilities = pd.DataFrame(hmm.emission_probabilities, columns=[hmm.idx2st[i] for i in range(2)], index=[hmm.idx2obs[i] for i in range(2)])
emission_probabilities

Unnamed: 0,a,b
2,0.508086,0.249257
1,0.491914,0.750743


## Training on Penn Treebank Corpus

In [185]:
tags_set = set()
words_set = set()

for sentence in treebank_corpus:
    for word, tag in sentence:
        tags_set.add(tag)
        words_set.add(word)

In [186]:
hmm_pos = HMM(len(tags_set), len(words_set))

In [187]:
hmm_pos.train(treebank_corpus)

### Test the performance

In [191]:
sentence = treebank_corpus[0]

predicted_tags = hmm_pos.predict([word for word, _ in sentence])

for t, (word, tag) in enumerate(sentence):
    print(f"Observation {t}: {word} \t Hidden State {t}: {tag} \t Predicted {t}: {predicted_tags[t]}")

Observation 0: Pierre 	 Hidden State 0: NNP 	 Predicted 0: NNP
Observation 1: Vinken 	 Hidden State 1: NNP 	 Predicted 1: NNP
Observation 2: , 	 Hidden State 2: , 	 Predicted 2: ,
Observation 3: 61 	 Hidden State 3: CD 	 Predicted 3: CD
Observation 4: years 	 Hidden State 4: NNS 	 Predicted 4: NNS
Observation 5: old 	 Hidden State 5: JJ 	 Predicted 5: JJ
Observation 6: , 	 Hidden State 6: , 	 Predicted 6: ,
Observation 7: will 	 Hidden State 7: MD 	 Predicted 7: MD
Observation 8: join 	 Hidden State 8: VB 	 Predicted 8: VB
Observation 9: the 	 Hidden State 9: DT 	 Predicted 9: DT
Observation 10: board 	 Hidden State 10: NN 	 Predicted 10: NN
Observation 11: as 	 Hidden State 11: IN 	 Predicted 11: IN
Observation 12: a 	 Hidden State 12: DT 	 Predicted 12: DT
Observation 13: nonexecutive 	 Hidden State 13: JJ 	 Predicted 13: JJ
Observation 14: director 	 Hidden State 14: NN 	 Predicted 14: NN
Observation 15: Nov. 	 Hidden State 15: NNP 	 Predicted 15: NNP
Observation 16: 29 	 Hidden Sta

In [195]:
from collections import defaultdict

incorrect = 0
total = 0

errors = defaultdict(int)

for sentence in treebank_corpus:
    predicted_tags = hmm_pos.predict([word for word, _ in sentence])
    for t, (word, tag) in enumerate(sentence):
        if tag != predicted_tags[t]:
            incorrect += 1
            errors[word] += 1
        total += 1

In [196]:
f"Accuracy of the HMM model is {round((total - incorrect) / total, 2)}"

'Accuracy of the HMM model is 0.98'

In [203]:
print("Top 10 confusing words:")
print(sorted(errors.items(), key=lambda x: -x[1])[:10])

Top 10 confusing words:
[('that', 65), ('as', 47), ('up', 44), ('more', 38), (',', 33), ('chief', 32), ('about', 32), ('out', 27), ('down', 27), ('much', 20)]
