In [1]:
import nltk
import numpy as np
import pandas as pd
import random
from sklearn.model_selection import train_test_split
import pprint, time

# Preprocessing of data

Here I will aim to form an array of arrays of tuples with each word and the following punctuation mark.

Example of single sentence:
[('I', ' '),
('love', ' '),
('kittens', ','),
('and', ' ').
('you', '?')]

In [2]:
from os import listdir
from os.path import isfile, join

datapath = 'target-sentences-tokenized'
onlyfiles = [join(datapath, f) for f in listdir(datapath) if isfile(join(datapath, f))]

In [3]:
data = []
punctuations = {',', '.', '!', '?'}
ignore = {'"', "'", '-', '–', '—', ':', ';'}
for file in onlyfiles:
    with open(file, 'r') as f:
        lines = f.readlines()
    for l in lines:
        words = l.split()
        curr_sent = []
        i = 0
        while i < len(words) - 1:
            current_pair = [words[i].lower()]
            if words[i].isnumeric():
                current_pair = ['NUM']
            if words[i] in ignore:
                i += 1
                continue
            if words[i + 1] in punctuations:
                current_pair.append(words[i + 1])
                i += 1
            elif words[i + 1] in ignore:
                current_pair.append(' ')
                i += 1
            else:
                current_pair.append(' ')
            curr_sent.append(tuple(current_pair))
            i += 1
        data.append(curr_sent)

In [4]:
data[:10]

[[('вона', ' '),
  ('влаштувала', ' '),
  ('свою', ' '),
  ('дошку', ' '),
  ('і', ' '),
  ('почала', ' '),
  ('малювати', ' '),
  ('ручкою', ' '),
  ('та', ' '),
  ('чорнилом', ','),
  ('щоб', ' '),
  ('проілюструвати', ' '),
  ('журнальну', ' '),
  ('історію', '.')],
 [('молоді', ' '),
  ('художники', ' '),
  ('повинні', ' '),
  ('прокласти', ' '),
  ('собі', ' '),
  ('шлях', ' '),
  ('до', ' '),
  ('мистецтва', ','),
  ('малюючи', ' '),
  ('картинки', ' '),
  ('для', ' '),
  ('журнальних', ' '),
  ('історій', ','),
  ('які', ' '),
  ('молоді', ' '),
  ('автори', ' '),
  ('пишуть', ','),
  ('щоб', ' '),
  ('прокласти', ' '),
  ('собі', ' '),
  ('шлях', ' '),
  ('до', ' '),
  ('літератури', '.')],
 [('поки', ' '),
  ('сью', ' '),
  ('замальовувала', ' '),
  ('пару', ' '),
  ('елегантних', ' '),
  ('штанів', ' '),
  ('для', ' '),
  ('верхової', ' '),
  ('їзди', ' '),
  ('та', ' '),
  ('монокль', ' '),
  ('фігури', ' '),
  ('героя', ','),
  ('ковбоя', ' '),
  ('айдахо', ','),
  ('вона',

In [5]:
len(data)

29760

---------------

In [6]:
# split data into training and validation set in the ratio 90:10
train_set, test_set = train_test_split(data, train_size=0.9, test_size=0.009, random_state=101)

# create list of train and test tagged words
train_tagged_words = [tup for sent in train_set for tup in sent]
test_tagged_words = [tup for sent in test_set for tup in sent]
print(len(train_tagged_words))
print(len(test_tagged_words))

319448
3487


In [7]:
# check some tagged words.
train_tagged_words[:5]

[('пишаюся', ','), ('що', ' '), ('тобі', ' '), ('вдається', ' '), ('і', ' ')]

In [8]:
#use set datatype to check how many unique tags are present in training data
tags = {tag for word, tag in train_tagged_words}
print(len(tags))
print(tags)

# check total words in vocabulary
vocab = {word for word, tag in train_tagged_words}

5
{'?', '!', ',', ' ', '.'}


In [13]:
# compute Emission Probability
def word_given_tag(word, tag, train_bag = train_tagged_words):
    tag_list = [pair for pair in train_bag if pair[1]==tag]
    count_tag = len(tag_list)#total number of times the passed tag occurred in train_bag
    w_given_tag_list = [pair[0] for pair in tag_list if pair[0]==word]
#now calculate the total number of times the passed word occurred as the passed tag.
    count_w_given_tag = len(w_given_tag_list)


    return (count_w_given_tag, count_tag)


# compute  Transition Probability
def t2_given_t1(t2, t1, train_bag=train_tagged_words):
    tags = [pair[1] for pair in train_bag]
    count_t1 = len([t for t in tags if t==t1])
    count_t2_t1 = 0
    for index in range(len(tags)-1):
        if tags[index]==t1 and tags[index+1] == t2:
            count_t2_t1 += 1
    return (count_t2_t1, count_t1)

# creating t x t transition matrix of tags, t= no of tags
# Matrix(i, j) represents P(jth tag after the ith tag)

tags_matrix = np.zeros((len(tags), len(tags)), dtype='float32')
for i, t1 in enumerate(list(tags)):
    for j, t2 in enumerate(list(tags)):
        tags_matrix[i, j] = t2_given_t1(t2, t1)[0]/t2_given_t1(t2, t1)[1]

print(tags_matrix)

[[0.00756501 0.0108747  0.09598108 0.8600473  0.02553191]
 [0.00513611 0.01386749 0.08577298 0.86748844 0.02773498]
 [0.00314518 0.00539969 0.08219216 0.8954854  0.01377755]
 [0.00713678 0.00561741 0.11805072 0.7934702  0.07572489]
 [0.0062506  0.01202039 0.09948072 0.8529666  0.02923358]]


In [14]:
# convert the matrix to a df for better readability
# the table is same as the transition table shown in section 3 of article
tags_df = pd.DataFrame(tags_matrix, columns=list(tags), index=list(tags))
display(tags_df)

Unnamed: 0,?,!,",",Unnamed: 4,.
?,0.007565,0.010875,0.095981,0.860047,0.025532
!,0.005136,0.013867,0.085773,0.867488,0.027735
",",0.003145,0.0054,0.082192,0.895485,0.013778
,0.007137,0.005617,0.118051,0.79347,0.075725
.,0.006251,0.01202,0.099481,0.852967,0.029234


In [15]:
from tqdm import tqdm


def Viterbi(words, train_bag=train_tagged_words):
    state = []
    T = list(set([pair[1] for pair in train_bag]))
    print(T)
    for key, word in enumerate(tqdm(words)):
        # initialise list of probability column for a given observation
        p = []
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['.', tag] + tags_df.loc['?', tag] + tags_df.loc['!', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]

            # compute emission and state probabilities
            count_word_tag, count_tag = word_given_tag(words[key], tag)
            emission_p = count_word_tag / count_tag
#             emission_p = word_given_tag(words[key], tag)[0]/word_given_tag(words[key], tag)[1]
            state_probability = emission_p * transition_p
            p.append(state_probability)

        pmax = max(p)
        # getting state for which probability is maximum
        state_max = T[p.index(pmax)]
        state.append(state_max)
    return list(zip(words, state))


In [16]:
# Let's test our Viterbi algorithm on a few sample sentences of test dataset
random.seed(1234)      #define a random seed to get same sentences when run multiple times

# choose random 10 numbers
rndom = [random.randint(1, len(test_set)) for x in range(10)]

# list of 10 sents on which we test the model
test_run = [test_set[i] for i in rndom]

# list of tagged words
test_run_base = [tup for sent in test_run for tup in sent]

# list of untagged words
test_tagged_words = [tup[0] for sent in test_run for tup in sent]

# Here We will only test 10 sentences to check the accuracy
# as testing the whole training set takes huge amount of time
start = time.time()
tagged_seq = Viterbi(test_tagged_words)
end = time.time()
difference = end-start

print("Time taken in seconds: ", difference)

# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j]

accuracy = len(check)/len(tagged_seq)
print('Viterbi Algorithm Accuracy: ', accuracy * 100)

['?', '!', ',', ' ', '.']


100%|██████████| 126/126 [00:18<00:00,  6.75it/s]

Time taken in seconds:  18.708982706069946
Viterbi Algorithm Accuracy:  79.36507936507937





In [17]:
tagged_seq

[('у', ' '),
 ('статті', ' '),
 ('NUM', ' '),
 ('закону', ' '),
 ('україни', ' '),
 ('про', ' '),
 ('електронний', ' '),
 ('цифровий', '?'),
 ('підпис', ' '),
 ('сказано', ' '),
 ('що', ' '),
 ('“', ' '),
 ('електронний', ' '),
 ('підпис', ' '),
 ('не', ' '),
 ('може', ' '),
 ('бути', ' '),
 ('визнаний', ' '),
 ('недійсним', '?'),
 ('лише', ' '),
 ('через', ' '),
 ('те', ','),
 ('що', ' '),
 ('він', ' '),
 ('має', ' '),
 ('електронну', ' '),
 ('форму', ' '),
 ('або', ' '),
 ('не', ' '),
 ('ґрунтується', ' '),
 ('на', ' '),
 ('посиленому', '?'),
 ('сертифікаті', '?'),
 ('ключа', ' '),
 ('”', ' '),
 ('а', ' '),
 ('у', ' '),
 ('статті', ' '),
 ('NUM', ' '),
 ('що', ' '),
 ('“', ' '),
 ('юридичні', '?'),
 ('та', ' '),
 ('фізичні', ' '),
 ('особи', ' '),
 ('можуть', ' '),
 ('…', ' '),
 ('використовувати', ' '),
 ('електронний', ' '),
 ('цифровий', '?'),
 ('підпис', ' '),
 ('без', ' '),
 ('сертифіката', '?'),
 ('ключа', ' '),
 ('”', ' '),
 ('розум', ' '),
 ('у', ' '),
 ('чому', ' '),
 ('проб

In [18]:
len(test_tagged_words)

126

In [68]:
def measure_sign(predicted_seq, test_seq, punctuation_sign):
    test_pos = set([i for i, (_, second) in enumerate(test_seq) if second == punctuation_sign])
    predicted_pos = set([i for i, (_, second) in enumerate(predicted_seq) if second == punctuation_sign])
    
    # how many times was a punctuation sign restored correctly
    true_positives = len(test_pos & predicted_pos)
    
    # how many times wasn't a punctuation sign restored when it should habe been
    false_negatives = len(test_pos - predicted_pos)
    
    # how many times was a punctuation sign restored when it shouldn't have been there
    false_positives = len(predicted_pos - test_pos)
    
    no_test_pos = set([i for i, (_, second) in enumerate(test_seq) if second != punctuation_sign])
    no_predicted_pos = set([i for i, (_, second) in enumerate(predicted_seq) if second != punctuation_sign])
    true_negatives = len(no_test_pos & no_predicted_pos)
    
    return true_positives, false_positives, false_negatives, true_negatives


def standard_tests(true_positives, false_positives, false_negatives, true_negatives):
    accuracy = (true_positives + true_negatives) / \
        (true_positives + true_negatives + false_positives + false_negatives) \
        if (true_positives + true_negatives + false_positives + false_negatives) > 0 else np.nan
    precision = true_positives / (true_positives + false_positives) \
        if (true_positives + false_positives) > 0 else np.nan
    recall = true_positives / (true_positives + false_negatives) \
        if (true_positives + false_negatives) > 0 else np.nan
    f_score = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else np.nan
    
    return f"{accuracy:.3f}", f"{precision:.3f}", f"{recall:.3f}", f"{f_score:.3f}"

In [70]:
def evaluate(predicted_seq, test_seq):
    tp, fp, fn, tn = 0,0,0,0
    print("{:<16} {:<9} {:<12} {:<9} {:<9}".format("Punctuation", "Accuracy", "Precision", "Recall", "F-score"))
    for tag in tags:
        true_positives, false_positives, false_negatives, true_negatives = \
            measure_sign(predicted_seq, test_seq, tag)
        tp += true_positives
        fp += false_positives
        fn += false_negatives
        tn += true_negatives
        accuracy, precision, recall, f_score = \
            standard_tests(true_positives, false_positives, false_negatives, true_negatives)
        print("{:<16} {:<9} {:<12} {:<9} {:<9}".format(tag, accuracy, precision, recall, f_score))

    accuracy, precision, recall, f_score = standard_tests(tp, fp, fn, tn)
    print("{:<16} {:<9} {:<12} {:<9} {:<9}".format("Overall", accuracy, precision, recall, f_score))

evaluate(tagged_seq, test_run_base)

Punctuation      Accuracy  Precision    Recall    F-score  
?                0.921     0.000        0.000     nan      
!                1.000     nan          nan       nan      
,                0.905     0.333        0.200     0.250    
                 0.817     0.883        0.907     0.895    
.                0.944     nan          0.000     nan      
Overall          0.917     0.794        0.794     0.794    


In [None]:
# Code to test all the test sentences
# (takes alot of time to run s0 we won't run it here)
# tagging the test sentences()
test_tagged_words = [tup for sent in test_set for tup in sent]
test_untagged_words = [tup[0] for sent in test_set for tup in sent]


print('Starting testing')
# start = time.time()
tagged_seq = Viterbi(test_untagged_words)
# end = time.time()
# difference = end-start

# print("Time taken in seconds: ", difference)


In [None]:
# accuracy
check = [i for i, j in zip(tagged_seq, test_untagged_words) if i == j]

accuracy = len(check) / len(tagged_seq)
print('Viterbi Algorithm Accuracy: ', accuracy * 100)