# Supervisor on Math QA

Download the mathqa.py from the website https://huggingface.co/datasets/allenai/math_qa/tree/main and put it in the same directory


In [6]:
from datasets import load_dataset

# Load the dataset from the saved script
dataset = load_dataset('math_qa.py')  # Point to the location of your mathqa_dataset.py

# Access the test split
test_data = dataset['test']
# display the first 5 examples of the test dataset
test_data[0]

{'Problem': 'a shopkeeper sold an article offering a discount of 5 % and earned a profit of 31.1 % . what would have been the percentage of profit earned if no discount had been offered ?',
 'Rationale': '"giving no discount to customer implies selling the product on printed price . suppose the cost price of the article is 100 . then printed price = 100 ã — ( 100 + 31.1 ) / ( 100 â ˆ ’ 5 ) = 138 hence , required % profit = 138 â € “ 100 = 38 % answer a"',
 'options': 'a ) 38 , b ) 27.675 , c ) 30 , d ) data inadequate , e ) none of these',
 'correct': 'a',
 'annotated_formula': 'subtract(divide(multiply(add(const_100, 31.1), const_100), subtract(const_100, 5)), const_100)',
 'linear_formula': 'add(n1,const_100)|subtract(const_100,n0)|multiply(#0,const_100)|divide(#2,#1)|subtract(#3,const_100)|',
 'category': 'gain'}

Check the Length of the data

In [None]:
len(test_data)

2985

In [7]:
test_data[1]

{'Problem': 'what will be the difference between simple and compound interest at 14 % per annum on a sum of rs . 1000 after 4 years ?',
 'Rationale': '"s . i . = ( 1000 * 14 * 4 ) / 100 = rs . 560 c . i . = [ 1000 * ( 1 + 14 / 100 ) 4 - 1000 ] = rs . 689 difference = ( 689 - 560 ) = rs . 129 answer : a"',
 'options': 'a ) 129 , b ) 130 , c ) 124 , d ) 133 , e ) 145',
 'correct': 'a',
 'annotated_formula': 'subtract(subtract(multiply(1000, power(add(divide(14, const_100), const_1), 4)), 1000), multiply(multiply(1000, divide(14, const_100)), 4))',
 'linear_formula': 'divide(n0,const_100)|add(#0,const_1)|multiply(n1,#0)|multiply(n2,#2)|power(#1,n2)|multiply(n1,#4)|subtract(#5,n1)|subtract(#6,#3)|',
 'category': 'gain'}

## Create a Pandas Data frame based on the Math QA

In [9]:
import pandas as pd

# Extract relevant columns from the dataset
problems = []
rationales = []
options = []
correct_answers = []
annotated_formulas = []
linear_formulas = []
categories = []

# Iterate over the test dataset and extract each field
for example in test_data:
    problems.append(example['Problem'])
    rationales.append(example['Rationale'])
    options.append(example['options'])
    correct_answers.append(example['correct'])
    annotated_formulas.append(example['annotated_formula'])
    linear_formulas.append(example['linear_formula'])
    categories.append(example['category'])

# Create a pandas DataFrame
test_df = pd.DataFrame({
    'Problem': problems,
    'Rationale': rationales,
    'Options': options,
    'Correct Answer': correct_answers,
    'Annotated Formula': annotated_formulas,
    'Linear Formula': linear_formulas,
    'Category': categories
})




Example of how the df looks like

In [10]:
test_df.head()

Unnamed: 0,Problem,Rationale,Options,Correct Answer,Annotated Formula,Linear Formula,Category
0,a shopkeeper sold an article offering a discou...,"""giving no discount to customer implies sellin...","a ) 38 , b ) 27.675 , c ) 30 , d ) data inadeq...",a,"subtract(divide(multiply(add(const_100, 31.1),...","add(n1,const_100)|subtract(const_100,n0)|multi...",gain
1,what will be the difference between simple and...,"""s . i . = ( 1000 * 14 * 4 ) / 100 = rs . 560 ...","a ) 129 , b ) 130 , c ) 124 , d ) 133 , e ) 145",a,"subtract(subtract(multiply(1000, power(add(div...","divide(n0,const_100)|add(#0,const_1)|multiply(...",gain
2,there are 28 stations between hyderabad and ba...,"""the total number of stations = 30 from 30 sta...","a ) 156 , b ) 167 , c ) 870 , d ) 352 , e ) 380",c,"multiply(add(28, const_1), add(add(28, const_1...","add(n0,const_1)|add(#0,const_1)|multiply(#0,#1)|",physics
3,the present population of a town is 3888 . pop...,"""p = 3888 r = 20 % required population of town...","a ) 2500 , b ) 2100 , c ) 3500 , d ) 3600 , e ...",e,"add(3888, divide(multiply(3888, 20), const_100))","multiply(n0,n1)|divide(#0,const_100)|add(n0,#1)|",gain
4,the triplicate ratio of 1 : 9 is ?,"""13 : 93 = 1 : 729 answer : e""","a ) 1 : 0 , b ) 1 : 8 , c ) 1 : 7 , d ) 1 : 2 ...",e,"divide(power(const_2.0, 9), power(const_3.0, 9))","power(const_2.0,n1)|power(const_3.0,n1)|divide...",other


In [15]:
test_df.to_csv("mathqa_test.csv", index=False)

### Now run the TextgenWebui on local host based on the tutorial https://github.com/Candy26i/multiagentic/blob/main/README.md

In [16]:
# This is the code for the model to answer the questions
import requests
from datetime import datetime
import csv
import os
import re

def call_model(history):
    url = "http://127.0.0.1:5000/v1/chat/completions"
    headers = {
        "Content-Type": "application/json"
    }

    payload = {
        "mode": "chat",
        "messages": history,
        "temperature": 1,
        "top_p": 0.95,
        "top_k": 20,
        "max_tokens": 1000,
    }

    response = requests.post(url, headers=headers, json=payload)
    response.raise_for_status()
    return response.json()['choices'][0]['message']['content']


# Define the Sub-Agent used in supervisor

In [17]:
def problem_understanding_agent(state, history):
    prompt = f"""You are the problem_understanding agent.

Your job is to understand the math problem below and extract the relevant components.

Problem:
{state['problem']}
"""
    extended_history = history + [{"role": "user", "content": prompt}]
    history.append({"role": "assistant", "content": prompt})
    return call_model(extended_history)


def mathematical_formulation_agent(state, history):
    prompt = f"""You are the mathematical_formulation agent.

Based on the given problem, translate it into a solvable mathematical equation or formula.

Problem:
{state['problem']}
"""
    extended_history = history + [{"role": "user", "content": prompt}]
    history.append({"role": "user", "content": prompt})
    return call_model(extended_history)


def computation_agent(state, history):
    prompt = f"""You are the computation agent.

Solve the following math problem using the formulated equation or method.
Focus on performing the necessary computations to arrive at the correct final answer.

Problem:
{state['problem']}

"""
    extended_history = history + [{"role": "user", "content": prompt}]
    history.append({"role": "user", "content": prompt})
    return call_model(extended_history)


def answering_agent(state, history):
    prompt = f"""You are the answering agent.

Based on all the prior reasoning and context, provide the final answer to the math problem.
Answer strictly in the form : \nAnswer: <A/B/C/D/E>\n. ONLY give the final letter answer, no explanation.

Problem:
{state['problem']}

Options:
{state['option']} 

"""
    extended_history = history + [{"role": "user", "content": prompt}]
    history.append({"role": "user", "content": prompt})
    return call_model(extended_history)


In [None]:
AGENT_FUNCTIONS = {
    "problem_understanding": problem_understanding_agent,
    "mathematical_formulation": mathematical_formulation_agent,
    "computation": computation_agent,
    "answering": answering_agent,
}

## Supervisor's Decision loop for each SUPERVISOR + AGENT
## for each question

In [13]:
def supervisor_decision_loop(state, max_steps=5):
    history = []
    transcript = []
    chosen_agent = None

    for i in range(1, max_steps + 1):
        print(f"\n--- Supervisor {i} ---")
        next_instruction = f"""Please choose ONE next agent to call from the following: 
        - problem_understanding
        - mathematical_formulation
        - computation
        - answering
        Reply STRICTLY in the form:\nAgent: <name>\nThen explain why."""
        history.append({"role": "user", "content": next_instruction})
        supervisor_msg = call_model(history)
        
        print(supervisor_msg)
        
        match = re.search(r"Agent:\s*(\w+)", supervisor_msg, re.IGNORECASE)
        if match:
            chosen_agent = match.group(1).lower()
        if not match:
            if "problem_understanding" in supervisor_msg:
                chosen_agent = "problem_understanding"
            elif "mathematical_formulation" in supervisor_msg:
                chosen_agent = "mathematical_formulation"
            elif "computation" in supervisor_msg:
                chosen_agent = "computation"
            elif "answering" in supervisor_msg:
                chosen_agent = "answering"
            

        

        if chosen_agent not in AGENT_FUNCTIONS:
            print(f"❌ Invalid agent '{chosen_agent}' specified by supervisor.")
            chosen_agent = "answering"

        print(f"✅ Agent chosen: {chosen_agent}")
        history.append({"role": "assistant", "content": supervisor_msg})
        
        agent_result = AGENT_FUNCTIONS[chosen_agent](state, history)
        transcript.append((chosen_agent, agent_result))
        history.append({"role": "assistant", "content": agent_result})

        print(f"[{chosen_agent} → result]: {agent_result}")
        # If supervisor chose answering, stop here
        if chosen_agent == "answering":
            print("[Finished] Final answer reached.")
            return transcript

    # 🔁 After max_steps, force call to answering
    if chosen_agent != "answering":
        print("⚠️ Max steps reached. Forcing final 'answering' agent call.")
        agent_result = AGENT_FUNCTIONS["answering"](state, history)
        transcript.append((f"call_forced_answering", agent_result))
        print(f"[answering → result]: {agent_result}")

    return transcript

In [18]:
def log_conversation_trace(count, supervisor_call_pairs, csv_path="conversation_trace_qwen_mathqa.csv"):
    """
    supervisor_call_pairs: list of tuples (supervisor_response, agent_call_response)
    e.g. [
        ("Supervisor says...", "Agent reply 1"),
        ("Supervisor says again...", "Agent reply 2"),
        ...
    ]
    """
    row = {"timestamp": datetime.now().isoformat()}
    
    # Build fieldnames dynamically based on the length of supervisor_call_pairs
    fieldnames = ["timestamp", "count"]
    row["count"] = count
    
    for i in range(1, len(supervisor_call_pairs) + 1):  # Dynamic based on the number of pairs
        fieldnames.extend([f"supervisor_{i}", f"call_{i}"])
    
    # Build the row dynamically
    for i, (supervisor, call) in enumerate(supervisor_call_pairs, 1):
        row[f"supervisor_{i}"] = supervisor
        row[f"call_{i}"] = call


    # Check if header is needed
    file_exists = os.path.isfile(csv_path) and os.path.getsize(csv_path) > 0
    with open(csv_path, mode='a', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if not file_exists:
            writer.writeheader()
        writer.writerow(row)

In [19]:
# get the 1252- last line of mathqa_test.csv and store it in to mathqa_test_last.csv

import csv

input_path = "mathqa_test.csv"
output_path = "mathqa_test_5.csv"

with open(input_path, "r", encoding="utf-8") as infile, open(output_path, "w", newline='', encoding="utf-8") as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    for i, row in enumerate(reader):
        if i < 5:
            writer.writerow(row)


In [None]:
input_csv = "mathqa_test_5.csv"  # This is the CSV you parsed earlier
count = 1
with open(input_csv, 'r', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    for row in reader:
        problem = row["Problem"]
        option = row["Options"]

        print(f"\n========== Running example number: {count} ==========")
        count = count+1
        state = {"problem": problem, "option": option}

        try:
            transcript_1 = supervisor_decision_loop( state)
            log_conversation_trace(count, transcript_1, csv_path="mathqa_supervisor_test_5.csv")
        except Exception as e:
            print(f"⚠️ Error processing example number: {count}: {e}")



--- Supervisor 1 ---
I have decided to choose the "problem_understanding" agent as my next call. My reasoning behind this choice is that before we can make progress with any other agents such as mathematical formulation or computation, it's essential to have a solid understanding of the problem at hand. By discussing the problem with the problem_understanding agent first, I can gather more information about the nature of the problem and its requirements, which will help me make more informed decisions and recommendations in later stages of our conversation. Once we have a clear understanding of the problem, then we can proceed with selecting the appropriate agents for mathematical formulation and computation if necessary.
✅ Agent chosen: problem_understanding
[problem_understanding → result]: Agent: problem_understanding
Explanation: I will choose problem_understanding as the next agent since we need to understand the given math problem and extract the relevant components.

Problem:


In [None]:
#mathqa_test.csv sector only the first 1251 rows
#merge the column[3] of mathqa_test.csv to nathqa_supervisor_722.csv by position, put it in first column of the csv file
# for every row of mathqa_supervisor_722.csv, check its last non-empty column and if it contains Answer: a/b/c/d/e, and other equivalent forms like a) b) c) d) e), then append the answer in the first column of the csv file
# check if the first column of the csv is equal to the second(column[3] and the answer), colculate the accuracy of the csv file

In [10]:
import csv

input_path = "mathqa_test.csv"
output_path = "mathqa_test_first_1251.csv"
max_rows = 1251

with open(input_path, "r", encoding="utf-8") as infile, open(output_path, "w", newline='', encoding="utf-8") as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    for i, row in enumerate(reader):
        if i >= max_rows:
            break
        writer.writerow(row)

print(f"✅ Saved the first {max_rows} rows to '{output_path}'")

✅ Saved the first 1251 rows to 'mathqa_test_first_1251.csv'


In [23]:
#merge the column[3] of mathqa_test_1251.csv to mathqa_supervisor_722.csv by position, put it in first column of the csv file

#make the correct answer in the first column of the csv file

import csv

test_path = "mathqa_test_1251.csv"
supervisor_path = "mathqa_supervisor_722.csv"
output_path = "merged_supervisor_mathqa_test_1251.csv"

# Read the 'Correct Answer' column from mathqa_test_1251.csv
correct_answers = []
with open(test_path, "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        correct_answers.append(row.get("Correct Answer", ""))

# Read all rows from mathqa_supervisor_722.csv
supervisor_rows = []
with open(supervisor_path, "r", encoding="utf-8") as f:
    reader = csv.reader(f)
    for row in reader:
        supervisor_rows.append(row)

# Merge by position and write to output
with open(output_path, "w", newline='', encoding="utf-8") as f:
    writer = csv.writer(f)
    for i, row in enumerate(supervisor_rows):
        if i == 0:
            # Write the header as is
            writer.writerow(["Correct Answer"] + row)
        else:
            value = correct_answers[i-1] if (i-1) < len(correct_answers) else ""
            writer.writerow([value] + row)


print(f"✅ Merged: 'Correct Answer' from '{test_path}' inserted as first column in '{output_path}'")

✅ Merged: 'Correct Answer' from 'mathqa_test_1251.csv' inserted as first column in 'merged_supervisor_mathqa_test_1251.csv'


In [28]:
# for every row of mathqa_supervisor_722.csv, check its last non-empty column and if it contains Answer: a/b/c/d/e, and other equivalent forms like a) b) c) d) e), then append the answer in the first column of the csv file

def extract_answer(row):
    last_column = row[-1]
    #get the string after Answer: xxxx 
    match = re.search(r'Answer: (.*)', last_column)
    if match:
        match = match.group(1).strip().lower()
        if "a" in match:
            return "a"
        elif "b" in match:
            return "b"
        elif "c" in match:
            return "c"
        elif "d" in match:
            return "d"
        elif "e" in match:
            return "e"
        elif "A" in match:
            return "A"
        elif "B" in match:
            return "B"
        elif "C" in match:
            return "C"
        elif "D" in match:
            return "D"
        elif "E" in match:
            return "E"
        else:
            return None 
    if not match:
        if "a)" in last_column:
            return "a"
        elif "b)" in last_column:
            return "b"
        elif "c)" in last_column:
            return "c"
        elif "d)" in last_column:
            return "d"
        elif "e)" in last_column:
            return "e"
        elif "A" in last_column:
            return "A"
        elif "B" in last_column:
            return "B"
        elif "C" in last_column:
            return "C"
        elif "D" in last_column:  
            return "D"
        elif "E" in last_column:
            return "E"
        else:
            return None
        
    return None

def extract_answer(row):
    # Example: extract from the last column
    last_column = row[-1]
    import re
    match = re.search(r'Answer: (.*)', last_column)
    if match:
        answer = match.group(1).strip().lower()
        if answer:
            return answer
    # Fallbacks
    for opt in ["a)", "b)", "c)", "d)", "e)"]:
        if opt in last_column.lower():
            return opt[0]
    return ""

input_path = "mathqa_supervisor_722.csv"
output_path = "merged_supervisor_mathqa_test_1251.csv"

with open(input_path, "r", encoding="utf-8") as infile, open(output_path, "w", newline='', encoding="utf-8") as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    for i, row in enumerate(reader):
        if i == 0:
            # Write the header with the new column
            writer.writerow(["extracted_answer"] + row)
        else:
            value = extract_answer(row)
            writer.writerow([value] + row)





# Read correct answers from a separate file (if needed)
correct_answers = []
with open("mathqa_test_1251.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        correct_answers.append(row.get("Correct Answer", ""))

def extract_answer(row):
    last_column = row[-1]
    import re
    match = re.search(r'Answer: (.*)', last_column)
    if match:
        answer = match.group(1).strip().lower()
        if answer:
            if "a" in answer:
                return "a"
            elif "b" in answer:
                return "b"
            elif "c" in answer:
                return "c"
            elif "d" in answer:
                return "d"
            elif "e" in answer:
                return "e"
 
    for opt in ["a)", "b)", "c)", "d)", "e)"]:
        if opt in last_column.lower():
            return opt[0]
    return ""

input_path = "mathqa_supervisor_722.csv"
output_path = "merged_supervisor_mathqa_test_1251.csv"

with open(input_path, "r", encoding="utf-8") as infile, open(output_path, "w", newline='', encoding="utf-8") as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    for i, row in enumerate(reader):
        if i == 0:
            # Write the header with both new columns
            writer.writerow(["Correct Answer", "Extracted Answer"] + row)
        else:
            correct = correct_answers[i-1] if (i-1) < len(correct_answers) else ""
            extracted = extract_answer(row)
            writer.writerow([correct, extracted] + row)  

In [29]:
# check if the first column of the csv is equal to the second(column[3] and the answer), colculate the accuracy of the csv file


def calculate_accuracy(csv_file):
    correct_count = 0
    with open(csv_file, "r") as f:
        reader = csv.reader(f)
        for row in reader:
            if row[0] == row[1]:
                correct_count += 1
        accuracy = correct_count / 1252

        print(f"Accuracy: {accuracy}")

calculate_accuracy("merged_supervisor_mathqa_test_1251.csv")

Accuracy: 0.5175718849840255
