In [1]:
import matplotlib.pyplot as plt
import pandas as pd
import pickle
import numpy as np
import json
import regex as re
import warnings
warnings.filterwarnings("ignore", category=np.VisibleDeprecationWarning) 
np.random.seed(0)
plt.style.use("ggplot")

import tensorflow as tf
print('Tensorflow version:', tf.__version__)
print('GPU detected:', tf.config.list_physical_devices('GPU'))

Tensorflow version: 2.11.0
GPU detected: []


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
NUM_FEATURES = 8
def word_shape_features(word):
    return np.array([word.istitle(), word.islower(), word.isupper(), len(word),
                     word.isdigit(),  word.isalpha(),word.isalnum(), word.isnumeric()])

def get_word_features(word):
    return word_shape_features(word)

def get_sent_features(sent):
    ret = []
    for word in sent:
        ret.append(get_word_features(word))
    return ret

In [4]:
TAGS =  pickle.load(open( "tags.pickle", "rb" ))
TAGS.remove("O") 
NUM_TAGS = len(TAGS)
# print(NUM_TAGS)

tag2id = {}
for id,label in enumerate(TAGS):
    tag2id[label] = id 

def label2id(labels):
    ret = []
    prev_label = ""
    for label in labels:
        if label == "O":
            ret.append([2*NUM_TAGS])
        elif label == prev_label:
            l =[tag2id[t]+ NUM_TAGS for t in label]
            ret.append(l)
        else:
            l =[tag2id[t] for t in label]
            ret.append(l)
        prev_label = label
    return ret 

In [5]:
def get_label(label_id):
    if label_id == (2*NUM_TAGS):
        return "O"
    elif label_id >= NUM_TAGS:
        return [TAGS[label_id-NUM_TAGS]]
    else:
        return TAGS[label_id]

def id2label(labels):
    ret = []
    for label in labels:
        l = [get_label(x) for x in label]
        if len(l) == 1 and l[0] == "O":
            l = "O"
        ret.append(l)
    return ret 

In [6]:
def clean_text(sent):
    '''
    This is text cleaning function
    '''
    ret_sent= []
    for txt in sent:
      fil_txt = re.sub('[^A-Za-z0-9]+', '', str(txt))
      if len(fil_txt) == 0:
        fil_txt  = txt [0]
      ret_sent.append(fil_txt)
    assert(len(ret_sent) == len(sent))
    return ret_sent

In [7]:
f = open('drive/MyDrive/train.json')
data = json.load(f)
f.close()

In [8]:
a = [d["sent"] for d in data]
set_ = set()
for idx,s in enumerate(a):
    for t in s:
        if len(t)<1:
            set_.add(idx)
data = [data[i] for i in range(len(data)) if i not in set_]

In [9]:
# Reduce training size to fit in RAM
print('Total Entries:', len(data))
data = data[:20000]
print('Reduced Entries:', len(data))

Total Entries: 1247626
Reduced Entries: 20000


In [10]:
df = pd.DataFrame(data)
df["sent"] = df["sent"].map( lambda x: clean_text(x))
df["features"] = df["sent"].map(lambda x: get_sent_features(x))
df["labels"] = df["tags"].map(lambda x: label2id(x))


In [11]:
sentences = list(df["sent"])
labels = list(df["labels"])
unique_word_set = set()
for x in sentences:
  for w in x:
    unique_word_set.add(w)
words_to_id = {}
for idx, w in enumerate(unique_word_set):
  words_to_id[w] = idx

num_words = len(unique_word_set)
print("Number of training sentences: {:,}".format(len(sentences)))

Number of training sentences: 20,000


In [12]:
def to_bool_vec(y_id):
    y_bool = np.zeros(2*NUM_TAGS+1, np.int32)
    num_labels = len(y_id)
    for id in y_id:
        # for l in label:
          y_bool[id] = 1
    return y_bool

In [13]:
from tensorflow.keras.utils import pad_sequences, to_categorical

max_len = 105

X = np.array([np.array([words_to_id[w] for w in s], dtype=np.float32) for s in sentences])
X = pad_sequences(maxlen=max_len, dtype='float32', sequences=X, padding="post", value=(num_words-1))

print(X.shape)
print(X.dtype)

y_padding = np.zeros(2*NUM_TAGS+1, np.float32)
y_padding[2*NUM_TAGS] = 1.0

y = np.array([np.array([to_bool_vec(lbl) for lbl in l], dtype=np.float32) for l in labels])
y = pad_sequences(maxlen=max_len, dtype='float32', sequences=y, padding="post", value=y_padding)

print(y.shape)
print(y.dtype)

(20000, 105)
float32
(20000, 105, 227)
float32


In [14]:
from tensorflow.keras import Model, Input
from tensorflow.keras.layers import LSTM, Embedding, Dense
from tensorflow.keras.layers import TimeDistributed, SpatialDropout1D, Bidirectional
from keras import backend as K

In [15]:
input_word = Input(shape=(max_len,))
model = Embedding(input_dim=num_words , output_dim=max_len, input_length=max_len)(input_word)
model = SpatialDropout1D(0.1)(model)
model = Bidirectional(LSTM(units=NUM_TAGS, return_sequences=True, recurrent_dropout=0.1))(model)
out = TimeDistributed(Dense(2*NUM_TAGS+1, activation="softmax"))(model)
model = Model(input_word, out)
model.summary()

[print(i.shape, i.dtype) for i in model.inputs]
print("---------------")
[print(o.shape, o.dtype) for o in model.outputs]
print("---------------")
[print(l.name, l.input_shape, l.dtype) for l in model.layers]

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 105)]             0         
                                                                 
 embedding (Embedding)       (None, 105, 105)          6989115   
                                                                 
 spatial_dropout1d (SpatialD  (None, 105, 105)         0         
 ropout1D)                                                       
                                                                 
 bidirectional (Bidirectiona  (None, 105, 226)         197976    
 l)                                                              
                                                                 
 time_distributed (TimeDistr  (None, 105, 227)         51529     
 ibuted)                                                         
                                                             

[None, None, None, None, None]

In [16]:
def multi_class_cross_entropy(y_true, y_pred):
    y_true = K.cast(y_true, 'float32')
    y_pred = K.cast(y_pred, 'float32')
    y_pred = K.clip(y_pred, K.epsilon(), 1-K.epsilon())
    cross_entropy = -(y_true * K.log(y_pred) + (1 - y_true) * K.log(1 - y_pred))
    loss = K.sum(cross_entropy, axis=0)
    return loss

model.compile(optimizer="adam",
              loss=multi_class_cross_entropy,
              metrics=["accuracy"])

In [17]:
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
chkpt = ModelCheckpoint("model_weights.h5", monitor='val_loss',verbose=1, save_best_only=True, save_weights_only=True, mode='min')
early_stopping = EarlyStopping(monitor='val_accuracy', min_delta=0, patience=1, verbose=0, mode='max', baseline=None, restore_best_weights=False)

history = model.fit(
    x=X,
    y=y,
    batch_size=32, 
    epochs=3,
    verbose=1
)

Epoch 1/3
Epoch 2/3
Epoch 3/3


In [18]:
model.save('lstm_model.h5')

In [20]:
f_test = open('./drive/MyDrive/test.json')
data_test = json.load(f_test)
f_test.close()

a = [d["sent"] for d in data_test]
set_ = set()
for idx,s in enumerate(a):
    for t in s:
        if len(t)<1:
            set_.add(idx)

data_test = [data_test[i] for i in range(len(data_test)) if i not in set_]
df = pd.DataFrame(data_test)
df["sent"] = df["sent"].map( lambda x: clean_text(x))
df["features"] = df["sent"].map(lambda x: get_sent_features(x))
df["labels"] = df["tags"].map(lambda x: label2id(x))

sentences = list(df["sent"])
labels = list(df["labels"])
unique_word_set = set()
for x in sentences:
  for w in x:
    unique_word_set.add(w)
words_to_id = {}
for idx, w in enumerate(unique_word_set):
  words_to_id[w] = idx

num_words = len(unique_word_set)
print("Number of testing sentences: {:,}".format(len(sentences)))

X_test = np.array([np.array([words_to_id[w] for w in s], dtype=np.float32) for s in list(df["sent"])])
X_test = pad_sequences(maxlen=max_len, dtype='float32', sequences=X_test, padding="post", value=(num_words-1))

y_padding = np.zeros(2*NUM_TAGS+1, np.float32)
y_padding[2*NUM_TAGS] = 1.0

y_test = np.array([np.array([to_bool_vec(lbl) for lbl in l], dtype=np.float32) for l in list(df["labels"])])
y_test = pad_sequences(maxlen=max_len, dtype='float32', sequences=y_test, padding="post", value=y_padding)

out = model.evaluate(X_test, y_test)

Number of testing sentences: 278


In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score
# predict on test data
y_pred = model.predict(X_test)

#TODO

print(X_test[0][5])
print(y_test[0][5])
print(y_pred[0][5])

# calculate precision, recall, and f1 score
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)

# print the results
print("Precision:", precision)
print("Recall:", recall)
print("F1 score:", f1)