In [None]:
from llama_index.core import SimpleDirectoryReader
# Read the files under data/rel18; each returned document exposes its raw text as `.text`.
documents = SimpleDirectoryReader("data/rel18").load_data()

In [None]:
# Collect the plain text of every loaded document, in load order.
docs_str = [document.text for document in documents]

### Indexing

In [None]:
# Number of document texts that will be passed to the indexer.
len(docs_str)

In [None]:
from ragatouille import RAGPretrainedModel
# Load the pretrained ColBERTv2 checkpoint and build a named index over the document texts.
RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
RAG.index(
    collection=docs_str,
    index_name="ITU RAG 150",
    # NOTE(review): presumably the maximum chunk length applied when documents are split —
    # confirm the unit (tokens vs. characters) against the RAGatouille documentation.
    max_document_length=150,
    split_documents=True,
)

In [None]:
# Smoke-test retrieval with a sample question, requesting k=7 results, and display them.
results = RAG.search(query="What does the UE provide to the AS for slice aware cell reselection?", k=7)
results

In [None]:
from peft import PeftModel, PeftConfig
from transformers import AutoModelForCausalLM, AutoTokenizer

# Adapter configuration for the fine-tuned checkpoint.
# NOTE(review): `config` is not referenced again in the visible cells — confirm it is still needed.
config = PeftConfig.from_pretrained("alexgichamba/phi-2-finetuned-qa-lora-r32-a16_notag")
# Load the base Phi-2 model onto the GPU, then wrap it with the fine-tuned PEFT adapter.
base_model = AutoModelForCausalLM.from_pretrained("microsoft/phi-2").to('cuda')
model = PeftModel.from_pretrained(base_model, "alexgichamba/phi-2-finetuned-qa-lora-r32-a16_notag").to('cuda')
# The tokenizer is taken from the base model repository.
tokenizer = AutoTokenizer.from_pretrained("microsoft/phi-2")

In [None]:
import json

# Load the evaluation questions. Despite the .txt extension the file holds a
# JSON object keyed by question ID.
with open('data/366qs.txt', 'r') as file1:
    questions = json.load(file1)

# 'data/questions_new.txt' is intentionally not opened here: it was never read,
# so opening it only made this cell fail when that optional file was absent.
# To evaluate it as well, load it the same way and merge it with
# questions.update(...).

In [None]:
# Number of questions loaded from the JSON file.
len(questions)

In [None]:
# Use the first (key, record) pair of the question set as a worked example.
first_key, first_value = next(iter(questions.items()))
first_value

In [None]:
# Keep the (key, text) pairs of the example record whose key starts with "option".
options = [pair for pair in first_value.items() if pair[0].startswith("option")]
options

In [None]:
# Retrieve passages for the example question (k=7) and check how many results came back.
res = RAG.search(query=first_value['question'], k=7)
len(res)

In [None]:
# Inspect the container type of a single question record.
type(first_value)

In [None]:
def create_prompt(question, options, context, abbreviations):
    """Build the multiple-choice prompt that is fed to the model.

    Args:
        question: Question text.
        options: Sequence of ``(key, text)`` pairs. Only the text (index 1) is
            shown; options are numbered 1..N by position, not by their key.
        context: Retrieved passage text placed under ``Context:``.
        abbreviations: Iterable of dicts mapping an abbreviation to its full
            form (``{"abbreviation": "full form"}``). Every entry of every
            dict is listed; empty dicts and ``None`` contribute nothing
            instead of raising.

    Returns:
        The prompt string. It ends with ``"Answer: Option"`` so the model only
        has to continue with the option number.
    """
    options_text = "\n".join(
        f"Option {number}: {option[1]}"
        for number, option in enumerate(options, start=1)
    )
    # Iterate over items rather than indexing the first key/value, so an empty
    # dict cannot raise IndexError and multi-entry dicts are not truncated.
    abbreviations_text = "\n".join(
        f"{short_form}: {full_form}"
        for entry in (abbreviations or ())
        for short_form, full_form in entry.items()
    )
    return (
        "Instruct: You will answer each question correctly by giving only the Option ID, the number that follows each Option.\n"
        "The output should be in the format: Option <Option id>\n"
        "Provide the answer to the following multiple choice question in the specified format.\n\n"
        f"Context:\n{context}\n\n"
        f"Abbreviations:\n{abbreviations_text}\n\n"
        f"Question: {question}\n"
        f"Options:\n{options_text}\n"
        "Answer: Option"
    )

In [None]:
def generate_answer(question, options, context, abbreviations, model, tokenizer):
    """Generate the model's raw answer text for one multiple-choice question.

    Args:
        question: Question text.
        options: Sequence of ``(key, text)`` option pairs.
        context: Retrieved passage text to include in the prompt.
        abbreviations: Iterable of ``{"abbreviation": "full form"}`` dicts.
        model: Causal language model exposing ``generate``.
        tokenizer: Tokenizer matching ``model``. Its ``pad_token_id`` is set
            to the EOS id if it was unset (this mutates the tokenizer).

    Returns:
        The decoded continuation only (prompt tokens are stripped), with
        special tokens removed. The text is also printed.
    """
    prompt = create_prompt(question, options, context, abbreviations)

    # Ensure the pad token is set
    if tokenizer.pad_token_id is None:
        tokenizer.pad_token_id = tokenizer.eos_token_id

    # Run on whatever device the model lives on; fall back to the previously
    # hard-coded 'cuda' for model objects that do not expose `.device`.
    device = getattr(model, "device", "cuda")

    # Let the tokenizer build the attention mask. Deriving it from
    # `input_ids != pad_token_id` is wrong when pad == eos: a genuine EOS token
    # in the (unpadded) prompt would be masked out.
    encoded = tokenizer(prompt, return_tensors='pt').to(device)
    input_ids = encoded["input_ids"]

    outputs = model.generate(
        input_ids,
        attention_mask=encoded["attention_mask"],
        max_new_tokens=10,  # The answer is just "<number>", so keep generation short
        pad_token_id=tokenizer.eos_token_id,  # Handle padding correctly
        # Single beam, i.e. no beam search. `early_stopping` was dropped because
        # it is a beam-search-only flag and has no effect with num_beams=1.
        num_beams=1,
    )
    # Slice off the prompt so only newly generated tokens are decoded.
    answer = tokenizer.decode(outputs[0][input_ids.shape[1]:], skip_special_tokens=True)
    print(f"Generated answer: {answer}")
    return answer


In [None]:
from data.prepare_docs import find_appearing_abbreviations
# Try the abbreviation helper on the example record and inspect its return type.
print(find_appearing_abbreviations(first_value))
type(find_appearing_abbreviations(first_value))

In [None]:
# Sanity-check generation on the example question. The context must come from
# `res` (the passages retrieved for this question), not from `results`, which
# holds the hits of the earlier hard-coded sample query.
first_context = " ".join(result['content'] for result in res)
ans = generate_answer(first_value['question'], options, first_context, find_appearing_abbreviations(first_value), model, tokenizer)
print(ans)

In [None]:
import re
# First search for the full pattern
def parse_answer(response):
    """Normalise a raw model response to ``"Option <n>"`` or ``"Error"``.

    The explicit ``Answer: Option <n>`` form is tried first; failing that, the
    first run of digits anywhere in the response is used. Matching is
    case-insensitive.
    """
    patterns = (r'Answer:\s*Option\s*(\d+)', r'(\d+)')
    for pattern in patterns:
        found = re.search(pattern, response, re.IGNORECASE)
        if found:
            return f"Option {found.group(1)}"
    return "Error"


In [None]:
import csv
from tqdm import tqdm

responses = []

# Loop through each question and get the response
for q_id, q_data in tqdm(questions.items(), desc="Processing questions"):
    # Keys have the form "question <number>"; only the number goes in the CSV.
    q_id_number = q_id.split()[1]
    # Drop a trailing "[...]" tag from the question before retrieval and prompting.
    question_text = re.sub(r'\s*\[.*?\]\s*$', '', q_data["question"])
    options = [(k, v) for k, v in q_data.items() if k.startswith("option")]

    # Retrieve context using ColBERT search
    results = RAG.search(query=question_text, k=7)
    context = " ".join(result['content'] for result in results)

    abbreviations = find_appearing_abbreviations(q_data)
    # Generate the answer using the loaded model
    response = generate_answer(question_text, options, context, abbreviations, model, tokenizer)

    # parse_answer returns either "Option <digits>" or "Error", so a matched
    # group always converts with int(); no exception handling is needed here.
    answer = parse_answer(response)
    match = re.search(r'Option (\d+)', answer)
    if match:
        answer_id = int(match.group(1))
        print(f"Answer ID: {answer_id}")
        responses.append([q_id_number, answer_id, "Phi-2"])
    else:
        responses.append([q_id_number, "Error", "Phi-2"])
        print(f"Error processing question {q_id_number}: {answer}")

# Save responses to a CSV file
with open('output_results.csv', 'w', newline='') as csvfile:
    csvwriter = csv.writer(csvfile)
    csvwriter.writerow(["Question_ID", "Answer_ID", "Task"])
    csvwriter.writerows(responses)

print("Processing complete. Responses saved to 'output_results.csv'.")


## Grade the 366 questions

In [None]:
# Load the questions dataset (including the reference answers) from the JSON file
with open('data/366qs.txt', 'r') as rubric:
    qs_w_ans = json.load(rubric)

# Load the responses from the CSV file (newline='' as recommended for the csv module)
with open('output_results.csv', 'r', newline='') as answers:
    responses = list(csv.DictReader(answers))

# Initialize score
correct_answers = 0
total_questions = len(responses)

# track question_ids for failed questions
failed_questions = []
# Compare the responses with the correct answers.
# Rows whose ID is not in the rubric still count towards total_questions but
# are neither scored as correct nor recorded as failed.
for response in responses:
    question_id = response['Question_ID']
    answer_id = response['Answer_ID']

    # Find the corresponding question in the JSON data
    question_key = f"question {question_id}"
    if question_key in qs_w_ans:
        correct_answer = qs_w_ans[question_key]['answer']
        # Second whitespace-separated token of the answer string, minus any ':'.
        # NOTE(review): assumes answers look like "option <n>: <text>" — confirm.
        correct_option_number = correct_answer.split()[1].replace(":", "")

        # Both sides are strings (the CSV is read as text), so compare directly.
        if answer_id == correct_option_number:
            correct_answers += 1
        else:
            # append questionid and answerid to failed questions
            failed_questions.append((question_id, answer_id))

# Calculate the score; an empty results file yields 0 instead of ZeroDivisionError
score = (correct_answers / total_questions) * 100 if total_questions else 0.0

# Print the results
print(f"Total Questions: {total_questions}")
print(f"Correct Answers: {correct_answers}")
print(f"Score: {score:.2f}%")
# write failed questions to a file
with open('failed_questions.txt', 'w') as file:
    for question_id, answer_id in failed_questions:
        file.write(f"{question_id} {answer_id}\n")

In [None]:
def append_dummy_data(csv_filename):
    """Append 2000 placeholder rows to an existing results CSV.

    The rows have IDs 10000..11999, answer 0 and task "Phi-2". Any failure is
    reported with print() instead of being raised.
    """
    placeholder_task = "Phi-2"
    placeholder_rows = [(row_id, 0, placeholder_task) for row_id in range(10000, 12000)]

    try:
        # Append mode keeps whatever rows the file already contains.
        with open(csv_filename, "a", newline='') as handle:
            csv.writer(handle).writerows(placeholder_rows)
        print("Dummy data has been appended to the CSV file.")
    except Exception as error:
        print("Encountered an error while appending dummy data.")
        print(error)

# Pad 'output_results.csv' (the file written by the evaluation loop) with the placeholder rows.
csv_filename = "output_results.csv"
append_dummy_data(csv_filename)

## Generate static context for training set

In [15]:
# Imports are local to this cell so it does not rely on `json`/`re`/`tqdm`
# having been imported by an earlier cell.
import json
import re

from tqdm import tqdm

# training set file
training_set_file = "data/qs_train.txt"
output_file = "data/qs_train_with_context.txt"
# To build contexts for the evaluation set instead, point these at
# "data/366qs.txt" and "data/366qs_with_context.txt".

# NOTE: this rebinds the notebook-level `questions` to the training set.
with open(training_set_file, 'r') as file:
    questions = json.load(file)

for q_data in tqdm(questions.values(), desc="Processing questions"):
    # Drop a trailing "[...]" tag from the question before retrieval.
    question_text = re.sub(r'\s*\[.*?\]\s*$', '', q_data["question"])
    results = RAG.search(query=question_text, k=3)
    # Store the concatenated passages on the record itself.
    q_data["context"] = " ".join(result['content'] for result in results)

with open(output_file, 'w') as file:
    json.dump(questions, file, indent=4)

Processing questions: 100%|██████████| 1400/1400 [00:40<00:00, 34.56it/s]


In [20]:
# Check how many characters the concatenated content of k=3 search results adds up to.
results = RAG.search(
    query="What does the NEF notify to the AF after determining the suitable DNAI(s)?",
    k=3,
)
results_exp = " ".join(hit['content'] for hit in results)
print(len(results_exp))

1640
