In [None]:
# Evaluation configuration shared by the cells below.
# Mini-batch size passed to the evaluation DataLoaders.
batch_size = 32 
# Passed through as DataLoader(num_workers=...).
num_workers = 0 
# Device string that both models and the encoded batches are moved to.
device = 'cuda:0' 
# T5 question inputs are padded/truncated to exactly this many tokens.
max_input_length = 64 
# Truncation limit for the DistilBERT context inputs.
bert_max_input_length = 512 
# DistilBERT variant name; also used as a sub-directory name under `model_name`.
bert_model = 'distilbert-base-uncased' 
# Despite the name, this is the directory prefix holding the saved tokenizers and models.
model_name = "./_baseline_with_BERT_t5-small/" 

In [None]:
from typing import List, Tuple 
import json 

from tqdm import tqdm 

import torch 
from torch.utils.data import DataLoader 

from transformers  import AutoTokenizer, AutoModelForSeq2SeqLM 

In [None]:
from sklearn.metrics import f1_score 
from sklearn.metrics import multilabel_confusion_matrix 
from sklearn.metrics import classification_report 
from sklearn.preprocessing import MultiLabelBinarizer 
import evaluate 
from bert_score import score as b_score 

In [None]:
class FaithDial_Dataset(torch.utils.data.Dataset):
    """Map-style dataset over parallel lists of questions, contexts and answers.

    Items are returned as raw ``(question, context, answer)`` triples; nothing
    is tokenized here. The tokenizer is only stored on the instance.
    """

    def __init__(self, questions, contexts, answers, tokenizer):
        """Store the parallel lists and verify that they have equal length.

        Raises:
            Exception: if ``contexts`` or ``answers`` differs in length from
                ``questions``.
        """
        self.tokenizer = tokenizer
        self.questions = questions
        self.contexts = contexts
        self.answers = answers

        # Length checks adapted from
        # https://github.com/nunziati/bert-vs-t5-for-question-answering/blob/main/MyDataset.py
        expected = len(self.questions)
        length_checks = (
            (self.contexts,
             "something wrong while building the dataset: questions and contexts in different dimensions"),
            (self.answers,
             "something wrong while building the dataset: questions and answers result in different dimensions"),
        )
        for other, message in length_checks:
            if len(other) != expected:
                raise Exception(message)

        self.item_count: int = expected

    def __getitem__(self, index: int):
        """Return the ``(question, context, answer)`` triple at ``index``."""
        columns = (self.questions, self.contexts, self.answers)
        return tuple(column[index] for column in columns)

    def __len__(self):
        """Number of examples, as recorded at construction time."""
        return self.item_count

    def pack_minibatch(self, data: List[Tuple[str, str, str]]):
        """Collate function: transpose a list of triples into three columns.

        Returns a one-shot ``zip`` iterator yielding, in order, the tuple of
        questions, the tuple of contexts and the tuple of answers.
        """
        transposed = zip(*data)
        return transposed

In [None]:
def build_data(data_path, hal_data_path):
    """Build the multi-task evaluation examples from two parallel JSON files.

    Both files are read as a list of conversations, each with an
    ``'utterances'`` list. Turns of the two files are paired positionally.
    Faithful turns provide ``'knowledge'``, ``'history'`` and ``'response'``;
    hallucinated turns additionally provide ``'BEGIN'`` and ``'VRM'`` tag lists.

    For every paired turn the following examples are emitted, in this order:

    * task 0 - faithful response, answer ``"No"``
    * task 0 - hallucinated response, answer ``"Yes"``
    * task 1 - hallucinated response, answer = comma-joined BEGIN tags
    * task 2 - only when the first VRM tag is non-empty; answer = comma-joined
      VRM tags with ``'Ack.'`` spelled out as ``'Acknowledgment'``
    * task 3 - answer = the faithful response

    Args:
        data_path: path to the JSON file with the faithful conversations.
        hal_data_path: path to the JSON file with the hallucinated conversations.

    Returns:
        ``(questions, contexts, answers, questions_index)``: four parallel
        lists, where ``questions_index[k]`` is the task id (0-3) of example ``k``.
    """
    question_list = ['Is the response hallucinated?', 'What are the response attribution classes?', 'What are the speech acts?', 'What is the faithful response to this?']

    questions = []
    questions_index = []
    contexts = []
    answers = []

    def add_example(task_id, question, context, answer):
        # Keep the four output lists in lock-step.
        questions.append(question)
        contexts.append(context)
        answers.append(answer)
        questions_index.append(task_id)

    # JSON is UTF-8 by specification; do not depend on the platform default encoding.
    with open(data_path, 'r', encoding='utf-8') as f1, open(hal_data_path, 'r', encoding='utf-8') as f2:
        data = json.load(f1)
        hal_data = json.load(f2)

    for conversation, hal_conversation in zip(data, hal_data):
        for turn, hal_turn in zip(conversation['utterances'], hal_conversation['utterances']):
            # Knowledge and responses are usually short; the history carries most of the length.
            context = f"{turn['knowledge']} {', '.join(turn['history'])}"

            # The hallucinated file may contain JSON nulls; render them as the literal 'null'.
            hal_history = ['null' if h is None else h for h in hal_turn['history']]
            hal_response = 'null' if hal_turn['response'] is None else hal_turn['response']
            hal_context = f"{hal_turn['knowledge']} {', '.join(hal_history)}"

            # Task1: question, history, knowledge, response -> "Yes" or "No"
            add_example(0, f"question: {question_list[0]} response: {turn['response']}", context, "No")
            add_example(0, f"question: {question_list[0]} response: {hal_response}", hal_context, "Yes")

            # Task2-1: question, history, knowledge, response -> BEGIN tag(s)
            add_example(1, f"question: {question_list[1]} response: {hal_response}", hal_context,
                        f"{', '.join(hal_turn['BEGIN'])}")

            # Task2-2: question, history, knowledge -> VRM tag(s)
            # An empty tag list is treated like an empty first tag (no example)
            # instead of raising IndexError.
            vrm_tags = hal_turn['VRM']
            if vrm_tags and vrm_tags[0] != '':
                add_example(2, f"question: {question_list[2]}", hal_context,
                            f"{', '.join(['Acknowledgment' if v=='Ack.' else v for v in vrm_tags])}")

            # Task3: question, history, knowledge -> response
            add_example(3, f"question: {question_list[3]}", context, f"{turn['response']}")

    return questions, contexts, answers, questions_index

In [None]:
# Build the parallel question/context/answer lists (plus per-example task ids)
# from the faithful and hallucinated test files.
test_questions, test_contexts, test_answers, test_questions_index = build_data('data/test.json', 'data/hal_test.json') 

In [None]:
# https://huggingface.co/distilbert-base-uncased 
from transformers import DistilBertModel 
# https://stackoverflow.com/questions/64156202/add-dense-layer-on-top-of-huggingface-bert-model 
class CustomDistilBertModel(torch.nn.Module):
    """DistilBERT encoder followed by a linear projection to the T5 embedding size.

    The output always has exactly ``t5_seq_len`` positions: shorter sequences
    are extended by repeating the last position's hidden state, longer ones
    are cut off.
    """

    def __init__(self, bert_model_name, t5_seq_len, t5_emb_dim):
        """Load the pretrained DistilBERT and create the projection layer.

        Args:
            bert_model_name: name or path given to ``DistilBertModel.from_pretrained``.
            t5_seq_len: number of sequence positions in the output.
            t5_emb_dim: size of the last dimension of the output.
        """
        super().__init__()
        self.t5_seq_len = t5_seq_len
        self.t5_emb_dim = t5_emb_dim
        self.db_model = DistilBertModel.from_pretrained(bert_model_name)
        # Projects each 768-wide hidden vector to t5_emb_dim.
        self.linear1 = torch.nn.Linear(768, t5_emb_dim)

    def forward(self, input_ids, attention_mask):
        """Return projected hidden states of shape (B, t5_seq_len, t5_emb_dim)."""
        # (B, bert_seq_len, 768)
        hidden = self.db_model(input_ids, attention_mask=attention_mask).last_hidden_state

        missing = self.t5_seq_len - hidden.size(1)
        if missing > 0:
            # Too short: append copies of the final position until t5_seq_len is reached.
            filler = hidden[:, -1:, :].repeat(1, missing, 1)
            hidden = torch.cat((hidden, filler), 1)

        # Too long: keep only the first t5_seq_len positions, then project.
        clipped = hidden[:, :self.t5_seq_len, :]
        return self.linear1(clipped)

In [None]:
# Fine-tuned T5 tokenizer and model ("best-f1" checkpoint) stored under `model_name`.
t5_tokenizer = AutoTokenizer.from_pretrained(model_name + 't5-small/tokenizer/best-f1')
t5_model = AutoModelForSeq2SeqLM.from_pretrained(model_name + 't5-small/model/best-f1')

db_tokenizer = AutoTokenizer.from_pretrained(model_name + bert_model + '/tokenizer/best-f1')

# The file holds a whole pickled module (it is used below via .to/.eval/__call__),
# so the class it was saved from must already be defined in this session.
# SECURITY: torch.load unpickles arbitrary objects - only load checkpoints you trust.
# map_location lets the checkpoint load even if it was saved from a different device.
# NOTE(review): recent PyTorch releases default torch.load to weights_only=True,
# which rejects whole-module pickles; pass weights_only=False there if needed.
db_model = torch.load(f'{model_name}{bert_model}/model/best-f1/pytorch_model.pt',
                      map_location=device)

test_set = FaithDial_Dataset(test_questions, test_contexts, test_answers, t5_tokenizer)

# Pass the bound method directly: unlike a lambda it can be pickled, which
# matters once num_workers > 0 on platforms that spawn worker processes.
my_test_dataloader = DataLoader(test_set, batch_size=batch_size,
                                num_workers=num_workers, collate_fn=test_set.pack_minibatch)

t5_model.to(device)
db_model.to(device)

In [None]:
# Generate predictions for every test example. Per batch: encode the contexts
# with the DistilBERT-based model, encode the questions with the T5 encoder,
# average the two hidden-state tensors and let T5 decode from the result.
t5_model.eval()
db_model.eval()

model_predictions_encoded = []
t5_encoder = t5_model.get_encoder()  # same module every batch, so fetch it once

with torch.no_grad():
    for questions, contexts, _ in tqdm(my_test_dataloader):
        # Context branch.
        db_batch = db_tokenizer(contexts,
                                padding="longest",
                                max_length=bert_max_input_length,
                                truncation=True,
                                return_tensors="pt").to(device)
        db_outputs = db_model(**db_batch)

        # Question branch: always padded to max_input_length tokens.
        t5_batch = t5_tokenizer(questions,
                                padding='max_length',
                                max_length=max_input_length,
                                truncation=True,
                                return_tensors="pt")
        input_ids = t5_batch.input_ids.to(device)
        attention_mask = t5_batch.attention_mask.to(device)

        encoder_outputs = t5_encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Fuse the branches by element-wise mean.
        # NOTE(review): requires db_outputs to match the T5 encoder output shape - confirm.
        fused = (encoder_outputs['last_hidden_state'] + db_outputs) / 2
        encoder_outputs['last_hidden_state'] = fused

        generated_ids = t5_model.generate(input_ids=input_ids,
                                          attention_mask=attention_mask,
                                          encoder_outputs=encoder_outputs)
        model_predictions_encoded.extend(generated_ids.tolist())

In [None]:
# Turn the generated token-id lists back into text, dropping special tokens.
model_predictions = t5_tokenizer.batch_decode(model_predictions_encoded, skip_special_tokens=True) 

In [None]:
# Label inventories for the two tagging tasks. A label's position in its list
# is its integer class id (see BEGIN_dict / VRM_dict).
BEGIN_LIST = ['Uncooperative', 'Hallucination', 'Entailment', 'Generic'] 
VRM_LIST = ['Disclosure', 'Acknowledgment', 'Edification', 'Advisement', 'Question']  

In [None]:
# Label name -> integer class id, numbered by position in the label lists.
BEGIN_dict = {label: class_id for class_id, label in enumerate(BEGIN_LIST)}
VRM_dict = {label: class_id for class_id, label in enumerate(VRM_LIST)}

In [None]:
# https://stackoverflow.com/questions/10018679/python-find-closest-string-from-a-list-to-another-string 
# https://docs.python.org/3/library/difflib.html#difflib.get_close_matches 
import difflib 

In [None]:
# Split predictions and references by task and convert them into the
# representation each metric needs:
#   Task 1   -> 0 ('No') / 1 ('Yes')
#   Task 2   -> list of integer label ids per example (BEGIN and VRM)
#   Task 3   -> raw strings
task1_pred = []
task1_true = []

task2_BEGIN_pred = []
task2_BEGIN_true = []

task2_VRM_pred = []
task2_VRM_true = []

task3_pred = []
task3_true = []


def _closest_label_ids(text, labels, label_to_id):
    """Map a comma-separated label string to integer ids.

    Labels that are not spelled exactly are snapped to the most similar known
    label (cutoff 0.0, so there is always a match) and then converted to an id
    like every other label.
    """
    ids = []
    for name in text.split(', '):
        if name not in labels:
            # https://docs.python.org/3/library/difflib.html#difflib.get_close_matches
            name = difflib.get_close_matches(name, labels, n=1, cutoff=0.0)[0]
        ids.append(label_to_id[name])
    return ids


for i, (pred, true) in enumerate(zip(model_predictions, test_answers)):
    task_index = test_questions_index[i]
    if task_index == 0:     # Task1
        task1_true.append(0 if true == 'No' else 1)
        if pred == 'No':
            task1_pred.append(0)
        elif pred == 'Yes':
            task1_pred.append(1)
        else:
            # Malformed output: snap to the closer of 'Yes' / 'No'.
            # get_close_matches returns a *list* (possibly empty), so compare
            # against a list; when nothing is close enough fall back to 1 ('Yes').
            # https://stackoverflow.com/questions/10018679/python-find-closest-string-from-a-list-to-another-string
            closest = difflib.get_close_matches(pred, ['Yes', 'No'], n=1, cutoff=0.3)
            task1_pred.append(0 if closest == ['No'] else 1)
    elif task_index == 1:   # Task2 BEGIN
        task2_BEGIN_true.append([BEGIN_dict[t] for t in true.split(', ')])
        task2_BEGIN_pred.append(_closest_label_ids(pred, BEGIN_LIST, BEGIN_dict))
    elif task_index == 2:   # Task2 VRM
        task2_VRM_true.append([VRM_dict[t] for t in true.split(', ')])
        task2_VRM_pred.append(_closest_label_ids(pred, VRM_LIST, VRM_dict))
    else:                   # Task3 Response
        task3_true.append(true)
        task3_pred.append(pred)

In [None]:
# Attach each generated Task 3 response to its turn and save the augmented
# test set. The input is read and validated first, so the output file is only
# created/truncated once there is something valid to write.
with open('data/test.json', 'r') as r_f:
    data = json.load(r_f)

turns = [turn for conversation in data for turn in conversation['utterances']]
assert len(turns) == len(task3_pred), (
    f"expected one prediction per turn, got {len(task3_pred)} predictions for {len(turns)} turns")

for turn, predicted_response in zip(turns, task3_pred):
    turn['predicted_response'] = predicted_response

with open('data/test_baseline_with_BERT_t5-small_predicted.json', 'w') as w_f:
    json.dump(data, w_f, indent=2)
    

In [None]:
# Task 1 per-class precision/recall/F1 (class 0 = 'No', class 1 = 'Yes').
print(classification_report(task1_true, task1_pred)) 

In [None]:
# Task 1 macro-averaged F1.
print(f1_score(task1_true, task1_pred, average='macro'))

In [None]:
# Convert the per-example lists of BEGIN label ids into indicator matrices.
# `classes` fixes the columns to the ids 0..len(BEGIN_LIST)-1 for both matrices.
_task2_BEGIN_true = MultiLabelBinarizer(classes=range(len(BEGIN_LIST))).fit_transform(task2_BEGIN_true) 
_task2_BEGIN_pred = MultiLabelBinarizer(classes=range(len(BEGIN_LIST))).fit_transform(task2_BEGIN_pred) 

In [None]:
# Task 2 (BEGIN) per-label report; rows are label ids, i.e. positions in BEGIN_LIST.
print(classification_report(_task2_BEGIN_true, _task2_BEGIN_pred)) 

In [None]:
# Task 2 (BEGIN): macro F1, then the label names followed by the
# multilabel confusion matrices for reference.
print(f1_score(_task2_BEGIN_true, _task2_BEGIN_pred, average='macro')) 
print(BEGIN_LIST) 
print(multilabel_confusion_matrix(_task2_BEGIN_true, _task2_BEGIN_pred))

In [None]:
# Convert the per-example lists of VRM label ids into indicator matrices.
# `classes` fixes the columns to the ids 0..len(VRM_LIST)-1 for both matrices.
_task2_VRM_true = MultiLabelBinarizer(classes=range(len(VRM_LIST))).fit_transform(task2_VRM_true)
_task2_VRM_pred = MultiLabelBinarizer(classes=range(len(VRM_LIST))).fit_transform(task2_VRM_pred)

In [None]:
# Task 2 (VRM) per-label report; rows are label ids, i.e. positions in VRM_LIST.
print(classification_report(_task2_VRM_true, _task2_VRM_pred)) 

In [None]:
# Task 2 (VRM): macro F1, then the label names followed by the
# multilabel confusion matrices for reference.
print(f1_score(_task2_VRM_true, _task2_VRM_pred, average='macro'))
print(VRM_LIST) 
print(multilabel_confusion_matrix(_task2_VRM_true, _task2_VRM_pred)) 

In [None]:
# Task 3: BLEU of the generated responses against the reference responses,
# computed with the `evaluate` library.
bleu = evaluate.load("bleu") 
results = bleu.compute(predictions=task3_pred, references=task3_true)

In [None]:
# Show `results` as assigned by the BLEU cell directly above.
print(results)

In [None]:
# Task 3: ROUGE of the generated responses against the reference responses.
# Note that this reuses (overwrites) the `results` name from the BLEU cell.
rouge = evaluate.load('rouge') # pip install rouge-score 
results = rouge.compute(predictions=task3_pred, references=task3_true)

In [None]:
# Show `results` as assigned by the ROUGE cell directly above.
print(results) 

In [None]:
# Task 3: BERTScore (`bert_score.score`, imported as b_score) of the generated
# responses against the references; unpacked as precision, recall and F1.
P, R, F1 = b_score(task3_pred, task3_true, lang='en', verbose=True) 

In [None]:
# Report the mean of the BERTScore F1 values, rounded to three decimals.
print(f"System level F1 score: {F1.mean():.3f}")

In [None]:
def build_task1_data_with_pred(data_path, task3_pred):
    """Wrap generated responses as "Is the response hallucinated?" inputs.

    Walks the conversations in ``data_path`` turn by turn and pairs the k-th
    turn with ``task3_pred[k]``.

    Args:
        data_path: JSON file holding a list of conversations, each with an
            ``'utterances'`` list whose turns have ``'knowledge'`` and ``'history'``.
        task3_pred: generated responses, one per turn, in file order.

    Returns:
        ``(questions, contexts)``: two parallel lists of strings.

    Raises:
        IndexError: if there are fewer predictions than turns.
        AssertionError: if there are more predictions than turns.
    """
    with open(data_path, 'r') as f1:
        data = json.load(f1)

    questions, contexts = [], []
    consumed = 0
    for conversation in data:
        for turn in conversation['utterances']:
            predicted = task3_pred[consumed]
            questions.append(f"question: Is the response hallucinated? response: {predicted}")
            history_text = ', '.join(turn['history'])
            contexts.append(f"{turn['knowledge']} {history_text}")
            consumed += 1

    # Every prediction must have been used by exactly one turn.
    assert consumed == len(task3_pred)

    return questions, contexts

In [None]:
# Re-wrap the generated Task 3 responses as Task 1 inputs.
# The result is a (questions, contexts) tuple.
task3_pred_task1_data = build_task1_data_with_pred('data/test.json', task3_pred)

In [None]:
class FaithDial_test_Dataset(torch.utils.data.Dataset):
    """Answer-less dataset yielding raw ``(question, context)`` string pairs.

    The dataset length is taken from ``questions``; ``contexts`` is not
    length-checked. The tokenizer is only stored, never applied here.
    """

    def __init__(self, questions, contexts, tokenizer):
        """Keep references to the two parallel lists and the tokenizer."""
        self.tokenizer = tokenizer
        self.questions, self.contexts = questions, contexts
        self.item_count: int = len(questions)

    def __getitem__(self, index: int):
        """Return the ``(question, context)`` pair at ``index``."""
        question = self.questions[index]
        context = self.contexts[index]
        return question, context

    def __len__(self):
        """Number of questions recorded at construction time."""
        return self.item_count

In [None]:
# Dataset/loader over the generated responses re-posed as Task 1 questions.
# No collate_fn is given here, so DataLoader's default collation is used.
test_set_t3_t1 = FaithDial_test_Dataset(task3_pred_task1_data[0], task3_pred_task1_data[1], t5_tokenizer) 

my_test_t3_t1_dataloader = DataLoader(test_set_t3_t1, batch_size=batch_size,
                                        num_workers=num_workers) 

In [None]:
# Ask the model whether its own generated responses are hallucinated.
# Same pipeline as the main evaluation: DistilBERT-based context encoding and
# T5 question encoding are averaged, then T5 decodes from the fused states.
t5_model.eval()
db_model.eval()

model_predictions_encoded_t3_t1 = []
t5_encoder = t5_model.get_encoder()  # loop-invariant, so looked up once

with torch.no_grad():
    for questions, contexts in tqdm(my_test_t3_t1_dataloader):
        # Contexts -> DistilBERT-based model.
        context_encoding = db_tokenizer(contexts,
                                        padding="longest",
                                        max_length=bert_max_input_length,
                                        truncation=True,
                                        return_tensors="pt").to(device)
        db_outputs = db_model(**context_encoding)

        # Questions -> T5 encoder, padded to a fixed max_input_length.
        question_encoding = t5_tokenizer(questions,
                                         padding='max_length',
                                         max_length=max_input_length,
                                         truncation=True,
                                         return_tensors="pt")
        question_ids = question_encoding.input_ids.to(device)
        question_mask = question_encoding.attention_mask.to(device)

        t5_encoder_outputs = t5_encoder(input_ids=question_ids, attention_mask=question_mask)
        # Element-wise mean of both encodings.
        # NOTE(review): assumes db_outputs has the T5 encoder output shape - confirm.
        t5_states = t5_encoder_outputs['last_hidden_state']
        t5_encoder_outputs['last_hidden_state'] = (t5_states + db_outputs) / 2

        batch_output_ids = t5_model.generate(input_ids=question_ids,
                                             attention_mask=question_mask,
                                             encoder_outputs=t5_encoder_outputs)
        model_predictions_encoded_t3_t1.extend(batch_output_ids.tolist())

In [None]:
# Decode the generated token ids into text, dropping special tokens.
model_predictions_t3_t1 = t5_tokenizer.batch_decode(model_predictions_encoded_t3_t1, skip_special_tokens=True) 

In [None]:
# Count how many generated responses the model labels 'Yes' (hallucinated).
# Unlike the main Task 1 evaluation this is strict: any output other than an
# exact 'Yes' or 'No' aborts the count.
num_of_yes = 0
for pred in model_predictions_t3_t1:
    if pred not in ('Yes', 'No'):
        raise RuntimeError('Wrong prediction')
    num_of_yes += 1 if pred == 'Yes' else 0

In [None]:
# Fraction of generated responses answered 'Yes' to "Is the response hallucinated?".
# Raises ZeroDivisionError if there are no predictions.
print(num_of_yes / len(model_predictions_t3_t1))