In [1]:
# modules
import pandas as pd
import numpy as np
from tqdm import tqdm
import ast

**Definition of HMM**

In [2]:
# Reference: D. Jurafsky, J. H. Martin, "Speech and Language Processing: 3rd Edition. Chapter A: Hidden Markov Models", 7th Jan 2023
# Hyperlink: https://web.stanford.edu/~jurafsky/slp3/

class HMM:
    
    # The arguments are the training-data and the number of its leading sentences to use for training
    def __init__(self, corpus, m):
        
        # Pre-process the training-corpus using Stemming & ReGex
        self.corpus = self.preprocess(corpus[:m])

        # Creates two sets, States & Vocabulary
        self.Q, self.V = self.analyze()
        
        self.N = len( self.Q )
        self.D = len( self.V )
        
        # Create hashtables, to retrieve the index of a token (or tag) in time O(1)
        self.create_hashtables()
        
        # Intializes the model with the training-corpus
        self.initialize_model()
  
    # Trains the model
    def fit(self):
        
        #
        self.log_start()
        # Stores the log-likehood per epoch; it is calculated during training
        self.costs = []
        # An Expectation-Maximization algorithm
        self.Baum_Welch_algorithm()
        #
        self.log_end()

    """
    1. Input: A dictionary of untagged-sentences
    2. Output: A list of tagged-sentences
    Here each sentence is a list of tokens
    """
    def predict(self, test_data):

        # Pre-process using Stemming & ReGex
        self.test_data = self.preprocess_2(test_data)
        
        # Generates predictions
        ans = []
        print("Generating predictions:")
        for key, sentence in tqdm(self.test_data):
            
            self.viterbi_algorithm(sentence)
            ans.append([key, list( zip(test_data[key], self.bestpath) )])

        return ans
    
    # Wrapper function to initialize the model
    def initialize_model(self):
        
        #
        self.A, self.B, self.Pi = self.initialize()
        # Normalizes the vectors A[i, :], B[:, j], Pi[:]
        self.rectify()
    
    #Reads the training-corpus to create two sets, one each for states & vocabulary
    def analyze(self):

        # Read the possible states and tokens
        vocabulary = set()
        states = set()

        print("Initializing Q, V:")
        for sentence in tqdm(self.corpus):
            for token, tag in sentence:
                states.add(tag)
                vocabulary.add(token)

        # Any Out-Of-Vocabulary token (in the testing data) shall be replaced by the token"UNK"
        vocabulary.add("UNK")

        v = list(vocabulary)
        q = list(states)
        v.sort()
        q.sort()

        return q, v
    
    
    # Creates hashtables to map tokens & tags to their indices in the lists self.V & self.Q
    def create_hashtables(self):
        
        self.idx_to_token = dict(enumerate(self.V))
        
        self.token_to_idx = {}
        for idx, tok in self.idx_to_token.items():
            self.token_to_idx[tok] = idx
        
        self.idx_to_tag = dict(enumerate(self.Q))
        
        self.tag_to_idx = {}
        for idx, tag in self.idx_to_tag.items():
            self.tag_to_idx[tag] = idx

    """
    The functions 'log_start' & 'log_end' shall print the change in
    the model's parameters, i.e., A, B, Pi, before & after training
    """
    def log_start(self):
        
        self.AA  = np.copy(self.A)
        self.BB  = np.copy(self.B)
        self.Pii = np.copy(self.Pi)
        
    def log_end(self):
        
        delta_A  = np.linalg.norm( self.A - self.AA )   / np.linalg.norm(self.AA)
        delta_B  = np.linalg.norm( self.B - self.BB )   / np.linalg.norm(self.BB)
        delta_pi = np.linalg.norm( self.Pi - self.Pii ) / np.linalg.norm(self.Pii)
        
        print("\nAfter training,")
        print(f"Change in A: {round(100*delta_A)} %")
        print(f"Change in B: {round(100*delta_B)} %")
        print(f"Change in Pi: {round(100*delta_pi)} %")
        
    # Normalizes the vectors A[i, :], B[:, j], Pi[:]
    def rectify(self):

        den = np.sum(self.Pi)
        if den:
            self.Pi /= den

        for i in range(self.N):
            den = np.sum(self.A[i])
            if den:
                self.A[i] /= den

        for i in range(self.N):
            den = np.sum(self.B[:, i])
            if den:
                self.B[:, i] /= den
                
    """
    1. Input 'sentence' must be a tagged-sentence, i.e., a list of tuples, of the form (token, tag)
    2. Output: None; the sentence, its target tags, the predicted tags & the log-score are printed
    """
    def predict_for_sentence(self, sentence):

        # Pre-process using Stemming & ReGex
        tmp = self.preprocess([sentence])[0]
        
        # Extracts the tokens from the tagged-sentence
        sent = []
        for tok, tag in tmp:
            sent.append(tok)

        # Predicts the tags
        self.viterbi_algorithm(sent)

        #
        print("Sentence:", end= " ")
        for tok, tag in sentence:
            print(tok, end= " ")

        print("\nTarget:", end= " ")
        for tok, tag in sentence:
            print(tag, end= "->")
        print("END")

        print("\nPrediction:", end= " ")
        for tag in self.bestpath:
            print(tag, end= "->")
        print("END")
        
        val = self.best_score
        if val > 0:
            val = round( np.log(val), 3)
        else:
            val = float('inf')
        
        print("\nLog-Likelihood", val)
        
    """
    Pre-process the training-Corpus (i.e., tagged sentences) using Stemming & ReGex
    The argument 'corpus' is a list of lists
    """
    def preprocess(self, corpus):
        """
        Normalises the tokens of tagged sentences with ReGex rewrites & Snowball stemming.

        corpus -- list of sentences, each a list of (token, tag) tuples
        Returns a new list of the same shape; the tokens are rewritten, the tags are kept.
        """
        # modules
        import re
        from nltk.stem.snowball import SnowballStemmer

        stemmer = SnowballStemmer(language= "english")
        patterns = self.regex_strings()

        # (search-pattern, replacement) pairs, applied to each token in this order
        rewrites = [
            (patterns[0], "("),
            (patterns[1], ")"),
            (patterns[2], "-"),
            (patterns[3], "*"),
            (patterns[4], "3"),
            (patterns[5], "3"),
            ("-3", "3"),
        ]

        def normalise(token):
            for pattern, replacement in rewrites:
                token = re.sub(pattern, replacement, token)
            return stemmer.stem(token)

        print("Preprocessing with ReGex & Stemming:")
        cleaned = []
        for sentence in tqdm(corpus):
            cleaned.append([(normalise(token), tag) for token, tag in sentence])

        return cleaned

    """
    Pre-process the testing-Corpus (i.e., untagged sentences) using Stemming & ReGex
    The argument 'test_data' is a dictionary
    """
    def preprocess_2(self, test_data):
        """
        Normalises the tokens of untagged sentences with ReGex rewrites & Snowball stemming.

        test_data -- dictionary that maps a key to a sentence (a list of tokens)
        Returns a list of (key, tokens) tuples, in the dictionary's iteration order.
        """
        # modules
        import re
        from nltk.stem.snowball import SnowballStemmer

        stemmer = SnowballStemmer(language= "english")
        patterns = self.regex_strings()

        # (search-pattern, replacement) pairs, applied to each token in this order
        rewrites = [
            (patterns[0], "("),
            (patterns[1], ")"),
            (patterns[2], "-"),
            (patterns[3], "*"),
            (patterns[4], "3"),
            (patterns[5], "3"),
            ("-3", "3"),
        ]

        def normalise(token):
            for pattern, replacement in rewrites:
                token = re.sub(pattern, replacement, token)
            return stemmer.stem(token)

        print("Preprocessing with ReGex & Stemming:")
        cleaned = []
        for key, sentence in tqdm(test_data.items()):
            cleaned.append((key, [normalise(token) for token in sentence]))

        return cleaned

    # This function stores the search-patterns, which shall be used to pre-process the corpora
    def regex_strings(self):

        # Search patterns
        s1 = r"[\[({]"
        s2 = r"[\])}]"
        s3 = r"[-]+"
        s4 = r"[*]+[-]?[a-z]+[-]?[a-z]*"
        s5 = r"[$]?[.]?[']?[0-9]+[*''/,-.%$:0-9]*"
        s6 = r"3[\[][a-z][\]][\[]?[3]?[\]]?"
        #s7 = r"[!?;]+"
        
        return [s1, s2, s3, s4, s5, s6] #, s7
 
    # Initializes the tensors A, B, pi
    def initialize(self):

        # 'ls' is used for (Laplace-)Smoothing
        ls = 0.1
        
        A  = np.full((self.N, self.N), ls)
        B  = np.full((self.D, self.N), ls)
        Pi = np.full((self.N), ls)

        #
        print("Initializing A, B, Pi")
        for sentence in tqdm(self.corpus):

            # Updates Pi
            tag = sentence[0][1]
            Pi[self.Q.index(tag)] += 1

            # Updates A, B
            T_1 = len(sentence) -1
            for t in range(T_1):

                token, tag = sentence[t]
                next_tag = sentence[t+1][1]

                i = self.tag_to_idx[tag]
                j = self.tag_to_idx[next_tag]
                k = self.token_to_idx[token]

                A[i, j] += 1
                B[k, i] += 1

            # Edge Case for matrix B: This is the last tuple in the sentence
            token, tag = sentence[-1]
            i = self.tag_to_idx[tag]
            k = self.token_to_idx[token]
            B[k, i] += 1

        # Converts the frequencies to probabilities
        x = self.N * ls
        den = np.sum(Pi) + x
        Pi /= den

        for i in range(self.N):
            den = np.sum(A[i]) + x
            A[i] = A[i] / den

        for i in range(self.N):
            den = np.sum(B[:, i]) + x
            B[:, i] /= den

        return A, B, Pi

    # Calculates the Forward probabilities
    def forward_algorithm(self, sentence):

        # Creates the alpha-matrix
        T = len(sentence)
        self.alpha = np.zeros((T, self.N))

        # Initialization
        v = self.token_to_idx[sentence[0][0]]
        np.multiply(self.B[v], self.Pi, out= self.alpha[0])

        # Recursion: To build the forward-trellis 'alpha'
        for t in range(1, T):
            v = self.token_to_idx[sentence[t][0]]
            tmp = np.matmul(self.alpha[t-1], self.A)
            np.multiply( self.B[v], tmp, out= self.alpha[t] )

        # Termination
        self.forward_probabilty = np.sum(self.alpha[T-1])


    # Calculates the Backward probabilities
    def backward_algorithm(self, sentence):

        # Creates the beta-matrix
        T = len(sentence)
        self.beta = np.zeros((T, self.N))

        # Initialization
        self.beta[T-1] = 1

        # Recursion: To build the backward-trellis 'beta'
        for t in range(T-2, -1, -1):
            t1 = t+1
            v = self.token_to_idx[sentence[t1][0]]
            tmp = np.multiply(self.B[v], self.beta[t1])
            np.matmul(tmp, self.A.T, out= self.beta[t])

        # Termination : The following is an unncessary computation
        """
        v = self.token_to_idx(sentence[0][0])
        self.backward_probability =  np.mul( self.pi, np.multiply(self.B[v, :], self.beta[0, :]))
        """

    # Calculates the log-likelhood of the input 'sentence' & predicts the state-sequence
    def viterbi_algorithm(self, sentence):

        # Creates the Viterbi & backtrace matrices
        T = len(sentence)
        self.viterbi   = np.zeros((T, self.N))
        self.backtrace = np.zeros((T, self.N))
        
        # 1. Initialization
        
        # If an OOV token occurs (in the testing-corpus), then re-assign it the token 'UNK'
        try:
            v = self.token_to_idx[sentence[0]]
        except KeyError:
            v = self.token_to_idx['UNK']
        
        np.multiply( self.B[v], self.Pi, out= self.viterbi[0] )
        self.backtrace[0, :] = np.argmax(self.viterbi[0])

        # 2. Recursion: To build the Viterbi-trellis 'viterbi'
        for t in range(1, T):
            try:
                v = self.token_to_idx[sentence[t]]
            except KeyError:
                v = self.token_to_idx['UNK']
                
            t_1 = t-1
            for j in range(self.N):
                tmp = np.multiply(self.viterbi[t_1, :], self.A[:,j])
                self.viterbi[t, j]   = np.max(tmp) * self.B[v, j]
                self.backtrace[t, j] = np.argmax(tmp)

        # 3. Termination
        self.best_score = np.max(self.viterbi[T-1])

        # Backtracing
        indices = [ np.argmax(self.viterbi[T-1]) ]
        for t in range(T-2, -1, -1):
            indices.append( np.argmax(self.viterbi[t]) )
        indices.reverse()
        
        path = []  
        for idx in indices:
            path.append(self.Q[idx])
        
        self.bestpath = path
    
    """
    Performs Expectation-Maximization to find [locally-] optimal values for
    the Transition & Emission matrices, namely A & B
    """
    def Baum_Welch_algorithm(self):

        # 1. Initialize A, B, pi : The following step has already been perform in "self.fit()"
        #self.A, self.B, self.Pi = self.initialize()

        # 2. Iterate until convergence
        epochs = 2
        itr = 0
        n = len(self.corpus)

        while itr < epochs:

            # Stores the forward-probabilities of a batch
            batch = []

            # These matrices shall accumulate the A, B, Pi matrices updated by each sentence
            a  = np.zeros((self.N, self.N))
            b  = np.zeros((self.D, self.N))
            pi = np.zeros((self.N))

            # Performs full-batch training
            for sentence in tqdm(self.corpus):

                T = len(sentence)
                
                # These matrices are temporarily required
                xi     = np.zeros((self.N, self.N))
                xi_num = np.zeros((T-1, self.N, self.N))
                gamma_num  = np.zeros((T, self.N))

                # E-step
                self.forward_algorithm(sentence)
                self.backward_algorithm(sentence)

                val = self.forward_probabilty
                if val > 0:
                    batch.append( np.log(val) )
                else:
                    # INT_MIN = 10**-323
                    batch.append( -323 )

                # M-step
                
                # Calculates Gamma's numerator
                np.multiply(self.alpha, self.beta, out= gamma_num)

                # Calculates 'b'
                for i, v in enumerate(self.V):
                    indices = []
                    for t, tup in enumerate(sentence):
                        if tup[0] == v:
                            indices.append(t)
                    
                    num = np.sum(gamma_num[indices, :], axis=0)
                    den = np.sum(gamma_num[:, :], axis=0)
                    
                    den[ den == 0 ] = 1
                    b[i, :] += num / den

                # Calculates Pi
                if val > 0:
                    pi += gamma_num[0, :] / val

                # Calculates Xi's numerator
                for t in range(T-1):
                    t1 = t + 1
                    v = self.V.index(sentence[t1][0])
                    tmp = np.multiply(self.A, self.B[v, :])
                    np.multiply(tmp, self.beta[t1, :], out= xi_num[t])
                
                np.multiply(xi_num, self.alpha[:-1, :, None], out= xi_num)
                
                # Calculates 'a'
                xi = np.sum(xi_num, axis= 0)           
                den = np.sum( xi, axis= 1)
                den[ den == 0 ] = 1
                a += xi/den[:, None]

            # Division by n gives equal weight to each sentence
            self.A = a/n
            self.B = b/n
            self.Pi = pi/n

            # Ensures that each row of A, columns of B, & Pi add upto 1
            self.rectify()

            # Store the collective log-likelihood of the batch
            self.costs.append( sum(batch)/n )

            #
            print(f"\nEpoch {itr+1}:", round(self.costs[itr], 3))
            itr += 1

        print("HMM: Training is Complete.")

**Main**

In [3]:
# Loads the training data.
# Each cell of the column 'tagged_sentence' is parsed with ast.literal_eval;
# the HMM class consumes the result as a list of (token, tag) tuples.
df = pd.read_csv('./data/train.csv')
print("Reading the training-Corpus:")
# The column is iterated directly: df.iterrows() would build a Series for every
# row, only for a single field to be read from it.
data = [ast.literal_eval(cell) for cell in tqdm(df['tagged_sentence'])]

Reading the training-Corpus:


47340it [00:14, 3198.18it/s]


In [4]:
# Builds the model on the first len(data)//1000 sentences only
# (presumably to keep the demo fast -- raise the fraction for a real run)
n = len(data) // 1000
obj = HMM(data, n)



Preprocessing with ReGex & Stemming:


100%|█████████████████████████████████████████████████████████████████████████████████| 47/47 [00:00<00:00, 986.46it/s]


Initializing Q, V:


100%|██████████████████████████████████████████████████████████████████████████████████████████| 47/47 [00:00<?, ?it/s]


Initializing A, B, Pi


100%|████████████████████████████████████████████████████████████████████████████████| 47/47 [00:00<00:00, 2989.89it/s]


**Before training**

In [5]:
# Decodes one training sentence with the count-initialized model, i.e. before fit() is run
obj.predict_for_sentence(data[6])

Preprocessing with ReGex & Stemming:


100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]

Sentence: Merger proposed 
Target: NN->VB->END

Prediction: NN->VB->END

Log-Likelihood -14.271





In [6]:
# Runs Baum-Welch; prints the cost of each epoch and the relative change of A, B, Pi
obj.fit()

100%|██████████████████████████████████████████████████████████████████████████████████| 47/47 [00:00<00:00, 47.37it/s]



Epoch 1: -114.868


100%|██████████████████████████████████████████████████████████████████████████████████| 47/47 [00:00<00:00, 57.31it/s]


Epoch 2: -110.103
HMM: Training is Complete.

After training,
Change in A: 35 %
Change in B: 45 %
Change in Pi: 40 %





**After training**

In [7]:
# Decodes the same sentence again, now with the re-estimated parameters, for comparison
obj.predict_for_sentence(data[6])

Preprocessing with ReGex & Stemming:


100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]

Sentence: Merger proposed 
Target: NN->VB->END

Prediction: AT->JJ->END

Log-Likelihood -10.311





In [8]:
# Loads the testing-data into a dictionary: id -> untagged sentence.
# Each cell of the column 'untagged_sentence' is parsed with ast.literal_eval;
# HMM.predict() consumes the result as a list of tokens.
df2 = pd.read_csv('./data/test.csv')
print("Reading the testing-Corpus:")
# The two columns are iterated directly: df2.iterrows() would build a Series
# for every row. 'total' lets the progress bar show how much is left.
test_data = {
    key: ast.literal_eval(cell)
    for key, cell in tqdm(zip(df2['id'], df2['untagged_sentence']), total= len(df2))
}

Reading the testing-Corpus:


4000it [00:00, 5081.07it/s]


**Generates Predictions**

In [9]:
# Tags every test sentence; 'ans' is a list of [id, [(token, tag), ...]] entries
ans = obj.predict(test_data)
# One row per sentence; written without the DataFrame's index column
pred = pd.DataFrame(data= ans, columns =['id', 'tagged_sentence'])
pred.to_csv('./data/pred.csv', index= False)

Preprocessing with ReGex & Stemming:


100%|████████████████████████████████████████████████████████████████████████████| 4000/4000 [00:02<00:00, 1514.13it/s]


Generating predictions:


100%|██████████████████████████████████████████████████████████████████████████████| 4000/4000 [00:43<00:00, 91.11it/s]
