In [None]:
!pip install seqeval



1. Load the data in google Drive

In [1]:
import os
from google.colab import drive
drive.mount('/content/drive')
path ="/content/drive/My Drive/"
os.chdir(path)
# os.listdir(path)

Mounted at /content/drive


2. Process data

In [None]:
import tensorflow as tf
import numpy as np
import os
conll2003_path = "/content/drive/MyDrive/nlp/A2"
datasetpath= "/content/drive/MyDrive/nlp/A2"
def load_file(path = "/train.txt"):
    # Load the dataset
    train_sentences = []
    train_labels = []
    with open(conll2003_path + path) as f:
        sentence = []
        labels = []
        for line in f:
            line = line.strip()
            if line: # Split each line into four parts: word, part of speech, block, and label
                word, pos, chunk, label = line.split()
                sentence.append(word)
                labels.append(label)
            else:
                train_sentences.append(sentence)
                train_labels.append(labels)
                sentence = []
                labels = []
    return train_sentences, train_labels

In [None]:
max_len=64
def preproces(word2idx, tag2idx, num_tags, train_sentences,  train_labels):
    # Convert sentences and labels to numerical sequences
    x = [[word2idx[word.lower()] for word in sentence] for sentence in train_sentences]
    x = tf.keras.preprocessing.sequence.pad_sequences(maxlen=max_len, sequences=x, padding="post", value=0)
    y = [[tag2idx[tag] for tag in labels] for labels in train_labels]
    y = tf.keras.preprocessing.sequence.pad_sequences(maxlen=max_len, sequences=y, padding="post", value=tag2idx["O"])
    y = tf.keras.utils.to_categorical(y, num_tags)
    return x, y

In [None]:
def get_dataset():
    # Load the dataset
    train_sentences, train_labels = load_file("/train.txt")
    valid_sentences, valid_labels = load_file("/valid.txt")
    test_sentences, test_labels = load_file("/test.txt")
    # Create vocabulary and tag dictionaries
    all_sentencses = np.concatenate([train_sentences, valid_sentences,test_sentences])
    all_labels = np.concatenate([train_labels, valid_labels, test_labels])
    vocab = set()
    tags = set()
    for sentence in all_sentencses:
        for word in sentence:
            vocab.add(word.lower())
    word2idx = {}
    if len(word2idx) == 0:
        word2idx["PADDING_TOKEN"] = len(word2idx)
        word2idx["UNKNOWN_TOKEN"] = len(word2idx)
    for word in vocab:
        word2idx[word] = len(word2idx)

    for labels in all_labels:
        for label in labels:
            tags.add(label)
    tag2idx = {t: i for i, t in enumerate(tags)}
    save_dict(word2idx, os.path.join(conll2003_path, 'word2idx.json'),)
    save_dict(tag2idx, os.path.join(conll2003_path, 'idx2Label.json'),)
    num_words = len(word2idx) + 1
    num_tags = len(tag2idx)
    train_X, train_y = preproces(word2idx, tag2idx, num_tags, train_sentences, train_labels);
    valid_X, valid_y = preproces(word2idx, tag2idx, num_tags, valid_sentences, valid_labels);
    test_X, test_y = preproces(word2idx, tag2idx, num_tags, test_sentences, test_labels);
    np.savez( os.path.join(conll2003_path ,'dataset.npz'), train_X = train_X, train_y = train_y, valid_X = valid_X, valid_y =valid_y , test_X =test_X, test_y= test_y)
    return train_X, train_y, valid_X, valid_y , test_X, test_y

In [None]:
def save_dict(dict, file_path):
    import json
    # Saving the dictionary to a file
    with open(file_path, 'w') as f:
        json.dump(dict, f)
def load_dict(path_file):
    import json
    # Loading the dictionary from the file
    with open(path_file, 'r') as f:
        loaded_dict = json.load(f)
        return loaded_dict;
    print(loaded_dict)

In [None]:
train_X, train_y, valid_X, valid_y , test_X, test_y=get_dataset()

In [None]:
import tensorflow as tf
from tensorflow import keras
from keras.models import Model
from keras.layers import Input, Embedding, LSTM, Dense, TimeDistributed
import keras as keras
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.layers import Embedding, Conv1D, GlobalMaxPooling1D, Dense
import numpy as np

3. Use GPU to accelerate

In [None]:
if tf.test.gpu_device_name():
    print('Default GPU Device: {}'.format(tf.test.gpu_device_name()))
else:
    print("Please install GPU version of TensorFlow")

config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True
session = tf.compat.v1.Session(config=config)
tf.config.experimental_run_functions_eagerly(True)
# physical_devices = tf.config.list_physical_devices('GPU')
# tf.config.experimental.set_memory_growth(physical_devices[0], True)

Default GPU Device: /device:GPU:0


4. Define Transformer Model

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, LayerNormalization, MultiHeadAttention, Dropout, Input
from tensorflow.keras.models import Model

def Transformer(vocab_size, num_layers, d_model, num_heads, d_ff, input_length, dropout_rate):

    word2idx = load_dict('/content/drive/MyDrive/nlp/A2/word2idx.json')
    tag2idx = load_dict('/content/drive/MyDrive/nlp/A2/idx2Label.json')
    max_len=64
    num_words = len(word2idx) + 1
    num_tags = len(tag2idx)
    input_layer = Input(shape=(None,))
    vocab_size=num_words
    # Input Layer
    inputs = input_layer
    # Position Encoding
    position_encoding = positional_encoding(input_length, d_model)

    # Input Embedding Layer
    embedding = Embedding(vocab_size, d_model)(inputs)
    embedding *= tf.math.sqrt(tf.cast(d_model, tf.float32))
    embedding += position_encoding

    # Dropout Layer
    x = Dropout(dropout_rate)(embedding)
    # Transformer Layer
    for _ in range(num_layers):
        x = encoder_layer(d_model, num_heads, d_ff, dropout_rate)(x)
    for _ in range(num_layers):
        x = decoder_layer(d_model, num_heads, d_ff, dropout_rate)(x)
    # Output Layer
    outputs = TimeDistributed(Dense(num_tags, activation="softmax"))(x)
    # Build the Model
    model = Model(inputs=inputs, outputs=outputs)

    return model

def encoder_layer(d_model, num_heads, d_ff, dropout_rate):
    inputs = tf.keras.Input(shape=(None, d_model))

    # MultiHeadAttention
    attention = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)(inputs, inputs)
    #attention = MultiHeadAttention(num_heads=num_heads)(inputs, inputs)
    attention = Dropout(dropout_rate)(attention)
    attention = LayerNormalization(epsilon=1e-6)(inputs + attention)

    # FFN
    outputs = Dense(d_ff, activation='relu')(attention)
    outputs = Dense(d_model)(outputs)
    outputs = Dropout(dropout_rate)(outputs)
    outputs = LayerNormalization(epsilon=1e-6)(attention + outputs)

    return tf.keras.Model(inputs=inputs, outputs=outputs)
def decoder_layer(d_model, num_heads, d_ff, dropout_rate):
    inputs = tf.keras.Input(shape=(None, d_model))

    # MultiHeadAttention
    attention1 = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)(inputs, inputs)
    attention1 = Dropout(dropout_rate)(attention1)
    attention1 = LayerNormalization(epsilon=1e-6)(inputs + attention1)

    # ultiHeadAttention（Encoder-Decoder Attention）
    attention2 = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)(attention1, attention1)
    attention2 = Dropout(dropout_rate)(attention2)
    attention2 = LayerNormalization(epsilon=1e-6)(attention1 + attention2)

    # FFN
    outputs = Dense(d_ff, activation='relu')(attention2)
    outputs = Dense(d_model)(outputs)
    outputs = Dropout(dropout_rate)(outputs)
    outputs = LayerNormalization(epsilon=1e-6)(attention2 + outputs)

    return tf.keras.Model(inputs=inputs, outputs=outputs)

def positional_encoding(max_length, d_model):
    pos = tf.expand_dims(tf.range(max_length, dtype=tf.float32), axis=1)
    div_term = tf.pow(10000, 2 * tf.range(d_model // 2, dtype=tf.float32) / d_model)
    encodings = tf.concat([tf.sin(pos / div_term), tf.cos(pos / div_term)], axis=1)
    return tf.expand_dims(encodings, axis=0)

In [None]:
def train( model,  train_X, train_y, valid_X, valid_y):
    # Define the path and file name for saving the model
    model_path = '/content/drive/MyDrive/nlp/A2/transformer.h5'
    # Define early stop callback function
    early_stop = EarlyStopping(monitor='val_accuracy', patience=3, mode='max', verbose=1)
    # Define the ModelCheckpoint callback function
    checkpoint = ModelCheckpoint(model_path, monitor='val_accuracy', save_best_only=True, mode='max', verbose=1)
    # Compile and train the model
    model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

    model.fit(train_X, train_y, batch_size=32, epochs=3, validation_data=(valid_X, valid_y), callbacks=[early_stop, checkpoint])

In [None]:
def test(test_X, test_y):
    model = keras.models.load_model('/content/drive/MyDrive/nlp/A2/transformer.h5')
    # Evaluation Model
    scores = model.evaluate(test_X, test_y, verbose=0)
    print("Test Accuracy:", scores[1])
    #print(scores)

5. Train & Eval the Model

In [None]:
word2idx = load_dict('/content/drive/MyDrive/nlp/A2/word2idx.json')
tag2idx = load_dict('/content/drive/MyDrive/nlp/A2/idx2Label.json')

num_words = len(word2idx) + 1
num_tags = len(tag2idx)
vocab_size=num_tags
num_layers=2
d_model=64 # how long a vector to express a word
num_heads=8
d_ff=64
input_length=64
dropout_rate=0.2
model=Transformer(vocab_size, num_layers, d_model, num_heads, d_ff, input_length, dropout_rate)

In [None]:
train(model, np.concatenate([train_X, valid_X]), np.concatenate([train_y, valid_y]),test_X, test_y)
test(test_X, test_y)

Epoch 1/3
Epoch 1: val_accuracy improved from -inf to 0.98315, saving model to /content/drive/MyDrive/nlp/A2/transformer.h5
Epoch 2/3
Epoch 2: val_accuracy improved from 0.98315 to 0.98370, saving model to /content/drive/MyDrive/nlp/A2/transformer.h5
Epoch 3/3
Epoch 3: val_accuracy did not improve from 0.98370
Test Accuracy: 0.9837048649787903


6. Predict the Labels

In [None]:
predictions = model.predict(test_X)
# Convert predicted tags back to labels
predicted_labels = []
predicted_tags = tf.argmax(predictions, axis=-1)
word2idx = load_dict('/content/drive/MyDrive/nlp/A2/word2idx.json')
tag2idx = load_dict('/content/drive/MyDrive/nlp/A2/idx2Label.json')
for tags in predicted_tags:
    labels = [list(tag2idx.keys())[tag] for tag in tags if tag != 0]
    predicted_labels.append(labels)



In [None]:
test_sentences, test_labels = load_file("/test.txt")
predicted_labels2=predicted_labels
# Unify the dimension
for i in range(len(test_labels)):
  test_labels[i]=test_labels[i][:64]
  predicted_labels2[i]=predicted_labels2[i]+['O']*(64-len(predicted_labels2[i]))
  predicted_labels2[i]=predicted_labels2[i][:len(test_labels[i])]

7. Use Segeval to Evaluate the Tagger on the Test Set

In [None]:
from seqeval.metrics import accuracy_score
from seqeval.metrics import classification_report
from seqeval.metrics import f1_score
from seqeval.metrics import accuracy_score,precision_score,recall_score
print("Accuracy Score : ",accuracy_score(test_labels, predicted_labels2))
print("Precision Score : ",precision_score(test_labels, predicted_labels2))
print("Recall Score : ",recall_score(test_labels, predicted_labels2))
print("F1 Score : ",f1_score(test_labels, predicted_labels2))
print("-"*30)
print("Classification_Report")
print(classification_report(test_labels, predicted_labels2))

Accuracy Score :  0.8736977768231124
Precision Score :  0.43062286842577635
Recall Score :  0.42641308211873447
F1 Score :  0.42850763597392155
------------------------------
Classification_Report
              precision    recall  f1-score   support

         LOC       0.52      0.54      0.53      1661
        MISC       0.54      0.60      0.57       702
         ORG       0.52      0.53      0.53      1661
         PER       0.14      0.12      0.13      1602

   micro avg       0.43      0.43      0.43      5626
   macro avg       0.43      0.45      0.44      5626
weighted avg       0.42      0.43      0.42      5626



8. Save the Prediction

In [None]:
ts, test_labels = load_file("/test.txt")
text=''
for i in range(len(ts)):
  for j in range(len(ts[i])):
    a=''
    if(j>=len(predicted_labels[i])):
      a=ts[i][j]+' O\n'
    else:
      a=ts[i][j]+' '+predicted_labels[i][j]+'\n'
    text=text+a
  text=text+'\n'
#print(text)

In [None]:
with open("/content/drive/MyDrive/nlp/A2/transformer.test.txt", "w") as file:
  file.write(text)