In [1]:
from collections import defaultdict

In [2]:
fn = r"../train.tsv"
handle  = open(fn, "r")
tokens = list()

for line in handle:
  entry = line.strip().split("\t")
  tokens.append( ( entry[0], entry[1]) )

In [3]:
def flat2sent(sents, end = "<S>"):
  """This function turns flat array (1d) of sentences into list of sentences (2d).

    End indicates the last token of the sentence.
  Args:
      sents (): 1d array of sentences
      end (str, optional): start of sentence symbol. Defaults to "<S>".

  Returns:
      _type_: 2d array (list of sentences)
  """
  list_of_sent = list()
  sent = []
  for char in sents:
    sent.append(char)
    if char[0] == end:
      list_of_sent.append(sent)
      sent = []
  return list_of_sent

In [4]:
#train-split for sentences
train_index = int([ind for (ind, token) in enumerate(tokens) if token[0] == "<S>"][-1]*0.8)
sentTrain = flat2sent ( tokens[:train_index] )
sentTest = flat2sent( tokens[train_index:] )

In [5]:
# map of n preceding words to mutations
LAGGED_PRECEDE_MUTATE = defaultdict(lambda: defaultdict(int))
def lag_to_create_n_grams(sent: list, ngram=1):
    sent = [("<IGNORE>", "N") for _ in range(ngram)] + sent #padding in front
    for ind, piece in enumerate(sent):
        if ind < ngram:
            continue
        for n in range(1,ngram+1):

            prev_phrase = ' '.join( [w[0] for w in sent[ind-n: ind]] )
            if "<IGNORE>" in prev_phrase:
                continue
            tag = piece[1]
            LAGGED_PRECEDE_MUTATE[prev_phrase][tag] += 1
            LAGGED_PRECEDE_MUTATE[prev_phrase]["occurence"] += 1
            #if (prev_phrase == ""):
            #    print(ind, n, prev_phrase, tag, piece)


In [6]:
# example: The weather is nice with tag D N V A with ngram = 2
# {The: {N: 1}, weather: {V: 1}, The weather: {V : 1}, is: {A: 1}, weather is: {A: 1}
#lag_to_create_n_grams([("The", "D"), ("weather", "N"), ("is", "V"), ("nice", "A")], ngram=2)
#creaeting bigram
for sent in sentTrain:
    lag_to_create_n_grams(sent,2)
#LAGGED_PRECEDE_MUTATE


In [7]:
# building the transition matrix
def prob_from_count_dict(count_dict,k1,k2):
    try:
        return count_dict[k1][k2]/count_dict[k1]["occurence"]
    except ZeroDivisionError:
        return 0

In [8]:
TAG_TO_WORD_COUNT =defaultdict(lambda: defaultdict(int))
INITIAL_DISTRIBUTION_COUNT = defaultdict(int)
#if we were to generate a mutation, how many of it is word w_i
def build_emission(sent: list):
    for ind, piece in enumerate(sent):

        word = piece[0]
        tag = piece[1]

        TAG_TO_WORD_COUNT[tag][word] += 1
        TAG_TO_WORD_COUNT[tag]["occurence"] += 1
        if ind == 0:
            INITIAL_DISTRIBUTION_COUNT[tag] +=1
            INITIAL_DISTRIBUTION_COUNT["all"] += 1


        #if (prev_phrase == ""):
        #    print(ind, n, prev_phrase, tag, piece)
for sent in sentTrain:
    build_emission(sent)


In [116]:
def argmax(Vit_matrix, ind, phrase):
    ans = -1
    bestTag = None
    for t in ["N", "S", "U", "T", "H"]:
        #prob_from_count_dict(count_dict,k1,k2)
        temp = Vit_matrix[t][ind-1]*prob_from_count_dict(LAGGED_PRECEDE_MUTATE,phrase,t)
        #print(temp)
        #print(prob_from_count_dict(LAGGED_PRECEDE_MUTATE, phrase, t))
        #print(Vit_matrix[t][ind-1])
        if temp > ans:
            ans = temp
            bestTag = t
        #print(t, end=' ')
    return ans, bestTag

def viterbi(sent):
    best_tags = []
    Vit_matrix = defaultdict(lambda: defaultdict(float)) #V[state][word]
    for t in ["N","S","U","T","H"]:
        # initial probability distribution * emission

        start_state_prob = INITIAL_DISTRIBUTION_COUNT[t]/INITIAL_DISTRIBUTION_COUNT["all"]

        #prob_from_count_dict(count_dict,k1,k2)
        emission = prob_from_count_dict(TAG_TO_WORD_COUNT, t, sent[0][0])

        Vit_matrix[t][0] = start_state_prob * emission
    for i in range(1, len(sent)):
        for t in ["N", "S", "U", "T", "H"]:
            phrase = sent[i-1][0] #TODO: dynamically chose the prhase could be 2 word precede, 1 word precede
            val,tag = argmax(Vit_matrix, i, phrase)
            emission = prob_from_count_dict(TAG_TO_WORD_COUNT, t,sent[i][0])
            Vit_matrix[t][i] = val*emission # transimition_matrix *emission probability
            best_tags.append((i, tag))
    return Vit_matrix, best_tags


In [117]:
v, bt = viterbi(sentTrain[241])
sentTrain[241]
#TAG_TO_WORD_COUNT["N"]["cosaint"]/TAG_TO_WORD_COUNT["N"]["occurence"]
#print( prob_from_count_dict(TAG_TO_WORD_COUNT, "T", "cosaint") )
#LAGGED_PRECEDE_MUTATE

print(bt)

[(1, 'N'), (1, 'N'), (1, 'N'), (1, 'N'), (1, 'N'), (2, 'N'), (2, 'N'), (2, 'N'), (2, 'N'), (2, 'N'), (3, 'N'), (3, 'N'), (3, 'N'), (3, 'N'), (3, 'N'), (4, 'N'), (4, 'N'), (4, 'N'), (4, 'N'), (4, 'N')]


In [98]:
bp

defaultdict(<function __main__.viterbi.<locals>.<lambda>()>,
            {'N': defaultdict(str, {1: 'N', 2: 'N', 3: 'N', 4: 'N'}),
             'S': defaultdict(str, {1: 'N', 2: 'N', 3: 'N', 4: 'N'}),
             'U': defaultdict(str, {1: 'N', 2: 'N', 3: 'N', 4: 'N'}),
             'T': defaultdict(str, {1: 'N', 2: 'N', 3: 'N', 4: 'N'}),
             'H': defaultdict(str, {1: 'N', 2: 'N', 3: 'N', 4: 'N'})})

In [None]:
# argument sent is a list of [token,label] pairs; return number of correctly predicted labels
def predict_from_scratch(sent, model=None):
  correct = 0
  for token in sent:
    guess = random.choice(['S','U','T','H','N'])
    if guess == token[1]:
      correct += 1
  return correct

In [None]:
# argument sent is a list of [token,label] pairs; return number of correctly predicted labels
def predict_anything_goes(sent):
  correct = 0
  for token in sent:
    guess = 'N'
    if guess == token[1]:
      correct += 1
  return correct

In [None]:
#sentence is a list of tuples(x,y)
def evaluate():
  total = 0
  correct_from_scratch = 0
  correct_anything_goes = 0
  testfile = open('test.tsv', 'r')
  sentence = []
  for line in testfile:
    total += 1
    pieces = line.rstrip("\n").split("\t")
    if pieces[0]=='<S>':
      correct_from_scratch += predict_from_scratch(sentence)
      correct_anything_goes += predict_anything_goes(sentence)
      sentence = []
    else:
      sentence.append(pieces)
  correct_from_scratch += predict_from_scratch(sentence)
  correct_anything_goes += predict_anything_goes(sentence)
  return (correct_from_scratch/total, correct_anything_goes/total)

In [None]:
evaluate()