# GPT WARP Evaluator
This notebook evaluates the performance of different GPT models on the WARP benchmark. It uses the OpenAI Batch API to evaluate multiple models efficiently. You need to supply your own OpenAI API key (the notebook reads it from the `OPENAI_KEY` environment variable) so that it can be passed to the client. This notebook is still under development but is functional.

This script will:
1. Create a batch JSONL file for each model and store it in the `results/batch` directory
2. Upload the batch files to OpenAI
3. Queue three batches (one per trial) for processing for each model
4. Download the results to the `results` directory.

In [2]:
import datetime
import json
import os

import openai
from datasets import load_dataset
from dotenv import load_dotenv

load_dotenv()
import pandas as pd

# OpenAI client authenticated with the key stored in the OPENAI_KEY environment variable.
client = openai.OpenAI(api_key=os.environ.get("OPENAI_KEY"))


In [5]:
# Fetch the content of one specific uploaded file (hard-coded id) and save it as test.jsonl.
downloaded = client.files.content("file-HdvpqA9Pmu4SZi7R1AmMo8")
downloaded.write_to_file("test.jsonl")

In [None]:
# Load the "test" split of the WARP benchmark and convert it to a pandas DataFrame.
dataset = load_dataset("dannkoh/warp-benchmark", split="test").to_pandas()

In [None]:
def format_question(str):
    """
    Build the prompt for one benchmark question.

    The fixed output-format instructions come first, followed by a newline and
    then the question text.

    NOTE: the parameter name shadows the builtin ``str``; it is kept unchanged
    so that existing callers are unaffected.
    """
    instructions = (
        "All per-variable constraints must be combined using a top-level (assert (and ...)) clause.\n"
        "The output must be in exact, canonical SMT-LIB format without extra commentary in the constraint string.\n"
        "Show your work in <think> </think> tags. And return the final SMT-LIB constraint string in <answer> </answer> tags.\n"
        "For example: <answer>(assert (and  ( >=  in0 97)  ( <=  in0 122)))</answer>."
    )
    return "\n".join((instructions, str))


In [None]:

# Models to evaluate. Each model gets its own batch input file, which is
# uploaded once and then queued once per trial.
# models = ["gpt-4.1-2025-04-14", "gpt-4.1-mini-2025-04-14", "gpt-4o-2024-11-20","o3-mini-2025-01-31","o4-mini-2025-04-16"]
models = ["gpt-5-2025-08-07"]

# Endpoint used both inside every request line and when creating the batch,
# so the two can never drift apart.
endpoint = "/v1/chat/completions"

# Number of times each model's batch is queued.
num_trials = 3

# Trial index -> list of Batch objects (one per model) queued for that trial.
batches = {i: [] for i in range(num_trials)}

# Make sure the directory for the batch input files exists before writing.
batch_dir = "../results/batch"
os.makedirs(batch_dir, exist_ok=True)

for model in models:
    # One request per dataset row; custom_id is the row index so that the
    # responses can be joined back onto the dataset later.
    batch = [
        {
            "custom_id": str(i),
            "method": "POST",
            "url": endpoint,
            "body": {
                "model": model,
                "messages": [{"role": "user", "content": format_question(row["question"])}],
            },
        }
        for i, row in dataset.iterrows()
    ]

    batch_path = f"{batch_dir}/{model}-batch.jsonl"
    with open(batch_path, "w") as f:
        for item in batch:
            json.dump(item, f)
            f.write("\n")

    # Upload inside a context manager so the file handle is closed afterwards.
    with open(batch_path, "rb") as batch_file:
        file = client.files.create(
            file=batch_file,
            purpose="batch"
        )

    # The same uploaded file is queued once per trial.
    for trial in range(num_trials):
        batches[trial].append(
            client.batches.create(
                input_file_id=file.id,
                endpoint=endpoint,
                completion_window="24h"
            )
        )

In [None]:
import subprocess
def load_z3_template(constants: str, original_assertions: str, generated_assertions: str) -> str:
    """
    Build the SMT-LIB2 script that compares two sets of constraints.

    The script contains, in order: the constants, each constraint set inside
    its own push/pop scope, and then two scoped checks that each end in a
    ``(check-sat)``:
      1) the original constraints together with the negated generated ones;
      2) the generated constraints together with the negated original ones.
    """
    # Each side is collapsed into a single conjunction so it can be negated as a whole.
    generated_conjunction = _parse_constraints(generated_assertions)
    original_conjunction = _parse_constraints(original_assertions)

    preamble = (
        "; Combined SMT for checking equivalence\n"
        "; Original constants:\n"
        f"{constants}\n"
        "\n"
    )
    scoped_load = (
        "; Original constraints (A):\n"
        "(push)\n"
        f"{original_assertions}\n"
        "(pop)\n"
        "\n"
        "; Generated constraints (B):\n"
        "(push)\n"
        f"{generated_assertions}\n"
        "(pop)\n"
        "\n"
    )
    forward_check = (
        "; Now do two checks:\n"
        "; 1) A => B fails means we push A and then (not B)\n"
        "(push)\n"
        f"{original_assertions}\n"
        "(assert (not\n"
        f"{generated_conjunction}\n"
        "))\n"
        "(check-sat)\n"
        "(pop)\n"
        "\n"
    )
    backward_check = (
        "; 2) B => A fails means we push B and then (not A)\n"
        "(push)\n"
        f"{generated_assertions}\n"
        "(assert (not\n"
        f"{original_conjunction}\n"
        "))\n"
        "(check-sat)\n"
        "(pop)\n"
    )
    return preamble + scoped_load + forward_check + backward_check

def _parse_constraints(constraints: str) -> str:
    """
    Parse raw SMT-LIB2 constraints into a single conjunctive form.
    """
    assertions = [line.strip()[8:-1] for line in constraints.splitlines() if line.startswith("(assert")]
    return f"(and {' '.join(assertions)})"

def extract_response(response: str) -> "str | None":
    """
    Extract the answer from a model response.

    Returns the text between the first ``<answer>`` tag and the following
    ``</answer>`` tag, stripped of surrounding whitespace and with all
    newlines removed. If ``<answer>`` is present but ``</answer>`` is not,
    everything after ``<answer>`` is returned.

    Returns ``None`` when the response has no ``<answer>`` tag or is not a
    string (for example ``None`` or a float NaN).
    """
    try:
        answer = response.split("<answer>")[1].split("</answer>")[0]
    except (AttributeError, IndexError, TypeError):
        # AttributeError: not a string-like object; IndexError: no <answer>
        # tag; TypeError: bytes-like input split with a str separator.
        return None
    return answer.strip().replace("\n", "")

def evaluate_with_z3(response: str, truth: str, constants: str) -> dict:
    """
    Check with Z3 whether the generated constraints match the ground truth.

    The combined script is piped to ``z3 -in`` and the ``sat`` / ``unsat`` /
    ``unknown`` lines of its stdout are collected. The result is positive only
    when the first two such lines are both ``unsat``.

    Returns a dict with a boolean ``"result"`` and a textual ``"reason"``.
    Any exception raised while running Z3 is reported through ``"reason"``.
    """
    script = load_z3_template(constants=constants, original_assertions=truth, generated_assertions=response)

    try:
        completed = subprocess.run(["z3", "-in"], input=script, capture_output=True, text=True, check=False)
        stdout_text = completed.stdout.strip()
        verdicts = [entry for entry in stdout_text.splitlines() if entry in ("sat", "unsat", "unknown")]
    except Exception as e:
        return {"result": False, "reason": f"Error running Z3: {e}"}

    # Two verdicts are expected, one for each direction of the implication.
    if len(verdicts) < 2:
        return {"result": False, "reason": "Could not parse results correctly."}

    if verdicts[0] != "unsat":
        return {"result": False, "reason": "Original does not imply generated."}

    if verdicts[1] != "unsat":
        return {"result": False, "reason": "Generated does not imply original."}

    return {"result": True, "reason": "Constraints are logically equivalent."}


In [None]:
# Poll every queued batch once and report the ones that have not completed yet.
all_completed = True
for trial_batches in batches.values():
    for batch in trial_batches:
        batch_info = client.batches.retrieve(batch.id)
        if batch_info.status == "completed":
            continue
        all_completed = False
        print(f"Batch {batch.id} is not completed. Status: {batch_info.status}")

if all_completed:
    print("All batches are completed!")

In [None]:
# Download the output of every completed batch, evaluate each answer with Z3
# and write one summary JSON per model and trial.
for trial, trial_batches in batches.items():
    # One output directory per trial index. The directory name must come from
    # the trial key, not from the list of Batch objects.
    # NOTE(review): confirm "../results/<trial index>" is the intended layout.
    output_dir = f"../results/{trial}"
    os.makedirs(output_dir, exist_ok=True)

    for batch in trial_batches:
        # The objects stored in `batches` were captured when the batches were
        # created and are never refreshed, so fetch the current state before
        # reading the status and output file id.
        batch_info = client.batches.retrieve(batch.id)
        if batch_info.status != "completed" or not batch_info.output_file_id:
            print(f"Batch {batch.id} is not completed. Status: {batch_info.status}")
            continue

        # The input file name has the form "<model>-batch.jsonl"; it is reused
        # as the name of the downloaded output file and to recover the model.
        input_filename = client.files.retrieve(batch_info.input_file_id).filename
        output_file_path = f"{output_dir}/{input_filename}"
        client.files.content(batch_info.output_file_id).write_to_file(output_file_path)
        model = input_filename.rsplit("-", maxsplit=1)[0]

        with open(output_file_path) as f:
            data = [json.loads(line) for line in f]

        df = pd.DataFrame(data)

        # Extract content from each response
        df["content"] = df["response"].apply(
            lambda r: (
                r["body"]["choices"][0]["message"]["content"]
                if (r and "body" in r and "choices" in r["body"] and len(r["body"]["choices"]) > 0)
                else None
            )
        )
        # Copy so the dtype conversion below does not write into a slice of df.
        df = df[["custom_id", "content"]].copy()
        df["custom_id"] = df["custom_id"].astype(int)

        # Merge with original dataset
        merged_df = dataset.merge(df, left_index=True, right_on="custom_id", how="left")

        # Evaluate each response
        results = []
        for _, row in merged_df.iterrows():
            response = row["content"]
            truth = row["answer"]
            constants = row["constants"]
            question = row["question"]

            # Rows without a usable response may hold None or NaN after the
            # left merge; NaN is truthy, so normalise anything that is not a
            # string to None before extraction and serialisation.
            if not isinstance(response, str):
                response = None

            extracted = extract_response(response) if response else None
            z3_result = (
                evaluate_with_z3(response=extracted, truth=truth, constants=constants)
                if extracted else {"result": False, "reason": "Failed to extract response."}
            )

            results.append({
                "custom_id": row["custom_id"],
                "question": question,
                "response": response,
                "extracted": extracted,
                "truth": truth,
                "constants": constants,
                "z3_result": z3_result["result"],
                "reason": z3_result["reason"]
            })

        # Compute final metrics and save
        total = len(results)
        correct = sum(1 for r in results if r["z3_result"])
        result_summary = {
            "model": model,
            "accuracy": correct / total if total else 0,
            "correct": correct,
            "incorrect": total - correct,
            "total": total,
            "results": results,
        }

        with open(f"{output_dir}/{model}.json", "w") as f:
            json.dump(result_summary, f, indent=4)