In [9]:
class Preprocessor:
    def setenceToTokens(self, sentence):
        return sentence.split()
    def tokenToWordAndTag(self, token):
    #     print(token)
        word, tag = token.split('/')
        return (word, tag)
    def tokensToPairs(self, tokens):
        return [tokenToWordAndTag(token) for token in tokens]
    def tokensToWordsAndTags(self, tokens):
        x, y = [], []
        for token in tokens:
            word, tag = tokenToWordAndTag(token)
            x.append(word)
            y.append(tag)
        return x, y
    def process(self, data):
        """
        data: list of senteces
        """
        x, y = [], []
        for sentence in data:
            words, tags = tokensToWordsAndTags(setenceToTokens(sentence))
            x.append(words)
            y.append(tags)
        return x, y
    def setUnknown(self, x, vocab):
        # Set words not in vocab to unknown
        for words in x:
            for i in range(len(words)):
                if words[i] not in vocab:
                    words[i] = '<UNK>'
        return x
    def setLowFreqToUnknown(self, x, occurences = 2):
        count = Counter()
        vocab = set()
        for words in x:
            for word in words:
                count[word] += 1
        for words in x:
            for i, word in enumerate(words):
                if count[word] < occurences:
                    words[i] = '<UNK>'
                    count['<UNK>'] += 1
#                 vocab.add(words[i])
        return x

## POS tagging
### (a) Preprocessing

In [10]:
# Load packages
from collections import Counter

# Load data
trn = open("data/trn.pos").read().strip().split("\n")
print("Training data ...")
print("{}".format(len(trn)))
processor = Preprocessor()
trainX, trainY = processor.process(trn)
trainX = processor.setLowFreqToUnknown(trainX)

Training data ...
54097


### (b)  Estimating transition probability

In [14]:
transitionCount = Counter()
for words, tags in zip(trainX, trainY):
    transitionCount[('<start>',tags[0])] += 1
    transitionCount[(tags[-1], '<end>')] += 1
    for i in range(1, len(tags)):
        transitionCount[(tags[i-1], tags[i])] += 1
tagCount = Counter()
stateSpace = set()
vocab = set()
for words, tags in zip(trainX, trainY):
    for word, tag in zip(words, tags):
        tagCount[tag] += 1
        stateSpace.add(tag)
        vocab.add(word)
stateSpace = list(stateSpace)
tagCount['<start>'] = len(trainY)
tagCount['<end>'] = len(trainY)
# print(transitionCount)

In [15]:
def calculateTransitionProb(transitionCount, stateSpace, alpha = 1):
    transitionProb = {}
    for tag1 in stateSpace+['<start>']:
        for tag2 in stateSpace+['<end>']:
            transitionProb[(tag1, tag2)] = (transitionCount[(tag1, tag2)] + alpha) / (tagCount[tag1] + alpha*len(stateSpace))
    return transitionProb

In [17]:
transitionProb = calculateTransitionProb(transitionCount, stateSpace, 1)
# print(transitionProb)

In [18]:
testSum = 0
for key, value in transitionProb.items():
    if key[0] == 'C':
        testSum += value
print(testSum)

1.0000076051410753


### (c) Estimating the emission probability

In [19]:
emissionCount = Counter()
beta = 1
for words, tags in zip(trainX, trainY):
    for word, tag in zip(words, tags):
        emissionCount[(tag, word)] += 1
def calculateEmissionProb(emissionCount, stateSpace, beta):    
    emissionProb = {}
    for tag in stateSpace:
#         print(tag)
        for word in vocab:
            emissionProb[(tag, word)] = (emissionCount[(tag, word)] + beta)/(tagCount[tag] + beta*len(vocab))
    return emissionProb
emissionProb = calculateEmissionProb(emissionCount, stateSpace, beta)

In [21]:
testSum = 0
for key, value in emissionProb.items():
    if key[0] == 'N':
        testSum += value
print(testSum)

0.999999999999954


### (d) The Viterbi Algorithm

In [22]:
# Convert to log probabilities
from math import log
def convertToLog(prob):
    for k in prob:
        prob[k] = log(prob[k])
    return prob
logTransitionProb = convertToLog(transitionProb)
logEmissionProb = convertToLog(emissionProb)

In [23]:
import numpy as np
def viterbi(x, logTransitionProb, logEmissionProb):
    """
    Return the MAP estimate of y.
    Parameters
    ----------
    x : array (T,)
        Observation state sequence. int dtype.
    logTransitionProb : dictionary
        State transition map.
        details.
    logEmissionProb : dictionary
        Emission map.

    Returns
    -------
    y : array (T,)
        Maximum a posteriori probability estimate of hidden state trajectory.
    """
    # Cardinality of the state space
    K = len(stateSpace)
    T = len(x)
    maxProb = np.empty((T, K), np.float64)
    bestPrev = np.empty((T, K), dtype=object)

    # Initilaize the tracking tables from first observation
    for i, tag in enumerate(stateSpace):
        maxProb[0, i] = logTransitionProb[('<start>', tag)] + logEmissionProb[(tag, x[0])]
    bestPrev[0, :] = '<start>'
    for i in range(1, len(x)):
        for j, tag in enumerate(stateSpace):
            bestPrevProb, bestPrevTag = float('-inf'), None
            for k, prevTag in enumerate(stateSpace):
                temp = maxProb[i-1, k] + logTransitionProb[(prevTag, tag)] + logEmissionProb[(tag, x[i])]
                if temp > bestPrevProb:
                    bestPrevProb = temp
                    bestPrevTag = prevTag
            maxProb[i, j] = bestPrevProb
            bestPrev[i, j] = bestPrevTag
    bestLastProb, betLastTag = float('-inf'), None
    for i, prevTag in enumerate(stateSpace):
        temp = maxProb[-1, i] + logTransitionProb[(prevTag, '<end>')]
        if temp > bestLastProb:
            bestLastProb = temp
            bestLastTag = prevTag
    y = [bestLastTag]
    for i in reversed(range(1, T)):
#         print(i)
        y.append(bestPrev[i, stateSpace.index(bestLastTag)])
        bestLastTag = y[-1]
    return y[::-1]

In [24]:
def calculateAccuracy(x, y, logTransitionProb, logEmissionProb):
    correct, total = 0, 0
    for words, tags in zip(x, y):
        predicted_tags = viterbi(words, logTransitionProb, logEmissionProb)
        for predicted_tag, tag in zip(predicted_tags, tags):
            total += 1
            if predicted_tag == tag:
                correct += 1
    return correct/total

In [25]:
## load dev
dev = open("data/dev.pos").read().strip().split("\n")
x, y = process(dev)
x = setUnknown(x, vocab)
# print(x[:3])
# print(y[:3])
print('Accuracy: {}'.format(calculateAccuracy(x, y, logTransitionProb, logEmissionProb)))

Accuracy: 0.9474509398528627


### (e) Tuning $\alpha$ and $\beta$

In [26]:
for alpha in [1,100, 200 , 500, 1000, 2000, 3000, 5000, 10000, 20000]:
    transitionProb = calculateTransitionProb(transitionCount, stateSpace, alpha)
    logTransitionProb = convertToLog(transitionProb)
    print('alpha={}, accuracy={}'.format(alpha, calculateAccuracy(x, y, logTransitionProb, logEmissionProb)))

alpha=1, accuracy=0.9474509398528627
alpha=100, accuracy=0.9478213676538998
alpha=200, accuracy=0.9480195034544546
alpha=500, accuracy=0.9482865560552024
alpha=1000, accuracy=0.9489671094571079
alpha=2000, accuracy=0.9497941110594235
alpha=3000, accuracy=0.9498199548594959
alpha=5000, accuracy=0.9502248410606295
alpha=10000, accuracy=0.9496735066590858
alpha=20000, accuracy=0.9472355748522596


In [27]:
transitionProb = calculateTransitionProb(transitionCount, stateSpace, 5000)
logTransitionProb = convertToLog(transitionProb)
for beta in [1, 2, 3, 4, 5]:
    emissionProb = calculateEmissionProb(emissionCount, stateSpace, beta)
    logEmissionProb = convertToLog(emissionProb)
    print('beta={}, accuracy={}'.format(beta, calculateAccuracy(x, y, logTransitionProb, logEmissionProb)))

beta=1, accuracy=0.9502248410606295
beta=2, accuracy=0.9462448958494857
beta=3, accuracy=0.9422563360383177
beta=4, accuracy=0.9388794128288623
beta=5, accuracy=0.9361485846212161


### (f) Testing

In [28]:
tst = open("data/tst.pos").read().strip().split("\n")
x, y = process(tst)
x = setUnknown(x, vocab)
# alpha = 5000
transitionProb = calculateTransitionProb(transitionCount, stateSpace, 5000)
logTransitionProb = convertToLog(transitionProb)
# beta = 1
emissionProb = calculateEmissionProb(emissionCount, stateSpace, 1)
logEmissionProb = convertToLog(emissionProb)
print('Accuracy on testing data: {}'.format(calculateAccuracy(x, y, logTransitionProb, logEmissionProb)))

Accuracy on testing data: 0.9515465680318943
