 ***IMPORT LIBRARIES***

In [None]:
# ////GENERALS////
import os
import re
import gc
import time
import json
import requests
import numpy as np
import pandas as pd

# ////AUTOMATIC LEARNING////
import tensorflow as tf
from datasets import load_dataset,DatasetDict,Dataset
import keras
from keras.callbacks import EarlyStopping
from keras.callbacks import ModelCheckpoint
from typing_extensions import Type

# ////TRANSFORMERS BY Hugging Face////
import transformers
from transformers import AutoTokenizer
from transformers import TFAutoModelForSeq2SeqLM
from transformers import DataCollatorForSeq2Seq
from transformers import create_optimizer, AdamWeightDecay

# ////NLP MEASURES////
import nltk
from nltk.translate.bleu_score import sentence_bleu,corpus_bleu
from nltk.translate.bleu_score import corpus_bleu
from nltk.translate.meteor_score import meteor_score
from sklearn.model_selection import KFold
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from sklearn.preprocessing import LabelBinarizer
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import accuracy_score
import evaluate
meteor_measure = evaluate.load('meteor')

import warnings
warnings.filterwarnings('ignore')

***READ THE DATASET***

In [None]:
# Load the dataset JSON file ('PATH TO DATASET' is a placeholder for the real path).
with open('PATH TO DATASET') as f:
   dataset = json.load(f)

# The sentence pairs are stored under the top-level 'pairs' key.
rows = dataset['pairs']

# Split every pair into its natural-language (NL) sentence and its
# controlled-natural-language (CNL) counterpart; both lists stay index-aligned.
nl = [row['NL'] for row in rows]
cnl = [row['CNL'] for row in rows]

**DATASET STRUCTURE**

In [None]:
# Wrap the aligned NL/CNL lists in a `Dataset` with a "sentences" column
# (model inputs) and a "targets" column (expected CNL outputs).
train_dict = Dataset.from_dict({"sentences": nl,"targets": cnl})
# Bare expression so the notebook cell displays the dataset.
train_dict

***TRAINING TRANSFORMERS***

In [None]:
# Load the tokenizer of the pretrained checkpoint through AutoTokenizer:
# 't5-small' here, or 'facebook/bart-base' when working with BART instead.
tokenizer = AutoTokenizer.from_pretrained("t5-small") # or change the checkpoint name to 'facebook/bart-base'

In [None]:
# Task prefix added to every source sentence, and the batch tokenization step
prefix = "translate English to CNL: "
def preprocess(data):
  """Tokenize one batch of NL sentences together with their CNL targets.

  Args:
    data: mapping with a "sentences" list (inputs) and a "targets" list
      (expected outputs), as handed over by `Dataset.map(..., batched=True)`.

  Returns:
    The output of the module-level `tokenizer`, called with the prefixed
    sentences as inputs and the targets as `text_target`; both are truncated
    to at most 128 tokens.
  """
  prefixed_sources = [prefix + sentence for sentence in data["sentences"]]
  return tokenizer(
      prefixed_sources,
      text_target=data["targets"],
      max_length=128,
      truncation=True,
  )

In [None]:
# Load the pretrained seq2seq model ('t5-small' or 'facebook/bart-base').
# The path of a previously saved model can be given instead of a checkpoint name.
model = TFAutoModelForSeq2SeqLM.from_pretrained("t5-small") # change to 'facebook/bart-base' or to the path of a saved model

In [None]:
# Seq2seq data collator bound to the tokenizer and model above; it returns
# TensorFlow tensors and is used as `collate_fn` when the tf datasets are built.
data_collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, model=model, return_tensors="tf")

In [None]:
# AdamWeightDecay optimizer with learning rate 2e-5 and weight decay rate 0.01.
# NOTE(review): none of the later visible cells reads this variable
# (create_and_compile_model builds its own optimizer) — confirm it is still needed.
optimizer = AdamWeightDecay(learning_rate=2e-5, weight_decay_rate=0.01)

In [None]:
# Dataset splitting, preprocessing, tf.data preparation and model creation
def split_dataset(dataset, train_idxs, test_idxs):
  """Build the training and test `Dataset` objects of one fold.

  Args:
    dataset: dataset with "sentences" and "targets" columns.
    train_idxs: row indices that go into the training split.
    test_idxs: row indices that go into the test split.

  Returns:
    Tuple `(training_dict, test_dict)`, each a `Dataset` holding only the
    "sentences" and "targets" columns of the selected rows.
  """
  def _subset(indices):
    # Rebuild a two-column Dataset from the rows selected by `indices`.
    return Dataset.from_dict({
        "sentences": dataset[indices]["sentences"],
        "targets": dataset[indices]["targets"],
    })

  return (_subset(train_idxs), _subset(test_idxs))

def preprocessing_sentences(dataset):
  """Tokenize `dataset` by mapping `preprocess` over it in batches.

  Returns whatever `dataset.map(preprocess, batched=True)` returns.
  """
  return dataset.map(preprocess, batched=True)

def prepare_tf_dataset(dataset, shuffle=True, batch_size=16):
  """Turn a tokenized dataset into a batched TensorFlow dataset.

  Delegates to the module-level `model.prepare_tf_dataset` and collates each
  batch with the module-level `data_collator`.

  Args:
    dataset: tokenized dataset (e.g. the output of `preprocessing_sentences`).
    shuffle: whether to shuffle the examples. Defaults to True, which is what
      callers that pass only `dataset` have always received.
      NOTE(review): per the Transformers documentation `drop_remainder`
      follows `shuffle`, so a shuffled validation set may lose its last
      partial batch — confirm and pass `shuffle=False` for evaluation data.
    batch_size: number of examples per batch (default 16).

  Returns:
    The dataset object returned by `model.prepare_tf_dataset`.
  """
  return model.prepare_tf_dataset(
      dataset,
      shuffle=shuffle,
      batch_size=batch_size,
      collate_fn=data_collator,
  )

def create_and_compile_model(learning_rate=2e-5, weight_decay_rate=0.01):
  """Load a fresh pretrained "t5-small" model and compile it for training.

  Args:
    learning_rate: learning rate handed to `AdamWeightDecay`.
    weight_decay_rate: weight decay rate handed to `AdamWeightDecay`.

  Returns:
    The compiled `TFAutoModelForSeq2SeqLM` instance.
  """
  # Honour the arguments: the optimizer used to be built from hard-coded
  # literals, so values passed by the caller were silently ignored.
  adam = AdamWeightDecay(learning_rate=learning_rate, weight_decay_rate=weight_decay_rate)
  seq2seq_model = TFAutoModelForSeq2SeqLM.from_pretrained("t5-small")
  # No loss is given to compile().
  # NOTE(review): presumably the model's built-in loss is used — confirm.
  seq2seq_model.compile(optimizer=adam, metrics=['accuracy'])
  return seq2seq_model

**BLEU SCORE FUNCTION**

In [None]:
%%time
# Function to compute the BLEU Score
def bleu_score(model, tokenizer, test_dict):
    """Compute cumulative corpus-level BLEU scores for the model's predictions.

    Args:
        model: seq2seq model handed to `predict_cnl`.
        tokenizer: tokenizer handed to `predict_cnl`.
        test_dict: dataset/mapping with a 'sentences' column (inputs) and a
            'targets' column (reference CNL sentences).

    Returns:
        Dict with the keys '1-grams', '1-2-grams', '1-3-grams' and
        '1-4-grams', each holding the `corpus_bleu` score for n-grams up to
        that order.
    """
    predicted = predict_cnl(model, tokenizer, test_dict['sentences'])
    targets = test_dict['targets']

    # corpus_bleu expects, for every hypothesis, a *list* of tokenized
    # references; there is exactly one reference per sentence here.
    references = [[target.split()] for target, _ in zip(targets, predicted)]
    hypothesis = [pred.split() for pred in predicted]

    print(hypothesis)
    # Cumulative n-gram weights; each tuple sums to 1. The 3-gram weights used
    # to be (0.3, 0.3, 0.3, 0), which sums to 0.9 and skews the score.
    weights_by_name = {
        '1-grams': (1.0, 0, 0, 0),
        '1-2-grams': (0.5, 0.5, 0, 0),
        '1-3-grams': (1 / 3, 1 / 3, 1 / 3, 0),
        '1-4-grams': (0.25, 0.25, 0.25, 0.25),
    }
    bleu_dic = {
        name: corpus_bleu(references, hypothesis, weights=weights)
        for name, weights in weights_by_name.items()
    }

    return bleu_dic

***METEOR SCORE FUNCTION***

In [None]:
%%time
# Function to compute the METEOR Score
def compute_meteor(model, test_dict, alpha=0.9, beta=3, gamma=0.5):
    """Compute the METEOR score of the model's predictions on `test_dict`.

    Predictions are produced by `predict_cnl` with the module-level
    `tokenizer`; the score is computed by the module-level `meteor_measure`.

    Args:
        model: seq2seq model handed to `predict_cnl`.
        test_dict: dataset/mapping with a 'sentences' column (inputs) and a
            'targets' column (reference CNL sentences).
        alpha, beta, gamma: METEOR parameters forwarded to the metric. They
            used to be accepted but never passed on.

    Returns:
        The dict returned by `meteor_measure.compute`.
    """
    predicted = predict_cnl(model, tokenizer, test_dict['sentences'])
    targets = test_dict['targets']

    # One single-element reference list per prediction.
    references = [[target] for target, _ in zip(targets, predicted)]
    hypothesis = list(predicted)

    return meteor_measure.compute(
        predictions=hypothesis,
        references=references,
        alpha=alpha,
        beta=beta,
        gamma=gamma,
    )

***SYNTAX_CHECK FUNCTION***

In [None]:
# Client for a self-hosted REST API that wraps the syntax checker implemented
# in the CNL2ASP tool (https://github.com/dodaro/cnl2asp)
BASE_URL = "http://XYZ/api/check_syntax"  # Replace 'XYZ' with the host of your check-syntax API

def call_check_syntax_api(sentence, verbose = 1, timeout = 30):
  """POST one CNL sentence to the syntax-check API and return the JSON reply.

  Args:
    sentence: CNL text sent as the 'cnls' field of the JSON body.
    verbose: when 1, print the sentence and the error payload of a
      non-200 response.
    timeout: seconds to wait for the server before giving up (default 30),
      so that an unreachable server cannot block the caller forever.

  Returns:
    The decoded JSON response on HTTP 200, otherwise None (non-200 status,
    network error/timeout, or a body that is not valid JSON).
  """
  body = {'cnls': sentence}
  headers = {"X-API-KEY": "YOUR API KEY REPLACE HERE"}  # Replace with your API key
  try:
    resp = requests.post(BASE_URL, json = body, headers = headers, timeout = timeout)
    if resp.status_code != 200:
      if verbose == 1:
        print(sentence)
        print(resp.json())
        print('-----------------')
      return None
    return resp.json()
  except (requests.exceptions.RequestException, ValueError):
    # Best effort: request failures and undecodable bodies count as "no
    # answer". A bare `except:` would also swallow KeyboardInterrupt.
    return None

def evaluate_syntax(sentences, verbose = 1):
  """Run every sentence through the syntax-check API and summarize failures.

  A sentence counts as problematic when the API gives no usable answer or
  when its "cli_message" is not exactly "Input file fits the grammar.".

  Args:
    sentences: non-empty sequence of CNL sentences.
    verbose: forwarded to `call_check_syntax_api` (it used to be ignored, so
      `verbose=0` could not silence the helper).

  Returns:
    Dict with 'errorCount', 'total', 'avg' (errorCount / total) and
    'problematics' (the list of failing sentences).

  Raises:
    ValueError: if `sentences` is None or empty.
  """
  if sentences is None or len(sentences) == 0:
      raise ValueError("Sentences list is empty")

  problematic_sentences = []
  for sentence in sentences:
    resp = call_check_syntax_api(sentence, verbose)
    fits_grammar = resp is not None and resp.get("cli_message") == "Input file fits the grammar." if isinstance(resp, dict) else (
        resp is not None and "cli_message" in resp and resp["cli_message"] == "Input file fits the grammar."
    )
    if not fits_grammar:
      problematic_sentences.append(sentence)
    # Small pause between requests to avoid flooding the API.
    time.sleep(0.1)

  errorCount = len(problematic_sentences)
  summary = {
      'errorCount': errorCount,
      'total': len(sentences),
      'avg': errorCount/len(sentences),
      'problematics': problematic_sentences
  }
  return summary

## Try the syntax evaluation on two sample CNL sentences; the second one
## contains a typo ("om" instead of "from").
evaluate_syntax(["A node goes from 1 to 50.", "A node goes om 1 to 50."])

**INFERENCE MULTI SENTENCES**

In [None]:
# Sentence cleaning function
# Raw string: the previous pattern '(<\/s>|...)' relied on the invalid escape
# sequence '\/' in a normal string literal, which newer Python versions warn about.
_SPECIAL_TOKENS_RE = re.compile(r'</s>|<s>|<pad>|<unk>')

def clean_sentence(sentence):
  """Remove the special tokens </s>, <s>, <pad> and <unk> from `sentence`.

  Returns the remaining text with leading and trailing whitespace stripped.
  """
  return _SPECIAL_TOKENS_RE.sub("", sentence).strip()

## Try the sentence cleaning function on a decoded sample
clean_sentence('</s><s>Waiter W is working when waiter W serves a drink.</s><pad><pad><pad><pad><pad><pad><pad><pad>')

In [None]:
# Batch translation of NL sentences into cleaned CNL sentences
def predict_cnl(model, tokenizer, sentences):
  """Translate a list of NL sentences into CNL with `model`.

  Every sentence gets the task prefix, the whole list is tokenized as one
  padded batch, and generation runs without sampling and with a single beam.
  Each output is decoded and passed through `clean_sentence`.

  Args:
    model: object exposing `generate(...)`.
    tokenizer: callable tokenizer that also exposes `decode(...)`.
    sentences: iterable of input sentences.

  Returns:
    List of cleaned CNL strings, one per generated sequence.
  """
  task_prefix = "translate English to CNL: "
  encoded_inputs = tokenizer(
      [task_prefix + text for text in sentences],
      return_tensors="tf",
      padding=True,
  )
  generated = model.generate(
    input_ids=encoded_inputs["input_ids"],
    attention_mask=encoded_inputs["attention_mask"],
    do_sample=False,  # no sampling, so outputs do not vary between runs
    max_length=300,
    min_length=5,
    num_beams=1,
  )
  return [clean_sentence(tokenizer.decode(token_ids)) for token_ids in generated]

## Quick check of predict_cnl on a single sentence
toTranslate = ["The pub 1 is close to the pub number 2 and the pub number X, where X is equal to 3, 4."]
translateds = predict_cnl(model, tokenizer, toTranslate)
print(translateds)

***INFERENCE***

In [None]:
def predict_predicates(sentence):
  """Translate a single NL sentence with the module-level model and tokenizer.

  The sentence gets the task prefix and is generated without sampling and
  with a single beam (min_length 20, max_length 300).

  Returns:
    The decoded first output sequence, exactly as `tokenizer.decode`
    returns it (it is not passed through `clean_sentence`).
  """
  task_prefix = "translate English to CNL: "
  prompt = task_prefix + sentence
  encoded = tokenizer([prompt], return_tensors="tf", padding=True)

  generated = model.generate(
    input_ids=encoded["input_ids"],
    attention_mask=encoded["attention_mask"],
    do_sample=False,  # no sampling, so outputs do not vary between runs
    max_length=300,
    min_length=20,
    num_beams=1,
  )

  first_sequence = generated[0]
  return tokenizer.decode(first_sequence)

print(predict_predicates('Serving as many drinks as possible is preferred with low priority.'))

**TRAINING CROSS VALIDATION K=5**

In [None]:
%%time
## K-fold Cross-Validation##
def get_model_name(k):
    """Return the checkpoint file name used for fold `k`."""
    return 'model_'+str(k)+'.h5py'

LOSS = []
VALIDAITON_LOSS = []

# Test indices of every fold, one entry per fold
all_test_idxs = []

save_dir = 'content/Kfold5_t5small/' #Give your saving path here
# The per-fold files below are written straight into save_dir, so it must exist.
os.makedirs(save_dir, exist_ok=True)

# prepare cross validation
kfold = KFold(n_splits=5, shuffle=True, random_state=1)

# Create a DataFrame to store the scores
score_df = pd.DataFrame(columns=['split', 'bleu_1gram', 'bleu_2gram', 'bleu_3gram', 'bleu_4gram', 'meteor', 'train_time', 'score_time'])

start_time = time.time()
# enumerate splits
for index, (train_idxs, test_idxs) in enumerate(kfold.split(train_dict)):
  # Save this fold's test indices so the split can be rebuilt later
  print("///Save the test_idxs in a file///")
  np.savetxt(save_dir + 'test_idxs_' + str(index) + '.txt', test_idxs, fmt='%d')
  # Record the indices once per fold (they used to be appended twice)
  all_test_idxs.append(test_idxs)
  # Folds may differ in length, so store them as an object array; a ragged
  # list cannot be converted implicitly on recent NumPy versions.
  # Read the file back with np.load(..., allow_pickle=True).
  np.save(save_dir + 'all_test_idxs.npy', np.array(all_test_idxs, dtype=object), allow_pickle=True)

  # ///GETTING SPLITS///
  print("///GETTING SPLITS///")
  (training_dict, test_dict) = split_dataset(train_dict, train_idxs, test_idxs)

  # ///TOKENIZING///
  print("///TOKENIZING///")
  training_dict = preprocessing_sentences(training_dict)
  test_dict = preprocessing_sentences(test_dict)

  # ///COLLATOR///
  # NOTE(review): prepare_tf_dataset relies on the global `model`, which at
  # this point is still the model of the previous cell/fold — confirm this
  # is intended.
  print("///COLLATOR///")
  tf_train_set = prepare_tf_dataset(training_dict)
  tf_test_set = prepare_tf_dataset(test_dict)

  # ///MODEL///
  print("///create_and_compile_model///")
  tf.keras.backend.clear_session()
  model = create_and_compile_model()

  # ///TRAIN///
  print('///TRAIN/// SPLIT: %s' % (index))
  # NOTE(review): with epochs=1 below, a patience of 20 can never trigger
  # early stopping — raise `epochs` for real training runs.
  es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=20)
  csv_logger = tf.keras.callbacks.CSVLogger('trainingt5small.log', separator=",", append=True)

  # CREATE CALLBACKS
  checkpoint = tf.keras.callbacks.ModelCheckpoint(save_dir+get_model_name(index), save_best_only=True, monitor='val_loss', verbose=1, mode='min', save_weights_only=False)
  split_start_time = time.time()
  history = model.fit(tf_train_set, validation_data=tf_test_set, epochs=1, callbacks=[es,checkpoint, csv_logger])
  split_train_time = time.time() - split_start_time

  # Reload the best checkpoint of this fold into a fresh model for scoring
  best_model = create_and_compile_model()
  best_model.load_weights(save_dir+get_model_name(index))
  gc.collect()

  start_score_time = time.time()
  bleu = bleu_score(best_model, tokenizer, test_dict)
  meteor = compute_meteor(best_model, test_dict)
  score_time = time.time() - start_score_time
  # compute_meteor returns the metric's result dict; store the scalar score
  # instead of the whole dict so the 'meteor' column stays numeric.
  score_df.loc[len(score_df)] = [index, bleu['1-grams'], bleu['1-2-grams'], bleu['1-3-grams'], bleu['1-4-grams'], meteor['meteor'], split_train_time, score_time]

  print(history.history)

# Save the scores DataFrame to a CSV file
score_df.to_csv(save_dir + 'scores.csv', index=False)

print("k-fold Cross-Validation Training execution time: ", time.time() - start_time)

***PLOTTING THE LOG FILES***

In [None]:
# Load the training log with pandas. The file name must match the one given to
# the CSVLogger callback ('trainingt5small.log') exactly: the previous
# spelling 'trainingT5Small.log' is a different file on case-sensitive
# file systems.
log_data = pd.read_csv('trainingt5small.log')

# NOTE(review): `plt` is not imported in any visible cell; this cell needs
# `import matplotlib.pyplot as plt` to run — confirm and add it to the imports.
# NOTE(review): the logger appends all folds to one file, so the x axis is the
# row index across folds rather than the epoch of a single run — confirm.
# Plotting training loss vs validation loss
plt.figure(figsize=(10, 6))
plt.plot(log_data['loss'], label='Training Loss')
plt.plot(log_data['val_loss'], label='Validation Loss')
plt.title('Training Loss vs Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

***TESTING AFTER TRAINING***

In [None]:
%%time
def predict_cnl(model, tokenizer, sentences):
  """Translate NL sentences into CNL (redefinition used for post-training tests).

  Adds the task prefix to every sentence, tokenizes the list as one padded
  batch, generates without sampling and with a single beam, then decodes
  and cleans each output with `clean_sentence`.

  Returns:
    List of cleaned CNL strings, one per generated sequence.
  """
  task_prefix = "translate English to CNL: "
  prefixed_sentences = []
  for sentence in sentences:
    prefixed_sentences.append(task_prefix + sentence)

  batch = tokenizer(prefixed_sentences, return_tensors="tf", padding=True)
  # no sampling and a single beam, so outputs do not vary between runs
  generation_options = dict(do_sample=False, max_length=300, min_length=5, num_beams=1)
  output_sequences = model.generate(
    input_ids=batch["input_ids"],
    attention_mask=batch["attention_mask"],
    **generation_options,
  )

  cnls = []
  for encoded in output_sequences:
    cnls.append(clean_sentence(tokenizer.decode(encoded)))
  return cnls

# Alternative input: the first 1120 training sentences, train_dict['sentences'][:1120]
toTranslate = ["If the value of C1 is lower than C2, C1 will take on the same color as C2, Likewise, whenever C2 is given a color, C1 will be assigned the same color."]
translateds = predict_cnl(model, tokenizer, toTranslate)
print(translateds)



***CALCULATING SYNTAX CHECK BY SPLIT***

In [None]:
# Calculating the syntax check of one split
save_dir = 'content/Kfold5_t5small/'
# Load the test indices from the text file
# NOTE(review): the training cell writes 'test_idxs_<fold>.txt'; point this
# at the file of the fold you want to check.
test_idxs = np.loadtxt(save_dir + 'test_idxs.txt', dtype=int) #Update text ids .txt files

# Read each column once; indexing train_dict['...'] inside the comprehension
# would rebuild the whole column for every single index.
all_sentences = train_dict['sentences']
all_targets = train_dict['targets']

# Get the corresponding test data using the test_idxs
test_dict = Dataset.from_dict({"sentences": [all_sentences[i] for i in test_idxs],
                               "targets": [all_targets[i] for i in test_idxs]})

# Predict CNLs using the loaded model
predicted_cnls = predict_cnl(model, tokenizer, test_dict['sentences'])

# Apply the syntax check function on the predicted CNLs
summary = evaluate_syntax(predicted_cnls)

# Convert the summary to a DataFrame: one row per problematic sentence, with
# the counts repeated on every row. With no problematic sentence the frame
# would have zero rows and lose the counts, so keep one row with an empty
# 'problematics' cell in that case.
df = pd.DataFrame({**summary, 'problematics': summary['problematics'] or [None]})

# Save the DataFrame to a CSV file
df.to_csv('T5_syntax_check_results.csv', index=False)


***EVALUATING MODEL ON TEST DATASET***

In [None]:
# Read the held-out test set from the Excel file 'test.xlsx'
test_data = pd.read_excel('test.xlsx')

In [None]:
# Input sentences come from the 'Test Natural Language' column
sentences_to_predict = test_data['Test Natural Language'].tolist()

# Translate them with the model currently bound to `model`
predicted_cnl = predict_cnl(model, tokenizer, sentences_to_predict)

# Reference CNL sentences (the 'CNL' column) to compare the predictions against
actual_cnl = test_data['CNL'].tolist()

In [None]:
# Put reference and predicted CNL sentences side by side, one row per test sentence
results_df = pd.DataFrame({'Actual CNL': actual_cnl, 'Predicted CNL': predicted_cnl})

# Save the comparison table to a CSV file (without the index column)
results_df.to_csv('cnl_predictions_t5Small5fold.csv', index=False)

In [None]:
def sentence_to_words(sentence):
    """Split `sentence` on single space characters and return the pieces.

    Consecutive spaces produce empty strings in the result, and an empty
    sentence yields `['']`.
    """
    words = sentence.split(' ')
    return words

def sentences_to_word_lists(sentences):
    """Apply `sentence_to_words` to every sentence and return the list of word lists."""
    return list(map(sentence_to_words, sentences))

# Metrics used to calculate the word-level accuracy, precision, recall and F1 scores
def calculate_metrics_word_level(actual, predicted):
    """Compare reference and predicted sentences as sets of words.

    Each sentence is split into words and encoded as a binary indicator
    vector over the vocabulary of all words seen in either list. The
    indicator matrices are then scored with scikit-learn.

    Args:
        actual: list of reference sentences.
        predicted: list of predicted sentences.

    Returns:
        Tuple `(accuracy, precision, recall, f1)`; precision, recall and F1
        are micro-averaged with `zero_division=0`.
    """
    actual_words = sentences_to_word_lists(actual)
    predicted_words = sentences_to_word_lists(predicted)

    # Vocabulary = every word that occurs in a reference or in a prediction.
    vocabulary = {
        word
        for words in (*actual_words, *predicted_words)
        for word in words
    }

    binarizer = MultiLabelBinarizer()
    binarizer.fit([list(vocabulary)])
    y_true = binarizer.transform(actual_words)
    y_pred = binarizer.transform(predicted_words)

    micro = dict(average='micro', zero_division=0)
    return (
        accuracy_score(y_true, y_pred),
        precision_score(y_true, y_pred, **micro),
        recall_score(y_true, y_pred, **micro),
        f1_score(y_true, y_pred, **micro),
    )



In [None]:
# Sanity check: both lists should have the same length before they are compared.
print("Number of actual CNL sentences: ", len(actual_cnl))
print("Number of predicted CNL sentences: ", len(predicted_cnl))

In [None]:
# Compute the word-level metrics for the test set predictions and print them.
accuracy, precision, recall, f1 = calculate_metrics_word_level(actual_cnl, predicted_cnl)
print('Accuracy word_level:', accuracy)
print('Precision word_level:', precision)
print('Recall word_level:', recall)
print('F1 score word_level:', f1)