In [35]:
import nltk

import numpy as np
import pandas as pd
import random
from sklearn.model_selection import train_test_split
import pprint, time

#Il dataset che si prende in esame è "Treebank Corpus" dalla libreria nltk.
nltk.download('treebank')
nltk.download('universal_tagset')
nltk_data = list(nltk.corpus.treebank.tagged_sents(tagset='universal'))

[nltk_data] Downloading package treebank to
[nltk_data]     /Users/gaetanopalmieri/nltk_data...
[nltk_data]   Package treebank is already up-to-date!
[nltk_data] Downloading package universal_tagset to
[nltk_data]     /Users/gaetanopalmieri/nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


In [36]:
# Stampo per esempio le prime 2 frasi con i tags
print(nltk_data[:2])

[[('Pierre', 'NOUN'), ('Vinken', 'NOUN'), (',', '.'), ('61', 'NUM'), ('years', 'NOUN'), ('old', 'ADJ'), (',', '.'), ('will', 'VERB'), ('join', 'VERB'), ('the', 'DET'), ('board', 'NOUN'), ('as', 'ADP'), ('a', 'DET'), ('nonexecutive', 'ADJ'), ('director', 'NOUN'), ('Nov.', 'NOUN'), ('29', 'NUM'), ('.', '.')], [('Mr.', 'NOUN'), ('Vinken', 'NOUN'), ('is', 'VERB'), ('chairman', 'NOUN'), ('of', 'ADP'), ('Elsevier', 'NOUN'), ('N.V.', 'NOUN'), (',', '.'), ('the', 'DET'), ('Dutch', 'NOUN'), ('publishing', 'VERB'), ('group', 'NOUN'), ('.', '.')]]


In [37]:
# Stampo ogni paraole con i suoi rispettivi tag per le prime 2 frasi
for sent in nltk_data[:2]:
  for tuple in sent:
    print(tuple)

('Pierre', 'NOUN')
('Vinken', 'NOUN')
(',', '.')
('61', 'NUM')
('years', 'NOUN')
('old', 'ADJ')
(',', '.')
('will', 'VERB')
('join', 'VERB')
('the', 'DET')
('board', 'NOUN')
('as', 'ADP')
('a', 'DET')
('nonexecutive', 'ADJ')
('director', 'NOUN')
('Nov.', 'NOUN')
('29', 'NUM')
('.', '.')
('Mr.', 'NOUN')
('Vinken', 'NOUN')
('is', 'VERB')
('chairman', 'NOUN')
('of', 'ADP')
('Elsevier', 'NOUN')
('N.V.', 'NOUN')
(',', '.')
('the', 'DET')
('Dutch', 'NOUN')
('publishing', 'VERB')
('group', 'NOUN')
('.', '.')


In [38]:
# Split in train e test set con in rapporto 80:20
train_set,test_set =train_test_split(nltk_data,train_size=0.80,test_size=0.20)

In [39]:
# Creo la lista di train e test delle tagged words
train_tagged_words = [ tup for sent in train_set for tup in sent ]
test_tagged_words = [ tup for sent in test_set for tup in sent ]
print(len(train_tagged_words))
print(len(test_tagged_words))

80396
20280


In [40]:
# check del risultato per le prime 5
train_tagged_words[:5]

[('Tokyu', 'NOUN'),
 ('Group', 'NOUN'),
 (',', '.'),
 ('Mitsubishi', 'NOUN'),
 ('Estate', 'NOUN')]

In [41]:
#Controllo quanti tag univoci ci sono nei dati di train
tags = {tag for word,tag in train_tagged_words}
print(len(tags))
print(tags)
# Tutte le parole del vocabolario
vocab = {word for word,tag in train_tagged_words}

12
{'CONJ', 'VERB', 'PRT', 'ADV', 'NUM', 'ADJ', 'ADP', '.', 'X', 'PRON', 'DET', 'NOUN'}


In [42]:
# Calcolo le probabilità di emissione
def word_given_tag(word, tag, train_bag = train_tagged_words):
    tag_list = [pair for pair in train_bag if pair[1]==tag]
    count_tag = len(tag_list) # numero totale di volte che il tag i-esimo occorre
    w_given_tag_list = [pair[0] for pair in tag_list if pair[0]==word]
    #Calcolo il numero totale di volte che la parola i-esima occorre come tag corrente.
    count_w_given_tag = len(w_given_tag_list)
    return (count_w_given_tag, count_tag)

In [43]:
# Calcolo le probabilità di transizione
def t2_given_t1(t2, t1, train_bag = train_tagged_words):
    tags = [pair[1] for pair in train_bag]
    count_t1 = len([t for t in tags if t==t1])
    count_t2_t1 = 0
    for index in range(len(tags)-1):
        if tags[index]==t1 and tags[index+1] == t2:
            count_t2_t1 += 1
    return (count_t2_t1, count_t1)

In [44]:
# Creo la matrice di trasizione dei tags t x t, dove t è il numero di tags
# L'elemento (i,j) della matrice rappresenta P(j-esimo tag dopo l'i-esimo tag)

tags_matrix = np.zeros((len(tags), len(tags)), dtype='float32')
for i, t1 in enumerate(list(tags)):
    for j, t2 in enumerate(list(tags)):
        tags_matrix[i, j] = t2_given_t1(t2, t1)[0]/t2_given_t1(t2, t1)[1]
print(tags_matrix)

[[5.61167253e-04 1.58249155e-01 3.92817054e-03 5.55555560e-02
  3.92817073e-02 1.20089784e-01 5.10662161e-02 3.59147042e-02
  9.53984261e-03 6.17283955e-02 1.17845118e-01 3.46240193e-01]
 [4.97008720e-03 1.69903353e-01 3.11090667e-02 8.10860544e-02
  2.30096634e-02 6.35987148e-02 9.13943872e-02 3.54348831e-02
  2.21629083e-01 3.51587683e-02 1.34284392e-01 1.08421534e-01]
 [2.73224036e-03 4.02419984e-01 7.80640112e-04 8.97736102e-03
  5.23028895e-02 8.23575333e-02 2.10772827e-02 4.56674471e-02
  1.32708820e-02 1.79547220e-02 1.02654174e-01 2.49804839e-01]
 [6.73534069e-03 3.47068131e-01 1.62440576e-02 7.64659271e-02
  3.16957198e-02 1.24405704e-01 1.17670365e-01 1.40253559e-01
  2.21870039e-02 1.50554674e-02 7.29001611e-02 2.93185413e-02]
 [1.36249550e-02 1.93617791e-02 2.58157048e-02 3.58551461e-03
  1.85371101e-01 3.33452858e-02 3.47794890e-02 1.16170667e-01
  2.14413762e-01 1.43420580e-03 2.86841160e-03 3.49229127e-01]
 [1.54028432e-02 1.26382308e-02 1.14533966e-02 4.93680872e-03
  2

In [45]:
# Converto la matrice in df per miglior leggibilità
# La tabella è la stessa di quella mostrata nel pdf riferito al POS TAG
tags_df = pd.DataFrame(tags_matrix, columns = list(tags), index=list(tags))
print(tags_df)

          CONJ      VERB       PRT       ADV       NUM       ADJ       ADP  \
CONJ  0.000561  0.158249  0.003928  0.055556  0.039282  0.120090  0.051066   
VERB  0.004970  0.169903  0.031109  0.081086  0.023010  0.063599  0.091394   
PRT   0.002732  0.402420  0.000781  0.008977  0.052303  0.082358  0.021077   
ADV   0.006735  0.347068  0.016244  0.076466  0.031696  0.124406  0.117670   
NUM   0.013625  0.019362  0.025816  0.003586  0.185371  0.033345  0.034779   
ADJ   0.015403  0.012638  0.011453  0.004937  0.021722  0.068720  0.078199   
ADP   0.000883  0.007824  0.001388  0.012115  0.062090  0.106386  0.016532   
.     0.057372  0.088248  0.002350  0.055556  0.079594  0.043803  0.090491   
X     0.010368  0.202074  0.185297  0.025071  0.001697  0.015834  0.147031   
PRON  0.005898  0.488657  0.011797  0.034483  0.008167  0.075318  0.021325   
DET   0.000431  0.040115  0.000288  0.012365  0.022574  0.203019  0.008483   
NOUN  0.042147  0.147255  0.043361  0.016564  0.008976  0.012054

In [24]:
def Viterbi(words, train_bag = train_tagged_words):
    state = []
    T = list(set([pair[1] for pair in train_bag]))

    for key, word in enumerate(words):
        # Inizializzo la lista delle colonne di probabilità per una data osservazione
        p = []
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['.', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]
            # Calcolo probabilità di emissione e di stato
            emission_p = word_given_tag(words[key], tag)[0]/word_given_tag(words[key], tag)[1]
            state_probability = emission_p * transition_p
            p.append(state_probability)

        pmax = max(p)
        # ritorno lo stato per il quale ogni probabilità è la massima in quello stato
        state_max = T[p.index(pmax)]
        state.append(state_max)
    return list(zip(words, state))

In [25]:
# Testo l'algoritmo di Viterbo su un sott'insieme di frasi del dataset di test
random.seed(1234) # Definisco un random seed per avere qualche frase diversi su run multipli
# Scelgo in modo random 10 numeri
rndom = [random.randint(1,len(test_set)) for x in range(10)]
# Lista delle 10 frasi random sulle quali testare
test_run = [test_set[i] for i in rndom]
# Lista delle tagged words
test_run_base = [tup for sent in test_run for tup in sent]
# Lista delle untagged words
test_tagged_words = [tup[0] for sent in test_run for tup in sent]

In [26]:
# Vado a testare sole 10 frasi per osservarne l'accuratezza in quanto tutto il test ci impiega molto tempo(dopo comunque lo si farà)
start = time.time()
tagged_seq = Viterbi(test_tagged_words)
end = time.time()
difference = end-start

print("Tempo impiegato: ", difference)
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j]
accuracy = len(check)/len(tagged_seq)
print('Viterbi Algorithm Accuracy: ',accuracy*100)

Tempo impiegato:  34.76898694038391
Viterbi Algorithm Accuracy:  91.01123595505618


### Codice per testare tutte le frasi

In [27]:
test_tagged_words = [tup for sent in test_set for tup in sent]
test_untagged_words = [tup[0] for sent in test_set for tup in sent]
start = time.time()
tagged_seq = Viterbi(test_untagged_words)
end = time.time()
difference = end-start
print("Tempo impiegato: ", difference)
check = [i for i, j in zip(test_tagged_words, test_untagged_words) if i == j]
accuracy = len(check)/len(tagged_seq)
print('Viterbi Algorithm Accuracy: ',accuracy*100)

Tempo impiegato:  2587.7308809757233
Viterbi Algorithm Accuracy:  0.0


In [46]:
# Per migliorare le prestazioni viene specificato un Tagger rule-based per parole non conosciute specificando i patterns per il tagging
patterns = [
    (r'.*ing$', 'VERB'),              # Gerundio
    (r'.*ed$', 'VERB'),               # Past tense in inglese
    (r'.*es$', 'VERB'),               # Verbo
    (r'.*\'s$', 'NOUN'),              # Nome possessivo
    (r'.*s$', 'NOUN'),                # Nome plurale
    (r'\*T?\*?-[0-9]+$', 'X'),        # X
    (r'^-?[0-9]+(.[0-9]+)?$', 'NUM'), # Numeri cardinali
    (r'.*', 'NOUN')                   # Nome
]

# rule-based Tagger
rule_based_tagger = nltk.RegexpTagger(patterns)

In [47]:
#Modifica di Viterbi per includere il tagger rule-based
def Viterbi_rule_based(words, train_bag = train_tagged_words):
    state = []
    T = list(set([pair[1] for pair in train_bag]))

    for key, word in enumerate(words):
        #Inizializzo la lista delle probabilità per una data osservazione
        p = []
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['.', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]

            # Computo proba di emissione e transizione
            emission_p = word_given_tag(words[key], tag)[0]/word_given_tag(words[key], tag)[1]
            state_probability = emission_p * transition_p
            p.append(state_probability)
        pmax = max(p)
        state_max = rule_based_tagger.tag([word])[0][1]
        if(pmax==0):
            state_max = rule_based_tagger.tag([word])[0][1]
        else:
            if state_max != 'X':
                # Ritorno lo stato per il quale la probabilità è MAX
                state_max = T[p.index(pmax)]

        state.append(state_max)
    return list(zip(words, state))

In [None]:
#Testo l'accuratezza su un subset del test set di partenza
start = time.time()
tagged_seq = Viterbi_rule_based(test_tagged_words)
end = time.time()
difference = end-start

print("Tempo impiegato in secondi: ", difference)

# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j]

accuracy = len(check)/len(tagged_seq)
print('Viterbi Algorithm Accuracy: ',accuracy*100)

In [None]:
#Check di valutazione sulla stessa frase per Viterbi e Viterbi rule-based
test_sent="Will can see Marry"
pred_tags_rule=Viterbi_rule_based(test_sent.split())
pred_tags_withoutRules= Viterbi(test_sent.split())
print(pred_tags_rule)
print(pred_tags_withoutRules)
# Will e Mary vengono etichettati come NOME e risultati non conosciute dall'algoritmo di Viterbi.