Reference: https://gist.github.com/dirko/1d596ca757a541da96ac3caa6f291229

In [2]:
import pickle 
import numpy as np 

from sklearn.metrics import confusion_matrix, accuracy_score

from keras.models import Sequential
from keras.preprocessing.sequence import pad_sequences
from keras.models import load_model

from keras.layers import Input, Dense, TimeDistributed
from keras.layers import Embedding, Activation
from keras.layers import GRU, LSTM, Bidirectional
from keras.callbacks import ModelCheckpoint, EarlyStopping


from keras.backend import tf

Using TensorFlow backend.


## Load the data

In [3]:
# Load the pickled dataset; the cells below read its 'train', 'test' and 'stats' entries.
# NOTE: unpickling can execute arbitrary code -- only load files from a trusted source.
with open('data/conll.pkl', 'rb') as f:
    data = pickle.load(f)

In [4]:
# Word sequences and their tag sequences for the training and test splits
X, y = data['train']['X'], data['train']['y']
X_test, y_test = data['test']['X'], data['test']['y']

# Corpus statistics: padded sequence length plus the word / tag lookup tables
maxlen = data['stats']['maxlen']
word2ind, ind2word = data['stats']['word2ind'], data['stats']['ind2word']
label2ind, ind2label = data['stats']['label2ind'], data['stats']['ind2label']

In [5]:
# Show the index -> tag table. The tag indices printed below start at 1;
# 0 is the value used for padding by the encoding helpers.
print(ind2label)

{1: 'I-MISC', 2: 'I-PER', 3: 'I-ORG', 4: 'O', 5: 'I-LOC'}


In [6]:
def encode_one_hot(idx, dim):
    """Return a list of ``dim`` zeros with a single 1 at position ``idx``.

    ``idx`` follows normal list indexing: negative values count from the
    end and an out-of-range value raises ``IndexError``.
    """
    one_hot = [0 for _ in range(dim)]
    one_hot[idx] = 1
    return one_hot

def encode_corpus(X, maxlen):
    """Turn each word sequence in ``X`` into word indices padded to ``maxlen``.

    Words are looked up in the module-level ``word2ind`` table (a word that
    is missing from it raises ``KeyError``). The index sequences are then
    handed to ``pad_sequences`` with 0 as the padding value.
    """
    index_sequences = []
    for sentence in X:
        index_sequences.append([word2ind[token] for token in sentence])
    return pad_sequences(index_sequences, maxlen=maxlen, value=0)

def encode_labels(Y, maxlen, dim):
    """Turn each tag sequence in ``Y`` into a padded sequence of one-hot vectors.

    Tags are mapped to indices through the module-level ``label2ind`` table,
    padded to ``maxlen`` with 0, and every index (padding included) is
    expanded to a one-hot list of length ``dim``. The result is returned as
    a NumPy array.
    """
    tag_index_sequences = []
    for tags in Y:
        tag_index_sequences.append([label2ind[tag] for tag in tags])

    padded = pad_sequences(tag_index_sequences, maxlen=maxlen, value=0)

    one_hot_sequences = []
    for row in padded:
        one_hot_sequences.append([encode_one_hot(idx, dim) for idx in row])
    return np.array(one_hot_sequences)

In [8]:
# One class per tag plus one for the padding index 0
dim = 1 + len(ind2label)
print(dim)

# Encode inputs and one-hot targets for the training split ...
X_enc, y_enc = encode_corpus(X, maxlen), encode_labels(y, maxlen, dim)

# ... and for the test split
X_test_enc, y_test_enc = encode_corpus(X_test, maxlen), encode_labels(y_test, maxlen, dim)

6


In [10]:
validation_split = 0.1
# Fixed seed so that the train/validation split is the same on every run.
split_seed = 42

# Draw the permutation from a private generator: the split becomes
# reproducible without touching the global NumPy random state.
num_samples = X_enc.shape[0]
indices = np.random.RandomState(split_seed).permutation(num_samples)

# Shuffle into new arrays instead of overwriting X_enc / y_enc, so that
# re-running this cell always produces the same split (idempotent cell).
X_shuffled = X_enc[indices]
y_shuffled = y_enc[indices]

num_validation_samples = int(validation_split * num_samples)
# Slice with a positive boundary. The `[:-n]` / `[-n:]` form breaks for
# n == 0: it yields an empty training set and puts everything in validation.
num_train_samples = num_samples - num_validation_samples

X_train_enc = X_shuffled[:num_train_samples]
y_train_enc = y_shuffled[:num_train_samples]
X_val_enc = X_shuffled[num_train_samples:]
y_val_enc = y_shuffled[num_train_samples:]

In [11]:
print('Training and testing tensor shapes:')
# Inputs first (train / validation / test), then the matching one-hot targets
print(*(arr.shape for arr in (X_train_enc, X_val_enc, X_test_enc,
                              y_train_enc, y_val_enc, y_test_enc)))

Training and testing tensor shapes:
(1475, 63) (163, 63) (182, 63) (1475, 63, 6) (163, 63, 6) (182, 63, 6)


## Build the model 

In [12]:
# Size of the embedding vocabulary (Embedding input_dim).
# NOTE(review): the +1 presumably reserves index 0 for padding -- confirm
# that word2ind indices start at 1.
max_features = len(word2ind)+1
# Length of each word embedding vector (Embedding output_dim)
embedding_size = 128
# Units per LSTM direction; the bidirectional layer outputs twice this many
hidden_size = 32
# Number of output classes: one per tag plus one for the padding index 0
out_size = len(label2ind) + 1
# Mini-batch size, used for both training and evaluation
batch_size = 32
# Upper bound on training epochs (early stopping may end training sooner)
epochs = 10

In [64]:
# Embedding -> bidirectional LSTM -> per-timestep dense layer -> softmax.
# mask_zero=True makes the embedding treat index 0 (the padding value) as masked.
model = Sequential([
    Embedding(input_dim=max_features, output_dim=embedding_size,
              input_length=maxlen, mask_zero=True),
    # return_sequences=True: one output vector per timestep, i.e. per token
    Bidirectional(LSTM(hidden_size, return_sequences=True)),
    # The same dense layer is applied independently at every timestep
    TimeDistributed(Dense(out_size)),
    Activation('softmax'),
])

model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding_3 (Embedding)      (None, 63, 128)           1060608   
_________________________________________________________________
bidirectional_3 (Bidirection (None, 63, 64)            41216     
_________________________________________________________________
time_distributed_3 (TimeDist (None, 63, 6)             390       
_________________________________________________________________
activation_3 (Activation)    (None, 63, 6)             0         
Total params: 1,102,214.0
Trainable params: 1,102,214.0
Non-trainable params: 0.0
_________________________________________________________________


## Train the model 

In [65]:
# Targets are one-hot vectors per token, hence categorical cross-entropy.
# No extra metrics are requested, so fit/evaluate report the loss only.
model.compile(loss='categorical_crossentropy', optimizer='adam')

In [66]:
# Checkpoint file names carry the epoch number and the validation loss
filepath = "models/NER-Wikigold-{epoch:02d}-{val_loss:.2f}.hdf5"

# Save a checkpoint only when the monitored validation loss improves
checkpoint = ModelCheckpoint(
    filepath,
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    mode='auto',
)

# Stop training once the validation loss has not improved for 3 epochs
earlystopping = EarlyStopping(
    monitor='val_loss',
    min_delta=0,
    patience=3,
    verbose=1,
    mode='auto',
)

callbacks_list = [checkpoint, earlystopping]

In [67]:
# Train on the training split. The held-out validation split supplies the
# 'val_loss' that both callbacks (checkpointing and early stopping) monitor.
model.fit(X_train_enc, y_train_enc, batch_size=batch_size, epochs=epochs,
          validation_data=(X_val_enc, y_val_enc), callbacks=callbacks_list)

Train on 1475 samples, validate on 163 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x1373c1550>

In [68]:
# Persist the model as it is after the last training epoch (the best-epoch
# checkpoints are written separately by the ModelCheckpoint callback).
model.save('models/bidir_lstm.h5')

## Evaluate the model (deprecated)

In [13]:
# Reload the model saved after training so evaluation can run without retraining
model = load_model('models/bidir_lstm.h5')

In [14]:
# The model was compiled without extra metrics, so evaluate() returns a single
# scalar: the categorical cross-entropy loss on the test split.
# NOTE(review): the name `score` is rebound by the `def score(...)` helper in a
# later cell; re-running this cell after that one overwrites the helper.
score = model.evaluate(X_test_enc, y_test_enc, batch_size=batch_size, verbose=1)
print('Raw test score:', score)

Raw test score: 0.515410898478


In [34]:
def unpad_sequences(yh, pr):
    """Strip the leading padding from ground-truth and predicted tag rows.

    Args:
        yh: sequence of 1-D integer arrays with ground-truth tag indices,
            where 0 marks a padding position.
        pr: sequence of 1-D arrays with predicted tag indices, aligned row
            by row with ``yh``.

    Returns:
        A pair ``(yh, ypr)`` of lists. Each row is cut so that it starts at
        the first non-padding position of the corresponding ground-truth
        row. A ground-truth row that consists of padding only produces empty
        rows (previously this raised ``IndexError``).
    """
    coords = []
    for yhh in yh:
        non_pad = np.flatnonzero(np.asarray(yhh) > 0)
        # No real tag in this row: start past the end so the slices are empty.
        coords.append(non_pad[0] if non_pad.size else len(yhh))
    yh = [yhh[co:] for yhh, co in zip(yh, coords)]
    ypr = [prr[co:] for prr, co in zip(pr, coords)]
    return yh, ypr

def score(yh, pr):
    """Unpad both tag matrices and flatten each into one flat list of tags.

    Returns ``(fyh, fpr)``: the ground-truth tags and the predicted tags of
    all samples, concatenated in sample order.
    """
    truth_rows, pred_rows = unpad_sequences(yh, pr)
    fyh, fpr = [], []
    for row in truth_rows:
        fyh.extend(row)
    for row in pred_rows:
        fpr.extend(row)
    return fyh, fpr

def compare_prediction_groumdtruth(model, X, y, verbose=True, indices=None):
    """Print token-level accuracy and the confusion matrix of ``model`` on ``(X, y)``.

    Args:
        model: object whose ``predict(X)`` returns per-token class scores
            (reduced with ``argmax`` over axis 2).
        X: encoded input sequences passed to ``model.predict``.
        y: one-hot encoded ground-truth tags (reduced with ``argmax`` over
            axis 2).
        verbose: when true and ``indices`` is given, also print the
            ground-truth and predicted tag indices of the selected samples.
        indices: optional iterable of sample positions to print.

    Padding positions are removed (based on the ground truth) before the
    metrics are computed.
    """
    pr = model.predict(X).argmax(2)
    yh = y.argmax(2)
    fyh, fpr = score(yh, pr)
    print('Accuracy:', accuracy_score(fyh, fpr), end='\n\n')
    print('Confusion matrix:')
    print(confusion_matrix(fyh, fpr), end='\n\n')

    # Identity check instead of `!= None`: with a NumPy array of indices,
    # `!=` is evaluated element-wise and its truth value is ambiguous.
    if verbose and indices is not None:
        yh, ypr = unpad_sequences(yh, pr)
        for idx in indices:
            print('test sample', idx)
            print(yh[idx])
            print(ypr[idx], end='\n\n')

In [35]:
# Report test-set accuracy and confusion matrix, and print the ground-truth
# and predicted tag indices for test samples 1-6.
compare_prediction_groumdtruth(model, X_test_enc, y_test_enc, True, indices=[1,2,3,4,5,6])

Accuracy: 0.851641414141

Confusion matrix:
[[  47    5    2  141    3]
 [   2   25    1   99    1]
 [   8    3   79   69   20]
 [  26    3   14 2502   10]
 [   5    3    7   48   45]]

test sample 1
[4 4 4 4 4 4 4 4 5 4]
[4 4 4 4 4 4 4 4 5 4]

test sample 2
[4]
[4]

test sample 3
[4 3 3 3 3 3 4 4 4 4 4 4 4 5 5 4]
[4 5 5 4 4 4 4 4 4 4 4 4 4 5 5 4]

test sample 4
[4]
[4]

test sample 5
[3 3 3 4 4 4 4 4 5 4 4 5 4 5 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4 4 5 4 5 4 4 4 4 5 4]

test sample 6
[4 4 4 4 4 4 4 3 4 4 4 4 4 4 4 4]
[4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4]



## Evaluation (updated)

In [3]:
def unpad_sequences(yh, pr):
    """Remove the leading padding 0s from the ground-truth and predicted tags.

    Args:
        yh: sequence of 1-D integer arrays of ground-truth tag indices,
            where 0 marks a padding position.
        pr: sequence of 1-D arrays of predicted tag indices, aligned row by
            row with ``yh``.

    Returns:
        ``(yh, ypr)``: two lists of rows, each row starting at the first
        non-padding position of its ground-truth row. Rows whose ground
        truth is padding only become empty instead of raising ``IndexError``.
    """
    def first_tag_position(row):
        # Position of the first non-zero tag, or len(row) when there is none
        # so that the resulting slice is empty.
        non_pad = np.flatnonzero(np.asarray(row) > 0)
        return non_pad[0] if non_pad.size else len(row)

    coords = [first_tag_position(yhh) for yhh in yh]
    yh = [yhh[co:] for yhh, co in zip(yh, coords)]
    ypr = [prr[co:] for prr, co in zip(pr, coords)]
    return yh, ypr

def score(yh, pr):
    """Flatten the per-sample tags into one list of ground-truth tags and one
    list of predicted tags, after removing the padding.
    """
    truth_rows, pred_rows = unpad_sequences(yh, pr)

    fyh = []
    for row in truth_rows:
        for tag in row:
            fyh.append(tag)

    fpr = []
    for row in pred_rows:
        for tag in row:
            fpr.append(tag)

    return fyh, fpr

def compare_prediction_groundtruth(model, X, y, verbose=True, indices=None):
    """Print and return the word-level accuracy and the confusion matrix.

    Args:
        model: object whose ``predict(X)`` returns per-token class scores
            (reduced with ``argmax`` over axis 2).
        X: encoded input sequences passed to ``model.predict``.
        y: one-hot encoded ground-truth tags (reduced with ``argmax`` over
            axis 2).
        verbose: when true and ``indices`` is given, also print the
            ground-truth and predicted tag names of the selected samples.
        indices: optional iterable of sample positions to print.

    Returns:
        ``(acc, cm)``: the accuracy over all non-padding tokens and the
        confusion matrix. Row/column ``i`` of ``cm`` always corresponds to
        tag index ``i`` (index 0 being the padding class), so ``cm`` can be
        indexed with the same keys as the module-level ``ind2label``.
    """
    pr = model.predict(X).argmax(2)
    yh = y.argmax(2)
    fyh, fpr = score(yh, pr)

    # get accuracy score
    acc = accuracy_score(fyh, fpr)

    # get confusion matrix
    # Pin rows/columns to the tag indices 0..len(ind2label). Without `labels`,
    # scikit-learn uses only the sorted tags that actually occur, so row i
    # would be "the i-th tag present" rather than tag index i, and every
    # per-label statistic read from the matrix would be shifted.
    tag_indices = list(range(len(ind2label) + 1))
    cm = confusion_matrix(fyh, fpr, labels=tag_indices)

    print('Accuracy:', acc, end='\n\n')
    print('Confusion matrix:')
    print(cm, end='\n\n')

    # Identity check instead of `!= None`: with a NumPy array of indices,
    # `!=` is evaluated element-wise and its truth value is ambiguous.
    if verbose and indices is not None:
        yh, ypr = unpad_sequences(yh, pr)
        for idx in indices:
            print('test sample', idx)
            # .get(): the model may predict the padding class 0, which has
            # no entry in ind2label and would otherwise raise KeyError.
            print([ind2label.get(index, '<PAD>') for index in yh[idx]])
            print([ind2label.get(index, '<PAD>') for index in ypr[idx]], end='\n\n')
    return acc, cm

def get_TP_FP_FN(cm, label):
    """Get the numbers of true positives, false positives and false negatives.

    Args:
        cm: 2-D NumPy confusion matrix for multiple labels (rows: ground
            truth, columns: predictions).
        label: row/column index of the label of interest.

    Returns:
        ``(tp, fp, fn)`` for ``label``.

    Raises:
        IndexError: if ``label`` is not a valid index into ``cm``.
            Previously a message was printed and ``None`` returned, which
            made every caller fail later with an unrelated unpacking error.
    """
    dim = min(cm.shape[0], cm.shape[1])
    if not 0 <= label < dim:
        raise IndexError(
            "label out of bound: {} (confusion matrix has {} classes)".format(label, dim)
        )
    # TP, True positive: diagonal position
    tp = cm[label, label]
    # FP, False positive: sum of column `label` without the diagonal entry
    fp = cm[:, label].sum() - tp
    # FN, False negative: sum of row `label` without the diagonal entry
    fn = cm[label, :].sum() - tp
    return tp, fp, fn

def get_precision(cm, label):
    """Precision of ``label``: TP / (TP + FP).

    Returns 0.0 when the label was never predicted (TP + FP == 0), following
    scikit-learn's default convention, instead of dividing by zero.
    """
    tp, fp, fn = get_TP_FP_FN(cm, label)
    predicted_positives = tp + fp
    if predicted_positives == 0:
        return 0.0
    return tp / predicted_positives

def get_recall(cm, label):
    """Recall of ``label``: TP / (TP + FN).

    Returns 0.0 when the label has no ground-truth occurrences
    (TP + FN == 0), following scikit-learn's default convention, instead of
    dividing by zero.
    """
    tp, fp, fn = get_TP_FP_FN(cm, label)
    actual_positives = tp + fn
    if actual_positives == 0:
        return 0.0
    return tp / actual_positives

def get_F1_score(cm, label):
    """F1 score of ``label``: 2TP / (2TP + FP + FN).

    Returns 0.0 when the label neither occurs in the ground truth nor in the
    predictions (denominator 0), following scikit-learn's default
    convention, instead of dividing by zero.
    """
    tp, fp, fn = get_TP_FP_FN(cm, label)
    denominator = 2*tp + fp + fn
    if denominator == 0:
        return 0.0
    return 2*tp / denominator
        
def get_evaluation_statistics(ind2label, label=0):
    """Show true positives, false positives, false negatives, precision,
    recall and F1 score for a particular label.

    Args:
        ind2label: dict mapping tag index -> tag name, used for display.
        label: tag index to report on.

    The confusion matrix is read from the module-level variable ``cm``, so
    that variable must be assigned before this function is called.

    NOTE(review): ``label`` is used both as a key of ``ind2label`` and as a
    row/column index of ``cm``; the printed name only matches the printed
    numbers if ``cm`` is indexed by tag index -- confirm how ``cm`` was built.
    """
    # .get(): the default label 0 is the padding index and has no entry in
    # ind2label, so a plain lookup would raise KeyError.
    tag_name = ind2label.get(label, '<PAD>' if label == 0 else '<unknown>')
    print("evaluation statistics for label", label, tag_name)
    tp, fp, fn = get_TP_FP_FN(cm, label)
    print("True positives", tp, " , False positives", fp, " , False negatives", fn)
    # Fixed: the result used to be stored as `precison`, so the print below
    # raised NameError.
    precision = get_precision(cm, label)
    print("Precision", precision)
    recall = get_recall(cm, label)
    print("Recall", recall)
    f1 = get_F1_score(cm, label)
    print("F1 score", f1, end='\n\n')

In [None]:
#  Evaluate the model
# NOTE(review): no cell in this notebook writes 'models/NER_Wikigold.h5'
# (training saves 'models/bidir_lstm.h5' and 'models/NER-Wikigold-*.hdf5'
# checkpoints) -- confirm this path.
model = load_model('models/NER_Wikigold.h5')

# get the test set score (categorical crossentropy / loss), not USEFUL
# Stored as `test_loss` rather than `score`: rebinding `score` would replace
# the score() helper that compare_prediction_groundtruth() calls, making it
# fail with "'float' object is not callable".
# The model needs the encoded arrays (X_test_enc / y_test_enc); X_test and
# y_test hold the raw word and tag sequences.
test_loss = model.evaluate(X_test_enc, y_test_enc, batch_size=batch_size, verbose=1)
print('Raw test score:', test_loss)

# shows accuracy of the model on a word level and the confusion matrix of the test set
acc, cm = compare_prediction_groundtruth(model, X_test_enc, y_test_enc, True, indices=[1,2,3])

# show evaluation statistics of chosen labels
# (get_evaluation_statistics reads the module-level `cm` assigned above)
labels = [1, 2, 3]
for label in labels:
    get_evaluation_statistics(ind2label, label)