## POS tagging using modified Viterbi

### Data Preparation

In [1]:
#Importing libraries
import nltk

In [2]:
# nltk.download('treebank')
# nltk.download('universal_tagset')

In [3]:
# reading the Treebank tagged sentences
nltk_data = list(nltk.corpus.treebank.tagged_sents(tagset='universal'))

In [4]:
from sklearn.model_selection import train_test_split
train_set, test_set = train_test_split(nltk_data, test_size=0.05, random_state=1)

In [5]:
train_set[0]

[('He', 'PRON'),
 ('has', 'VERB'),
 ('promised', 'VERB'),
 ('stiffer', 'ADJ'),
 ('fines', 'NOUN'),
 (',', '.'),
 ('though', 'ADP'),
 ('the', 'DET'),
 ('size', 'NOUN'),
 ('of', 'ADP'),
 ('penalties', 'NOUN'),
 ('sought', 'VERB'),
 ('*', 'X'),
 ('by', 'ADP'),
 ('OSHA', 'NOUN'),
 ('have', 'VERB'),
 ('been', 'VERB'),
 ('rising', 'VERB'),
 ('in', 'ADP'),
 ('recent', 'ADJ'),
 ('years', 'NOUN'),
 ('even', 'ADV'),
 ('before', 'ADP'),
 ('he', 'PRON'),
 ('took', 'VERB'),
 ('office', 'NOUN'),
 ('this', 'DET'),
 ('year', 'NOUN'),
 ('.', '.')]

In [6]:
# list of tagged words
train_tagged_words = [tup for sent in train_set for tup in sent]
len(train_tagged_words)

95668

In [7]:
# list of tags
len(set([tag for word, tag in train_tagged_words]))

12

In [8]:
sorted(list(set([tag for word, tag in train_tagged_words])))

['.',
 'ADJ',
 'ADP',
 'ADV',
 'CONJ',
 'DET',
 'NOUN',
 'NUM',
 'PRON',
 'PRT',
 'VERB',
 'X']

### Build the vanilla Viterbi based POS tagger

In [9]:
import numpy as np
import pandas as pd

In [10]:
# compute word given tag: Emission Probability
def word_given_tag(iword, itag, train_bag=None):
    """Return the raw counts behind the emission probability P(iword | itag).

    Args:
        iword: the word to look up.
        itag: the tag the word is conditioned on.
        train_bag: iterable of (word, tag) pairs. When omitted, the
            module-level ``train_tagged_words`` is looked up at call time, so
            re-running the cell that builds it is picked up without having to
            redefine this function.

    Returns:
        A tuple ``(count_w_given_tag, count_tag)``: how many times ``iword``
        occurs with ``itag``, and how many times ``itag`` occurs overall.
        The caller performs the division.
    """
    if train_bag is None:
        train_bag = train_tagged_words

    count_tag = 0
    count_w_given_tag = 0
    # Single pass over the corpus; no intermediate lists are materialised.
    for word, tag in train_bag:
        if tag == itag:
            count_tag += 1
            if word == iword:
                count_w_given_tag += 1

    return (count_w_given_tag, count_tag)

In [11]:
word_given_tag('Carolina', 'NOUN')

(12, 27474)

In [12]:
# compute tag given tag: tag2(t2) given tag1 (t1), i.e. Transition Probability
def t2_given_t1(t2, t1, train_bag=None):
    """Return the raw counts behind the transition probability P(t2 | t1).

    Args:
        t2: the tag that follows.
        t1: the tag that precedes.
        train_bag: iterable of (word, tag) pairs. When omitted, the
            module-level ``train_tagged_words`` is looked up at call time
            rather than being frozen when this function was defined.

    Returns:
        A tuple ``(count_t2_t1, count_t1)``: the number of positions where
        ``t1`` is immediately followed by ``t2`` in the flattened tag
        sequence, and the total number of occurrences of ``t1``.
        The caller performs the division.
    """
    if train_bag is None:
        train_bag = train_tagged_words

    tags = [tag for _, tag in train_bag]
    count_t1 = tags.count(t1)
    # Pair every tag with its successor; the last tag has no successor.
    count_t2_t1 = sum(
        1 for prev_tag, next_tag in zip(tags, tags[1:])
        if prev_tag == t1 and next_tag == t2
    )
    return (count_t2_t1, count_t1)

In [13]:
t2_given_t1('NOUN', 'ADP')

(3032, 9339)

In [14]:
# Viterbi Heuristic
class Viterbi:
    """POS tagger built from tag-transition and word-emission counts.

    Despite the name, decoding is greedy: each word receives the tag that
    maximises ``emission * transition`` given the tag already chosen for the
    previous word; no back-tracking over the whole sequence is performed.

    All statistics are computed from the ``train_tagged_words`` passed to the
    constructor, in a single pass, so the instance does not depend on any
    module-level corpus or helper function.
    """

    def __init__(self, train_tagged_words):
        """Collect the counts needed for tagging.

        Args:
            train_tagged_words: re-iterable collection of (word, tag) pairs
                (the flattened training sentences).
        """
        # A set gives O(1) "was this word seen in training?" checks.
        self.__train_words = {word for word, _ in train_tagged_words}
        tag_sequence = [tag for _, tag in train_tagged_words]
        self.__train_tags = sorted(set(tag_sequence))
        self.__tag_index = {tag: i for i, tag in enumerate(self.__train_tags)}

        # One pass over the corpus for tag and (word, tag) frequencies.
        self.__tag_counts = {}
        self.__word_tag_counts = {}
        for word, tag in train_tagged_words:
            self.__tag_counts[tag] = self.__tag_counts.get(tag, 0) + 1
            pair = (word, tag)
            self.__word_tag_counts[pair] = self.__word_tag_counts.get(pair, 0) + 1

        # Frequencies of adjacent tag pairs in the flattened tag sequence
        # (sentence boundaries are not treated specially).
        bigram_counts = {}
        for tag_pair in zip(tag_sequence, tag_sequence[1:]):
            bigram_counts[tag_pair] = bigram_counts.get(tag_pair, 0) + 1

        # transition_p[i, j] = P(tag_j | tag_i), stored as float32.
        n_tags = len(self.__train_tags)
        self.__transition_p = np.zeros((n_tags, n_tags), dtype='float32')
        for i, t1 in enumerate(self.__train_tags):
            for j, t2 in enumerate(self.__train_tags):
                self.__transition_p[i, j] = (
                    bigram_counts.get((t1, t2), 0) / self.__tag_counts[t1]
                )

    def tag(self, words, rule_based_tagger=None, laplace_smoothing=False):
        """Tag a sequence of words.

        Args:
            words: re-iterable sequence of word strings.
            rule_based_tagger: optional object exposing ``tag([word])`` that
                returns ``[(word, tag)]``. When given, its tag replaces the
                model's choice for every word not seen in training.
            laplace_smoothing: when True, a word not seen in training is
                scored with the transition probability alone (its emission
                probability is ignored). Note this is not additive (Laplace)
                smoothing in the textbook sense; the flag name is kept for
                compatibility.

        Returns:
            A list of ``(word, tag)`` tuples, one per input word.

        Raises:
            KeyError: if the previous tag is not one of the training tags.
                This covers the '.' tag, which is used as the predecessor of
                the first word, and any unknown tag returned by
                ``rule_based_tagger``.
        """
        state = []

        for key, word in enumerate(words):
            is_known = word in self.__train_words

            # The first word is treated as if it followed a sentence-final '.'.
            prev_tag = '.' if key == 0 else state[-1]
            prev_idx = self.__tag_index[prev_tag]

            # probability column for this observation, one entry per tag
            p = []
            for j, tag in enumerate(self.__train_tags):
                transition_p = self.__transition_p[prev_idx, j]

                if laplace_smoothing and not is_known:
                    # unknown word handling - Approach 2: transition only
                    state_probability = transition_p
                else:
                    # float64 keeps the product in double precision.
                    emission_p = np.float64(
                        self.__word_tag_counts.get((word, tag), 0)
                        / self.__tag_counts[tag]
                    )
                    state_probability = emission_p * transition_p

                p.append(state_probability)

            # First tag (in sorted order) reaching the maximum wins ties; for
            # an unknown word without any fallback every score is 0, so the
            # first tag is chosen.
            pmax = max(p)
            state_max = self.__train_tags[p.index(pmax)]

            # unknown word handling - Approach 1: rule based tagger
            if not is_known and rule_based_tagger:
                state_max = rule_based_tagger.tag([word])[0][1]

            state.append(state_max)

        return list(zip(words, state))

In [15]:
viterbi = Viterbi(train_tagged_words)

In [16]:
viterbi.tag(['Carolina'])

[('Carolina', 'NOUN')]

In [17]:
test_tagged_words = [tup for sent in test_set for tup in sent]
len(test_tagged_words)

5008

In [18]:
test_tagged_words[0:2]

[('While', 'ADP'), ('the', 'DET')]

In [19]:
test_words = [word for word, tag in test_tagged_words]
test_words[0:2]

['While', 'the']

In [None]:
import time
# tagging the test sentences
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# elapsed time; time.time() can jump if the system clock is adjusted.
start = time.perf_counter()
tagged_seq = viterbi.tag(test_words)
end = time.perf_counter()
difference = end - start

In [None]:
msg = f'Tagging Complete. Time taken to tag test seq: {difference}secs.'
print(msg)

In [None]:
tagged_seq[0:10]

In [None]:
def accuracy(tagged_seq, test_tagged_words):
    """Return the percentage of tokens whose predicted tag matches the gold tag.

    Args:
        tagged_seq: iterable of (word, predicted_tag) pairs.
        test_tagged_words: iterable of (word, actual_tag) pairs, aligned
            position by position with ``tagged_seq``.

    Returns:
        Accuracy as a percentage rounded to two decimals. The denominator is
        the number of gold tags; ``0.0`` is returned when there are none
        (instead of raising ``ZeroDivisionError``).
    """
    predicted_tags = [tag for _, tag in tagged_seq]
    actual_tags = [tag for _, tag in test_tagged_words]
    if not actual_tags:
        return 0.0

    n_correct = sum(
        1 for predicted_tag, actual_tag in zip(predicted_tags, actual_tags)
        if predicted_tag == actual_tag
    )
    # Local is deliberately not called `accuracy` to avoid shadowing the function.
    fraction_correct = n_correct / len(actual_tags)
    return round(100 * fraction_correct, 2)

In [None]:
print(f'Accuracy of vanilla viterbi POS tagger: {accuracy(tagged_seq, test_tagged_words)}')

### Solve the problem of unknown words

In [None]:
def get_unknown_tagged_incorrectly(tagged_seq, test_tagged_words, train_tagged_words):
    """Collect test words that were unseen in training and tagged incorrectly.

    Args:
        tagged_seq: iterable of (word, predicted_tag) pairs.
        test_tagged_words: iterable of (word, actual_tag) pairs aligned with
            ``tagged_seq``.
        train_tagged_words: iterable of (word, tag) pairs used for training;
            only the words are used, to decide what counts as "unknown".

    Returns:
        A DataFrame with columns ``word``, ``tag`` (predicted) and
        ``actual_tag``, one row per mis-tagged unknown word occurrence.
        Also prints a one-line heading.
    """
    # Set membership is O(1); a list lookup would rescan the whole training
    # vocabulary for every test token.
    train_vocab = {word for (word, _) in train_tagged_words}
    print('Tags not in train set')
    mismatches = [
        (word, tag, actual_tag)
        for (word, tag), (_, actual_tag) in zip(tagged_seq, test_tagged_words)
        if word not in train_vocab and tag != actual_tag
    ]
    return pd.DataFrame(mismatches, columns=['word', 'tag', 'actual_tag'])

In [None]:
# lets figure out the unknown words
unknown_tagged_words = get_unknown_tagged_incorrectly(tagged_seq, test_tagged_words, train_tagged_words)
unknown_tagged_words.head(10)

In [None]:
len(unknown_tagged_words)

In [None]:
# see if the unknown words have same tags
unknown_tagged_words['tag'].unique()

#### Extend Viterbi with Rule-based tagger
Here, the vanilla Viterbi tagger is extended with a rule-based tagger: if a particular word is not found in the training words, we use the tag given by the rule-based tagger.

In [None]:
# lets identify the mismatches
unknown_tagged_words = get_unknown_tagged_incorrectly(tagged_seq, test_tagged_words, train_tagged_words)
unknown_tagged_words.head(10)

In [None]:
# build a rule based tagger
patterns = [
    (r'\*+', 'X'),                   # if there are * in the words, mark it X
    (r'.*', 'NOUN')                  # nouns
]
rule_based_tagger = nltk.RegexpTagger(patterns)

In [None]:
rule_based_tagged_seq = viterbi.tag(test_words, rule_based_tagger)
print(f'Accuracy of rule based viterbi POS tagger: {accuracy(rule_based_tagged_seq, test_tagged_words)}')

In [None]:
# build a rule based tagger - add NUM for number matches
patterns = [
    (r'^-?[0-9]+(.[0-9]+)?$', 'NUM'), # cardinal numbers
    (r'\*+', 'X'),                   # if there are * in the words, mark it X
    (r'.*', 'NOUN')                  # nouns
]
# Rebuild the tagger: redefining `patterns` alone does not change the
# RegexpTagger that was constructed from the earlier list.
rule_based_tagger = nltk.RegexpTagger(patterns)
rule_based_tagged_seq = viterbi.tag(test_words, rule_based_tagger)
print(f'Accuracy of rule based viterbi POS tagger: {accuracy(rule_based_tagged_seq, test_tagged_words)}')

In [None]:
# lets identify any other mismatches
unknown_tagged_words = get_unknown_tagged_incorrectly(rule_based_tagged_seq, test_tagged_words, train_tagged_words)
unknown_tagged_words.head(10)

In [None]:
# build a rule based tagger - ADJ for gerund
patterns = [
    (r'.*ing$', 'ADJ'),              # gerund
    (r'^-?[0-9]+(.[0-9]+)?$', 'NUM'), # cardinal numbers
    (r'\*+', 'X'),                   # if there are * in the words, mark it X
    (r'.*', 'NOUN')                  # nouns
]
# Rebuild the tagger: redefining `patterns` alone does not change the
# RegexpTagger that was constructed from the earlier list.
rule_based_tagger = nltk.RegexpTagger(patterns)
rule_based_tagged_seq = viterbi.tag(test_words, rule_based_tagger)
print(f'Accuracy of rule based viterbi POS tagger: {accuracy(rule_based_tagged_seq, test_tagged_words)}')

In [None]:
# lets identify any other mismatches
unknown_tagged_words = get_unknown_tagged_incorrectly(rule_based_tagged_seq, test_tagged_words, train_tagged_words)
unknown_tagged_words.head(10)

In [None]:
# build a rule based tagger - ADJ for 'ing'/'ous' endings, VERB for 'ed' endings
patterns = [
    (r'(.*ing|.*ous)$', 'ADJ'),
    (r'.*ed$', 'VERB'),               # past tense
    (r'^-?[0-9]+(.[0-9]+)?$', 'NUM'), # cardinal numbers
    (r'\*+', 'X'),                   # if there are * in the words, mark it X
    (r'.*', 'NOUN')                  # nouns
]
# Rebuild the tagger: redefining `patterns` alone does not change the
# RegexpTagger that was constructed from the earlier list.
rule_based_tagger = nltk.RegexpTagger(patterns)
rule_based_tagged_seq = viterbi.tag(test_words, rule_based_tagger)
print(f'Accuracy of rule based viterbi POS tagger: {accuracy(rule_based_tagged_seq, test_tagged_words)}')

In [None]:
# lets identify any other mismatches
unknown_tagged_words = get_unknown_tagged_incorrectly(rule_based_tagged_seq, test_tagged_words, train_tagged_words)
unknown_tagged_words.head(10)

In [None]:
# now apply unigram tagger
# lexicon backed up by the rule-based tagger
# A unigram (per-word lexicon) tagger matches the comment above and the way it
# is invoked: Viterbi.tag calls the fallback with one word at a time, so a
# bigram tagger would never see a previous tag to condition on.
lexicon_tagger = nltk.UnigramTagger(train_set, backoff=rule_based_tagger)
rule_based_tagged_seq = viterbi.tag(test_words, lexicon_tagger)
print(f'Accuracy of rule based viterbi POS tagger: {accuracy(rule_based_tagged_seq, test_tagged_words)}')

#### Extend Viterbi with Probabilistic method - Laplace smoothing
In this approach (referred to here as Laplace smoothing), for unknown words we consider only the transition probabilities to determine the tag in the Viterbi algorithm.

In [None]:
prob_tagged_seq = viterbi.tag(test_words, laplace_smoothing=True)
print(f'Accuracy of probalistic based viterbi POS tagger: {accuracy(prob_tagged_seq, test_tagged_words)}')

#### Evaluation

In [None]:
# Summary table: one row per tagger variant, built in a single constructor
# call instead of being grown row by row.
# The third row holds the score of the Laplace-smoothing extension, so it is
# labelled "Viterbi ..." rather than "Vanilla ...".
results = pd.DataFrame(
    {
        'Accuracy': [
            accuracy(tagged_seq, test_tagged_words),
            accuracy(rule_based_tagged_seq, test_tagged_words),
            accuracy(prob_tagged_seq, test_tagged_words),
        ]
    },
    index=[
        'Vanilla Viterbi',
        'Viterbi with rule based tagger',
        'Viterbi with laplace smoothing',
    ],
)
results

Cases that were tagged incorrectly by the original (vanilla) POS tagger and were corrected by the extension:

In [None]:
# Words that the vanilla tagger (tagged_seq) got wrong and the extended tagger
# (prob_tagged_seq) got right. Each entry is
# (word, vanilla tag, extended tag, actual tag).
corrected_tags = \
[(word, viterbi_tag, viterbi_extend_tag, actual_tag)
 for (word, actual_tag), (_, viterbi_tag), (_, viterbi_extend_tag) in 
 zip(test_tagged_words, tagged_seq, prob_tagged_seq)
 if (viterbi_tag != actual_tag) and (viterbi_extend_tag == actual_tag)]

In [None]:
pd.DataFrame(corrected_tags,
             columns=["word", "vanilla viterbi tag", "viterbi-lapace tag", "acutal tag"]
            ).head(10)

### Generating tags on test sentences

In [None]:
file_path = 'Test_sentences.txt'

In [None]:
# Read one sentence per line. Blank lines are skipped, including the empty
# string that split('\n') produces after a trailing newline, so no empty
# "sentence" is passed to the tagger.
with open(file_path) as f:
    test_sentences = [line for line in f.read().split('\n') if line.strip()]

In [None]:
# Tag every sentence (split on whitespace) with the vanilla tagger.
tagged_test_seq_vanilla = [viterbi.tag(sentence.split()) for sentence in test_sentences]

In [None]:
# Tag every sentence (split on whitespace) with the transition-only fallback
# for unknown words.
tagged_test_seq = [
    viterbi.tag(sentence.split(), laplace_smoothing=True)
    for sentence in test_sentences
]
# Alternative fallback: viterbi.tag(sentence.split(), lexicon_tagger)

In [None]:
t1_word_tags = [(word, tag) for tups in tagged_test_seq_vanilla for word, tag in tups]
t2_word_tags = [(word, tag) for tups in tagged_test_seq for word, tag in tups]

In [None]:
diff_tags = [(word, tag1, tag2) for (word, tag1), (_, tag2) in zip(t1_word_tags, t2_word_tags) if tag1 != tag2]
pd.DataFrame(diff_tags,
             columns=['word', 'vanilla viterbi tag', 'extended viterbi tag']).head(10)

In [None]:
def print_tag_sentence(sent_tup):
    """Print a tagged sentence on one line as space-separated ``word/tag`` tokens.

    Args:
        sent_tup: iterable of (word, tag) sequences; only the first two
            items of each entry are used.

    Every token, including the last, is followed by a single space, and the
    line is terminated with a newline.
    """
    rendered = ''.join(f'{pair[0]}/{pair[1]} ' for pair in sent_tup)
    print(rendered)

In [None]:
print_tag_sentence(tagged_test_seq_vanilla[0])
print_tag_sentence(tagged_test_seq_vanilla[1])

In [None]:
print_tag_sentence(tagged_test_seq[0])
print_tag_sentence(tagged_test_seq[1])

As seen in the results, the following three cases were corrected.

- Case 1:  
Vanilla Viterbi &nbsp;&nbsp;&nbsp; : <kbd>**Android/.** is/VERB a/DET mobile/ADJ operating/NOUN system/NOUN developed/VERB by/ADP Google./.</kbd>   
Extended Viterbi: <kbd>**Android/NOUN** is/VERB a/DET mobile/ADJ operating/NOUN system/NOUN developed/VERB by/ADP Google./NOUN</kbd>

- Case 2:  
Vanilla Viterbi &nbsp;&nbsp;&nbsp; : <kbd>Android/. has/VERB been/VERB the/DET best-selling/ADJ **OS/.** worldwide/. on/ADP smartphones/. since/ADP 2011/. and/CONJ on/ADP tablets/NOUN since/ADP 2013./.</kbd>   
Extended Viterbi: <kbd>Android/NOUN has/VERB been/VERB the/DET best-selling/ADJ **OS/NOUN** worldwide/NOUN on/ADP smartphones/NOUN since/ADP 2011/NOUN and/CONJ on/ADP tablets/NOUN since/ADP 2013./NOUN</kbd>  

- Case 3:  
Vanilla Viterbi &nbsp;&nbsp;&nbsp; : <kbd>Android/. has/VERB been/VERB the/DET best-selling/ADJ OS/. worldwide/. on/ADP **smartphones/.** since/ADP 2011/. and/CONJ on/ADP tablets/NOUN since/ADP 2013./.</kbd>   
Extended Viterbi: <kbd>Android/NOUN has/VERB been/VERB the/DET best-selling/ADJ OS/NOUN worldwide/NOUN on/ADP **smartphones/NOUN** since/ADP 2011/NOUN and/CONJ on/ADP tablets/NOUN since/ADP 2013./NOUN</kbd>  