<a href="https://colab.research.google.com/github/canhtc/POS/blob/master/Notebooks/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
%tensorflow_version 2.x
import tensorflow as tf
# Abort early unless TensorFlow reports the first GPU as the default device.
device_name = tf.test.gpu_device_name()
if device_name == '/device:GPU:0':
  print('Found GPU at: {}'.format(device_name))
else:
  raise SystemError('GPU device not found')
from tensorflow import keras
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense, LSTM, InputLayer, Bidirectional, TimeDistributed, Embedding, Activation
from tensorflow.keras.models import Sequential
from sklearn.model_selection import train_test_split
import numpy as np

In [0]:
# Colab-only: mount Google Drive at /content/drive so that the corpus stored
# under "/content/drive/My Drive/..." can be read by the following cells.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
import os

# Location of the tagged training corpus on the mounted Google Drive.
TRAIN_PATH = "/content/drive/My Drive/Attachs/pos-train2"

# Fail early with an explicit message instead of silently discarding the check.
if not os.path.isfile(TRAIN_PATH):
    raise FileNotFoundError(
        f"Training corpus not found: {TRAIN_PATH} (is Google Drive mounted?)"
    )

# Decode explicitly rather than relying on the platform default, and close the
# handle deterministically.
# NOTE(review): assumes the corpus is UTF-8 encoded - confirm against the data file.
with open(TRAIN_PATH, encoding="utf-8") as corpus_file:
    train_data = corpus_file.readlines()  # one raw text line per list entry
print(len(train_data))

In [0]:
# Whitespace-tokenise every raw line; each entry becomes a list of tokens.
train_data = list(map(str.split, train_data))
print(train_data[0])

In [0]:
def _parse_token(token):
    """Split one ``word/TAG`` token into a ``(word, TAG)`` tuple.

    The tag is whatever follows the LAST "/" so that words which themselves
    contain a slash (for example ``20/10/M``) are kept instead of being
    discarded. Returns ``None`` when the token has no "/" or when the tag is
    not purely alphabetic. The tag is upper-cased.
    """
    word, slash, tag = token.strip().rpartition("/")
    if not slash or not tag.isalpha():
        return None
    return (word, tag.upper())


tagged_sentences = []  # one list of (word, TAG) tuples per non-empty sentence
count_errors = 0       # number of tokens skipped because they were malformed
for data in train_data:
    childs = []
    for t in data:
        parsed = _parse_token(t)
        if parsed is None:
            count_errors += 1
        else:
            childs.append(parsed)
    # Sentences in which every token was rejected are dropped entirely.
    if childs:
        tagged_sentences.append(childs)

# Show a sample sentence without failing on corpora with fewer than 11 sentences.
if tagged_sentences:
    print(tagged_sentences[min(10, len(tagged_sentences) - 1)])
print("Tagged sentences: ", len(tagged_sentences))
print("Skipped malformed tokens: ", count_errors)

In [0]:
# Separate each tagged sentence into two parallel arrays: its words and its tags.
sentences = [
    np.array([word for word, _ in tagged_sentence])
    for tagged_sentence in tagged_sentences
]
sentence_tags = [
    np.array([tag for _, tag in tagged_sentence])
    for tagged_sentence in tagged_sentences
]

print(sentences[0])
print(sentence_tags[0])

In [0]:
# Hold out 20% of the sentences for testing. A fixed random_state makes the
# split (and therefore the vocabulary and every reported score) reproducible
# across kernel restarts.
(train_sentences,
 test_sentences,
 train_tags,
 test_tags) = train_test_split(
    sentences, sentence_tags, test_size=0.2, random_state=42
)

**Keras also needs to work with numbers**

(OOV – Out Of Vocabulary: the index assigned to any word that does not appear in the training vocabulary.)

In [0]:
# Word vocabulary: lower-cased words seen in the TRAINING sentences only, so
# that unseen test words are mapped to -OOV-.
words = {w.lower() for s in train_sentences for w in s}

# Tag inventory: collected from both splits. The label set is closed, and a
# tag that happens to occur only in the test split would otherwise raise a
# KeyError when the test tags are converted to indices.
tags = set()
for tag_split in (train_tags, test_tags):
    for ts in tag_split:
        tags.update(ts)

# Sort before numbering: set iteration order for strings is not stable across
# interpreter runs, which would make the index mappings irreproducible.
word2index = {w: i + 2 for i, w in enumerate(sorted(words))}
word2index['-PAD-'] = 0  # The special value used for padding
word2index['-OOV-'] = 1  # The special value used for OOVs

tag2index = {t: i + 1 for i, t in enumerate(sorted(tags))}
tag2index['-PAD-'] = 0  # The special value used for padding

Convert the dataset from strings to integers: each word is replaced by its `word2index` value and each tag by its `tag2index` value.

In [0]:
def _encode_words(sentence):
    """Return the vocabulary index of every word in `sentence` (lower-cased);
    words missing from `word2index` get the '-OOV-' index."""
    unknown = word2index['-OOV-']
    return [word2index.get(word.lower(), unknown) for word in sentence]


# Words -> integer ids (unknown words become -OOV-).
train_sentences_X = [_encode_words(sentence) for sentence in train_sentences]
test_sentences_X = [_encode_words(sentence) for sentence in test_sentences]

# Tags -> integer ids (a tag missing from tag2index raises KeyError).
train_tags_y = [[tag2index[tag] for tag in tag_seq] for tag_seq in train_tags]
test_tags_y = [[tag2index[tag] for tag in tag_seq] for tag_seq in test_tags]

print(train_sentences_X[0])
print(test_sentences_X[0])
print(train_tags_y[0])
print(test_tags_y[0])

Maximum length over the training sequences (every sequence is padded or truncated to this length below)

In [0]:
# Length of the longest encoded training sentence.
MAX_LENGTH = max(map(len, train_sentences_X))
print(MAX_LENGTH)  # 105

Keras provides an API to easily truncate and pad sequences to a common length:
[tf.keras.preprocessing.sequence.pad_sequences](https://www.tensorflow.org/guide/keras/masking_and_padding)


In [0]:

# Bring all four datasets to a common length of MAX_LENGTH, padding at the end
# of each sequence.
(train_sentences_X,
 test_sentences_X,
 train_tags_y,
 test_tags_y) = (
    pad_sequences(sequences, maxlen=MAX_LENGTH, padding='post')
    for sequences in (train_sentences_X, test_sentences_X, train_tags_y, test_tags_y)
)

print(train_sentences_X[0])
print(test_sentences_X[0])
print(train_tags_y[0])
print(test_tags_y[0])

**VERSION 1:**

In [0]:
# model = Sequential()
# model.add(InputLayer(input_shape=(MAX_LENGTH, )))
# model.add(Embedding(len(word2index), 128))
# model.add(Bidirectional(LSTM(256, return_sequences=True)))
# model.add(TimeDistributed(Dense(len(tag2index))))
# model.add(Activation('softmax'))
 
# model.compile(loss='categorical_crossentropy',
#               optimizer=Adam(0.001),
#               metrics=['accuracy'])
 
# model.summary()
 

Transform the sequences of tags to sequences of **One-Hot Encoded tags**. This is what the Dense Layer outputs:

In [0]:
def to_categorical(sequences, categories):
    """One-hot encode integer class indices.

    Args:
        sequences: rectangular array-like of integer class indices, e.g. the
            (n_sentences, seq_len) output of ``pad_sequences``.
        categories: number of classes; every index must lie in
            ``[0, categories)``.

    Returns:
        float64 ``np.ndarray`` of shape ``sequences.shape + (categories,)``
        holding 1.0 at each class index and 0.0 elsewhere. Empty input keeps
        its shape (with the extra trailing ``categories`` axis).

    Raises:
        IndexError: if the input is not a rectangular integer array, or if an
            index is negative or >= ``categories``. Negative indices are
            rejected rather than silently wrapping to the last classes.
    """
    indices = np.asarray(sequences)
    if indices.size == 0:
        return np.zeros(indices.shape + (categories,))
    if indices.dtype.kind not in "iu":
        raise IndexError(
            "to_categorical expects a rectangular array of integer class "
            "indices (pad the sequences to a common length first); "
            "got dtype {}".format(indices.dtype)
        )
    if indices.min() < 0 or indices.max() >= categories:
        raise IndexError(
            "class index out of range for {} categories".format(categories)
        )
    # Row k of the identity matrix is the one-hot vector for class k, so a
    # single fancy-indexing lookup encodes the whole array without Python loops.
    return np.eye(categories)[indices]

What the **one-hot encoded** tags look like:

In [0]:
# One-hot encode the padded training tags; one class per entry of tag2index.
num_tag_classes = len(tag2index)
cat_train_tags_y = to_categorical(train_tags_y, num_tag_classes)
print(cat_train_tags_y[0])

***Train***

In [0]:
# model.fit(train_sentences_X, to_categorical(train_tags_y, len(tag2index)), batch_size=128, epochs=40, validation_split=0.2)

**Evaluate**

In [0]:
# scores = model.evaluate(test_sentences_X, to_categorical(test_tags_y, len(tag2index)))
# print(f"{model.metrics_names[1]}: {scores[1] * 100}")   # acc:

**Test sentences**

In [0]:
# Two hand-written sentences, whitespace-tokenised into lists of words.
test_samples = [
    raw_sentence.split()
    for raw_sentence in (
        "HN đẹp nhất về đêm",
        "Người đẹp nhất là người hay cười .",
    )
]
print(test_samples)

In [0]:
# Encode the sample sentences with the training vocabulary (lower-cased lookup,
# unknown words -> -OOV-), then pad them at the end up to MAX_LENGTH.
oov_id = word2index['-OOV-']
test_samples_X = pad_sequences(
    [[word2index.get(word.lower(), oov_id) for word in sample]
     for sample in test_samples],
    maxlen=MAX_LENGTH,
    padding='post',
)
print(test_samples_X)

**First predictions**

In [0]:
# predictions = model.predict(test_samples_X)
# print(predictions, predictions.shape)

**Reverse** operation for **to_categorical**:

In [0]:
def logits_to_tokens(sequences, index):
    """Decode per-position class scores into tokens.

    For every score vector in every sequence, take the position of the
    highest score and look it up in `index` (a mapping from class index to
    token). Returns a list of token lists, one per input sequence.
    """
    return [
        [index[np.argmax(scores)] for scores in sequence]
        for sequence in sequences
    ]

In [0]:
# print(logits_to_tokens(predictions, {i: t for t, i in tag2index.items()}))

**VERSION 2:**


**Ignore the padding positions when measuring accuracy**

In [0]:
def ignore_class_accuracy(to_ignore=0):
    """Build an accuracy metric that skips positions whose TRUE class is `to_ignore`.

    Args:
        to_ignore: class index to exclude from the accuracy (0 by default).

    Returns:
        A function ``ignore_accuracy(y_true, y_pred)`` suitable for the
        ``metrics`` list of ``model.compile``. Both arguments are reduced to
        class indices with an argmax over the last axis.
    """
    def ignore_accuracy(y_true, y_pred):
        y_true_class = K.argmax(y_true, axis=-1)
        y_pred_class = K.argmax(y_pred, axis=-1)

        # The mask must come from the ground truth: masking on the prediction
        # would keep padded positions whenever the model predicts a real tag
        # there, and would drop real words the model wrongly labels as the
        # ignored class.
        ignore_mask = K.cast(K.not_equal(y_true_class, to_ignore), 'int32')
        matches = K.cast(K.equal(y_true_class, y_pred_class), 'int32') * ignore_mask
        # max(..., 1) avoids dividing by zero when every position is ignored.
        accuracy = K.sum(matches) / K.maximum(K.sum(ignore_mask), 1)
        return accuracy
    return ignore_accuracy

Retrain, adding the ***ignore_class_accuracy*** metric at the compile stage:

In [0]:
# Embedding -> bidirectional LSTM -> per-timestep dense layer with softmax,
# giving one distribution over the tag2index classes for each position.
vocab_size = len(word2index)
num_tags = len(tag2index)

model = Sequential([
    InputLayer(input_shape=(MAX_LENGTH, )),
    Embedding(vocab_size, 128),
    Bidirectional(LSTM(256, return_sequences=True)),
    TimeDistributed(Dense(num_tags)),
    Activation('softmax'),
])

# Track plain accuracy together with the variant built by ignore_class_accuracy(0).
model.compile(
    loss='categorical_crossentropy',
    optimizer=Adam(0.001),
    metrics=['accuracy', ignore_class_accuracy(0)],
)

model.summary()
 

**Retrain:**

In [0]:
# Reuse the one-hot training targets computed earlier (cat_train_tags_y) rather
# than encoding train_tags_y a second time, which would hold two copies of the
# same large tensor in memory.
model.fit(
    train_sentences_X,
    cat_train_tags_y,
    batch_size=128,
    epochs=40,
    validation_split=0.2,
)

In [0]:
scores = model.evaluate(test_sentences_X, to_categorical(test_tags_y, len(tag2index)))
# Report every metric, not only the first one: plain accuracy also counts the
# padded positions, so it has to be read next to the ignore-class accuracy.
print(f"{model.metrics_names[0]}: {scores[0]}")
for metric_name, metric_value in zip(model.metrics_names[1:], scores[1:]):
    print(f"{metric_name}: {metric_value * 100}")   # original run: accuracy: 97.76236414909363

**Predict again:**

In [0]:
predictions = model.predict(test_samples_X)

# Invert tag2index so that class indices can be turned back into tag names.
index2tag = {i: t for t, i in tag2index.items()}
predicted_tags = logits_to_tokens(predictions, index2tag)

# The samples were padded at the end up to MAX_LENGTH, so only the first
# len(sample) predictions belong to real words; the rest are predictions for
# padding and are dropped from the printed result.
print([tags[:len(sample)] for tags, sample in zip(predicted_tags, test_samples)])