In [21]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [22]:
import gymnasium as gym
from minigrid.wrappers import *
import matplotlib.pyplot as plt
import pandas as pd

In [23]:
GRID_SEEDS = list(range(1000))
GRID_SEEDS = list(range(100))
# GRID_SEEDS = [42]

In [24]:
env = gym.make("MiniGrid-UnlockPickup-v0")

In [25]:
def get_unwrapped(env):
    while hasattr(env, 'env'):
        env = env.env
    return env

In [26]:
def plot_obs(obs):
    rgb_obs_env = RGBImgObsWrapper(env)

    obs = rgb_obs_env.observation(obs)

    plt.imshow(obs['image'])

In [27]:
def get_full_obs(env):
    obs = get_unwrapped(env).gen_obs()
    fully_obs = FullyObsWrapper(env).observation(obs)
    return fully_obs

In [28]:
def get_inputs(env):
    # convert to np array
    grid = env.get_wrapper_attr('pprint_grid')()
    split_rows = grid.split('\n')
    grid_cells = [[line[i:i+2] for i in range(0, len(line), 2)] for line in split_rows]

    OBJ_IDX_TO_TYPE = {
        'W': "WALL",
        'D': "DOOR",
        'K': "KEY",
        'A': "BALL",
        'B': "BOX",
        'L': "DOOR",
        'V': 'AGENT',
        '^': 'AGENT',
        '>': 'AGENT',
        '<': 'AGENT',
        ' ': "",
    }
    input_grid = [[OBJ_IDX_TO_TYPE[c[0]] for c in row] for row in grid_cells]

    direction = None
    if 'V' in grid:
        direction = 'DOWN'
    elif '^' in grid:
        direction = 'UP'
    elif '<' in grid:
        direction = 'LEFT'
    elif '>' in grid:
        direction = 'RIGHT'

    return input_grid, direction

In [29]:
def print_input_grid(grid):
    OBJ_IDX_TO_TYPE = {
        'W': "WALL",
        'D': "DOOR",
        'K': "KEY",
        'A': "BALL",
        'B': "BOX",
        'L': "DOOR",
        'V': 'AGENT',
        '^': 'AGENT',
        '>': 'AGENT',
        '<': 'AGENT',
    }
    for row in grid:
        print('[', end='')
        print(",".join([f'"{c}"' for c in row]), end='],\n')
        # print('],')

In [30]:
ACTIONS_MAP = {
    'LEFT': 0,
    'RIGHT': 1,
    'MOVE': 2,
    'PICKUP': 3,
    'DROP': 4,
    'UNLOCK': 5,
}

In [31]:
def eval(env, actions):
    action_ids = [ACTIONS_MAP[action] for action in actions]

    reward = 0
    done = False

    for action in action_ids:
        obs, reward, done, _, _ = env.step(action)

    return reward, done
    

In [32]:
from model_codes.o3_mini import solve as solve_o3_mini
from model_codes.claude import solve as solve_claude
from model_codes.o1 import solve as solve_o1
from model_codes.deepseek import solve as solve_deepseek
from model_codes.gemini_25_pro import solve as solve_gemini_25_pro
from model_codes.gpt_4o import solve as solve_gpt_4o

from model_codes_iter.o3_mini_b import solve as solve_o3_mini_b
from model_codes_iter.o3_mini_c import solve as solve_o3_mini_c


from model_codes_iter.o1_b import solve as solve_o1_b
from model_codes_iter.o1_c import solve as solve_o1_c


from model_codes_iter.claude_b import solve as solve_claude_b
from model_codes_iter.claude_c import solve as solve_claude_c
from model_codes_iter.claude_d import solve as solve_claude_d
from model_codes_iter.claude_e import solve as solve_claude_e

from model_codes_iter.deepseek_b import solve as solve_deepseek_b

from model_codes_iter.gemini_25_pro_b import solve as solve_gemini_25_pro_b

from model_codes_iter.gpt_4o_b import solve as solve_gpt_4o_b

from model_codes.random_walk import solve as solve_random_walk
from model_codes.greedy import solve as solve_greedy

In [33]:
METHODS = {
    'baseline': {
        'random_walk': solve_random_walk,
        'greedy': solve_greedy,
    },
    'direct_code_gen': {
        # 'o3_mini': solve_o3_mini,
        # 'claude': solve_claude,
        # 'o1': solve_o1,
        # 'deepseek': solve_deepseek,
        # 'gemini_25_pro': solve_gemini_25_pro,
        # 'gpt_4o': solve_gpt_4o,
    },
    'iterative': {
        # 'o3_mini_b': solve_o3_mini_b,
        # 'o3_mini_c': solve_o3_mini_c,
        # 'o1_b': solve_o1_b,
        # 'o1_c': solve_o1_c,
        # 'claude_b': solve_claude_b,
        # 'claude_c': solve_claude_c,
        # 'claude_d': solve_claude_d,
        # 'claude_e': solve_claude_e,
        # 'deepseek_b': solve_deepseek_b,
        # 'gemini_25_pro_b': solve_gemini_25_pro_b,
        # 'gpt_4o_b': solve_gpt_4o_b,
    },
}

In [34]:
def add_to_scores(scores, method_cls, method_name, grid_seed, reward, done):
    if method_cls not in scores:
        scores[method_cls] = {}
    if method_name not in scores[method_cls]:
        scores[method_cls][method_name] = {}
    if grid_seed not in scores[method_cls][method_name]:
        scores[method_cls][method_name][grid_seed] = {'reward': 0, 'done': 0}
    
    scores[method_cls][method_name][grid_seed]['reward'] = reward
    scores[method_cls][method_name][grid_seed]['done'] = 1 if done else 0
    return scores

In [35]:
def get_metrics(scores, return_df = False):
    metrics = {}
    data_points = []
    for method_cls, method_scores in scores.items():
        metrics[method_cls] = {}
        for method_name, grid_scores in method_scores.items():
            total_reward = sum([grid_score['reward'] for grid_score in grid_scores.values()])
            total_done = sum([grid_score['done'] for grid_score in grid_scores.values()])
            num_grids = len(grid_scores)
            metrics[method_cls][method_name] = {
                'total_reward': total_reward,
                'total_done': total_done,
                'num_grids': num_grids,
                'avg_reward': total_reward / num_grids if num_grids > 0 else 0,
                'avg_done': total_done / num_grids if num_grids > 0 else 0,
            }

            for grid_seed, grid_score in grid_scores.items():
                data_points.append({
                    'method_cls': method_cls,
                    'method_name': method_name,
                    'grid_seed': grid_seed,
                    'reward': grid_score['reward'],
                    'done': grid_score['done'],
                })

            # data_points.append({
            #     'method_cls': method_cls,
            #     'method_name': method_name,
            #     'total_reward': total_reward,
            #     'total_done': total_done,
            #     'num_grids': num_grids,
            #     'avg_reward': total_reward / num_grids if num_grids > 0 else 0,
            #     'avg_done': total_done / num_grids if num_grids > 0 else 0,
            # })

    if return_df:
        df = pd.DataFrame(data_points)
        return metrics, df
    
    return metrics

In [36]:
def get_worst_grids(scores, method_cls, method_name, k=3):
    grid_scores = scores[method_cls][method_name]
    not_done_grids = [(k, v) for k, v in grid_scores.items() if v['done'] == 0]
    
    worst_grids = not_done_grids

    done_grids = {k: v for k, v in grid_scores.items() if v['done'] == 1}
    done_grids = sorted(done_grids.items(), key=lambda x: x[1]['reward'], reverse=True)
    worst_grids += done_grids
    return worst_grids[:k]

In [37]:
scores = {}

for method_cls, methods_dict in METHODS.items():
    for method_name, method in methods_dict.items():
        print(f"Method: {method_name}")
        for grid_seed in GRID_SEEDS:
            print(f"Grid Seed: {grid_seed}\r", end='')

            env.reset(seed=grid_seed)
            input_grid, direction = get_inputs(env)

            actions = method(input_grid, direction)
            # print(f"Actions: {actions}")

            reward, done = eval(env, actions)
            # print(f"Done: {done}")
            # print(f"Reward: {reward}")
            
            scores = add_to_scores(scores, method_cls, method_name, grid_seed, reward, done)

Method: random_walk
Method: greedy
Grid Seed: 99

In [38]:
metrics = get_metrics(scores)
for method_cls, method_scores in metrics.items():
    print(f"Method Class: {method_cls}")
    for method_name, s in method_scores.items():
        print(f"  Method Name: {method_name}")
        print(f"    Average Reward: {s['avg_reward']}")
        print(f"    Average Done: {s['avg_done']}")

Method Class: baseline
  Method Name: random_walk
    Average Reward: 0.0
    Average Done: 0.0
  Method Name: greedy
    Average Reward: 0.7471562500000001
    Average Done: 0.8


In [39]:
_, metrics_df = get_metrics(scores, return_df=True)
metrics_df.to_csv('metrics_unlock_pickup.csv', index=False)

In [40]:
for bad_grid_seed, results in get_worst_grids(scores, 'iterative', 'gpt_4o_b'):
    env.reset(seed=bad_grid_seed)
    input_grid, direction = get_inputs(env)

    # print(f"Grid Seed: {bad_grid_seed}")
    # print(results)
    
    print("<grid>")
    print_input_grid(input_grid)
    print("</grid>")
    print(f"<start_direction>\n{direction}\n</start_direction>")
    print("--------------")


KeyError: 'iterative'

In [None]:
# actions_lists = {
#     "direct": ["LEFT", "MOVE", "PICKUP", "LEFT", "MOVE", "MOVE", "RIGHT", "MOVE", "UNLOCK", "LEFT", "LEFT", "DROP", "RIGHT", "RIGHT", "MOVE", "MOVE", "MOVE", "MOVE", "RIGHT", "MOVE", "MOVE", "MOVE", "MOVE", "PICKUP"],
#     "cot": ["LEFT", "MOVE", "PICKUP", "LEFT", "MOVE", "MOVE", "RIGHT", "MOVE", "UNLOCK", "MOVE", "MOVE", "MOVE", "MOVE", "MOVE", "RIGHT", "MOVE", "MOVE", "PICKUP"],
#     "2_step": ["LEFT", "MOVE", "PICKUP", "LEFT", "MOVE", "MOVE", "RIGHT", "MOVE", "UNLOCK", "RIGHT", "DROP", "LEFT", "MOVE", "MOVE", "MOVE", "MOVE", "MOVE", "RIGHT", "MOVE", "MOVE", "RIGHT", "MOVE", "LEFT", "MOVE", "LEFT", "PICKUP"],
# }

# for method, actions in actions_lists.items():
#     print(f"Method: {method}")
#     print(f"Actions: {actions}")

#     env.reset(seed=42)
#     input_grid, direction = get_inputs(env)
#     reward, done = eval(env, actions)
#     print(f"Done: {done}")
#     print(f"Reward: {reward}")