# 🧠 PHYRE Benchmark Evaluation Demo



In [None]:
!pip install phyre --quiet

In [None]:
import phyre
import numpy as np
import matplotlib.pyplot as plt
import random
# Seed both generators: the stdlib `random` module and NumPy's global
# generator. `disturbed` draws its Gaussian noise through `np.random`, so
# seeding `random` alone does not make the perturbations reproducible.
random.seed(0)
np.random.seed(0)

In [75]:
from typing import Sequence
from phyre.action_simulator import ActionSimulator

def disturbed(tasks: Sequence,
              pos_pct: float = 0.001,
              ang_pct: float = 0.001,
              pos_sigma: "float | None" = 2.0,
              ang_sigma: "float | None" = 0.0175,
              rng: "np.random.Generator | None" = None):
    """
    Apply random Gaussian perturbations, in place, to every non-static body
    (``bodyType != 1``) of each TaskSpec in `tasks`.

    For each such body, independent zero-mean Gaussian noise is added to x, y
    and the angle. Positions are then clipped to ``[0, width]`` /
    ``[0, height]`` and the angle is wrapped into ``[-pi, pi)``.

    Args:
        tasks: Sequence of TaskSpec objects loaded via load_compiled_task_list.
        pos_pct: Position noise expressed as a fraction of the scene width.
            Only used when `pos_sigma` is None.
        ang_pct: Angle noise expressed as a fraction of a full turn (2π).
            Only used when `ang_sigma` is None.
        pos_sigma: Standard deviation of the x/y noise, in the same units as
            ``scene.width``. The default (2.0) is the value this function has
            always used. Pass None to derive it from `pos_pct`.
        ang_sigma: Standard deviation of the angle noise in radians. The
            default (0.0175 rad, about one degree) is the value this function
            has always used. Pass None to derive it from `ang_pct`.
        rng: Optional ``numpy.random.Generator`` to draw the noise from, for
            reproducible perturbations. When None, NumPy's global generator
            (``np.random``) is used, as before.

    Returns:
        The same `tasks` object; its TaskSpecs are modified in place.
    """
    # Both `np.random` and a Generator expose `.normal(scale=...)`.
    sampler = np.random if rng is None else rng

    for task in tasks:
        scene = task.scene
        width, height = scene.width, scene.height
        task_pos_sigma = pos_pct * width if pos_sigma is None else pos_sigma
        task_ang_sigma = ang_pct * 2 * np.pi if ang_sigma is None else ang_sigma

        for body in scene.bodies:
            if body.bodyType == 1:  # Skip static bodies
                continue

            # Draw order (x, y, angle) is kept stable so a seeded run is
            # reproducible.
            x_new = body.position.x + sampler.normal(scale=task_pos_sigma)
            y_new = body.position.y + sampler.normal(scale=task_pos_sigma)
            theta_new = body.angle + sampler.normal(scale=task_ang_sigma)

            # Keep the body inside the scene and the angle in [-pi, pi).
            body.position.x = float(np.clip(x_new, 0, width))
            body.position.y = float(np.clip(y_new, 0, height))
            body.angle = float((theta_new + np.pi) % (2 * np.pi) - np.pi)

    return tasks


def initialize_disturbed_simulator(task_ids: Sequence[str],
                         action_tier: str) -> ActionSimulator:
    """Create an ActionSimulator whose tasks have been passed through `disturbed`.

    The compiled tasks for `task_ids` are loaded with
    ``phyre.loader.load_compiled_task_list``, perturbed by `disturbed` (using
    its default arguments), and handed to ``ActionSimulator`` together with
    `action_tier`.
    """
    perturbed_tasks = disturbed(phyre.loader.load_compiled_task_list(task_ids))
    return ActionSimulator(perturbed_tasks, action_tier)


In [None]:
import re
import json
def strip_markdown(text: str) -> str:
    """Strip common Markdown markup from `text` and return the trimmed result.

    Removes, in order: code fences (with an optional ``json`` language tag),
    a bare ``json`` label standing alone on a line, heading markers,
    bold/italic markers, inline-code backticks and list bullets. Runs of
    blank lines are then collapsed to a single newline.

    The word "json" is only removed when it is a fence tag or a standalone
    label line; occurrences inside the actual content are left untouched.

    Args:
        text: Raw text, typically an LLM reply wrapping a JSON payload.

    Returns:
        The text without the markup listed above, stripped of leading and
        trailing whitespace.
    """
    # Code fences, including an optional (case-insensitive) "json" tag.
    text = re.sub(r"```[ \t]*(?:json\b)?", "", text, flags=re.IGNORECASE)
    # A language label left alone on its own line (fence-less "json\n[...]").
    text = re.sub(r"^[ \t]*json[ \t]*$", "", text,
                  flags=re.MULTILINE | re.IGNORECASE)

    text = re.sub(r"^#+\s*", "", text, flags=re.MULTILINE)   # headings
    text = re.sub(r"(\*\*|__)(.*?)\1", r"\2", text)          # bold
    # NOTE: this also drops paired underscores inside identifiers on the same
    # line (e.g. "a_b and c_d" -> "ab and cd").
    text = re.sub(r"(\*|_)(.*?)\1", r"\2", text)             # italics
    text = re.sub(r"`(.*?)`", r"\1", text)                   # inline code
    text = re.sub(r"^[-*+]\s+", "", text, flags=re.MULTILINE)  # list bullets
    text = re.sub(r"\n{2,}", "\n", text)
    return text.strip()


# Control Policy
def decode(clean):
    """Parse a JSON list of action objects into ``[[x, y, r], ...]`` floats.

    `clean` is first parsed as JSON as a whole. If that fails, the span from
    the first ``[`` to the last ``]`` is extracted and parsed instead.

    Args:
        clean: Text expected to contain a JSON array of objects with the
            keys ``x``, ``y`` and ``r``.

    Returns:
        A list with one ``[x, y, r]`` list of floats per action object.

    Raises:
        ValueError: If no JSON array can be found or parsed, if the decoded
            value is not a list, or if an entry lacks a usable ``x``/``y``/``r``.
    """
    try:
        parsed = json.loads(clean)
    except json.JSONDecodeError:
        # Fall back to the outermost bracketed span (greedy, across lines).
        bracketed = re.search(r'\[.*\]', clean, re.DOTALL)
        if bracketed is None:
            raise ValueError("No valid JSON array found in decision.")
        try:
            parsed = json.loads(bracketed.group())
        except Exception as e:
            raise ValueError(f"Failed to parse JSON array: {e}")

    if not isinstance(parsed, list):
        raise ValueError("Decoded JSON is not a list of actions.")

    # The first malformed entry aborts the whole conversion.
    try:
        return [
            [float(item['x']), float(item['y']), float(item['r'])]
            for item in parsed
        ]
    except Exception as e:
        raise ValueError(f"Action object missing or invalid keys: {e}")

In [None]:
import RouterBridge
import time
import importlib
# Re-execute the RouterBridge module so that re-running this cell picks up
# changes made to it since the first import.
importlib.reload(RouterBridge)

def solve_fn(task_idx, simulator, actions, max_attempts):
    """Query the router/LLM for actions for one task, with brute-force hints.

    Steps:
      1. Simulate actions until one yields featurized objects, and use them to
         describe the scene (shapes, colors, diameters, initial states).
         Objects that are both ``'BALL'`` and ``'RED'`` are left out of the
         description (presumably the agent-placed ball; confirm against the
         PHYRE featurization).
      2. Simulate every action in `actions` and collect the ones that solve
         the task.
      3. Send the scene description, plus up to five solving actions, to
         ``RouterBridge.router_bridge``.

    Args:
        task_idx: Index of the task inside `simulator`.
        simulator: Object exposing ``simulate_action(task_idx, action,
            need_images=..., need_featurized_objects=...)``.
        actions: Indexable collection of ``[x, y, r]`` candidate actions.
        max_attempts: Attempt budget; forwarded in the payload and used to
            cap the fallback result.

    Returns:
        A flat tuple ``(res, res_time, sim_time)``:
          * ``res`` is whatever ``router_bridge`` returned. If the call
            fails, it is the list of brute-force solving actions (at most
            `max_attempts`), or the first candidate action when none solved.
            If no simulation produced featurized objects, it is the first
            candidate action (empty list when `actions` is empty).
          * ``res_time`` is the seconds spent in ``router_bridge`` (0 when it
            was not called or failed).
          * ``sim_time`` is the seconds spent in the brute-force pass (0 when
            it was skipped).
    """
    # need_images=False keeps the simulation cheap; only object features and
    # statuses are used here.
    featurized = None
    for action in actions:
        simulation = simulator.simulate_action(
            task_idx, action, need_images=False, need_featurized_objects=True)
        featurized = simulation.featurized_objects
        if featurized is not None:
            break
    if featurized is None:
        print("No obj")
        # First candidate as a last resort; empty list if there are none.
        return list(actions[:1]), 0, 0

    orig_shapes = featurized.shapes                 # e.g. ['JAR','BAR','BALL','BALL',...]
    orig_colors = featurized.colors                 # e.g. ['GRAY','GREEN','RED','BLUE',...]
    orig_diameters = featurized.diameters.tolist()  # e.g. [0.125,0.3,0.02,0.05,...]
    orig_states = featurized.states[0].tolist()     # e.g. [[x,y,θ],...]

    keep_idxs = [
        i for i, (sh, col) in enumerate(zip(orig_shapes, orig_colors))
        if not (sh == 'BALL' and col == 'RED')
    ]
    shapes_filtered = [orig_shapes[i] for i in keep_idxs]
    colors_filtered = [orig_colors[i] for i in keep_idxs]
    diameters_filtered = [orig_diameters[i] for i in keep_idxs]
    states_filtered = [orig_states[i] for i in keep_idxs]

    # Brute-force pass: find every candidate action that solves the task.
    solving_actions = []
    apex_res = []
    start_sim = time.time()
    for action in actions:
        simulation = simulator.simulate_action(
            task_idx, action, need_images=False, need_featurized_objects=False)
        if simulation.status.is_solved():
            solving_actions.append(action)
            apex_res.append({
                "x": action[0],
                "y": action[1],
                "r": action[2],
            })
    sim_time = time.time() - start_sim

    payload = {
        "task_idx":      task_idx,
        "shapes":        shapes_filtered,
        "colors":        colors_filtered,
        "diameters":     diameters_filtered,
        "initial_states": states_filtered,
        "max_attempts":  max_attempts,
        "Physical Engine Tested  Points": apex_res[:5]
    }
    print(payload)
    print(apex_res[:5])

    res_time = 0.0
    try:
        file_path = "./run_llm_apex.py"
        start = time.time()
        res = RouterBridge.router_bridge(payload, "mcli", file_path)
        res_time = time.time() - start
    except Exception as e:
        print(f"router_bridge failed: {e}, falling back to brute force.")
        # Use the actions verified by the brute-force pass. Keep `res` a plain
        # list so the function still returns a flat 3-tuple.
        res = solving_actions[:max_attempts] if solving_actions else list(actions[:1])
    print(res)
    return res, res_time, sim_time


In [None]:
from tqdm import tqdm
def evaluate_custom_agent(tasks, tier, solve_fn, max_attempts=100, save_path="./results/phyre_gpt4o.json"):
    """Replay saved agent actions for every task and log them in a PHYRE Evaluator.

    The function currently runs in replay mode only: it reads the raw agent
    output for each task from `save_path`, decodes it into ``[x, y, r]``
    actions, and simulates them on the undisturbed simulator.

    Args:
        tasks: Task ids to evaluate.
        tier: Action tier name passed to the simulators.
        solve_fn: Agent callback. Not called in replay mode; kept so the
            signature stays compatible with the generation mode described in
            the comment at the end of the function.
        max_attempts: Maximum number of actions simulated per task.
        save_path: JSON file holding a list of result records. The record at
            position ``i`` is used for ``tasks[i]`` and must have an
            ``"action"`` entry containing the raw agent output.

    Returns:
        The ``phyre.Evaluator`` with all simulated attempts passed to
        ``maybe_log_attempt``.
    """
    simulator = phyre.initialize_simulator(tasks, tier)
    evaluator = phyre.Evaluator(tasks)

    # The disturbed simulator is only used to build and report the discrete
    # action space here.
    disturbed_simulator = initialize_disturbed_simulator(tasks, tier)
    actions = disturbed_simulator.build_discrete_action_space(max_actions=10000)
    print('A random action:', actions[0])
    print(len(actions))

    # Parse the saved results once, not once per task.
    with open(save_path, "r", encoding="utf-8") as f:
        saved_results = json.load(f)

    for task_idx in tqdm(range(len(tasks)), desc="Evaluating custom agent"):
        raw_actions = saved_results[task_idx]["action"]
        proposed_actions = decode(strip_markdown(raw_actions))

        attempts = 0
        for action in proposed_actions:
            # Enforce the budget whether or not the task has been solved.
            if attempts >= max_attempts:
                break
            status = simulator.simulate_action(task_idx, action, need_images=False).status
            evaluator.maybe_log_attempt(task_idx, status)
            attempts += 1

    # Generation mode (disabled): instead of reading `save_path`, call
    #     raw_actions, res_time, sim_time = solve_fn(
    #         task_idx, disturbed_simulator, actions, max_attempts)
    # for each task, and write a list of records
    #     {"task_idx", "action", "res_time", "sim_time", "solved"}
    # to `save_path` as JSON.

    return evaluator




In [None]:

eval_setup = 'ball_cross_template'
fold_id = 0  # For simplicity, we will just use one fold for evaluation.
train_tasks, dev_tasks, test_tasks = phyre.get_fold(eval_setup, fold_id)
tasks = test_tasks
print('Size of resulting splits:\n train:', len(train_tasks), '\n dev:',
      len(dev_tasks), '\n test:', len(test_tasks))

action_tier = phyre.eval_setup_to_action_tier(eval_setup)
print('Action tier for', eval_setup, 'is', action_tier)

# `tire` used to hold a second, identical lookup of the action tier. It is
# kept only as an alias in case other cells still refer to it.
tire = action_tier

# Replay the saved agent outputs for the test split and score them.
evaluator = evaluate_custom_agent(
    tasks,
    action_tier,
    solve_fn,
    max_attempts=100,
    save_path=f"./results/phyre_apex_gpt4.1_{eval_setup}.json",
)

print('AUC:',
      evaluator.get_aucess())

Size of resulting splits:
 train: 1600 
 dev: 400 
 test: 500
Action tier for ball_cross_template is ball
A random action: [4.17022005e-01 7.20324493e-01 1.14374817e-04]
10000


Evaluating custom agent: 100%|██████████| 500/500 [00:04<00:00, 115.42it/s]
Used 3.878000 attempts per task instead of maximum allowed 100.000000. That probably indicate a bug in evaluation loop.


AUC: 0.48655847315745965
