## IMPORT

In [1]:
import os
import sys
from functools import reduce
from typing import Dict, List, Tuple
from keras.models import load_model
from tensorflow.python.keras import utils
from keras.utils import to_categorical
from sklearn.metrics import confusion_matrix
import numpy as np

In [2]:
import glob
from builtins import Exception
from typing import List, Dict, Tuple

import numpy as np
from keras.preprocessing.sequence import pad_sequences

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
import sys
sys.path.append('/content/drive/MyDrive/Mestrado/Dissertação/NER_DD/lstm_ner')

In [5]:
from utils import data_utils

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [6]:
from keras import Sequential, Model
from keras.layers import Embedding, LSTM, Dropout, Dense, Reshape, Conv1D, MaxPooling1D, TimeDistributed, \
    concatenate
import tensorflow as tf

In [7]:
pip install Unidecode

Collecting Unidecode
  Downloading Unidecode-1.3.8-py3-none-any.whl.metadata (13 kB)
Downloading Unidecode-1.3.8-py3-none-any.whl (235 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/235.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.5/235.5 kB[0m [31m7.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: Unidecode
Successfully installed Unidecode-1.3.8


In [8]:
import string
from functools import reduce

import nltk
import re
from unidecode import unidecode
from typing import List, Tuple, Dict

nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

#Text Utils

In [9]:
def remove_punctuations(text: str):
    translate_table = dict((ord(char), None) for char in string.punctuation)
    return text.translate(translate_table)

In [10]:
def generate_ngrams_freqdist(text, n, tokens=None):
    if tokens is None:
        tokens = nltk.word_tokenize(text)
    ngrams = nltk.ngrams(tokens, n)
    return nltk.FreqDist(ngrams), tokens

In [11]:
def score_ngrams(word_list: List[str], ngrams: nltk.FreqDist, unigrams: nltk.FreqDist, delta: float):
    """
    Scores a list of words according to frequency distribution of each word and their ngram.
    This approach is based on Mikolov, Tomas, et al. "Distributed representations of words and phrases and their
    compositionality." Advances in neural information processing systems. 2013.
    :param word_list: the word list to be scored
    :param ngrams: ngrams freqdist in which n must be the length of word_list
    :param unigrams: unigrams freqdist
    :param delta: delta is used as a discounting coefficient and prevents too many phrases consisting of very infrequent
     words to be formed
    :return: score of the word_list according to freqdist of ngrams
    """
    assert len(word_list) == len(list(ngrams.keys())[0])
    # filter full of words unigrams so that it has only words that contain in word_list
    word_unigram_freqs = map(lambda t: t[1], filter(lambda kv: kv[0][0] in word_list, unigrams.items()))
    return (ngrams[tuple(word_list)] - delta) / reduce(lambda a, b: a*b, word_unigram_freqs, 1)

In [12]:
def multiple_replace(_string, replace_dict):
    pattern = re.compile("|".join([re.escape(k) for k, v in replace_dict.items()]), re.M)
    return pattern.sub(lambda match: replace_dict[match.group(0)], _string)

In [13]:
def normalize_word(line):
    """
    Transforms line to ASCII string making character translations, except some unicode characters are left because
    they are used in portuguese (such as ß, ä, ü, ö).
    """
    line = line.replace(u"„", u"\"")
    line = line.lower()

    replacements = dict(((u"ß", "SZ"), (u"ä", "AE"), (u"ü", "UE"), (u"ö", "OE")))
    replacements_inv = dict(zip(replacements.values(), replacements.keys()))
    line = multiple_replace(line, replacements)
    line = unidecode(line)
    line = multiple_replace(line, replacements_inv)

    line = line.lower()  # unidecode might have replaced some characters, like € to upper case EUR

    line = re.sub("([0-9][0-9.,]*)", '0', line)

    return line.strip()

In [14]:
def tokenize_sentences(sentences: List[List[Tuple[str, str]]], word_indices: Dict[str, int],
                       label_indices: Dict[str, int], char_level=False):
    unknown_idx = word_indices['UNKNOWN']

    def tokenize(_string):
        if _string in word_indices:
            return word_indices[_string]
        lower = _string.lower()
        if lower in word_indices:
            return word_indices[lower]
        normalized = normalize_word(_string)
        if normalized in word_indices:
            return word_indices[normalized]
        return unknown_idx

    def create_element(_string, label):
        if char_level:
            return [tokenize(c) for c in _string]
        return tokenize(_string), label_indices[label]

    return [[create_element(word, label) for word, label in sentence] for sentence in sentences]

# Data Utils

In [15]:
def read_input_file(filename: str):
    """
        Reads the input file and creates a list of sentences in which each sentence is a list of its word where the word
        is a 2-dim tuple, whose elements are the word itself and its label (named entity), respectively. Also creates
        a map of label to index.

        Expected files have a sequence of sentences. It has one word by line in first column (in a tab-separated file)
        followed in second column by its label, i.e., the named entity. The sentences are separated by an empty line.

        :param filename: Name of the file
        :return: List of sentences, map of label to index
    """
    sentences = []
    sentence = []
    label2idx = {'O': 0}
    label_idx = 1
    count = 0
    with open(filename, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if line == "":
                if len(sentence) > 0:
                    sentences.append(sentence)
                    sentence = []
                continue
            splits = line.split('\t')
            word = splits[0]
            label = splits[1]
            sentence.append((word, label))
            if label not in label2idx.keys():
                label2idx[label] = label_idx
                label_idx += 1
    return sentences, label2idx

In [16]:
def create_context_windows(sentences: List[List[Tuple[int, int]]], window_size: int, padding_idx: int):
    """
    Generates X and Y matrices. X is an array of context window (indexed according to word2Idx). Each element of the
    array is the context window of the word in the middle and its index in the array is the index of its label in Y
    matrix.

    :param sentences: Sentences whose words and labels are already tokenized.
    :param window_size: How much words to the left and to the right.
    :param padding_idx: Index (token) for padding windows in which the main word has no enough surrounding words.
    :return: X and Y matrices as numpy array.
    """
    x_matrix = []
    y_vector = []
    for sentence in sentences:
        for target_word_idx in range(len(sentence)):
            word_indices = []
            for wordPosition in range(target_word_idx - window_size, target_word_idx + window_size + 1):
                if wordPosition < 0 or wordPosition >= len(sentence):
                    word_indices.append(padding_idx)
                    continue
                word_idx = sentence[wordPosition][0]
                word_indices.append(word_idx)
            label_idx = sentence[target_word_idx][1]
            x_matrix.append(word_indices)
            y_vector.append(label_idx)


    return np.array(x_matrix), np.array(y_vector)

In [17]:
def read_embeddings_file(filename: str):
    """
    Reads the embeddings file and maps its words to the index in the embeddings matrix

    :param filename: Name of the embeddings file
    :return: Embeddings matrix, map of word to index
    """
    word2idx = {}
    word_idx = 0
    char2idx = {'UNKNOWN': 0, 'PADDING': 1, 'LEFT_WORDS_PADDING': 2, 'RIGHT_WORDS_PADDING': 3}
    char_idx = 4
    embeddings = []
    embeddings_dim = None
    with open(filename, 'r', encoding='utf-8') as file:
        for line in file:
            splits = line.strip().split(' ')
            if embeddings_dim is None:
                embeddings_dim = len(splits)
            elif embeddings_dim != len(splits):
                continue
            word = splits[0]
            for c in word:
                if c not in char2idx:
                    char2idx[c] = char_idx
                    char_idx += 1
            word2idx[word] = word_idx
            word_idx += 1
            embeddings.append(splits[1:])
    embeddings = np.array(embeddings, dtype=np.float32)
    return embeddings, word2idx, char2idx


In [18]:
def create_char_context_windows(sentences: List[List[List[int]]], char2idx: Dict[str, int], word_win_size: int,
                                max_word_len: int):
    left_pad = char2idx['LEFT_WORDS_PADDING']
    right_pad = char2idx['RIGHT_WORDS_PADDING']
    inner_word_pad = char2idx['PADDING']
    sentences = [pad_sequences(sentences[i], maxlen=max_word_len, dtype=np.int_, value=inner_word_pad, padding='post')
                 for i, _ in enumerate(sentences)]
    sentences = [pad_sequences(sentences[i], maxlen=max_word_len + word_win_size, dtype=np.int_, value=left_pad,
                               padding='pre') for i, _ in enumerate(sentences)]
    sentences = [pad_sequences(sentences[i], maxlen=max_word_len + word_win_size * 2, dtype=np.int_, value=right_pad,
                               padding='post') for i, _ in enumerate(sentences)]
    padding_word = word_win_size * [left_pad] + max_word_len * [inner_word_pad] + word_win_size * [right_pad]

    padded_words = []
    for sentence in sentences:
        for word_idx, word in enumerate(sentence):
            padded_word_window = np.array([], dtype=np.int_)
            for window_idx in range(word_idx - word_win_size, word_idx + word_win_size + 1):
                if window_idx < 0 or word_idx > len(sentence):
                    padded_word_window = np.append(padded_word_window, padding_word)
                else:
                    padded_word_window = np.append(padded_word_window, sentence[word_idx])
            padded_words.append(padded_word_window)
    return np.array(padded_words)


In [19]:
def transform_to_xy(sentences: List[List[Tuple[str, str]]], word2idx: Dict[str, int],
                    label2idx: Dict[str, int], word_window_size: int,
                    char2idx: Dict[str, int], max_word_len: int):
    word_indexed_sentences = tokenize_sentences(sentences, word2idx, label2idx)
    char_indexed_sentences = tokenize_sentences(sentences, char2idx, label2idx, char_level=True)
    x_word, y = create_context_windows(word_indexed_sentences, word_window_size, word2idx['PADDING'])
    x_char = create_char_context_windows(char_indexed_sentences, char2idx, word_window_size, max_word_len)
    x = [x_word, x_char]
    return x, y

In [20]:
def load_dataset(input_data_folder: str, test_percent: float):
    assert 0 <= test_percent <= 1
    train_data, test_data, label2idx = [], [], {}
    for filename in glob.glob(f'{input_data_folder}/*.tsv'):
        print(filename)
        sentences, cur_lbl2idx = read_input_file(filename)
        if len(sentences) == 0:
            continue
        label2idx = {**label2idx, **cur_lbl2idx}
        test_amount = int(len(sentences) * test_percent)
        thresh_idx = len(sentences) - test_amount
        train_data += sentences[:thresh_idx]
        test_data += sentences[thresh_idx:]

    return train_data, test_data, label2idx

In [21]:
def save_embeddings(filename, weights, char2idx):
    with open(filename, 'w', encoding='utf-8') as f:
        for char, index in char2idx.items():
            line = f'{char} {" ".join(str(item) for item in weights[index, :])}\n'
            f.write(line)

# Variaves

In [22]:
# Caminho para a pasta de dados
train_path = '/content/drive/MyDrive/Mestrado/Dissertação/NER_DD/lstm_ner'

In [23]:
# defining constants
word_embeddings_file = train_path + '/data/cbow_s50.txt'
input_data_folder = train_path + '/dataset'
model_file = train_path + '/output/model.h5'
char_embeddings_file = train_path + '/output/char_embeddings.txt'

In [24]:
word_window_size = 2
char_window_size = 2
char_embeddings_dim = 20
dropout_rate = 0.5
lstm_units = 420
conv_num = 10
epochs = 1
test_percent = 0.2

# Modelo

In [25]:
def generate_model(word_embedding_model: Sequential, char_embedding_model: Sequential, lstm_units: int, num_labels: int,
                   dropout_rate=.5, word_embedding_only=False, cpu_only=False):
    if word_embedding_only:
        input_layer_output = [word_embedding_model.output]
        hidden_layer_input_units = word_embedding_model.output_shape[2]
        input_layer_model = [word_embedding_model.input]
    else:
        input_layer_output = concatenate([word_embedding_model.output, char_embedding_model.output])
        hidden_layer_input_units = word_embedding_model.output_shape[2] + char_embedding_model.output_shape[2]
        input_layer_model = [word_embedding_model.input, char_embedding_model.input]

    if cpu_only:
        first_lstm_net = LSTM(lstm_units, input_shape=(None, hidden_layer_input_units), return_sequences=True)
    else:
        first_lstm_net = LSTM(lstm_units, input_shape=(None, hidden_layer_input_units), return_sequences=True)

    hidden_layer_model = Sequential()
    hidden_layer_model.add(first_lstm_net)
    hidden_layer_model.add(Dropout(dropout_rate))
    hidden_layer_model.add(LSTM(lstm_units) if cpu_only else LSTM(lstm_units))
    hidden_layer_model.add(Dense(num_labels, activation='softmax'))
    hidden_layer_model_output = hidden_layer_model(input_layer_output)
    model = Model(input_layer_model, hidden_layer_model_output)
    model.compile(loss='categorical_crossentropy', optimizer='adagrad', metrics=['accuracy',tf.keras.metrics.Precision(),tf.keras.metrics.Recall()])

    return model

In [26]:
def generate_embedding(input_length: int, weights=None, vocab_size=0, embedding_dim=0):
    if weights is not None:
        vocab_size = weights.shape[0]
        embedding_dim = weights.shape[1]
        return Embedding(vocab_size, embedding_dim, input_length=input_length, weights=[weights], trainable=False)
    return Embedding(vocab_size, embedding_dim, input_length=input_length)

In [27]:
def generate_word_embedding_model(input_length: int, weights=None, vocab_size=0, embedding_dim=0):
    model = Sequential()
    model.add(generate_embedding(input_length, weights=weights, vocab_size=vocab_size, embedding_dim=embedding_dim))
    return model

In [28]:
def generate_char_embedding_model(max_word_len: int, max_word_len_padded: int, word_input_len: int,
                                  char_embedding_dim: int, conv_num: int, char_window_size,
                                  vocab_size: int):
    char_input_len = word_input_len * max_word_len_padded
    model = Sequential()
    model.add(generate_embedding(char_input_len, vocab_size=vocab_size, embedding_dim=char_embedding_dim))
    model.add(Reshape((word_input_len, max_word_len_padded, char_embedding_dim)))
    model.add(TimeDistributed(Conv1D(conv_num, char_window_size)))
    model.add(TimeDistributed(MaxPooling1D(max_word_len)))
    model.add(Reshape((word_input_len, conv_num)))
    return model

# Main

In [29]:
def main():
    # loading data from files
    word_embeddings, word2idx, char2idx = read_embeddings_file(word_embeddings_file)
    max_word_len = max(map(lambda word: len(word), word2idx.keys()))
    train_data, test_data, label2idx = load_dataset(input_data_folder, test_percent)
    print('train sentences:', len(train_data))
    print('test sentences:', len(test_data))
    print("epochs: ", epochs)
    x_train, y_train = transform_to_xy(train_data, word2idx, label2idx, word_window_size,
                                                  char2idx, max_word_len)
    x_test, y_test = transform_to_xy(test_data, word2idx, label2idx, word_window_size,
                                                char2idx, max_word_len)
    num_labels = len(label2idx)
    # "binarize" labels
    y_train = to_categorical(y_train, num_labels)
    y_test = to_categorical(y_test, num_labels)
    # load model whether it is saved
    if os.path.exists(model_file):
        model = load_model(model_file)
        print(f'Model loaded from {model_file}')
        print(model.summary())
    else:
        # defining model
        word_input_length = 2 * word_window_size + 1
        max_word_len_padded = max_word_len + word_window_size * 2
        word_embedding_model = generate_word_embedding_model(word_input_length, weights=word_embeddings)
        char_embedding_model = generate_char_embedding_model(max_word_len, max_word_len_padded, word_input_length,
                                                                 char_embeddings_dim, conv_num, char_window_size,
                                                                 vocab_size=len(char2idx))
        model = generate_model(word_embedding_model, char_embedding_model, lstm_units, num_labels, dropout_rate)

        # summarize the model
        print(model.summary())

        # training model
        model.fit(x_train, y_train, epochs=epochs)

        # saving embeddings
        embedding_layer = char_embedding_model.layers[0]
        weights = embedding_layer.get_weights()[0]
        data_utils.save_embeddings(char_embeddings_file, weights, char2idx)

        # saving whole model
        # model.save(model_file)

    # evaluating model
    print('x_test:')
    print(x_test)
    print('@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@')
    print('y_test:')
    print(y_test)
    print('@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@')
    _, accuracy, precision, recall = model.evaluate(x_test, y_test)
    print('Accuracy: %f' % (accuracy * 100))
    print('Precision: %f' % (precision * 100))
    print('Recall: %f' % (recall * 100))
    output = model.predict(x_test)
    testPredict = model.predict(x_test)
    testPredict = np.argmax(output, axis=1)
    y_test=np.argmax(y_test, axis=1)
    train_data_flat = reduce(lambda acc, cur: acc + cur, train_data, [])
    label_dist = {label: 0 for label in label2idx.keys()}
    for _, label in train_data_flat:
        label_dist[label] += 1
    print()
    print('####### train label distribution')
    print('total: %d\n' % len(train_data_flat))
    for label, count in label_dist.items():
        print(label, count)
    print()
    cm = confusion_matrix(y_test, testPredict)
    print(cm)
    test_data_flat = reduce(lambda acc, cur: acc + cur, test_data, [])
    print(test_data_flat)
    labeled_output = label_output(output, label2idx, test_data_flat)
    #evaluate_model(labeled_output, test_data_flat, label2idx)

In [30]:
def evaluate_model(predicted: List[Tuple[str, str]], actual: List[Tuple[str, str]], label2idx: Dict[str, int]):
    true_pos, true_neg, false_pos, false_neg = [0] * 4
    labeled_metrics: Dict[str, Metrics] = {label: Metrics() for label in label2idx.keys()}
    confusion_matrix: Dict[str, Dict[str, int]] = {actual_label: {pred_label: 0 for pred_label in label2idx.keys()} for
                                                   actual_label in label2idx.keys()}
    not_entity_label = 'O'
    #------Leonidia-----
    #global planilha
    #------------------
    for i, pred in enumerate(predicted):
        pred_label, actual_label = pred[1], actual[i][1]
        confusion_matrix[actual_label][pred_label] += 1
        labeled_metrics[actual_label].actual_total += 1
        #-----------------------------------Leonidia--------------------------------------
        #get_output(pred_label, actual_label)
        #_________________________________________________________________________________
        if pred_label == actual_label == not_entity_label:
            true_neg += 1
            labeled_metrics[actual_label].true_neg += 1
        elif pred_label == not_entity_label:
            false_neg += 1
            labeled_metrics[actual_label].false_neg += 1
        elif pred_label == actual_label:
            true_pos += 1
            labeled_metrics[actual_label].true_pos += 1
        else:
            false_pos += 1
            labeled_metrics[pred_label].false_pos += 1
    #----------------Leonidia---------------------------
    '''dados = OrderedDict()
                dados.update({"Sheet1": planilha})
                save_data("Saida-do-modelo.xls", dados)'''
    #--------------------------------------------

    print('TP: %d\nTN: %d\nFP: %d\nFN: %d' % (true_pos, true_neg, false_pos, false_neg))
    accuracy = (true_pos + true_neg) / len(predicted)
    print('Accuracy: %f' % accuracy)
    print("tru_pos value: ", true_pos)
    print("false_pos value: ", false_pos)
    precision = true_pos / (true_pos + false_pos)
    recall = true_pos / (true_pos + false_neg)
    f_measure = 2 * precision * recall / (precision + recall)

    print('Precision: %f\nRecall: %f\nF1 score: %f' % (precision, recall, f_measure))

    for label, metrics in labeled_metrics.items():
        print()
        print('==========>', label)
        print('TP: %d\nTN: %d\nFP: %d\nFN: %d' % (
            metrics.true_pos, metrics.true_neg, metrics.false_pos, metrics.false_neg))
        print('Total predicted: %f' % metrics.total_predicted())
        print('Total actual: %f' % metrics.actual_total)
        print('Accuracy: %f' % metrics.accuracy())
        print('Precision: %f' % metrics.precision())
        print('Recall: %f' % metrics.recall())
        print('F-measure: %f' % metrics.f_measure())
        print()

    # print_matrix
    print('Matriz de confusão (quantidades)')
    max_label = max(confusion_matrix.keys())
    print(f'{"".ljust(len(max_label)+3)}\t', end='')
    for label in confusion_matrix.keys():
        print(label, end='\t')
    print()
    for label in confusion_matrix.keys():
        print(label.ljust(3 + len(max_label)), end='\t')
        for amount in confusion_matrix[label].values():
            print(str(amount).ljust(5), end='\t')
        print()

    print()
    # print_matrix
    print('Matriz de confusão (percentual)')
    max_label = max(confusion_matrix.keys())
    print(f'{"".ljust(len(max_label)+3)}\t', end='')
    for label in confusion_matrix.keys():
        print(label, end='\t')
    print()
    for label in confusion_matrix.keys():
        print(label.ljust(3 + len(max_label)), end='\t')
        for amount in confusion_matrix[label].values():
            total = reduce(lambda a, b: a + b, confusion_matrix[label].values(), 0)
            if total == 0:
                amount, total = 0, 1
            print('%.2f%%' % (100 * amount / total), end='\t')
        print()

    return precision, recall, f_measure

In [31]:
def label_output(output: List[float], label2idx: Dict[str, int], test_data_flat: List[Tuple[str, str]]):
    classed_output = []
    for i in range(len(output)):
        not_entity_idx = label2idx['O']
        ent_prob_max = 0
        ent_idx = not_entity_idx
        for j, ent in enumerate(output[i]):
            if ent > ent_prob_max:
                ent_prob_max = ent
                ent_idx = j
        entity = [label for label, idx in label2idx.items() if idx == ent_idx][0]
        classed_output.append((test_data_flat[i][0], entity))
    return classed_output

In [32]:
if __name__ == '__main__':
    main()

/content/drive/MyDrive/Mestrado/Dissertação/NER_DD/lstm_ner/dataset/Tráfico de drogas.tsv
/content/drive/MyDrive/Mestrado/Dissertação/NER_DD/lstm_ner/dataset/Homicídios.tsv
/content/drive/MyDrive/Mestrado/Dissertação/NER_DD/lstm_ner/dataset/Armas.tsv
/content/drive/MyDrive/Mestrado/Dissertação/NER_DD/lstm_ner/dataset/Roubo carga- veículo.tsv
/content/drive/MyDrive/Mestrado/Dissertação/NER_DD/lstm_ner/dataset/Tráfico de Drogas - Armas.tsv
/content/drive/MyDrive/Mestrado/Dissertação/NER_DD/lstm_ner/dataset/Roubos em Geral.tsv
train sentences: 4685
test sentences: 1168
epochs:  1




ValueError: The layer sequential has never been called and thus has no defined output.