# IMPORTS

In [None]:
import numpy as np
import pandas as pd
import re
import torch
from difflib import SequenceMatcher
from sympy import N, symbols, sympify
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, T5Tokenizer

# OUR MODEL

## LOADING (FROM HUGGINGFACE)

In [None]:
# The fine-tuned model is hosted on the HuggingFace Hub because of its large
# size; both the tokenizer and the seq2seq weights are fetched from that repo.
# NOTE(review): `tokenizer` is re-bound to the 'google/flan-t5-base'
# T5Tokenizer in the "PREPARE TEST SET" cell, so the tokenizer loaded here is
# not the one used later for generation - confirm that this is intended.

model_fromhf = "andrewyw/mathsolverprelim"

tokenizer = AutoTokenizer.from_pretrained(model_fromhf)
model = AutoModelForSeq2SeqLM.from_pretrained(model_fromhf)

## PREPARE TEST SET

In [None]:
# Replaces the tokenizer loaded alongside the model with the base
# flan-t5 tokenizer. This tokenizer is used below for the token-length
# filters and is also the one passed to the prediction function.
# NOTE(review): presumably the fine-tuned repo ships the same vocabulary -
# verify, otherwise generation and decoding would use mismatched tokenizers.
tokenizer = T5Tokenizer.from_pretrained('google/flan-t5-base')

# Test split; the code below reads the 'annotated_formula', 'Problem' and
# 'answer_numeric' columns from it.
test_df = pd.read_csv('test_answerextracted.csv')

def insert_spaces(formula):
    """Pad parentheses and commas with single spaces.

    Every ``(``, ``)`` and ``,`` is surrounded by exactly one space, all runs
    of whitespace are collapsed to a single space, and leading/trailing
    whitespace is removed.

    Args:
        formula: The formula text. Non-string values (e.g. NaN from pandas)
            are returned unchanged.

    Returns:
        The normalized formula string, or ``formula`` itself when it is not
        a string.
    """
    if not isinstance(formula, str):
        return formula
    padded = re.sub(r'([(),])', r' \1 ', formula)
    # split()/join collapses runs of any length; a single
    # replace("  ", " ") pass leaves double spaces behind whenever three or
    # more spaces meet (e.g. "a , (b").
    return " ".join(padded.split())


def remove_const(expression):
    """Strip the ``const_`` prefix from numeric constants in a formula.

    Only constants whose suffix is made of digits, ``-``, ``_`` or ``.`` are
    affected (``const_100`` -> ``100``, ``const_0_25`` -> ``0_25``); the
    suffix itself is kept verbatim, so underscores are not turned into
    decimal points here.
    """
    const_pattern = re.compile(r'const_([-0-9_.]+)')

    def keep_suffix(match):
        return match.group(1)

    return const_pattern.sub(keep_suffix, expression)

# Operator names whose opening parenthesis is re-attached to the name by
# fuse_operator_parens (e.g. "add (" -> "add(").
# NOTE(review): 'circumface' is kept exactly as spelled; presumably it matches
# the operator name used in the dataset - verify before "correcting" it.
ops = [
    # arithmetic
    'add',
    'subtract',
    'multiply',
    'divide',
    'power',
    'sqrt',
    'log',
    'choose',
    'speed',
    # geometry
    'volume_rectangular_prism',
    'square_area',
    'circle_area',
    'circumface',
]

def fuse_operator_parens(expression, operators):
    """Remove whitespace between each operator name and its opening parenthesis.

    For every name in ``operators``, occurrences of ``name (`` that start at a
    word boundary are rewritten to ``name(``. Text that does not match any
    operator is left untouched.

    Args:
        expression: The formula string to rewrite.
        operators: Iterable of operator names, matched literally.

    Returns:
        The rewritten formula string.
    """
    for op in operators:
        # Escape the name so regex metacharacters in it are matched literally.
        pattern = rf'\b{re.escape(op)}\s*\('
        # A callable replacement avoids backslash/group interpretation of the
        # operator name inside a replacement template.
        expression = re.sub(pattern, lambda _match, name=op: f'{name}(', expression)
    return expression

def _token_count(text):
    """Return the number of tokens `tokenizer` produces for `text` (no truncation)."""
    return len(tokenizer.encode(text, truncation=False))


# Normalize the reference formulas: pad punctuation, strip `const_` prefixes,
# then re-attach operator names to their opening parenthesis.
test_df['annotated_formula'] = (
    test_df['annotated_formula']
    .apply(insert_spaces)
    .apply(remove_const)
    .apply(fuse_operator_parens, args=(ops,))
)

# Keep only examples whose formula is at most 30 tokens long.
test_df['count'] = test_df['annotated_formula'].apply(_token_count)
# .copy() makes the filtered frame independent, so adding 'count2' below does
# not write into a slice of the unfiltered frame (SettingWithCopyWarning).
test_df = test_df[test_df['count'] <= 30].copy()

# Keep only examples whose problem text is at most 100 tokens long.
test_df['count2'] = test_df['Problem'].apply(_token_count)
test_df = test_df[test_df['count2'] <= 100].reset_index(drop=True)

## NORMALIZED LEVENSHTEIN (N-L) CLOSENESS FORMULA

In [None]:
def normalized_levenshtein(pred, truth):
    """Return a similarity score in [0, 1] between ``pred`` and ``truth``.

    Despite the name, the score is ``difflib.SequenceMatcher.ratio()``
    (2 * matched characters / total characters of both strings), not an
    edit-distance-based Levenshtein similarity. 1.0 means identical inputs.
    """
    matcher = SequenceMatcher(a=pred, b=truth)
    return matcher.ratio()

## PREDICTIONS

In [None]:
def batch_output_formula_pretrained(model, tokenizer, problems, batch_size=32):
    """Generate one decoded prediction per problem, in batches.

    Inputs are tokenized with padding and truncated to 512 tokens, moved to
    the device the model's parameters live on, and decoded with deterministic
    beam search (``num_beams=4``, ``max_length=100``, ``min_length=10``).

    Args:
        model: Seq2seq model exposing ``parameters()`` and ``generate()``.
        tokenizer: Callable tokenizer that also provides ``batch_decode()``.
        problems: Problem texts. May be a pandas Series / NumPy array
            (anything with ``tolist()``) or any other iterable of strings.
        batch_size: Number of problems per ``generate`` call; must be >= 1.

    Returns:
        A list of decoded strings (special tokens removed), in input order.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    # Materialize once as a plain list so Series, arrays, lists and other
    # iterables are all sliced the same (positional) way.
    texts = problems.tolist() if hasattr(problems, "tolist") else list(problems)

    results = []
    device = next(model.parameters()).device

    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        inputs = tokenizer(batch, return_tensors="pt", padding=True, truncation=True, max_length=512)
        # The tokenizer returns CPU tensors; move them next to the model.
        inputs = {key: tensor.to(device) for key, tensor in inputs.items()}

        # Inference only: no gradients needed.
        with torch.no_grad():
            output_ids = model.generate(
                input_ids=inputs['input_ids'],
                attention_mask=inputs['attention_mask'],
                max_length=100,
                min_length=10,
                do_sample=False,
                num_beams=4,
                early_stopping=True
            )

        results.extend(tokenizer.batch_decode(output_ids, skip_special_tokens=True))

    return results

# Evaluate on a reproducible random subset of up to 100 test examples
# (sample() raises if asked for more rows than the frame holds).
testcopy_df = test_df.sample(n=min(100, len(test_df)), random_state=1).reset_index(drop=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# batch_output_formula_pretrained places its inputs on the device of the
# model's parameters, so moving the model here is what selects GPU vs CPU.
model = model.to(torch.float32).to(device)
model.eval()

testcopy_df['prediction'] = batch_output_formula_pretrained(model, tokenizer, testcopy_df['Problem'])

## METRICS

In [None]:
# Similarity between each prediction and its reference formula, then the
# mean similarity over the sampled test set.
testcopy_df['score'] = [
    normalized_levenshtein(predicted, reference)
    for predicted, reference in zip(testcopy_df['prediction'], testcopy_df['annotated_formula'])
]
print(testcopy_df['score'].mean())

## EXAMPLES OF QUESTIONS THAT WORKED WELL

In [None]:
# Predictions scoring at least 0.9 against the reference formula.
high_df = testcopy_df.loc[
    testcopy_df['score'] >= 0.9,
    ['Problem', 'annotated_formula', 'prediction', 'score'],
]
high_df.head(10)

## EXAMPLES OF QUESTIONS THAT DID NOT WORK WELL

In [None]:
# Predictions scoring below 0.5 against the reference formula.
low_df = testcopy_df.loc[
    testcopy_df['score'] < 0.5,
    ['Problem', 'annotated_formula', 'prediction', 'score'],
]
low_df.head(10)

## SYMPY

In [None]:
# SymPy symbol named 'const_100'.
# NOTE(review): nothing in the visible code references this variable -
# check_answer_numeric passes its own locals={'const_100': 100} mapping to
# sympify - so it looks unused; confirm before removing.
const_100 = symbols('const_100')

# Infix symbol for each supported two-argument operator.
_INFIX_SYMBOLS = {"add": "+", "subtract": "-", "multiply": "*", "divide": "/"}


def _clean_operand(token):
    """Turn a raw operand token into parsable text (``const_0_25`` -> ``0.25``)."""
    if token.startswith("const_"):
        token = token[len("const_"):]
    # Underscores encode the decimal point in the formula notation.
    return token.replace("_", ".")


def _close_call(stack):
    """Pop the innermost open call and its operands; return its infix form."""
    args = []
    # Operands are plain strings; an open call is stored as a 1-tuple (name,).
    while stack and not isinstance(stack[-1], tuple):
        args.append(stack.pop())
    if not stack:
        raise ValueError("unbalanced ')' in expression")
    (name,) = stack.pop()
    args.reverse()  # operands were popped right-to-left

    if name not in _INFIX_SYMBOLS:
        raise ValueError(f"unsupported operator {name!r}")
    if len(args) != 2:
        raise ValueError(f"operator {name!r} expects 2 arguments, got {len(args)}")
    left, right = args
    return f"({left} {_INFIX_SYMBOLS[name]} {right})"


def evaluate_functional_expression(expr_str):
    """Translate a prefix-style formula into a parenthesized infix string.

    Despite the name, nothing is evaluated numerically: for example
    ``"divide(add(1, 2), const_100)"`` becomes ``"((1 + 2) / 100)"``.

    Only ``add``, ``subtract``, ``multiply`` and ``divide`` are supported,
    each with exactly two arguments. In operands, a ``const_`` prefix is
    dropped and underscores become decimal points (``0_25`` -> ``0.25``). A
    ``-`` is kept only as the first character of a token; whitespace and any
    other unrecognized characters are ignored.

    Args:
        expr_str: The formula text, e.g. a model prediction.

    Returns:
        The infix expression string for the first top-level item, or ``""``
        when the input contains no operand or call at all.

    Raises:
        ValueError: On an unsupported operator, a call that does not have
            exactly two arguments, or unbalanced parentheses.
    """
    stack = []  # operands (str) and open calls ((name,) tuples)
    token = ""

    for char in expr_str:
        if char.isalnum() or char in "._" or (char == "-" and not token):
            token += char
        elif char == "(":
            # The token accumulated so far is the name of the call being opened.
            stack.append((token,))
            token = ""
        elif char in ",)":
            if token:
                stack.append(_clean_operand(token))
                token = ""
            if char == ")":
                stack.append(_close_call(stack))
        # Anything else (whitespace, stray symbols) is skipped.

    if token:
        # A bare trailing operand, e.g. the whole input is just "7".
        stack.append(_clean_operand(token))

    if any(isinstance(item, tuple) for item in stack):
        raise ValueError("unclosed '(' in expression")
    return stack[0] if stack else ""


def check_answer_numeric(x):
    """Convert a predicted formula into a simplified SymPy expression.

    The formula is first rewritten to infix text by
    ``evaluate_functional_expression`` and then parsed with ``sympify``
    (with the name ``const_100`` bound to 100) and simplified.

    Returns:
        The simplified SymPy expression, or ``None`` if any step fails.
    """
    # NOTE(review): sympify evaluates its string argument; the input here is
    # model output, so keep this away from untrusted text sources.
    try:
        infix_text = evaluate_functional_expression(x)
        parsed = sympify(infix_text, locals={'const_100': 100})
        return parsed.simplify()
    except Exception:
        # Best effort: an unparseable prediction simply has no numeric answer.
        return None

In [None]:
# Numeric/symbolic value of each predicted formula (None when it cannot be parsed).
testcopy_df['pred_ans'] = testcopy_df['prediction'].apply(check_answer_numeric)
def is_close(pred, truth, rtol=1e-5, atol=1e-1):
    """Report whether ``pred`` and ``truth`` are numerically close.

    Both values are evaluated with SymPy's ``N``, converted to ``float`` and
    compared with ``np.isclose``.

    Args:
        pred: Predicted value (number, string or SymPy expression), or None.
        truth: Reference value, or None.
        rtol: Relative tolerance passed to ``np.isclose``.
        atol: Absolute tolerance passed to ``np.isclose``.

    Returns:
        ``True`` when the values are within tolerance; ``False`` when they
        are not, when either is ``None``, or when either cannot be converted
        to a float.
    """
    if pred is None or truth is None:
        return False
    try:
        # bool() keeps the return type a plain Python bool rather than np.bool_.
        return bool(np.isclose(float(N(pred)), float(N(truth)), rtol=rtol, atol=atol))
    except Exception:
        # Best-effort comparison: symbolic, complex or unparseable values
        # count as "not close". Unlike a bare `except:`, this lets
        # KeyboardInterrupt and SystemExit propagate.
        return False

# Flag each row whose predicted value matches the reference answer within
# the tolerances of is_close.
testcopy_df['is_close'] = testcopy_df.apply(lambda row: is_close(row['pred_ans'], row['answer_numeric']), axis=1)
# Only rows with a parseable prediction are kept, so the accuracy below is
# computed over those rows alone: predictions that failed to parse are
# excluded from the denominator rather than counted as wrong.
testing = testcopy_df[testcopy_df['pred_ans'].notna()]

print("Accuracy:", np.round(testing['is_close'].mean(), 2))