# Setup

In [None]:
%reload_ext autoreload
%autoreload 2

In [None]:
import os
import sys
sys.path.append(".bin")

import logging
logging.basicConfig(level=logging.INFO)

# Directory handed to the ContestProblemSetD DAO below (problem definitions).
FILTERED_DIR = "data/filtered_code_contest_data"
# Not referenced anywhere else in the visible part of this notebook.
CODE_CONTEST_DATA_PATH = "data/code_contest_data/"
# Directory handed to the PatchedSolutionSetD DAO below (patched solutions).
PROMPTED_DIR = "data/patched_solutions_v2"
# Directory handed to the TestResultSetD DAO for the patched-solution results.
PATCHED_EVAL_RESULTS_PATH = "data/patched_eval_results"
# Directory handed to the TestResultSetD DAO for the baseline results.
BASE_EVAL_RESULTS_PATH = "data/eval_results"

### Loading Test Result and Problem Data

In [None]:
from domain.domain_dao import CompressedDomainFileDAO
from domain.problems_d import TestResultSetD, ContestProblemSetD, ContestProblemSetD, PatchedSolutionSetD, CodePatchingPromptD
from code_patching.prompts import PROMPTS


# Evaluation results of the patched solutions, flattened across all result sets.
test_result_dao = CompressedDomainFileDAO(PATCHED_EVAL_RESULTS_PATH, TestResultSetD)
test_result_sets = list(test_result_dao.read())
test_results = [
    result
    for result_set in test_result_sets
    for result in result_set.test_results
]

# Baseline evaluation results, flattened the same way.
base_result_dao = CompressedDomainFileDAO(BASE_EVAL_RESULTS_PATH, TestResultSetD)
base_result_sets = list(base_result_dao.read())
base_results = [
    result
    for result_set in base_result_sets
    for result in result_set.test_results
]

# Every problem contained in the filtered problem sets.
problem_dao = CompressedDomainFileDAO(FILTERED_DIR, ContestProblemSetD)
problem_sets = list(problem_dao.read())
problem_ds = [
    problem_d
    for problem_set in problem_sets
    for problem_d in problem_set.problems
]

# Patched solutions keyed by proto_id (a repeated id keeps the last one read).
patched_solution_dao = CompressedDomainFileDAO(PROMPTED_DIR, PatchedSolutionSetD)
patched_solution_sets = list(patched_solution_dao.read())
patched_solutions = {
    solution.proto_id: solution
    for solution_set in patched_solution_sets
    for solution in solution_set.solutions
}

# Patching prompts keyed by proto_id.
patching_prompts = {entry.proto_id: entry for entry in PROMPTS}


### Creating Unified DataFrame

In [None]:
# Problem ID Alignment
# A mismatch between the two id sets is only logged; processing continues.
result_problem_ids = {result.problem_id for result in test_results}
problem_ids = {problem_d.proto_id for problem_d in problem_ds}
unified_problem_ids = result_problem_ids | problem_ids
if result_problem_ids != problem_ids:
    difference = result_problem_ids ^ problem_ids
    logging.warning(f"Problem ids in test results and problem set do not match with {len(difference)}\n {difference}")

# Test ID Alignment
# Unlike problem ids, a mismatch between test ids aborts the cell.
result_test_ids = {result.test_id for result in test_results}
test_ids = {
    public_test.proto_id
    for problem_d in problem_ds
    for public_test in problem_d.public_tests
}
unified_test_ids = result_test_ids | test_ids
if result_test_ids != test_ids:
    raise ValueError(f"Test ids in test results and problem set do not match with {result_test_ids.symmetric_difference(test_ids)}")

In [None]:
import proto.contest_problem_pb2 as cp_pb2

def difficulty_to_int(difficulty: int) -> float:
    """Map a ``ContestProblem.Difficulty`` value to a normalised float score.

    The value is first placed on a 1-20 scale and then divided by 20.
    Despite the function name, the result is a float: known difficulties
    yield values from 0.05 to 1.0, while ``UNKNOWN_DIFFICULTY`` yields -0.05
    so that unknown problems stay separated from the rated ones.

    Args:
        difficulty: A ``cp_pb2.ContestProblem.Difficulty`` enum value.

    Returns:
        The scale position divided by 20.

    Raises:
        ValueError: If ``difficulty`` has no entry in the scale table.
    """
    levels = cp_pb2.ContestProblem.Difficulty

    # Named tiers first; -1 purposefully segregates unknown difficulties.
    scale = {
        levels.UNKNOWN_DIFFICULTY: -1,
        levels.EASY: 1,
        levels.MEDIUM: 10,
        levels.HARD: 15,
        levels.HARDER: 17,
        levels.HARDEST: 20,
    }

    # Letter grades A..O occupy the consecutive steps 1..15.
    for step, letter in enumerate("ABCDEFGHIJKLMNO", start=1):
        scale[getattr(levels, letter)] = step

    # The remaining letters are compressed into the 16..20 range.
    scale[levels.P] = 16
    scale[levels.Q] = 16
    scale[levels.R] = 17
    scale[levels.S] = 17
    scale[levels.T] = 18
    scale[levels.U] = 19
    scale[levels.V] = 20

    step = scale.get(difficulty)
    if step is None:
        raise ValueError(f"Unknown difficulty {difficulty}")
    return step / 20

In [None]:
from typing import List, Dict
from collections import defaultdict

from domain.problems_d import TestResultD

# Test results grouped by problem id: one mapping for the patched-solution
# results and one for the baseline results. Both stay defaultdicts, so looking
# up a problem id without results yields an empty list.
unified_result_dict: Dict[str, List[TestResultD]] = defaultdict(list)
base_unified_result_dict: Dict[str, List[TestResultD]] = defaultdict(list)
for grouped, results in (
    (unified_result_dict, test_results),
    (base_unified_result_dict, base_results),
):
    for test_result in results:
        grouped[test_result.problem_id].append(test_result)

# NOTE(review): unified_problem_ids is built as a union that includes every
# id in problem_ds, so this filter appears to keep all problems - confirm
# whether an intersection with the result ids was intended.
unified_problem_ds = [
    problem_d
    for problem_d in problem_ds
    if problem_d.proto_id in unified_problem_ids
]


In [None]:
from typing import Any, List, Dict
import pandas as pd

import proto.patched_solutions_pb2 as ps_pb2


def _digits_only(text: str) -> List[int]:
    """Return every decimal digit found in *text*, in order, as ints.

    ``str.isdecimal`` is used rather than ``str.isdigit`` because the latter
    also accepts characters such as superscript digits, which ``int()``
    rejects with a ValueError.
    """
    return [int(char) for char in text if char.isdecimal()]


# One flat record per (problem, test result) pair, combining the problem's
# metadata with the outcome of a single test run.
unified_dict_records: List[Dict[str, Any]] = []
for problem in unified_problem_ds:
    patched_test_results = unified_result_dict[problem.proto_id]
    base_test_results = base_unified_result_dict[problem.proto_id]
    # Deliberately not named `test_results`: that would overwrite the
    # notebook-level list of patched results that earlier cells read, which
    # breaks them when they are re-run after this cell.
    problem_results = patched_test_results + base_test_results

    difficulty = difficulty_to_int(problem.difficulty)
    problem_dict = {
        "problem_id": problem.proto_id,
        "problem_name": problem.name,
        "problem_difficulty": problem.difficulty,
        "mapped_difficulty": difficulty,
        "cf_points": problem.cf_points,
        "cf_rating": problem.cf_rating,
        "time_limit_nsec": problem.time_limit_nsec,
        "memory_limit_bytes": problem.memory_limit_bytes}

    for result in problem_results:
        # Results whose solution id is not a known patched solution are
        # labelled as baseline results.
        model = "base_result"
        prompt_name = "base_result"
        if result.solution_id in patched_solutions:
            solution = patched_solutions[result.solution_id]
            model = ps_pb2.ModelType.Name(solution.model)
            prompt_name = patching_prompts[solution.prompt_id].prompt_name

        # Lenient comparison: only the sequence of digits has to match, so
        # whitespace, signs and any other non-digit characters are ignored.
        te_output = _digits_only(result.expected_output)
        ts_output = _digits_only(result.solution_output)
        ts_correct = te_output == ts_output
        test_dict = {
            "expected_output": result.expected_output,
            "solution_output": result.solution_output,
            'te_output': te_output,
            'ts_output': ts_output,
            "ts_correct": ts_correct,
            "result_id": result.proto_id,
            "test_id": result.test_id,
            "solution_id": result.solution_id,
            "correct": result.is_correct,
            "exception": result.exception_info,
            "model": model,
            "prompt_name": prompt_name}

        unified_dict_records.append({**problem_dict, **test_dict})
        

In [None]:
# One row per record in unified_dict_records (problem metadata + test outcome).
unified_df = pd.DataFrame(unified_dict_records)

# Establishing Baseline

In [None]:
# Split the unified frame by the value of its `model` column: the
# "base_result" label marks baseline rows, the other two are model type names.
baseline_df = unified_df.loc[unified_df["model"] == "base_result"]
gpt4_df = unified_df.loc[unified_df["model"] == "MODEL_TYPE_GPT_4_TURBO"]
gpt3_df = unified_df.loc[unified_df["model"] == "MODEL_TYPE_GPT_3_5_TURBO"]

In [None]:
# Distinct test ids covered by each group of results.
gpt4_test_ids = set(gpt4_df['test_id'])
gpt3_test_ids = set(gpt3_df['test_id'])
baseline_test_ids = set(baseline_df['test_id'])

# Test ids that appear in exactly one of the three groups.
gpt4_specific_test_ids = gpt4_test_ids - gpt3_test_ids - baseline_test_ids
gpt3_specific_test_ids = gpt3_test_ids - gpt4_test_ids - baseline_test_ids
baseline_specific_test_ids = baseline_test_ids - gpt4_test_ids - gpt3_test_ids

print(f"Baseline Specific Test Ids: {len(baseline_specific_test_ids)}")
print(f"GPT3 Specific Test Ids: {len(gpt3_specific_test_ids)}")
print(f"GPT4 Specific Test Ids: {len(gpt4_specific_test_ids)}")

# Checked last so the counts above are still printed when coverage differs;
# asserting first would hide them in exactly the case where they matter.
assert gpt3_test_ids == gpt4_test_ids == baseline_test_ids, (
    "Baseline, GPT3 and GPT4 results do not cover the same set of test ids")

In [None]:
# Mean of the `correct` column for each group of results.
gpt4_avg_correct, gpt3_avg_correct, baseline_avg_correct = (
    frame['correct'].mean() for frame in (gpt4_df, gpt3_df, baseline_df)
)

print(f"Baseline Average Correct: {baseline_avg_correct}")
print(f"GPT3 Average Correct: {gpt3_avg_correct}")
print(f"GPT4 Average Correct: {gpt4_avg_correct}")


In [None]:
# Same averages as the previous cell, but over the digit-only comparison
# stored in `ts_correct`. The three variables are rebound to the new values.
gpt4_avg_correct, gpt3_avg_correct, baseline_avg_correct = (
    frame['ts_correct'].mean() for frame in (gpt4_df, gpt3_df, baseline_df)
)

print(f"Trans Baseline Average Correct: {baseline_avg_correct}")
print(f"Trans GPT3 Average Correct: {gpt3_avg_correct}")
print(f"Trans GPT4 Average Correct: {gpt4_avg_correct}")


In [None]:
# Number of rows whose exception field is empty. A boolean-mask sum is used
# instead of value_counts()[''] because the latter raises KeyError when a
# group contains no row with an empty exception field.
gpt4_non_exception_cnt = (gpt4_df['exception'] == '').sum()
gpt3_non_exception_cnt = (gpt3_df['exception'] == '').sum()
baseline_non_exception_cnt = (baseline_df['exception'] == '').sum()

# Share of rows in each group that recorded an exception.
gpt4_exception_rate = (len(gpt4_df) - gpt4_non_exception_cnt) / len(gpt4_df)
gpt3_exception_rate = (len(gpt3_df) - gpt3_non_exception_cnt) / len(gpt3_df)
baseline_exception_rate = (len(baseline_df) - baseline_non_exception_cnt) / len(baseline_df)

print(f"Baseline Exception Rate: {round(baseline_exception_rate, 4)}")
print(f"GPT3 Exception Rate: {round(gpt3_exception_rate, 4)}")
print(f"GPT4 Exception Rate: {round(gpt4_exception_rate, 4)}")

In [None]:
# Mean ts_correct restricted to the rows whose exception field is empty.
gpt4_non_exception_correct, gpt3_non_exception_correct, baseline_non_exception_correct = (
    frame.loc[frame['exception'] == '', 'ts_correct'].mean()
    for frame in (gpt4_df, gpt3_df, baseline_df)
)

print(f"Baseline Non-Exception Correct: {round(baseline_non_exception_correct, 4)}")
print(f"GPT3 Non-Exception Correct: {round(gpt3_non_exception_correct, 4)}")
print(f"GPT4 Non-Exception Correct: {round(gpt4_non_exception_correct, 4)}")

In [None]:
# Compare prompts for GPT-4: mean ts_correct per prompt_name, using only the
# rows whose exception field is empty. The next cell does the same for GPT-3.5.
gpt4_df[gpt4_df.exception == ''].groupby('prompt_name')['ts_correct'].mean()

In [None]:
# Mean ts_correct per prompt_name for the GPT-3.5 rows whose exception field
# is empty.
gpt3_df[gpt3_df.exception == ''].groupby('prompt_name')['ts_correct'].mean()

In [None]:

# Tabulate GPT-4 performance against mapped_difficulty: mean ts_correct per
# difficulty level, over the rows whose exception field is empty.
# The grouped mean is a Series, which has no to_html(); converting it to a
# one-column frame lets the notebook render it as a table.
display(gpt4_df[gpt4_df.exception == ''].groupby('mapped_difficulty')['ts_correct'].mean().to_frame())

In [None]:
# Inspect the GPT-4 rows whose exception field is empty.
gpt4_df[gpt4_df.exception == '']

In [None]:
# Same view as the previous cell: GPT-4 rows whose exception field is empty.
gpt4_df[gpt4_df.exception == '']

In [None]:
# Candidate explanatory columns. `.copy()` makes this an independent frame so
# that the column assignments below do not write into a slice of unified_df
# (which triggers pandas' SettingWithCopyWarning).
corr_gpt4 = unified_df[['problem_difficulty', 'mapped_difficulty', 'cf_points', 'cf_rating', 'time_limit_nsec', 'memory_limit_bytes', 'model', 'prompt_name']].copy()
# change model and prompt_name to an integer
corr_gpt4['model_2'] = corr_gpt4['model'].astype('category').cat.codes
corr_gpt4['prompt_name_2'] = corr_gpt4['prompt_name'].astype('category').cat.codes
corr_gpt4 = corr_gpt4.drop(columns=['model', 'prompt_name'])

# Correlation of every remaining column with GPT-4's ts_correct.
# NOTE(review): corr_gpt4 holds rows for all models while the target only has
# the GPT-4 rows; pandas presumably aligns the two on index, so only GPT-4
# rows would contribute and `model_2` would be constant there - confirm that
# this is the intended comparison.
corr_gpt4.corrwith(gpt4_df['ts_correct'])

In [None]:
# make model and prompt_name into integers
# The previous cell already stores the integer codes as `model_2` /
# `prompt_name_2` and drops the raw `model` column, so an unconditional
# corr_gpt4['model'] lookup raises KeyError. Encode only if the raw column is
# still present, casting to category first because `.cat` requires it.
if 'model' in corr_gpt4.columns:
    corr_gpt4['model'] = corr_gpt4['model'].astype('category').cat.codes

In [None]:
from matplotlib import pyplot as plt 


# Mean ts_correct per mapped_difficulty for the GPT-4 rows whose exception
# field is empty.
diff_vals = gpt4_df[gpt4_df.exception == ''].groupby('mapped_difficulty')['ts_correct'].mean()

# mapped_difficulty values are multiples of 1/20, so neighbouring levels are
# only 0.05 apart; with matplotlib's default bar width of 0.8 the bars would
# overlap each other. Use a width just under the spacing instead.
plt.bar(diff_vals.index, diff_vals.values, width=0.04)
plt.xlabel('mapped_difficulty')
plt.ylabel('mean ts_correct')
plt.show()