In [1]:
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.sequence import pad_sequences
from keras import backend as K

# Function to prepend a sentence-start marker ("^", "^") to every tagged sentence
def add_sent_beginner(tagged_sents):
    """Prepend the sentence-start marker ``("^", "^")`` to every sentence.

    Args:
        tagged_sents: Iterable of sentences, each an iterable of
            ``(word, tag)`` pairs.

    Returns:
        list: One new list per input sentence, starting with ``("^", "^")``
        followed by the sentence's original pairs in order.

    The input sentences are not modified, so calling this function again on
    the same data (e.g. when a notebook cell is re-run) does not stack
    additional markers onto them.
    """
    return [[("^", "^")] + list(sentence) for sentence in tagged_sents]

# Function to separate words and tags from tagged data
def separate_words_tags(tagged_sents):
    """Split tagged sentences into parallel word and tag arrays.

    Args:
        tagged_sents: Iterable of sentences, each a non-empty sequence of
            ``(word, tag)`` pairs.

    Returns:
        tuple: ``(sent_words, sent_tags)`` - two lists with one NumPy array
        per sentence, holding that sentence's words and tags respectively.
    """
    # zip(*pairs) transposes [(w1, t1), (w2, t2), ...] into (w1, w2, ...), (t1, t2, ...)
    columns = [tuple(zip(*sentence)) for sentence in tagged_sents]
    word_arrays = [np.array(word_column) for word_column, _ in columns]
    tag_arrays = [np.array(tag_column) for _, tag_column in columns]
    return word_arrays, tag_arrays

# Function to split data
def split_data(sent_words, sent_tags, test_size=0.2, random_state=None):
    """Randomly split parallel word/tag sequences into train and test parts.

    Args:
        sent_words: Sequence of per-sentence word sequences.
        sent_tags: Sequence of per-sentence tag sequences, parallel to
            ``sent_words``.
        test_size: Forwarded to ``train_test_split``; defaults to 0.2.
        random_state: Forwarded to ``train_test_split``. Pass an integer to
            make the split reproducible; the default ``None`` keeps the
            previous unseeded behaviour.

    Returns:
        tuple: ``(train_sents, test_sents, train_tags, test_tags)``.
    """
    train_sents, test_sents, train_tags, test_tags = train_test_split(
        sent_words, sent_tags, test_size=test_size, random_state=random_state
    )
    return train_sents, test_sents, train_tags, test_tags

# Function to split the data into k contiguous folds
def kfold_data(sent_words, sent_tags, k=5):
    """Partition parallel word/tag sequences into ``k`` contiguous folds.

    The first ``k - 1`` folds each hold ``len(sent_words) // k`` sentences;
    the last fold takes everything that remains, so it may be larger.
    The data is sliced in its given order (no shuffling).

    Args:
        sent_words: Sliceable sequence of per-sentence word sequences.
        sent_tags: Sliceable sequence of per-sentence tag sequences, parallel
            to ``sent_words``.
        k: Number of folds; must be a positive integer.

    Returns:
        tuple: ``(sent_words_folds, sent_tags_folds)`` - two lists of ``k``
        slices each.

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError("k must be a positive integer, got {!r}".format(k))

    # Integer division avoids the float round-trip of int(len / k).
    fold_size = len(sent_words) // k
    starts = [fold_size * fold for fold in range(k)]
    # A stop of None lets the final fold absorb the remainder.
    stops = starts[1:] + [None]

    sent_words_folds = [sent_words[start:stop] for start, stop in zip(starts, stops)]
    sent_tags_folds = [sent_tags[start:stop] for start, stop in zip(starts, stops)]
    return sent_words_folds, sent_tags_folds

# Function to generate word labels and tag labels
def label_data(sent_words, sent_tags):
    """Build the word and tag vocabularies as ``{token: integer id}`` dicts.

    Words are lower-cased before being added to the vocabulary. Ids are
    assigned in sorted order so that the mapping is identical on every run;
    iterating the sets directly would make the ids depend on set iteration
    order, which is not stable between interpreter runs for strings.

    Args:
        sent_words: Iterable of sentences, each an iterable of word strings.
        sent_tags: Iterable of sentences, each an iterable of tags. Tags must
            be mutually comparable (e.g. all strings) so they can be sorted.

    Returns:
        tuple: ``(word2label, tag2label)``.
        ``word2label`` maps each lower-cased word to an id starting at 2,
        plus ``'PAD' -> 0`` and ``'OOV' -> 1``.
        ``tag2label`` maps each tag to an id starting at 1, plus
        ``'PAD' -> 0``.
    """
    # Note that we are taking lower case while labelling
    words = {w.lower() for s in sent_words for w in s}
    tags = {t for s in sent_tags for t in s}

    word2label = {w: i for i, w in enumerate(sorted(words), start=2)}
    word2label['PAD'] = 0
    word2label['OOV'] = 1
    tag2label = {t: i for i, t in enumerate(sorted(tags), start=1)}
    tag2label['PAD'] = 0
    return word2label, tag2label

# Convert dataset in terms of labels
# Not specific to test or train data
def apply_labels(sent_words, sent_tags, word2label, tag2label):
    """Encode words and tags as integer ids using the given vocabularies.

    Args:
        sent_words: Iterable of sentences, each an iterable of word strings.
        sent_tags: Iterable of sentences, each an iterable of tags.
        word2label: Mapping from lower-cased word to id; must contain 'OOV',
            which is used for every word missing from the mapping.
        tag2label: Mapping from tag to id.

    Returns:
        tuple: ``(sent_words_labelled, sent_tags_labelled)`` - lists of lists
        of ids, one inner list per sentence.
    """
    def encode_word(word):
        # Lookup is done on the lower-cased form; unknown words become 'OOV'.
        try:
            return word2label[word.lower()]
        except KeyError:
            return word2label['OOV']

    encoded_words = [[encode_word(word) for word in sentence] for sentence in sent_words]

    encoded_tags = []
    for tag_sequence in sent_tags:
        encoded_row = []
        for tag in tag_sequence:
            try:
                tag_id = tag2label[tag]
            except KeyError:
                # NOTE(review): the unknown tag is skipped, so this row ends up
                # shorter than its word row - confirm this can never happen.
                print("Tag is missing, can't be the case")
                continue
            encoded_row.append(tag_id)
        encoded_tags.append(encoded_row)

    return encoded_words, encoded_tags
# Pad labelled sentences
# Not specific to test or train
def pad_sentences(sent_words_labelled, sent_tags_labelled, MAX_LEN):
    """Pad the encoded word and tag sequences to a common length.

    Both inputs go through ``pad_sequences`` with ``maxlen=MAX_LEN`` and
    ``padding='post'`` (padding is appended after each sequence); all other
    options are left at the Keras defaults.

    Args:
        sent_words_labelled: List of per-sentence word-id lists.
        sent_tags_labelled: List of per-sentence tag-id lists.
        MAX_LEN: Target length passed as ``maxlen``.

    Returns:
        tuple: ``(padded_words, padded_tags)`` as returned by
        ``pad_sequences``.
    """
    padding_options = dict(maxlen=MAX_LEN, padding='post')
    padded_words = pad_sequences(sent_words_labelled, **padding_options)
    padded_tags = pad_sequences(sent_tags_labelled, **padding_options)
    return padded_words, padded_tags

# Tags to one hot encodings
def one_hot(sequences, num_categories):
    """One-hot encode sequences of integer labels.

    Args:
        sequences: Iterable of label sequences (integer ids).
        num_categories: Length of each one-hot vector.

    Returns:
        numpy.ndarray: Array built from the nested list of one-hot vectors;
        for equally long sequences its shape is
        ``(n_sequences, sequence_length, num_categories)``.
    """
    def unit_vector(hot_index):
        # All zeros except a single 1.0 at the label's position.
        vector = np.zeros(num_categories)
        vector[hot_index] = 1.0
        return vector

    return np.array([[unit_vector(label) for label in sequence] for sequence in sequences])

# Convert one hot encodings to tags
def one_hot_to_tags(sequences, label2tag):
    """Turn per-token score vectors back into tag sequences.

    Args:
        sequences: Iterable of sequences of score vectors (e.g. softmax
            outputs or one-hot rows), one vector per token.
        label2tag: Mapping from integer label to tag.

    Returns:
        list: One list of tags per input sequence; each tag is the one whose
        label has the highest score in the corresponding vector.
    """
    return [
        [label2tag[np.argmax(scores)] for scores in sequence]  # argmax picks the winning class
        for sequence in sequences
    ]
 
def ignore_class_accuracy(to_ignore=0):
    """Build an accuracy metric that skips positions of one ground-truth class.

    Args:
        to_ignore: Class index to exclude from the metric (default 0).

    Returns:
        A function ``ignore_accuracy(y_true, y_pred)`` that can be passed in
        ``metrics=[...]``. It takes the argmax over the last axis of both
        tensors and returns the share of positions where they agree, counted
        only over positions whose TRUE class differs from ``to_ignore``.
    """
    def ignore_accuracy(y_true, y_pred):
        y_true_class = K.argmax(y_true, axis=-1)
        y_pred_class = K.argmax(y_pred, axis=-1)

        # The mask must come from the ground truth. Masking on the predicted
        # class would drop real tokens that the model predicts as `to_ignore`
        # from the denominator, and would count ignored positions predicted as
        # another class as errors.
        keep_mask = K.cast(K.not_equal(y_true_class, to_ignore), 'int32')
        matches = K.cast(K.equal(y_true_class, y_pred_class), 'int32') * keep_mask
        # The maximum guards against division by zero when every position is ignored.
        accuracy = K.sum(matches) / K.maximum(K.sum(keep_mask), 1)
        return accuracy
    return ignore_accuracy
def label_to_tag(sequences, label2tag):
    """Flatten label sequences into one list of mapped values.

    Args:
        sequences: Iterable of sequences of labels.
        label2tag: Mapping from label to the value to emit.

    Returns:
        list: ``label2tag[label]`` for every label of every sequence, in
        order, as a single flat list.
    """
    return [label2tag[label] for sequence in sequences for label in sequence]

In [2]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, InputLayer, Bidirectional, TimeDistributed, Embedding, Activation
from tensorflow.keras.optimizers import Adam
import nltk
from sklearn.metrics import confusion_matrix

# Make sure the Brown corpus and the universal tagset mapping are installed.
# They are downloaded only when missing, so the cell runs from a fresh
# environment without re-downloading on every execution.
for resource_path, package_name in (('corpora/brown', 'brown'),
                                    ('taggers/universal_tagset', 'universal_tagset')):
    try:
        nltk.data.find(resource_path)
    except LookupError:
        nltk.download(package_name)

# Get the tagged sentences and prepend the sentence-start marker to each
tagged_sentences = nltk.corpus.brown.tagged_sents(tagset='universal')
tagged_sentences = add_sent_beginner(tagged_sentences)
sent_words, sent_tags = separate_words_tags(tagged_sentences)
# Length of the longest sentence (including the start marker); used as the padding length
MAX_LEN = len(max(sent_words, key=len))
sent_words_folds, sent_tags_folds = kfold_data(sent_words, sent_tags)
# Tag order used for the rows/columns of the confusion matrix
tagset = ['.', 'ADJ', 'ADP', 'ADV', 'CONJ', 'DET', 'NOUN', 'NUM', 'PRON', 'PRT', 'VERB', 'X', '^']

[nltk_data] Downloading package brown to /root/nltk_data...
[nltk_data]   Unzipping corpora/brown.zip.
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Unzipping taggers/universal_tagset.zip.


In [14]:
# k is the number of folds; i is the 0-based index of the fold held out for testing
k = 5
i = 1
print("Fold number {}".format(i+1))
test_sent_words = sent_words_folds[i]
test_sent_tags = sent_tags_folds[i]

# Every fold except the held-out one is concatenated into the training set
train_sent_words = []
train_sent_tags = []
for j in range(k):
    if j == i:
        continue
    train_sent_words.extend(sent_words_folds[j])
    train_sent_tags.extend(sent_tags_folds[j])


# Build the vocabularies from the training folds only, then encode both splits with them
word2label, tag2label = label_data(train_sent_words, train_sent_tags)
(train_sent_words_labelled,
 train_sent_tags_labelled) = apply_labels(train_sent_words, train_sent_tags, word2label, tag2label)
(test_sent_words_labelled,
 test_sent_tags_labelled) = apply_labels(test_sent_words, test_sent_tags, word2label, tag2label)

# Pad every encoded sentence to MAX_LEN (the names are rebound to the padded versions)
(train_sent_words_labelled,
 train_sent_tags_labelled) = pad_sentences(train_sent_words_labelled, train_sent_tags_labelled, MAX_LEN)
(test_sent_words_labelled,
 test_sent_tags_labelled) = pad_sentences(test_sent_words_labelled, test_sent_tags_labelled, MAX_LEN)

# Network: embedding -> bidirectional LSTM -> per-token dense layer -> softmax over the tag labels
model = Sequential([
    InputLayer(input_shape=(MAX_LEN, )),
    Embedding(len(word2label), 128),
    Bidirectional(LSTM(256, return_sequences=True)),
    TimeDistributed(Dense(len(tag2label))),
    Activation('softmax'),
])

# Two metrics are tracked: plain accuracy and accuracy ignoring class 0 ('PAD')
model.compile(
    loss='categorical_crossentropy',
    optimizer=Adam(0.001),
    metrics=['accuracy', ignore_class_accuracy(0)],
)

# Train on the padded training folds with one-hot encoded tag targets
model.fit(
    train_sent_words_labelled,
    one_hot(train_sent_tags_labelled, len(tag2label)),
    batch_size=128,
    epochs=2,
)

Fold number 2
Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7fccf47eb9e8>

In [15]:
from collections import defaultdict
# Evaluate on the held-out fold; scores[0] is the loss and scores[1] the first
# compiled metric ('accuracy'), stored here as a percentage
scores = model.evaluate(test_sent_words_labelled, one_hot(test_sent_tags_labelled, len(tag2label)), verbose=0)
acc_per_fold = [scores[1] * 100]
loss_per_fold = [scores[0]]

# Confusion matrix over the flattened gold and predicted tags
label2tag = {label: tag for tag, label in tag2label.items()}
predictions = model.predict(test_sent_words_labelled)
predicted_tags = one_hot_to_tags(predictions, label2tag)
concatenated_tests = label_to_tag(test_sent_tags_labelled, label2tag)
concatenated_predictions = [tag for sentence_tags in predicted_tags for tag in sentence_tags]
# Rows and columns follow the order of `tagset`
conf_mat = confusion_matrix(concatenated_tests, concatenated_predictions, labels=tagset)

In [16]:
# Per-tag accuracy: diagonal entry of each confusion-matrix row divided by that row's total
per_pos_acc = {
    tag: conf_mat[row, row] / sum(conf_mat[row, :])
    for row, tag in enumerate(tagset)
}

In [18]:
# Overall accuracy: sum of the confusion-matrix diagonal divided by the sum of all entries
correct_count = np.diag(conf_mat).sum()
total_count = conf_mat.sum()
acc = correct_count / total_count
print(acc)

0.958340210736426


In [20]:
# Show the per-tag accuracies derived from the confusion-matrix rows.
print(per_pos_acc)

{'.': 1.0, 'ADJ': 0.9247104247104247, 'ADP': 0.9718357862122385, 'ADV': 0.9072164948453608, 'CONJ': 0.9931720865640551, 'DET': 0.9932201158504476, 'NOUN': 0.9444041416606213, 'NUM': 0.847315975570556, 'PRON': 0.9771721406941533, 'PRT': 0.92207563764292, 'VERB': 0.9386578947368421, 'X': 0.0, '^': 1.0}


In [19]:
# Show the 'accuracy' metric reported by model.evaluate, as a percentage.
# NOTE(review): the evaluation inputs are padded, so this figure presumably
# also counts padding positions, unlike the confusion-matrix accuracy - confirm.
print(acc_per_fold)

[99.47463870048523]


In [21]:
# Show the confusion matrix; rows and columns follow the order of `tagset`.
print(conf_mat)

[[29737     0     0     0     0     0     0     0     0     0     0     0
      0]
 [    0 18202    10   578     0     0   774     0     0    10   110     0
      0]
 [    0    24 31366   255    29   144    10     1    25   411    10     0
      0]
 [    0   376   448 10912    19    43   143     0     0    56    31     0
      0]
 [    0     0     0    46  8582    13     0     0     0     0     0     0
      0]
 [    0     2   103     8     8 30178     1     0    84     0     0     0
      0]
 [    0  2107    24    93     0    21 57280    92     3     5  1027     0
      0]
 [    0   102     3    13     0     0   325  2636     4     1    10     0
     17]
 [    0     0    59     4     0   123     8     0  8390     0     2     0
      0]
 [    0    14   389    30     0     0     8     0     0  5242     2     0
      0]
 [    0   392    36    39     0     0  1861     0     0     3 35669     0
      0]
 [    0    63     0    12     0     1   169    14     0     1     4     0
      0]
 [  

In [8]:
# word tag analysis
# Map the padded, encoded test words back to vocabulary entries so that they
# line up position by position with the flattened gold/predicted tags.
# Words outside the training vocabulary therefore appear as 'OOV' and padding
# positions as 'PAD'.
concatenated_words = label_to_tag(test_sent_words_labelled, {i: w for w, i in word2label.items()})

# Count the tagging errors, keyed by
#   (word, gold tag), (word, predicted tag) and (word, gold tag, predicted tag)
word_original_tag = defaultdict(int)
word_predicted_tag = defaultdict(int)
word_otag_ptag = defaultdict(int)
# The index is named `position` so that the builtin `iter` is not shadowed
# in the notebook's global namespace.
for position in range(len(concatenated_predictions)):
    word = concatenated_words[position]
    gold_tag = concatenated_tests[position]
    predicted_tag = concatenated_predictions[position]
    if predicted_tag != gold_tag:
        word_original_tag[(word, gold_tag)] += 1
        word_predicted_tag[(word, predicted_tag)] += 1
        word_otag_ptag[(word, gold_tag, predicted_tag)] += 1

In [9]:
def _error_count(entry):
    """Sort key: the count stored as the second element of a (key, count) item."""
    return entry[1]

# Order each error table from the most frequent entry to the least frequent
sort_word_otag = sorted(word_original_tag.items(), key=_error_count, reverse=True)
sort_word_ptag = sorted(word_predicted_tag.items(), key=_error_count, reverse=True)
sort_word_otag_ptag = sorted(word_otag_ptag.items(), key=_error_count, reverse=True)

In [10]:
# Show the (word, gold tag) error counts, most frequent first.
print(sort_word_otag)



In [12]:
# Show the (word, predicted tag) error counts, most frequent first.
print(sort_word_ptag)



In [13]:
# Show the (word, gold tag, predicted tag) error counts, most frequent first.
print(sort_word_otag_ptag)

