# Code for generating the challenge test answers
To evaluate only the public test set, set `PUBLIC_TEST = True` in the first code section; this leaves `"./Data/test/questions_new.txt"` out of `TEST_FILES` and enables the accuracy report.

In [1]:
import pandas as pd
import ujson as json
import re
import os
import traceback
from Source.query import Query
from Source.maneger_dataset import get_embeddings_by_labels
from Source.generate_question import generate_questions_training
from Source.enhancement_query import EnhancementQuery
from Source.get_RAG_context import Get_RAG_Context
from tqdm import tqdm
import sqlite3
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer


# When True, only the public test file is answered (see TEST_FILES below).
PUBLIC_TEST = False

# Question files to load; the public run keeps only the TeleQnA testing file.
if PUBLIC_TEST:
    TEST_FILES = ["./Data/test/TeleQnA_testing1.txt"]
else:
    TEST_FILES = [
        "./Data/test/questions_new.txt",
        "./Data/test/TeleQnA_testing1.txt",
    ]

# Retrieval settings handed to Get_RAG_Context together with the database.
NUM_CLUSTERS = 18
TOP_K_CLUSTERS = 8
TOP_K_CHUNCKS = 5
DATABASE_PATH = "cluster_data_BisectingKMeans_18_250_chunksize.db"

# Input data files.
TEST_FULL_DATASET = "./Data/TeleQnA_Dataset_Full.txt"
TEST_DATA_WITH_RAG_PATH = "./Data/intermediates/TeleQnA_Test_With_RAG_Context.json"
PATH_TERMS_FILE = "./Data/TermsAndDefinitions/terms_definitions.json"
PATH_ABBREVIATIONS_FILE = "./Data/TermsAndDefinitions/abbreviations_definitions.json"

# Model identifier passed to the tokenizer/model `from_pretrained` calls.
MODEL_NAME = "claudiomello/Phi-2-TeleQnA-Finetune-Final"








# Getting the data and generating context (only run if not using our provided data with generated context)
Run the next two sections only if you want to generate the RAG context yourself. Otherwise, skip them and run the "Getting the provided RAG data" section instead.

In [None]:


# Load every test file into one mapping keyed by the question name.
test_data = {}

# JSON text is UTF-8; state it explicitly instead of relying on the locale.
for file in TEST_FILES:
    with open(file, encoding="utf-8") as f:
        test_data.update(json.load(f))

# The full labelled dataset is only needed to look up public answers.
if PUBLIC_TEST:
    full_data = {}
    with open(TEST_FULL_DATASET, encoding="utf-8") as f:
        full_data.update(json.load(f))


# Create a class for enhancement
# (its define_TA_question call below returns the terms and abbreviations
# attached to each question).
enhacenment_query = EnhancementQuery(
    file_name_terms=PATH_TERMS_FILE,
    file_name_abbreviations=PATH_ABBREVIATIONS_FILE,
)

# One record per question: text, id, terms, abbreviations, answer, options.
test_data_json = []

# Iterate over the test data for adding the terms and abbreviations
for question, question_entry in test_data.items():
    # The id is the second space-separated token of the key.
    question_id = int(question.split(" ")[1])

    # Stays None unless the public answers are being collected.
    pub_answer = None
    if PUBLIC_TEST:
        try:
            # Character 7 of the answer string is read as the option number
            # (presumably the text starts with "option N" -- confirm).
            pub_answer = int(full_data[question]["answer"][7:8])
        except (KeyError, TypeError, ValueError):
            # Missing question/answer or an unparsable answer: keep going
            # with a sentinel that can never equal a prediction.
            pub_answer = -2
            print(f"Error in question {question} when trying to get the public answer")

    terms, abreviations = enhacenment_query.define_TA_question(question_entry["question"])

    data = {
        "question": question_entry["question"],
        "question_id": question_id,
        "terms": terms,
        "abbreviations": abreviations,
        "answer": pub_answer if PUBLIC_TEST else None,
    }

    # Copy only the options that this question actually has.
    for option_key in ("option 1", "option 2", "option 3", "option 4", "option 5"):
        if option_key in question_entry:
            data[option_key] = question_entry[option_key]

    test_data_json.append(data)

    



In [None]:
# Create a progress bar
pb = tqdm(
    test_data_json,
    total=len(test_data_json),
    desc="Generating RAG Contexts",
    unit="question",
)

# Create a list to store the questions
test_data_json_with_context = []

# Connect to the SQLite database
conn = sqlite3.connect(DATABASE_PATH)

try:
    # Iterate over the questions for generating the RAG context
    for question_data in pb:
        # Get the question
        question = question_data["question"]

        # Get the options, skipping absent keys and "nan"/empty values.
        options = {}
        for option_key in ("option 1", "option 2", "option 3", "option 4", "option 5"):
            if option_key not in question_data:
                continue
            option_text = str(question_data[option_key])
            if option_text != "nan" and option_text != "":
                options[option_key] = option_text

        # Get the terms and abbreviations ("nan"/empty values become None)
        terms = None
        if str(question_data["terms"]) != "nan" and question_data["terms"] != "":
            terms = question_data["terms"]

        abbreviations = None
        if str(question_data["abbreviations"]) != "nan" and question_data["abbreviations"] != "":
            abbreviations = question_data["abbreviations"]

        # Generate the RAG context
        try:
            context = Get_RAG_Context(question, conn, NUM_CLUSTERS, TOP_K_CLUSTERS, TOP_K_CHUNCKS)
        except Exception as e:
            print(f"An error occurred: {e}")
            print(traceback.format_exc())
            # Without this reset the record would reuse the previous
            # question's context (or raise NameError on the first one).
            # The evaluation loop concatenates the items of "context", so an
            # empty list yields an empty context string.
            context = []

        test_data_json_with_context.append({
            "question": question,
            "Question_ID": question_data["question_id"],
            "options": options,
            "terms": terms,
            "abbreviations": abbreviations,
            "context": context,
            "answer": question_data["answer"] if PUBLIC_TEST else None,
        })
finally:
    # Close the connection to the database, even if the loop was interrupted
    conn.close()

# Getting the provided RAG data
Run this section **only** if you skipped the generating step earlier; it will **overwrite** any previously generated data.

In [None]:
# Load the questions together with their pre-generated RAG context.
with open(TEST_DATA_WITH_RAG_PATH, "r") as file:
    test_data_json_with_context = json.load(file)

if PUBLIC_TEST:
    # This section runs when the generation sections were skipped, so the
    # labelled dataset has to be loaded here rather than assumed to exist.
    full_data = {}
    with open(TEST_FULL_DATASET) as f:
        full_data.update(json.load(f))

    aux = []
    for question in test_data_json_with_context:
        question_id = question["Question_ID"]
        # Only ids below 10000 are looked up in the labelled dataset
        # (presumably the public questions -- confirm).
        if int(question_id) < 10000:
            try:
                # Store the option number as an int, parsed the same way as in
                # the generation section; the evaluation loop compares it with
                # an int prediction, so the raw answer string would never match.
                raw_answer = full_data[f"question {question_id}"]["answer"]
                question["answer"] = int(raw_answer[7:8])
            except (KeyError, TypeError, ValueError):
                # Same sentinel as the generation section: never equals a prediction.
                question["answer"] = -2
                print(f"Error in question {question_id} when trying to get the public answer")
            aux.append(question)
    test_data_json_with_context = aux


# Test the model accuracy

In [None]:
# Set the torch device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the model and tokenizer
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME)
model.to(device)

# One result row per question; turned into a DataFrame after the loop.
answer_results = []

# Accuracy counters are only advanced when PUBLIC_TEST is enabled.
correct_answers = 0
total_answers = 0

# Number of responses from which no option number could be extracted.
failed = 0

# Sentence the answer is extracted from; compiled once for the whole loop.
answer_pattern = re.compile(r"The correct option number is option (\d)")

pb = tqdm(
    test_data_json_with_context,
    desc="Generating test answers",
    total=len(test_data_json_with_context),
    unit="Question",
)

# Iterate over the questions and ask the model for each answer
for question_iter in pb:
    question = str(question_iter["question"])

    # Numbered option lines, in order, for the options that are present.
    option_lines = []
    for option_number in range(1, 6):
        try:
            option_text = str(question_iter["options"][f"option {option_number}"])
        except KeyError:
            continue
        option_lines.append(f"\n{option_number}. " + option_text)

    # NOTE(review): every prompt string below is kept verbatim, including the
    # doubled "the" -- presumably it has to match the format the model was
    # fine-tuned on; confirm before editing.
    merged_question = (
        question
        + "\n"
        + "".join(option_lines)
        + "\n\n"
        + "Choose the correct option from the above options"
    )

    # Concatenate the retrieved context pieces into one string.
    context = "".join(question_iter["context"])

    full_context = (
        f"Considering the following context:\n{str(context)}\n"
        + (
            f"Terms and Definitions:\n{question_iter['terms']}\n"
            if question_iter["terms"]
            else ""
        )
        + (
            f"Abbreviations: {question_iter['abbreviations']}\n"
            if question_iter["abbreviations"]
            else ""
        )
    )

    full_question = (
        f"Please provide the answer to the the following multiple choice question:\n{merged_question}\n"
        + "Write only the option number corresponding to the correct answer."
    )

    input_tensor = tokenizer.apply_chat_template(
        [
            {
                "role": "context",
                "content": full_context,
            },
            {
                "role": "user",
                "content": full_question,
            },
        ],
        return_tensors="pt",
    )

    # Generate the answer
    with torch.no_grad():
        output = model.generate(
            input_tensor.to(device),
            max_length=2048,
            num_return_sequences=1,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )

    # Decode the answer
    response = tokenizer.decode(output[0], skip_special_tokens=True)

    # Extract the option number; -1 marks a response without a usable answer.
    # The captured group is a single digit, so int() cannot fail here.
    match = answer_pattern.search(response)
    if match:
        answer = int(match.group(1))
    else:
        failed += 1
        answer = -1

    # set_postfix replaces the whole postfix, so report everything in one
    # call; otherwise the accuracy update would hide the failure count.
    postfix = {"Failed": failed}
    if PUBLIC_TEST:
        total_answers += 1
        if answer == question_iter["answer"]:
            correct_answers += 1
        postfix.update(
            {
                "Correct": correct_answers,
                "Total": total_answers,
                "Accuracy": correct_answers / total_answers,
            }
        )
    pb.set_postfix(postfix)

    # Record the result row for this question
    answer_results.append(
        {
            "Question_ID": question_iter["Question_ID"],
            "Answer_ID": answer,
            "Task": "Phi-2",
            "Correct_Answer": question_iter["answer"] if PUBLIC_TEST else None,
        }
    )


df = pd.DataFrame(answer_results)

# The ground-truth column is only filled in for the public test set.
if not PUBLIC_TEST:
    df.drop(columns=["Correct_Answer"], inplace=True)


# Show results and save as csv

In [None]:
# The accuracy counters only exist and carry data for the public test set.
# Printing them unconditionally raised NameError before the CSV was written.
if PUBLIC_TEST and total_answers:
    print(f"Accuracy: {correct_answers/total_answers}, Correct: {correct_answers}, Total: {total_answers}")

# Make sure the output folder exists, then save the answers.
os.makedirs("./Data/results", exist_ok=True)
df.to_csv("./Data/results/test_results.csv", index=False)

# Last expression of the cell, so the notebook displays the table.
df
