In [358]:
import nltk
from nltk.corpus import treebank

from itertools import chain
from collections import defaultdict
import numpy as np
import pandas as pd
import math
import re

# DATA

The Penn Treebank Corpus is a widely used annotated dataset in natural language processing research. It consists of text samples from various sources, including newspaper articles, fiction, non-fiction, and transcribed speech. Each word in the corpus is manually annotated with its corresponding part-of-speech tag, providing valuable linguistic information for tasks such as syntactic analysis, parsing, and information extraction

In [359]:
#Download the treebank corpus
nltk.download('treebank')

[nltk_data] Downloading package treebank to
[nltk_data]     C:\Users\pc\AppData\Roaming\nltk_data...
[nltk_data]   Package treebank is already up-to-date!


True

In [360]:
#Get the tagged sentences
tagged_sentences = list(treebank.tagged_sents())

#We need to mark the beginning of a sentence, so we will use the token <s> tagged as <s>
for sentence in tagged_sentences:
    sentence.insert(0, ('--s--', '--s--'))

In [361]:
#Split the data into training and test data
DATA_size = len(tagged_sentences)

train_data = tagged_sentences[:int(DATA_size * 0.8)]
test_data = tagged_sentences[int(DATA_size * 0.8):]

print("Train data size: ", len(train_data))
print("Test data size: ", len(test_data))

Train data size:  3131
Test data size:  783


In [362]:
#Create the vocabulary and tags set
vocab = set()
tags = set()

#Get the vocabulary
for sentence in train_data:
    for word, tag in sentence:
        vocab.add(word)
        tags.add(tag)

#sort tags
tags = sorted(tags)

print("Vocabulary size: ", len(vocab))
print("Tags size: ", len(tags))

Vocabulary size:  11045
Tags size:  47


In [363]:
#Create a dictionary to map words and tags to unique indices
vocab_dict = {word: i for i, word in enumerate(vocab)}
tags_dict = {tag: i for i, tag in enumerate(tags)}

In [364]:
# Create a training corpus
training_corpus = list(chain(*train_data))
print("Training corpus size: ", len(training_corpus))

Training corpus size:  83768


---
# Generating Transition and Emission Matrices

First of all we need to compute :

* Transition counts : $C(t_i/t_{i-1})$
* Emission counts : $C(w_i/t_i)$

And we will also need a tag counts for smoothing :
* Tag counts : $C(t_i)$

In [365]:
def create_counts(training_corpus):
    """
    input: 
        - training_corpus: a corpus where each word is tagged. Each word is a tuple of (word, tag).
    output:
        - transition_counts: a dictionary where the keys are tuples that represent a part of speech, and the values are the counts of each part of speech.
        - emission_counts: a dictionary where the keys are tuples that represent a (tag, word) pair, and the values are the counts of each pair.
        - tag_counts: a dictionary where the keys are the part of speech tags, and the values are the counts of each tag.
    """
    #Transition_counts dictionary
    transition_counts = defaultdict(int)
    emission_counts = defaultdict(int)
    tag_counts = defaultdict(int)

    prev_tag = '--s--' #start symbol

    for w,tag in training_corpus:
        transition_counts[(prev_tag,tag)] += 1
        emission_counts[(w, tag)] += 1
        tag_counts[tag] += 1
        prev_tag = tag

    return transition_counts, emission_counts, tag_counts

transition_counts, emission_counts, tag_counts = create_counts(training_corpus)

We will generate the transitions matrix A and the emission matrix. Note that we will need to apply smoothing to these.

* Transition Matrix A : $$P(t_i/t_{i-1})=\frac{C(t_{i-1},t_i)+\alpha}{C(t_{i-1})+\alpha*N}$$
* Emission Matrix B : $$P(w_i/t_i)=\frac{C(t_i,w_i)+\alpha}{C(t_{i})+\alpha*N}$$

In [366]:
#create a function that computes the transition matrix A
def transition_matrix(tag_counts, transition_counts,alpha=1e-2):
    """
    input: 
        - tag_counts: a dictionary where the keys are the part of speech tags, and the values are the counts of each tag.
        - transition_counts: a dictionary where the keys are tuples that represent a part of speech, and the values are the counts of each part of speech.
        - alpha: a smoothing parameter.
    output:
        - A: a matrix of dimension (num_tags, num_tags) representing the transition matrix of part of speech tags.
    """
    num_tags = len(tag_counts)
    A = np.zeros((num_tags, num_tags))

    for tag,i in tags_dict.items():
        for prev_tag,j in tags_dict.items():
            A[i, j] = (transition_counts[(tag, prev_tag)]+alpha) / (tag_counts[prev_tag]+alpha*num_tags)

    return A

#Create the transition matrix
A = transition_matrix(tag_counts, transition_counts)

In [367]:
pd.DataFrame(A[0:5,0:5], columns = list(tags_dict.keys())[0:5], index = list(tags_dict.keys())[0:5])

Unnamed: 0,#,$,'',",",--s--
#,0.000691,2.1e-05,1.6e-05,3e-06,3e-06
$,0.000691,2.1e-05,1.6e-05,3e-06,3e-06
'',0.000691,2.1e-05,0.006505,3e-06,0.065148
",",0.000691,0.099509,0.392574,3e-06,0.000323
--s--,0.000691,0.010384,0.001638,3e-06,0.000323


In [368]:
#Create a function that computes the emission matrix B
def emission_matrix(tag_counts, emission_counts, vocab,alpha=1e-2):
    """
    input: 
        - tag_counts: a dictionary where the keys are the part of speech tags, and the values are the counts of each tag.
        - emission_counts: a dictionary where the keys are tuples that represent a (tag, word) pair, and the values are the counts of each pair.
        - vocab: a dictionary where keys are words in the vocabulary and the value is the unique integer that represents the word.
        - alpha: a smoothing parameter.
    output:
        - B: a matrix of dimension (num_tags, len(vocab)) representing the emission matrix of part of speech tags.
    """
    num_tags = len(tag_counts)
    num_words = len(vocab)
    B = np.zeros((num_tags, num_words))

    for tag,i in tags_dict.items():
        for j, word in enumerate(vocab):
            B[i, j] = (emission_counts[(word, tag)] +alpha)/ (tag_counts[tag]+alpha*num_words)

    return B

#Create the emission matrix
B = emission_matrix(tag_counts, emission_counts, vocab)

pd.DataFrame(B[0:5, 0:5], columns = list(vocab)[0:5], index = list(tags_dict.keys())[0:5])

Unnamed: 0,133.8,represented,Winiarski,overnight,2.25
#,8e-05,8e-05,8e-05,8e-05,8e-05
$,1.7e-05,1.7e-05,1.7e-05,1.7e-05,1.7e-05
'',1.4e-05,1.4e-05,1.4e-05,1.4e-05,1.4e-05
",",2e-06,2e-06,2e-06,2e-06,2e-06
--s--,3e-06,3e-06,3e-06,3e-06,3e-06


---
# Viterbi algorithm

We will create a matrix **best_probs**, where each cell $v_t(j)$ represent the probability that the HMM (with the parameters A and B represented by $\lambda$) is in state j (unique ID of a tag) after seeing the first t observations $o_1,o_2,...,o_{t}$ and passing throught the most probable state sequence $q_1,q_2,...,q_{t-1}$, formally :

$$v_t(j)=max_{q_1,q_2,...,q_{t-1}}P(q_1,q_2,...,q_{t-1},o_1,o_2,...,o_{t},q_t=j/\lambda)$$


The value of each cell $v_t(j)$ is computed by recursively taking the most porbable path that could lead us to this cell.

Let's start by the first column $v_1(j)$ :

\begin{align*}
v_1(j) &= P(o_1, q_1=j|\lambda) \\
&=P(o_1|\lambda).P(q_1=j|\lambda) \\
(*) &= P(o_1/\text{<s>})P(\text{tags\_dict[j]/<s>}) \\
&= B[j,\text{vocab\_dict}[o_1]]A[j,0]
\end{align*}

> (*) *Since $o_1$ is the $1^{st}$ observation then it will be preceded necessary by the tag \<s>, which let us say that the tag of index j is also preceded by the tag \<s>*


We already computed A and B, so we can compute the first column of **best_path** matrix.

Now recursively :

\begin{align*}
v_t(j) &= max_{q_1,q_2,...,q_{t-1}}P(q_1,q_2,...,q_{t-1},o_1,o_2,...,o_{t},q_t=j/\lambda) \\
&= max_{q_{t-1}}max_{q_1,q_2,...,q_{t-2}}P(q_1,q_2,...,q_{t-2},o_1,o_2,...,o_{t-1},q_{t-1}/\lambda).P(q_{t-1},o_t,q_t=j) \\
&= max_{i=1}^Nmax_{q_1,q_2,...,q_{t-2}}P(q_1,q_2,...,q_{t-2},o_1,o_2,...,o_{t-1},q_{t-1}=i/\lambda).P(q_{t-1}=i,q_t=j).P(o_t/q_t=j) \\
&= max_{i=1}^Nv_{t-1}(i)A[j,i]B[j,\text{vocab\_dict}[o_t]]
\end{align*}

To sum up :
\begin{align*}
\forall j\in \text{Tags\_index    } v_t(j) &= max_{i=1}^Nv_{t-1}(i)A[j,i]B[j,\text{vocab\_dict}[o_t]] \\
v_1(j) &= B[j,\text{vocab\_dict}[o_1]]A[j,0]
\end{align*}



Its composed of three main stages :
* Initialisation
* Feed forward
* Feed backward

## 1-Initialisation

$$v_1(j) = B[j,\text{vocab\_dict}[o_1]]A[j,0]$$

In [369]:
text = ['I','feel','happy','today']

In [370]:
def initialize(tags_dict,A,B,vocab_dict,text):
    num_tags = len(tags_dict)

    #initialize best_probs
    best_probs = np.zeros((num_tags,len(text)))

    for j in range(num_tags):
        best_probs[j,0] = math.log(B[j,vocab_dict[text[0]]])+math.log(A[j,0])

    return best_probs

best_probs = initialize(tags_dict,A,B,vocab_dict,text)
best_probs[:,0]

array([-16.70632194, -18.26668439, -18.47058757, -20.19294357,
       -19.96619395, -17.20274136, -20.46959415, -17.23166601,
       -19.96031513, -18.27341333, -19.45618039, -19.75504454,
       -20.68696062, -17.13704459, -16.62255596, -14.31903623,
       -20.3706868 , -17.91178018, -17.42152227, -16.69825413,
       -18.60799163, -21.12961563, -15.53074094, -17.55384936,
       -20.40568263, -16.76104446, -18.48697131, -10.00372844,
       -18.50840692, -19.6880759 , -17.32245183, -16.82726947,
       -17.56071162, -16.59599397, -12.99340479, -16.61378012,
       -19.56326307, -15.05486613, -19.05978039, -19.37375659,
       -19.0263909 , -19.43803531, -17.99488254, -17.66762675,
       -16.67365264, -17.44866097, -18.49102564])

## 2- Viterbi forward

$$v_t(j) = max_{i=1}^Nv_{t-1}(i)A[j,i]B[j,\text{vocab\_dict}[o_t]]$$

In [376]:
#Function to handle unknown words in the vocabulary set
def handle_unk_words(word):
    if re.match(r'^[0-9\W]+$', word):
        return "CD"
    if word[-2:]=="er":
        return "JJR"
    if word[-3:]=="est":
        return "JJS"
    if word[-2:]=="ly":
        return "RB"
    if word[-3:]=="ing":
        return "VBG"
    if word[-2:]=="ed":
        return "VBD"
    if word[-1:]=="s":
        return "NNS"
    if word[-2:] in ["'s","es"]:
        return "VBZ"
    else :
        return "NN"

#Feed forward
def feed_forward(tags_dict,A,B,vocab_dict,text,best_probs,h_unk=False):

    best_paths = np.zeros((len(tags_dict),len(text)),dtype=int)

    num_tags = len(tags_dict)
    #iterate over the words in the text
    for t in range(1,len(text)):

        #iterate over the tags for the word
        for j in range(num_tags):
            best_prob = float('-inf')
            best_path = None

            #iterate over the tags for the previous word
            for i in range(num_tags):
                #handle unknown words
                try :
                    prob = best_probs[i,t-1]+math.log(A[j,i])+math.log(B[j,vocab_dict[text[t]]])
                except KeyError:
                    if h_unk:
                        if tags_dict[handle_unk_words(text[t])]==j:
                            bj = 1
                        else:
                            bj = 1e-8
                        prob = best_probs[i,t-1]+math.log(A[j,i])+math.log(bj)
                    else:
                        prob = best_probs[i,t-1]+math.log(A[j,i])+math.log(1e-10)
                if prob>best_prob:
                    best_prob = prob
                    best_path = i
            
            best_probs[j,t] = best_prob
            best_paths[j,t] = best_path

    return best_probs, best_paths

best_probs, best_paths = feed_forward(tags_dict,A,B,vocab_dict,text,best_probs,h_unk=True)
pd.DataFrame(best_probs[:5], columns = text[:5], index = list(tags_dict.keys())[:5])

Unnamed: 0,I,feel,happy,today
#,-16.706322,-31.015501,-40.167702,-50.831571
$,-18.266684,-32.400492,-41.46958,-48.857898
'',-18.470588,-24.699711,-36.224858,-47.187489
",",-20.192944,-25.147485,-35.522499,-47.635264
--s--,-19.966194,-24.617661,-37.282667,-47.10544


## 3- Viterbi backward

In [378]:
def feed_backward(best_probs,best_paths,tags_dict):
    m = best_probs.shape[1]
    z = [0]*m
    tags = list(tags_dict.keys())

    z[-1]=np.argmax(best_probs[:,-1])
    for t in range(m-1,0,-1):
        z[t-1] = best_paths[z[t],t]

    T = [tags[i] for i in z]

    return T

T = feed_backward(best_probs,best_paths,tags_dict)
T
        

['PRP', 'VB', 'MD', 'NN']

## All together - **Viterbi algo**

In [379]:
def viterbi_pred(tags_dict,A,B,vocab_dict,text,h_unk=False):
    best_probs = initialize(tags_dict,A,B,vocab_dict,text)
    best_probs, best_paths = feed_forward(tags_dict,A,B,vocab_dict,text,best_probs,h_unk)

    T = feed_backward(best_probs,best_paths,tags_dict)

    return T

In [380]:
#Exemple 
sentence = "we are going to watch a football match , do you want to join us ?"
print("Input sentence: ", sentence,'\nTags: ')
vit_pred = viterbi_pred(tags_dict,A,B,vocab_dict,sentence.split())
for word, tag in zip(sentence.split(), vit_pred):
    print(f"{word.ljust(10)} : {tag}")


Input sentence:  we are going to watch a football match , do you want to join us ? 
Tags: 
we         : PRP
are        : VBP
going      : PRP
to         : TO
watch      : NN
a          : DT
football   : NN
match      : NN
,          : ,
do         : VBP
you        : PRP
want       : VB
to         : TO
join       : VB
us         : PRP
?          : ``


---
# Testing

In [381]:
#Split the test data to feeature and target sets
X_test = [[x[0] for x in sentence] for sentence in test_data]
y_test = [[x[1] for x in sentence] for sentence in test_data]

In [382]:
#Computing the accuracy
def accuracy(X_test,y_test,tags_dict,A,B,vocab_dict,h_unk=False):
    accuracy = 0
    S = 0
    for i in range(len(X_test)):
        pred = viterbi_pred(tags_dict,A,B,vocab_dict,X_test[i],h_unk)

        for j in range(len(pred)):
            if pred[j] == y_test[i][j]:
                accuracy+=1
            S+=1

    return accuracy/S

print("The accuracy without handling unkown words :",accuracy(X_test,y_test,tags_dict,A,B,vocab_dict))
print("The accuracy with handling unkown words    :",accuracy(X_test,y_test,tags_dict,A,B,vocab_dict,True))

The accuracy without handling unkown words : 0.7540101815387571
The accuracy with handling unkown words    : 0.7751416770723274
