# Ehsan Shaghaei
# B19-AAI01

# 0 Imports, Defines and Preprocessing 

In [1]:

# Importing the necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import nltk
import re
from functools import reduce
import gc
from IPython.display import display
from tqdm import tqdm

# Locations of the space-separated "token tag" corpora, keyed by split name
data_paths = {
    # 'distro':'./tag_logit_per_word.tsv',
    'train': './train_pos.txt',
    'test': './test_pos.txt',
}

# Fallback scores for words that never occur in the training data.
# The first TSV column becomes the row index, and to_dict() nests the result as
# {column label: {row label: value}}; it is read below as random_data[tag][token].
random_data = (
    pd.read_csv('./tag_logit_per_word.tsv', sep='\t')
    .pipe(lambda frame: frame.set_index(frame.columns[0]))
    .to_dict()
)
# Function to load the raw data


def load_data(data_paths: dict):
    """Read every corpus file listed in ``data_paths`` into a DataFrame.

    Parameters
    ----------
    data_paths : dict
        Maps a split name (e.g. ``'train'``) to the path of a headerless,
        space-separated text file.

    Returns
    -------
    dict
        Maps each split name to the DataFrame parsed from its file, with
        integer column labels (no header row is consumed).
    """
    frames = {}
    for split_name, file_path in data_paths.items():
        # Blank lines are kept (they become NaN rows) because they are what
        # marks the end of a sentence further down.
        frames[split_name] = pd.read_csv(
            file_path, sep=' ', skip_blank_lines=False, header=None)
    return frames


# Loading the data
data = load_data(data_paths)

# A blank line in the file shows up as a row containing NaN, so the index
# labels of those rows are the sentence boundaries of each split.
endOfSentenceIndexes = {
    split_name: frame.loc[frame.isna().any(axis=1)].index
    for split_name, frame in data.items()
}


# Extracting sentences for train and test data
for key, df in data.items():
    df.rename(columns={0: 'token', 1: 'tag'}, inplace=True)


def _split_sentences(frame, boundary_indexes):
    """Cut ``frame`` into one DataFrame per sentence.

    Parameters
    ----------
    frame : pandas.DataFrame
        One row per token, with separator rows at ``boundary_indexes``.
    boundary_indexes : iterable of int
        Ascending row positions of the separator rows; these rows are
        excluded from the output.

    Returns
    -------
    list of pandas.DataFrame
        The sentences in file order, each re-indexed from 0.
    """
    sentences = []
    start = 0
    for boundary in boundary_indexes:
        # .iloc + reset_index(drop=True) yields an independent frame, so no
        # in-place mutation of a slice of the source frame is needed.
        sentences.append(frame.iloc[start:boundary].reset_index(drop=True))
        start = boundary + 1
    # Rows after the last separator form a final sentence when the file does
    # not end with a blank line; without this they would be silently dropped.
    if start < len(frame):
        sentences.append(frame.iloc[start:].reset_index(drop=True))
    return sentences


train_sentences = _split_sentences(data['train'], endOfSentenceIndexes['train'])
test_sentences = _split_sentences(data['test'], endOfSentenceIndexes['test'])

# Creating the Dictionary for a faster access
# NOTE: every sentence becomes a {token: tag} dict, so a token that occurs more
# than once in a sentence is stored only once and keeps the tag of its last
# occurrence.
train_list = []
for df in train_sentences:
    train_list.append(
        {token: tag for token, tag in zip(df['token'], df['tag'])})

test_list = []
for df in test_sentences:
    test_list.append(
        {token: tag for token, tag in zip(df['token'], df['tag'])})

# Assigning the data: from here on data[split] is a list of per-sentence dicts
data['train'] = train_list
data['test'] = test_list

# Garbage collection and preliminary evaluation
gc.collect()
# display('First Train Sentence',data['train'][0])
# display('First Test Sentence',data['test'][0])


0

# 1 Calculate the transition probability and emission matrices (First step towards viterbi) - 10 points

Finding all unique tags

In [2]:
# Every tag that labels at least one token of the training sentences
unique_tags = set()
for sentence in data['train']:
    unique_tags.update(sentence.values())

# pd.DataFrame(unique_tags,columns=['tag'])

Finding all unique tokens\[words\]

In [3]:
# Every token (word) seen in the training sentences, i.e. the known vocabulary
unique_tokens = set()
for sentence in data['train']:
    unique_tokens.update(sentence.keys())

# pd.DataFrame(unique_tokens,columns=['token'])

## Transition Probability Matrix

Constructing Transition Count Matrix 

In [4]:
# smoothing factor for TPM (added to every transition count below)
alpha = 0.001
n_tags = len(unique_tags)

# Start tag
startPOS = 'START'
# transitionProbabilityMatrixDict[previous_tag][next_tag] -> transition count.
# START only ever appears as a previous tag, never as a next tag.
transitionProbabilityMatrixDict = {start_tag:{end_tag:0.0 for end_tag in unique_tags} for start_tag in [startPOS]+list(unique_tags) }

# Counting the transitions
# The loop variable must not be called `dict`: that would shadow the builtin
# for the rest of the kernel session and break any later `dict(...)` call
# (for example when the preprocessing cell is re-run).
for sentence_tags in data['train']:
    current_tag = startPOS
    for next_tag in sentence_tags.values():
        transitionProbabilityMatrixDict[current_tag][next_tag] += 1
        current_tag = next_tag

# Columns are the previous tags, rows are the next tags; alpha is added to
# every cell so that unseen transitions keep a non-zero weight.
transitionProbabilityMatrix = pd.DataFrame.from_dict(transitionProbabilityMatrixDict)+alpha
# display(transitionProbabilityMatrix)

Preliminary test of the count results

In [5]:
def transitionsFromSTART2x(x, data):
    """Count sentences in the raw corpus text whose first token is tagged ``x``.

    A sentence start is recognised as a separator line holding a single space
    (``"\\n \\n"``) followed by a token made of word characters. The very
    first sentence of ``data`` has no separator before it and is therefore
    not counted.

    Parameters
    ----------
    x : str
        The tag to look for. It is matched literally and as a whole tag, so
        ``'VB'`` does not count ``'VBG'`` and tags such as ``'PRP$'`` work.
    data : str
        Raw content of a "token tag" file.

    Returns
    -------
    int
        Number of matching sentence starts.
    """
    # re.escape: tags may contain regex metacharacters ('$', '.', '(' ...).
    # (?!\S): the tag must end here, otherwise 'VB' would also match 'VBG'.
    pattern = rf'\n \n\w+ {re.escape(x)}(?!\S)'
    return len(re.findall(pattern, data))


# Cross-check two START->tag counts against an independent regex count taken
# directly from the raw training file.
with open(data_paths['train'], 'r') as f:
    t = f.read()

assert transitionsFromSTART2x('VBG', t) == transitionProbabilityMatrixDict[startPOS]['VBG'], \
    "in Transition Probability Matrix, Transition count 'START->VBG' is INVALID"
assert transitionsFromSTART2x('VBN', t) == transitionProbabilityMatrixDict[startPOS]['VBN'], \
    "in Transition Probability Matrix, Transition count 'START->VBN' s INVALID"


Normalizing and smoothing the transition vectors to obtain probability matrix

In [6]:
# Normalizing the smoothed counts into probabilities.
# Every cell already holds count + alpha, so a column's sum already equals
# total_count + n_tags*alpha. Dividing by the column sum alone is the add-alpha
# estimate; adding n_tags*alpha to the denominator once more would make every
# column sum to less than 1 (and shrink the values further on each re-run).
for start_tag in (transitionProbabilityMatrix.columns):
    transitionProbabilityMatrix[start_tag] /= np.sum(
        transitionProbabilityMatrix[start_tag])
# Nested as {previous_tag: {next_tag: P(next_tag | previous_tag)}}
transitionProbabilityMatrixDict = transitionProbabilityMatrix.to_dict()
# transitionProbabilityMatrix


Testing the Transition Probability Matrix

In [7]:
# Element-wise range check. The builtin all() applied to a DataFrame iterates
# over its column labels (always truthy), so it never looks at the values;
# the comparison has to be reduced with DataFrame.all() instead.
assert ((transitionProbabilityMatrix >= 0) & (transitionProbabilityMatrix <= 1)).all().all(), ' The Transition Probability Matrix has invalid probability value'

Smoothening Transition Probability Matrix

## Emission Matrix

Constructing Emission Count Matrix

In [8]:
# smoothing factor for the emission matrix (added to every emission count below)
beta = 0.01
n_tags = len(unique_tags)

# emissionMatrixDict[token][tag] -> number of times `token` was labelled `tag`
emissionMatrixDict = {token:{tag:0.0 for tag in unique_tags} for token in unique_tokens}
# Counting the emissions
# The loop variable must not be called `dict`: that would shadow the builtin
# for the rest of the kernel session and break any later `dict(...)` call.
for sentence_tags in data['train']:
    for token, tag in sentence_tags.items():
        emissionMatrixDict[token][tag] += 1

# Columns are tokens, rows are tags; beta keeps unseen (token, tag) pairs non-zero
emissionMatrix = pd.DataFrame.from_dict(emissionMatrixDict)+beta
# display(emissionMatrix)

Preliminary test of the count results

In [9]:
def emissionCNTbyRegex(token,tag,data):
    """Count the lines of ``data`` that are exactly ``"<token> <tag>"``.

    Parameters
    ----------
    token : str
        The word to look for (matched literally).
    tag : str
        The tag to look for (matched literally).
    data : str
        Raw content of a "token tag" file, one pair per line.

    Returns
    -------
    int
        Number of matching lines, including a match on the first line and
        matches on consecutive identical lines.
    """
    # ^...$ with re.MULTILINE anchors on line boundaries without consuming the
    # newlines, so neighbouring lines can both match; re.escape keeps tokens
    # and tags such as '.' or '$' from being read as regex syntax.
    pattern = rf'^{re.escape(token)} {re.escape(tag)}$'
    return len(re.findall(pattern, data, flags=re.MULTILINE))

# Cross-check one emission count against an independent regex count taken
# directly from the raw training file.
with open(data_paths['train'],'r') as f:
    t = f.read()

assert emissionCNTbyRegex('want','VB',t) == emissionMatrixDict['want']['VB'], \
    "in Emition Matrix, Emition count 'want -> VB' is INVALID"


Normalizing and smoothing the emission vectors to obtain probability matrix

In [10]:
# Normalizing the smoothed counts into probabilities.
# Every cell already holds count + beta, so a column's sum already equals
# total_count + n_tags*beta. Dividing by the column sum alone is the add-beta
# estimate; adding beta*n_tags to the denominator once more would make every
# column sum to less than 1 (and shrink the values further on each re-run).
# NOTE(review): columns are tokens, so each token's scores are normalized over
# the tags. A textbook HMM emission is normalized per tag over the tokens
# instead -- confirm this orientation is intended.
for token in (emissionMatrix.columns):
    emissionMatrix[token] /= np.sum(emissionMatrix[token])
# Nested as {token: {tag: value}}
emissionMatrixDict = emissionMatrix.to_dict()
# emissionMatrix


Testing the Emission Matrix

In [11]:
# Element-wise range check. The builtin all() applied to a DataFrame iterates
# over its column labels (always truthy), so it never looks at the values;
# the comparison has to be reduced with DataFrame.all() instead.
assert ((emissionMatrix >= 0) & (emissionMatrix <= 1)).all().all(), ' The Emittion Matrix has invalid probability value'
gc.collect()

0

# 2 Implement Viterbi algorithm for POS tagging task. - 30 points

Initialization of so called best_probabilities and best_paths matrices

In [12]:


# Nested-dict copy of the transition matrix, indexed as
# tpm_T[column label][row label] (DataFrame.to_dict default orientation).
# NOTE(review): despite the "_T" suffix no transpose is applied here, so the
# outer key is still a column label of transitionProbabilityMatrix -- confirm
# whether a transposed lookup was intended.
tpm_T = transitionProbabilityMatrix.to_dict()





def getVector(d):
    """Return the values of mapping ``d`` as a numpy array, in iteration order."""
    values = [value for value in d.values()]
    return np.array(values)

# feed-forward of best_probability matrix C_ij
def feedforward(i, j,bpr,token_vector,tags_vector):
    """Compute one Viterbi cell: the best log-score of tag ``i`` at position ``j``.

    For every previous tag k it evaluates
    ``log C[k][j-1] + log A[k][i] + log B[i][j]`` and keeps the maximum.

    Reads the notebook-level ``transitionProbabilityMatrixDict``
    (``[previous_tag][next_tag]``), ``emissionMatrixDict`` (``[token][tag]``),
    ``random_data`` (``[tag][token]``) and ``unique_tokens``.

    Parameters
    ----------
    i : int
        Index into ``tags_vector`` of the tag being scored.
    j : int
        Position (>= 1) in ``token_vector`` of the token being scored.
    bpr : dict
        ``bpr[token][tag]`` best log-scores; the column of
        ``token_vector[j-1]`` must already be filled in.
    token_vector : list
        The tokens of the sentence.
    tags_vector : list
        All tags, in the order that defines the tag indices.

    Returns
    -------
    tuple
        ``(best_score, best_previous_tag_index)`` where the index refers to
        ``tags_vector``.
    """
    tag = tags_vector[i]
    current_token = token_vector[j]

    # log C[k][j-1], looked up tag by tag so the order is exactly tags_vector
    previous_scores = np.array(
        [bpr[token_vector[j-1]][previous_tag] for previous_tag in tags_vector])

    # log A[k][i]: the probability of moving from each previous tag k INTO
    # tag i. (The outgoing distribution of tag i would be A[i][k] instead and
    # is not guaranteed to be in tags_vector order.)
    transition_scores = np.log(np.array(
        [transitionProbabilityMatrixDict[previous_tag][tag]
         for previous_tag in tags_vector]))

    # B[i][j]: the decision must be made on the token at position j itself,
    # not on whatever a stale notebook-level `token` variable happens to hold.
    # NOTE(review): np.log is applied to the random_data value as-is; confirm
    # that file holds positive probabilities rather than raw logits.
    if current_token in unique_tokens:
        emission = emissionMatrixDict[current_token][tag]
    else:
        emission = random_data[tag][current_token]

    dest_vector = previous_scores + transition_scores + np.log(emission)
    # return best_probability and best_path
    return np.max(dest_vector), np.argmax(dest_vector)



def generate_best_probability_path_matrix(tokens):
    """Tag ``tokens`` with the Viterbi algorithm.

    Reads the notebook-level ``unique_tags``, ``unique_tokens``, ``startPOS``,
    ``transitionProbabilityMatrixDict``, ``emissionMatrixDict`` and
    ``random_data``, and calls ``feedforward`` for every cell after the first
    column.

    Parameters
    ----------
    tokens : iterable of str
        The tokens of one sentence. The score tables are keyed by token, so
        the tokens are expected to be distinct.

    Returns
    -------
    tuple
        ``(pred_tags, bpr, bpa)``: the predicted tag per token (same order as
        ``tokens``), the best log-score table ``bpr[token][tag]`` and the
        back-pointer table ``bpa[token][tag]`` holding the index (into the tag
        list) of the best previous tag.
    """
    tokens_vector = list(tokens)
    tags_vector = list(unique_tags)
    bpr = {token: {tag: 0.0 for tag in tags_vector} for token in tokens_vector}
    # Back-pointers start at 0; the first column keeps that value because the
    # first token has no predecessor.
    bpa = {token: {tag: 0 for tag in tags_vector} for token in tokens_vector}

    # Nothing to tag
    if not tokens_vector:
        return [], bpr, bpa

    # Initialization of the first column of best_probability: START -> tag,
    # then tag -> first token.
    first_token = tokens_vector[0]
    for tag in tags_vector:
        if first_token in unique_tokens:
            emission = emissionMatrixDict[first_token][tag]
        else:
            emission = random_data[tag][first_token]
        bpr[first_token][tag] = np.log(
            transitionProbabilityMatrixDict[startPOS][tag]) + np.log(emission)

    # feeding-forward, skipping the initialized column
    for j, token in enumerate(tokens_vector[1:], 1):
        for i, tag in enumerate(tags_vector):
            bpr[token][tag], bpa[token][tag] = feedforward(
                i, j, bpr, tokens_vector, tags_vector)

    # Feed-Backward: start from the best final tag, then follow the
    # back-pointers. bpa[token_j][tag_j] names the best tag at position j-1,
    # so the chosen tag index has to be carried from one step to the next.
    last_token = tokens_vector[-1]
    best_tag_id = int(np.argmax([bpr[last_token][tag] for tag in tags_vector]))
    pred_tags = [tags_vector[best_tag_id]]
    for token in reversed(tokens_vector[1:]):
        best_tag_id = bpa[token][tags_vector[best_tag_id]]
        pred_tags.append(tags_vector[best_tag_id])
    pred_tags.reverse()

    return pred_tags,bpr,bpa

# 3 Test your Viterbi algorithm on the test set and record the accuracy. The accuracy refers to the number of correctly predicted tags in the whole test samples. - 10 points

Obtaining test data

In [13]:
# Run the tagger over every test sentence and gather the gold and predicted
# tags into two flat lists that stay aligned position by position.
test = data['test']
y, y_pred = [], []
for sentence in test:
    tokens = list(sentence)  # iterating the dict yields its tokens
    truth = [sentence[word] for word in tokens]
    pred, _, _ = generate_best_probability_path_matrix(tokens)
    y.extend(truth)
    y_pred.extend(pred)



def calc_accuracy(y,y_pred):
    """Return the fraction of positions at which ``y_pred`` equals ``y``.

    Parameters
    ----------
    y : sequence
        The reference labels.
    y_pred : sequence
        The predicted labels, one per reference label.

    Returns
    -------
    float
        Number of matching positions divided by ``len(y)``; ``nan`` when both
        sequences are empty.

    Raises
    ------
    ValueError
        If the two sequences do not have the same shape. Without this check
        numpy would broadcast (e.g. a single prediction against many labels)
        and return a plausible-looking but meaningless score.
    """
    y_true = np.asarray(y)
    y_hat = np.asarray(y_pred)
    if y_true.shape != y_hat.shape:
        raise ValueError(
            f'y and y_pred must have the same shape, got {y_true.shape} and {y_hat.shape}')
    if y_true.size == 0:
        # Accuracy over zero samples is undefined
        return float('nan')
    return np.sum(y_true == y_hat)/len(y)
# Token-level accuracy of the predicted tags over all test sentences
print(calc_accuracy(y,y_pred))
# # print()
# p,q,r= generate_best_probability_path_matrix(test[1].keys())
# np.sum(np.array(p)==np.array(test[1].values()))
# np.sum(np.array([1,1,1])==np.array([1,2,1]))


  bpr[token][tag] = np.log(transitionProbabilityMatrixDict[startPOS][tag])+np.log(random_data[tag][token])


KeyError: 'extending'