# Part-of-Speech Tagging (POS)

## Using Hidden Markov Model (HMM) and Viterbi Algorithm

In [1]:
import numpy as np
import pandas as pd
import string

from collections import Counter
from math import log
from tqdm.auto import tqdm

In [2]:
def read_file(path, lower):
    data_x, data_y = [], []

    with open(path, 'r') as file:
        lines = file.read().splitlines()

    words, tags = ["--n--"], ["--s--"]
    for line in lines:
        if line.split():
            word, tag = line.split()
            word = word.strip()

            words.append(word.lower() if lower else word)
            tags.append(tag.strip())

        else:
            data_x.append(words)
            data_y.append(tags)
            words, tags = ["--n--"], ["--s--"]

    return data_x, data_y

In [3]:
punct = set(string.punctuation)

noun_suffix = [ "action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty" ]
verb_suffix = [ "ate", "ify", "ise", "ize" ]
adj_suffix  = [ "able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous" ]
adv_suffix  = [ "ward", "wards", "wise" ]

def assign_unk(word):
    if any(char.isdigit() for char in word):
        return "--unk_digit--"
    elif any(char in punct for char in word):
        return "--unk_punct--"
    elif any(char.isupper() for char in word):
        return "--unk_upper--"
    elif any(word.endswith(suffix) for suffix in noun_suffix):
        return "--unk_noun--"
    elif any(word.endswith(suffix) for suffix in verb_suffix):
        return "--unk_verb--"
    elif any(word.endswith(suffix) for suffix in adj_suffix):
        return "--unk_adj--"
    elif any(word.endswith(suffix) for suffix in adv_suffix):
        return "--unk_adv--"
    return "--unk--"

def oov_to_unk(words, vocab):
    return [ word if word in vocab else assign_unk(word) for word in words ]

In [4]:
def read_corpus(train_path, test_path, vocab_size, lower):
    train_x, train_y = read_file(train_path, lower=lower)
    test_x, test_y = read_file(test_path, lower=lower)

    # get counts for all words in the train set
    vocab = Counter()
    for words in tqdm(train_x):
        for word in words:
            vocab[word] = vocab[word] + 1

    # grab `vocab_size` most common words
    vocab = { word for word, _ in vocab.most_common(vocab_size) }

    # replace words not in the vocab with `--unk--...`
    train_x = [ oov_to_unk(words, vocab) for words in train_x ]
    test_x = [ oov_to_unk(words, vocab) for words in test_x ]

    # re-create vocab based on the updated train set
    vocab = sorted([*{ word for words in train_x for word in words }])

    return train_x, train_y, test_x, test_y, vocab

In [5]:
def compute_matrices(data_x, data_y, vocab, alpha=1e-4):

    # compute counts
    transition_count, emission_count, tag_count = {}, {}, {}
    for words, tags in tqdm(zip(data_x, data_y), total=len(data_x)):

        prev = None
        for word, tag in zip(words, tags):

            if prev is not None:
                transition_count[(prev, tag)] = transition_count.get((prev, tag), 0) + 1
                emission_count[(tag, word)] = emission_count.get((tag, word), 0) + 1
            
            tag_count[tag] = tag_count.get(tag, 0) + 1
            prev = tag

    # compute transition matrix
    tags = [ *tag_count.keys() ]
    n_tags = len(tags)

    A = np.zeros((n_tags, n_tags))
    for i, tag_from in enumerate(tags):
        total_count = tag_count[tag_from] + alpha * n_tags

        for j, tag_to in enumerate(tags):
            tran = (tag_from, tag_to)
            count = transition_count.get(tran, 0)

            A[i, j] = (count + alpha) / total_count

    # compute emission matrix
    n_words = len(vocab)

    B = np.zeros((n_tags, n_words))
    for i, tag_from in enumerate(tags):
        total_count = tag_count[tag_from] + alpha * n_words

        for j, word_to in enumerate(vocab):
            emis = (tag_from, word_to)
            count = emission_count.get(emis, 0)

            B[i, j] = (count + alpha) / total_count

    return A, B, tags

In [6]:
def viterbi(words, A, B, tag_map, vocab_map):
    n_words = len(words)
    n_tags = len(tag_map)
    s_idx = tag_map["--s--"]

    C = np.zeros((n_tags, n_words))
    D = np.zeros((n_tags, n_words), dtype=np.int)

    for i, word in enumerate(words):
        # init step
        if i == 0:
            for tag, j in tag_map.items():
                if A[s_idx, j] == 0: C[j, i] = -np.inf
                else: C[j, i] = log(A[s_idx, j]) + log(B[j, vocab_map[word]])

        # forward steps
        else:
            for tag, j in tag_map.items():
                best_prob = -np.inf
                best_path = None

                for tag_prev, k in tag_map.items():
                    prob = C[k, i-1] + log(A[k, j]) + log(B[j, vocab_map[word]])

                    if prob > best_prob:
                        best_prob = prob
                        best_path = k

                C[j, i] = best_prob
                D[j, i] = best_path

    # backward steps
    preds = [ C[:, -1].argmax() ]
    for i in reversed(range(n_words)): preds.insert(0, D[preds[0], i])

    return preds

In [7]:
def predict(data_x, A, B, tags, vocab):
    tag_map = { tag: idx for idx, tag in enumerate(tags) }
    vocab_map = { word: idx for idx, word in enumerate(vocab) }

    pred_y = []
    for words in tqdm(data_x):
        preds = viterbi(words[1:], A, B, tag_map, vocab_map)
        pred_y.append([ tags[pred] for pred in preds ])

    return pred_y

def calc_accuracy(true_y, pred_y):
    correct, total = 0, 0

    for words, preds in zip(true_y, pred_y):
        total += len(words)
        correct += np.sum([ word == pred for word, pred in zip(words, preds) ])

    return correct / total

## Train parameters

In [8]:
train_path = "WSJ_02-21.pos"
test_path = "WSJ_24.pos"

vocab_size = 20000
to_lower=False

In [9]:
train_x, train_y, test_x, test_y, vocab = read_corpus(train_path, test_path, vocab_size, to_lower)

print("\ntrain set size:", len(train_x))
print("test set size:", len(test_x))

print("\nvocab size:", len(vocab))
print("vocab:", vocab[:20], "...", vocab[-20:])

100%|██████████| 39832/39832 [00:00<00:00, 92514.41it/s]

train set size: 39832
test set size: 1346

vocab size: 20008
vocab: ['!', '#', '$', '%', '&', "'", "''", "'60s", "'70s", "'80s", "'86", "'90s", "'S", "'d", "'em", "'ll", "'m", "'n'", "'re", "'s"] ... ['young', 'younger', 'youngest', 'youngsters', 'your', 'yourself', 'youth', 'youthful', 'yuppie', 'yuppies', 'zero', 'zero-coupon', 'zeros', 'zinc', 'zip', 'zone', 'zones', 'zoning', '{', '}']


In [10]:
# compute transmission (A) and emission (B) matrices for Viterbi algorithm
A, B, tags = compute_matrices(train_x, train_y, vocab)

print("\nA shape:", A.shape)
print("B shape:", B.shape)

print("\nnumber of tags:", len(tags))
print("tags:", tags)

100%|██████████| 39832/39832 [00:01<00:00, 32225.17it/s]

A shape: (46, 46)
B shape: (46, 20008)

number of tags: 46
tags: ['--s--', 'IN', 'DT', 'NNP', 'CD', 'NN', '``', "''", 'POS', '(', 'VBN', 'NNS', 'VBP', ',', 'CC', ')', 'VBD', 'RB', 'TO', '.', 'VBZ', 'NNPS', 'PRP', 'PRP$', 'VB', 'JJ', 'MD', 'VBG', 'RBR', ':', 'WP', 'WDT', 'JJR', 'PDT', 'RBS', 'WRB', 'JJS', '$', 'RP', 'FW', 'EX', 'SYM', '#', 'LS', 'UH', 'WP$']


In [11]:
pred_y = predict(test_x, A, B, tags, vocab)
acc = calc_accuracy(test_y, pred_y)

print(f"\naccuracy: {acc:.4f}")

100%|██████████| 1346/1346 [02:03<00:00, 10.91it/s]
accuracy: 0.9550

