In [1]:
# Importing libraries
import nltk
import numpy as np
import pandas as pd
import random
from sklearn.model_selection import train_test_split
import pprint, time
import conllu
from nltk.tag import UnigramTagger

# Загрузка данных из CoNLL-U файла
with open('ru_syntagrus-ud-dev.conllu', 'r', encoding='utf-8') as file:
    conllu_data = file.read()

filename = 'ru_syntagrus-ud-dev1.conllu'

def read_data(filename):
    sentences = []
    with open(filename, 'r', encoding='utf-8') as f:
        current_sentence = []
        for line in f:
            parts = line.strip().split('\t')
            if len(parts) == 10:
                word = parts[1]
                tag = parts[3]
                current_sentence.append((word, tag))
            elif not line.strip():  # Пустая строка разделяет предложения
                if current_sentence:
                    sentences.append(current_sentence)
                    current_sentence = []
    return sentences
    
# Пример использования
data = read_data(filename)
print(data[:1])  # Вывести первые два предложения для проверки

[[('Моя', 'DET'), ('мать', 'NOUN'), (',', 'PUNCT'), ('Анна', 'PROPN'), ('Всеволодовна', 'PROPN'), ('Мохова', 'PROPN'), ('(', 'PUNCT'), ('Дмитриева', 'PROPN'), (')', 'PUNCT'), (',', 'PUNCT'), ('родилась', 'VERB'), ('27', 'ADJ'), ('марта', 'NOUN'), ('1913', 'ADJ'), ('года', 'NOUN'), ('.', 'PUNCT')]]


In [10]:
for sent in data[:1]:
    for tuple in sent:
        print(tuple)

('Моя', 'DET')
('мать', 'NOUN')
(',', 'PUNCT')
('Анна', 'PROPN')
('Всеволодовна', 'PROPN')
('Мохова', 'PROPN')
('(', 'PUNCT')
('Дмитриева', 'PROPN')
(')', 'PUNCT')
(',', 'PUNCT')
('родилась', 'VERB')
('27', 'ADJ')
('марта', 'NOUN')
('1913', 'ADJ')
('года', 'NOUN')
('.', 'PUNCT')


In [3]:
train_set,test_set =train_test_split(data,train_size=0.80,test_size=0.20,random_state = 101)

In [12]:
train_tagged_words = [ tup for sent in train_set for tup in sent ]
test_tagged_words = [ tup for sent in test_set for tup in sent ]
print(len(train_tagged_words))
print(len(test_tagged_words))

unique_train_words = set(train_tagged_words)
unique_test_words = set(test_tagged_words)

print(len(unique_train_words))
print(len(unique_test_words))


1086267
273625
130239
56598


In [13]:
train_tagged_words[:5]

[('Такие', 'DET'),
 ('распродажи', 'NOUN'),
 ('долго', 'ADV'),
 ('не', 'PART'),
 ('идут', 'VERB')]

In [14]:
#use set datatype to check how many unique tags are present in training data
tags = {tag for word,tag in train_tagged_words}
print(len(tags))
print(tags)
 
# check total words in vocabulary
vocab = {word for word,tag in train_tagged_words}

18
{'INTJ', 'AUX', 'PROPN', '_', 'NUM', 'NOUN', 'SCONJ', 'ADJ', 'SYM', 'PRON', 'ADP', 'PART', 'X', 'VERB', 'PUNCT', 'DET', 'ADV', 'CCONJ'}


In [15]:
# compute Emission Probability
def word_given_tag(word, tag, train_bag = train_tagged_words):
    tag_list = [pair for pair in train_bag if pair[1]==tag]
    count_tag = len(tag_list)#total number of times the passed tag occurred in train_bag
    w_given_tag_list = [pair[0] for pair in tag_list if pair[0]==word]
#now calculate the total number of times the passed word occurred as the passed tag.
    count_w_given_tag = len(w_given_tag_list)
 
     
    return (count_w_given_tag, count_tag)

In [16]:
# compute  Transition Probability
def t2_given_t1(t2, t1, train_bag = train_tagged_words):
    tags = [pair[1] for pair in train_bag]
    count_t1 = len([t for t in tags if t==t1])
    count_t2_t1 = 0
    for index in range(len(tags)-1):
        if tags[index]==t1 and tags[index+1] == t2:
            count_t2_t1 += 1
    return (count_t2_t1, count_t1)

In [17]:
# creating t x t transition matrix of tags, t= no of tags
# Matrix(i, j) represents P(jth tag after the ith tag)
 
tags_matrix = np.zeros((len(tags), len(tags)), dtype='float32')
for i, t1 in enumerate(list(tags)):
    for j, t2 in enumerate(list(tags)): 
        tags_matrix[i, j] = t2_given_t1(t2, t1)[0]/t2_given_t1(t2, t1)[1]
 
print(tags_matrix)

[[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
  0.00000000e+00 4.04624268e-02 0.00000000e+00 5.78034669e-03
  0.00000000e+00 1.73410401e-02 5.78034669e-03 5.78034669e-02
  0.00000000e+00 0.00000000e+00 8.55491340e-01 0.00000000e+00
  1.15606934e-02 5.78034669e-03]
 [0.00000000e+00 2.67462190e-02 8.33247602e-03 7.20090524e-04
  1.09042283e-02 9.30974185e-02 1.64592115e-03 1.63460553e-01
  1.02870072e-04 3.04495431e-02 5.65785430e-02 5.54469712e-02
  1.02870072e-04 3.78459007e-01 4.83489335e-02 2.49974281e-02
  9.73150879e-02 3.29184229e-03]
 [0.00000000e+00 1.10809533e-02 1.86707839e-01 5.22921386e-04
  5.22921374e-03 4.19831164e-02 1.51896209e-03 2.54488401e-02
  2.98812229e-04 6.92248298e-03 6.94240406e-02 2.05184389e-02
  2.56480486e-03 1.26422465e-01 4.13282216e-01 4.38257912e-03
  3.00306287e-02 5.36616966e-02]
 [0.00000000e+00 6.52741524e-04 3.78590077e-02 7.18015656e-02
  5.15665784e-02 1.37075722e-01 1.10966060e-02 7.50652775e-02
  6.52741524e-04 1.76240206e-02 2

In [18]:
# convert the matrix to a df for better readability
#the table is same as the transition table shown in section 3 of article
tags_df = pd.DataFrame(tags_matrix, columns = list(tags), index=list(tags))
display(tags_df)

Unnamed: 0,INTJ,AUX,PROPN,_,NUM,NOUN,SCONJ,ADJ,SYM,PRON,ADP,PART,X,VERB,PUNCT,DET,ADV,CCONJ
INTJ,0.0,0.0,0.0,0.0,0.0,0.040462,0.0,0.00578,0.0,0.017341,0.00578,0.057803,0.0,0.0,0.855491,0.0,0.011561,0.00578
AUX,0.0,0.026746,0.008332,0.00072,0.010904,0.093097,0.001646,0.163461,0.000103,0.03045,0.056579,0.055447,0.000103,0.378459,0.048349,0.024997,0.097315,0.003292
PROPN,0.0,0.011081,0.186708,0.000523,0.005229,0.041983,0.001519,0.025449,0.000299,0.006922,0.069424,0.020518,0.002565,0.126422,0.413282,0.004383,0.030031,0.053662
_,0.0,0.000653,0.037859,0.071802,0.051567,0.137076,0.011097,0.075065,0.000653,0.017624,0.240209,0.075718,0.000653,0.01893,0.120757,0.023499,0.087467,0.029373
NUM,0.0,0.001343,0.004253,0.000746,0.00664,0.568753,0.000149,0.070357,0.047154,0.006939,0.091845,0.00388,0.000373,0.021786,0.151832,0.007237,0.007312,0.009401
NOUN,8e-06,0.008817,0.043902,0.0006,0.006212,0.143862,0.001804,0.077051,0.000221,0.013798,0.115326,0.018311,0.000519,0.096028,0.383101,0.016685,0.026833,0.046924
SCONJ,0.0,0.014338,0.034029,0.001277,0.008986,0.209281,0.0055,0.098748,0.0,0.139504,0.128357,0.066585,0.000786,0.151043,0.030935,0.045519,0.063639,0.001473
ADJ,0.0,0.007205,0.015842,0.000544,0.002787,0.6723,0.000573,0.07269,6.7e-05,0.00522,0.023142,0.006279,0.000239,0.027455,0.131465,0.003445,0.00543,0.025318
SYM,0.0,0.00113,0.035028,0.00226,0.257627,0.152542,0.0,0.058757,0.00339,0.00113,0.084746,0.00565,0.012429,0.045198,0.301695,0.013559,0.015819,0.00904
PRON,0.0,0.036094,0.005418,0.002102,0.004627,0.065439,0.004589,0.072361,5.8e-05,0.042842,0.068293,0.100511,3.9e-05,0.325364,0.145204,0.019744,0.100106,0.007211


In [13]:
def Viterbi(words, train_bag = train_tagged_words):
    state = []
    T = list(set([pair[1] for pair in train_bag]))
     
    for key, word in enumerate(words):
        #initialise list of probability column for a given observation
        p = [] 
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['PUNCT', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]
                 
            # compute emission and state probabilities
            emission_p = word_given_tag(words[key], tag)[0]/word_given_tag(words[key], tag)[1]
            state_probability = emission_p * transition_p    
            p.append(state_probability)
             
        pmax = max(p)
        # getting state for which probability is maximum
        state_max = T[p.index(pmax)] 
        state.append(state_max)
    return list(zip(words, state))

In [19]:
# Let's test our Viterbi algorithm on a few sample sentences of test dataset
random.seed(1234)      #define a random seed to get same sentences when run multiple times
 
# choose random 10 numbers
rndom = [random.randint(1,len(test_set)) for x in range(10)]
 
# list of 10 sents on which we test the model
test_run = [test_set[i] for i in rndom]
 
# list of tagged words
test_run_base = [tup for sent in test_run for tup in sent]
 
# list of untagged words
test_tagged_words = [tup[0] for sent in test_run for tup in sent]

In [15]:
#Here We will only test 10 sentences to check the accuracy
#as testing the whole training set takes huge amount of time
start = time.time()
tagged_seq = Viterbi(test_tagged_words)
end = time.time()
difference = end-start
 
print("Time taken in seconds: ", difference)
 
# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
 
accuracy = len(check)/len(tagged_seq)
print('Viterbi Algorithm Accuracy: ',accuracy*100)

Time taken in seconds:  155.59310126304626
Viterbi Algorithm Accuracy:  84.31372549019608


In [16]:
test_sent="Вышел зайчик погулять"
pred_tags_withoutRules= Viterbi(test_sent.split())
print(pred_tags_withoutRules)

[('Вышел', 'VERB'), ('зайчик', 'VERB'), ('погулять', 'VERB')]


In [20]:
#To improve the performance,we specify a rule base tagger for unknown words 
# specify patterns for tagging
patterns = [
    (r'\*T?\*?-[0-9]+$', 'X'),        # X
    (r'^-?[0-9]+(.[0-9]+)?$', 'NUM'), # cardinal numbers
    (r'.*', 'NOUN'),                  # nouns
    (r'.*чик$', 'NOUN'),              # verb 
    (r'.*ченок$', 'NOUN'),
    (r'.*чёнок$', 'NOUN'),
    (r'.*(ый|ий|ого|его|ому|ему|ым|им|ом|ем)$', 'ADJ'),
    (r'.*(ешь|ишь|ет|ит|ем|им|ете|ите|ут|ют|ат|ят)$', 'VERB'),    
]
 
# rule based tagger
rule_based_tagger = nltk.RegexpTagger(patterns)

In [21]:
#modified Viterbi to include rule based tagger in it
def Viterbi_rule_based(words, train_bag = train_tagged_words):
    state = []
    T = list(set([pair[1] for pair in train_bag]))
     
    for key, word in enumerate(words):
        #initialise list of probability column for a given observation
        p = [] 
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['PUNCT', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]
                 
            # compute emission and state probabilities
            emission_p = word_given_tag(words[key], tag)[0]/word_given_tag(words[key], tag)[1]
            state_probability = emission_p * transition_p    
            p.append(state_probability)
             
        pmax = max(p)
        state_max = rule_based_tagger.tag([word])[0][1]       
        
         
        if(pmax==0):
            state_max = rule_based_tagger.tag([word])[0][1] # assign based on rule based tagger
        else:
            if state_max != 'X':
                # getting state for which probability is maximum
                state_max = T[p.index(pmax)]                
             
         
        state.append(state_max)
    return list(zip(words, state))

In [22]:
#test accuracy on subset of test data 
start = time.time()
tagged_seq = Viterbi_rule_based(test_tagged_words)
end = time.time()
difference = end-start
 
print("Time taken in seconds: ", difference)
 
# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
 
accuracy = len(check)/len(tagged_seq)
print('Viterbi Algorithm Accuracy: ',accuracy*100)

Time taken in seconds:  1242.6543123722076
Viterbi Algorithm Accuracy:  97.34513274336283


In [25]:
test_sent="Старичок, используя кастыль, спускается по лестнице"
pred_tags_rule=Viterbi_rule_based(test_sent.split())
print(pred_tags_rule)

[('Старичок,', 'NOUN'), ('используя', 'VERB'), ('кастыль,', 'NOUN'), ('спускается', 'VERB'), ('по', 'ADP'), ('лестнице', 'NOUN')]
