#### PoS tagging per le lingue latino e greco: confronto tra baseline e HMM (Viterbi)
Francesco Sannicola

- csv per la lettura dei corpus
- math

In [1]:
from csv import reader as csv_reader
from numpy import array as np_array
from numpy import delete as np_delete
from numpy import empty as np_empty
from math import log

In [2]:
from collections import Counter

In [3]:
from nltk import word_tokenize
from nltk.tokenize import MWETokenizer

In [4]:
def trainParsing(file):
    
    #w_e tutte le parole finali
    w_e = []
    #w_t tutte le parole con proprio tag (+1 per INIT)
    w_t = []
    # w_s tutte le parole iniziali
    w_s= []
    
    w_t.append(('INIT', 'INIT'))
    with open(file) as fd:
        rd = csv_reader(fd, delimiter="\t", quotechar='"')
        i = -1
        for row in rd:
            if len(row) > 3:
                if i == 0:
                    w_s.append((row[1].lower(), 'INIT'))
                    i = 1
                w_t.append((row[1].lower(), row[3].lower()))
                last_str = row[1].lower()
            if len(row) == 0:
                w_e.append((last_str, 'END'))
                w_t.append(('END', 'END'))
                w_t.append(('INIT', 'INIT'))
                i = 0
        w_t.pop()
    return w_t, w_e, w_s 

In [5]:
def devParsing(file):
    w_t_dev = []
    with open(file) as fd:
        rd = csv_reader(fd, delimiter="\t", quotechar='"')
        for row in rd:
            if len(row) > 3:
                w_t_dev.append((row[1].lower(), row[3].lower()))
    return w_t_dev

In [6]:
def testParsing(file):
    query_list = []
    w_t_test = []
    
    first_char_init = 9
 
    with open(file) as fd:
        rd = csv_reader(fd, delimiter="\t", quotechar='"')
        i = -1
        for row in rd:
            if len(row) == 1:
                if row[0].startswith('# text'):
                    if '...' not in row[0]:
                        query_list.append(row[0][first_char_init:(len(row[0]))].lower().replace('.', ' .').replace('·', ' ·'))
                    else:
                        query_list.append(row[0][first_char_init:(len(row[0]))].lower().replace('·', ' ·'))
    
            elif len(row) > 3:
                w_t_test.append((row[1].lower(), row[3].lower()))
    return w_t_test, query_list


In [7]:
def singleWordDistribution(w_t_dev):
    #obtain tuples word:tag appearing one time
    single_occ_word_tag = dict()
    single_occ_words = list({key:val for key, val in Counter(i[0] for i in w_t_dev).items() if val == 1})
    
    u=0
    for word in single_occ_words:
        for tup in w_t_dev:
            if tup[0] == word:
                single_occ_word_tag[word] = tup[1]
                u+=1
                break
    
    tag_occ_singleoccWord=Counter(single_occ_word_tag.values())
    return tag_occ_singleoccWord, single_occ_words

In [8]:
greek_train_tree_bank = "./Bank/Greek/grc_perseus-ud-train.conllu"
latin_train_tree_bank = "./Bank/Latin/la_llct-ud-train.conllu"

latin_dev_tree_bank = "./Bank/Latin/la_llct-ud-dev.conllu"
greek_dev_tree_bank = "./Bank/Greek/grc_perseus-ud-dev.conllu"

greek_test_tree_bank = "./Bank/Greek/grc_perseus-ud-test.conllu"
latin_test_tree_bank = "./Bank/Latin/la_llct-ud-test.conllu"

In [9]:
w_t, w_e, w_s = trainParsing(latin_train_tree_bank)
w_t_dev = devParsing(latin_dev_tree_bank)
w_t_test, query_list = testParsing(latin_test_tree_bank)
tag_occ_singleoccWord, single_occ_words = singleWordDistribution(w_t_dev)

In [10]:
w_t_occ = Counter(w_t)
w_s_occ = Counter(w_s)
w_e_occ = Counter(w_e)

t_occ = Counter([i[1] for i in w_t])

In [11]:
p_emission = dict()
# compute emission probability
# prob w given t
for key, value in w_t_occ.items():
    prob = value / t_occ.get(key[1])
    '''if key[1] in p_emission:
        p_emission[key[1]].append([key[0], prob])
    else:
        p_emission[key[1]] = [[key[0], prob]]'''
    if prob == 0:
        prob = 0.00001
    if key[1] in p_emission:
        p_emission[key[1]].update({key[0]: abs(log(prob))})
    else:
        p_emission[key[1]] = {key[0]: abs(log(prob))}

In [12]:
p_emission_init = dict()
#compute emission probability for initial state
for key, value in w_s_occ.items():
    prob = value / t_occ.get(key[1])
    if prob == 0:
        prob = 0.00001
    if key[1] in p_emission_init:
        p_emission_init[key[1]].update({key[0]: abs(log(prob))})
    else:
        p_emission_init[key[1]] = {key[0]: abs(log(prob))}

In [13]:
p_emission_end = dict()
#compute emission probability for end state
for key, value in w_e_occ.items():
    prob = value / t_occ.get(key[1])
    if prob == 0:
        prob = 0.00001
    if key[1] in p_emission_end: 
        p_emission_end[key[1]].update({key[0]: abs(log(prob))})
    else:
        p_emission_end[key[1]] = {key[0]: abs(log(prob))}


In [14]:
p_transition_dict = dict()
# compute transition probability
# prob t1 given t
for t1 in t_occ.keys():
    #if t1 != 'INIT':
    for t in t_occ.keys():
        count = 0
        for i in range(1, len(w_t)):
            if w_t[i][1] == t1:
                if w_t[i - 1][1] == t:
                    count += 1
        prob = count / t_occ.get(t)
        if prob == 0:
            prob = 0.00001
        if t in p_transition_dict:
            p_transition_dict[t].update({t1: abs(log(prob))})
        else:
            p_transition_dict[t] = {t1: abs(log(prob))}

In [15]:
'''states = np_array(list(p_transition_dict.keys()))
states = np_delete(states, 0)
states = np_delete(states, 10)'''

'states = np_array(list(p_transition_dict.keys()))\nstates = np_delete(states, 0)\nstates = np_delete(states, 10)'

In [16]:
i = 0
states = np_empty(len(p_transition_dict) - 2 , dtype=object)
for key in p_transition_dict.keys():
    if str(key) != 'INIT' and str(key) != 'END':
        states[i] = str(key)
        i += 1

In [17]:
def dptable(V):
    yield " ".join(("%10d" % i) for i in range(len(V)))
    for y in V[0]:
        yield "%.7s: " % y+" ".join("%.7s" % ("%f" % v[y]) for v in V)

In [18]:
token_to_merge = [
                 ('[', 'adj', ']'), ('[', 'Adj', ']'),
                 ('[', 'adv', ']'), ('[', 'Adv', ']'),
                 ('[', 'aux', ']'), ('[', 'Aux', ']'),
                 ('[', 'cconj', ']'), ('[', 'Cconj', ']'), 
                 ('[', 'det', ']'), ('[', 'Det', ']'),
                 ('[', 'init', ']'), ('[', 'Init', ']'),
                 ('[', 'noun', ']'), ('[', 'Noun', ']'),
                 ('[', 'num', ']'), ('[', 'Num', ']'),
                 ('[', 'part', ']'), ('[', 'Part', ']'),
                 ('[', 'pron', ']'),('[', 'Pron', ']'),
                 ('[', 'propn', ']'),('[', 'Propn', ']'),
                 ('[', 'punct', ']'),('[', 'Punct', ']'),
                 ('[', 'sconj', ']'),('[', 'Sconj', ']'),
                 ('[', 'verb', ']'),('[', 'Verb', ']'),
                 ('[', 'x', ']'), ('[', 'X', ']'),
                 ('[', '--', ']'),
                 ('[', 'participle', ']')
                 ]

In [19]:
all_words = []
for value in p_emission.values():
    all_words.extend(list(value.keys()))

In [20]:
all_pos = []

In [21]:
tokenizer = MWETokenizer(token_to_merge)

In [22]:
#BASELINE
for query in query_list:
    input_splitted = tokenizer.tokenize(word_tokenize(query))
    T = len(input_splitted)
    tag_target = 'noun'
    for t in range(0, T):
        max = 0
        for key, value in w_t_occ.items():
            if key[0]==input_splitted[t].replace('_', ''):
                if max < value:
                    max = value
                    tag_target = key[1]
        all_pos.append([input_splitted[t].replace('_', ''), tag_target])

"for query in query_list:\n    input_splitted = tokenizer.tokenize(word_tokenize(query))\n    T = len(input_splitted)\n    tag_target = 'noun'\n    for t in range(0, T):\n        max = 0\n        for key, value in w_t_occ.items():\n            if key[0]==input_splitted[t].replace('_', ''):\n                if max < value:\n                    max = value\n                    tag_target = key[1]\n        all_pos.append([input_splitted[t].replace('_', ''), tag_target])"

In [39]:
def ViterbiHMM(smoothing_mode):
    all_pos = []
    for query in query_list:
            
        input_splitted = tokenizer.tokenize(word_tokenize(query))
        T = len(input_splitted)
        
        # Tracking tables from first observation
        backtrace=[{}]
        for i in states:
            try:
                backtrace[0][i]=p_transition_dict['INIT'][i]*p_emission_init['INIT'][input_splitted[0]]
            except KeyError:
                backtrace[0][i]=p_transition_dict['INIT'][i] * 20
        
        
        for t in range(1, T):
            input_splitted[t] = input_splitted[t].replace('_', '')
            backtrace.append({})
            for y in states:
                if t == T - 1:
                    try:
                        (prob, state) = min((backtrace[t-1][y0] * p_transition_dict[y0]['END'] * p_emission[y][input_splitted[t]], y0) for y0 in states)
                    except KeyError:
                        (prob, state) = min((backtrace[t-1][y0] * p_transition_dict[y0]['END'] * 50, y0) for y0 in states)
                else: 
                    if input_splitted[t] not in all_words:
                        
                        if smoothing_mode == 1: 
                            # P(unk|NOUN) =1
                            if y == 'noun':
                                (prob, state) = min((backtrace[t-1][y0] * p_transition_dict[y0][y] * 0.0001 , y0) for y0 in states)
                            else:
                                (prob, state) = min((backtrace[t-1][y0] * p_transition_dict[y0][y] * 50, y0) for y0 in states)
                                
                        elif smoothing_mode == 2:
                            # P(unk|NOUN) =0.5 and P(unk|VERB) = 0.5
                            if y == 'noun':
                                 (prob, state) = min((backtrace[t-1][y0] * p_transition_dict[y0][y] * abs(log(0.5)), y0) for y0 in states)
                            elif y == 'verb':
                                 (prob, state) = min((backtrace[t-1][y0] * p_transition_dict[y0][y] * abs(log(0.5)), y0) for y0 in states)
                            else:
                                (prob, state) = min((backtrace[t-1][y0] * p_transition_dict[y0][y] * 50, y0) for y0 in states)
                                
                        elif smoothing_mode == 3:
                            #P(unk|ti) = 1/#(PoS_TAGs)
                            (prob, state) = min((backtrace[t-1][y0] * p_transition_dict[y0][y] * abs(log(1 / len(states))), y0) for y0 in states)
                            
                        elif smoothing_mode == 4:
                            #Another smoothing technique based on the dev file and words which appear one time
                            for tag in states:
                                if y == tag:
                                    try:
                                        p_emission_new_word = abs(log(tag_occ_singleoccWord[tag] / len(single_occ_words)))
                                    except ValueError:
                                        p_emission_new_word =  50
                                    (prob, state) = min((backtrace[t-1][y0] * p_transition_dict[y0][y] * p_emission_new_word, y0) for y0 in states)
                                    break
                    else: 
                        try:
                            (prob, state) = min((backtrace[t-1][y0] * p_transition_dict[y0][y] * p_emission[y][input_splitted[t]], y0) for y0 in states)
                        except KeyError:
                            (prob, state) = min((backtrace[t-1][y0] * p_transition_dict[y0][y] * 50, y0) for y0 in states)
                backtrace[t][y] = prob
            #for i in dptable(viterbi):
            #   print (i)
            opt=[]
            for j in backtrace:
                for x,y in j.items():
                    if j[x]==min(j.values()):
                        opt.append(x)
        # print ('The PoS are\n'+''
        #       .join(map(''.join, zip([x + '/' for x in input_splitted], [x + '\n' for x in opt])))
        #       +'\nWith probability of %s'%p)
        for l in range(0,T):
            all_pos.append([input_splitted[l].replace('_', ''), opt[l]])
    return all_pos

In [40]:
all_pos = ViterbiHMM(4)

AttributeError: module 'numpy' has no attribute 'sortstates'

In [37]:
same = 0

for i in range(0, len(all_pos)):
    if all_pos[i][0] == w_t_test[i][0]:
        same+=1
    else:
        print(i)

In [38]:
right_pos = 0
wrong_pos = 0
n = 0

for word in all_pos:
    if (word[1] == w_t_test[n][1]):
        right_pos +=1
    else :
        wrong_pos +=1
    n += 1

accuracy = right_pos/(right_pos+wrong_pos)

print(accuracy)
print(right_pos)
print(wrong_pos)

0.9417334606918892
22676
1403


In [33]:
states

array(['punct', 'adp', 'propn', 'noun', 'verb', 'det', 'cconj', 'pron',
       'adj', 'num', 'aux', 'sconj', 'adv', 'part', 'x'], dtype=object)

In [34]:
np.sort(states)

array(['adj', 'adp', 'adv', 'aux', 'cconj', 'det', 'noun', 'num', 'part',
       'pron', 'propn', 'punct', 'sconj', 'verb', 'x'], dtype=object)