### Import Functions

In [1]:
import json
import sklearn_crfsuite
from sklearn_crfsuite import metrics
import classla
import json
import re
from numpy import array, argmax
from sklearn.preprocessing import LabelEncoder
from keras.preprocessing.text import Tokenizer
from sklearn.metrics import classification_report
from keras.models import model_from_json
from tensorflow.keras.utils import to_categorical

### General Utilities

In [2]:
# input - name of a UTF-8 encoded text file with one sentence per line
# output - list of sentences (strings) with the line terminators removed
def read_file_as_list_of_sentences(input_file_name):
    # The encoding is pinned explicitly: the sentences are Bulgarian (Cyrillic) text and
    # the platform default encoding is not guaranteed to be UTF-8, in which case the
    # text would be decoded incorrectly or raise UnicodeDecodeError.
    with open(input_file_name, "r", encoding="utf-8") as input_file:
        return input_file.read().splitlines()

In [3]:
# input - list of sentences (strings)
# output - new list with the same sentences, each one prefixed with the '^' start marker
def add_special_symbol_at_start_of_each_sentence(data):
    prefix_with_marker = '^{0}'.format
    return list(map(prefix_with_marker, data))

In [4]:
# input - list of sentences (strings) and a classla pipeline (POS tokenize or tokenize)
# output - list with one entry per input sentence: element [0][0] of the pipeline
#          result converted with to_dict()
def run_through_classla_pipeline(list_of_sentences, pipeline):
    processed_sentences = []

    for text in list_of_sentences:
        document_as_dict = pipeline(text).to_dict()
        # NOTE(review): [0][0] presumably selects the token list of the first sentence the
        # pipeline detected in the line - confirm against the installed classla version.
        processed_sentences.append(document_as_dict[0][0])

    return processed_sentences

In [5]:
# input - list of dictionaries (a dictionary for each token - the tokenized version of the sentence)
# output - new list of dictionaries in which every run of consecutive punctuation tokens is
#          merged into one token whose 'text' is the concatenation of the run; the input
#          dictionaries are left unmodified
def squash_punctuation(sentence):
    punctuation_characters = ',()";.?!:-'

    def is_punctuation(text):
        # Character-wise test (the same rule word2label and remove_punctuation apply), so
        # multi-character tokens such as '...' are recognised as punctuation as well; a
        # substring test against the character set would miss them.
        return all(character in punctuation_characters for character in text)

    new_sentence = []

    for token in sentence:
        if not is_punctuation(token['text']):
            new_sentence.append(token)
        elif new_sentence and is_punctuation(new_sentence[-1]['text']):
            # Continue the current punctuation run.
            new_sentence[-1]['text'] += token['text']
        else:
            # Start a new run with a copy: the run is extended in place above and the
            # caller's dictionary must not change as a side effect.
            new_sentence.append(dict(token))

    return new_sentence

In [6]:
# input - label (string) - the punctuation that follows a word
# output - transformed label (string) - one that is part of the allowed labels:
#          the label itself when it is allowed, otherwise the first allowed
#          multi-character label contained in it, otherwise the highest-priority
#          single mark contained in it, otherwise ''
def transform_label(label):
    # Single marks, listed from highest to lowest priority.
    single_marks = (',', '?', '!', '.', ':', '-', ';', '(', ')', '"')
    # Multi-character labels, checked before the single marks.
    compound_marks = ('...', '",', '),', '".', ').', ':"')

    if label == '' or label in single_marks or label in compound_marks:
        return label

    for mark in compound_marks + single_marks:
        if mark in label:
            return mark

    return ''

In [7]:
# input - sentence (list of dictionaries) and the index of a token in it
# output - the label of that token: None when the token itself consists only of punctuation,
#          the transformed punctuation when the following token consists only of punctuation,
#          and '' otherwise
def word2label(sentence, i):
    marks = ',()";.?!:-'

    def only_marks(text):
        return all(character in marks for character in text)

    if only_marks(sentence[i]['text']):
        return None

    has_follower = i < len(sentence) - 1
    if has_follower and only_marks(sentence[i+1]['text']):
        return transform_label(sentence[i+1]['text'])

    return ''

In [8]:
# input - sentence (list of dictionaries)
# output - list of labels (strings), one per non-punctuation token; the None labels that
#          word2label gives to punctuation tokens are filtered out
def sent2labels(sentence):
    labels = []

    for position in range(len(sentence)):
        label = word2label(sentence, position)
        if label is not None:
            labels.append(label)

    return labels

In [9]:
# input - list of dictionaries (a dictionary for each token - the tokenized version of the sentence)
# output - new sentence (string) built from the texts of the non-punctuation tokens, each
#          followed by a single space (so a non-empty result ends with a space)
def remove_punctuation(sentence):
    marks = ',()";.?!:-'
    kept_words = [token['text'] + ' ' for token in sentence
                  if not all(character in marks for character in token['text'])]

    return ''.join(kept_words)

In [10]:
# input - sentence (list of dictionaries)
# output - True when at least one token has an xpos tag starting with 'Pi'
#          (interrogative word), False otherwise
def contains_interrogative_word(sentence):
    return any(token['xpos'].startswith('Pi') for token in sentence)

In [11]:
# input - sentence (list of dictionaries)
# output - True when at least one token has the xpos tag 'Ti' (interrogative particle),
#          False otherwise
def contains_interrogative_particle(sentence):
    return any(token['xpos'] == 'Ti' for token in sentence)

In [12]:
# input - sentence (list of dictionaries)
# output - True when at least one token has an xpos tag that starts with 'V' and has 'z' as
#          its fifth character (imperative verb), False otherwise
def contains_imperative_verb(sentence):
    # Slices are used instead of indexing: xpos[4:5] is '' for a tag shorter than five
    # characters and xpos[:1] is '' for an empty tag, so short tags simply do not match
    # instead of raising IndexError (split_xpos reads the mood position the same way).
    return any(token['xpos'][:1] == 'V' and token['xpos'][4:5] == 'z' for token in sentence)

In [13]:
# input - sentence (list of dictionaries) and index of a word (dictionary)
# output - True when any token before index i has an xpos tag starting with 'Pr'
#          (relative pronoun), False otherwise; only the boolean is returned
def contains_relative_pronoun_before(sentence, i):
    preceding_tokens = sentence[:i]
    return any(token['xpos'].startswith('Pr') for token in reversed(preceding_tokens))

In [14]:
# input - sentence (list of dictionaries) and index of a word (dictionary)
# output - True when the token right after index i is tagged 'Cr' or 'Cp' and a token with
#          the same xpos tag occurs before index i (repetitive conjunction), False otherwise
def contains_rep_conj_before(sentence, i):
    if i >= len(sentence) - 1:
        return False

    following_tag = sentence[i+1]['xpos']
    if following_tag not in ('Cr', 'Cp'):
        return False

    return any(token['xpos'] == following_tag for token in reversed(sentence[:i]))

In [15]:
# input - sentence (list of dictionaries) and index of a word (dictionary)
# output - a pair: (True, xpos tag of the closest earlier occurrence) when the word right after
#          index i also appears (case-insensitively) before index i, otherwise (False, '')
def contains_repetitive_word_before(sentence, i):
    if i < len(sentence) - 1:
        # Walk backwards so that the closest preceding occurrence is the one reported.
        for earlier_token in reversed(sentence[:i]):
            if earlier_token['text'].lower() == sentence[i+1]['text'].lower():
                return True, earlier_token['xpos']

    return False, ''

In [16]:
# input - sentence (list of dictionaries)
# output - True when the text of at least one token is exactly (case-sensitively) one of the
#          listed reporting verbs, False otherwise
def contains_publicistic_word(sentence):
    publicistic_words = (
        'казва', 'каза', 'заяви', 'заявява', 'отбеляза', 'добави', 'добавя',
        'цитира', 'твърди', 'обясни', 'посочи', 'допълни', 'подчерта', 'писа',
        'изтъкна', 'посочва', 'пише', 'заключи', 'сподели', 'обяснява', 'предупреди',
        'отбелязва', 'предупреждава', 'призова', 'допълва', 'съобщи', 'обяви',
        'коментира',
    )

    return any(token['text'] in publicistic_words for token in sentence)

In [17]:
# input - sentence (list of dictionaries) and index of a word (dictionary)
# output - the number of tokens before index i whose upos tag is 'VERB'
def count_of_verbs_before(sentence, i):
    return sum(1 for token in sentence[:i] if token['upos'] == 'VERB')

In [18]:
# input - sentence (list of dictionaries) and index of a word (dictionary)
# output - True when there is a token tagged 'VERB' at or before index i (the word itself
#          counts) and another one after index i, False otherwise
def between_two_verbs(sentence, i):
    def has_verb(tokens):
        return any(token['upos'] == 'VERB' for token in tokens)

    verb_up_to_word = has_verb(sentence[:(i+1)])
    verb_after_word = has_verb(sentence[(i+1):])

    return verb_up_to_word and verb_after_word

In [19]:
# input - a full xpos tag as defined in BulTreeBank tagset (string) and a prefix word for the output dictionary keys (string)
# output - a dictionary whose keys are '<word>_<subtag name>' and whose values are the subtags
#          cut out of the full tag, with trailing '-' characters stripped from every value
def split_xpos(xpos, word='word'):
    def pick(*positions):
        # Concatenate the characters at the given positions, using '-' for every position
        # that lies beyond the end of the tag.
        return ''.join(xpos[position:position + 1] or '-' for position in positions)

    category = xpos[0]

    if category in ('N', 'M'):
        subtags = {'xpos_type': xpos[:2], 'xpos_gender_number_article': xpos[2:5]}
    elif category in ('A', 'H'):
        subtags = {'xpos_type': xpos[:1], 'xpos_gender_number_article': xpos[1:4]}
        if category == 'A' and xpos[4:5] == 'e':
            subtags['xpos_extended'] = xpos[4:5]
    elif category == 'V':
        # Characters are taken in the order 9, 8, 10 (gender, number, article).
        subtags = {'xpos_type': xpos[:2], 'xpos_gender_number_article': pick(9, 8, 10)}
        if xpos[4:5] in ('z', 'c', 'g'):
            subtags['xpos_mood'] = xpos[4:5]
    elif category == 'P':
        # Characters are taken in the order 7, 5, 8 (gender, number, article).
        subtags = {'xpos_type': xpos[:2], 'xpos_gender_number_article': pick(7, 5, 8)}
        if xpos[2:3] in ('e', 'a', 'l', 'm', 'q', 't'):
            subtags['xpos_ref_type'] = xpos[2:3]
    else:
        # 'D', 'C', 'T', 'R', 'I' and every unlisted category keep the whole tag as type.
        subtags = {'xpos_type': xpos}

    prefixed = {}
    for subtag_name, subtag_value in subtags.items():
        prefixed[word + '_' + subtag_name] = subtag_value.rstrip('-')

    return prefixed

In [20]:
# input - sentence (list of dictionaries with 'text', 'upos' and 'xpos') and an index of a word
# output - dictionary of features for that word: sentence-level flags, the word's own text and
#          tags, and the text and tags of up to two neighbours on each side ('BOS' / 'EOS' are
#          set for the first / last token instead of the missing direct neighbour)
def word2features(sentence, i):
    def add_neighbour(position, prefix):
        # Adds '<prefix>', '<prefix>_upos' and the split xpos subtags of a neighbouring token.
        neighbour = sentence[position]
        features[prefix] = neighbour['text'].lower()
        features[prefix + '_upos'] = neighbour['upos']
        features.update(split_xpos(neighbour['xpos'], prefix))

    # Index 1 is used for the first word because data_prep prefixes every sentence with '^',
    # so token 0 is presumably that marker. A sentence holding a single token (e.g. an empty
    # input line) has no index 1; it gets '' instead of raising IndexError.
    first_word_in_sent = sentence[1]['text'].lower() if len(sentence) > 1 else ''

    features = {
        'word': sentence[i]['text'].lower(),
        'sent_len': len(sentence),
        'upos': sentence[i]['upos'],
        'first_word_in_sent': first_word_in_sent,
        'contains_interrogative_word': contains_interrogative_word(sentence),
        'contains_interrogative_particle': contains_interrogative_particle(sentence),
        'contains_imperative_verb': contains_imperative_verb(sentence),
        'contains_repetitive_conj_before': contains_rep_conj_before(sentence, i),
        'between_two_verbs': between_two_verbs(sentence, i),
        'contains_publicistic_word': contains_publicistic_word(sentence)
    }

    features.update(split_xpos(sentence[i]['xpos']))

    if i > 0:
        add_neighbour(i - 1, 'prev_word')
    else:
        features['BOS'] = True

    if i > 1:
        add_neighbour(i - 2, 'word_before_prev_word')

    if i < len(sentence) - 1:
        add_neighbour(i + 1, 'next_word')
    else:
        features['EOS'] = True

    if i < len(sentence) - 2:
        add_neighbour(i + 2, 'word_after_next_word')

    return features

In [21]:
# input - sentence (list of dictionaries)
# output - list of feature dictionaries, one per token, in sentence order
def sent2features(sentence):
    return [word2features(sentence, position) for position, _ in enumerate(sentence)]

In [22]:
# input - JSON-serializable data and name of the output JSON file
# output - None, writes the data to the file as JSON (an existing file is overwritten)
def save_as_json(data, output_file_name):
    with open(output_file_name, "w") as target:
        target.write(json.dumps(data))

In [23]:
# input - name of a JSON file
# output - the contents of the JSON file parsed into a Python object
def load_json(json_file_name):
    with open(json_file_name, "r") as source:
        raw_text = source.read()
        return json.loads(raw_text)

In [24]:
# input - name of a text file with one sentence per line and a variable indicating whether or not to save X and y to JSON
# output - X and y - features and labels (one list per sentence in each)
#
# Steps: prefix every sentence with '^', tokenize with classla, merge runs of punctuation,
# derive the labels, strip the punctuation, POS-tag the punctuation-free text and build the
# features. With json_serialize=True, X and y are also written next to the input file as
# '<name>_X.json' and '<name>_y.json'.
def data_prep(input_file_name, json_serialize=False):
    def warn_on_count_mismatch(first, second, description):
        if len(first) != len(second):
            print("Warning: Mismatch in the count of the " + description)

    data = add_special_symbol_at_start_of_each_sentence(read_file_as_list_of_sentences(input_file_name))
    nlp_tokenize = classla.Pipeline('bg', processors='tokenize')
    tokenized_data = run_through_classla_pipeline(data, nlp_tokenize)
    warn_on_count_mismatch(data, tokenized_data, "data and tokenized data")

    squashed_tokenized_data = [squash_punctuation(sentence) for sentence in tokenized_data]
    warn_on_count_mismatch(tokenized_data, squashed_tokenized_data,
                           "tokenized and squashed tokenized data")

    y = [sent2labels(sentence) for sentence in squashed_tokenized_data]
    warn_on_count_mismatch(squashed_tokenized_data, y,
                           "squashed tokenized data and labeled data")

    data_without_punctuation = [remove_punctuation(sentence) for sentence in squashed_tokenized_data]
    warn_on_count_mismatch(data_without_punctuation, y,
                           "data without punctuation and labeled data")

    nlp_pos_tokenize = classla.Pipeline('bg', processors='tokenize,pos')
    pos_tokenized_data = run_through_classla_pipeline(data_without_punctuation, nlp_pos_tokenize)
    warn_on_count_mismatch(data_without_punctuation, pos_tokenized_data,
                           "data without punctuation and POS tokenized data")

    X = [sent2features(sentence) for sentence in pos_tokenized_data]
    warn_on_count_mismatch(X, pos_tokenized_data, "prepped data and POS tokenized data")

    if json_serialize:
        # Strip only a trailing '.txt' and append the suffixes to the remaining name. A name
        # without '.txt' therefore still yields distinct output files, so the input file is
        # never overwritten by the serialized data.
        base_name = re.sub(r'\.txt$', '', input_file_name)
        save_as_json(X, base_name + '_X.json')
        save_as_json(y, base_name + '_y.json')

    return X, y

In [25]:
# input - X and y - features and labels (one list per sentence in each)
# output - None, prints every features-labels pair whose two lists differ in length
def verify_prepped_data(X, y):
    for sentence_features, sentence_labels in zip(X, y):
        if len(sentence_features) == len(sentence_labels):
            continue

        print(sentence_features, sentence_labels)

In [26]:
# input - X and y - features and labels (one list per sentence in each)
# output - list of punctuated sentences (strings): every word is followed by its label and a
#          space, then leading '^' characters and surrounding whitespace are stripped
def punctuate(X, y):
    punctuated_sentences = []

    for sentence_features, sentence_labels in zip(X, y):
        pieces = [token['word'] + sentence_labels[position] + ' '
                  for position, token in enumerate(sentence_features)]
        punctuated_sentences.append(''.join(pieces).lstrip('^').strip())

    return punctuated_sentences

In [27]:
# input - X and y - flat lists of feature dictionaries and labels (one entry per word)
# output - one string with every word followed by its label and a space; entries whose word
#          is the '^' marker are skipped and surrounding whitespace is stripped
def scikit_punctuate(X, y):
    pieces = [word_features['word'] + label + ' '
              for word_features, label in zip(X, y)
              if word_features['word'] != '^']

    return ''.join(pieces).strip()

### NN utilities

In [28]:
# input - data - a list of lists
# output - a numpy array built from the numpy arrays of the individual inner lists
def transform_to_array(data):
    converted_rows = list(map(array, data))
    return array(converted_rows)

In [29]:
# input - data - a list of lists of labels (strings)
# output - a single flat list with all the labels, in their original order
def extract_labels(data):
    flat_labels = []

    for sentence_labels in data:
        flat_labels.extend(sentence_labels)

    return flat_labels

In [30]:
# input - labels (list of strings), label_encoder (optional) that transforms string labels to numeric ones
# output - when a label_encoder is passed: the one hot encoded labels (binary matrix);
#          otherwise a pair of the one hot encoded labels and the newly fitted label_encoder
def one_hot_encode_labels(labels, label_encoder=False):
    if label_encoder:
        new_labels = label_encoder.transform(labels)

        # The width is taken from the encoder, not inferred from the data: without
        # num_classes the matrix is only as wide as the largest label present, so a data
        # set lacking the highest-indexed class would come out narrower than the matrix
        # produced when the encoder was fitted.
        return to_categorical(new_labels, num_classes=len(label_encoder.classes_))

    label_encoder = LabelEncoder()
    new_labels = label_encoder.fit_transform(labels)

    return to_categorical(new_labels), label_encoder

In [31]:
# input - labels (a matrix, one row per label), label_encoder that is used to turn numeric labels back into strings
# output - list of labels (strings): for every row the index of its largest value is passed
#          through label_encoder.inverse_transform
def inverse_transform_one_hot_encoded_labels(labels, label_encoder):
    class_indices = list(map(argmax, labels))
    string_labels = label_encoder.inverse_transform(class_indices)

    return list(string_labels)

In [32]:
# input - data (list of lists of strings), tokenizer (optional) that transforms strings to numbers
# output - when a tokenizer is passed: list of lists of numbers; otherwise a pair of the list
#          of lists of numbers and the tokenizer newly fitted on data (with 'oov' as the
#          out-of-vocabulary token)
def transform_text_to_numbers(data, tokenizer=False):
    if not tokenizer:
        fitted_tokenizer = Tokenizer(oov_token='oov')
        fitted_tokenizer.fit_on_texts(data)

        return fitted_tokenizer.texts_to_sequences(data), fitted_tokenizer

    return tokenizer.texts_to_sequences(data)

In [33]:
# input - y_actual (a list of strings - the actual class labels),
#       - y_pred (a list of strings - the predicted class labels),
#       - label_encoder - to get a list of the classes
# output - None, prints the classification report about F-measure, precision and recall for
#          every class except the empty label ''
def nn_classification_report(y_actual, y_pred, label_encoder):
    # Filter instead of list.remove(''): remove raises ValueError when the encoder was
    # fitted on data without the empty label.
    labels = [label for label in label_encoder.classes_ if label != '']
    print(classification_report(y_actual, list(y_pred), labels=labels, digits=3))

In [34]:
# input - model (a neural network model),
#       - output_file_name_model (string with the file name where to save the model in JSON format)
#       - output_file_name_weights (string with the file name where to save the model weights)
# no output - writes model.to_json() to the first file and calls model.save_weights with the
#             second file name
def save_nn_model(model, output_file_name_model, output_file_name_weights):
    serialized_architecture = model.to_json()

    with open(output_file_name_model, "w") as architecture_file:
        architecture_file.write(serialized_architecture)

    model.save_weights(output_file_name_weights)

In [35]:
# input - output_file_name_model (string with the file name where the model is saved in JSON format)
#       - output_file_name_weights (string with the file name where the model weights are saved)
# output - a neural network model rebuilt from the JSON description with the weights loaded
def load_nn_model(output_file_name_model, output_file_name_weights):
    with open(output_file_name_model, 'r') as architecture_file:
        serialized_architecture = architecture_file.read()

    restored_model = model_from_json(serialized_architecture)
    restored_model.load_weights(output_file_name_weights)

    return restored_model

In [36]:
# input - features (a dictionary)
# output - a list with the word feature values for a 5 word window (two words before, the
#          word itself, two words after); '' stands in for every missing position
def word_features(features):
    window_keys = ('word_before_prev_word', 'prev_word', 'word',
                   'next_word', 'word_after_next_word')

    return [features.get(key, '') for key in window_keys]

In [37]:
# input - xpos subtags values (strings)
# output - one xpos string with the concatenated subtags for the particular xpos type:
#          type + gender/number/article for 'N', 'H' and 'M', additionally followed by the
#          extended / reference type / mood subtag for 'A' / 'P' / 'V'; the type alone for
#          every other category
def xpos_tags_to_string(xpos_type, xpos_gender_number_article, xpos_mood, xpos_ref_type, xpos_extended):
    # Last subtag appended per category (first character of the type).
    trailing_subtag = {
        'N': '', 'H': '', 'M': '',
        'A': xpos_extended,
        'P': xpos_ref_type,
        'V': xpos_mood,
    }
    category = xpos_type[0]

    if category not in trailing_subtag:
        return xpos_type

    return xpos_type + xpos_gender_number_article + trailing_subtag[category]

In [38]:
# input - features (a dictionary), word (a string) - dictionary key prefix (including its
#         trailing '_', e.g. 'prev_word_')
# output - a list with the five xpos subtags for a word (type, gender/number/article, mood,
#          reference type, extended); '-' stands in for every missing subtag
def extract_xpos_tags(features, word):
    subtag_names = ('xpos_type', 'xpos_gender_number_article', 'xpos_mood',
                    'xpos_ref_type', 'xpos_extended')

    return [features.get(word + subtag_name, '-') for subtag_name in subtag_names]

In [39]:
# input - features (a dictionary)
# output - a list of xpos full tags (strings) for a 5 word window (two words before, the
#          word itself, two words after), rebuilt from the stored subtags
def xpos_features(features):
    window_prefixes = ('word_before_prev_word_', 'prev_word_', 'word_',
                       'next_word_', 'word_after_next_word_')

    return [xpos_tags_to_string(*extract_xpos_tags(features, prefix))
            for prefix in window_prefixes]

In [40]:
# input - features (a dictionary)
# output - a list of upos tags (strings) for a 5 word window (two words before, the word
#          itself, two words after); '' stands in for every missing position
def upos_features(features):
    window_keys = ('word_before_prev_word_upos', 'prev_word_upos', 'upos',
                   'next_word_upos', 'word_after_next_word_upos')

    return [features.get(key, '') for key in window_keys]

In [41]:
# input - features (a dictionary)
# output - a list of numeric feature values: the sentence length followed by the boolean
#          features converted to 0/1 ('BOS' and 'EOS' count as False when absent)
def bool_numeric_features(features):
    always_present_flags = ('contains_interrogative_word',
                            'contains_interrogative_particle',
                            'contains_imperative_verb',
                            'contains_repetitive_conj_before',
                            'between_two_verbs',
                            'contains_publicistic_word')
    optional_flags = ('BOS', 'EOS')

    values = [features.get('sent_len')]
    values.extend(int(features.get(flag)) for flag in always_present_flags)
    values.extend(int(features.get(flag, False)) for flag in optional_flags)

    return values

In [42]:
# input - data (a list of lists of dictionaries), function (a function specifying which features to extract)
# output - a flat list with the result of the function for every word of every sentence, in order
def extract_features(data, function):
    extracted = []

    for sentence in data:
        extracted.extend(function(word) for word in sentence)

    return extracted