In [11]:
from datasets import load_dataset, DatasetDict
import subprocess
from typing import cast
import os
from openai import OpenAI
import time
import json
from prompts import *

# Load the "deepmind/code_contests" dataset once at module level; the parsing
# helpers below read its 'train' split through this global.
# typing.cast is a no-op at runtime: it only tells the type checker that the
# value returned by load_dataset should be treated as a DatasetDict.
deepmind_ds: DatasetDict = cast(DatasetDict, load_dataset("deepmind/code_contests"))

In [12]:
def parse_usaco_tests(problem_id):
  """Read the USACO test cases stored on disk for one problem.

  Test files live in ``datasets/usaco_v3/tests/<problem_id>`` and are numbered
  1..10. Two naming schemes are accepted for each number: ``I.<n>``/``O.<n>``
  and ``<n>.in``/``<n>.out``.

  Args:
    problem_id: Folder name of the problem inside the tests directory.

  Returns:
    A list of ``{"input": str, "output": str}`` dicts, ordered by input length
    and capped at 10 entries. Empty when no test files are found.
  """
  test_folder = f"datasets/usaco_v3/tests/{problem_id}"
  test_cases = []

  for i in range(1, 11):
    naming_schemes = [
      (f"{test_folder}/I.{i}", f"{test_folder}/O.{i}"),
      (f"{test_folder}/{i}.in", f"{test_folder}/{i}.out"),
    ]
    for input_path, output_path in naming_schemes:
      # A missing pair only rules out this naming scheme; keep going so the
      # other scheme still gets a chance (a `break` here would skip it).
      if not (os.path.isfile(input_path) and os.path.isfile(output_path)):
        continue

      with open(input_path, 'r') as f:
        input_data = f.read()
      with open(output_path, 'r') as f:
        output_data = f.read()
      test_cases.append({
        "input": input_data,
        "output": output_data
      })
      # One scheme matched for this test number; do not add it a second time.
      break

  # keep only the smallest 10 test cases
  return sorted(test_cases, key=lambda case: len(case["input"]))[:10]

def parse_usaco_bronze_20_problem_ds():
  """Collect up to 20 bronze-level problems from the USACO subset file.

  Reads ``datasets/usaco_subset307_dict.json`` and walks its entries in file
  order, keeping those whose ``problem_level`` is ``"bronze"``.

  Returns:
    A list of at most 20 dicts with the keys ``title``, ``description`` and
    ``tests`` (the latter produced by ``parse_usaco_tests``).
  """
  with open('datasets/usaco_subset307_dict.json', 'r') as handle:
    catalogue = json.load(handle)

  selected = []
  for entry in catalogue.values():
    # Stop as soon as the quota of 20 problems is filled.
    if len(selected) >= 20:
      break
    if entry["problem_level"] == "bronze":
      selected.append({
        "title": entry["name"],
        "description": entry["description"],
        "tests": parse_usaco_tests(entry["problem_id"])
      })

  return selected

In [13]:
def parse_deepmind_tests(problem):
  """Gather the public and generated test cases of a CodeContests problem.

  Each test group may come in either of two shapes:
    * a dict of parallel lists, ``{"input": [...], "output": [...]}`` -- the
      shape Hugging Face ``datasets`` uses for a sequence of structs, which
      the original per-item indexing could not handle;
    * a list of ``{"input": ..., "output": ...}`` dicts.

  Args:
    problem: Mapping with ``public_tests`` and ``generated_tests`` entries.

  Returns:
    A list of ``{"input": str, "output": str}`` dicts, ordered by input length
    and capped at 10 entries.
  """
  def _pairs(group):
    # Columnar layout: zip the parallel input/output lists together.
    if isinstance(group, dict):
      return zip(group["input"], group["output"])
    # Row layout: one dict per test case.
    return ((case["input"], case["output"]) for case in group)

  test_cases = [
    {"input": input_data, "output": output_data}
    for field in ("public_tests", "generated_tests")
    for input_data, output_data in _pairs(problem[field])
  ]

  # keep only the smallest 10 test cases
  return sorted(test_cases, key=lambda case: len(case["input"]))[:10]

def parse_deepmind_20_1400rating_ds():
  """Collect up to 20 training problems whose ``cf_rating`` is at most 1400.

  Walks the 'train' split of the module-level ``deepmind_ds`` in order.

  NOTE(review): entries with a rating of 0 also pass the filter; confirm
  whether 0 means "unrated" in this dataset and should be excluded.

  Returns:
    A list of at most 20 dicts with the keys ``title``, ``description`` and
    ``tests`` (the latter produced by ``parse_deepmind_tests``).
  """
  picked = []
  training_split = cast(dict, deepmind_ds['train'])
  for entry in training_split:
    # Quota reached: no need to look at further problems.
    if len(picked) >= 20:
      break
    if not entry["cf_rating"] > 1400:
      picked.append({
        "title": entry["title"],
        "description": entry["description"],
        "tests": parse_deepmind_tests(entry)
      })

  return picked

In [14]:
class CompilationError:
  """Result object describing a failed compilation."""

  def __init__(self, message: str) -> None:
    # Human-readable description of the failure.
    self.message = message

class CompilationSuccess:
  """Marker result for a compilation that succeeded; carries no data."""

class RuntimeError:
  """Result object describing a program run that failed.

  NOTE: this plain class shadows Python's builtin ``RuntimeError`` exception
  for the rest of the notebook. It is not an exception and cannot be raised.
  """

  def __init__(self, message: str) -> None:
    # Human-readable description of the failure.
    self.message = message

class RunSuccess:
  """Result object for a program run that completed successfully."""

  def __init__(self, output: str) -> None:
    # Text the program produced.
    self.output = output

class TestFailed:
  """Result object describing the first failing test of a test run."""

  def __init__(self, message, input, expected_output, output, number_of_tests, tests_passed):
    # What went wrong, in words.
    self.message = message
    # The failing case: what was fed in, what was expected, what came out.
    self.input, self.expected_output, self.output = input, expected_output, output
    # Size of the suite and the progress counter reported by the runner.
    self.number_of_tests, self.tests_passed = number_of_tests, tests_passed

class TestSuccess:
  """Result object for a test run in which every case passed."""

  def __init__(self, message, number_of_tests):
    # Summary text and the number of cases that were executed.
    self.message, self.number_of_tests = message, number_of_tests

In [15]:
# Function to compile the C++ program
def compile_cpp(solution_code, task_name, strategy):
  """Write a C++ solution to disk and compile it with g++.

  Everything happens inside ``generated/<strategy>/<task_name>``. The folder
  is created if needed and every file already in it is deleted first. The
  source is saved as ``<task_name>.cpp`` and the binary as ``<task_name>``.
  On failure the compiler's stderr is also written to
  ``compilation_error.txt`` in the same folder.

  Args:
    solution_code: C++ source text.
    task_name: Used both as folder name and as file stem.
    strategy: Name of the prompting strategy; first path component.

  Returns:
    ``CompilationSuccess`` when g++ exits with 0, otherwise a
    ``CompilationError`` whose message contains the compiler output.
  """
  workdir = f"generated/{strategy}/{task_name}"
  os.makedirs(workdir, exist_ok=True)
  # Start from a clean folder so stale binaries are never run by mistake.
  for stale in os.listdir(workdir):
    os.remove(f"{workdir}/{stale}")

  source_path = f"{workdir}/{task_name}.cpp"
  with open(source_path, "w") as source_file:
    source_file.write(solution_code)

  outcome = subprocess.run(
    ["g++", source_path, "-o", f"{workdir}/{task_name}"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
  )
  if outcome.returncode == 0:
    return CompilationSuccess()

  with open(f"{workdir}/compilation_error.txt", "w") as error_log:
    error_log.write(outcome.stderr.decode('utf-8'))

  return CompilationError(f"Compilation failed: {outcome.stderr.decode('utf-8')}")

def get_compilation_report(content, messages, problem, strategy):
  """Extract the C++ solution from a model reply and try to compile it.

  The solution is expected between the markers ``cpp{{`` and ``}}cpp``. When
  the markers are missing, or the code does not compile, a corrective user
  message is appended to ``messages`` so the next model call can react.

  Args:
    content: Text of the model reply (``None`` is treated as empty).
    messages: Chat history; mutated in place on failure.
    problem: Problem dict; its ``title`` names the build folder and files.
    strategy: Name of the prompting strategy, forwarded to ``compile_cpp``.

  Returns:
    The extracted source code when it compiles, otherwise ``None``.
  """
  content = content or ""
  open_tag, close_tag = "cpp{{", "}}cpp"

  start = content.find(open_tag)
  # Search for the closing marker only after the opening one; a stray
  # "}}cpp" earlier in the reply must not produce an empty or garbage slice.
  end = content.find(close_tag, start + len(open_tag)) if start != -1 else -1

  if start == -1 or end == -1:
    messages.append({
      "role": "user",
      "content": "You MUST provide the solution code in the format of cpp{{YOUR SOLUTION HERE}}cpp."
    })
    return None

  solution_code = content[start + len(open_tag):end]
  task_name = problem["title"]

  r = compile_cpp(solution_code, task_name, strategy)
  if isinstance(r, CompilationError):
    messages.append({
      "role": "user",
      "content": "Compilation failed. Please correct the errors and try again.\
                  \n\nError message: {}".format(r.message)
    })
    return None

  return solution_code

In [16]:
class ProblemOutput:
  """Outcome of running one compiled solution against a problem's tests."""

  def __init__(self, passed_tests, rte_tests, failed_tests, solution_code):
    # Test cases grouped by outcome: passed, runtime error, wrong output.
    self.passed_tests, self.rte_tests, self.failed_tests = passed_tests, rte_tests, failed_tests
    # Source code of the solution that produced these results.
    self.solution_code = solution_code
    
def run_program(executable, input_data, timeout=10):
  """Run a compiled program with the given stdin and capture its result.

  Args:
    executable: Path of the binary relative to the current directory
      (``./`` is prepended).
    input_data: Text written to the program's standard input.
    timeout: Seconds to wait before the program is killed. Generated
      solutions can loop forever, and without a limit one such solution
      would hang the whole notebook. Pass ``None`` to wait indefinitely.

  Returns:
    ``RunSuccess`` with the decoded stdout when the exit code is 0,
    otherwise ``RuntimeError`` (the result class defined in this notebook,
    not the builtin exception) with the decoded stderr or a time-limit
    message.
  """
  try:
    completed = subprocess.run(
      [f"./{executable}"],
      input=input_data.encode(),
      stdout=subprocess.PIPE,
      stderr=subprocess.PIPE,
      timeout=timeout,
    )
  except subprocess.TimeoutExpired:
    # subprocess.run has already killed and reaped the child at this point.
    return RuntimeError(f"Runtime error: time limit of {timeout} seconds exceeded")

  # errors='replace' keeps a program that prints invalid UTF-8 from raising
  # UnicodeDecodeError here; such output simply fails the comparison later.
  if completed.returncode != 0:
    return RuntimeError(f"Runtime error: {completed.stderr.decode('utf-8', errors='replace')}")
  return RunSuccess(completed.stdout.decode('utf-8', errors='replace'))

def run_tests(test_cases, executable):
  """Run the executable on each test case, stopping at the first problem.

  Progress is logged to ``tmp_output.txt``. The log is deleted when every
  test passes and left on disk (now properly closed) when the run stops
  early, so the failure details can be inspected.

  Args:
    test_cases: List of ``{"input": str, "output": str}`` dicts.
    executable: Path of the binary, forwarded to ``run_program``.

  Returns:
    * the non-success result of ``run_program`` if a run fails,
    * ``TestFailed`` for the first output mismatch; ``tests_passed`` is the
      number of cases that passed before it,
    * ``TestSuccess`` when every case passes.
  """
  log_path = 'tmp_output.txt'

  # The context manager closes the log on every exit path; the original
  # leaked the handle whenever it returned from inside the loop.
  with open(log_path, 'w') as log:
    for i, test in enumerate(test_cases):
      input_data = test["input"]
      expected_output = test["output"]

      result = run_program(executable, input_data)
      if not isinstance(result, RunSuccess):
        return result

      actual_output = result.output

      # Compare outputs, ignoring leading/trailing whitespace
      if actual_output.strip() == expected_output.strip():
        log.write(f"Test case {i+1}: PASSED\n")
        continue

      log.write(f"Test case {i+1}: FAILED\n")
      log.write(f"Expected output:\n{expected_output}\n")
      log.write(f"Actual output:\n{actual_output}\n")
      # i cases passed before this one; the original reported i+1, which
      # counted the failing case as passed.
      return TestFailed(f"Test case {i+1} failed", input_data, expected_output, actual_output, len(test_cases), i)

  os.remove(log_path)

  return TestSuccess("All test cases passed", len(test_cases))

def get_runtime_report(problem, strategy, solution_code):
  """Run the compiled solution on every test and sort the outcomes.

  The binary is expected at ``generated/<strategy>/<title>/<title>``.

  Args:
    problem: Problem dict providing ``title`` and ``tests``.
    strategy: Name of the prompting strategy (part of the binary path).
    solution_code: Source of the solution; stored in the report unchanged.

  Returns:
    A ``ProblemOutput`` holding three lists (passed, runtime-error, wrong
    output) plus ``solution_code``.
  """
  title = problem["title"]
  binary_path = f"generated/{strategy}/{title}/{title}"

  passed, crashed, wrong = [], [], []

  for case in problem["tests"]:
    given, wanted = case["input"], case["output"]

    result = run_program(binary_path, given)
    # Results are told apart by class name, as elsewhere in this notebook.
    kind = result.__class__.__name__

    if kind == "RuntimeError":
      crashed.append({
        "input": given,
        "output": wanted,
        "error": result.message
      })
      continue

    if kind == "RunSuccess" and result.output.strip() == wanted.strip():
      passed.append({
        "input": given,
        "output": wanted
      })
      continue

    wrong.append({
      "input": given,
      "expected_output": wanted,
      "actual_output": result.output
    })

  return ProblemOutput(passed, crashed, wrong, solution_code)

In [17]:
# SECURITY: an API key used to be hardcoded on this line. Anything committed
# to a notebook must be treated as leaked -- revoke that key and supply a new
# one through the OPENAI_API_KEY environment variable instead.
# A missing variable fails fast with a KeyError naming it.
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

def call_ai(model, messages, delay_seconds=20):
  """Send the chat history to the model and record its reply.

  Args:
    model: Name of the chat model to query.
    messages: Chat history as a list of ``{"role", "content"}`` dicts. The
      assistant reply is appended to this list in place.
    delay_seconds: Pause after each request (defaults to the 20 seconds that
      used to be hardcoded). Lower it when no throttling is needed.

  Returns:
    The reply text. An empty string is returned when the API reports no
    text content, so callers can always use string methods on the result.
  """
  response = client.chat.completions.create(
    model=model,
    messages=messages
  )
  # message.content is optional in the API response; normalise None to "".
  content = response.choices[0].message.content or ""
  messages.append({"role": "assistant", "content": content})
  time.sleep(delay_seconds)

  return content

In [18]:
def _describe_failed_run(run_report, total_tests):
  """Build the feedback message for a solution that did not pass every test."""
  parts = [
    f"Your solution passed {len(run_report.passed_tests)} of {total_tests} tests. "
    "Please fix it and provide the full corrected solution."
  ]
  if run_report.failed_tests:
    case = run_report.failed_tests[0]
    parts.append(
      f"Wrong answer on input:\n{case['input']}\n"
      f"Expected output:\n{case['expected_output']}\n"
      f"Your output:\n{case['actual_output']}"
    )
  if run_report.rte_tests:
    case = run_report.rte_tests[0]
    parts.append(f"Runtime error on input:\n{case['input']}\nError:\n{case['error']}")
  return "\n\n".join(parts)


def get_cpp_code(model, messages, problem, strategy):
  """Ask the model for a C++ solution and refine it over up to 10 rounds.

  Each round queries the model, compiles the reply and runs the tests. The
  original queried the model only once, before the loop, so all ten rounds
  re-processed the same reply. The model is now queried every round and told
  what went wrong in the previous one.

  Args:
    model: Name of the chat model to query.
    messages: Chat history that already ends with the request for a
      solution; extended in place with replies and feedback.
    problem: Problem dict providing ``title`` and ``tests``.
    strategy: Name of the prompting strategy (used for the build folder).

  Returns:
    The ``ProblemOutput`` of the attempt that passed the most tests, or
    ``None`` when no attempt ever compiled.
  """
  best_output = None
  trials_since_last_solution = 0

  for _ in range(10):
    content = call_ai(model, messages)

    source_code = get_compilation_report(content, messages, problem, strategy)
    if source_code is None:
      # get_compilation_report already appended the corrective message.
      continue

    run_report = get_runtime_report(problem, strategy, source_code)
    if best_output is None or len(run_report.passed_tests) > len(best_output.passed_tests):
      best_output = run_report
      trials_since_last_solution = 0

      if len(run_report.passed_tests) == len(problem["tests"]):
        break
    else:
      trials_since_last_solution += 1
      if trials_since_last_solution >= 4:
        messages.append({
          "role": "user",
          "content": "You MUST start again from your last best solution and try to improve it by fixing the failed tests.\
                      \nYour last best solution is:\n\n{}".format(best_output.solution_code)
        })
        continue

    # Tell the model which test it failed so the next round has something to fix.
    messages.append({
      "role": "user",
      "content": _describe_failed_run(run_report, len(problem["tests"]))
    })

  return best_output

In [19]:
def get_solution_direct_prompting(problem, model):
  """Solve a problem with a single direct prompt.

  The prompt contains the title, the description and the first test case as
  an input/output example.

  Args:
    problem: Problem dict with ``title``, ``description`` and ``tests``.
    model: Name of the chat model to query.

  Returns:
    The result of ``get_cpp_code`` for this conversation.
  """
  strategy = "direct_prompting"
  title = problem["title"]
  description = problem["description"]

  # The dataset builders in this notebook store test cases under "tests";
  # the "test_cases" key read originally does not exist there. It is still
  # honoured when present, for problems built elsewhere.
  tests = problem.get("test_cases") or problem["tests"]
  public_test = tests[0]

  messages = [{
    "role": "user",
    "content": generate_solution_direct_prompt.format(Problem=title, Objective=description, Input=public_test["input"], Output=public_test["output"])
  }]

  return get_cpp_code(model, messages, problem, strategy)


In [20]:
def get_objective_and_constraints(model, problem):
  """Ask the model for the problem's constraints and objective.

  The reply is expected to wrap each constraint in ``cns{{ ... }}cns`` and
  the objective in ``obj{{ ... }}obj``.

  Args:
    model: Name of the chat model to query.
    problem: Problem dict providing ``description``.

  Returns:
    A ``(constraints, objective)`` tuple: a list of constraint strings and
    the objective string. Incomplete marker pairs are ignored, so the list
    may be empty and the objective may be ``""``.
  """
  messages = [{
    "role": "user",
    "content": get_objective_and_constraints_prompt.format(Problem=problem["description"])
  }]

  content = call_ai(model, messages)

  # find constraints
  constraints = []
  cursor = 0
  while True:
    start = content.find("cns{{", cursor)
    if start == -1:
      break
    body_start = start + len("cns{{")
    end = content.find("}}cns", body_start)
    if end == -1:
      # Unterminated constraint: slicing with -1 would swallow the rest of
      # the reply, so drop it instead.
      break
    constraints.append(content[body_start:end])
    cursor = end + len("}}cns")

  # find objective; both markers must be present, in the right order
  obj_start = content.find("obj{{")
  obj_end = content.find("}}obj", obj_start + len("obj{{")) if obj_start != -1 else -1
  objective = content[obj_start + len("obj{{"):obj_end] if obj_end != -1 else ""

  return constraints, objective

In [21]:
def get_additional_constraint(model, problem, constraints, objective):
  """Ask the model to derive one more constraint for the problem.

  The reply is expected to wrap the new constraint in ``ncns{{ ... }}ncns``.

  Args:
    model: Name of the chat model to query.
    problem: Problem dict providing ``description``.
    constraints: Constraints collected so far.
    objective: Objective of the problem.

  Returns:
    The text between the markers. When the opening marker is missing the
    whole reply (stripped) is returned; when only the closing marker is
    missing, everything after the opening marker is returned.
  """
  # NOTE(review): this is the only prompt sent with the "system" role; the
  # other helpers use "user". Kept as is -- confirm it is intentional.
  messages = [{
    "role": "system",
    "content": get_additional_constraint_prompt.format(Problem=problem["description"], 
      Constraints="\n".join(["- " + c for c in constraints]), Objective="- " + objective)
  }]

  content = call_ai(model, messages)

  # find additional constraint; never slice with a -1 index from a failed find
  start = content.find("ncns{{")
  if start == -1:
    return content.strip()
  body_start = start + len("ncns{{")
  end = content.find("}}ncns", body_start)
  if end == -1:
    return content[body_start:].strip()

  return content[body_start:end]

In [22]:
def is_valid_constraint(problem, constraints, additional_constraint):
  """Let three models vote on whether a proposed constraint is valid.

  Args:
    problem: Problem dict providing ``description`` and ``goal`` (the
      objective text). NOTE: the dataset builders in this notebook do not
      set ``goal``; the caller has to provide it.
    constraints: Constraints accepted so far.
    additional_constraint: The constraint under review.

  Returns:
    ``True`` as soon as two models answer ``YES``, otherwise ``False``.
  """
  prompt = test_constraint_prompt.format(Problem=problem["description"], 
    Constraints="\n".join(["- " + c for c in constraints]), Objective="- " + problem["goal"], NewConstraint=additional_constraint)

  cnt_yes = 0
  cnt_other = 0
  for model in ["gpt-3.5-turbo", "gpt-4o-mini", "gpt-3.5-turbo-0125"]:
    # call_ai appends the reply to the list it is given. Each judge gets a
    # fresh conversation so it cannot see the previous judges' answers.
    messages = [{"role": "user", "content": prompt}]
    content = call_ai(model, messages)

    start = content.find("Answer:")
    if start == -1:
      answer = ""
    else:
      body_start = start + len("Answer:")
      end = content.find("```", body_start)
      # Without a closing fence take the rest of the reply; slicing with -1
      # would cut off the last character ("YES" -> "YE").
      answer = (content[body_start:end] if end != -1 else content[body_start:]).strip()

    if answer == "YES":
      cnt_yes += 1
    else:
      cnt_other += 1

    if cnt_yes == 2:
      return True
    if cnt_other == 2:
      # A majority is out of reach; skip the remaining request.
      return False

  return False

In [23]:
def fix_constraint(model, problem, constraints, objective, incorrect_constraint):
  """Ask the model to correct a constraint that was judged invalid.

  The reply is expected to contain ``Corrected constraint:`` followed by the
  new text, optionally terminated by a code fence.

  Args:
    model: Name of the chat model to query.
    problem: Problem dict providing ``description``.
    constraints: Constraints accepted so far.
    objective: Objective of the problem.
    incorrect_constraint: The constraint to be corrected.

  Returns:
    The corrected constraint (stripped), or ``None`` when the reply does not
    contain the ``Corrected constraint:`` marker.
  """
  messages = [{
    "role": "user",
    "content": fix_constraint_prompt.format(Problem=problem["description"], 
      Constraints="\n".join(["- " + c for c in constraints]), Objective="- " + objective, Incorrect_constraint=incorrect_constraint)
  }]

  content = call_ai(model, messages)

  # find fixed constraint in the content
  marker = "Corrected constraint:"
  start = content.find(marker)
  if start == -1:
    # Nothing usable in the reply; slicing from -1 + len(marker) would
    # return an arbitrary fragment instead.
    return None

  body_start = start + len(marker)
  end = content.find("```", body_start)
  fixed_constraint = content[body_start:end] if end != -1 else content[body_start:]

  return fixed_constraint.strip()

In [24]:
def can_solve_problem(model, problem, constraints, objective):
  """Ask the model whether the collected constraints suffice to solve the problem.

  Args:
    model: Name of the chat model to query.
    problem: Problem dict providing ``description``.
    constraints: Constraints collected so far.
    objective: Objective of the problem.

  Returns:
    The text following ``Answer:`` in the reply (stripped, up to a closing
    code fence if there is one), or ``""`` when the marker is missing. The
    result is a string, so callers must compare it with the expected answer
    rather than test its truthiness.
  """
  messages = [{
    "role": "user",
    "content": can_solve_problem_prompt.format(Problem=problem["description"], 
      Constraints="\n".join(["- " + c for c in constraints]), Objective="- " + objective)
  }]

  content = call_ai(model, messages)

  # find answer
  start = content.find("Answer:")
  if start == -1:
    return ""

  body_start = start + len("Answer:")
  end = content.find("```", body_start)
  # Without a closing fence take the rest of the reply; slicing with -1
  # would cut off the last character of the answer.
  answer = content[body_start:end] if end != -1 else content[body_start:]

  return answer.strip()

In [25]:
def get_solution_MACM(problem, model):
  """Solve a problem with the MACM strategy.

  First extracts the objective and initial constraints, then for up to 10
  rounds proposes an additional constraint, validates it (repairing it when
  judged invalid) and asks whether the problem can now be solved. Finally
  requests a C++ solution using everything collected.

  Args:
    problem: Problem dict with ``title``, ``description`` and ``tests``.
    model: Name of the chat model to query.

  Returns:
    The result of ``get_cpp_code`` for the final conversation.
  """
  strategy = "MACM"

  constraints, objective = get_objective_and_constraints(model, problem)

  # is_valid_constraint reads the objective from problem["goal"], which the
  # dataset builders do not set; provide it without mutating the caller's dict.
  problem_with_goal = problem if "goal" in problem else {**problem, "goal": objective}

  for _ in range(10):
    additional_constraint = get_additional_constraint(model, problem, constraints, objective)
    if not is_valid_constraint(problem_with_goal, constraints, additional_constraint):
      # Argument order is (model, problem, constraints, objective, incorrect);
      # the original passed `strategy` and `constraints` in the wrong slots.
      additional_constraint = fix_constraint(model, problem, constraints, objective, additional_constraint)
    # Keep valid constraints too; originally only repaired ones were stored.
    if additional_constraint:
      constraints.append(additional_constraint)
    # can_solve_problem returns the answer text; any non-empty string
    # (including a negative answer) used to end the loop.
    # NOTE(review): "YES" mirrors is_valid_constraint; confirm it matches
    # the vocabulary requested by can_solve_problem_prompt.
    if can_solve_problem(model, problem, constraints, objective) == "YES":
      break

  # Problems built in this notebook carry no top-level "input"/"output";
  # fall back to the first test case as the example.
  sample = problem if "input" in problem else problem["tests"][0]

  messages = [{
    "role": "user",
    "content": generate_solution_MACM_prompt.format(Problem=problem["description"], 
      Constraints="\n".join(["- " + c for c in constraints]), Objective="- " + objective, Input=sample["input"], Output=sample["output"])
  }]

  return get_cpp_code(model, messages, problem, strategy)

In [26]:
def get_solution_flow_engineering(problem, model):
  """Solve a problem with a multi-step "flow engineering" conversation.

  The same chat history is extended step by step: summarise the problem,
  explain three input/output examples, draft starting solutions, then
  request the final solution. Intermediate replies are not inspected here;
  they matter only as context kept in ``messages``.

  Args:
    problem: Problem dict with ``title``, ``description`` and ``tests``.
    model: Name of the chat model to query.

  Returns:
    The result of ``get_cpp_code`` for this conversation.

  Raises:
    ValueError: If the problem has no test cases to use as examples.
  """
  strategy = "flow_engineering"

  # The dataset builders in this notebook store test cases under "tests";
  # "test_cases" is still honoured when present.
  tests = problem.get("test_cases") or problem["tests"]
  examples = list(tests[:3])
  if not examples:
    raise ValueError("problem has no test cases to use as examples")
  # The prompt has exactly three example slots; repeat the last available
  # test rather than fail when the problem has fewer than three.
  while len(examples) < 3:
    examples.append(examples[-1])

  messages = [{
    "role": "user",
    "content": explain_bullet_points_prompt.format(Problem=problem["description"])
  }]
  call_ai(model, messages)

  messages.append({
    "role": "user",
    "content": explain_input_output_prompt.format(
      Input1=examples[0]["input"], Output1=examples[0]["output"],
      Input2=examples[1]["input"], Output2=examples[1]["output"],
      Input3=examples[2]["input"], Output3=examples[2]["output"],
    )
  })
  call_ai(model, messages)

  messages.append({
    "role": "user",
    "content": generate_starting_solutions_prompt
  })
  call_ai(model, messages)

  messages.append({
    "role": "user",
    "content": gen_final_solution_flow_engineering_prompt
  })
  return get_cpp_code(model, messages, problem, strategy)

In [27]:
def get_objective_for_problem(model, problem):
  """Ask the model to state the objective of the problem.

  The reply is expected to wrap the objective in ``obj{{ ... }}obj``.

  Args:
    model: Name of the chat model to query.
    problem: Problem dict providing ``description``.

  Returns:
    The text between the markers. When the opening marker is missing the
    whole reply (stripped) is returned; when only the closing marker is
    missing, everything after the opening marker is returned.
  """
  messages = [{
    "role": "user",
    "content": get_problem_objective_prompt.format(Problem=problem["description"])
  }]
  content = call_ai(model, messages)

  # Never slice with a -1 index coming from a failed find().
  start = content.find("obj{{")
  if start == -1:
    return content.strip()
  body_start = start + len("obj{{")
  end = content.find("}}obj", body_start)
  if end == -1:
    return content[body_start:].strip()

  return content[body_start:end]

In [28]:
def get_new_key_point(model, problem, objective, key_points):
  """Ask the model for one more key point towards solving the problem.

  The reply is expected to wrap the key point in ``key{{ ... }}key``.

  Args:
    model: Name of the chat model to query.
    problem: Problem dict providing ``description``.
    objective: Objective of the problem.
    key_points: Key points collected so far on the current path.

  Returns:
    The text between the markers. When the opening marker is missing the
    whole reply (stripped) is returned; when only the closing marker is
    missing, everything after the opening marker is returned.
  """
  messages = [{
    "role": "user",
    "content": get_new_key_point_prompt.format(Problem=problem["description"], 
      Objective=objective, Key_points="\n".join(["- " + kp for kp in key_points]))
  }]
  content = call_ai(model, messages)

  # Never slice with a -1 index coming from a failed find().
  start = content.find("key{{")
  if start == -1:
    return content.strip()
  body_start = start + len("key{{")
  end = content.find("}}key", body_start)
  if end == -1:
    return content[body_start:].strip()

  return content[body_start:end]

In [29]:
def validate_likelyhood_of_key_point(model, problem, objective, key_points, eval_key_point):
  """Ask the model how likely a key point is to help solve the problem.

  Args:
    model: Name of the chat model to query.
    problem: Problem dict providing ``description``.
    objective: Objective of the problem.
    key_points: Key points already accepted on the current path.
    eval_key_point: The key point to be judged.

  Returns:
    The text following ``Answer:`` in the reply (stripped, up to a closing
    code fence if there is one), or ``""`` when the marker is missing.
  """
  messages = [{
    "role": "user",
    "content": validate_likelihood_of_key_point_prompt.format(Problem=problem["description"], 
      Objective=objective, Key_points="\n".join(["- " + kp for kp in key_points]), Eval_key_point=eval_key_point)
  }]

  content = call_ai(model, messages)

  start = content.find("Answer:")
  if start == -1:
    return ""

  body_start = start + len("Answer:")
  end = content.find("```", body_start)
  # Without a closing fence take the rest of the reply; slicing with -1
  # would cut off the last character ("SURE" -> "SUR").
  answer = content[body_start:end] if end != -1 else content[body_start:]

  return answer.strip()

def get_node_score(model, problem, objective, key_points, eval_key_point):
  """Score a key point by polling the model three times.

  Each verdict contributes a fixed weight: ``SURE`` +1.0, ``LIKELY`` +0.5,
  ``UNLIKELY`` -0.5, ``NO`` -1.0. Any other answer contributes nothing.

  Returns:
    The sum of the three contributions as a float.
  """
  verdict_weights = {"SURE": 1.0, "LIKELY": 0.5, "UNLIKELY": -0.5, "NO": -1.0}

  total = 0.0
  for _ in range(3):
    verdict = validate_likelyhood_of_key_point(model, problem, objective, key_points, eval_key_point)
    if verdict in verdict_weights:
      total += verdict_weights[verdict]

  return total

In [30]:
class ThoughtNode:
  """One node of the thought tree: the key points collected along its path."""

  def __init__(self, key_points: list) -> None:
    # Stored as given (no copy is made).
    self.key_points = key_points

def get_solution_tree_of_thoughts(problem, model):
  """Solve a problem with a small tree-of-thoughts search over key points.

  For three levels, every surviving node is expanded with three new key
  points. Each candidate is scored with ``get_node_score`` and the five best
  survive to the next level. The key points of the final survivors are then
  handed to the model as constraints for the C++ solution.

  Args:
    problem: Problem dict with ``title``, ``description`` and ``tests``.
    model: Name of the chat model to query.

  Returns:
    The result of ``get_cpp_code`` for the final conversation.
  """
  strategy = "ToT"
  get_solution_ToT_prompt = generate_solution_MACM_prompt
  objective = get_objective_for_problem(model, problem)
  current_level_nodes = [ThoughtNode([])]

  for _level in range(3):
    scored_candidates = []
    for node in current_level_nodes:
      for _branch in range(3):
        new_key_point = get_new_key_point(model, problem, objective, node.key_points)
        # Judge the new key point against the path it extends (the parent's
        # key points), not against a list that already contains it.
        score = get_node_score(model, problem, objective, node.key_points, new_key_point)
        scored_candidates.append((score, ThoughtNode(node.key_points + [new_key_point])))

    # list.sort() works in place and returns None, so its result cannot be
    # sliced; sort first, then keep the nodes (not the tuples) of the top five.
    scored_candidates.sort(key=lambda pair: pair[0], reverse=True)
    current_level_nodes = [candidate for _, candidate in scored_candidates[:5]]

  # Surviving paths share ancestors; keep each key point once, in first-seen order.
  key_points = list(dict.fromkeys(
    kp for node in current_level_nodes for kp in node.key_points
  ))

  # Problems built in this notebook carry no top-level "input"/"output";
  # fall back to the first test case as the example.
  sample = problem if "input" in problem else problem["tests"][0]

  messages = [{
    "role": "user",
    "content": get_solution_ToT_prompt.format(Problem=problem["description"], 
      Constraints="\n".join(["- " + c for c in key_points]), Objective="- " + objective, Input=sample["input"], Output=sample["output"])
  }]

  return get_cpp_code(model, messages, problem, strategy)