In [1]:
import os
import json
import pickle
import numpy as np
import pandas as pd
import tensorflow as tf
from transformers import TFBertModel, BertTokenizer
from tensorflow.keras.metrics import Metric, Precision, Recall
from sklearn.metrics import accuracy_score, confusion_matrix

In [2]:
# List the GPUs TensorFlow can see and switch on memory growth for each one
# (per the TensorFlow docs, memory is then allocated on demand instead of
# being reserved up front).
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
     tf.config.experimental.set_memory_growth(gpu, True)

In [3]:
# JSON Lines files that make up the evaluation set; passed to load_data().
testing_data_path = ["dataset/dev1.jsonl", "dataset/dev2.jsonl"]
# Directory prefix (note the trailing slash) that later cells concatenate
# directly with file names.
output_dir = "output/"

In [4]:
def load_data(file_path):
    """Load records from one or more JSON Lines files.

    Args:
        file_path: An iterable of paths to JSON Lines files. A single path
            (``str``, ``bytes`` or ``os.PathLike``) is also accepted and is
            treated as a one-element list.

    Returns:
        list: The decoded JSON value of every non-blank line, ordered by
        file and then by line.

    Raises:
        OSError: If a file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(file_path, (str, bytes, os.PathLike)):
        file_path = [file_path]

    data = []
    for file in file_path:
        with open(file, 'r', encoding='utf-8') as f:
            # Skip blank lines (e.g. a trailing empty line), which
            # json.loads would reject.
            data.extend(json.loads(line) for line in f if line.strip())
    return data

In [5]:
# Fixed token-sequence length: used as the tokenizer's max_length and as the
# padded row width in preprocess_data().
MAX_LENGTH = 60

In [6]:
def clean_data(data):
    """Split each record's text into sentences and keep the usable ones.

    Every item of ``data`` must provide a ``'stripped_sentence'`` string.
    That text is split on ``'. '``; the pieces are stripped, empty pieces are
    dropped, and every piece except the last one of its record gets its
    trailing period back.

    A sentence is kept when all of the following hold:
      * it has more than 4 and at most 50 whitespace-separated words;
      * it contains an ``<X>`` marker (bare, or as ``~<X>`` / ``(<X>)``);
      * it does not contain two spaces followed by two backslashes.

    Args:
        data: Iterable of mappings with a ``'stripped_sentence'`` key.

    Returns:
        list[str]: The kept sentences, in input order.
    """
    markers = ('~<X>', '(<X>)', '<X>')
    rejected_fragment = "  \\\\"

    def is_valid(sentence):
        word_count = len(sentence.split())
        if word_count <= 4 or word_count > 50:
            return False
        if not any(marker in sentence for marker in markers):
            return False
        return rejected_fragment not in sentence

    candidates = []
    for record in data:
        stripped = (raw.strip() for raw in record['stripped_sentence'].split('. '))
        pieces = [piece for piece in stripped if piece]
        # Splitting on '. ' ate the period of every piece but the last one.
        candidates.extend([piece + '.' for piece in pieces[:-1]] + pieces[-1:])

    return [sentence for sentence in candidates if is_valid(sentence)]

In [7]:
def _drop_and_pad(values, drop_indexes, length):
    """Flatten ``values``, remove the entries at ``drop_indexes`` and right-pad with zeros to ``length``."""
    kept = np.delete(np.asarray(values).reshape(-1), drop_indexes)
    if len(kept) < length:
        kept = np.pad(kept, (0, length - len(kept)), mode='constant')
    return kept


def preprocess_data(data):
    """Turn marked sentences into model inputs and per-token insertion labels.

    Each sentence is lower-cased, its ``~<x>`` / ``(<x>)`` / ``<x>`` markers
    are replaced by ``[MASK]``, and the result is tokenized with the
    ``bert-base-uncased`` tokenizer, padded/truncated to ``MAX_LENGTH``.
    The ``[MASK]`` positions are then removed from the input ids and the
    attention mask, and the token immediately before each removed mask is
    labelled 1 (all other positions 0). Every row is right-padded with zeros
    back to ``MAX_LENGTH``.

    Args:
        data: Iterable of sentence strings.

    Returns:
        tuple: ``(input_ids, attention_masks, target_tags)``, each the result
        of ``tf.concat`` over one ``MAX_LENGTH``-wide row per sentence.
    """
    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
    # Loop-invariant: look the mask id up once instead of once per token.
    mask_token_id = tokenizer.convert_tokens_to_ids('[MASK]')

    input_ids, attention_masks, target_tags = [], [], []

    for sentence in data:
        masked_sentence = sentence.lower().replace('~<x>', '[MASK]').replace('(<x>)', '[MASK]').replace('<x>', '[MASK]')

        encoded_dict = tokenizer(masked_sentence, max_length=MAX_LENGTH, padding='max_length', truncation=True, return_tensors="tf")

        # Positions of the [MASK] tokens in the (single) encoded row.
        token_ids = np.asarray(encoded_dict["input_ids"][0])
        mask_indexes = np.flatnonzero(token_ids == mask_token_id)

        input_ids.append([_drop_and_pad(encoded_dict["input_ids"], mask_indexes, MAX_LENGTH)])
        attention_masks.append([_drop_and_pad(encoded_dict["attention_mask"], mask_indexes, MAX_LENGTH)])

        # Label the token that precedes each mask, then drop the mask slots
        # so the labels stay aligned with the mask-free input ids.
        # NOTE(review): when two [MASK] tokens are adjacent, the label for the
        # second one lands on the first mask's slot and is deleted with it —
        # confirm this matches the training-time preprocessing.
        labels = np.zeros(MAX_LENGTH, dtype=int)
        labels[mask_indexes - 1] = 1
        target_tags.append([_drop_and_pad(labels, mask_indexes, MAX_LENGTH)])

    return tf.concat(input_ids, axis=0), tf.concat(attention_masks, axis=0), tf.concat(target_tags, axis=0)

In [8]:
class F1Score(tf.keras.metrics.Metric):
    """Streaming F1 metric built from a Keras ``Precision`` and ``Recall`` pair.

    The result is ``2 * p * r / (p + r + epsilon)``, where ``p`` and ``r`` are
    the current results of the wrapped metrics and ``epsilon`` is
    ``tf.keras.backend.epsilon()`` (guards against division by zero).
    """

    def __init__(self, name='f1_score', **kwargs):
        """Create the metric and its underlying Precision/Recall trackers."""
        super().__init__(name=name, **kwargs)
        self.precision = Precision()
        self.recall = Recall()

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate a batch into both wrapped metrics.

        ``sample_weight`` is forwarded so weighted evaluation is honoured
        rather than silently ignored.
        """
        self.precision.update_state(y_true, y_pred, sample_weight=sample_weight)
        self.recall.update_state(y_true, y_pred, sample_weight=sample_weight)

    def result(self):
        """Return the F1 score computed from the accumulated precision and recall."""
        precision_result = self.precision.result()
        recall_result = self.recall.result()
        return 2 * ((precision_result * recall_result) / (precision_result + recall_result + tf.keras.backend.epsilon()))

    def reset_state(self):
        """Clear the accumulated state of both wrapped metrics."""
        self.precision.reset_state()
        self.recall.reset_state()

In [9]:
def evaluate_model(model, test_preprocessed_data):
    """Score the model sentence by sentence and average the metrics.

    Args:
        model: Object with a Keras-style ``predict`` method that takes
            ``[input_ids, attention_masks]`` and returns one array whose
            first axis indexes sentences.
        test_preprocessed_data: ``(input_ids, attention_masks, labels)`` as
            produced by ``preprocess_data``.

    Returns:
        dict: ``average_precision``, ``average_recall`` and
        ``average_f1_score`` (per-sentence metrics averaged over all
        sentences) plus ``confusion_matrix``, a 2x2 array summed over all
        sentences with rows/columns ordered as labels ``[0, 1]``.

    Raises:
        ValueError: If there are no sentences to evaluate.
    """
    eval_inputs, eval_attentions, eval_labels = test_preprocessed_data
    num_sentences = len(eval_inputs)
    if num_sentences == 0:
        raise ValueError("test_preprocessed_data contains no sentences to evaluate")

    # One batched forward pass instead of one predict() call per sentence:
    # much faster, and verbose=0 avoids printing a progress bar per call.
    all_predictions = model.predict([eval_inputs, eval_attentions], verbose=0)

    total_precision = 0
    total_recall = 0
    total_f1_score = 0
    total_confusion_matrix = np.zeros((2, 2))

    for i in range(num_sentences):
        batch_labels = eval_labels[i]
        # Threshold the scores at 0.5 to obtain 0/1 tags.
        predictions = np.round(all_predictions[i].squeeze()).astype(int)

        precision_object = tf.keras.metrics.Precision()
        precision_object.update_state(batch_labels, predictions)
        precision = precision_object.result()

        recall_object = tf.keras.metrics.Recall()
        recall_object.update_state(batch_labels, predictions)
        recall = recall_object.result()

        # Manual F1; epsilon guards against 0/0 when both terms are zero.
        f1_score = 2 * ((precision * recall) / (precision + recall + tf.keras.backend.epsilon()))

        total_precision += precision
        total_recall += recall
        total_f1_score += f1_score

        # labels=[0, 1] forces a 2x2 result. Without it, a sentence in which
        # only one class occurs yields a 1x1 matrix that would broadcast into
        # every cell of the running total.
        total_confusion_matrix += confusion_matrix(batch_labels, predictions, labels=[0, 1])

    return {
        "average_precision": total_precision / num_sentences,
        "average_recall": total_recall / num_sentences,
        "average_f1_score": total_f1_score / num_sentences,
        "confusion_matrix": total_confusion_matrix
    }

In [10]:
# Read every record from the dev files, then keep only the sentences that
# pass clean_data()'s filters.
data = load_data(testing_data_path)
cleaned_test_data = clean_data(data)

In [11]:
# Reuse the cached preprocessed data when the pickle exists; otherwise build
# it with preprocess_data() and write the cache.
# NOTE(review): the cache is keyed only by file name, so it is not refreshed
# when the input data or MAX_LENGTH change — delete the file to force a rebuild.
# NOTE(review): pickle.load can execute arbitrary code from the file; only
# load caches that this notebook wrote itself.
if os.path.isfile(output_dir + 'test_tokenized_data.pkl'):
    with open(output_dir + 'test_tokenized_data.pkl', 'rb') as f:
        test_preprocessed_data = pickle.load(f)
else:
    test_preprocessed_data = preprocess_data(cleaned_test_data)
    with open(output_dir + 'test_tokenized_data.pkl', 'wb') as f:
        pickle.dump(test_preprocessed_data, f)

In [12]:
# Load the saved Keras model from the HDF5 file. custom_objects maps the
# names 'TFBertModel', 'F1Score', 'Precision' and 'Recall' to their classes
# so load_model can resolve them.
model = tf.keras.models.load_model(output_dir + 'model/token_insertion_model.h5',
                                   custom_objects={'TFBertModel': TFBertModel,
                                                   'F1Score': F1Score,
                                                   'Precision': tf.keras.metrics.Precision,
                                                   'Recall': tf.keras.metrics.Recall})



In [13]:
# Run the evaluation over the preprocessed test set; the result is a dict of
# averaged metrics plus a summed confusion matrix.
evaluation_result = evaluate_model(model, test_preprocessed_data)



































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































In [15]:
# Persist the metrics dict returned by evaluate_model() to disk.
with open(output_dir + 'evaluation_result.pkl', 'wb') as f:
        pickle.dump(evaluation_result, f)

In [16]:
# Tabulate the three averaged metrics (calling .numpy() on each) as a
# one-column DataFrame and display it.
# NOTE(review): the variable name is misspelled ("acuracy"); left unchanged
# in case other cells reference it.
acuracy_metrics = pd.DataFrame.from_dict({'average_precision' : evaluation_result['average_precision'].numpy(),
                                         'average_recall' : evaluation_result['average_recall'].numpy(),
                                         'average_f1_score' : evaluation_result['average_f1_score'].numpy()}, orient='index', columns=['Values'])
acuracy_metrics

Unnamed: 0,Values
average_precision,0.915271
average_recall,0.911828
average_f1_score,0.909105
