### Setup

In [1]:
# reload imports.
%load_ext autoreload
%autoreload 2

In [2]:
import os
import json

# Geometry - test set
folder_path = "./MATH/test/geometry/"
json_files = [f for f in os.listdir(folder_path) if f.endswith('.json')]

# Store each in list
json_objects = []
for file in json_files:
    file_path = os.path.join(folder_path, file)
    with open(file_path, 'r') as f:
        json_data = json.load(f)
        json_data["file_path"]=file_path # Add file path so we can keep track of them easily
        json_objects.append(json_data)

print(len(json_objects))
filtered_json_objects = [obj for obj in json_objects if obj.get('level') == 'Level 5']
print(len(filtered_json_objects))

# Get 5 random ones
import random

random.seed(10)
samples = random.sample(filtered_json_objects, 5)

question = samples[0]['problem']
print(question)
print(samples[0]["file_path"])

479
132
Point $P$ is inside equilateral triangle $ABC$ such that the altitudes from $P$ to $\overline{AB}$, $\overline{BC}$, and $\overline{CA}$ have lengths 5, 6, and 7 respectively.  What is the area of triangle $ABC$?
./MATH/test/geometry/990.json
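
As a reference point for judging the runs, this problem has a short closed-form solution. The derivation is not from the notebook itself: by Viviani's theorem, the three perpendicular distances from an interior point of an equilateral triangle sum to the triangle's height $h$, and the area is then $h^2/\sqrt{3}$. A minimal sketch (the helper name `area_coefficient` is hypothetical) that returns the coefficient of $\sqrt{3}$:

```python
def area_coefficient(d1: float, d2: float, d3: float) -> float:
    """Return c such that the area of the equilateral triangle is c * sqrt(3)."""
    # Viviani's theorem: the distances to the three sides sum to the height.
    height = d1 + d2 + d3
    # side = 2h / sqrt(3), so area = (sqrt(3) / 4) * side^2 = h^2 / sqrt(3) = (h^2 / 3) * sqrt(3)
    return height ** 2 / 3


print(area_coefficient(5, 6, 7))  # dataset version of the problem -> 108.0
print(area_coefficient(8, 6, 7))  # hand-modified version used in the runs -> 147.0
```

The second value agrees with the `147 * sqrt(3)` that the run wrapper logs as the actual answer for the modified question.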


### Idea 1 - Step by Step

#### Explanation 

Maybe I should start by asking it to solve the problem all the way to the end and return a list of steps. Then I can go back and see how many are correct.
We can limit this by only having it create a maximum of 5 steps.

Then we can go through step by step. For each step->

**For now, avoid this recursive breaking up.**

1. Should we break this further into steps? is it too detailed?
    1. If so, repeat this process and recurse
2. If not, then we analyze this step, and extract the following things
    - Proven mathematical relationships - written as formulas
    - Problem relationships - written without math, just words. Some sort of logical condition in the problem
    - Intermediate math relationships - numerical relationships between parts of the problem. Only valid in this context. Uses a math formula and some specific knowledge. Break up into a proven math rel and a problem rel

3. We need to verify the above. 
    - Proven math relations can be verified by asking
    - Problem relationships verified by asking as well
    - Intermediate math relationships should have any math relations verified, then any problem relationships verified, then finally they should be executed and verified with code

4. Now the step is good to go. We can re-write it to show the verified things above. We can also store the proven relationships and problem relationships separately. 

5. Now if at any point in the larger step we had a problem, we are going to have to recalculate the rest of the steps. If the corrected step is the same we can continue through the steps, but if it's different it may change the next ones.
    - I think this is fine. We need some way of error correcting, and propagating that correction

6. Now this is all very good for fixing problems with steps, but what if the whole approach is wrong in some way? Eventually we will build up a list of correct steps, but not all of them will be relevant. And there may be a better order? for some. (Though the order should be fixed by regenerating steps each time.. hmm)
Not sure what to do *yet* here
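
The three kinds of relationships in item 2 could be stored in a small container so that each one is verified only once. This is a minimal sketch, not something the notebook implements: the class and field names are hypothetical, and plain dataclasses are used here where the rest of the notebook would probably use pydantic models.

```python
from dataclasses import dataclass, field


@dataclass
class IntermediateRelationship:
    """A numeric relationship that only holds in this problem's context."""
    statement: str
    math_relationships: list[str] = field(default_factory=list)
    problem_relationships: list[str] = field(default_factory=list)


@dataclass
class StepAnalysis:
    """Everything extracted from one step that still needs verification."""
    step: str
    math_relationships: list[str] = field(default_factory=list)
    problem_relationships: list[str] = field(default_factory=list)
    intermediate: list[IntermediateRelationship] = field(default_factory=list)

    def claims_to_verify(self) -> list[str]:
        """Every math/problem relationship exactly once, in first-seen order."""
        claims = list(self.math_relationships) + list(self.problem_relationships)
        for rel in self.intermediate:
            claims += rel.math_relationships + rel.problem_relationships
        return list(dict.fromkeys(claims))  # dedupe while preserving order
```

Deduplicating up front means a relationship shared by several intermediate relationships is only sent for verification once; the intermediate statements themselves would still be checked with code afterwards.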

Original step by step idea->
1. Ask what it would like to do - what is the first step, given what we have.
2. Then analyze the step it returns - what is the rationale behind it? What outside theorems does it use? What new assumptions does it make?
3. Check each assumption individually, and iterate until we get a good first step.
4. Then do this step, update the knowns, and then repeat the process until we get the objective

#### Functions

In [3]:
# 1. Try and solve the original problem, using max steps. Have it format them w/ schema.
# Go step by step..
from utils.async_gpt import agenerate_from_gpt_with_schema
from pydantic import BaseModel

class Steps(BaseModel):
    steps: list[str]
    summary: str

create_steps_prompt = """
Given the question:
{question}

{prev_steps_str}

Return a series of the next steps to solve the problem. 
Return a maximum of {steps_left} steps, but it's okay to return less, even just 1 step if that's all it takes to get the solution.
Be detailed, and break up complex steps into multiple steps. Try and balance the complexity of each step.
Additionally, give a brief summary on the overall strategy, or any key points
"""

async def create_steps(question: str, prev_steps: list[str], steps_left:int):
    """
    Create steps to solve a question
    """
    if len(prev_steps) == 0:
        prev_steps_str = ""
    else:
        numbered_list = "\n".join([f"Step {i+1}:\n {step}" for i, step in enumerate(prev_steps)])
        prev_steps_str = f"And the previous steps:\n {numbered_list}"
    messages = [
        {
            "role": "user",
            "content": create_steps_prompt.format(question=question, prev_steps_str=prev_steps_str, steps_left=steps_left),
        },
    ]
    steps: Steps = await agenerate_from_gpt_with_schema(
        messages, Steps
    )

    return steps.steps

In [4]:
# When I change the question manually, it seems to get it wrong. I should check this out by running each version 20 or so times. 
# This is kind of expected, after reading that paper about how the models are likely learning common problems...
## IMPORTANT

In [5]:
# 2. Let's go through step by step and see if each step is good. 
# Generate an explanation for why or why not it's correct. Then return true/false. If any step is wrong, we re-calculate the next steps
from utils.async_gpt import agenerate_from_gpt_with_schema
from pydantic import BaseModel

class VerifyStep(BaseModel):
    reasoning: str
    correct: bool


verify_step_prompt = """
Given the following info, verify if the CURRENT STEP is correct. 
Assume the question and any previous steps are correct.
ONLY VERIFY the current step.
Return the reasoning for why or why not it is correct, and a bool for if it is correct or not

Existing Info ====
Question:
{question}

{prev_steps_str}

New ====
Current step:
{current_step}

"""

async def verify_step(question: str, prev_steps: list[str], current_step: str):
    """
    Verify a step given the previous ones. 
    * Note: we do not care if this step is helpful towards the objective. We're just checking the assumptions it makes.
    """
    if len(prev_steps) == 0:
        prev_steps_str = ""
    else:
        # Turn prev steps to str
        numbered_list = "\n".join([f"Step {i+1}:\n {step}" for i, step in enumerate(prev_steps)])
        prev_steps_str = f"Previous Steps:\n {numbered_list}"

    messages = [
        {
            "role": "user",
            "content": verify_step_prompt.format(question=question, prev_steps_str=prev_steps_str, current_step=current_step),
        },
    ]
    verify: VerifyStep = await agenerate_from_gpt_with_schema(
        messages, VerifyStep
    )

    return verify


In [6]:
# 2.1 If a step is wrong, we'll want to correct it.
# For now let's try giving the function all the info. We will give it the existing info, the incorrect current step, and the reasoning for why it is wrong
from utils.async_gpt import agenerate_from_gpt


previous_info_prompt = """
Existing True Information ====
Question:
{question}

{prev_steps_str}
"""

wrong_step_prompt = """
New Information ===
Current step (incorrect):
{current_step}

Provided reason why current step is incorrect:
{reasoning}
"""

ask="""
The existing true information has been verified. It shows the preceding steps.
The new information shows the generated current step. This current step contains an error in it. The reason for the error is given.

Your job is to rewrite this current step so that it is CORRECT. Use the reason given to fix the current step.
Refer back to the existing true information for verified assumptions. Do not add ANY new ideas, or more steps. 
ONLY change the current step.
Fix the current step for the reason provided, and return ONLY the new current step.
"""


async def fix_step(question: str, prev_steps: list[str], current_step: str, reasoning: str):
    """
    Fix a step that was said to be wrong. Returns the new step.
    Currently we are giving all the information. In the future consider limiting scope
    """
    if len(prev_steps) == 0:
        prev_steps_str = ""
    else:
        # Turn prev steps to str
        numbered_list = "\n".join([f"Step {i+1}:\n {step}" for i, step in enumerate(prev_steps)])
        prev_steps_str = f"Previous Steps:\n {numbered_list}"

    messages = [
        {"role": "assistant",
            "content": previous_info_prompt.format(question=question, prev_steps_str=prev_steps_str)},

        {
            "role": "assistant",
            "content": wrong_step_prompt.format(current_step=current_step, reasoning=reasoning),
        },
        {
            "role": "user",
            "content": ask,
        },
    ]
    new_step = await agenerate_from_gpt(
        messages
    )

    return new_step

In [7]:
# Function to get final answer from steps

async def get_answer_from_steps(steps: list[str]):
    steps_str = "\n".join([f"Step {i+1}:\n {step}" for i, step in enumerate(steps)])
    messages = [
        {
            "role": "user",
            "content": f""" Given the following solution steps to a problem return ONLY the final answer: 
            {steps_str}""",
        },
    ]
    return await agenerate_from_gpt(messages)
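
The functions above each rebuild the same numbered step list by hand. A small shared helper could replace those copies; this is only a sketch, and the name `format_steps` and its `header` parameter are hypothetical. It reproduces the spacing used in the existing prompts.

```python
def format_steps(steps: list[str], header: str = "") -> str:
    """Render steps as 'Step N:' blocks, optionally under a header line."""
    if not steps:
        # Matches the existing behavior: no previous steps means an empty string.
        return ""
    numbered = "\n".join(f"Step {i + 1}:\n {step}" for i, step in enumerate(steps))
    return f"{header}\n {numbered}" if header else numbered


print(format_steps(["Find the height.", "Compute the area."], "Previous Steps:"))
```

With this in place, `prev_steps_str = format_steps(prev_steps, "Previous Steps:")` would stand in for the `if`/`else` block in `verify_step` and `fix_step`.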

#### Main loop

In [8]:
from utils.async_logger import AsyncLogger

async def step_by_step_loop(question: str, log_path:str):
    AsyncLogger.add_message(log_path, f"Starting run {log_path}",)
    # Max steps is used to try and keep gpt from generating a million steps or 1 step each time.
    MAX_STEPS = 5
    verified_steps = []
    # Create the first steps
    unverified_steps = await create_steps(question, [], MAX_STEPS)

    while len(unverified_steps) > 0 and len(verified_steps) < MAX_STEPS:
        current_step = unverified_steps.pop(0)
        AsyncLogger.add_message(log_path, f"Verify the current step: \n{current_step}")
        verify = await verify_step(question, verified_steps, current_step)

        if not verify.correct:
            AsyncLogger.add_message(log_path, f"Fix the current step for reasoning: {verify.reasoning}")
            fixed_step = await fix_step(question, verified_steps, current_step, verify.reasoning)
            AsyncLogger.add_message(log_path, f"#Fixed step: {fixed_step}")

            # After fixing a step we need to verify it since it often gets off the rails. 
            # If it's correct we can add it. Else we throw the current step out and regenerate
            verify_fixed = await verify_step(question, verified_steps, fixed_step)
            AsyncLogger.add_message(log_path, f"Fixed step was verified as: {verify_fixed.correct}")
    
            if verify_fixed.correct:
                verified_steps.append(fixed_step)
                if len(verified_steps) == MAX_STEPS: # If we have reached the end, break here. #TODO: would be nice to avoid checking len(verified_steps) in two places..
                    break
            
            unverified_steps = await create_steps(question, verified_steps, MAX_STEPS - len(verified_steps))
            AsyncLogger.add_message(log_path, f"Regenerated steps: \n{unverified_steps}")

        else:
            AsyncLogger.add_message(log_path, "Step was correct. Add to verified")
            verified_steps.append(current_step)


    AsyncLogger.add_message(log_path, "We have our verified steps:==========\n")

    steps_str = "\n".join([f"Step {i+1}:\n{step}" for i, step in enumerate(verified_steps)])
    AsyncLogger.add_message(log_path, steps_str)

    final_answer = await get_answer_from_steps(verified_steps)
    AsyncLogger.add_message(log_path, f"And our Final Answer\n{final_answer}")

    await AsyncLogger.flush_one(log_path)
    return final_answer

#### Async wrapper to run multiple times

In [9]:
import anyio
from utils.custom_logger import CustomLogger
from typing import Callable, Awaitable


async def run_many_times_async(function: Callable[[str, str], Awaitable[str]], question: str, max_concurrent_tasks=10, runs=10):
    """ Run the given function many times. Logs results"""
    generated_answers = []
    CustomLogger.start_watch() 

    # Used in case we need to limit for e.g. rate limits
    semaphore = anyio.Semaphore(max_concurrent_tasks)

    # Async wrapper - edits generated_answers
    async def wrapper(question: str, i: int):
        log_path = f"run-{i}"
        async with semaphore: # Limit concurrent tasks
            try:
                result = await function(question, log_path)
                generated_answers.append(f"{i}: {result}")
                CustomLogger.print(f"Finished task {i}") 
            except Exception as e:
                CustomLogger.print(f"Error on problem {log_path}: {e}")
                AsyncLogger.add_message(log_path, "Error", str(e))
                await AsyncLogger.flush_one(log_path)

    async with anyio.create_task_group() as tg:
       for i in range(runs):
            tg.start_soon(wrapper, question, i) 
    

    CustomLogger.print("All answers collected.")

    # Add to validation
    CustomLogger.update_path("validation")
    CustomLogger.default_log("Generated", *generated_answers)
    CustomLogger.default_log("Actual", "147 * sqrt(3)")
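
The concurrency limit in the wrapper can be checked in isolation, without any model calls. The sketch below swaps in the standard library's `asyncio` for `anyio` so that it runs with no extra dependencies; `run_limited`, `demo`, and `fake_worker` are hypothetical names, and the sleep stands in for the model call.

```python
import asyncio


async def run_limited(worker, runs: int, max_concurrent: int) -> list:
    """Run worker(i) for i in range(runs), at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)
    results = [None] * runs

    async def wrapper(i: int) -> None:
        async with semaphore:  # limit concurrent tasks
            try:
                results[i] = await worker(i)
            except Exception as e:  # record the failure instead of propagating it
                results[i] = f"error: {e}"

    await asyncio.gather(*(wrapper(i) for i in range(runs)))
    return results


async def demo():
    active = 0
    peak = 0

    async def fake_worker(i: int) -> str:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)   # track how many workers overlap
        await asyncio.sleep(0.01)  # stand-in for the model call
        active -= 1
        if i == 3:
            raise ValueError("simulated failure")
        return f"answer-{i}"

    results = await run_limited(fake_worker, runs=6, max_concurrent=2)
    return results, peak
```

Writing each result into `results[i]` keeps the answers in run order, whereas appending to a shared list (as the wrapper above does) orders them by completion time.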

#### Run 
(skipped for now)

In [10]:
%%script echo skipping

# Test out custom questions
# Answer to q1 is 147*sqrt(3)
# Raw string so that the LaTeX backslashes (e.g. \overline) are not treated as escape sequences
question1 = r"Point $P$ is inside equilateral triangle $ABC$ such that the altitudes from $P$ to $\overline{AB}$, $\overline{BC}$, and $\overline{CA}$ have lengths 8, 6, and 7 respectively.  What is the area of triangle $ABC$?"


await run_many_times_async(step_by_step_loop, question1)

skipping


### Idea 2: Take Step by Step and improve on it

#### Explanation

An exploration of the failed runs is in nb2_output/idea 1/README.md

Fixes needed:
- To fix the math we can add a second validator that looks for mathematical inconsistencies using the code editor.
- To fix the premature stopping we can add a check at the end that makes sure we have the exact answer. If not, we will add a step.
- To fix the big steps being created we can 
    - change create_steps to always create the given amount and ask it to equalize them
    - have a check for if a step is too large. If it is we want to break it up.

For now we're just going to do one change to avoid cluttering it. We can see how this does, and see what problems it still faces. 

Implementation:
- Changing create_steps to always create exact amount, try and balance steps, and not create summary.
- Change the loop to keep running. After MIN, check if we have the answer
    - This allows us to have create steps always create the exact amount of steps, since even if it creates too many steps we will check along the way.
    - If it's not the last step, we will always make at least one more step. Even if we already hit max steps.
    - This check should happen after the verify step.

Future:
- In the idea 2 output we'll look for math errors and for steps that are too large
- For the math errors, we can eventually sort each step's verification into math, assumptions, etc.
- I expect the too-large steps to occur more often. Ideally I want to break them up recursively, but dunno how to practically do this yet. Let's see if it's even necessary :)

#### New Functions

In [11]:
# Rewrite create_steps to always return exact amount asked for
from utils.async_gpt import agenerate_from_gpt_with_schema
from pydantic import BaseModel

class StepObj(BaseModel):
    steps: list[str]

# Named separately so it does not overwrite the Idea 1 prompt, which create_steps still formats with steps_left
create_steps_prompt_2 = """
Given the question:
{question}

{prev_steps_str}

Return a series of the next steps to solve the problem, ending in the solution. 
Return exactly {desired_steps} step(s), balancing the complexity evenly among them.
Be detailed, and break up complex steps. 
"""

async def create_steps_2(question: str, prev_steps: list[str], desired_steps:int):
    """
    Create steps to solve a question. Returns exact amount of desired steps. 
    """
    if len(prev_steps) == 0:
        prev_steps_str = ""
    else:
        numbered_list = "\n".join([f"Step {i+1}:\n {step}" for i, step in enumerate(prev_steps)])
        prev_steps_str = f"And the previous steps:\n {numbered_list}"

    messages = [
        {
            "role": "user",
            "content": create_steps_prompt_2.format(question=question, prev_steps_str=prev_steps_str, desired_steps=desired_steps),
        },
        },
    ]

    # Keep generating until we get the exact amount
    while True:
        step_obj: StepObj = await agenerate_from_gpt_with_schema(
            messages, StepObj
        )
        steps = step_obj.steps
        if len(steps) == desired_steps:
            break

    return steps
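
The `while True` retry above never gives up if the model keeps returning the wrong number of steps. A bounded variant is sketched below; `generate_exact`, its `generate` callable, and `max_attempts` are hypothetical names, and `generate` stands in for the schema-constrained model call.

```python
import asyncio


async def generate_exact(generate, desired_steps: int, max_attempts: int = 5) -> list[str]:
    """Call generate() until it returns exactly desired_steps steps, or give up."""
    for _ in range(max_attempts):
        steps = await generate()
        if len(steps) == desired_steps:
            return steps
    raise RuntimeError(
        f"no response with exactly {desired_steps} step(s) after {max_attempts} attempts"
    )
```

Raising after a fixed number of attempts lets the run wrapper's existing `except` branch log the failure instead of leaving the task hanging.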

In [12]:
# Check if we have answer
from pydantic import BaseModel

class HaveAnswer(BaseModel):
    objective: str
    answer: str
    answered: bool

answer_check_prompt = """ 
What is the given Problem asking? return only the objective.
What is the specific answer given by the Text? Does it have one? If there's no specific, exact answer, return answer:""

If the Text contains the EXACT ANSWER to the objective, then return answered: True.
Else, in all other cases, return answered: False

Problem:
{problem}

Text:
{step}
"""

async def do_we_have_answer(problem:str, step: str):
    # Check whether the given step contains the exact answer to the problem
    messages = [
        {
            "role": "user",
            "content": answer_check_prompt.format(problem=problem, step=step),
        },
    ]

    have_answer: HaveAnswer = await agenerate_from_gpt_with_schema(messages, HaveAnswer)
    return have_answer.answered

In [13]:
# Rewrite get answer from steps to just get answer from last step
# Before, since we weren't checking if each step had the exact answer, we had to check the whole thing. 
# Now, since have_answer triggered we can just check the last step
# Function to get final answer from steps

final_answer_prompt = """ 
Return ONLY the final answer to the Problem from the given Text.
The answer should be no more than a line long. Return ONLY the EXACT answer given in the Text.

Given the Problem: 
{problem}
            
And the Text
{step}
"""

async def get_final_answer(problem:str, step: str,):
    messages = [
        {
            "role": "user",
            "content": final_answer_prompt.format(problem=problem, step=step),
        },
    ]
    return await agenerate_from_gpt(messages)

In [14]:
# Sanity check - given the propensity to think we have the answer too early, let's do one final sanity check

class TrueOrFalse(BaseModel):
    value: bool

sanity_prompt =""" 
Given the Problem: 
{problem}

The Text:
{step}
            
And the extracted Answer
{answer}

Return True if and only if the Text contains the Answer word for word, AND the Answer EXACTLY ANSWERS the Problem
"""

async def sanity_check(problem:str, step: str, answer: str):
    messages = [
        {
            "role": "user",
            # Use sanity_prompt here; final_answer_prompt silently ignored the answer argument
            "content": sanity_prompt.format(problem=problem, step=step, answer=answer),
        },
    ]
    true_or_false: TrueOrFalse = await agenerate_from_gpt_with_schema(messages, TrueOrFalse)
    return true_or_false.value
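
The "word for word" half of this check does not need a model call. A minimal sketch, assuming that differences in whitespace should be ignored (the helper name `answer_appears_verbatim` is hypothetical):

```python
def answer_appears_verbatim(step: str, answer: str) -> bool:
    """True if the non-empty answer occurs in the step text, ignoring whitespace differences."""
    def normalize(text: str) -> str:
        return " ".join(text.split())  # collapse runs of whitespace

    needle = normalize(answer)
    return bool(needle) and needle in normalize(step)


print(answer_appears_verbatim("So the area is  147\\sqrt{3}.", "147\\sqrt{3}"))
```

This only covers the containment half; whether the answer exactly answers the problem still needs the model-backed check.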

In [15]:
# New do we have answer
# Reasoning - I want to look at all the steps. Now it's not triggering enough LOL.

class HaveAnswer2(BaseModel):
    value: bool

answer_2_prompt =""" 
Your job is to check if we have the exact answer, or if we need to take more steps to solve the problem.
Return True if and only if the steps contain the exact answer that the problem asks for
If the steps are finished, and give an exact answer to the Problem, return True.
Else, if the steps do not give an exact answer (have unsolved equations, etc.) then return False

Given the Problem: 
{problem}

And the steps to solve it:
{steps_str}
            
"""

async def do_we_have_answer2(problem:str, steps: list[str]):
    steps_str = "\n".join([f"Step {i+1}:\n {step}" for i, step in enumerate(steps)])
    messages = [
        {
            "role": "user",
            "content": answer_2_prompt.format(problem=problem, steps_str=steps_str),
        },
    ]
    true_or_false: HaveAnswer2 = await agenerate_from_gpt_with_schema(messages, HaveAnswer2)
    return true_or_false.value

In [16]:
# Rewriting this to include the question

final_answer_prompt2 = """ 
Return ONLY the final answer to the Problem from the given Steps.
The answer should be no more than a line long. Return ONLY the EXACT answer given in the Text.

Given the Problem: 
{problem}
            
And the Steps
{steps_str}
"""

async def get_answer_from_steps2(problem, steps: list[str]):
    steps_str = "\n".join([f"Step {i+1}:\n {step}" for i, step in enumerate(steps)])
    messages = [
        {
            "role": "user",
            "content": final_answer_prompt2.format(problem=problem, steps_str=steps_str),
        },
    ]
    return await agenerate_from_gpt(messages)

#### New Loop

In [17]:
# Redefine the loop to incorporate stopping check
# Runs in a while True loop. 
# Once we have the MIN num, then we check each time for do_we_have_answer check.
# Will always generate more unverified steps if there are none left (min 1 step)
from utils.async_logger import AsyncLogger


async def step_by_step_loop_2(question: str, log_path:str):
    AsyncLogger.add_message(log_path, f"Starting run {log_path}",)
    # More of a suggestion.. used as a starting point. The regeneration will always generate at least 1 step
    MIN_STEPS = 5

    unverified_steps = []
    verified_steps = []
    while True:
        # Generate if unverified_steps empty.
        if not unverified_steps:
            desired_steps = max(1, MIN_STEPS - len(verified_steps))
            unverified_steps = await create_steps_2(question, verified_steps, desired_steps)
            AsyncLogger.add_message(log_path, f"Generated steps: \n{unverified_steps}")

        # Verify the current step
        current_step = unverified_steps.pop(0)
        verify = await verify_step(question, verified_steps, current_step)
        AsyncLogger.add_message(log_path, f"Verifying current step: \n{current_step}")
        
        # If correct, add to verified_steps
        if verify.correct:
            AsyncLogger.add_message(log_path, "Step was correct. Add to verified")
            verified_steps.append(current_step)
        else:
            # Since the step was incorrect, throw away the rest of the unverified steps (triggers regeneration next time)
            unverified_steps = []

            # Try fixing step -> overwrite current_step
            current_step = await fix_step(question, verified_steps, current_step, verify.reasoning)
            AsyncLogger.add_message(log_path, f"Fix the current step for reasoning: {verify.reasoning}")
            AsyncLogger.add_message(log_path, f"Fixed step: {current_step}")
            
            # Verify fixed step
            verify_fixed = await verify_step(question, verified_steps, current_step)
            AsyncLogger.add_message(log_path, f"Fixed step was verified as: {verify_fixed.correct}")
            
            # Add if correct
            if verify_fixed.correct:
                verified_steps.append(current_step)
            else:
                # If the fixed_step is incorrect, skip checking for the answer
                continue
            
        # We want to at least generate 5 steps.. to avoid premature stopping
        if len(verified_steps) >= MIN_STEPS:
            # Check if all the steps give the answer
            if await do_we_have_answer2(question, verified_steps):
                AsyncLogger.add_message(log_path, "We have answer triggered")
                break
                
                
    # Get the final answer
    final_answer = await get_answer_from_steps2(question, verified_steps)            
    
    steps_str = "\n".join([f"Step {i+1}:\n{step}" for i, step in enumerate(verified_steps)])
    AsyncLogger.add_message(log_path, "We have our verified steps:==========\n")
    AsyncLogger.add_message(log_path, steps_str)
    AsyncLogger.add_message(log_path, f"And our Final Answer\n{final_answer}")
    
    await AsyncLogger.flush_one(log_path)
    return final_answer

#### Run

In [18]:
# We can reuse the async wrapper

# Using same question as idea 1
# Answer to q1 is 147*sqrt(3)
# Raw string so that the LaTeX backslashes (e.g. \overline) are not treated as escape sequences
question1 = r"Point $P$ is inside equilateral triangle $ABC$ such that the altitudes from $P$ to $\overline{AB}$, $\overline{BC}$, and $\overline{CA}$ have lengths 8, 6, and 7 respectively.  What is the area of triangle $ABC$?"

# Run # 2
await run_many_times_async(step_by_step_loop_2, question1, max_concurrent_tasks=10, runs=40)


#### Notes

I had a lot of trouble with checking that the EXACT answer was included in the step. 
For some reason it loves to trigger on non-exact answers.

This is why we have the sanity_check() function.

This still doesn't help that much...
Let's try making it wait until it hits the max step limits

### Below is some old stuff re: math relationships and trying to identify the different types of assumptions. We can try some of it out later

In [19]:
# 2. Analyze the step, and extract the following: 
# proven mathematical relationships (external)
# Problem relationships (conditions and 2nd level conditions). These need to be supported by the question.
# Intermediate math relationships - further extract from each one,
    # proven math rels
    # problem rels

In [20]:
# 3. Verify and *correct* all above. 
# If a math relationship was wrong, we want to correct it by asking gpt
# If a problem relationship was wrong, we want to correct it by asking gpt
# If an intermediate math relationship was wrong, we want to correct it by fixing the problem rel or the math rel, and then re-calculating w/ code assistant

# We should only calculate each relationship once. So if an intermediate math relationship relies on a proven math rel or a problem rel,
# and one of those rels is wrong, it has to be re-calculated.

In [21]:
# 4. Now we have verified the step. If it changed we need to re-run the step generation here. We will re-run # 1 but ask it to generate 4
# steps now, and give it the first step.

In [22]:
# 5. We should do some other stuff here..