### Data Preparation

Load and explore data

In [45]:
from collections import Counter
import nltk
from nltk.corpus import brown

nltk.download('brown')
nltk.download('universal_tagset')

[nltk_data] Downloading package brown to C:\Users\MSI
[nltk_data]     GF66\AppData\Roaming\nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package universal_tagset to C:\Users\MSI
[nltk_data]     GF66\AppData\Roaming\nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


True

In [24]:
tagged_sentences = brown.tagged_sents()

print(f"Total sentences: {len(tagged_sentences)}")

Total sentences: 57340


In [25]:
words = [word.lower() for sent in tagged_sentences for word, tag in sent]
vocab = set(words)
print(f"Vocab size: {len(vocab)}")

Vocab size: 49815


In [28]:
tags = [tag for sent in tagged_sentences for word, tag in sent]
tag_counts = Counter(tags)

print("Tag distribution")
for tag, count in tag_counts.most_common(15):
    print(f"{tag}: {count}")

Tag distribution
NN: 152470
IN: 120557
AT: 97959
JJ: 64028
.: 60638
,: 58156
NNS: 55110
CC: 37718
RB: 36464
NP: 34476
VB: 33693
VBN: 29186
VBD: 26167
CS: 22143
PPS: 18253


In [29]:
for sent in tagged_sentences[:5]:
    print(sent)

[('The', 'AT'), ('Fulton', 'NP-TL'), ('County', 'NN-TL'), ('Grand', 'JJ-TL'), ('Jury', 'NN-TL'), ('said', 'VBD'), ('Friday', 'NR'), ('an', 'AT'), ('investigation', 'NN'), ('of', 'IN'), ("Atlanta's", 'NP$'), ('recent', 'JJ'), ('primary', 'NN'), ('election', 'NN'), ('produced', 'VBD'), ('``', '``'), ('no', 'AT'), ('evidence', 'NN'), ("''", "''"), ('that', 'CS'), ('any', 'DTI'), ('irregularities', 'NNS'), ('took', 'VBD'), ('place', 'NN'), ('.', '.')]
[('The', 'AT'), ('jury', 'NN'), ('further', 'RBR'), ('said', 'VBD'), ('in', 'IN'), ('term-end', 'NN'), ('presentments', 'NNS'), ('that', 'CS'), ('the', 'AT'), ('City', 'NN-TL'), ('Executive', 'JJ-TL'), ('Committee', 'NN-TL'), (',', ','), ('which', 'WDT'), ('had', 'HVD'), ('over-all', 'JJ'), ('charge', 'NN'), ('of', 'IN'), ('the', 'AT'), ('election', 'NN'), (',', ','), ('``', '``'), ('deserves', 'VBZ'), ('the', 'AT'), ('praise', 'NN'), ('and', 'CC'), ('thanks', 'NNS'), ('of', 'IN'), ('the', 'AT'), ('City', 'NN-TL'), ('of', 'IN-TL'), ('Atlanta'

Data splitting

In [30]:
from sklearn.model_selection import train_test_split

train_data, test_dev_data = train_test_split(tagged_sentences, test_size=0.2, random_state=42)

test_data, dev_data = train_test_split(test_dev_data, test_size=0.5, random_state=42)

print(f"Training data size: {len(train_data)}")
print(f"Test data size: {len(test_data)}")
print(f"Dev data size: {len(dev_data)}")

Training data size: 45872
Test data size: 5734
Dev data size: 5734


Data preprocessing

In [31]:
def preprocess_data(data):
    processed_data = [[(word.lower(), tag) for (word, tag) in sent] for sent in data]

    vocab = sorted({word for sent in processed_data for (word, tag) in sent})
    tags = sorted({tag for sent in processed_data for (word, tag) in sent})

    word_idx = {word: i for i, word in enumerate(vocab)}
    tag_idx = {tag: i for i, tag in enumerate(tags)}
    idx_word = {i: word for word, i in word_idx.items()}
    idx_tag = {i: tag for tag, i in tag_idx.items()}

    return processed_data, vocab, tags, word_idx, tag_idx, idx_word, idx_tag

In [37]:
train_data, vocab, tags, word_idx, tag_idx, idx_word, idx_tag = preprocess_data(train_data)

print("Number of sentences:", len(train_data))
print("Vocabulary size:", len(vocab))
print("Number of tags:", len(tags))
print("Sample vocab:", vocab[10])
print("Sample tags:", tags[250:260])

Number of sentences: 45872
Vocabulary size: 45157
Number of tags: 456
Sample vocab: ['!', '$.027', '$.03', '$.054/mbf', '$.07', '$.07/cwt', '$.076', '$.09', '$.10-a-minute', '$.105']
Sample tags: ['NN$-HL', 'NN$-TL', 'NN+BEZ', 'NN+HVD-TL', 'NN+HVZ', 'NN+IN', 'NN+MD', 'NN+NN-NC', 'NN-HL', 'NN-NC']


### HMM Implementation

Initial probabilities

In [76]:
def initial_probabailities(training_data, all_tags):
    counts = {tag: 1 for tag in all_tags}
    total = len(training_data) + len(all_tags)

    for sentence in training_data:
        first_tag = sentence[0][1]
        counts[first_tag] += 1

    initial_prob = {tag: counts[tag]/total for tag in all_tags}

    return initial_prob

In [77]:
initial_probs = initial_probabailities(train_data, tags)

print("Sample initial probabilities:")
for tag in ["NN", "VB", "DT"]:
    print(tag, initial_probs.get(tag, None))

Sample initial probabilities:
NN 0.024218615092384734
VB 0.01489380072526334
DT 0.02767225004317044


Transition probabilities

In [83]:
def transition_probabilities(training_data, all_tags):
    start_tag, end_tag = "<START>", "<END>"
    tags_with_start_end = all_tags + [start_tag, end_tag]

    tag_counts = {tag: 0 for tag in tags_with_start_end}
    transition_counts = {t1: {t2: 1 for t2 in tags_with_start_end} for t1 in tags_with_start_end}

    for sentence in training_data:
        tags = [start_tag] + [tag for _, tag in sentence] + [end_tag]
        for i in range(1, len(tags)):
            prev_tag = tags[i-1]
            curr_tag = tags[i]
            transition_counts[prev_tag][curr_tag] += 1
            tag_counts[prev_tag] += 1

    transition_probs = {}
    for prev_tag, curr_tag in transition_counts.items():
        transition_probs[prev_tag] = {}
        for next_tag, count in curr_tag.items():
            transition_probs[prev_tag][next_tag] = count / (tag_counts[prev_tag] + len(tags_with_start_end))

    return transition_probs

In [84]:
all_tags = list({tag for sent in train_data for _, tag in sent})

transition_probs = transition_probabilities(train_data, all_tags)

print("Number of tags:", len(all_tags))
print("Sample transition probabilities:")
print("P(VB|NN):", transition_probs["VB"]["NN"])
print("P(END|NN):", transition_probs["<END>"]["NN"])

Number of tags: 456
Sample transition probabilities:
P(VB|NN): 0.0428011326508386
P(END|NN): 0.002183406113537118


Emission probabilities

In [57]:
def emission_probabilities(training_data, all_tags):
    vocab = list({word.lower() for sent in training_data for word, _ in sent})

    emission_counts = {tag: {word: 1 for word in vocab} for tag in all_tags}
    tag_counts = {tag: 0 for tag in all_tags}

    for sentence in training_data:
        for word, tag in sentence:
            word = word.lower()
            emission_counts[tag][word] += 1
            tag_counts[tag] += 1

    emission_probs = {}
    for tag in all_tags:
        total = tag_counts[tag] + len(vocab)
        emission_probs[tag] = {}
        for word, count in emission_counts[tag].items():
            emission_probs[tag][word] = count / total

    emission_probs["<OOV>"] = {tag: 1e-6 for tag in all_tags}

    return emission_probs

In [74]:
if "cases" in vocab and "problem" in vocab:
    print(True)

if "outofvocabword" not in vocab:
    print(True)


True
True


In [67]:
emission_probs = emission_probabilities(train_data, all_tags)
print("Sample emission probabilities:")
print("P(cases|NNS):", emission_probs["NNS"]["cases"])
print("P(problem|NN):", emission_probs["NN"]["problem"])

print("P(outofvocabword|NN):", emission_probs["NN"].get("outofvocabword", emission_probs["<OOV>"]["NN"]))

Sample emission probabilities:
P(cases|NNS): 0.0013303075358009234
P(problem|NN): 0.0015022743595882212
P(outofvocabword|NN): 1e-06


HMM POS Tagger

In [86]:
import math

class HMMPOSTagger:
    def __init__(self, all_tags, initial_probs, transition_probs, emission_probs):
        self.POS = all_tags
        self.initial_prob = initial_probs
        self.transition_prob = transition_probs
        self.emission_prob = emission_probs

    def viterbi_decode(self, sentence):
        V = []
        backpointer = []

        first_probs = {}
        first_back = {}
        for tag in self.POS:
            emission = self.emission_prob[tag].get(sentence[0], self.emission_prob["<OOV>"][tag])
            first_probs[tag] = math.log(self.initial_prob[tag]) + math.log(emission)
            first_back[tag] = None

        V.append(first_probs)
        backpointer.append(first_back)

        for i in range(1, len(sentence)):
            current_probs = {}
            current_back = {}
            word = sentence[i]
            for tag in self.POS:
                max_prob = float("-inf")
                best_prev_tag = None
                for prev_tag in self.POS:
                    transition = self.transition_prob[prev_tag].get(tag, 1e-6)
                    emission = self.emission_prob[tag].get(word, self.emission_prob["<OOV>"][tag])
                    prob = V[i-1][prev_tag] + math.log(transition) + math.log(emission)
                    if prob > max_prob:
                        max_prob = prob
                        best_prev_tag = prev_tag
                current_probs[tag] = max_prob
                current_back[tag] = best_prev_tag
            V.append(current_probs)
            backpointer.append(current_back)

        best_last_tag = max(V[-1], key=V[-1].get)
        best_tags = [best_last_tag]
        for i in range(len(sentence)-1, 0, -1):
            best_tags.append(backpointer[i][best_tags[-1]])
        best_tags.reverse()

        return best_tags

In [88]:
hmm_tagger = HMMPOSTagger(all_tags, initial_probs, transition_probs, emission_probs)

test_sentences = [
    ["The", "tiny", "sparrow", "quickly", "flew", "over", "the", "garden", "fence"],
    ["After", "a", "long", "day", "of", "work", "she", "decided", "to", "read", "a", "book"],
    ["Scientists", "have", "recently", "discovered", "a", "new", "species", "of", "frog", "in", "the", "Amazon"],
    ["The", "team", "celebrated", "their", "hard-earned", "victory", "with", "a", "grand", "dinner"],
    ["In", "spite", "of", "the", "heavy", "rain", "the", "festival", "continued", "until", "late", "at", "night"]
]

for sentence in test_sentences:
    predicted_tags = hmm_tagger.viterbi_decode([w.lower() for w in sentence])
    print("Sentence:", sentence)
    print("Predicted Tags:", predicted_tags)

Sentence: ['The', 'tiny', 'sparrow', 'quickly', 'flew', 'over', 'the', 'garden', 'fence']
Predicted Tags: ['AT', 'JJ', 'NN', 'RB', 'VBD', 'IN', 'AT', 'NN', 'NN']
Sentence: ['After', 'a', 'long', 'day', 'of', 'work', 'she', 'decided', 'to', 'read', 'a', 'book']
Predicted Tags: ['IN', 'AT', 'JJ', 'NN', 'IN', 'NN', 'PPS', 'VBD', 'TO', 'VB', 'AT', 'NN']
Sentence: ['Scientists', 'have', 'recently', 'discovered', 'a', 'new', 'species', 'of', 'frog', 'in', 'the', 'Amazon']
Predicted Tags: ['PPSS', 'HV', 'RB', 'VBD', 'AT', 'JJ', 'NN', 'IN', 'NN', 'IN', 'AT', 'NN']
Sentence: ['The', 'team', 'celebrated', 'their', 'hard-earned', 'victory', 'with', 'a', 'grand', 'dinner']
Predicted Tags: ['AT', 'NN', 'IN', 'PP$', 'JJ', 'NN', 'IN', 'AT', 'JJ', 'NN']
Sentence: ['In', 'spite', 'of', 'the', 'heavy', 'rain', 'the', 'festival', 'continued', 'until', 'late', 'at', 'night']
Predicted Tags: ['IN', 'NN', 'IN', 'AT', 'JJ', 'NN', 'AT', 'NN', 'VBD', 'IN', 'JJ', 'IN', 'NN']


Evaluation

In [93]:
def evaluate_tagger(tagger, data):
    total_words = 0
    correct_tags = 0

    for sentence in data:
        words = [w.lower() for w, t in sentence]
        original_tags = [t for w, t in sentence]

        predicted_tags = tagger.viterbi_decode(words)

        total_words += len(words)
        correct_tags += sum([p == g for p, g in zip(predicted_tags, original_tags)])

    accuracy = correct_tags / total_words
    return accuracy

dev_accuracy = evaluate_tagger(hmm_tagger, dev_data[:1000])
print(f"Dev set accuracy: {dev_accuracy:.4f}")

Dev set accuracy: 0.8900
