# April 21, 2025

In [1]:
import sys
import os

# Make the project root (the parent of the current working directory)
# importable, so the project-local packages imported below can be resolved.
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(project_root)

In [2]:
# Imports.
from os import listdir
from json import load, dump
from EscGridEnv.EscGridEnv import EscGridEnv
from EscGridEnv.levels import (
    lvl_1, lvl_2, lvl_3, lvl_4, lvl_5, lvl_6, lvl_7, lvl_8,
    lvl_9, lvl_10, lvl_11, lvl_12, lvl_13a, lvl_13b,
    lvl_5b, lvl_5c, lvl_5d, lvl_5e
)
from minigrid.wrappers import ImgObsWrapper, FullyObsWrapper
from agent.Perception import get_agentPos, obs
from tqdm import tqdm

In [3]:
# Flatten: merge every raw result file into one list, tagging each entry with
# metadata recovered from its directory (experiment, prompt) and file name.
exps    = ['exp1', 'exp2']
prompts = ['base', 'cot']


def _parse_run_filename(filename):
    '''
    Extract the model, split flag and encoding from a raw result file name.

    The name is split on '_': field 1 is the model, field 2 is
    '<key>=<split flag>' and field 3 is '<key>=<encoding>' followed by the
    '.json' extension.

    Returns a dict with the keys 'model', 'split' and 'encoding'.
    Raises IndexError if the name has fewer fields than expected.
    '''
    fields = filename.split('_')

    # bool() of any non-empty string is True (including 'False'), so the flag
    # has to be compared textually.
    # NOTE(review): assumes the flag is spelled like a Python/JSON boolean
    # ('True'/'False', 'true'/'false') or 1/0 - confirm against the file names.
    split_flag = fields[2].split('=')[1].strip().lower() in ('true', '1', 'yes')

    # str.strip('.json') removes any of the characters '.', 'j', 's', 'o', 'n'
    # from both ends rather than the extension; drop the suffix explicitly.
    encoding = fields[3].split('=')[1]
    if encoding.endswith('.json'):
        encoding = encoding[:-len('.json')]

    return {
        'model'    : fields[1],
        'split'    : split_flag,
        'encoding' : encoding,
    }


all_data = []

for x in exps:
    for p in prompts:
        # listdir() returns entries in arbitrary order; sort them so the
        # flattened file is identical from one run to the next.
        for f in sorted(listdir(f'../data/raw/{x}/{p}')):
            # Ignore anything that is not a result file (checkpoint folders,
            # OS metadata files, ...).
            if not f.endswith('.json'):
                continue

            with open(f'../data/raw/{x}/{p}/{f}') as fp:
                data = load(fp)

            file_meta = _parse_run_filename(f)

            for entry in data:
                # One fresh dict per entry so entries never share metadata.
                entry['meta'] = {
                    'lvl'      : entry['lvl'],
                    'exp'      : x,
                    'prompt'   : p,
                    'model'    : file_meta['model'],
                    'split'    : file_meta['split'],
                    'encoding' : file_meta['encoding'],
                }
                all_data.append(entry)

count = len(all_data)
print(f"Count: {count}")

Count: 432


In [4]:
# Persist the flattened, metadata-tagged entries as one JSON file.
with open('../data/flattened.json', 'w') as flattened_file:
    dump(obj=all_data, fp=flattened_file, indent=4)

In [5]:
# Colour words searched for (case-insensitively) in a model's written plan.
KEYWORDS = ['brown', 'orange', 'pink', 'blue', 'yellow', 'cyan', 'grey']

# Level name -> grid layout. Each layout factory is called once, in the order
# listed, when this cell runs.
LEVELS = {
    name : build_layout()
    for name, build_layout in (
        ('lvl_1',   lvl_1),
        ('lvl_2',   lvl_2),
        ('lvl_3',   lvl_3),
        ('lvl_4',   lvl_4),
        ('lvl_5',   lvl_5),
        ('lvl_5b',  lvl_5b),
        ('lvl_5c',  lvl_5c),
        ('lvl_5d',  lvl_5d),
        ('lvl_5e',  lvl_5e),
        ('lvl_6',   lvl_6),
        ('lvl_7',   lvl_7),
        ('lvl_8',   lvl_8),
        ('lvl_9',   lvl_9),
        ('lvl_10',  lvl_10),
        ('lvl_11',  lvl_11),
        ('lvl_12',  lvl_12),
        ('lvl_13a', lvl_13a),
        ('lvl_13b', lvl_13b),
    )
}

# Event name -> colours that must all be present in a level's initial
# observation for that event to be counted on that level.
REQUIRES = dict(
    FalseWall      = ['brown'],
    Crate_moves    = ['orange'],
    Key_obtained   = ['yellow'],
    Button_pressed = ['orange', 'pink', 'blue'],
    KeyDoor_opened = ['yellow', 'cyan'],
    KeyCrate_made  = ['orange', 'yellow'],
    KeyCrate_used  = ['orange', 'yellow'],
)

In [6]:
def run_seq(env : EscGridEnv, seq):
    '''
    Play the actions of `seq` in `env`, one after another, and record what the
    environment looks like after each of them.

    Returns a list with one `(agent position, observation)` pair per action,
    in execution order. The state before the first action is not recorded, so
    an empty `seq` yields an empty list.
    '''
    def advance(action):
        # Apply the action, then snapshot the resulting state.
        env.step(action)
        return (get_agentPos(env), obs(env))

    return [advance(action) for action in seq]

def get_objs(env: EscGridEnv):
    '''
    List the distinct cell values that occur in the current observation of
    `env`. Each value appears once; the order of the list is unspecified.
    '''
    distinct_cells = {cell for grid_row in obs(env) for cell in grid_row}
    return list(distinct_cells)

def get_pos_of_color(color: str, matrix : list[list[str]]) -> list[tuple[int, int]]:
    '''
    Locate every cell of `matrix` whose value equals `color`.

    Coordinates are returned as `(x, y)` = `(column index, row index)`,
    scanning row by row and left to right within a row. The list is empty when
    the value does not occur.
    '''
    return [
        (col_idx, row_idx)
        for row_idx, cells in enumerate(matrix)
        for col_idx, cell in enumerate(cells)
        if cell == color
    ]

def check_FalseWall(prev, curr):
    '''
    Decide from two consecutive states whether the agent is in the false-wall
    region.

    `prev` and `curr` are `(agent position, observation)` pairs. The result is
    True only when the agent position in both states has x <= 3 or x >= 18,
    and y <= 5 or y >= 16.
    NOTE(review): the bounds are hard-coded grid coordinates, presumably the
    area that can only be reached through a false wall - confirm against the
    level layouts.

    Raises AssertionError if the two observations differ in size.
    '''
    grid_before, grid_after = prev[1], curr[1]
    assert len(grid_before) == len(grid_after)
    assert len(grid_before[0]) == len(grid_after[0])

    def in_outer_region(pos):
        x_outside = pos[0] <= 3 or pos[0] >= 18
        y_outside = pos[1] <= 5 or pos[1] >= 16
        return x_outside and y_outside

    # all() over a list keeps the plain-bool result whatever the coordinate type.
    return all([in_outer_region(prev[0]), in_outer_region(curr[0])])

def check_Crate_moves(prev, curr):
    '''
    Report whether the set of 'orange' cells changed between two consecutive
    states while 'orange' is present in both of them.

    `prev` and `curr` are `(agent position, observation)` pairs.
    Raises AssertionError if the two observations differ in size.
    '''
    grid_before, grid_after = prev[1], curr[1]
    assert len(grid_before) == len(grid_after)
    assert len(grid_before[0]) == len(grid_after[0])

    orange_before = set(get_pos_of_color('orange', grid_before))
    orange_after  = set(get_pos_of_color('orange', grid_after))

    # Orange must exist on both sides; only a change of position counts.
    return bool(orange_before) and bool(orange_after) and orange_before != orange_after

def check_Key_obtained(prev, curr):
    '''
    Report whether every 'yellow' cell disappeared between two consecutive
    states: at least one is present in `prev` and none is left in `curr`.

    `prev` and `curr` are `(agent position, observation)` pairs.
    Raises AssertionError if the two observations differ in size.
    '''
    grid_before, grid_after = prev[1], curr[1]
    assert len(grid_before) == len(grid_after)
    assert len(grid_before[0]) == len(grid_after[0])

    yellow_before = get_pos_of_color('yellow', grid_before)
    yellow_after  = get_pos_of_color('yellow', grid_after)

    return len(yellow_before) > 0 and len(yellow_after) == 0

def check_Button_pressed(prev, curr):
    '''
    Report whether every 'blue' cell disappeared between two consecutive
    states: at least one is present in `prev` and none is left in `curr`.
    NOTE(review): presumably pressing the button is what removes the blue
    cells - confirm against the environment.

    `prev` and `curr` are `(agent position, observation)` pairs.
    Raises AssertionError if the two observations differ in size.
    '''
    grid_before, grid_after = prev[1], curr[1]
    assert len(grid_before) == len(grid_after)
    assert len(grid_before[0]) == len(grid_after[0])

    had_blue   = get_pos_of_color('blue', grid_before) != []
    blue_after = get_pos_of_color('blue', grid_after)

    return had_blue and blue_after == []

def check_KeyDoor_opened(prev, curr):
    '''
    Report whether every 'cyan' cell disappeared between two consecutive
    states: at least one is present in `prev` and none is left in `curr`.

    `prev` and `curr` are `(agent position, observation)` pairs.
    Raises AssertionError if the two observations differ in size.
    '''
    grid_before, grid_after = prev[1], curr[1]
    assert len(grid_before) == len(grid_after)
    assert len(grid_before[0]) == len(grid_after[0])

    cyan_before = get_pos_of_color('cyan', grid_before)
    cyan_after  = get_pos_of_color('cyan', grid_after)

    if not cyan_before:
        # Nothing cyan to begin with, so nothing can have been opened.
        return False
    return not cyan_after

def check_KeyCrate_made(prev, curr):
    '''
    Report whether a 'purple' cell appeared between two consecutive states:
    none is present in `prev` and at least one is present in `curr`.

    `prev` and `curr` are `(agent position, observation)` pairs.
    Raises AssertionError if the two observations differ in size.
    '''
    grid_before, grid_after = prev[1], curr[1]
    assert len(grid_before) == len(grid_after)
    assert len(grid_before[0]) == len(grid_after[0])

    purple_before = get_pos_of_color('purple', grid_before)
    purple_after  = get_pos_of_color('purple', grid_after)

    return len(purple_before) == 0 and len(purple_after) > 0

def check_KeyCrate_used(prev, curr):
    '''
    Report whether the number of 'purple' cells changed between two
    consecutive states, given that at least one was present in `prev`.

    `prev` and `curr` are `(agent position, observation)` pairs.
    Raises AssertionError if the two observations differ in size.
    '''
    grid_before, grid_after = prev[1], curr[1]
    assert len(grid_before) == len(grid_after)
    assert len(grid_before[0]) == len(grid_after[0])

    n_purple_before = len(get_pos_of_color('purple', grid_before))
    n_purple_after  = len(get_pos_of_color('purple', grid_after))

    return n_purple_before > 0 and n_purple_after != n_purple_before

def check_completion(prev, curr):
    '''
    Report whether the 'agent' cells of `curr` coincide with the 'green' cells
    of either `prev` or `curr`.

    `prev` and `curr` are `(agent position, observation)` pairs. The
    comparison is between whole coordinate lists, not single positions.
    NOTE(review): two empty lists compare equal, so this also returns True
    when the observation contains neither 'agent' nor 'green' - confirm that
    this cannot happen or is intended.

    Raises AssertionError if the two observations differ in size.
    '''
    grid_before, grid_after = prev[1], curr[1]
    assert len(grid_before) == len(grid_after)
    assert len(grid_before[0]) == len(grid_after[0])

    goal_before = get_pos_of_color('green', grid_before)
    goal_after  = get_pos_of_color('green', grid_after)
    agent_cells = get_pos_of_color('agent', grid_after)

    return agent_cells == goal_before or agent_cells == goal_after

def requirement_are_met(k, objs):
    '''
    Tell whether every colour listed under `REQUIRES[k]` occurs in `objs`.

    `k` is an event name (a key of `REQUIRES`); `objs` is the collection of
    values found in a level's observation. Raises KeyError for an unknown `k`.
    '''
    for required_color in REQUIRES[k]:
        if required_color not in objs:
            return False
    return True

def get_colors(k):
    '''
    Return the colour words associated with event `k`, i.e. the ones whose
    mention in a plan is compared against the event having happened.

    A new list is built on every call. Raises KeyError for an unknown `k`.
    '''
    colors_by_event = dict(
        FalseWall      = ['brown'],
        Crate_moves    = ['orange'],
        Button_pressed = ['pink', 'blue'],
        Key_obtained   = ['yellow'],
        KeyDoor_opened = ['cyan'],
        KeyCrate_made  = ['orange', 'yellow'],
        KeyCrate_used  = ['orange', 'yellow'],
    )
    return colors_by_event[k]

def get_truth(to_check, mentions, uses):
    '''
    Classify one (plan, execution) pair as a confusion-matrix outcome.

    The high-level plan **mentions** an object, and the low-level plan **uses**
    that object.

    Parameters:
        to_check : colours to look up in `mentions`.
        mentions : mapping colour -> bool, True if the plan mentions it.
        uses     : truthy if the executed actions used the object.

    Returns `(tp, fp, tn, fn)`; exactly one of the four is 1, the rest are 0:
        mentioned and used         -> tp
        mentioned but not used     -> fp
        not mentioned and not used -> tn
        not mentioned but used     -> fn

    Raises KeyError if a colour of `to_check` is missing from `mentions`.
    '''
    tp, fp, fn, tn = 0, 0, 0, 0
    # Every colour is looked up (list, not generator) so a missing key is
    # always reported, even after an earlier colour was already mentioned.
    if any([mentions[color] for color in to_check]):
        if uses:
            tp += 1
        else:
            fp += 1
    else:
        if uses:
            fn += 1
        else:
            tn += 1
    # Order matches how callers unpack the result: tp, fp, tn, fn.
    return tp, fp, tn, fn

In [7]:
# Score every flattened LLM run on Resourcefulness, Completeness, Interaction
# and Accuracy, and tally the scores per experiment, level, encoding, prompt
# and model.

def _new_counts():
    '''Return a fresh, zeroed counter record for one bucket.'''
    return {"Resourcefulness" : 0, "Interaction" : 0, "Accuracy" : 0, "Completeness" : 0, 'Total' : 0}

analysis_data = {
    "byAffs"     : {name : _new_counts() for name in ("exp1", "exp2")},
    "byLevel"    : {name : _new_counts() for name in ("lvl_1", "lvl_3", "lvl_5", "lvl_9", "lvl_11", "lvl_12")},
    "byEncoding" : {name : _new_counts() for name in ("matrix", "text")},
    "byPrompt"   : {name : _new_counts() for name in ("base", "cot")},
    "byModel"    : {name : _new_counts() for name in ("llama3", "qwen", "deepseek")},
}

# (grouping in analysis_data, key of entry['meta'] that selects the bucket)
_GROUPINGS = [
    ('byAffs',     'exp'),
    ('byLevel',    'lvl'),
    ('byEncoding', 'encoding'),
    ('byPrompt',   'prompt'),
    ('byModel',    'model'),
]

def _tally(meta, metric):
    '''Add one to `metric` in the bucket `meta` selects in every grouping.'''
    for grouping, meta_key in _GROUPINGS:
        analysis_data[grouping][meta[meta_key]][metric] += 1

# Event name -> detector called on each pair of consecutive states.
# 'Crate_moves' is left out on purpose: its check was already disabled, so
# that event always stays False.
_DETECTORS = {
    'FalseWall'      : check_FalseWall,
    'Key_obtained'   : check_Key_obtained,
    'Button_pressed' : check_Button_pressed,
    'KeyDoor_opened' : check_KeyDoor_opened,
    'KeyCrate_made'  : check_KeyCrate_made,
    'KeyCrate_used'  : check_KeyCrate_used,
}

for entry in tqdm(all_data):

    meta = entry['meta']

    if meta['prompt'] == 'self-consistency':
        continue

    # Start with Resourcefulness: does the written plan name any keyword colour?
    contains_keyword = False
    made_a_plan      = True
    details          = {
        'brown'  : False,
        'orange' : False,
        'pink'   : False,
        'blue'   : False,
        'yellow' : False,
        'cyan'   : False,
        'grey'   : False
    }
    if entry['solve_plan'] is not None:
        lower_plan = entry['solve_plan'].lower()

        for word in KEYWORDS:
            if word in lower_plan:
                contains_keyword = True
                details[word]    = True
    else:
        made_a_plan = False
    if contains_keyword:
        _tally(meta, 'Resourcefulness')

    # Second do Completion: replay the action sequence in a fresh environment.
    seq = entry['solution']

    # - - - - - - - - - - - - - - - - - - - - - -
    env = EscGridEnv(
        grid_layout=LEVELS[meta['lvl']]
    )
    env = FullyObsWrapper(env)
    env = ImgObsWrapper(env)
    env.unwrapped.render_mode = 'rgb_array'
    env.reset()
    # - - - - - - - - - - - - - - - - - - - - - -

    objs   = get_objs(env)
    states = run_seq(env, seq)

    results = {
        'FalseWall'      : False,
        'Crate_moves'    : False,
        'Key_obtained'   : False,
        'Button_pressed' : False,
        'KeyDoor_opened' : False,
        'KeyCrate_made'  : False,
        'KeyCrate_used'  : False,
    }

    completion = False
    # NOTE(review): states[0] is the state after the first action, so the
    # transition from the reset state to it is never inspected.
    for prev_state, next_state in zip(states, states[1:]):
        for event, detect in _DETECTORS.items():
            # An event counts once it is seen on any transition. The original
            # combined 'Button_pressed' with `and`, which could never leave its
            # initial False; every event now accumulates with `or`.
            results[event] = detect(prev_state, next_state) or results[event]
        completion = check_completion(prev_state, next_state) or completion
    if completion:
        _tally(meta, 'Completeness')

    # Third do Interaction and Accuracy. Only events whose required colours
    # exist in the level's initial observation are considered.
    performed_interaction = False
    was_accurate = False
    for k in results.keys():
        if requirement_are_met(k, objs):
            if results[k]:
                performed_interaction = True
            colors = get_colors(k)
            tp, fp, tn, fn = get_truth(colors, details, results[k])
            if tp > 0:
                was_accurate = True
    if performed_interaction:
        _tally(meta, 'Interaction')
    if was_accurate:
        _tally(meta, 'Accuracy')

    _tally(meta, 'Total')

100%|██████████| 432/432 [00:15<00:00, 27.37it/s] 


In [9]:
# Persist the per-grouping tallies as JSON.
with open('../data/preprocessed.json', 'w') as preprocessed_file:
    dump(obj=analysis_data, fp=preprocessed_file, indent=4)

## Processing Reinforcement Learning Data

In [19]:
# Change the index to decide which RL data to process.
rl_data_choices = ['rl_data', 'rl_data_lvl5']
curr_rl_data    = rl_data_choices[1]

# Read the two JSON files of the selected data set: succ.json and paths.json
# (the latter maps level names to lists of action sequences).
with open(f'../data/{curr_rl_data}/succ.json') as succ_file:
    succ_and_steps = load(succ_file)

with open(f'../data/{curr_rl_data}/paths.json') as paths_file:
    paths = load(paths_file)

In [20]:
# One zeroed counter record per level found in the RL paths file.
rl_perf_by_level = dict(
    (lvl, {"Resourcefulness" : 0, "Interaction" : 0, "Accuracy" : 0, "Completeness" : 0, 'Total' : 0})
    for lvl in paths
)

In [21]:
# Replay every RL action sequence and tally Completeness and Interaction per
# level in rl_perf_by_level.

# Event name -> detector called on each pair of consecutive states.
# 'Crate_moves' is left out on purpose: its check was already disabled, so
# that event always stays False.
_DETECTORS = {
    'FalseWall'      : check_FalseWall,
    'Key_obtained'   : check_Key_obtained,
    'Button_pressed' : check_Button_pressed,
    'KeyDoor_opened' : check_KeyDoor_opened,
    'KeyCrate_made'  : check_KeyCrate_made,
    'KeyCrate_used'  : check_KeyCrate_used,
}

for lvl, actSeqs in paths.items():

    # For the 'rl_data' set, only these levels are scored.
    if curr_rl_data == 'rl_data':
        if lvl not in ['lvl_1', 'lvl_3', 'lvl_5', 'lvl_9', 'lvl_11', 'lvl_12']:
            continue

    for seq in tqdm(actSeqs):
        # - - - - - - - - - - - - - - - - - - - - - -
        env = EscGridEnv(
            grid_layout=LEVELS[lvl]
        )
        env = FullyObsWrapper(env)
        env = ImgObsWrapper(env)
        env.unwrapped.render_mode = 'rgb_array'
        env.reset()
        # - - - - - - - - - - - - - - - - - - - - - -

        objs   = get_objs(env)
        states = run_seq(env, seq)

        results = {
            'FalseWall'      : False,
            'Crate_moves'    : False,
            'Key_obtained'   : False,
            'Button_pressed' : False,
            'KeyDoor_opened' : False,
            'KeyCrate_made'  : False,
            'KeyCrate_used'  : False,
        }

        completion = False
        # NOTE(review): states[0] is the state after the first action, so the
        # transition from the reset state to it is never inspected.
        for prev_state, next_state in zip(states, states[1:]):
            for event, detect in _DETECTORS.items():
                # An event counts once it is seen on any transition. The
                # original combined 'Button_pressed' with `and`, which could
                # never leave its initial False; every event now uses `or`.
                results[event] = detect(prev_state, next_state) or results[event]

            completion = check_completion(prev_state, next_state) or completion
        if completion:
            rl_perf_by_level[lvl]['Completeness'] += 1

        # Interaction: any detected event whose required colours exist in the
        # level's initial observation.
        performed_interaction = False
        for k in results.keys():
            if requirement_are_met(k, objs):
                if results[k]:
                    performed_interaction = True
        if performed_interaction:
            rl_perf_by_level[lvl]['Interaction'] += 1

        rl_perf_by_level[lvl]['Total'] += 1

    # A level without any sequence gets a Total of 1 instead of 0.
    # NOTE(review): presumably to keep later ratio computations from dividing
    # by zero - confirm with the consumer of processed_RL.json.
    if rl_perf_by_level[lvl]['Total'] == 0:
        rl_perf_by_level[lvl]['Total'] = 1

100%|██████████| 100/100 [00:01<00:00, 95.43it/s]
100%|██████████| 100/100 [00:02<00:00, 37.91it/s]
100%|██████████| 100/100 [00:02<00:00, 37.19it/s]
100%|██████████| 100/100 [00:02<00:00, 34.38it/s]
100%|██████████| 100/100 [00:02<00:00, 37.67it/s]


In [22]:
# Persist the per-level RL tallies next to the data they were computed from.
with open(f'../data/{curr_rl_data}/processed_RL.json', 'w') as rl_out_file:
    dump(obj=rl_perf_by_level, fp=rl_out_file, indent=4)

## Preprocess New Level 5 Experiments

In [11]:
# Read the results of the additional level-5 experiments.
with open('../data/lvl5_exps/lvl5_exp_results.json') as lvl5_results_file:
    new_llmData = load(lvl5_results_file)

In [12]:
# Replay every new level-5 LLM solution and tally Completeness and Interaction
# per level variant in llmData.
llmData = {
    lvl : {"Resourcefulness" : 0, "Interaction" : 0, "Accuracy" : 0, "Completeness" : 0, 'Total' : 0}
    for lvl in ['lvl_5', 'lvl_5b', 'lvl_5c', 'lvl_5d', 'lvl_5e']
}

# Event name -> detector called on each pair of consecutive states.
# 'Crate_moves' is left out on purpose: its check was already disabled, so
# that event always stays False.
_DETECTORS = {
    'FalseWall'      : check_FalseWall,
    'Key_obtained'   : check_Key_obtained,
    'Button_pressed' : check_Button_pressed,
    'KeyDoor_opened' : check_KeyDoor_opened,
    'KeyCrate_made'  : check_KeyCrate_made,
    'KeyCrate_used'  : check_KeyCrate_used,
}

for entry in new_llmData:

    curr_lvl = entry['lvl']

    # - - - - - - - - - - - - - - - - - - - - - -
    env = EscGridEnv(
        grid_layout=LEVELS[curr_lvl]
    )
    env = FullyObsWrapper(env)
    env = ImgObsWrapper(env)
    env.unwrapped.render_mode = 'rgb_array'
    env.reset()
    # - - - - - - - - - - - - - - - - - - - - - -

    objs   = get_objs(env)
    states = run_seq(env, entry['solution'])

    results = {
        'FalseWall'      : False,
        'Crate_moves'    : False,
        'Key_obtained'   : False,
        'Button_pressed' : False,
        'KeyDoor_opened' : False,
        'KeyCrate_made'  : False,
        'KeyCrate_used'  : False,
    }

    completion = False
    # NOTE(review): states[0] is the state after the first action, so the
    # transition from the reset state to it is never inspected.
    for prev_state, next_state in zip(states, states[1:]):
        for event, detect in _DETECTORS.items():
            # An event counts once it is seen on any transition. The original
            # combined 'Button_pressed' with `and`, which could never leave its
            # initial False; every event now accumulates with `or`.
            results[event] = detect(prev_state, next_state) or results[event]

        completion = check_completion(prev_state, next_state) or completion
    if completion:
        llmData[curr_lvl]['Completeness'] += 1

    # Interaction: any detected event whose required colours exist in the
    # level's initial observation.
    performed_interaction = False
    for k in results.keys():
        if requirement_are_met(k, objs):
            if results[k]:
                performed_interaction = True
    if performed_interaction:
        llmData[curr_lvl]['Interaction'] += 1

    llmData[curr_lvl]['Total'] += 1

# A level without any entry gets a Total of 1 instead of 0. This guard used to
# sit inside the loop right after the increment, where it could never trigger.
# NOTE(review): presumably to keep later ratio computations from dividing by
# zero - confirm with the consumer of preprocessed_lvl5.json.
for level_counts in llmData.values():
    if level_counts['Total'] == 0:
        level_counts['Total'] = 1

In [14]:
# Persist the per-level tallies of the level-5 experiments.
with open('../data/lvl5_exps/preprocessed_lvl5.json', 'w') as lvl5_out_file:
    dump(obj=llmData, fp=lvl5_out_file, indent=4)