# Part of Speech Tagging

Part of Speech Tagging (POS) is the process of assigning a part of speech to a word. i.e.) verb, noun, preposition, et cetra.

## Markov Property (Markov Assumption)

> The conditional probability distribution of future states of the process depends **only upon the present state**; that is, given the present, the future does not depend on the past. 

| | | | | | |
| -- | -- | -- | -- | -- | -- |
| I | killed | them | to | get | kills |
| noun | verb | noun | preposition | verb | ? |

You may find the tag of 'kill', based upon the present tag of 'get', _verb_.

## Building Hidden Markov Model

### Getting Transition Counts & Emission Counts

In [26]:
import enum
from collections import defaultdict

import numpy
import pandas


@enum.unique
class Tag(str, enum.Enum):
    STRT = "(start)"
    NOUN = "noun"
    VERB = "verb"
    PREP = "preposition"
    OTHR = "others"


training_corpus = [
    ("i", Tag.NOUN),
    ("kill", Tag.VERB),
    ("them", Tag.NOUN),
    ("to", Tag.PREP),
    ("get", Tag.VERB),
    ("kill", Tag.NOUN),
]


def get_transition_counts(corpus):
    transition_counts = defaultdict(lambda: 0)

    previous_tag = Tag.STRT
    for _, tag in corpus:
        transition_counts[(previous_tag, tag)] += 1
        previous_tag = tag

    return transition_counts


def get_emission_counts(corpus):
    emission_counts = defaultdict(lambda: 0)

    for word, tag in corpus:
        emission_counts[(tag, word)] += 1

    return emission_counts

In [28]:
import os

transition_counts = get_transition_counts(training_corpus)
emission_counts = get_emission_counts(training_corpus)

print("\n".join([f"{k[0]} -> {k[1]}\t{v}" for k, v in transition_counts.items()]))
print()
print("\n".join([f"{k[0]} -> {k[1]}\t{v}" for k, v in emission_counts.items()]))

(start) -> noun	1
noun -> verb	1
verb -> noun	2
noun -> preposition	1
preposition -> verb	1

noun -> i	1
verb -> kill	1
noun -> them	1
preposition -> to	1
verb -> get	1
noun -> kill	1


In [60]:
import numpy
import pandas


def create_transition_matrix(transition_counts, tags, alpha=0.001):
    transition_matrix = pandas.DataFrame(index=tags, columns=tags)

    previous_tag_count = defaultdict(lambda: 0)
    for (previous_tag, _), count in transition_counts.items():
        previous_tag_count[previous_tag] += count

    for previous_tag in tags:
        for next_tag in tags:
            count = transition_counts[(previous_tag, next_tag)]
            # smoothing
            probability = (count + alpha) / (previous_tag_count[previous_tag] + alpha * len(tags))
            transition_matrix.at[previous_tag, next_tag] = probability

    return transition_matrix


def create_emission_matrix(emission_counts, words, tags, alpha=0.001):
    emission_matrix = pandas.DataFrame(index=tags, columns=words)

    tag_counts = defaultdict(lambda: 0)
    for (tag, _), count in emission_counts.items():
        tag_counts[tag] += count

    for tag in tags:
        for word in words:
            count = emission_counts[(tag, word)]
            # smoothing
            probability = (count + alpha) / (tag_counts[tag] + alpha * len(tags))
            emission_matrix.at[tag, word] = probability

    return emission_matrix

In [75]:
tag_ids = {tag: i for i, tag in enumerate(Tag)}
training_words = list({word for word, _ in training_corpus})

transition_matrix = create_transition_matrix(transition_counts, Tag)
emission_matrix = create_emission_matrix(emission_counts, training_words, Tag)
print(f"# transition matrix\n{transition_matrix}\n\n# emission matrix\n{emission_matrix}")

# transition matrix
          Tag.STRT  Tag.NOUN  Tag.VERB  Tag.PREP  Tag.OTHR
Tag.STRT  0.000995   0.99602  0.000995  0.000995  0.000995
Tag.NOUN  0.000499  0.000499  0.499252  0.499252  0.000499
Tag.VERB  0.000499  0.998005  0.000499  0.000499  0.000499
Tag.PREP  0.000995  0.000995   0.99602  0.000995  0.000995
Tag.OTHR       0.2       0.2       0.2       0.2       0.2

# emission matrix
                to         i      kill      them       get
Tag.STRT       0.2       0.2       0.2       0.2       0.2
Tag.NOUN  0.000333  0.333111  0.333111  0.333111  0.000333
Tag.VERB  0.000499  0.000499  0.499252  0.000499  0.499252
Tag.PREP   0.99602  0.000995  0.000995  0.000995  0.000995
Tag.OTHR       0.2       0.2       0.2       0.2       0.2


## Viterbi Forward

In [172]:
def initialize_best_probability_matrix(transition_counts, emission_counts, transition_matrix, emission_matrix, words):
    best_probability_matrix = pandas.DataFrame(data=0.0, index=transition_matrix.index, columns=words)

    if len(words) < 1:
        return best_probability_matrix

    starting_word = words[0]
    best_probability_matrix.iloc[:, 0] = [
        float("-inf")
        if transition_counts[Tag.STRT, tag] == 0
        else numpy.log(transition_matrix.at[Tag.STRT, tag]) + numpy.log(emission_matrix.at[tag, starting_word])
        for tag in transition_matrix.index
    ]
    return best_probability_matrix


def initialize_best_path_matrix(transition_matrix, words):
    return pandas.DataFrame(index=transition_matrix.index, columns=words, dtype=str)

In [174]:
initialize_best_probability_matrix(
    transition_counts, emission_counts, transition_matrix, emission_matrix, words=["i", "kill"]
)

Unnamed: 0,i,kill
Tag.STRT,-inf,0.0
Tag.NOUN,-1.103266,0.0
Tag.VERB,-inf,0.0
Tag.PREP,-inf,0.0
Tag.OTHR,-inf,0.0


In [175]:
def viterbi_forward(transition_counts, emission_counts, transition_matrix, emission_matrix, words):
    best_probability_matrix = initialize_best_probability_matrix(
        transition_counts, emission_counts, transition_matrix, emission_matrix, words
    )
    best_path_matrix = initialize_best_path_matrix(transition_matrix, words)

    for word_index, word in enumerate(best_probability_matrix.columns[1:], start=1):
        for tag in best_probability_matrix.index:
            best_probability, best_path = float("-inf"), None
            tag_to_word_log = numpy.log(emission_matrix.at[tag, word])

            for previous_tag in best_probability_matrix.index:
                if (
                    probability := best_probability_matrix.loc[previous_tag].iloc[word_index - 1]
                    + numpy.log(transition_matrix.at[previous_tag, tag])
                    + tag_to_word_log
                ) > best_probability:
                    best_probability, best_path = probability, previous_tag

            best_probability_matrix.at[tag, word] = best_probability
            best_path_matrix.at[tag, word] = best_path

    return best_probability_matrix, best_path_matrix

In [195]:
best_probability_matrix, best_path_matrix = viterbi_forward(
    transition_counts, emission_counts, transition_matrix, emission_matrix, words=["i", "get", "to", "kill", "them"]
)
best_probability_matrix, best_path_matrix

(                 i        get         to       kill       them
 Tag.STRT      -inf -10.316103 -11.705392 -14.924268 -18.143144
 Tag.NOUN -1.103266 -16.714698 -10.502585 -12.808659 -11.899850
 Tag.VERB      -inf  -2.492555 -16.318041 -10.798575 -21.106702
 Tag.PREP      -inf  -8.710653 -10.099943 -18.109972 -20.416046
 Tag.OTHR      -inf -10.316103 -11.705392 -14.924268 -18.143144,
             i       get        to      kill      them
 Tag.STRT  NaN  Tag.NOUN  Tag.VERB  Tag.OTHR  Tag.OTHR
 Tag.NOUN  NaN  Tag.NOUN  Tag.VERB  Tag.STRT  Tag.VERB
 Tag.VERB  NaN  Tag.NOUN  Tag.PREP  Tag.PREP  Tag.NOUN
 Tag.PREP  NaN  Tag.NOUN  Tag.VERB  Tag.NOUN  Tag.NOUN
 Tag.OTHR  NaN  Tag.NOUN  Tag.VERB  Tag.OTHR  Tag.OTHR)

## Viterbi Backward

In [218]:
def viterbi_backward(best_probability_matrix, best_path_matrix):
    tag_results = []

    last_word = best_probability_matrix.columns[-1]
    best_last_tag, best_last_prob = None, float("-inf")
    for tag in best_probability_matrix.index:
        if (last_prob := best_probability_matrix.at[tag, last_word]) > best_last_prob:
            best_last_tag, best_last_prob = tag, last_prob
    tag_results.append(best_last_tag)
    tag_results.append(best_path_matrix.at[best_last_tag, last_word])

    for i in range(1, len(best_path_matrix.columns) - 1):
        tag_results.append(best_path_matrix.loc[tag_results[i]].iloc[len(best_path_matrix.columns) - 1 - i])

    return tag_results[::-1]

In [223]:
test_corpus = ["i", "get", "to", "kill", "them"]

best_probability_matrix, best_path_matrix = viterbi_forward(
    transition_counts, emission_counts, transition_matrix, emission_matrix, words=test_corpus
)

list(
    zip(
        test_corpus,
        [item.name for item in viterbi_backward(best_probability_matrix, best_path_matrix)],
    )
)

[('i', 'NOUN'),
 ('get', 'VERB'),
 ('to', 'PREP'),
 ('kill', 'VERB'),
 ('them', 'NOUN')]