In [None]:
import math
import pandas as pd
import random
import nltk
import numpy as np
import csv
import os

In [None]:
# (2) Mount Google Drive so the notebook can read the data files stored there.
# This only works inside Google Colab; the mount point is /content/drive.
from google.colab import drive 
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Make the project folder on the mounted drive the current directory, so that
# later cells can open 'train.tsv' / 'test.tsv' by their bare file names.
work_dir = '/content/drive/My Drive/Colab Notebooks/NLP/Celtic Mutation'
os.chdir(work_dir)


In [None]:
# Read the training file: one token per row, tab-separated as (text, label), no header.
# - quoting=csv.QUOTE_NONE keeps quote characters as literal text.
# - dtype=str keeps every token a string, even tokens that look like numbers.
# - keep_default_na=False / na_filter=False stop pandas from turning tokens such
#   as "NA", "null" or "None" into NaN, which would silently corrupt the vocabulary.
df_data = pd.read_csv(
    'train.tsv',
    sep='\t',
    header=None,
    quoting=csv.QUOTE_NONE,
    dtype=str,
    keep_default_na=False,
    na_filter=False,
)
df_data.columns = ['text', 'label']
df_data.shape

(10000000, 2)

In [None]:
#Given dataset is in the form of tokens.
#This function will create a new dataset of (word,tag) tuples for each sentence
def create_sentence_set(data):
    """Group a flat token table into per-sentence lists.

    Parameters
    ----------
    data : pandas.DataFrame
        Column 0 holds the tokens and column 1 their tags. A row whose token
        is '<S>' starts a new sentence and is kept as that sentence's first
        entry.

    Returns
    -------
    (dataset_sent, tag_set) : tuple of two lists
        dataset_sent[k] is the list of (word, tag) tuples of sentence k and
        tag_set[k] is the list of tags of the same sentence. Both lists are
        empty when `data` has no rows.
    """
    # Pull both columns out once; per-cell .iloc lookups are far too slow on
    # a table with millions of rows.
    words = data.iloc[:, 0].tolist()
    tags = data.iloc[:, 1].tolist()
    if not words:
        return [], []

    # Sentence boundaries are taken from the table that was passed in.
    sent_start = [idx for idx, word in enumerate(words) if word == '<S>']
    # The first sentence usually has no start marker, so add index 0 manually,
    # but not twice, which would create an empty leading sentence.
    if not sent_start or sent_start[0] != 0:
        sent_start.insert(0, 0)
    sent_start.append(len(words))

    dataset_sent = []
    tag_set = []
    for sos, eos in zip(sent_start, sent_start[1:]):
        sentence_tags = tags[sos:eos]
        dataset_sent.append(list(zip(words[sos:eos], sentence_tags)))
        tag_set.append(sentence_tags)
    return dataset_sent, tag_set
        
# Build the per-sentence (word, tag) lists and the parallel per-sentence tag
# lists from the full token table, then display the number of sentences found.
dataset,tag_set = create_sentence_set(df_data)
len(dataset)

395923

In [None]:
# Hold out the last 20% of the sentences (file order, no shuffling) for
# validation; the first 80% is the training portion.
split_index = int(len(dataset) * 0.8)
X_train, X_val = dataset[:split_index], dataset[split_index:]
# Tag sequences of the training sentences only (input to the n-gram counts).
train_tags = tag_set[:split_index]
print(len(X_train), len(X_val))

316738 79185


In [None]:
# Tag n-gram count tables, filled in place by create_tagger():
#   uni_freq[t], bi_freq[(t1, t)], tri_freq[(t2, t1, t)]
uni_freq = {}
bi_freq = {}
tri_freq = {}


def create_tagger(tagset):
    """Accumulate unigram, bigram and trigram tag counts.

    `tagset` is an iterable of tag sequences (one per sentence). N-grams are
    counted inside each sequence only, never across two sequences. The counts
    are added to the module-level uni_freq / bi_freq / tri_freq dictionaries.
    """
    for tags in tagset:
        for pos in range(len(tags)):
            current = tags[pos]
            if pos >= 2:
                tri_key = (tags[pos - 2], tags[pos - 1], current)
                tri_freq[tri_key] = tri_freq.get(tri_key, 0) + 1
            if pos >= 1:
                bi_key = (tags[pos - 1], current)
                bi_freq[bi_key] = bi_freq.get(bi_key, 0) + 1
            uni_freq[current] = uni_freq.get(current, 0) + 1
                
#Give every tag triple that never occurred in training a small pseudo-count
def trigram_smoothing():
    """Insert a count of 0.1 into tri_freq for every unseen tag trigram.

    The tag inventory is the key set of uni_freq; counts of trigrams that are
    already present in tri_freq are left untouched.
    """
    known_tags = uni_freq.keys()
    for first in known_tags:
        for second in known_tags:
            for third in known_tags:
                # setdefault only writes when the trigram is missing
                tri_freq.setdefault((first, second, third), 0.1)


# Count tag n-grams on the training tag sequences, then give every trigram
# that was never observed a pseudo-count of 0.1.
create_tagger(train_tags)
trigram_smoothing()

In [None]:
#calculate probabilities for unigram, bigram and trigram tag sequences.

# Total count of each (t2, t1) context inside tri_freq. After smoothing,
# tri_freq holds every tag triple, including triples whose context bigram was
# never observed, so bi_freq alone cannot be trusted as the denominator.
context_mass = {}
for (t2, t1, t), count in tri_freq.items():
    context_mass[(t2, t1)] = context_mass.get((t2, t1), 0) + count

# P(t | t2, t1): divide by the observed bigram count when there is one;
# otherwise fall back to the trigram mass of that context instead of raising
# KeyError (for a context seen only through smoothing this is uniform).
trigram_prob = {
    (t2, t1, t): count / bi_freq.get((t2, t1), context_mass[(t2, t1)])
    for (t2, t1, t), count in tri_freq.items()
}

# P(t | t1)
bigram_prob = {(t1, t): count / uni_freq[t1] for (t1, t), count in bi_freq.items()}

# P(t); the total is computed once rather than once per tag
unigram_total = sum(uni_freq.values())
unigram_prob = {t: count / unigram_total for t, count in uni_freq.items()}
  

In [None]:
#Inverse of Naive Bayes
#Calculating probability of word given the tag: P(w | t) = count(t, w) / count(t)
flipped = [(t,w) for sent in X_train for (w,t) in sent]
tag_word_count = {}
# Pre-seeded with the expected tags so they are present even with a zero count;
# any other tag found in the data is added on the fly instead of raising KeyError.
tag_c = {'N':0,'H':0,'S':0,'T':0,'U':0}
for (t,w) in flipped:
    tag_word_count[(t,w)] = tag_word_count.get((t,w), 0) + 1
    tag_c[t] = tag_c.get(t, 0) + 1

# Numerator and denominator come from the same pass over X_train, so the
# emission table does not depend on the state of the n-gram count tables.
wordgiventag = {(w,t): count/tag_c[t] for (t,w), count in tag_word_count.items()}

In [None]:
wordgiventag[('a','N')]

0.046853667066805396

In [None]:
#calculating the pi values
#the initial tag probabilities for each tag
init_freq = {}

for sent_tag in tag_set:
    if not sent_tag:
        continue  # an empty sentence has no first tag
    start_tag = sent_tag[0]
    if start_tag == '<S>':
        # skip the sentence marker; a marker with nothing after it is ignored
        if len(sent_tag) < 2:
            continue
        start_tag = sent_tag[1]
    init_freq[start_tag] = init_freq.get(start_tag, 0) + 1

# One entry per distinct start tag. The key must be the loop variable `tag`;
# keying on an unrelated global collapses every start tag into a single entry.
init_total = sum(init_freq.values())
init_prob = {tag: count/init_total for tag, count in init_freq.items()}
init_prob #in our dataset the initial tag is always N

{'N': 1.0}

In [None]:
#Taken from Dr.Scannell's code
#Viterbi algorithm tweaked to fit to trigram HMM
def argmax(V,tag_list,t,t1,i,emission_prob):
    """Best predecessor and score for the state (t1, t) at position i.

    V maps (t2, t1, i) to the score of the best path whose tags at positions
    i-1 and i are t2 and t1; the start cell is ('', tag, 0). Missing cells and
    missing transition probabilities count as 0.

    Returns (best_t2, score). For i == 1 there is no tag two positions back,
    so the transition is scored with the bigram model and best_t2 is ''.
    With an empty tag_list (and i > 1) the result is (None, -1).
    """
    if i == 1:
        score = V.get(('', t1, 0), 0) * bigram_prob.get((t1, t), 0) * emission_prob
        return ('', score)

    ans = -1
    best = None
    for t2 in tag_list:
        temp = V.get((t2, t1, i-1), 0) * trigram_prob.get((t2, t1, t), 0) * emission_prob
        if temp > ans:
            ans = temp
            best = t2
    return (best, ans)

def printV(sentence,tag_list,V,B):
    """Format the non-zero Viterbi cells from position 1 onwards for debugging.

    B may be keyed by (t1, t, i) (as built by viterbi) or by (t, i). Returns
    one string with a line per cell; nothing is printed.
    """
    lines = []
    for i in range(1, len(sentence)):
        lines.append('i=' + str(i) + ' [' + str(sentence[i]) + ']')
        for t1 in tag_list:
            for t in tag_list:
                score = V.get((t1, t, i), 0)
                if score != 0:
                    origin = B.get((t1, t, i), B.get((t, i)))
                    lines.append('  ' + str(t1) + ',' + str(t) + '=' + str(score)
                                 + ' (from ' + str(origin) + ')')
    return '\n'.join(lines)

def viterbi(sentence, unseen_emission=1e-12):
    """Most likely tag sequence for `sentence` under the trigram HMM.

    Parameters
    ----------
    sentence : sequence
        Either plain words or (word, tag) / [word, tag] pairs; for pairs only
        the word is used.
    unseen_emission : float
        Emission probability given to a (word, tag) combination that is not in
        wordgiventag, so that unknown words do not zero out every path.

    Returns
    -------
    list
        One tag per token, in sentence order; [] for an empty sentence.

    Raises
    ------
    ValueError
        If uni_freq is empty, i.e. the tagger has not been trained.
    """
    if not sentence:
        return []
    words = [tok[0] if isinstance(tok, (tuple, list)) else tok for tok in sentence]

    tag_list = list(uni_freq.keys())
    if not tag_list:
        raise ValueError("viterbi() needs a trained tagger: uni_freq is empty")

    first_tag = 'N'  # first tag is always N
    V = {('', first_tag, 0): wordgiventag.get((words[0], first_tag), unseen_emission)}
    Back_ptr = {}    # (t1, t, i) -> tag at position i-2 on the best path into that state
    column = [('', first_tag)]

    for i in range(1, len(words)):
        # the tag at position 0 is fixed, so position 1 has a single possible predecessor
        prev_tags = [first_tag] if i == 1 else tag_list
        column = [(t1, t) for t1 in prev_tags for t in tag_list]
        for (t1, t) in column:
            emission_prob = wordgiventag.get((words[i], t), unseen_emission)
            best_t2, score = argmax(V, tag_list, t, t1, i, emission_prob)
            Back_ptr[(t1, t, i)] = best_t2
            V[(t1, t, i)] = score
        # Rescale the column so products of many small probabilities do not
        # underflow to 0 on long sentences; a common factor leaves every argmax unchanged.
        peak = max(V[(t1, t, i)] for (t1, t) in column)
        if peak > 0:
            for (t1, t) in column:
                V[(t1, t, i)] /= peak

    last = len(words) - 1
    t1, t = max(column, key=lambda state: V[(state[0], state[1], last)])
    if last == 0:
        return [t]

    # Follow the back-pointers from the best final state; tags are collected
    # last-to-first and reversed at the end.
    tags = [t, t1]
    for i in range(last, 1, -1):
        t2 = Back_ptr[(t1, t, i)]
        tags.append(t2)
        t1, t = t2, t1
    tags.reverse()
    tags[0] = first_tag  # position 0 is constrained, even if every path scored 0
    return tags



In [None]:
# Smoke test: decode the first entry of X_train.
# NOTE(review): presumably X_train is still the sentence list from the 80:20
# split here; the Keras cell further down rebinds X_train, so confirm run order.
tags = viterbi(X_train[0])


['N', dict_values(['N', 'N', 'N', 'N', 'N', 'S', 'S', 'S', 'S', 'S', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'N'])]


In [None]:
tags

In [None]:
#beginning of anything goes implementation
#The model implemented below is taken from the example found online - https://www.depends-on-the-definition.com/guide-sequence-tagging-neural-networks-python/

# Unique words and unique tags of the whole dataset. They are sorted so that
# the index assigned to each word/tag is the same on every kernel restart;
# iterating a bare set gives an order that changes between Python processes.
# key=str keeps the sort well-defined even if a token is not a string.
word_set = sorted({w for sent in dataset for (w,t) in sent}, key=str)
tag_set = sorted({t for sent in dataset for (w,t) in sent}, key=str)

#use indices from the word & tag list to create an index dictionary
word_idx = {w: i for i, w in enumerate(word_set)}
tag_idx = {t: i for i, t in enumerate(tag_set)}

100


In [None]:
tag_idx

{'H': 4, 'N': 2, 'S': 1, 'T': 3, 'U': 0}

In [None]:
# Encode every sentence as fixed-length index vectors for the neural tagger.
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from keras.preprocessing.sequence import pad_sequences
maxLength =50
# Every sentence is padded (at the end) to maxLength = 50 positions.
# NOTE(review): sentences longer than maxLength are cut by pad_sequences;
# presumably its default drops tokens from the front — confirm that is intended.
# X: word indices per sentence.
# NOTE(review): the padding value len(word_set) - 1 is also the index that
# word_idx assigns to the last entry of word_set, so padding and that real
# word share one id.
# y: tag indices per sentence; padded positions are assigned the 'N' tag.
X = [[word_idx[w[0]] for w in s] for s in dataset]
X = pad_sequences(maxlen=maxLength, sequences=X, padding="post", value=len(word_set) - 1)
y = [[tag_idx[w[1]] for w in s] for s in dataset]
y = pad_sequences(maxlen=maxLength, sequences=y, padding="post", value=tag_idx["N"])
# One-hot encode the tag index at every position.
y = [to_categorical(i, num_classes=len(tag_set)) for i in y]

# Random 80:20 split with a fixed seed.
# NOTE(review): this rebinds X_train / X_val, which the HMM cells above use as
# lists of (word, tag) sentences; re-running those cells after this one will
# see the padded index arrays instead.
X_train,X_val, y_train, y_val = train_test_split(X,y,test_size=0.2,random_state=42)

In [None]:
# Build the tagging network with the Keras functional API:
# embedding -> dropout -> bidirectional LSTM -> per-position softmax over tags.
from keras.models import Model, Input
from keras.layers import LSTM, Embedding, Dense, TimeDistributed, Dropout, Bidirectional
# Sequence length of the network input; same value (50) as maxLength used for padding.
max_len =50
# NOTE(review): the name `input` shadows Python's built-in input(); it is kept
# because the training cell builds the Model from `input` and `out`.
input = Input(shape=(max_len,))
# `model` is reused for each intermediate layer output in turn.
model = Embedding(input_dim=len(word_set), output_dim=50, input_length=max_len)(input)  # 50-dim embedding
model = Dropout(0.1)(model)
model = Bidirectional(LSTM(units=100, return_sequences=True, recurrent_dropout=0.2))(model)  # variational biLSTM
out = TimeDistributed(Dense(len(tag_set), activation="softmax"))(model)  # softmax output layer

In [None]:
# Train the model.
# Author's note: training does not seem to complete on Colab even with a smaller
# batch size; an earlier run showed promising results but then stopped working.
# NOTE(review): recurrent_dropout on the LSTM is documented by Keras as
# incompatible with the fast cuDNN implementation, and batch_size=5 means a very
# large number of steps per epoch — possibly why the run never finishes; confirm.
# `model` is rebound here from the last layer output to the compiled Model.
model = Model(input, out)
model.compile(optimizer="rmsprop", loss="categorical_crossentropy", metrics=["accuracy"])
# validation_split=0.1 holds out a further 10% of the training arrays during fitting.
history = model.fit(X_train, np.array(y_train), batch_size=5, epochs=5, validation_split=0.1, verbose=1)


Epoch 1/5

In [None]:
# argument sent is a list of [token,label] pairs; return number of correctly predicted labels
def predict_from_scratch(sent):
    """Score the from-scratch HMM tagger on one sentence.

    The tokens of `sent` are decoded with viterbi() and each predicted tag is
    compared with the gold label at the same position.

    Returns the number of positions where prediction and label agree; 0 for an
    empty sentence or when the decoder yields no tags.
    """
    if not sent:
        return 0
    words = [token[0] for token in sent]
    # `or []` keeps the count well-defined if the decoder returns nothing
    guesses = viterbi(words) or []
    return sum(1 for guess, token in zip(guesses, sent) if guess == token[1])

In [None]:
# argument sent is a list of [token,label] pairs; return number of correctly predicted labels
def predict_anything_goes(sent):
    """Baseline scorer: guess the tag 'N' for every token.

    Returns how many entries of `sent` carry the label 'N'.
    """
    baseline_tag = 'N'
    return sum(1 for pair in sent if pair[1] == baseline_tag)

In [None]:
def evaluate(test_path='test.tsv'):
    """Score both predictors on a tab-separated test file.

    The file has one `token<TAB>label` pair per line; a line whose first field
    is '<S>' ends the current sentence. Each completed sentence is passed to
    predict_from_scratch() and predict_anything_goes().

    Parameters
    ----------
    test_path : str
        Path of the test file; defaults to 'test.tsv' in the current directory.

    Returns
    -------
    (from_scratch_accuracy, anything_goes_accuracy) : tuple of float
        Correct predictions divided by the number of counted lines.
        '<S>' lines are included in that total although no prediction is made
        for them. (0.0, 0.0) when the file contains no usable line.
    """
    total = 0
    correct_from_scratch = 0
    correct_anything_goes = 0
    sentence = []
    # `with` guarantees the handle is closed; utf-8 is stated explicitly so
    # accented characters do not depend on the platform's default encoding.
    with open(test_path, 'r', encoding='utf-8') as testfile:
        for line in testfile:
            pieces = line.rstrip("\n").split("\t")
            is_boundary = pieces[0] == '<S>'
            if not is_boundary and len(pieces) < 2:
                # blank or malformed line: no label to score, so skip it
                continue
            total += 1
            if is_boundary:
                correct_from_scratch += predict_from_scratch(sentence)
                correct_anything_goes += predict_anything_goes(sentence)
                sentence = []
            else:
                sentence.append(pieces)
    # the last sentence is not followed by a '<S>' line
    correct_from_scratch += predict_from_scratch(sentence)
    correct_anything_goes += predict_anything_goes(sentence)
    if total == 0:
        return (0.0, 0.0)
    return (correct_from_scratch/total, correct_anything_goes/total)

In [None]:
evaluate()

(0.191809, 0.819613)