In [None]:
# Import the required libraries.
import re
import math
import random
import collections
import operator
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold
from sklearn.metrics import precision_recall_fscore_support, f1_score
from sklearn.metrics import confusion_matrix, classification_report
from collections import defaultdict

# Seed Python's and NumPy's global RNGs.
# NOTE(review): the Keras backend RNG is not seeded here, so training results
# may still vary between runs - confirm.
random.seed(11)
np.random.seed(11)

In [None]:
from keras.models import Sequential
from keras.layers import Dense, LSTM, SimpleRNN, Conv1D, InputLayer, Bidirectional, TimeDistributed, Embedding, Activation
from keras.optimizers import Adam, SGD
from keras.preprocessing.sequence import pad_sequences

In [None]:
def parse_sentence(sentence):
    '''
    Split one corpus line of "word/tag" tokens into parallel lists.

    Tokens are separated by arbitrary whitespace, so repeated spaces, tabs
    and leading/trailing blanks are ignored. Each token is split on its LAST
    '/', so a word that itself contains a slash (e.g. "1/2/num") keeps it.

    Args:
        sentence: One line of the corpus, e.g. "The/det dog/noun".

    Returns:
        A (words, tags) tuple of two equal-length lists; two empty lists
        for a blank sentence.

    Raises:
        ValueError: If a token contains no '/' separator.
    '''
    words = []
    tags = []

    # split() without an argument never yields empty tokens, unlike
    # split(" "), which produces '' for doubled spaces or an empty line.
    for token in sentence.split():
        word, separator, tag = token.rpartition('/')
        if not separator:
            raise ValueError("token %r is not in word/tag form" % token)
        words.append(word)
        tags.append(tag)

    return words, tags

In [None]:
# Parse the sentences into a list of (words, tags) pairs, one per corpus line.
parsed_sentences = []

with open('./Brown_train.txt', 'r') as file:
    sentences = file.readlines()

    for sentence in sentences:
        sentence = sentence.strip()
        # A blank line (e.g. a trailing newline at the end of the file) holds
        # no word/tag token; skip it instead of letting the parser fail on it.
        if not sentence:
            continue
        parsed_sentences.append(parse_sentence(sentence))

In [None]:
def get_vocab(X_train, Y_train):
    '''
    Assign integer ids to every word and tag seen in the training data.

    Ids are handed out in first-seen order. The word table reserves
    0 for 'UNK' and 1 for 'PAD'; the tag table reserves 0 for 'PAD'.
    Returns the pair (vocabulary2id, tag2id).
    '''
    vocabulary2id = {'UNK': 0, 'PAD': 1}
    tag2id = {'PAD': 0}

    for words in X_train:
        for token in words:
            # setdefault leaves known entries untouched; a new entry receives
            # the current table size, i.e. the next free id.
            vocabulary2id.setdefault(token, len(vocabulary2id))

    for labels in Y_train:
        for label in labels:
            tag2id.setdefault(label, len(tag2id))

    return vocabulary2id, tag2id

def get_word_tag_counts(X_train, Y_train, vocabulary2id, tag2id):
    '''
    Tally word, tag, tag-bigram and tag-trigram frequencies.

    Bigrams and trigrams are taken over adjacent tags within one sentence
    and are keyed by tuples. vocabulary2id and tag2id are accepted but not
    used. Returns (wordcount, tagcount, tagpaircount, tagtriplecount), each
    a defaultdict(int).
    '''
    wordcount = defaultdict(int)
    tagcount = defaultdict(int)
    tagpaircount = defaultdict(int)
    tagtriplecount = defaultdict(int)

    for words in X_train:
        for token in words:
            wordcount[token] += 1

    for tags in Y_train:
        for tag in tags:
            tagcount[tag] += 1
        # zip stops at the shortest slice, which yields exactly the adjacent
        # pairs / triples of the sentence.
        for bigram in zip(tags, tags[1:]):
            tagpaircount[bigram] += 1
        for trigram in zip(tags, tags[1:], tags[2:]):
            tagtriplecount[trigram] += 1

    return wordcount, tagcount, tagpaircount, tagtriplecount

In [None]:
# Peek at the first five parsed (words, tags) pairs.
parsed_sentences[:5]

In [None]:
# Build the test and training sets of sentences.
kf = KFold(n_splits = 3, shuffle = False)

# Keep every (words, tags) pair as a single element of a 1-D object array so
# it can be indexed with the integer index arrays that KFold yields. Calling
# np.asarray() directly on this ragged nested list raises ValueError on
# NumPy >= 1.24.
sentence_array = np.empty(len(parsed_sentences), dtype=object)
for position, pair in enumerate(parsed_sentences):
    sentence_array[position] = pair
parsed_sentences = sentence_array

scores = []
scores1 = []
y_pred_idx = []
y_pred_idx1 = []
y_test_idx = []
y_test_idx1 = []

preds = []

# Only the first of the three folds is used: with shuffle=False that is the
# first third of the corpus as test data and the remainder as training data.
train_index, test_index = next(kf.split(parsed_sentences))
train_data = parsed_sentences[train_index]
test_data = parsed_sentences[test_index]
X_train = [a[0] for a in train_data]
Y_train = [a[1] for a in train_data]
X_test = [a[0] for a in test_data]
Y_test = [a[1] for a in test_data]

# Build the word and tag id tables from the training split only.
vocabulary2id, tag2id = get_vocab(X_train, Y_train)


In [None]:
# Every sequence is cut / filled to the length of the longest training sentence.
padlen = max(map(len, X_train))

def pad(sentence, padid=vocabulary2id['PAD']):
    '''
    Return `sentence` truncated to at most `padlen` items and right-filled
    with `padid` up to exactly `padlen` items.

    The default `padid` is the word 'PAD' id, looked up once when this cell runs.
    '''
    clipped = sentence[:padlen]
    return clipped + [padid] * (padlen - len(clipped))

In [None]:
# Map every word to its id (words missing from the vocabulary become 'UNK'),
# then pad each sentence to padlen.
X_train_ids = np.asarray([
    pad([vocabulary2id.get(word, vocabulary2id['UNK']) for word in sent])
    for sent in X_train
])
X_test_ids = np.array([
    pad([vocabulary2id.get(word, vocabulary2id['UNK']) for word in sent])
    for sent in X_test
])

In [None]:
# Map every tag to its id and pad each sentence with the PAD tag id.
# The tag table is built only from 'PAD' plus the training tags, so the former
# tag2id['UNK'] fallback raised KeyError for any tag unseen in training.
# Fall back to an 'UNK' tag id if the table has one, otherwise to the PAD id,
# which the evaluation cells mask out (they ignore gold id 0).
unknown_tag_id = tag2id.get('UNK', tag2id['PAD'])
Y_train_ids = np.asarray([
    pad([tag2id.get(tag, unknown_tag_id) for tag in sent], tag2id['PAD'])
    for sent in Y_train
])
Y_test_ids = np.asarray([
    pad([tag2id.get(tag, unknown_tag_id) for tag in sent], tag2id['PAD'])
    for sent in Y_test
])

In [None]:
def id2onehot(Y, numtags):
    '''
    One-hot encode a rectangular batch of tag-id sequences.

    Vectorised: rows of an identity matrix are gathered with a single fancy
    index instead of allocating one small array per token in a Python loop.

    Args:
        Y: Integer tag ids of shape (num_sentences, sequence_length), given
            as an array or as equal-length nested lists.
        numtags: Number of classes, i.e. the size of the one-hot axis.

    Returns:
        A float64 array of shape Y.shape + (numtags,) holding 1.0 at each
        token's tag id and 0.0 elsewhere.

    Raises:
        IndexError: If an id lies outside [-numtags, numtags).
    '''
    ids = np.asarray(Y, dtype=np.intp)
    # Row k of the identity matrix is the one-hot vector of class k.
    return np.eye(numtags)[ids]
 

In [None]:
# One-hot encode the padded tag ids; the class axis has one slot per entry of
# tag2id (including the PAD tag at id 0).
Y_train_onehot = id2onehot(Y_train_ids, len(tag2id))
Y_test_onehot = id2onehot(Y_test_ids, len(tag2id))

In [None]:
# Bidirectional LSTM tagger: 100-d word embeddings -> BiLSTM (192 units per
# direction) -> per-timestep dense layer with a softmax over all tag ids.
model = Sequential([
    InputLayer(input_shape=(padlen, )),
    Embedding(len(vocabulary2id), 100),
    Bidirectional(LSTM((128 + 256) // 2, return_sequences=True)),
    TimeDistributed(Dense(len(tag2id))),
    Activation('softmax'),
])
model.compile(
    optimizer=Adam(0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)
model.summary()

In [None]:
# Train for 5 epochs in batches of 128, holding out 20% of the training data
# for validation.
model.fit(X_train_ids, Y_train_onehot, batch_size=128, epochs=5, validation_split=0.2)

In [None]:
# Per-token tag scores for the padded test sentences.
predictions = model.predict(X_test_ids)

In [None]:
# Token accuracy over real tokens only: positions whose gold tag id is 0 (PAD)
# are excluded from both the hit count and the denominator.
test_accuracy = (
    np.sum((predictions.argmax(axis=-1) == Y_test_ids) & (Y_test_ids != 0))
    / np.sum(Y_test_ids != 0)
)
print(test_accuracy)

In [None]:
# Predicted tag id per token: index of the highest score along the last axis.
predictions_argmax = np.argmax(predictions, axis=-1)

In [None]:
# Collect gold and predicted tag ids for every real (non-PAD) token.
#
# The selection is driven by the gold tags alone. The previous filter also
# required the *prediction* to be non-PAD, which silently removed real tokens
# the model had mislabelled as PAD and therefore inflated every metric
# computed from these arrays. A PAD prediction (id 0) on a real token is kept
# as-is: it can never equal the gold id, so it is always counted as an error
# and the plain accuracy of these arrays matches `test_accuracy`.
#
# Boolean-mask indexing walks the arrays in row-major order, i.e. sentence by
# sentence and token by token.
nonpad_mask = Y_test_ids != 0
y_true_nopad = Y_test_ids[nonpad_mask]
y_pred_nopad = predictions_argmax[nonpad_mask]


In [None]:
# Ensure both sequences are NumPy arrays so they can be compared element-wise.
y_pred_nopad = np.asarray(y_pred_nopad)
y_true_nopad = np.asarray(y_true_nopad)

In [None]:
# Fraction of collected tokens whose predicted tag id equals the gold tag id.
(y_pred_nopad == y_true_nopad).mean()

In [None]:
# Precision / recall / F1 averaged over tags with average='weighted'; the
# fourth return value is discarded.
prec, rec, fscore, _ = precision_recall_fscore_support(y_true_nopad, y_pred_nopad, average = 'weighted')

In [None]:
# Display the weighted precision, recall and F1 of the BiLSTM tagger.
prec, rec, fscore

In [None]:
# Bidirectional SimpleRNN tagger, same layout as the BiLSTM model above:
# 100-d word embeddings -> BiRNN (192 units per direction) -> per-timestep
# dense layer with a softmax over all tag ids. Rebinding `model` replaces the
# previously trained BiLSTM.
model = Sequential([
    InputLayer(input_shape=(padlen, )),
    Embedding(len(vocabulary2id), 100),
    Bidirectional(SimpleRNN((128 + 256) // 2, return_sequences=True)),
    TimeDistributed(Dense(len(tag2id))),
    Activation('softmax'),
])
model.compile(
    optimizer=Adam(0.003),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)
model.summary()

In [None]:
# Train for 4 epochs in batches of 128, holding out 20% of the training data
# for validation.
model.fit(X_train_ids, Y_train_onehot, batch_size=128, epochs=4, validation_split=0.2)

In [None]:
# Per-token tag scores for the padded test sentences (overwrites the earlier
# `predictions`).
predictions = model.predict(X_test_ids)

In [None]:
# Token accuracy over real tokens only: positions whose gold tag id is 0 (PAD)
# are excluded from both the hit count and the denominator.
test_accuracy = (
    np.sum((predictions.argmax(axis=-1) == Y_test_ids) & (Y_test_ids != 0))
    / np.sum(Y_test_ids != 0)
)
print(test_accuracy)

In [None]:
# Predicted tag id per token: index of the highest score along the last axis.
predictions_argmax = np.argmax(predictions, axis=-1)

In [None]:
# Collect gold and predicted tag ids for every real (non-PAD) token.
#
# The selection is driven by the gold tags alone. The previous filter also
# required the *prediction* to be non-PAD, which silently removed real tokens
# the model had mislabelled as PAD and therefore inflated every metric
# computed from these arrays. A PAD prediction (id 0) on a real token is kept
# as-is: it can never equal the gold id, so it is always counted as an error
# and the plain accuracy of these arrays matches `test_accuracy`.
#
# Boolean-mask indexing walks the arrays in row-major order, i.e. sentence by
# sentence and token by token.
nonpad_mask = Y_test_ids != 0
y_true_nopad = Y_test_ids[nonpad_mask]
y_pred_nopad = predictions_argmax[nonpad_mask]


In [None]:
# Ensure both sequences are NumPy arrays so they can be compared element-wise.
y_pred_nopad = np.asarray(y_pred_nopad)
y_true_nopad = np.asarray(y_true_nopad)

In [None]:
# Fraction of collected tokens whose predicted tag id equals the gold tag id.
(y_pred_nopad == y_true_nopad).mean()

In [None]:
# Precision / recall / F1 averaged over tags with average='weighted'; the
# fourth return value is discarded.
prec, rec, fscore, _ = precision_recall_fscore_support(y_true_nopad, y_pred_nopad, average = 'weighted')

In [None]:
# Display the weighted precision, recall and F1 of the SimpleRNN tagger.
prec, rec, fscore

In [None]:
# Per-tag-id report for the SimpleRNN tagger; rows are labelled with integer
# tag ids (see tag2id for the id -> tag mapping).
print(classification_report(y_true_nopad, y_pred_nopad))

In [None]:
# Show the tag -> id table to interpret the integer labels in the report above.
tag2id