# APPS Benchmarking

## Setup

### Packages and Data

In [1]:
from datasets import load_dataset
from pvg.code_validation.agents import get_openrouter_response
import re
import numpy as np
import json

In [2]:
# Original APPS problems; each record is read below for its question text and
# its JSON-encoded list of reference solutions.
data = load_dataset("codeparrot/apps", trust_remote_code=True)

# Companion dataset holding the deliberately buggy solutions, matched to the
# APPS records through the shared "problem_id" field.
buggy_data = load_dataset("lrhammond/buggy-apps")

### Queries

In [3]:
def generate_query(question, solution, show_working):
    """Build the chat messages asking a model to judge a candidate solution.

    Args:
        question: Text of the coding problem.
        solution: Source code of the candidate answer to be judged.
        show_working: If true, the model is asked to reason step by step and
            the requested format gains a ``REASONING:`` section; otherwise it
            is told to output nothing but its verdict.

    Returns:
        A two-element list of ``{"role": ..., "content": ...}`` dicts: the
        system prompt followed by the user prompt.
    """
    if show_working:
        show_working_header = "\n\nREASONING: <your reasoning here>"
        show_working_prompt = "You should write down your reasoning step by step before providing a final response. "
    else:
        show_working_header = ""
        show_working_prompt = "Do *not* include any reasoning or other text in your response apart from your verdict. "

    # The verdict line requested here ("CORRECT: ...") is what the answer
    # parser in this notebook searches for, so keep the two in sync.
    system_prompt = (
        "I need to check answers to some coding challenges. "
        "In what follows, I will provide you with a QUESTION and an ANSWER. "
        "Your task is to determine whether the ANSWER is correct. "
        "Note that while most answers *look* correct, several of them are subtly incorrect, "
        "so make sure you think carefully. "
        f"{show_working_prompt}"
        "If the ANSWER is correct, please respond with 'yes', "
        "if the ANSWER is incorrect, please respond with 'no'. "
        f"The format of your answer should be:{show_working_header}"
        "\n\nCORRECT: <your verdict here>"
    )

    # NOTE(review): the fence below is opened but never closed, the "python"
    # tag sits on its own line, and the preamble uses `sys` without importing
    # it. Left unchanged because it may mirror the dataset's solution format;
    # confirm before altering, since it changes what the model sees.
    python_preamble = "python\nstdin = sys.stdin\nstdout = sys.stdout\n"

    user_prompt = f"QUESTION:\n\n\n{question}\n\nANSWER:\n\n\n```\n{python_preamble}\n{solution}"

    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

In [4]:
def extract_answer(response):
    """Extract the yes/no verdict from a model response.

    The verdict is read from the text following the first ``CORRECT:`` marker,
    up to the end of that line.

    Args:
        response: Raw text returned by the model. Anything that is not a
            string (for example ``None`` from a failed request) is treated as
            an unparseable response.

    Returns:
        ``"yes"`` or ``"no"`` when exactly one of the two appears as a whole
        word after the marker; ``None`` when the marker is missing, neither
        word is present, or both are present.
    """
    if not isinstance(response, str):
        return None

    match = re.search(r"CORRECT:\s*(.*)", response)
    if match is None:
        return None

    # Match whole words only: a plain substring test would read "unknown",
    # "not sure" or "none" as a "no" verdict.
    verdicts = set(re.findall(r"\b(yes|no)\b", match.group(1).lower()))
    if len(verdicts) != 1:
        return None
    return verdicts.pop()

## Evaluation

### Methods

In [5]:
def guess_if_correct(model, api_key, query, temperature, n_attempts, check_probs, num_alternatives):
    """Query a model several times and collect its verdicts and probabilities.

    Args:
        model: Model identifier forwarded to ``get_openrouter_response``.
        api_key: API key forwarded to ``get_openrouter_response``.
        query: Chat messages to send.
        temperature: Sampling temperature forwarded to the request.
        n_attempts: Number of responses requested (``num_responses``).
        check_probs: Whether token log-probabilities are requested and used to
            compute the probability of the verdict token.
        num_alternatives: Number of top alternative tokens requested
            (``top_logprobs``), or ``None`` to skip the alternative verdict.

    Returns:
        A tuple ``(answers, answer_probs, alt_probs)`` of three lists with
        exactly one entry per response, so index ``i`` refers to the same
        response in each:

        * ``answers[i]``: ``"yes"``, ``"no"``, or ``None`` if no verdict could
          be extracted.
        * ``answer_probs[i]``: probability of the verdict token, or ``None``
          when unavailable (no verdict, ``check_probs`` false, or no verdict
          token found in the log-probabilities).
        * ``alt_probs[i]``: probability of the opposite verdict among the
          verdict token's alternatives, or ``None`` when unavailable.

    Raises:
        ValueError: If the last yes/no token in the log-probabilities
            disagrees with the verdict extracted from the message text.
    """
    responses = get_openrouter_response(
        model,
        query,
        api_key,
        num_responses=n_attempts,
        log_probs=check_probs,
        top_logprobs=num_alternatives,
        temperature=temperature,
    )

    # Whole-word match so that tokens such as "know" or "not" are not taken
    # for a "no" verdict.
    verdict_word = re.compile(r"\b(yes|no)\b")

    answers = []
    answer_probs = []
    alt_probs = []

    for r in responses:
        answer = extract_answer(r["message"])
        answer_prob = None
        alt_prob = None

        if answer is not None and check_probs:
            # Scan from the end: the verdict is expected last in the response.
            verdict_token = None
            for t in reversed(r["log_probs"]):
                found = verdict_word.search(t["token"].lower())
                if found is None:
                    continue
                if found.group(1) != answer:
                    raise ValueError("The extracted answer does not match the answer from the latest token.")
                verdict_token = t
                break

            if verdict_token is not None:
                answer_prob = np.exp(verdict_token["logprob"])

                # Only look for the alternative on the token actually
                # identified as the verdict, never on a leftover loop variable.
                if num_alternatives is not None:
                    alt = "no" if answer == "yes" else "yes"
                    for tt in verdict_token["top_logprobs"]:
                        found_alt = verdict_word.search(tt["token"].lower())
                        if found_alt is not None and found_alt.group(1) == alt:
                            alt_prob = np.exp(tt["logprob"])
                            break

        # Always append to all three lists so they stay index-aligned.
        answers.append(answer)
        answer_probs.append(answer_prob)
        alt_probs.append(alt_prob)

    return answers, answer_probs, alt_probs

In [6]:
def evaluate_model(model, api_key, show_working=False, temperature=1.0, n_attempts=10, check_probs=True, num_alternatives=None, save_path=None):
    """Benchmark a model on judging correct and buggy APPS solutions.

    For every problem present in ``buggy_data``, each reference solution
    (expected verdict ``"yes"``) and each buggy solution (expected verdict
    ``"no"``) is submitted ``n_attempts`` times and the verdicts are scored.

    Args:
        model: Model identifier, e.g. ``"openai/gpt-4o-mini"``.
        api_key: API key forwarded to ``guess_if_correct``.
        show_working: Whether the model is asked to reason before answering.
        temperature: Sampling temperature forwarded to ``guess_if_correct``.
        n_attempts: Number of responses requested per solution.
        check_probs: Whether verdict-token probabilities are collected.
        num_alternatives: Forwarded to ``guess_if_correct``.
        save_path: Directory in which to write the JSON results, or ``None``
            to skip saving. The directory must already exist.

    Returns:
        ``results[split][expected_verdict][metric]``: a list with one entry
        per evaluated solution. Metrics are ``fraction_correct``,
        ``avg_prob_when_correct``, ``avg_prob_when_incorrect`` (each ``None``
        when there is nothing to average) and ``fraction_failed`` (share of
        responses from which no verdict could be extracted).

    Raises:
        ValueError: If the APPS records and buggy records do not line up by
            ``problem_id``.
    """
    from pathlib import Path  # local import: only needed for saving results

    def _mean_or_none(values):
        # Average the known values; None (JSON null) instead of NaN when empty.
        known = [v for v in values if v is not None]
        return float(np.mean(known)) if known else None

    metric_names = (
        'fraction_correct',
        'avg_prob_when_correct',
        'avg_prob_when_incorrect',
        'fraction_failed',
    )

    results = {}

    for split in ["train", "test"]:

        # Create the accumulators once per split. Re-creating them for every
        # problem would discard all but the last problem's results.
        results[split] = {
            label: {name: [] for name in metric_names} for label in ("yes", "no")
        }

        # A set gives O(1) membership tests inside the filter.
        indices = set(buggy_data[split]["problem_id"])
        sliced_data = data[split].filter(lambda x: x["problem_id"] in indices)

        for buggy_datum, datum in zip(buggy_data[split], sliced_data):

            if buggy_datum["problem_id"] != datum["problem_id"]:
                raise ValueError("The data is not aligned.")

            question = datum["question"]
            solutions = json.loads(datum["solutions"])
            buggy_solutions = json.loads(buggy_datum["solutions"])

            for actual_answer, sols in (("yes", solutions), ("no", buggy_solutions)):
                bucket = results[split][actual_answer]

                for sol in sols:
                    query = generate_query(question, sol, show_working)
                    answers, answer_probs, _ = guess_if_correct(model, api_key, query, temperature, n_attempts, check_probs, num_alternatives)

                    # Use the probabilities only if there is one per answer;
                    # otherwise treat them all as unknown.
                    if len(answer_probs) != len(answers):
                        answer_probs = [None] * len(answers)

                    correct = []
                    correct_probs = []
                    incorrect_probs = []

                    for answer, prob in zip(answers, answer_probs):
                        if answer is None:
                            continue
                        if answer == actual_answer:
                            correct.append(1)
                            correct_probs.append(prob)
                        else:
                            correct.append(0)
                            incorrect_probs.append(prob)

                    bucket['fraction_correct'].append(_mean_or_none(correct))
                    bucket['avg_prob_when_correct'].append(_mean_or_none(correct_probs))
                    bucket['avg_prob_when_incorrect'].append(_mean_or_none(incorrect_probs))
                    # len(correct) counts parsed responses, so the failed
                    # share is the complement.
                    bucket['fraction_failed'].append(
                        1 - len(correct) / len(answers) if answers else None
                    )

    if save_path is not None:
        model_name = model.split('/')[-1]
        reasoning = "with_reasoning" if show_working else "without_reasoning"
        # Path joining keeps an empty save_path relative to the current
        # directory instead of resolving to the filesystem root.
        out_file = Path(save_path) / f"{model_name}-{reasoning}-{n_attempts}_attempts.json"
        with open(out_file, "w") as f:
            json.dump(results, f)

    return results

### Run

In [51]:
import os

# Secrets must not live in the notebook: read the key from the environment.
# NOTE(review): a key was previously hard-coded here; treat it as compromised
# and revoke it.
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
if not OPENROUTER_API_KEY:
    raise RuntimeError("Set the OPENROUTER_API_KEY environment variable before running the benchmark.")

# Directory receiving the JSON result files ("." = current directory).
SAVE_PATH = "."
MODELS = [
    "openai/gpt-4o-mini",
    "meta-llama/llama-3.1-8b-instruct",
    "deepseek/deepseek-coder",
    "mistralai/codestral-mamba",
]

# NOTE(review): temperature=3.0 looks out of range for most chat-completion
# APIs (commonly 0 to 2); confirm this is intended.
for model in MODELS:
    for show_working in [False, True]:
        evaluate_model(model, OPENROUTER_API_KEY, show_working=show_working, n_attempts=3, temperature=3.0, save_path=SAVE_PATH)

KeyboardInterrupt: 

In [4]:
# System prompt instructing a model to answer a coding question with bare
# Python code that reads from stdin and prints to stdout. Not referenced by
# the cells shown above.
SYSTEM_PROMPT = (
    "Answer the following question by writing a function in Python "
    "(you do not need to include `python` anywhere and you do not need to call the function). "
    "Do NOT include any explanation or natural language text, ONLY the code. "
    "Your code should be concise and should not include any comments. "
    "The solution should take input via stdin (i.e., as `input()`) "
    "and should output by printing to stdout."
    "\n\n\n"
)

## Results