In [None]:
import logging
from importlib import reload
# Re-execute the logging module before creating the logger.
# NOTE(review): presumably done to discard logging state left over from earlier
# runs in the same kernel - confirm this is still needed.
reload(logging)
# Module-level logger; the cells visible in this notebook log through the
# root-level `logging.info(...)` functions rather than through this object.
logger = logging.getLogger(__name__)

In [None]:
import ast
import json
import pandas as pd
import time
import random
import numpy as np
import os
from pathlib import Path
from dotenv import load_dotenv
from langchain.chat_models import ChatOpenAI
from langchain.prompts import load_prompt
from pathlib import Path
from typing import Dict
from llm_assessor import (
    run_prompt_chain,
)

# Load environment variables from a .env file; a later cell reads os.environ["workdir"].
# NOTE(review): override=True presumably lets .env values replace variables that are
# already set in the process environment - confirm against the python-dotenv docs.
load_dotenv(override=True)

In [None]:
# Set working directory - Not required if using Jupyter outside of VScode.
# The target directory is read from the "workdir" environment variable; a missing
# variable raises KeyError here. Later cells use relative paths, so they resolve
# against this directory.
workdir = os.environ["workdir"]
os.chdir(workdir)
# Last expression of the cell: displays the directory now in effect.
os.getcwd()

In [None]:
# Helper Functions
def find_marking_level(
        mark: int, 
        levels_dict: Dict
        ) -> int:
    """Return the marking level whose mark range contains ``mark``.

    Args:
        mark: The mark to look up.
        levels_dict: Mapping of level -> collection of marks for that level.
            Only the smallest and largest value of each collection are used,
            so a level covers the inclusive range ``[min(marks), max(marks)]``.

    Returns:
        The key of the first level, in the mapping's declaration order, whose
        range contains ``mark``. Scanning in declaration order keeps the result
        deterministic when level ranges overlap (picking an arbitrary element
        from a set of matches does not).

    Raises:
        ValueError: If no level contains ``mark``. This includes a NaN mark,
            because every comparison against NaN is false.
    """
    for level, marks in levels_dict.items():
        if min(marks) <= mark <= max(marks):
            return level

    raise ValueError("Mark not found in Marking Levels.")

def compute_metric(
        llm_awarded_mark: int, 
        teacher_awarded_mark: int, 
        level_structure: Dict
        ) -> float:
    """Level-weighted absolute error between the LLM mark and the teacher mark.

    The absolute mark difference is multiplied by ``1 + |level gap|``, where the
    level of each mark is looked up with ``find_marking_level``. Marks in the same
    level are therefore weighted by 1, and each extra level of separation adds 1
    to the weight.

    Args:
        llm_awarded_mark: Mark awarded by the LLM.
        teacher_awarded_mark: Mark awarded by the teacher.
        level_structure: Level mapping passed through to ``find_marking_level``.

    Returns:
        The weighted error; 0 when both marks are equal.

    Raises:
        ValueError: Propagated from ``find_marking_level`` when either mark does
            not fall inside any level.
    """
    # The teacher mark is resolved first, as in the lookup order callers rely on
    # for which failure surfaces when both marks are invalid.
    teacher_level = find_marking_level(teacher_awarded_mark, level_structure )
    model_level = find_marking_level(llm_awarded_mark, level_structure)

    level_weight = abs(teacher_level - model_level) + 1
    mark_gap = abs(teacher_awarded_mark - llm_awarded_mark)

    return level_weight * mark_gap

In [None]:
# Directories containing the prompt templates and the student answers.
# prompt_dir is relative to the current working directory; data_dir is anchored
# on the configured working directory.
prompt_dir = Path("prompt_templates")
data_dir = Path(workdir) / "validation_results" / "processed_data"

In [None]:
# Load LLM
# Settings for the chat model used to grade answers, kept in one place so they
# are easy to tune.
llm_settings = {
    "model_name": "gpt-4o",
    "temperature": 0.00,
    "max_tokens": 800,
}
llm = ChatOpenAI(**llm_settings)

In [None]:
# Detach every handler currently attached to the root logger so that the
# basicConfig call below takes effect and log lines are not duplicated when
# this cell is re-run.
root = logging.getLogger()
for handler in tuple(root.handlers):
    root.removeHandler(handler)

log_format = '%(asctime)s - %(levelname)s - %(message)s'
log_date_format = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(format=log_format, datefmt=log_date_format, level=logging.INFO)

# Load in Student Answers

In [None]:
# Load in Data
# Read the answers CSV and also expose it as a list of per-row dictionaries,
# which is the structure the grading loop fills in.
student_answers_path = data_dir / "student_answers_augmented.csv"
completed_paper_df = pd.read_csv(student_answers_path)
student_answers_records = completed_paper_df.to_dict("records")

# Read in Prompt Templates

In [None]:
# Load all prompt templates
def _load_grading_prompts(subject_id: str, key_prefix: str, question_types: list) -> dict:
    """Load the grading prompt of every question type of one subject.

    Templates are read from
    ``prompt_dir/<subject_id>/<type>_prompt/grade_answer_<subject_id>_<type>_prompt.json``.

    Args:
        subject_id: Subject folder name, also embedded in the template file name.
        key_prefix: Prefix of the returned keys (``"<key_prefix>_<type>"``).
        question_types: Question type names, in the order the keys should appear.

    Returns:
        Mapping ``"<key_prefix>_<type>"`` -> loaded prompt template.
    """
    return {
        f"{key_prefix}_{question_type}": load_prompt(
            prompt_dir
            / subject_id
            / f"{question_type}_prompt"
            / f"grade_answer_{subject_id}_{question_type}_prompt.json"
        )
        for question_type in question_types
    }


# prompts["grade_answer"][subject_id][question_type] is the lookup performed by the
# grading loop, so the keys must match the subject_id / question_type values in the data.
prompts = {
    "grade_answer": {
        "aqa_history": _load_grading_prompts(
            "aqa_history",
            "hs",
            ["analyse", "explain", "judgement", "spag"],
        ),
        "edexcel_business_studies": _load_grading_prompts(
            "edexcel_business_studies",
            "bs",
            [
                "analyse", "discuss", "evaluate", "explain", "identify",
                "justify", "outline", "state", "mcq", "calculate",
            ],
        ),
    },
    "extract_marks": load_prompt(prompt_dir / "extract_mark_count" / "extract_mark_count_prompt.json"),
    }

# Grade Answers for Each Student

## Pass 1: Grade all Student Answers with Few Shot Prompting

In [None]:
# Checkpoint configuration.
checkpoint_iter = 10    # a checkpoint is written every `checkpoint_iter` graded answers
resume_idx = 0          # index of the first record that still has to be graded
use_checkpoint = True   # whether the grading loop writes checkpoints

# Checkpoint to resume from. The number at the end of the file name is the number of
# records stored in the file, i.e. the index at which grading resumes.
checkpoint_path = "./validation_results/processed_data/grading_checkpoint/grading_checkpoint_890.json"
checkpoint_file = Path(checkpoint_path)

if checkpoint_file.is_file():

    # Load the cached entries.
    # Note: missing values stored as NaN come back from json.load as ordinary float
    # NaN objects, so they must be detected with pd.isna()/math.isnan(), not `is np.nan`.
    with open(checkpoint_file, 'r') as openfile:
        checkpointed_records = json.load(openfile)

    # Checkpoint end index, taken from the file name only (not the whole path).
    checkpoint_end_idx = int(checkpoint_file.stem.rsplit("_", 1)[-1])

    # Guard the slice assignment below: a size mismatch would silently change the
    # length of the record list and shift every later record.
    if len(checkpointed_records) != checkpoint_end_idx:
        raise ValueError(
            f"Checkpoint {checkpoint_file.name} holds {len(checkpointed_records)} records "
            f"but its file name announces {checkpoint_end_idx}."
        )
    if checkpoint_end_idx > len(student_answers_records):
        raise ValueError(
            f"Checkpoint {checkpoint_file.name} holds more records than the "
            f"{len(student_answers_records)} student answers that were loaded."
        )

    # Update iterable with the already graded records
    student_answers_records[0:checkpoint_end_idx] = checkpointed_records
    resume_idx = checkpoint_end_idx

print("Resume Index:\n", resume_idx)

In [None]:
# Pass 1: Grade All Answers
# Question types with at least one row flagged as having a marking-level structure.
valid_types = list(set(completed_paper_df[completed_paper_df.questions_with_marking_level_flag == 1]["question_type"].tolist()))

# Built once rather than once per answer: the model and prompt used to pull the
# numeric mark out of the free-text grading response.
mark_extraction_llm = ChatOpenAI(model_name="gpt-4o", temperature=0.0, max_tokens=500)
mark_extraction_prompt = "Extract the number of marks awarded to the student answer. Read the answer carefully and extract the final number of marks to be awarded to the student. Report the number of marks only. Answer: {answer}"

# Directory receiving the checkpoint files
checkpoint_savedir = Path("./validation_results/processed_data/grading_checkpoint")
if use_checkpoint:
    checkpoint_savedir.mkdir(parents=True, exist_ok=True)

for idx in range(resume_idx, len(student_answers_records)):

    student_answer = student_answers_records[idx]
    student_answer["start_time"] = time.time()

    answer_text = student_answer["answer_text"]
    answer_ref = f"Student ID: {student_answer['student_id']} and Question ID: {student_answer['question_id']}"

    logging.info(f"Starting Grading Answer for {answer_ref}")

    # pd.isna() instead of `is np.nan`: records restored from a JSON checkpoint carry
    # NaN floats that are not the np.nan singleton, and len() on such a float raises.
    # Whitespace-only answers are treated as blank as well.
    if pd.isna(answer_text) or str(answer_text).strip() == "":

        # Blank answers are not sent to the LLM
        student_answer["llm_graded_answer"] = "No Answer is provided. Therefore 0 marks are awarded for this answer."   
        student_answer["llm_graded_answer_token_costing"] = 0
        student_answer["llm_awarded_marks"] = 0
        student_answer["llm_awarded_marks_token_costing"] = 0

    else:
        try:
            # Grade Answer
            subject_prompts = prompts["grade_answer"].get(student_answer["subject_id"], {})
            prompt_template = subject_prompts.get(student_answer["question_type"])
            if prompt_template is None:
                raise LookupError(
                    f"Question type: {student_answer['question_type']} is not supported "
                    f"for subject: {student_answer['subject_id']}."
                )

            input_args = {
                "question": student_answer["question_text"], 
                "answer": answer_text, 
                "mark_scheme": student_answer["mark_scheme_text"], 
                "context": student_answer["context"]
                }
            # Pass only the variables the selected template declares
            prompt_template_input_args = {
                k: input_args[k] for k in prompt_template.input_variables if k in input_args
                }

            graded_answer_response = run_prompt_chain(
                prompt_template=prompt_template.template,
                llm=llm,
                burn_in_runs=1,
                **prompt_template_input_args
                )

            student_answer["llm_graded_answer"] = graded_answer_response["prompt_chain_response"]
            student_answer["llm_graded_answer_token_costing"] = graded_answer_response["prompt_chain_token_costing"]

            logging.info(f"Completed Grading Answer for {answer_ref}")

            # Extract Marks
            logging.info(f"Starting Extracting Marks for {answer_ref}")

            extract_marks_response = run_prompt_chain(
                prompt_template=mark_extraction_prompt,
                llm=mark_extraction_llm,
                burn_in_runs=1,
                answer=student_answer["llm_graded_answer"]
                )
            marks = extract_marks_response["prompt_chain_response"]
            marking_token_cost = extract_marks_response["prompt_chain_token_costing"]

            # First whitespace-separated token made only of digits; NaN when there is none
            extracted_marks = [int(token) for token in marks.split() if token.isdigit()]
            student_answer["llm_awarded_marks"] = extracted_marks[0] if extracted_marks else np.nan
            student_answer["llm_awarded_marks_token_costing"] = marking_token_cost

            logging.info(f"Completed Extracting Marks for {answer_ref}")

        except Exception:
            # Deliberate best effort: one failing answer must not stop the run. The
            # traceback is logged because the cause is not necessarily an unsupported
            # question type (it can also be an API error or a malformed record).
            logging.exception(
                f"Grading failed for {answer_ref} (question type: {student_answer['question_type']}). "
                "Assigning NA and skipping to next question."
            )

            student_answer["llm_graded_answer"] = "NA"   
            student_answer["llm_graded_answer_token_costing"] = 0
            student_answer["llm_awarded_marks"] = np.nan
            student_answer["llm_awarded_marks_token_costing"] = 0

    student_answer["end_time"] = time.time()
    student_answer["elapsed_time_in_seconds"] = student_answer["end_time"] - student_answer["start_time"]

    # Save Checkpoint
    # The checkpoint includes the record that was just graded, and the number in the
    # file name is the number of records it holds (= the index to resume from), so a
    # resumed run does not pay to grade that record a second time.
    graded_count = idx + 1
    if use_checkpoint and graded_count % checkpoint_iter == 0:

        checkpoint_savename = f"grading_checkpoint_{graded_count}.json"
        with open(checkpoint_savedir / checkpoint_savename, "w") as savefile:
            json.dump(student_answers_records[0:graded_count], savefile)

    # Randomly sleep for seconds to avoid API throttling. Between 1-3 seconds
    seconds_to_sleep = random.randint(1, 3)

    logging.info(f"Sleeping for {seconds_to_sleep} seconds to avoid API Throttling.")
    time.sleep(seconds_to_sleep)    

In [None]:
# Compute hitrate, Level Hitrate and Scaled Error Metric
for student_answer in student_answers_records:

    logging.info(f"Computing Hitrate for Student ID: {student_answer['student_id']} and Question ID: {student_answer['question_id']}")

    teacher_mark = student_answer["awarded_marks"]
    llm_mark = student_answer["llm_awarded_marks"]

    # Unanswered questions with 0 marks are assigned 0 without any LLM call, so they are
    # excluded (NaN) from every accuracy metric instead of counting as trivial hits.
    # pd.isna() is used because NaN values restored from a JSON checkpoint are not the
    # np.nan singleton and would fail an `is np.nan` identity test.
    unanswered = teacher_mark == 0 and pd.isna(student_answer["answer_text"])

    # Hitrate: the LLM mark equals the teacher mark (False when the LLM mark is NaN)
    student_answer["llm_mark_hitrate"] = np.nan if unanswered else teacher_mark == llm_mark

    # Level Hitrate and Scaled Error Metric only apply to questions with a marking-level
    # structure. pd.notna() also rejects the NaN that an empty CSV cell produces, which
    # an `is not None` test lets through.
    has_level_structure = (
        pd.notna(student_answer['level_structure'])
        and student_answer['question_type'] in valid_types
    )
    if not has_level_structure:
        student_answer["llm_awarded_level"] = np.nan
        student_answer["awarded_level"] = np.nan
        student_answer['llm_level_hitrate'] = np.nan
        student_answer['scaled_error_metric'] = np.nan
        continue

    levels_dict = ast.literal_eval(student_answer['level_structure'])
    highest_mark = max(max(marks) for marks in levels_dict.values())

    awarded_level = find_marking_level(mark = teacher_mark, levels_dict = levels_dict)

    # Worst possible error for this teacher mark: the LLM awarding either 0 or full marks
    max_error = max(
        compute_metric(llm_awarded_mark = 0, teacher_awarded_mark = teacher_mark, level_structure = levels_dict),
        compute_metric(llm_awarded_mark = highest_mark, teacher_awarded_mark = teacher_mark, level_structure = levels_dict)
        )
    logging.info(f"Maximum Error: \n{max_error}")

    try:
        llm_awarded_level = find_marking_level(mark = llm_mark, levels_dict = levels_dict)
        metric = compute_metric(
            llm_awarded_mark = llm_mark,
            teacher_awarded_mark = teacher_mark,
            level_structure = levels_dict
            )
    except (ValueError, TypeError):
        # The LLM mark is missing or outside every level: no level can be assigned and
        # the answer is penalised with the maximum error.
        llm_awarded_level = np.nan
        metric = max_error

    # max_error is 0 only when the teacher mark and the highest mark are both 0;
    # the ratio is undefined in that case.
    scaled_error = metric / max_error if max_error else np.nan
    logging.info(f"Scaled Error: \n{scaled_error}")

    # A NaN level never equals the teacher level, so an unresolvable LLM mark is a miss
    level_hit = int(llm_awarded_level == awarded_level)

    student_answer["llm_awarded_level"] = llm_awarded_level
    student_answer["awarded_level"] = awarded_level
    student_answer['llm_level_hitrate'] = np.nan if unanswered else level_hit
    student_answer['scaled_error_metric'] = np.nan if unanswered else scaled_error

In [None]:
# Combine Student Answers into a DataFrame restricted to the reporting columns,
# in reporting order. Selecting a column that is absent raises KeyError.
ordered_cols = [
    # Identification
    'subject_id', 'question_id', 'question_type', 'student_id',
    # Question, mark scheme and answers
    'question_text', 'mark_scheme_text', 'context', 'answer_text', 'llm_graded_answer',
    # Marks and levels
    'awarded_marks', 'llm_awarded_marks', 'total_marks', 'awarded_level', 'llm_awarded_level',
    # Metrics
    'llm_mark_hitrate' , 'llm_level_hitrate', 'scaled_error_metric',
    # References
    'answer_id', 'linked_answer_id', 'topic_id', 'answer_scanned_image',
    # Timing and token cost
    'elapsed_time_in_seconds', 'llm_graded_answer_token_costing', 'llm_awarded_marks_token_costing',
    ]

completed_paper_df = pd.DataFrame(student_answers_records)
completed_paper_df = completed_paper_df.loc[:, ordered_cols]

In [None]:
# print("Mark Scheme:\n", student_answer["mark_scheme_text"])
# print("-"*40)
# print("Student Answer:\n", student_answer["answer_text"])
# print("-"*40)
# print("Graded Answer:\n", student_answer["llm_graded_answer"])
# print("-"*40)
# print("LLM Marks:\n", student_answer["llm_awarded_marks"])
# print("-"*40)
# print("Actual Marks:\n", student_answer["awarded_marks"])

In [None]:
# Save graded student answers as CSV, without the DataFrame index column
graded_answers_path = data_dir / "student_answers_llm_graded.csv"
completed_paper_df.to_csv(graded_answers_path, index=False)