In [3900]:
'''
    data pre-processing.
        Define the basic data structures to hold training and test corpora.
        Read in training corpus and build word-index and pos_tag-index mappings
'''

class Corpora():
    """
    The class holding training and test corpora.
    """

    def __init__(self):
        """
        Constructor
        """
        # word to index (0-based integers) mapping
        self.word_index = {}
        # POS-tag to index (0-based integers) mapping
        self.tag_index = {}
        # index to POS-tag mapping: the reverse mapping of the above
        self.index_tag = {}
        # list of sentences, each of which is a list of pairs of integer indices (word_index[w_t], tag_index[tag_t]),
        # where w_t and tag_t are the word and POS tag at the location t of a sentence, respectively.
        self.training_sentences = []
        # list of sentences, each of which is a list of integer indices (word_index[w_t])
        self.test_sentences = []

        self.max_len = 0

    # -------------------------------------------------------------------------
    def read_corpus(self, corpus_path, is_training):
        """
        Read a corpus
        :param corpus_path: path to a file with POS-tagged sentences.
        :param is_training: if true, the file is for the training corpus, otherwise the test corpus
        :return:
        """
        with open(corpus_path, 'r') as f:
            # holding the current sentence
            cur_sentence = []
            self.max_len = 0
            while True:
                # each line is a (word, POS_tag, other_label) tuple.
                line = f.readline()
                if not line:
                    break
                # sentences are delimited by an empty line.
                if len(line) == 1:
                    if is_training:
                        self.training_sentences.append(cur_sentence)
                    else:
                        self.test_sentences.append(cur_sentence)
                    if len(cur_sentence) > self.max_len:
                        self.max_len = len(cur_sentence)
                    cur_sentence = []
                    continue
                w, t, _ = line.strip().split()
                if w not in self.word_index:
                    self.word_index[w] = len(self.word_index)
                if t not in self.tag_index:
                    self.tag_index[t] = len(self.tag_index)
                    self.index_tag[len(self.tag_index) - 1] = t
                cur_sentence.append((self.word_index[w], self.tag_index[t]))

In [3901]:
corpora = Corpora()
corpora.read_corpus('./data/train.txt', is_training=True)
corpora.read_corpus('./data/test.txt', is_training=False)

In [3902]:
corpora.test_sentences[0]

[(19122, 10),
 (1112, 10),
 (938, 10),
 (30, 13),
 (14446, 10),
 (129, 0),
 (105, 21),
 (302, 29),
 (2511, 21),
 (23, 2),
 (3562, 8),
 (459, 0),
 (19123, 16),
 (389, 17),
 (2512, 0),
 (206, 1),
 (3758, 10),
 (163, 10),
 (7, 6),
 (1723, 7),
 (9528, 8),
 (2248, 9),
 (15, 1),
 (3758, 10),
 (30, 13),
 (17909, 18),
 (19124, 9),
 (33, 14)]

In [3903]:
'''
    implement the forward, backward, and Viterbi algorithms.
    Implementation based on: http://www.cs.sjsu.edu/faculty/stamp/RUA/HMM.pdf
'''
import numpy as np
import math

class HMM():
    """
    The HMM class that holds data structures and implementations for the forward, backward, and Viterbi algorithms.
    """

    def __init__(self, corpora):
        # self.corpora = corpora
        self.num_words = len(corpora.word_index)
        self.num_tags = len(corpora.tag_index)
        self.max_sentence_len = corpora.max_len

        # a very small positive number for Laplacian smoothing
        self.eps = 1e-8

        # for HMM parameters obtained from MLE on the POS-tagged corpus
        self.A0 = np.zeros((self.num_tags, self.num_tags)) + self.eps
        self.B0 = np.zeros((self.num_tags, self.num_words)) + self.eps
        self.pi0 = np.zeros((self.num_tags, 1)) + self.eps

        # for HMM parameters estimated iteratively during EM.
        self.A = np.zeros((self.num_tags, self.num_tags)) + self.eps
        self.B = np.zeros((self.num_tags, self.num_words)) + self.eps
        self.pi = np.zeros((self.num_tags, 1)) + self.eps

        # for forward algorithm
        self.alpha = np.zeros((self.num_tags, self.max_sentence_len))
        self.scales = np.zeros((1, self.max_sentence_len))

        # for backward algorithm
        self.beta = np.zeros((self.num_tags, self.max_sentence_len))

        # for Viterbi algorithm
        self.v = np.zeros((self.num_tags, self.max_sentence_len)) #viterbi
        self.back_pointer = np.zeros((self.num_tags, self.max_sentence_len))
        self.pred_seq = np.zeros((1, self.max_sentence_len))

    # -------------------------------------------------------------------------


    def mle(self, training_sentences):
        """
        Use MLE to initialize the HMM parameters A, B, and pi
        :return:
        """
        #A0, B0, and PI0 are intialized the same way as A, B, and PI. (just set them equal to eachother)
        #For A0, B0, and PI0, we can find these using the relative frequency to estimate all 3. 
        #Go through all sentences and see how often a tag happen in the first position, then divide that by the number of sentence
        
        #pi /
        #A /
        #B /
        
        #create list of just tags: /
        training_pos_tags = []
        for sentence in training_sentences:
            new_sentence = []
            for tpl in sentence:
                _ , tag_index = tpl
                new_sentence.append(tag_index)
            training_pos_tags.append(new_sentence)
        #------------------------------------------------------
        
        #initialize B0: /
        for sentence in training_sentences:
            for tpl in sentence:
                word_index, tag_index = tpl
                self.B0[tag_index][word_index] += 1

        for i in range(self.num_tags):
            self.B0[i] /= np.sum(self.B0[i])
            
        #-------------------------------------------------------
        
        #initialize pi0 /
        for sentence in training_sentences:
            _ , tag_index0 = sentence[0]
            self.pi0[tag_index0] += 1
        
        for i in range(len(self.pi0)):
            self.pi0[i] /= len(training_sentences)
        #-------------------------------------------------------
        
        #Initialize A0: /
        for sentence in training_pos_tags:
            bi_gram = list(zip(sentence, sentence[1:]))
            for tpl in bi_gram:
                i1, i2 = tpl
                self.A0[i1][i2] += 1        
        
        for i in range(len(self.A0)):
            self.A0[i] /= np.sum(self.A0[i])
        
        #------------------------------------------------------
        
        self.A = self.A0
        self.B = self.B0
        self.pi = self.pi0
        
        # -------------------------------------------------------------------------
    def forward(self, sentence):
        """
        Run the forward algorithm on a sentence and populate the alpha array.
        :param sentence: a sentence on which the forward algorithm runs. The sentence is typically from the unlabeled set.
        :return: log-likelihood, computed according to the project description using the local normalization factors.
        """
        #the probability of a state at a certain time, given the history of evidence, previous alpha
        #alpha array = 44 x 70, 70 being the max sentence length (shouldn't it be 70 x 44 to make the calculations easier)?
        #state: POS_tags
        #observations: words
        
        #Base Case: t == 0
        for i in range(self.num_tags):
            self.alpha[i][0] = self.pi[i] * self.B[i][sentence[0][0]]
            self.scales[0][0] += self.alpha[i][0]
            
        #Scale Base Case:
        self.scales[0][0] = 1 / self.scales[0][0]
        for i in range(self.num_tags):
            self.alpha[i][0] = self.scales[0][0] * self.alpha[i][0]
            
        #Case: t > 0
        for t in range(1, len(sentence)): 
            for i in range(self.num_tags): 
                for j in range(self.num_tags):
                    self.alpha[i][t] = self.alpha[i][t] + (self.alpha[j][t-1] * self.A[j][i])
                self.alpha[i][t] = self.alpha[i][t] * self.B[i][sentence[t][0]]
                self.scales[0][t] += self.alpha[i][t]
            self.scales[0][t] = 1 / self.scales[0][t]
            for i in range(self.num_tags):
                self.alpha[i][t] = self.scales[0][t] * self.alpha[i][t]
                
        log_p = 0.0
        for t in range(len(sentence)):
            log_p += np.log(self.scales[0][t])
        log_p = log_p * -1
        return log_p
                    
    # -------------------------------------------------------------------------
    def backward(self, sentence):
        """
        Run the backward algorithm on the d-th training sentence and populate the beta array
        :param sentence: a sentence on which the forward algorithm runs. The sentence is typically from the unlabeled set.
        :return:
        """                                      
        for i in range(self.num_tags):
            self.beta[i][len(sentence)-1] = self.scales[0][len(sentence)-1]
        
        for t in range(len(sentence)-2, -1, -1):
            for i in range(self.num_tags):
                for j in range(self.num_tags):
                    self.beta[i][t] = self.beta[i][t] + ((self.A[i][j] * self.B[j][sentence[t+1][0]]) *
                                                        self.beta[j][t+1])
                self.beta[i][t] = self.scales[0][t] * self.beta[i][t]
                                                                  
    # -------------------------------------------------------------------------
    def Viterbi(self, sentence):
        """
        Run the Viterbi algorithm on the d-th training sentence.
        Populate the v, back_point, and pred_seq arrays.
        Note that the v array stores the log of the Viterbi values to avoid underflow.

        :param sentence: a sentence on which the forward algorithm runs. The sentence is typically from the unlabeled set.
        :return: log Pr(best_Q | O)
        """
        for i in range(self.num_tags):
            self.v[i][0] = self.pi[i] * self.B[i][sentence[0][0]]
            self.back_pointer[i][0] = 0
        
        for t in range(1, len(sentence)):
            for i in range(self.num_tags):
                prob = self.v[:, t-1] * self.A[:, i] * self.B[:, sentence[t][0]]
                self.v[i, t] = np.max(prob)
                self.back_pointer[i, t] = np.argmax(prob)
        bestpathprob = np.max(self.v[:, len(sentence)-1])
        bestpathpointer = np.argmax(self.v[:, 0])
        
        for t in range(0, len(sentence)):
            np.log(self.v[:, t])
        
        t = len(sentence)-1
        arr = np.zeros((1, len(sentence)))
        self.pred_seq[0][0] = bestpathpointer
        while t>0:
            self.pred_seq[0][t] = self.back_pointer[bestpathpointer,t]
            j = i
            t -= 1
        np.flip(self.pred_seq)
        
        return np.log(bestpathprob)

In [3904]:
'''
    EM for HMM training
        Use functions in problem1.py and problem2.py to run EM algorithm and train/evaluate HMM.
'''

# -------------------------------------------------------------------------
def em(model, corpora, mu, num_iters = 30):
    """
    The EM algorithm for training HMM.
    :param model: the HMM model to be trained
    :param corpora: training and test corpora
    :param mu: relative importance of MLE
    :param num_iters: number of total EM iterations
    :return:
    """
    
    # initialize model parameters using the POS-tagged sentences
    model.mle(corpora.training_sentences)

    for it in range(num_iters):
        # reset the ksi and gamma matrices and get ready for accumulation of the soft frequencies

        # new_= np.zeros((model.num_tags, 1))
        new_A = np.zeros((model.num_tags, model.num_tags))
        new_B = np.zeros((model.num_tags, model.num_words))
        new_pi = np.zeros((model.num_tags, 1))

        # the E-step: go through the unlabeled corpus and update the ksi & gamma matrices
        log_likelihood = 0
        for i, sentence in enumerate(corpora.test_sentences):
            log_likelihood += em_one_sentence(model, sentence, new_A, new_B, new_pi)
        print(log_likelihood)
        # normalize new_A, new_B, and new_pi
        # update HMM parameters A, B, and pi using the new_A, new_B, and new_pi
        maximization(model, new_A, new_B, new_pi, mu)

        accuracy, log_p = evaluate(model, corpora.test_sentences)
        print(accuracy, log_p)

# -------------------------------------------------------------------------
def em_one_sentence(model, sentence, new_A, new_B, new_pi):
    """
    Run the expectation step using the fixed HMM model on a single sentence.
    The soft fraquencies are accumulated into the last four matrices.
    :param model: the current fixed HMM
    :param sentence: a sentence from test set, presumably unlabeled.
    :param new_A: sum of transition soft frequencies (tag_i -> tag_j)
    :param new_B: sum of emission soft frequencies (tag_i -> word_o)
    :param new_pi: sum of starting soft frequencies (tag_i)
    :return: log likelihood obtained from the forward algorithm
    """
#     log_p = 0.0
#     for t in range(len(sentence)):
#         log_p += np.log(model.scales[0][t])
#     log_p = log_p * -1
    
#     new_A = np.zeros((model.num_tags, model.num_tags))
#     new_B = np.zeros((model.num_tags, model.num_words))
#     new_pi = np.zeros((model.num_tags, 1))
        
    #compute gamma and digamma
    gamma = np.zeros((model.num_tags, len(sentence)))
    digamma = np.zeros((model.num_tags, model.num_tags, len(sentence)))
    
    for t in range(len(sentence)-1):
        denom = 0
        for i in range(model.num_tags):
            for j in range(model.num_tags):
                denom += model.alpha[i][t] * model.A[i][j] * model.B[j][sentence[t+1][0]] * model.beta[j][t+1]
        for i in range(model.num_tags):
            for j in range(model.num_tags):
                digamma[i][j][t] = (model.alpha[i][t] * model.A[i][j] * model.B[j][sentence[t+1][0]] * model.beta[j][t+1]) / denom
                gamma[i][t] += digamma[i][j][t]
        
        #special case for gammaT-1(i)
        denom = 0
        for i in range(model.num_tags):
            denom += model.alpha[i][len(sentence)-1]
        for i in range(model.num_tags):
            gamma[i][len(sentence)-1] = model.alpha[i][len(sentence)-1] / denom
    
    #new_pi
    for i in range(model.num_tags):
        new_pi[i] = gamma[i][0]
    
    #seem to not get new_A (gamma and digamma seem to be right compared to yours)
    #new_A
    for i in range(model.num_tags):
        for j in range(model.num_tags):
            numer = 0
            denom = 0
            for t in range(len(sentence)-1):
                numer += digamma[i][j][t]
                denom += gamma[i][t]
            new_A[i][j] = numer/denom
    
    #new_B
    for i in range(model.num_tags):
        for j in range(model.num_words):
            numer = 0
            denom = 0
            for t in range(len(sentence)):
                if(sentence[t][0] == j):
                    numer += gamma[i][t]
                denom += gamma[i][t]
            new_B[i][j] = numer/denom
    return model.forward(sentence)
            
    
# -------------------------------------------------------------------------
def maximization(model, new_A, new_B, new_pi, mu):
    """
    Update HMM parameters A, B, and pi
    :param model:
    :param new_A:
    :param new_B:
    :param new_pi:
    :param mu: a real number between [0, 1]. Relative importance of the MLE
    :return:
    """
    sum_B = np.sum(new_B, axis=1)
    for i in range(model.num_tags):
        new_B[i]=new_B[i]/sum_B[i]
        
    model.A = mu * model.A + (1 - mu) * new_A
    model.B = mu * model.B + (1-mu) * new_B
    model.pi = mu * model.pi + (1-mu) * new_pi

# -------------------------------------------------------------------------
def evaluate(model, test_corpus):
    """
    Use Viterbi algorithm to predict POS tags for the sentences in the test_corpus.
    :param model: an HMM
    :param test_corpus: list of POS-tagged sentences
    :return: accuracy of the prediction, defined as the percentage of tags that are predicted correctly.
    """
    correct = 0.0
    total = 0.0
    total_log_p = 0
    for sentence in test_corpus:
        max_log_p = model.Viterbi(sentence)
        total_log_p += max_log_p
        for t, (o, q) in enumerate(sentence):
            if model.pred_seq[0, t] == q:
                correct += 1.0
        total += len(sentence)
    return correct / total, total_log_p

In [3905]:
model = HMM(corpora)

In [3906]:
model.mle(corpora.training_sentences)

In [3907]:
log_p = model.forward(corpora.test_sentences[0]) #different if you run this cell again, so don't

In [3908]:
model.backward(corpora.test_sentences[0])

In [3909]:
max_log_p = model.Viterbi(corpora.test_sentences[0])
print(max_log_p)

-252.44455993227464


In [3910]:
model.pred_seq

array([[42., 10., 10., 13.,  0.,  0., 21., 29., 21.,  2.,  8.,  0.,  0.,
        17.,  0.,  1., 10., 10.,  6.,  7.,  8.,  9.,  1., 10., 13.,  0.,
         0., 14.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
         0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
         0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
         0.,  0.,  0.,  0.,  0.]])

In [3911]:
new_A = np.zeros((model.num_tags, model.num_tags))
new_B = np.zeros((model.num_tags, model.num_words))
new_pi = np.zeros((model.num_tags, 1))
em_one_sentence(model, corpora.test_sentences[0], new_A, new_B, new_pi)

248.86971046348873

In [3912]:
maximization(model, new_A, new_B, new_pi, 0.1)

In [3913]:
model.A[:4,0]

array([0.0119059 , 0.01115603, 0.04845907, 0.00395955])

In [3914]:
model.B[:4, 7]

array([3.82313408e-14, 4.49358146e-14, 5.48944554e-14, 6.71832682e-13])

In [3915]:
model.pi[:4]

array([[0.00483173],
       [0.02939046],
       [0.04863724],
       [0.00072883]])