# End-to-End Math Task Pipeline

This notebook implements a complete, standalone pipeline for processing mathematical tasks. It reads a CSV file of natural-language math problems, uses a large language model (LLM) to convert each problem into a structured `MathIR` JSON object, executes the resulting tasks with a symbolic math library, and writes the answers to an output CSV file.

The process is as follows:
1. **Configuration**: All settings, such as file paths and model parameters, are defined in one place.
2. **LLM JSON Generation**: A `vLLM`-powered generator takes a batch of math problems and converts them into `MathIR` JSON objects.
3. **MathIR Execution**: The core logic from `mathir_parser` is embedded to process the `MathIR` objects and compute the answers using `SymPy`.
4. **Main Pipeline**: The main function orchestrates the entire workflow, from reading the input CSV to writing the final answers.
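
To make the intermediate format concrete, here is a minimal sketch of what a `MathIR` payload might look like for the task "compute the integral of x^2 from 0 to 1". The field names follow the Pydantic models defined in section 3; the specific values are illustrative assumptions, not output captured from the model.

```python
import json

# Hypothetical MathIR payload; field names mirror the models in section 3,
# the values are made up for illustration.
example_ir = {
    "task_type": "integral",
    "expr_format": "latex",
    "symbols": [{"name": "x", "domain": "R"}],
    "targets": [
        {
            "type": "integral_def",
            "expr": "x^2",
            "var": "x",
            "limits": ["0", "1"],
            "name": "I",
        }
    ],
    "output": {"mode": "decimal", "round_to": 3},
}

# Serialize the payload the way an LLM response would carry it.
payload = json.dumps(example_ir, indent=2)
print(payload)
```

Only `targets` is required by the schema; every other field has a default.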

## 1. Configuration

The first cell below installs pinned dependencies; the second centralizes all user-configurable parameters. Changing these variables switches the input/output files, the language model, and other settings without touching the core logic.
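
If editing the cell is inconvenient (for example, when running the notebook non-interactively), the same settings could be read from environment variables. The sketch below is one possible approach; the `MATHIR_` prefix and the `load_config` helper are hypothetical names and not part of the notebook.

```python
import os
from typing import Any, Dict, Mapping, Optional

# Defaults mirror the values used in this notebook.
DEFAULTS = {
    "INPUT_CSV_PATH": "/kaggle/input/aic-2025-math/test_public_w_answers.csv",
    "OUTPUT_CSV_PATH": "output_answers.csv",
    "VLLM_MODEL_NAME": "Qwen/Qwen2.5-7B-Instruct",
    "VLLM_TENSOR_PARALLEL_SIZE": "1",
}


def load_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Return settings, letting MATHIR_<NAME> variables override the defaults."""
    env = os.environ if env is None else env
    config: Dict[str, Any] = {
        key: env.get(f"MATHIR_{key}", default) for key, default in DEFAULTS.items()
    }
    # Environment values are strings, so convert numeric settings explicitly.
    config["VLLM_TENSOR_PARALLEL_SIZE"] = int(config["VLLM_TENSOR_PARALLEL_SIZE"])
    return config


config = load_config()
print(config["OUTPUT_CSV_PATH"])
```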

In [None]:
!pip uninstall -y torch
# Install pinned dependencies with pip
!pip install vllm==0.4.2
!pip install grpcio==1.62.2
!pip install antlr4-python3-runtime==4.11.0
!pip install networkx shapely sage matplotlib gmpy2 scipy numpy sympy mpmath

In [None]:
import os

# File Paths
INPUT_CSV_PATH = "/kaggle/input/aic-2025-math/test_public_w_answers.csv"
OUTPUT_CSV_PATH = "output_answers.csv"

# vLLM and Model Configuration
VLLM_MODEL_NAME = "Qwen/Qwen2.5-7B-Instruct"
VLLM_TENSOR_PARALLEL_SIZE = 1

# Logging
LOG_LEVEL = "INFO"

# Prompt Engineering
SYSTEM_PROMPT = """You are an expert in mathematics with profound knowledge of the MathIR specification. Your main task is to receive a mathematical problem description in natural language and translate it into a valid JSON object that strictly adheres to the MathIR specification."""


## 2. Imports and Setup

This cell imports all the necessary libraries for the notebook. These include standard libraries for data handling (`pandas`), symbolic mathematics (`sympy`), and LLM interaction (`vllm`), as well as Python's built-in modules for asynchronous operations and data structures.

In [None]:
import asyncio
import json
import logging
import pandas as pd
from typing import List, Literal, Optional, Dict, Any, Tuple, Union
from pydantic import BaseModel
from dataclasses import dataclass
import sympy as sp
from sympy.parsing.latex import parse_latex
from vllm import LLM, SamplingParams

## 3. MathIR Parser and Execution Logic

This cell contains the complete `mathir_parser` module, embedded directly into the notebook to ensure it is self-contained. The code defines the `MathIR` data structures using `Pydantic` models, which guarantees that the JSON from the LLM conforms to a strict schema. It also includes the `run_mathir` function, which serves as the execution engine, parsing the `MathIR` object and using `SymPy` to perform the specified mathematical operations.
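
One SymPy detail is worth keeping in mind when reading the executors: two symbols with the same name but different assumptions are distinct objects. The sketch below assumes the parsed expression carries assumption-free symbols, while the runtime declares symbols with assumptions such as `real=True`; it shows why the executors substitute the declared symbols into the expression before computing.

```python
import sympy as sp

x_plain = sp.Symbol("x")            # assumption-free, as a parser would typically produce
x_real = sp.Symbol("x", real=True)  # as declared for domain 'R' in build_runtime

# Same name, different assumptions: SymPy treats these as different symbols.
print(x_plain == x_real)

expr = x_plain**2

# Integrating over x_real treats x_plain as an unrelated constant.
mismatch = sp.integrate(expr, (x_real, 0, 1))

# Substituting the declared symbol first makes the variables agree.
matched = sp.integrate(expr.subs(x_plain, x_real), (x_real, 0, 1))

print(mismatch, matched)
```

The mismatched call returns the integrand unchanged (multiplied by the interval length), which is easy to mistake for a valid symbolic answer.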

In [None]:
# === Helpers: LaTeX → SymPy ===
def to_sympy_expr(s: str) -> sp.Expr:
    """Parse a LaTeX string to a SymPy expression without context."""
    return parse_latex(s)

# === Core IR ===
class SymbolSpec(BaseModel):
    name: str
    domain: Literal['R','R+','Z','N','N+','C'] = 'R'

class FunctionDef(BaseModel):
    name: str
    args: List[str]
    expr: str

class SequenceDef(BaseModel):
    name: str
    args: List[str]
    expr: str

class MatrixDef(BaseModel):
    name: str
    rows: int
    cols: int
    data: List[List[str]]

class DistributionDef(BaseModel):
    name: str
    kind: Literal['bernoulli','binomial','geometric','poisson','hypergeom']
    params: Dict[str, str]

class GeometryDef(BaseModel):
    id: str
    kind: Literal['line','circle','parabola','ellipse','polyline']
    equation: Optional[str] = None
    params: Optional[Dict[str, Any]] = None

class Definitions(BaseModel):
    functions: List[FunctionDef] = []
    sequences: List[SequenceDef] = []
    matrices: List[MatrixDef] = []
    distributions: List[DistributionDef] = []
    geometry: List[GeometryDef] = []

class TransformSpec(BaseModel):
    target: str
    type: Literal['rotation','translation','scaling','substitution']
    angle_deg: Optional[float] = None
    center: Optional[List[float]] = None
    vector: Optional[List[float]] = None
    factor: Optional[float] = None
    subs: Optional[Dict[str, str]] = None

class Condition(BaseModel):
    type: str
    expr: Optional[str] = None
    object: Optional[str] = None
    point: Optional[List[float]] = None
    value: Optional[int] = None

class TargetIntegral(BaseModel):
    type: Literal['integral_def']
    expr: str
    var: str
    limits: List[Any]
    name: Optional[str] = None

class TargetLimit(BaseModel):
    type: Literal['limit']
    expr: str
    var: str
    to: str

class TargetSum(BaseModel):
    type: Literal['sum']
    term: str
    idx: str
    start: str
    end: str

class TargetSolve(BaseModel):
    type: Literal['solve_for']
    unknowns: List[str]
    equations: List[str]

class TargetIneq(BaseModel):
    type: Literal['inequalities']
    inequalities: List[str]

class TargetMatrixSolve(BaseModel):
    type: Literal['solve_for_matrix']
    unknown: str

class TargetProbability(BaseModel):
    type: Literal['probability']
    event_expr: str

class TargetValue(BaseModel):
    type: Literal['value']
    name: str
    expr: str

Target = Union[TargetIntegral, TargetLimit, TargetSum, TargetSolve, TargetIneq, TargetMatrixSolve, TargetProbability, TargetValue]

class OutputSpec(BaseModel):
    mode: Literal['exact','decimal'] = 'decimal'
    round_to: Optional[int] = 3
    simplify: bool = True
    rationalize: bool = False

class ValidationSpec(BaseModel):
    tolerance_abs: float = 1e-9
    check_domain_violations: bool = True

class MathIR(BaseModel):
    meta: Dict[str, Any] = {}
    task_type: Literal['auto','integral','limit','sum','algebra','matrix','probability','geometry','optimize'] = 'auto'
    expr_format: Literal['latex','sympy','infix'] = 'latex'
    assumptions: Dict[str, Any] = {}
    constants: Dict[str, str] = {}
    symbols: List[SymbolSpec] = []
    definitions: Definitions = Definitions()
    transforms: List[TransformSpec] = []
    conditions: List[Condition] = []
    targets: List[Target]
    output: OutputSpec = OutputSpec()
    validation: ValidationSpec = ValidationSpec()

# === Runtime context ===
@dataclass
class Runtime:
    symtab: Dict[str, sp.Symbol]
    funcs: Dict[sp.Function, sp.Lambda]
    sequences: Dict[sp.Function, sp.Lambda]
    matrices: Dict[str, sp.Matrix]
    distributions: Dict[str, Any]
    geometry: Dict[str, Any]
    context: Dict[Any, Any]

# === Builders ===
DOMAIN_MAP = {
    'R': sp.S.Reals,
    'R+': sp.Interval.open(0, sp.oo),  # strictly positive reals; Ropen would include 0
    'Z': sp.S.Integers,
    'N': sp.S.Naturals0,
    'N+': sp.S.Naturals,
    'C': sp.S.Complexes,
}

def build_runtime(ir: MathIR) -> Runtime:
    symtab: Dict[str, sp.Symbol] = {}
    for s in ir.symbols:
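        if s.domain == 'R':
            symtab[s.name] = sp.Symbol(s.name, real=True)
        elif s.domain == 'R+':
            # Positive reals: without this branch 'R+' falls through to plain real=True.
            symtab[s.name] = sp.Symbol(s.name, positive=True)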
        elif s.domain == 'Z':
            symtab[s.name] = sp.Symbol(s.name, integer=True)
        elif s.domain == 'N':
            symtab[s.name] = sp.Symbol(s.name, integer=True, nonnegative=True)
        elif s.domain == 'N+':
            symtab[s.name] = sp.Symbol(s.name, integer=True, positive=True)
        elif s.domain == 'C':
            symtab[s.name] = sp.Symbol(s.name, complex=True)
        else:
            symtab[s.name] = sp.Symbol(s.name, real=True)
    
    context: Dict[str, Any] = {**symtab}
    context['pi'] = sp.pi
    context['e'] = sp.E
    context['i'] = sp.I

    funcs: Dict[sp.Function, sp.Lambda] = {}
    for f_def in ir.definitions.functions:
        func_symbol = sp.Function(f_def.name)
        args = [sp.Symbol(a) for a in f_def.args]
        func_arg_context = {a.name: a for a in args}
        expr_template = to_sympy_expr(f_def.expr)
        expr = expr_template.subs(func_arg_context)
        funcs[func_symbol] = sp.Lambda(tuple(args), expr)

    sequences: Dict[sp.Function, sp.Lambda] = {}
    for s_def in ir.definitions.sequences:
        seq_symbol = sp.Function(s_def.name)
        args = [sp.Symbol(a) for a in s_def.args]
        seq_arg_context = {a.name: a for a in args}
        expr_template = to_sympy_expr(s_def.expr)
        expr = expr_template.subs(seq_arg_context)
        sequences[seq_symbol] = sp.Lambda(tuple(args), expr)
        
    matrices: Dict[str, sp.Matrix] = {}
    for m in ir.definitions.matrices:
        mat = sp.Matrix([[to_sympy_expr(v).subs(context) for v in row] for row in m.data])
        matrices[m.name] = mat

    context.update(funcs.items())
    context.update(sequences.items())
    context.update(matrices)

    return Runtime(symtab, funcs, sequences, matrices, {}, {}, context)

# === Node executors ===
def exec_integral(rt: Runtime, t: TargetIntegral) -> Tuple[str, sp.Expr]:
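    # Reuse the declared symbol so it matches the substituted expression; a fresh
    # Symbol(..., real=True) differs from symbols declared with other assumptions.
    integration_var = rt.symtab.get(t.var, sp.Symbol(t.var))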
    a_template = to_sympy_expr(str(t.limits[0]))
    b_template = to_sympy_expr(str(t.limits[1]))
    expr_template = to_sympy_expr(t.expr)
    subs_dict = rt.context.copy()
    a = a_template.subs(subs_dict)
    b = b_template.subs(subs_dict)
    expr = expr_template.subs(subs_dict)
    val = sp.integrate(expr, (integration_var, a, b))
    return (t.name or 'I', val.doit())

def exec_value(rt: Runtime, t: TargetValue, store: Dict[str, sp.Expr]) -> Tuple[str, sp.Expr]:
    if '\\' in t.expr:
        expr_template = to_sympy_expr(t.expr)
        subs_dict = {**rt.context, **store}
        expr = expr_template.subs(subs_dict).doit()
    else:
        local_dict = {}
        for k, v in rt.context.items():
            if isinstance(k, str):
                local_dict[k] = v
            elif hasattr(k, 'name'):
                local_dict[k.name] = v
        for k in store:
            if k not in local_dict:
                local_dict[k] = sp.Symbol(k)
        expr_template = sp.parse_expr(t.expr, local_dict=local_dict)
        expr = expr_template.subs(store).doit()
    return (t.name, expr)

def exec_limit(rt: Runtime, t: TargetLimit) -> Tuple[str, sp.Expr]:
    # Reuse the declared symbol so it matches the substituted expression.
    limit_var = rt.symtab.get(t.var, sp.Symbol(t.var))
    subs_dict = rt.context.copy()
    expr_template = to_sympy_expr(t.expr)
    expr = expr_template.subs(subs_dict)
    if t.to == "oo":
        to = sp.oo
    elif t.to == "-oo":
        to = -sp.oo
    else:
        to_template = to_sympy_expr(t.to)
        to = to_template.subs(subs_dict)
    val = sp.limit(expr, limit_var, to)
    simplified = sp.simplify(val.doit())
    return ('limit', simplified)

def exec_sum(rt: Runtime, t: TargetSum) -> Tuple[str, sp.Expr]:
    import uuid
    unique_idx_name = f"idx_{uuid.uuid4().hex[:8]}"
    idx_var = sp.Symbol(unique_idx_name, integer=True)
    if '\\' in t.term:
        infix = t.term.replace('\\frac{', '(').replace('}{', ')/(').replace('}', ')').replace('^', '**')
        term_template = sp.parse_expr(infix, local_dict={t.idx: rt.context[t.idx]})
    else:
        local_dict = {t.idx: rt.context[t.idx]}
        for k, v in rt.context.items():
            if isinstance(k, str):
                local_dict[k] = v
            elif hasattr(k, 'name'):
                local_dict[k.name] = v
        term_template = sp.parse_expr(t.term, local_dict=local_dict)
    term_for_sum = term_template.subs({rt.context[t.idx]: idx_var})
    subs_dict = {k: v for k, v in rt.context.items() if k != t.idx}
    term = term_for_sum.subs(subs_dict).doit()
    start = sp.Integer(t.start)
    # Check for infinity before converting: sp.Integer('oo') raises ValueError.
    end = sp.oo if t.end.strip() in ('oo', '\\infty') else sp.Integer(t.end)
    val = sp.summation(term, (idx_var, start, end))
    return ('sum', val.doit())

def exec_solve(rt: Runtime, t: TargetSolve) -> Tuple[str, Any]:
    unknowns = [rt.context[u] for u in t.unknowns if u in rt.context]
    subs_dict = rt.context.copy()
    eqs = []
    for eq_str in t.equations:
        if '=' in eq_str:
            lhs, rhs = eq_str.split('=', 1)
            lhs_expr = to_sympy_expr(lhs).subs(subs_dict)
            rhs_expr = to_sympy_expr(rhs).subs(subs_dict)
            eqs.append(sp.Eq(lhs_expr, rhs_expr))
        else:
            expr = to_sympy_expr(eq_str).subs(subs_dict)
            eqs.append(sp.Eq(expr, 0))
    solution = sp.solve(eqs, unknowns)
    if isinstance(solution, dict):
        solution = [solution]
    elif isinstance(solution, list):
        if solution and isinstance(solution[0], dict):
            pass
        else:
            solution = [dict(zip(unknowns, val if isinstance(val, tuple) else (val,))) for val in solution]
    return ('solve', solution)

def exec_ineq(rt: Runtime, t: TargetIneq) -> Tuple[str, Any]:
    subs_dict = rt.context.copy()
    all_symbols = set()
    parsed_ineqs = []
    for ineq_str in t.inequalities:
        ineq_expr = to_sympy_expr(ineq_str).subs(subs_dict)
        all_symbols.update(ineq_expr.free_symbols)
        parsed_ineqs.append(ineq_expr)
    symbols_list = list(all_symbols)
    solution = sp.reduce_inequalities(parsed_ineqs, symbols_list)
    return ('inequalities', solution)

def parse_matrix_expr(expr_str: str, matrices: Dict[str, Union[sp.Matrix, sp.MatrixSymbol]]) -> Union[sp.Matrix, sp.MatrixSymbol]:
    parts = expr_str.replace(' ', '').split('*')
    result = None
    for part in parts:
        is_transpose = part.endswith('.T')
        matrix_name = part.replace('.T', '')
        if matrix_name not in matrices:
            raise ValueError(f"Matrix '{matrix_name}' not defined")
        matrix = matrices[matrix_name]
        if is_transpose:
            matrix = matrix.T
        if result is None:
            result = matrix
        else:
            result = result * matrix
    return result

def exec_matrix(rt: Runtime, t: TargetMatrixSolve, ir: MathIR) -> Tuple[str, Any]:
    eq = None
    for c in ir.conditions:
        if c.type == 'matrix_equation' and c.expr:
            eq = c.expr
            break
    if not eq:
        return ('matrix', {'error': 'no_matrix_equation'})
    try:
        left, right = map(str.strip, eq.split('='))
        R = parse_matrix_expr(right, rt.matrices)
        if '*' in left and t.unknown in left:
            # Strip each factor so that "A * X" is handled the same as "A*X".
            parts = [p.strip() for p in left.split('*')]
            if len(parts) == 2 and parts[1] == t.unknown:
                coeff_name = parts[0]
                if coeff_name in rt.matrices:
                    A = rt.matrices[coeff_name]
                    try:
                        X = A.inv() * R
                        return ('matrix', X)
                    except Exception as e:
                        return ('matrix', {'error': f'Matrix is not invertible: {e}'})
                else:
                    return ('matrix', {'error': f'Coefficient matrix {coeff_name} not found'})
            else:
                return ('matrix', {'error': 'Unsupported matrix equation format'})
        else:
            return ('matrix', {'error': 'Unsupported matrix equation format'})
    except Exception as e:
        return ('matrix', {'error': f"Failed to solve matrix equation: {e}"})

# === Main runner ===
def run_mathir(ir: MathIR) -> Dict[str, Any]:
    rt = build_runtime(ir)
    store: Dict[str, sp.Expr] = {}
    results: Dict[str, Any] = {}
    for tgt in ir.targets:
        k: Optional[str] = None
        v: Any = None
        try:
            if tgt.type == 'integral_def':
                k, v = exec_integral(rt, tgt)
                if k: store[k] = v
            elif tgt.type == 'value':
                k, v = exec_value(rt, tgt, store)
            elif tgt.type == 'limit':
                k, v = exec_limit(rt, tgt)
            elif tgt.type == 'sum':
                k, v = exec_sum(rt, tgt)
            elif tgt.type == 'solve_for':
                k, v = exec_solve(rt, tgt)
            elif tgt.type == 'inequalities':
                k, v = exec_ineq(rt, tgt)
            elif tgt.type == 'solve_for_matrix':
                k, v = exec_matrix(rt, tgt, ir)
            else:
                results[f"{tgt.type}"] = {"error": "unsupported_target"}
                continue
        except Exception as e:
            logging.error(f"Error processing target {tgt.type}: {e}")
            # Fall back to the target type when the target has no name or name is None.
            results[getattr(tgt, 'name', None) or tgt.type] = {"error": str(e)}
            continue
        
        if k is None: continue

        if isinstance(v, sp.Expr):
            if ir.output.simplify:
                v = sp.simplify(v)
            if ir.output.mode == 'decimal':
                try:
                    numerical_value = sp.N(v)
                    if ir.output.round_to is not None:
                        v = round(numerical_value, ir.output.round_to)
                    else:
                        v = numerical_value
                except (TypeError, ValueError):
                    logging.warning(f"Could not convert expression '{v}' to a decimal value.")
        
        results[k] = v

    return results

## 4. LLM JSON Generation

The `LLMJsonGenerator` class is a thin wrapper around `vLLM` designed for this pipeline. It takes a batch of natural-language tasks, prepends the system prompt to each one, and sends them to the LLM. The `generate_batch` method passes all prompts to `vLLM` in a single `generate` call, so the engine can batch them instead of processing tasks one at a time. Failures are handled per task: if a response contains no JSON object or the JSON cannot be parsed, the corresponding entry in the result list is `None`; if the generation call itself fails, every entry is `None`.
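
The class below extracts JSON by slicing from the first `{` to the last `}` in the response. As a sketch of an alternative, the standard library's `json.JSONDecoder.raw_decode` can parse the first valid object and ignore whatever follows it, which helps when the model appends extra text containing braces. The helper name `extract_first_json_object` is hypothetical and not part of the notebook.

```python
import json
from typing import Any, Dict, Optional


def extract_first_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return the first JSON object embedded in text, or None if there is none."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value starting at `start` and
            # tolerates trailing characters after it.
            obj, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            obj = None
        if isinstance(obj, dict):
            return obj
        # Not a valid object here; try the next opening brace.
        start = text.find("{", start + 1)
    return None


response = 'Here is the result: {"task_type": "sum", "targets": []} Hope this helps! }'
print(extract_first_json_object(response))
```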

In [None]:
def setup_logging(level=LOG_LEVEL):
    logging.basicConfig(level=level, format='%(asctime)s - %(levelname)s - %(message)s')

class LLMJsonGenerator:
    def __init__(self, model_name, tensor_parallel_size, system_prompt):
        self.llm = LLM(model=model_name, tensor_parallel_size=tensor_parallel_size)
        self.system_prompt = system_prompt

    def generate_batch(self, user_tasks: List[str]) -> List[Optional[Dict[str, Any]]]:
        """Generates JSON objects from a batch of natural language math tasks."""
        prompts = [f"{self.system_prompt}\n\nUser Task: {task}\n\nJSON Output:" for task in user_tasks]
        sampling_params = SamplingParams(temperature=0.0, max_tokens=2048)
        
        try:
            outputs = self.llm.generate(prompts, sampling_params)
            json_results = []
            for output in outputs:
                response_text = output.outputs[0].text
                json_start = response_text.find('{')
                json_end = response_text.rfind('}') + 1
                # rfind returns -1 when '}' is missing, so json_end would be 0, never -1.
                if json_start != -1 and json_end > json_start:
                    json_str = response_text[json_start:json_end]
                    try:
                        json_results.append(json.loads(json_str))
                    except json.JSONDecodeError:
                        logging.error(f"Could not parse JSON from LLM response: {json_str}")
                        json_results.append(None)
                else:
                    logging.error("Could not find JSON object in LLM response.")
                    json_results.append(None)
            return json_results
        except Exception as e:
            logging.error(f"Error during batch LLM generation or JSON parsing: {e}")
            return [None] * len(user_tasks)

## 5. Main Processing Pipeline

This is the main execution block of the notebook. It orchestrates the entire process:
1. It initializes the `LLMJsonGenerator`.
2. It loads the math problems from the input CSV file into a pandas DataFrame.
3. It calls `generate_batch` to convert all tasks into `MathIR` JSONs in a single, efficient batch operation.
4. It then iterates through the results, validates each `MathIR` JSON, and passes it to the `run_mathir` function for computation.
5. Finally, it collects the answers, handles any errors, and saves the results to the output CSV file.
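
The control flow of these steps can be sketched without a GPU by replacing the LLM and the executor with stubs. Everything below is illustrative: `run_pipeline`, `fake_generate`, and `fake_execute` are hypothetical stand-ins, and the validation step (which yields `ERROR_VALIDATION` in the real pipeline) is omitted for brevity.

```python
import csv
import io
from typing import Any, Callable, Dict, List, Optional

IrJson = Optional[Dict[str, Any]]


def run_pipeline(
    rows: List[Dict[str, str]],
    generate: Callable[[List[str]], List[IrJson]],
    execute: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> List[Dict[str, str]]:
    """Generate all IRs in one batch, then compute one answer per task."""
    ir_jsons = generate([row["task"] for row in rows])
    results = []
    for row, ir_json in zip(rows, ir_jsons):
        if not ir_json:
            answer = "ERROR_GENERATION"
        else:
            try:
                computed = execute(ir_json)
                # Like the notebook, report the first computed target.
                answer = str(next(iter(computed.values()), "NO_ANSWER"))
            except Exception:
                answer = "ERROR_COMPUTATION"
        results.append({"id": row["id"], "answer": answer})
    return results


def fake_generate(tasks: List[str]) -> List[IrJson]:
    # Stub LLM: an empty task simulates a failed generation.
    return [{"expr": task} if task.strip() else None for task in tasks]


def fake_execute(ir: Dict[str, Any]) -> Dict[str, Any]:
    # Stub executor: "boom" simulates a runtime error.
    if ir["expr"] == "boom":
        raise ValueError("cannot compute")
    return {"value": len(ir["expr"])}


rows = [{"id": "1", "task": "2+2"}, {"id": "2", "task": ""}, {"id": "3", "task": "boom"}]
results = run_pipeline(rows, fake_generate, fake_execute)

# Write the results as CSV into an in-memory buffer.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["id", "answer"])
writer.writeheader()
writer.writerows(results)
print(buffer.getvalue())
```

Each task always produces exactly one output row, so a failure in one task never shifts or drops the answers for the others.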

In [None]:
def main():
    setup_logging()
    
    llm_generator = LLMJsonGenerator(
        model_name=VLLM_MODEL_NAME,
        tensor_parallel_size=VLLM_TENSOR_PARALLEL_SIZE,
        system_prompt=SYSTEM_PROMPT
    )
    
    try:
        input_df = pd.read_csv(INPUT_CSV_PATH)
        logging.info(f"Successfully loaded {len(input_df)} tasks from {INPUT_CSV_PATH}")
    except FileNotFoundError:
        logging.error(f"Input file not found: {INPUT_CSV_PATH}")
        return

    tasks = input_df['task'].tolist()
    task_ids = input_df['id'].tolist()

    logging.info(f"Sending {len(tasks)} tasks to LLM for batch processing...")
    math_ir_jsons = llm_generator.generate_batch(tasks)
    logging.info("LLM batch processing complete.")

    results = []
    for i, math_ir_json in enumerate(math_ir_jsons):
        task_id = task_ids[i]
        logging.info(f"Processing result for task {task_id}...")

        if not math_ir_json:
            logging.warning(f"Skipping task {task_id} due to LLM generation failure.")
            results.append({'id': task_id, 'answer': 'ERROR_GENERATION'})
            continue

        try:
            math_ir = MathIR.model_validate(math_ir_json)
        except Exception as e:
            logging.error(f"Skipping task {task_id} due to invalid MathIR JSON: {e}")
            results.append({'id': task_id, 'answer': 'ERROR_VALIDATION'})
            continue
            
        try:
            computed_results = run_mathir(math_ir)
            answer = next(iter(computed_results.values()), 'NO_ANSWER')
            results.append({'id': task_id, 'answer': str(answer)})
            logging.info(f"Task {task_id} processed successfully. Answer: {answer}")
        except Exception as e:
            logging.error(f"Skipping task {task_id} due to a runtime error in mathir_parser: {e}")
            results.append({'id': task_id, 'answer': 'ERROR_COMPUTATION'})

    output_df = pd.DataFrame(results)
    output_df.to_csv(OUTPUT_CSV_PATH, index=False)
    logging.info(f"Output saved to {OUTPUT_CSV_PATH}")

if __name__ == '__main__':
    main()