In [81]:
import os
import google.generativeai as genai
import json
import warnings
import re
import textwrap
import time
from google.api_core.exceptions import InternalServerError
# Silence every Python warning for the rest of the session.
warnings.filterwarnings('ignore')

# The API key comes from a project-local helper rather than being hard-coded
# here. NOTE(review): where the helper reads the key from is not visible in
# this notebook — confirm in utils.py.
from utils import get_gemini_api_key
gemini_api_key = get_gemini_api_key()

genai.configure(api_key=gemini_api_key)

# Single shared model handle; every agent/reviewer function below calls
# `model.generate_content(...)` on this module-level object.
model = genai.GenerativeModel('gemini-1.5-flash')

from datasets import load_dataset

# Load the benchmark and keep only its 'train' split; later cells read the
# 'problem' and 'answer' fields of each record.
ds = load_dataset("AI-MO/aimo-validation-math-level-5")
ds = ds['train']

In [82]:
import random

# The shuffle seed is kept in a side file so that repeated runs use the
# same ordering of problems.
with open("seed.txt", "r") as seed_file:
    seed = int(seed_file.read())

# `random_sample` is the full shuffled dataset (no sub-sampling is applied);
# both names refer to the same object.
random_sample = shuffled_ds = ds.shuffle(seed=seed)

In [83]:
# Define function to save results to a file
def save_results_to_file(results, output_file):
    """Serialize ``results`` as JSON (4-space indent) into ``output_file``.

    The file is opened in text write mode, so any existing content is
    replaced.
    """
    with open(output_file, 'w') as out_handle:
        json.dump(results, fp=out_handle, indent=4)
def extract_answer(answer):
    r"""Return the content of the last complete ``\boxed{...}`` in ``answer``.

    Braces are matched by depth, so answers that themselves contain braces
    (for example ``\boxed{\frac{1}{2}}``) are returned whole instead of being
    cut at the first ``}``.

    Args:
        answer: Text to search.

    Returns:
        The text between the braces of the last ``\boxed{`` that has a
        matching closing brace, or ``None`` when there is no such occurrence.
    """
    last_content = None
    for opener in re.finditer(r'\\boxed\{', answer):
        content_start = opener.end()
        depth = 1
        for pos in range(content_start, len(answer)):
            char = answer[pos]
            if char == '{':
                depth += 1
            elif char == '}':
                depth -= 1
                if depth == 0:
                    last_content = answer[content_start:pos]
                    break
        # An opener without a matching closing brace is ignored, so an
        # earlier complete occurrence (if any) remains the result.
    return last_content

In [84]:
def review_agent(task, gota, max_retries=5, delay=5):
    """Ask the model to score a task graph and return the parsed scores.

    The model is prompted to rate the task graph on redundancy, completeness
    and feasibility. The reply is parsed with ``parse_evaluation``; if any of
    the three scores is missing, or the API raises ``InternalServerError``,
    the request is retried.

    Args:
        task: The math problem statement.
        gota: The task graph text to evaluate.
        max_retries: Maximum number of attempts.
        delay: Seconds to wait between attempts.

    Returns:
        The dict produced by ``parse_evaluation`` once it contains all three
        criteria, or ``None`` when every attempt failed.
    """
    # Enhanced prompt with clear instructions
    prompt = f"""
You are a mathematical problem-solving expert. Please evaluate the following task graph designed to solve the math problem:{task}. Assess it based on redundancy, completeness, and feasibility. Let's think step by step. Provide a score from 1 to 10 for each criterion, and briefly explain your reasoning.

Task Graph:
{gota}

Please provide your evaluation **exactly** in the following format, without adding extra symbols or formatting:
Redundancy: X - Explanation: [Your explanation here]

Completeness: X - Explanation: [Your explanation here]

Feasibility: X - Explanation: [Your explanation here]
"""

    required_keys = ['Redundancy', 'Completeness', 'Feasibility']

    for attempt in range(1, max_retries + 1):
        try:
            response = model.generate_content(prompt)
            evaluation = ''.join([part.text for part in response.parts])
            print(f"Attempt {attempt}: Raw Evaluation Response:\n{evaluation}\n")

            scores = parse_evaluation(evaluation)
            print(f"Attempt {attempt}: Parsed Scores:\n{scores}\n")

            # Check if all required keys are present
            missing_keys = [key for key in required_keys if key not in scores]
            if not missing_keys:
                return scores
            print(f"Attempt {attempt}: Missing keys detected: {missing_keys}. Retrying...\n")
        except InternalServerError as e:
            print(f"Attempt {attempt}: Internal server error encountered: {e}. Retrying...\n")

        # Back off only when another attempt will follow; waiting after the
        # final attempt would just delay the failure result.
        if attempt < max_retries:
            time.sleep(delay)

    print("All attempts failed. Returning None.")
    return None


In [85]:
def parse_evaluation(evaluation_text):
    """Parse a reviewer reply into scores and explanations.

    A criterion header looks like ``Redundancy: 7 - Explanation: ...``; the
    criterion name is matched case-insensitively and may be wrapped in
    Markdown asterisks (both ``**Redundancy**: 7`` and ``**Redundancy:** 7``
    are accepted). Lines following a header are appended to its explanation
    until a blank line or a line starting with a criterion name or
    ``Summary`` is reached.

    Args:
        evaluation_text: Raw text returned by the reviewer model.

    Returns:
        A dict with, for every criterion found, ``<Criterion>`` (int score)
        and ``<Criterion>_Explanation`` (str). Criteria that are not found
        are simply absent; empty input gives an empty dict.
    """
    # Between the criterion name and the score, allow any mix of '*', ':'
    # and whitespace so that bold markers on either side of the colon parse.
    header_pattern = re.compile(
        r"^\**(Redundancy|Completeness|Feasibility)[\*:\s]*(\d+)\s*-\s*Explanation:\s*(.*)",
        re.IGNORECASE,
    )
    # A line that ends the current explanation (besides a blank line).
    stop_pattern = re.compile(
        r"^\**(Redundancy|Completeness|Feasibility|Summary):?\**",
        re.IGNORECASE,
    )

    scores = {}

    def store(name, value, explanation_parts):
        scores[name] = value
        scores[f"{name}_Explanation"] = ' '.join(explanation_parts).strip()

    criterion = None
    score = None
    explanation_lines = []
    collecting = False  # True while continuation lines belong to `criterion`

    for raw_line in evaluation_text.strip().split('\n'):
        line = raw_line.strip()
        header = header_pattern.match(line)
        if header:
            # A new criterion starts: flush the previous one first.
            if criterion:
                store(criterion, score, explanation_lines)
            criterion = header.group(1).strip().capitalize()
            score = int(header.group(2))
            explanation_lines = []
            initial_explanation = header.group(3).strip()
            if initial_explanation:
                explanation_lines.append(initial_explanation)
            collecting = True
        elif collecting:
            if line == '' or stop_pattern.match(line):
                # Explanation ended; later lines are ignored until the next
                # criterion header.
                collecting = False
            else:
                explanation_lines.append(line)

    # Save the last criterion's explanation
    if criterion:
        store(criterion, score, explanation_lines)
    return scores


In [86]:
def _request_agent_reply(agent_context, max_retries, delay):
    """Send the flattened conversation to the model and append its reply.

    Retries on ``InternalServerError``. Returns True when a reply was
    appended to ``agent_context`` and False when every attempt failed (the
    context is then left unchanged).
    """
    agent_context_text = "\n".join([f"{msg['role']}: {msg['content']}" for msg in agent_context])
    for attempt in range(1, max_retries + 1):
        try:
            response = model.generate_content(agent_context_text)
            agent_context.append(construct_assistant_message(response))
            return True
        except InternalServerError:
            if attempt < max_retries:
                time.sleep(delay)
    return False


def _format_review(score):
    """Render a review dict as prompt text; tolerate a missing review."""
    if not score:
        # review_agent returns None when all of its attempts failed.
        return "No evaluation is available for this task graph.\n"
    return (
        f"Redundancy: {score['Redundancy']} - {score['Redundancy_Explanation']}\n"
        f"Completeness: {score['Completeness']} - {score['Completeness_Explanation']}\n"
        f"Feasibility: {score['Feasibility']} - {score['Feasibility_Explanation']}\n"
    )


def chain_of_task(task, agents, plan_rounds, max_retries=5, delay=5):
    """Have several agents draft and iteratively refine task graphs.

    Each agent first proposes a task graph for ``task`` and the proposal is
    scored by ``review_agent``. For every additional round an agent receives
    the review of its own proposal plus the other agents' latest proposals
    and reviews, submits an updated plan, and is scored again.

    Args:
        task: The math problem statement.
        agents: Number of planning agents.
        plan_rounds: Total number of rounds; the initial proposal counts as
            round one, so ``plan_rounds - 1`` refinement rounds are run.
        max_retries: Maximum attempts per model request; also forwarded to
            ``review_agent``.
        delay: Seconds to wait between attempts; also forwarded to
            ``review_agent``.

    Returns:
        ``(agent_contexts, agent_scores)``: the per-agent message lists and
        the latest review of each agent's plan (``None`` where the review
        failed).
    """
    initial_prompt = f"""
You are an AI agent tasked with constructing a detailed chain of tasks to solve the math problem: {task}. Let's think step by step. The task graph should consist of interconnected subtasks where the result of one subtask determines the next steps. For example, if a subtask yields Result A, proceed to Task B; if it yields Result C, proceed to Task D, and so on.
Important: Do not provide answers or solutions to the tasks. Your role is solely to create the task chain without solving the tasks themselves.
"""
    agent_contexts = [
        [{"role": "user", "content": initial_prompt}] for _ in range(agents)
    ]

    # Round one: every agent drafts a plan, then every draft is reviewed.
    for agent_context in agent_contexts:
        _request_agent_reply(agent_context, max_retries, delay)

    # NOTE: if an agent's request failed on every attempt, the last message
    # in its context is still the user prompt, and that is what gets reviewed.
    agent_scores = [
        review_agent(task, agent_context[-1]["content"], max_retries=max_retries, delay=delay)
        for agent_context in agent_contexts
    ]

    for round_num in range(1, plan_rounds):
        for i, agent_context in enumerate(agent_contexts):
            own_feedback = (
                "Based on the peer evaluations, here is the feedback for your task graph:\n"
                + _format_review(agent_scores[i])
            )
            other_proposals = ""
            for j, other_agent_context in enumerate(agent_contexts):
                if j != i:
                    other_gota = other_agent_context[-1]["content"]
                    other_proposals += (
                        f"\n\nAgent {j+1} proposal:\nTask graph: {other_gota}\nScores:\n"
                        + _format_review(agent_scores[j])
                    )
            message_content = f"{own_feedback}\n\nHere are some proposals from other agents:{other_proposals}\n\nUsing these proposals as additional advice, what is your updated task plan?"
            agent_context.append({"role": "user", "content": message_content})

            _request_agent_reply(agent_context, max_retries, delay)

            # Scores are updated in place, so agents later in this round see
            # the refreshed plan and review of agents earlier in the round.
            agent_scores[i] = review_agent(
                task, agent_context[-1]["content"], max_retries=max_retries, delay=delay
            )
    return agent_contexts, agent_scores


def construct_assistant_message(response):
    """Wrap a model response as an assistant chat message.

    The ``text`` of every entry in ``response.parts`` is concatenated, in
    order, to form the message content.
    """
    content = ''
    for part in response.parts:
        content += part.text
    return {"role": "assistant", "content": content}

In [87]:
def select_plan(task, agent_contexts, agent_scores, max_retries=5, delay=5):
    """Return the latest plan of the agent with the highest total score.

    The total is Redundancy + Completeness + Feasibility. Ties go to the
    agent with the lowest index. Agents whose review is missing (``None``)
    are skipped; if no agent has a review, the first agent's plan is
    returned so the pipeline can continue.

    Args:
        task: The math problem statement (unused; kept for interface
            compatibility).
        agent_contexts: Per-agent message lists; the last message of each is
            taken as that agent's plan.
        agent_scores: Per-agent review dicts, or ``None`` for a failed review.
        max_retries: Unused; kept for interface compatibility.
        delay: Unused; kept for interface compatibility.

    Returns:
        The ``content`` of the selected agent's last message.

    Raises:
        ValueError: If ``agent_contexts`` is empty.
    """
    if not agent_contexts:
        raise ValueError("select_plan requires at least one agent context")

    best_index = None
    best_score = -1
    for i, score in enumerate(agent_scores):
        if not score:
            # The review for this agent failed; it cannot be ranked.
            continue
        total_score = score['Redundancy'] + score['Completeness'] + score['Feasibility']
        if total_score > best_score:
            best_score = total_score
            best_index = i

    if best_index is None:
        # No usable review at all: fall back to the first agent's plan.
        best_index = 0
    return agent_contexts[best_index][-1]['content']

In [88]:
def construct_message_answer(other_answers, idx, task_chain, ids):
    """Build the user message asking an agent to revise its answer.

    Args:
        other_answers: Message lists of the other agents.
        idx: Index, within each of those lists, of the answer to quote.
        task_chain: The selected task graph, included for reference.
        ids: Display numbers of the other agents, parallel to
            ``other_answers``.

    Returns:
        A ``{"role": "user", "content": ...}`` message. When there are no
        other agents, a generic self-review request is returned instead.
    """
    if len(other_answers) == 0:
        return {"role": "user", "content": "Closely examine your answer and provide an updated version."}

    sections = [
        f"""The following is the task graph for your reference : {task_chain}.""",
        "\n Here are answers from other agents: ",
    ]
    for i, plan in enumerate(other_answers):
        agent_answer = plan[idx]["content"]
        sections.append(f"\n\n Agent {ids[i]} answer: ```{agent_answer}```")

    # This is a plain (non-f) string, so the braces must be single; doubled
    # braces would be sent to the model literally and invite answers with
    # doubled braces that the answer extraction does not expect.
    sections.append(
        "\n\n Using the solutions from other agents as additional information, update your answer to the math problem. Let's think step by step. Your final answer should be a single numerical number, in the form \\boxed{answer}, at the end of your response."
    )

    return {"role": "user", "content": "".join(sections)}

In [89]:
def get_answer(task, agents, task_chain, answer_rounds, max_retries=5, delay=5):
    """Run a multi-round debate in which agents solve ``task``.

    In the first round every agent answers independently. In each later
    round an agent is shown the other agents' answers from the previous
    round together with ``task_chain`` and asked to update its answer.

    Every round adds exactly one assistant message to each agent's context,
    so contexts alternate user/assistant and the answer of round ``r``
    (0-based) is at index ``2 * r + 1``. If the model request fails on all
    attempts, an assistant message with empty content is stored to keep that
    alignment.

    Args:
        task: The math problem statement.
        agents: Number of answering agents.
        task_chain: The selected task graph shown to agents from round two on.
        answer_rounds: Number of rounds.
        max_retries: Maximum attempts per model request.
        delay: Seconds to wait between attempts.

    Returns:
        The list of per-agent message lists.
    """
    agent_contexts = [[{
        "role": "user",
        "content": f"""
Can you solve the following problem? {task}. Let's think step by step.
Your final answer should be a single numerical number, in the form \\boxed{{answer}}, at the end of your response.
"""
    }] for _ in range(agents)]

    agent_list = list(range(1, agents + 1))
    for round_num in range(answer_rounds):
        for i, agent_context in enumerate(agent_contexts):
            if round_num != 0:
                agent_rest = [x for x in agent_list if x != i + 1]
                other_answers = agent_contexts[:i] + agent_contexts[i + 1:]
                # Index of the assistant reply produced in the previous round.
                message = construct_message_answer(other_answers, 2 * round_num - 1, task_chain, agent_rest)
                agent_context.append(message)
            agent_context_text = "\n".join([f"{msg['role']}: {msg['content']}" for msg in agent_context])

            # Retry logic
            assistant_message = None
            for attempt in range(1, max_retries + 1):
                try:
                    response = model.generate_content(agent_context_text)
                    assistant_message = construct_assistant_message(response)
                    break
                except InternalServerError:
                    if attempt < max_retries:
                        time.sleep(delay)

            if assistant_message is None:
                # Every attempt failed: record an empty reply so that message
                # indices stay aligned across agents and rounds.
                assistant_message = {"role": "assistant", "content": ""}
            agent_context.append(assistant_message)
    return agent_contexts

In [90]:
# Experiment configuration (constant across samples).
agents = 3
plan_rounds = 2
answer_rounds = 3
output_file = 'filename.json'

all_results = []
for sample in random_sample:
    task = sample['problem']

    # Plan: agents draft and refine task graphs, then the best one is chosen.
    agent_contexts, agent_scores = chain_of_task(task, agents, plan_rounds)
    task_chain = select_plan(task, agent_contexts, agent_scores)

    # Solve: agents debate the answer with the chosen task graph as reference.
    answers = get_answer(task, agents, task_chain, answer_rounds)

    # Assistant replies sit at the odd indices of each agent's context; pull
    # the boxed answer out of each one, giving one list per agent.
    answers_per_agent = [
        [extract_answer(context[i]['content']) for i in range(1, len(context), 2)]
        for context in answers
    ]

    # Regroup by round. The first agent's reply count defines the number of
    # rounds; an agent with fewer replies contributes None for that round.
    rounds_data = {}
    for round_index in range(len(answers_per_agent[0])):
        round_key = f"round {round_index + 1}"
        rounds_data[round_key] = [
            agent_answers[round_index] if round_index < len(agent_answers) else None
            for agent_answers in answers_per_agent
        ]

    results = {
        'agent_contexts': agent_contexts,
        'task_chain': task_chain,
        'answer_text': answers,
        'answer': rounds_data,
        'correct_answer': sample['answer'],
    }
    all_results.append(results)

    # Checkpoint after every sample so that a failure on a later sample does
    # not discard the results gathered so far.
    save_results_to_file(all_results, output_file)

# Final write (also covers the case of an empty dataset).
save_results_to_file(all_results, output_file)

Attempt 1: Raw Evaluation Response:
Redundancy: 7 - Explanation:Tasks 2a and 2b are somewhat redundant, as they both aim to solve for a variable.  Task 4 is also somewhat redundant as it's a catch-all for if the initial approach fails. However, the redundancy allows for flexibility in solving the problem, which is valuable given the complexity.

Completeness: 9 - Explanation:The task graph covers a range of plausible approaches to the problem. It accounts for both direct solution and the need for simplification or alternative methods.  The steps are logically sequenced. The only potential gap is that it doesn't explicitly mention checking the validity of the solution obtained within the constraints of the problem.

Feasibility: 8 - Explanation:The tasks are generally feasible.  Squaring and adding (Task 1), utilizing trigonometric identities (Task 2b), and attempting to minimize (Task 3) are all standard mathematical techniques. Task 4 is less precisely defined, but offers a reasonable