In [1]:
import numpy as np
import pandas as pd
from reformat import *
from architecture.WordSegPreProcessing import *

fn = "train.tsv"
x, y = file_to_table(read_file(fn))


## Prepping Data

In [2]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.1)
preprocessor = WordSegPreProcessing(X_train, y_train, segment_to_tag)

In [3]:
X_train, y_train = preprocessor.x, preprocessor.y
X_test, y_test = preprocessor.extract_pairs(X_test, y_test)


## Supervised NGramTagger with Backoff

In [4]:
from architecture.NgramSupervisedTagger import NGramSupervisedTagger
ng = NGramSupervisedTagger(X_train, y_train, ngram_choice=2)
ng.create_n_gram_tagger()
ng.f1_by_tags(X_test, y_test)["I"] #F1 score for just I tags

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  dtype=np.int):
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  method='lar', copy_X=True, eps=np.finfo(np.float).eps,
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  method='lar', copy_X=True, eps=np.finfo(np.float).eps,
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  eps=np.finfo(np.float).eps, copy_Gram=True, verbose=0,
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  eps=np.finfo(np.float).eps, copy_X=True, fit_path=True,
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  eps=np.finfo(np.floa

0.8905882352941177

## Supervised HMM

In [5]:
from architecture.HMMSupervisedTagger import HMMSupervisedTagger
hmm = HMMSupervisedTagger(X_train, y_train)
hmm.train()
hmm.f1_by_tags(X_test, y_test)["I"]


0.8894117647058822

The same thing as HMM Tagger but instead of just using the provided characters, I manually engineered some features (probably not the best bleh)... You can check it out in `architecture.WordSegPreProcessing.apply_features`

In [6]:
from architecture.HMMSupervisedTagger import HMMSupervisedTagger
hmm2 = HMMSupervisedTagger(preprocessor.generate_features(X_train), y_train)
hmm2.train()
hmm2.f1_by_tags(preprocessor.generate_features(X_test), y_test)["I"]


0.9448621553884713

In [7]:
hmm2.tagger.best_path_simple(preprocessor.apply_features(X_test[3]))

['B', 'B', 'B', 'I', 'B', 'B', 'B', 'B', 'I', 'B']

## Unsupervised

In [8]:
from architecture.utils import *
x, y = file_to_table(read_file(fn))
feedX, feedY = preprocessor.extract_pairs(x[:650], y[:650])


In [9]:
testX2, testY2 = preprocessor.extract_pairs(x[650:], y[650:])


In [10]:
feedX = list(map(lambda x: preprocessor.let2index(x), feedX ))
feedY = list(map(lambda y: preprocessor.tag2index(y), feedY))


In [11]:
#initialize with some probs

import nltk
import numpy as np
init_tmat = np.zeros((len(preprocessor.index_tag),
                     len(preprocessor.index_tag)))

init_emission = np.zeros(
    (len(preprocessor.index_tag), len(preprocessor.index_vocab)))
trainer = nltk.HiddenMarkovModelTrainer()
tagger = trainer.train_supervised(
           tuple_xy4nltk(feedX, feedY))
for k in tagger._transitions.keys():
    for v in tagger._transitions[k].samples():
        init_tmat[k][v] = tagger._transitions[k].prob(v)
    for let in tagger._outputs[k].samples():
        init_emission[k][let] = tagger._outputs[k].prob(let)
init_state_distrib = np.array([0, 1.0])  # because always start with B




In [12]:
init_emission

array([[0.04002929, 0.        , 0.02294362, 0.01342446, 0.03075421,
        0.00781059, 0.01269221, 0.03270686, 0.08445204, 0.01635343,
        0.01586527, 0.12570173, 0.04027337, 0.04491091, 0.03783256,
        0.05296558, 0.02172321, 0.05735904, 0.        , 0.03587991,
        0.03685624, 0.07883817, 0.09421528, 0.01440078, 0.        ,
        0.01830608, 0.06370515],
       [0.        , 0.        , 0.        , 0.00265252, 0.        ,
        0.        , 0.        , 0.75596817, 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.08488064, 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.15649867,
        0.        , 0.        ]])

In [43]:
from collections import defaultdict

class UnSupervised:
    def __init__(self, tmat, emission, initial_distrib):

        self.tmat = tmat
        self.emission = emission
        self.initial_prob = initial_distrib

        self.num_states = len(self.tmat)
        self.num_letters = len(self.emission[0])

    def calculate_beta_backward(self, o_seq, tmat, emission, end_prob=[0.5, 0.5]):
        backward_table = np.zeros((self.num_states, len(o_seq)))  # vit table

        backward_table[:, -1] = end_prob
        #print(backward_table)

        for i in range(len(o_seq)-2, -1, -1):  # start filling in the table
            beta_s_t = 0
            for tag_cell in range(self.num_states):
                for tag_prev in range(self.num_states):
                    beta_s_t += backward_table[tag_prev][i+1] * \
                        tmat[tag_cell][tag_prev] * \
                        emission[tag_prev][o_seq[i+1]]
                backward_table[tag_cell][i] = beta_s_t
                beta_s_t = 0
        return backward_table

    def calculate_alpha_forward(self, o_seq, tmat, emission):

        forward_table = np.zeros((self.num_states, len(o_seq)))  # vit table
        emission_t0 = [self.emission[i][o_seq[0]]
                       for i in range(self.num_states)]
        forward_table[:, 0] = np.multiply(emission_t0, self.initial_prob)

        for i in range(1, len(o_seq)):  # start filling in the table
            alpha_s_t = 0
            for tag_cell in range(self.num_states):
                for tag_prev in range(self.num_states):
                    alpha_s_t += forward_table[tag_prev][i-1] * \
                        tmat[tag_prev][tag_cell] * \
                        emission[tag_cell][o_seq[i]]
                forward_table[tag_cell][i] = alpha_s_t
                alpha_s_t = 0
        return forward_table

    def baum_welch(self, o_seq, n_iter):
        tmat = self.tmat
        emission = self.emission
        M = len(tmat[0])
        T = len(o_seq)

        for _ in range(n_iter):
            alpha = self.calculate_alpha_forward(
                o_seq, tmat, emission)  # matrix
            beta = self.calculate_beta_backward(o_seq, tmat, emission)
            prod_alpha_beta = alpha*beta
            prod_alpha_beta_normed = prod_alpha_beta/sum(prod_alpha_beta) #normalized alpha dot beta

            #emission aux is well emission auxillary Sum(P(state_i |observed_t) of all time t ) 
            #-----seq-------
            #|
            #state
            #|
            emission_aux = defaultdict(lambda : defaultdict(float))
            #transition aux. 
            # LESSON LEARNED!! KNOW MATRIX ALGEBRA LIKE A PRO to avoid calculating entry by entry like this!!
            #P(state_i|state_j)
            transition_aux = defaultdict(lambda: defaultdict(float))

            for i,obs in enumerate(o_seq):
                for state in range(self.num_states):
                    emission_aux[state][obs]+=prod_alpha_beta_normed[state][i]
                    if i==0:
                        continue
                    else:
                        for state2 in range(self.num_states):
                            prev_step = alpha[state2][i-1]*tmat[state2][state]
                            beta_now = beta[state][i]*emission[state][obs]
                            #print(obs, state, state2,
                            #      alpha[state][i-1], tmat[state][state2],
                            #      beta[state][i],emission[state][obs])
                            #print(obs, state, state2, prev_step*beta_now/(sum(prod_alpha_beta)[i]))
                            transition_aux[state2][state] += prev_step * \
                                beta_now/(sum(prod_alpha_beta)[i])
            new_tmat = np.zeros((self.num_states,self.num_states))
            for si in range(self.num_states):
                norm_factor = sum(prod_alpha_beta_normed[si])
                for sj in range(self.num_states):
                    new_tmat[sj][si] = transition_aux[si][sj]/norm_factor
            print(transition_aux)
            print(new_tmat)

        return {"a": tmat, "b": emission}


In [44]:
#ice cream debug
tmat_weather = [[0.8, 0.2],
[0.2,0.8]]
emission = [[0.7,0.2,0.1],
[0.1,0.2,0.7]]
unsupervised = UnSupervised(tmat_weather,emission, [0.5,0.5])

In [45]:
seq = [1,2,2,1,2,1,2,1,1,2,0,2,2,0,0,0,1,0,0,0,2,0,1,0,0,0,1,2,2,1,2,1,1]
unsupervised.calculate_alpha_forward(seq, tmat_weather, emission)
unsupervised.calculate_beta_backward(seq, tmat_weather, emission)
unsupervised.baum_welch(seq,1)

defaultdict(<function UnSupervised.baum_welch.<locals>.<lambda> at 0x7f7ed4c74c20>, {0: defaultdict(<class 'float'>, {0: 12.132497937565816, 1: 2.676301437433337}), 1: defaultdict(<class 'float'>, {0: 2.795184580166156, 1: 14.396016044834688})})
[[0.80061975 0.15662704]
 [0.1766083  0.80667497]]


{'a': [[0.8, 0.2], [0.2, 0.8]], 'b': [[0.7, 0.2, 0.1], [0.1, 0.2, 0.7]]}