# Part-of-Speech Tagging using Recurrent Neural Networks (RNN)
Author: Pierre Nugues

## Python Headers

### The Modules

In [None]:
import sys
import os
from sklearn.feature_extraction import DictVectorizer
import time
from keras import models, layers
import sys
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from keras.models import load_model
import math
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical

### Some Parameters

In [None]:
# Global settings of the notebook.
# Keras optimizer identifier
OPTIMIZER = 'rmsprop'
# NOTE(review): SCALER, SIMPLE_MODEL and MINI_CORPUS are not referenced in the
# code visible in this notebook -- confirm whether they are still needed.
SCALER = True
SIMPLE_MODEL = True
# Mini-batch size and number of epochs passed to model.fit()
BATCH_SIZE = 128
EPOCHS = 4
MINI_CORPUS = True
# Number of columns of the embedding matrix and output size of the Embedding layer
EMBEDDING_DIM = 100
# NOTE(review): not referenced in the visible code; unknown words are mapped
# to an index in to_index(), not to this token -- confirm.
UNKNOWN_TOKEN = '__UNK__'
# Length to which pad_sequences() brings every sentence
MAX_SEQUENCE_LENGTH = 150  # mask_zero = True (recommended by Marcus)
# Number of units of the LSTM wrapped in the Bidirectional layer
LSTM_UNITS = 512

## Preprocessing

### Loading the Embeddings
We will use GloVe embeddings and load them

In [None]:
def load(file):
    """
    Return the embeddings in the form of a dictionary.

    Each non-empty line of the file is split on whitespace: the first field
    is the word and the remaining fields are the components of its vector.

    :param file: path to a text file of embeddings (one word per line)
    :return: dictionary mapping each word to a float32 numpy array
    """
    embeddings = {}
    # The context manager closes the file even if a line fails to parse;
    # the encoding is explicit so that non-ASCII tokens are read the same
    # way on every platform.
    with open(file, encoding='utf-8') as glove:
        for line in glove:
            values = line.strip().split()
            if not values:
                # A blank line carries no word: skip it instead of
                # failing on values[0]
                continue
            embeddings[values[0]] = np.array(values[1:], dtype='float32')
    return embeddings

In [None]:
# Location of the embedding file on the author's machine (the file name
# suggests the 100-dimensional GloVe 6B vectors); adjust to the local setup.
embedding_file = '/Users/pierre/Documents/Cours/EDAN20/corpus/glove.6B.100d.txt'
# Parse the file into a dictionary mapping each word to its vector
embeddings_dict = load(embedding_file)

In [None]:
# Notebook display: show the vector stored for the word 'table'
embeddings_dict['table']

### Loading the Corpus

In [None]:
def load_conll2009_pos(
        train_file='/Users/pierre/Documents/Cours/EDAN20/corpus/conll2009/en/CoNLL2009-ST-English-train-pos.txt',
        dev_file='/Users/pierre/Documents/Cours/EDAN20/corpus/conll2009/en/CoNLL2009-ST-English-development-pos.txt',
        test_file='/Users/pierre/Documents/Cours/EDAN20/corpus/conll2009/en/CoNLL2009-ST-test-words-pos.txt'):
    """
    Read the training, development and test corpora as raw strings.

    The default paths are those of the original notebook; pass other paths
    to read the corpora from a different location.

    :param train_file: path to the training corpus
    :param dev_file: path to the development corpus
    :param test_file: path to the test corpus
    :return: the stripped content of the three files, in the order
        train, dev, test, followed by the list of column names
    """
    column_names = ['id', 'form', 'lemma', 'plemma', 'pos', 'ppos']

    def read_corpus(path):
        # The context manager guarantees that the file handle is released
        with open(path, encoding='utf-8') as corpus_file:
            return corpus_file.read().strip()

    train_sentences = read_corpus(train_file)
    dev_sentences = read_corpus(dev_file)
    test_sentences = read_corpus(test_file)
    return train_sentences, dev_sentences, test_sentences, column_names

# Read the three corpus splits as raw strings, together with the column names
train_sentences, dev_sentences, test_sentences, column_names = load_conll2009_pos()

### Converting the Corpus into a Dictionary
We follow the fit-transform pattern of sklearn

In [None]:
import regex as re

class Token(dict):
    """One corpus row: a dictionary from column names to column values."""


class CoNLLDictorizer:
    """
    Convert a CoNLL-formatted corpus string into lists of Token dictionaries.

    The corpus is cut into sentences with the regular expression sent_sep,
    each sentence into rows on the newline character, and each row into
    columns with the regular expression col_sep.
    """

    def __init__(self, column_names, sent_sep='\n\n', col_sep=' +'):
        """
        :param column_names: names given, in order, to the columns of a row
        :param sent_sep: regular expression separating the sentences
        :param col_sep: regular expression separating the columns
        """
        self.column_names = column_names
        self.sent_sep = sent_sep
        self.col_sep = col_sep

    def fit(self):
        """Do nothing: there is nothing to learn from the data."""
        return None

    def transform(self, corpus):
        """
        Split the corpus into sentences and the sentences into tokens.

        :param corpus: the whole corpus as one string
        :return: a list of sentences, each one a list of Token objects
        """
        stripped = corpus.strip()
        return [self._split_in_words(chunk)
                for chunk in re.split(self.sent_sep, stripped)]

    def fit_transform(self, corpus):
        """Same as transform(); kept for the fit-transform pattern."""
        return self.transform(corpus)

    def _split_in_words(self, sentence):
        """
        Build one Token per row of the sentence.

        Columns are paired with column_names by position; zip() stops at
        the shorter of the two, so extra names or extra values are dropped.
        """
        tokens = []
        for line in sentence.split('\n'):
            fields = re.split(self.col_sep, line)
            tokens.append(Token(zip(self.column_names, fields)))
        return tokens

In [None]:
# The columns are split on tabs here, overriding the default ' +'
conll_dict = CoNLLDictorizer(column_names, col_sep='\t')
# Each corpus becomes a list of sentences, each sentence a list of Token dictionaries
train_dict = conll_dict.transform(train_sentences)
dev_dict = conll_dict.transform(dev_sentences)
test_dict = conll_dict.transform(test_sentences)
print('First sentence, train:', train_dict[0])

### Function to build the two-way sequences
Two vectors: $\mathbf{x}$ and $\mathbf{y}$

In [None]:
def build_sequences(corpus_dict, key_x='form', key_y='pos', tolower=True):
    """
    Extract two parallel lists of sequences from a corpus of dictionaries.

    :param corpus_dict: list of sentences, each a list of dictionaries
    :param key_x: key of the input values (the words by default)
    :param key_y: key of the output values (the POS tags by default)
    :param tolower: if true, the input values are set in lowercase
    :return: the pair (X, Y), with one list per sentence in each
    """
    X, Y = [], []
    for sentence in corpus_dict:
        inputs = [token[key_x] for token in sentence]
        outputs = [token[key_y] for token in sentence]
        if tolower:
            inputs = [str.lower(value) for value in inputs]
        X.append(inputs)
        Y.append(outputs)
    return X, Y

In [None]:
# Lowercased word sequences and the corresponding POS sequences of the training set
X_train_cat, Y_train_cat = build_sequences(train_dict)
print('First sentence, words', X_train_cat[0])
print('First sentence, POS', Y_train_cat[0])

### Extracting the Unique Words

In [None]:
# Sorted list of the distinct words found in the training sentences
vocabulary_words = sorted({word for sentence in X_train_cat for word in sentence})
# Sorted list of the distinct POS tags found in the training sentences
pos = sorted({tag for sentence in Y_train_cat for tag in sentence})
print(pos)
# Number of POS tags, without the padding symbol
NB_CLASSES = len(pos)

### Function to convert the words or parts of speech to indices

In [None]:
def to_index(X, idx, num_words=None):
    """
    Convert the word lists (or POS lists) to indexes.

    :param X: List of word (or POS) lists
    :param idx: word to number dictionary
    :param num_words: total number of words. When given, the items absent
        from idx are mapped to num_words + 1, the index of the unknown word.
        When None, they are mapped to None.
    :return: list of index lists, parallel to X
    """
    # Compare with None explicitly: num_words=0 is a legitimate value and
    # must still map the unknown words to an index.
    if num_words is None:
        return [[idx.get(item) for item in sequence] for sequence in X]
    # We map the unknown words to the last index of the matrix
    unknown_idx = num_words + 1
    return [[idx.get(item, unknown_idx) for item in sequence]
            for sequence in X]

### We create the indexes

In [None]:
# We start at one to make provision for the padding symbol 0 in RNN and LSTMs
# rev_idx_*: index -> symbol
rev_idx_words = dict(enumerate(vocabulary_words, start=1))
rev_idx_pos = dict(enumerate(pos, start=1))
# idx_*: symbol -> index, the inverse of the dictionaries above
idx_words = {v: k for k, v in rev_idx_words.items()}
idx_pos = {v: k for k, v in rev_idx_pos.items()}
print('word index:', list(idx_words.items())[:10])
print('POS index:', list(idx_pos.items())[:10])

# We create the parallel sequences of indexes
# No num_words here: all the training words and tags are in the indexes
X_idx = to_index(X_train_cat, idx_words)
Y_idx = to_index(Y_train_cat, idx_pos)
print('First sentences, word indices', X_idx[:3])
print('First sentences, POS indices', Y_idx[:3])

### We Create an Embedding Matrix
Index 0 is the padding symbol and the last index is reserved for unknown words

In [None]:
# Random initialization with a fixed seed. The matrix has two rows more than
# the vocabulary: row 0 for the padding symbol and the last row for the
# unknown words.
rdstate = np.random.RandomState(1234567)
embedding_matrix = rdstate.uniform(-0.05, 0.05, (len(vocabulary_words) + 2, EMBEDDING_DIM))
# Replace the random row of every vocabulary word that has a pre-trained
# vector in embeddings_dict; the other words, the padding symbol and the
# unknown word keep their random values.
for word in vocabulary_words:
    pretrained_vector = embeddings_dict.get(word)
    if pretrained_vector is not None:
        embedding_matrix[idx_words[word]] = pretrained_vector

In [None]:
# Sanity checks: the shape of the matrix, the row of the word 'table',
# and row 0, the row of the padding symbol
print('Shape of embedding matrix:', embedding_matrix.shape)
print('Embedding of table', embedding_matrix[idx_words['table']])
print('Embedding of the padding symbol, idx 0, random numbers', embedding_matrix[0])

### We pad the sentences

In [None]:
# Bring all the sequences to MAX_SEQUENCE_LENGTH. No padding/truncating
# argument is given, so the Keras defaults apply (padding and truncation at
# the start of the sequence per the Keras documentation -- confirm for the
# installed version).
X = pad_sequences(X_idx, maxlen=MAX_SEQUENCE_LENGTH)
Y = pad_sequences(Y_idx, maxlen=MAX_SEQUENCE_LENGTH)

print(X[0])
print(Y[0])

# The number of POS classes and 0 (padding symbol)
# One-hot encoding of the POS indices
Y_train = to_categorical(Y, num_classes=len(pos) + 1)
print(Y_train[0])

## The Network

In [None]:
model = models.Sequential()
# The input dimension matches the number of rows of embedding_matrix:
# the vocabulary plus the padding symbol (0) and the unknown word (last index).
# mask_zero=True declares index 0 as padding.
model.add(layers.Embedding(len(vocabulary_words) + 2,
                           EMBEDDING_DIM,
                           mask_zero=True,
                           input_length=MAX_SEQUENCE_LENGTH))
# Initialize the embedding layer with embedding_matrix
model.layers[0].set_weights([embedding_matrix])
# The default is True
model.layers[0].trainable = True
# Alternative, smaller recurrent layers tried earlier:
#model.add(layers.Bidirectional(layers.SimpleRNN(NB_CLASSES + 1, return_sequences=True)))
#model.add(layers.Bidirectional(layers.LSTM(NB_CLASSES + 1, return_sequences=True)))
# return_sequences=True: one output per position, as each word needs a tag
model.add(layers.Bidirectional(layers.LSTM(LSTM_UNITS, return_sequences=True)))
# One softmax output per POS class, plus one for the padding symbol
model.add(layers.Dense(NB_CLASSES + 1, activation='softmax'))

### We fit it

In [None]:
# The targets are one-hot vectors, hence the categorical cross-entropy.
# The optimizer comes from the OPTIMIZER parameter set at the top of the
# notebook instead of a second, hard-coded copy of its value.
model.compile(loss='categorical_crossentropy',
              optimizer=OPTIMIZER,
              metrics=['acc'])
model.summary()
# Train on the padded index sequences and their one-hot encoded tags
model.fit(X, Y_train, epochs=EPOCHS, batch_size=BATCH_SIZE)

## Evaluation

### Formatting the Test Set

In [None]:
# We extract the word and POS sequences of the test set
X_test_cat, Y_test_cat = build_sequences(test_dict)
# We create the parallel sequences of indexes.
# num_words is given for the words so that those absent from the training
# vocabulary are mapped to the index of the unknown word.
X_test_idx = to_index(X_test_cat, idx_words,
                      num_words=len(vocabulary_words))
Y_test_idx = to_index(Y_test_cat, idx_pos)
print('X[0] test idx', X_test_idx[0])
print('Y[0] test idx', Y_test_idx[0])

# Same padding as for the training set
X_test_padded = pad_sequences(X_test_idx, maxlen=MAX_SEQUENCE_LENGTH)
Y_test_padded = pad_sequences(Y_test_idx, maxlen=MAX_SEQUENCE_LENGTH)
print('X[0] test idx padded', X_test_padded[0])
print('Y[0] test idx padded', Y_test_padded[0])
# One extra symbol for 0 (padding)
Y_test_padded_vectorized = to_categorical(Y_test_padded, num_classes=len(pos) + 1)
print('Y[0] test idx padded vectorized', Y_test_padded_vectorized[0])
print(X_test_padded.shape)
print(Y_test_padded_vectorized.shape)

### Evaluation

In [None]:
# Evaluates with the padding symbol: the inputs and targets passed to
# model.evaluate() are the padded sequences.
# NOTE(review): how the padded positions weigh in the reported accuracy
# depends on the Keras masking behaviour -- confirm before quoting this figure.
test_loss, test_acc = model.evaluate(X_test_padded, Y_test_padded_vectorized)
print('Loss:', test_loss)
print('Accuracy:', test_acc)

### We evaluate on all the test corpus

In [None]:
# Show the first test sentence before and after padding
print('X_test', X_test_cat[0])
print('X_test_padded', X_test_padded[0])
# Run the model on all the padded test sentences
corpus_pos_predictions = model.predict(X_test_padded)
print('Y_test', Y_test_cat[0])
print('Y_test_padded', Y_test_padded[0])
print('predictions', corpus_pos_predictions[0])

### Remove padding

In [None]:
# For each sentence, keep only the last len(sentence) predictions: the
# slice takes the tail of the padded sequence, where the real words are.
pos_pred_num = [
    sent_pos_predictions[-len(X_test_cat[sent_nbr]):]
    for sent_nbr, sent_pos_predictions in enumerate(corpus_pos_predictions)
]
print(pos_pred_num[:2])

### Convert the POS indices to symbols

In [None]:
# For every word, pick the index with the highest score and translate it
# back to its POS symbol with the reverse index.
pos_pred = [
    [rev_idx_pos.get(np.argmax(word_scores)) for word_scores in sentence_scores]
    for sentence_scores in pos_pred_num
]

print(pos_pred[:2])
print(Y_test_cat[:2])

### Evaluate

In [None]:
# Word-level accuracy on the test set, overall and on the words that are
# not in the training vocabulary.
total, correct, total_ukn, correct_ukn = 0, 0, 0, 0
for id_s, sentence in enumerate(X_test_cat):
    predicted_tags = pos_pred[id_s]
    # A sentence longer than the padded length has fewer predictions than
    # words. The predictions were sliced from the end of the padded
    # sequence, so they are aligned with the END of the sentence: the first
    # `offset` words have no prediction and are counted as errors.
    offset = len(sentence) - len(predicted_tags)
    for id_w, word in enumerate(sentence):
        total += 1
        predicted = predicted_tags[id_w - offset] if id_w >= offset else None
        is_correct = predicted == Y_test_cat[id_s][id_w]
        if is_correct:
            correct += 1
        # The word is not in the dictionary
        if word not in idx_words:
            total_ukn += 1
            if is_correct:
                correct_ukn += 1

# Guard both ratios against an empty denominator
if total != 0:
    print('total %d, correct %d, accuracy %f' % (total, correct, correct / total))
if total_ukn != 0:
    print('total unknown %d, correct %d, accuracy %f' % (total_ukn, correct_ukn, correct_ukn / total_ukn))
