a 1*n corridor

We sincerely thank you for this critical question, and we apologize for the significant error in our previous response. We are very grateful that you caught this mistake.
To be perfectly clear: the selection of digit positions is performed by uniform random sampling without replacement. We will ensure this is corrected and made explicit in the revised manuscript. Specifically, we will update Line 8 of Algorithm 1 to state:

```text
randomly sample L positions without replacement from {1,...,p} for digits.
```
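
As an illustration of the sampling step described above, here is a minimal sketch assuming NumPy's `Generator.choice`. The function name `sample_digit_positions` and the `seed` argument are hypothetical; only `L`, `p`, and the "without replacement" requirement come from the text.

```python
from __future__ import annotations
import numpy as np

def sample_digit_positions(p: int, L: int, seed: int | None = None) -> np.ndarray:
    """Draw L distinct positions uniformly at random from {1, ..., p}."""
    if not 0 <= L <= p:
        raise ValueError("need 0 <= L <= p to sample without replacement")
    rng = np.random.default_rng(seed)
    # replace=False guarantees that no position is selected twice.
    chosen = rng.choice(np.arange(1, p + 1), size=L, replace=False)
    return np.sort(chosen)

positions = sample_digit_positions(p=10, L=4, seed=0)
print(positions)
```

Sorting the result is only a convenience for reading; the draw itself is order-free.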


# RBPF-DLG — Loop per time step
1. **Motion proposal**: mixed model (step length \(\ell\in\{0,1,2\}\); turn may fail or go wrong; rare global jump). Clip at room bounds.  
2. **Observation likelihood**: for each flag and cell \(c\), compute \(\mathcal L_j(c)\) using detection \(P_D\), clutter \(\lambda\), and a discrete error kernel \(\phi(d)\) on Chebyshev distance in the 5×5 view; unobserved but in-view cells use a miss term \((1-P_D)\).  
3. **Weight update**: \( w_t^{(k)} \propto w_{t-1}^{(k)} \prod_j \Big(\sum_c \pi_j^{(k)}(c)\,\mathcal L_j^{(k)}(c)\Big). \)  
4. **Selective resampling**: if \(N_{\text{eff}}=1/\sum_k (w_t^{(k)})^2\) is low, resample (systematic).  
5. **Belief update**: \( \pi_j(c)\leftarrow \text{normalize}\,[\pi_j(c)\,\mathcal L_j(c)] \); prune to Top-\(K\) or cumulative mass \(\rho\); keep tail mass for later re-allocation; optional soft constraints (different flags should not share one cell).  
6. **Output**: best particle pose; for each flag report \( \arg\max_c \pi_j(c) \).
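
Steps 4 and 5 of the loop can be sketched as follows. This is a minimal NumPy illustration, not the actual RBPF-DLG implementation: the function names and the `N/2` resampling threshold are assumptions, and the belief update only shows Top-\(K\) pruning with the tail mass returned separately.

```python
import numpy as np

def effective_sample_size(weights):
    """N_eff = 1 / sum_k w_k^2, computed on normalized weights."""
    w = np.asarray(weights, dtype=float)
    w = w / w.sum()
    return 1.0 / np.sum(w ** 2)

def systematic_resample(weights, rng):
    """Return particle indices drawn by systematic resampling."""
    w = np.asarray(weights, dtype=float)
    w = w / w.sum()
    n = len(w)
    # One random offset, then n evenly spaced points in [0, 1).
    positions = (rng.random() + np.arange(n)) / n
    cumulative = np.cumsum(w)
    cumulative[-1] = 1.0  # guard against round-off
    indices = np.searchsorted(cumulative, positions, side="right")
    return np.minimum(indices, n - 1)

def update_belief(prior, likelihood, top_k):
    """Bayes update of one flag's cell belief, then Top-K pruning."""
    posterior = np.asarray(prior, dtype=float) * np.asarray(likelihood, dtype=float)
    posterior = posterior / posterior.sum()
    keep = np.argsort(posterior)[-top_k:]
    pruned = np.zeros_like(posterior)
    pruned[keep] = posterior[keep]
    tail_mass = 1.0 - pruned.sum()  # kept aside for later re-allocation
    return pruned, tail_mass

rng = np.random.default_rng(0)
weights = np.array([0.7, 0.1, 0.1, 0.1])
n_eff = effective_sample_size(weights)
if n_eff < len(weights) / 2:  # assumed threshold
    indices = systematic_resample(weights, rng)
else:
    indices = np.arange(len(weights))

belief, tail = update_belief(np.full(4, 0.25), [0.1, 0.6, 0.2, 0.1], top_k=2)
```

Systematic resampling uses a single random draw per resampling step, which keeps the variance of the resampled set low compared with drawing each index independently.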


In [125]:
from __future__ import annotations
import gymnasium as gym
from minigrid.minigrid_env import MiniGridEnv
from minigrid.core.world_object import Goal, Ball, Key, Wall
from minigrid.core.mission import MissionSpace
from minigrid.core.actions import Actions
from gymnasium.envs.registration import register
import numpy as np

class CorridorEnv(MiniGridEnv):
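    """
    A one-dimensional 1xN corridor environment.
    The agent has to walk from the left end to the end point at the right end; randomly placed objects may lie along the way.
    """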
    def __init__(self, length: int = 10, num_objects: int = 2, max_steps: int | None = None, **kwargs):
        self.length = length
        self.num_objects = num_objects
        width = length + 2
        height = 3
        if max_steps is None:
            max_steps = 5 * length
        # Mission description updated to reflect the new game rules
        mission_space = MissionSpace(
            mission_func=lambda: "touch all goals"
        )
        super().__init__(
            mission_space=mission_space,
            width=width,
            height=height,
            max_steps=max_steps,
            see_through_walls=False,
            **kwargs
        )

    def _gen_grid(self, width, height):
        self.grid.wall_rect(0, 0, width, height)
        for x in range(0, width):
            self.grid.set(x, 0, Wall())
            self.grid.set(x, 2, Wall())
        self.put_obj(Wall(), 0, 1)
        self.put_obj(Wall(), width - 1, 1)
        for x in range(1, width - 1):
            self.grid.set(x, 1, None)
            
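        # Place num_objects Goals (two by default) at distinct random cells inside the corridor
        # Note: the place_obj logic is simplified here to guarantee that all goals fit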
        goal_pos = self.np_random.choice(range(2, width - 2), self.num_objects, replace=False)
        for i in range(self.num_objects):
            self.put_obj(Goal(self._rand_color()), goal_pos[i], 1)

        self.place_agent(top=(1, 1), size=(1, 1), rand_dir=False)
        self.agent_dir = 0
    
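    # --- New: override the step method ---
    def step(self, action):
        # 1. Call the parent step first so that it handles all the basic actions
        obs, reward, terminated, truncated, info = super().step(action)

        # 2. Look up what occupies the cell the agent is standing on
        current_cell = self.grid.get(*self.agent_pos)

        # 3. New rule: if the agent is standing on a Goal...
        if current_cell is not None and current_cell.type == 'goal':

            # Give a positive reward to encourage the agent
            reward = 0.5  # any reward value can be used here

            # Key point: reset the terminated flag set by the parent class to False so the episode continues
            terminated = False

            # Optionally make this Goal disappear to avoid scoring it repeatedly
            #self.grid.set(*self.agent_pos, None)
            print(f"Agent touched a goal at {self.agent_pos}! Game continues.")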

        return obs, reward, terminated, truncated, info

In [133]:
from collections import defaultdict

IDX_TO_OBJECT = {
    0: 'unseen',
    1: 'empty',
    2: 'wall',
    3: 'floor',
    4: 'door',
    5: 'key',
    6: 'ball',
    7: 'box',
    8: 'flag',#goal
    9: 'lava',
    10: 'agent',
}

IDX_TO_COLOR = {
    0: 'red',
    1: 'green',
    2: 'blue',
    3: 'purple',
    4: 'yellow',
    5: 'grey',
}

IDX_TO_STATE = {
    0: 'open',
    1: 'closed',
    2: 'locked',
}

def describe_observation_in_english_grouped(obs, walls=False, oneDim=False) -> str:
    """
    (New version) Takes a MiniGrid observation dict and returns
    a more natural English description in which identical objects are grouped together.
    """
    direct = obs['direction']
    direction_map = {
        0: 'right (East)',
        1: 'down (South)',
        2: 'left (West)',
        3: 'up (North)'
    }
    direction = direction_map.get(direct, 'unknown direction')
    direction_text = f"You're facing {direction}."

    image = obs['image']
    height, width, _ = image.shape

    # --- New logic: group objects with a dictionary ---
    # The key is a tuple (color, type, state) and the value is a list of coordinates
    grouped_objects = defaultdict(list)

    object_grid = image[:, :, 0]
    color_grid = image[:, :, 1]
    state_grid = image[:, :, 2]

    # Step 1: scan the field of view and group every object found.
    # Note: obs['image'] is indexed [column, row], so `y` below runs over view columns
    # and `x` over view rows.
    for y in range(height):
        for x in range(width):
            obj_id = object_grid[y, x]

            # Ignore "unseen" (0) and "empty" (1) cells
            if obj_id not in [0, 1]:
                color_id = color_grid[y, x]
                state_id = state_grid[y, x]

                obj_name = IDX_TO_OBJECT.get(obj_id, f'object(ID:{obj_id})')
                color_name = IDX_TO_COLOR.get(color_id, f'color(ID:{color_id})')

                # Build a unique key for the object. For doors, the state is part of the identity
                object_key = (color_name, obj_name)
                if obj_name == 'door':
                    state_name = IDX_TO_STATE.get(state_id, f'state(ID:{state_id})')
                    object_key = (color_name, obj_name, state_name)

                # Add the current coordinate to the matching group
                grouped_objects[object_key].append((x, y))

    # Step 2: generate descriptions from the grouped objects
    object_descriptions = generate_object_descriptions(grouped_objects, walls, oneDim)

    # Assemble the final description
    if not object_descriptions:
        full_text = direction_text + " You see nothing of interest."
    else:
        full_text = direction_text + " You can see" + ",".join(object_descriptions)

    return full_text

def generate_object_descriptions(grouped_objects, walls=False, oneDim=False):
    object_descriptions = []
    for object_key, coords in grouped_objects.items():
        color_name = object_key[0]
        obj_name = object_key[1]

        # Skip walls unless they were requested
        if obj_name == 'wall' and not walls:
            continue

        # In the egocentric view the agent sits at row 6 of the centre column (index 3)
        # and faces row 0, so 6 - coord[0] is the distance ahead (1 = directly in front).
        coord_str = []
        if oneDim:
            # One-dimensional description: report only the distance ahead
            for coord in coords:
                # Keep only cells in the centre column (index 3)
                if coord[1] != 3:
                    continue
                coord_str.append(str(6 - coord[0]))
        else:
            for coord in coords:
                if not walls and coord[1] != 3:
                    continue
                coord_str.append(str((6 - coord[0], coord[1])))
        # Skip the group if no coordinate survived the filter
        if not coord_str:
            continue
        coords_str = ", ".join(coord_str)

        # Choose the singular or plural form based on the number of objects
        if len(coord_str) == 1:
            description = f" a {color_name} {obj_name} at coordinate {coords_str}"
        else:
            plural_obj_name = obj_name + 's'
            description = f" {color_name} {plural_obj_name} at coordinates: {coords_str}"

        if obj_name == 'door':
            state_name = object_key[2]
            description += f", which is {state_name}"

        object_descriptions.append(description)

    return object_descriptions

# --- Environment smoke test ---
env = gym.make(
    'MiniGrid-MyCorridor-v0',
    length=10,
    num_objects=2,
    render_mode="rgb_array",  # <-- key fix!
)
obs, info = env.reset(seed=42)

print(f"{env.unwrapped.agent_pos}")
print(f"Initial: {describe_observation_in_english_grouped(obs, walls=False, oneDim=False)}")

(np.int64(1), np.int64(1))
Initial: You're facing right (East). You can see a grey flag at coordinate (1, 3)
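
The `6 - coord[0]` arithmetic in `generate_object_descriptions` looks reversed only until the layout of `obs['image']` is taken into account. MiniGrid rotates the partial view so that the agent faces "up", places the agent at the bottom centre of the view, and indexes the array as `[column, row]`. The sketch below illustrates that mapping on a synthetic array; `view_index_to_relative` is a hypothetical helper, and the 7x7 view size is MiniGrid's default, assumed unchanged here.

```python
from __future__ import annotations
import numpy as np

VIEW_SIZE = 7  # MiniGrid's default agent_view_size (assumed unchanged)

def view_index_to_relative(i: int, j: int, view_size: int = VIEW_SIZE) -> tuple[int, int]:
    """Map an index pair of obs['image'] to (distance ahead, offset from centre column)."""
    agent_col = view_size // 2   # the agent sits in the centre column...
    agent_row = view_size - 1    # ...on the last row, facing row 0
    return agent_row - j, i - agent_col

# Synthetic observation: one goal (object id 8) directly in front of the agent.
image = np.zeros((VIEW_SIZE, VIEW_SIZE, 3), dtype=np.uint8)
image[3, 5, 0] = 8
i, j = (int(v) for v in np.argwhere(image[:, :, 0] == 8)[0])
forward, lateral = view_index_to_relative(i, j)
print(forward, lateral)
```

With this layout, rows 0 to 5 of the centre column are the six cells ahead of the agent, which is why the one-dimensional description reports distances 1 to 6.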


In [127]:
import imageio
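def run_scripted_playthrough(env: MiniGridEnv, command_sequence: list[str], picfilename="1Dtrace.gif") -> tuple[list[dict], str]:
    """
    (Revised) Automatically runs a command sequence, returns the list of observations
    together with the text log, and saves a GIF.
    """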
    COMMAND_TO_ACTION = { "left": Actions.left, "right": Actions.right, "forward": Actions.forward, "pickup": Actions.pickup, "drop": Actions.drop, "toggle": Actions.toggle, "done": Actions.done }
    obs_str = []
    observations_history = []
    frames = []

    obs, info = env.reset(seed=42)
    obs_str.append(f"Initial Observation: {describe_observation_in_english_grouped(obs, walls=True, oneDim=True)}")
    observations_history.append(obs)
    
    # Capture the initial frame
    frame = env.render()
    frames.append(frame)
    print("--- Scripted Playthrough Started ---")
    
    # Execute the command sequence in a loop
    for i, cmd in enumerate(command_sequence):
        cmd = cmd.lower()
        print(f"\nExecuting Step {i+1}: '{cmd}'")
        
        actions_to_execute = []
        if cmd == 'turnaround':
            actions_to_execute = ['left', 'left']
        elif cmd in COMMAND_TO_ACTION:
            actions_to_execute = [cmd]
        else:
            print(f"Warning: Invalid command '{cmd}' found. Skipping.")
            continue
            
        final_obs_for_this_step = obs
        for sub_action_name in actions_to_execute:
            action_id = COMMAND_TO_ACTION[sub_action_name]
            obs, reward, terminated, truncated, info = env.step(action_id)
            final_obs_for_this_step = obs
            
            # Capture the frame after executing the sub-action
            frame = env.render()
            frames.append(frame)
            
            if terminated or truncated:
                break
        
        observations_history.append(final_obs_for_this_step)
        obs_str.append(
            f"Step{i+1}: {cmd}. {describe_observation_in_english_grouped(final_obs_for_this_step, walls=True, oneDim=True)}")
        # For brevity, the print below can be simplified or removed
        # agent_pos_str = str(env.unwrapped.agent_pos)
        # print(f"State after '{cmd}': Agent is at {agent_pos_str}")

        if terminated or truncated:
            print("Episode ended before script finished.")
            break
            
    # --- Core change 2: the save-and-return logic is moved outside the loop ---
    # Make sure the GIF is saved only after all actions have been executed
    print(f"\nSaving {len(frames)} frames to {picfilename}...")
    imageio.mimsave(picfilename, frames, fps=3)
    print("--- Visualization saved! ---")
    
    print("\n--- Scripted Playthrough Finished ---")
    obs_str= ".\n".join(obs_str)
    return observations_history, obs_str


# --- Main entry point ---
if __name__ == '__main__':
    # --- Core change 1: render_mode="rgb_array" must be set when creating the environment ---
    my_env = gym.make(
        'MiniGrid-MyCorridor-v0',
        length=10,
        num_objects=2,
        render_mode="rgb_array"  # <-- key fix!
    )
    
    # 11 forward steps, one turnaround, then 10 forward steps
    my_script = ["forward"] * 11 + ["turnaround"] + ["forward"] * 10
    
    collected_observations,obsstring = run_scripted_playthrough(my_env, my_script, picfilename="my_corridor_trace.gif")
    
    print(f"\n--- Collected {len(collected_observations)} Observations ---")
    print("Observations:")
    print(obsstring)
    my_env.close()

--- Scripted Playthrough Started ---

Executing Step 1: 'forward'
Agent touched a goal at (np.int64(2), np.int64(1))! Game continues.

Executing Step 2: 'forward'

Executing Step 3: 'forward'

Executing Step 4: 'forward'

Executing Step 5: 'forward'

Executing Step 6: 'forward'

Executing Step 7: 'forward'
Agent touched a goal at (np.int64(8), np.int64(1))! Game continues.

Executing Step 8: 'forward'

Executing Step 9: 'forward'

Executing Step 10: 'forward'

Executing Step 11: 'forward'

Executing Step 12: 'turnaround'

Executing Step 13: 'forward'

Executing Step 14: 'forward'
Agent touched a goal at (np.int64(8), np.int64(1))! Game continues.

Executing Step 15: 'forward'

Executing Step 16: 'forward'

Executing Step 17: 'forward'

Executing Step 18: 'forward'

Executing Step 19: 'forward'

Executing Step 20: 'forward'
Agent touched a goal at (np.int64(2), np.int64(1))! Game continues.

Executing Step 21: 'forward'

Executing Step 22: 'forward'

Saving 24 frames to my_corridor_trac

In [128]:
Prompt =f"""
You are an expert spatial reasoning AI. Your task is to build a consistent 1D map of a corridor based on a sequence of limited, first-person observations.

### RULES OF THE WORLD ###
1.  The world is a static, continuous 1D track. Your absolute position on this track is a single integer.
2.  Your observation consists ONLY of the 6 grid cells directly in front of you.
3.  The number you see (e.g., "at coordinate 3") is a **relative position**, indexed 1 to 6.
    - Position 1 is the cell immediately in front of you.
    - Position 6 is the farthest cell you can see.
    - An object at relative position 7 or greater is invisible to you.
4.  Your movement is governed by your absolute position and direction:
    - 'forward': Changes your absolute position by +1 if facing East (positive direction), or -1 if facing West (negative direction).
    - 'turnaround': Reverses your direction (East becomes West, West becomes East).

### YOUR TASK ###
Your mission is to process the following log step-by-step and create a single, unified map of the world.
- Your **starting absolute position is 0**.
- You are **initially facing East** (the direction of increasing position numbers).

First, show your step-by-step reasoning by tracking your own absolute position and direction, and calculating the absolute coordinates of all objects you see.

Finally, provide a summary list of all unique objects and their deduced absolute coordinates.

### OBSERVATION LOG ###
{obsstring}

### YOUR RECONSTRUCTION ###
Please begin your step-by-step reasoning and then provide the final map summary in a new line in a box format like this:
```
a red flag at coordinate 3, a green flag at coordinate 4, a wall at coordinate 5, a wall at coordinate -1.
```
"""
print(f"Prompt:\n{Prompt}")

Prompt:

You are an expert spatial reasoning AI. Your task is to build a consistent 1D map of a corridor based on a sequence of limited, first-person observations.

### RULES OF THE WORLD ###
1.  The world is a static, continuous 1D track. Your absolute position on this track is a single integer.
2.  Your observation consists ONLY of the 6 grid cells directly in front of you.
3.  The number you see (e.g., "at coordinate 3") is a **relative position**, indexed 1 to 6.
    - Position 1 is the cell immediately in front of you.
    - Position 6 is the farthest cell you can see.
    - An object at relative position 7 or greater is invisible to you.
4.  Your movement is governed by your absolute position and direction:
    - 'forward': Changes your absolute position by +1 if facing East (positive direction), or -1 if facing West (negative direction).
    - 'turnaround': Reverses your direction (East becomes West, West becomes East).

### YOUR TASK ###
Your mission is to process the following
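
The bookkeeping that the prompt asks the model to perform can be written down directly, which gives a reference answer to compare against. The sketch below is an illustration under the prompt's own rules, not part of the notebook: `reconstruct_map` and its input format are hypothetical, and it assumes every `forward` succeeds, as rule 4 states. In the trace above the agent reaches the end wall after nine moves, so a `forward` blocked by a wall would need extra handling.

```python
def reconstruct_map(steps):
    """Dead-reckon the agent and convert relative sightings to absolute coordinates.

    steps: list of (command, sightings); sightings is a list of
    (label, relative_position) pairs observed after the command.
    """
    position, heading = 0, +1  # start at 0, facing East (+1)
    world = {}
    for command, sightings in steps:
        if command == "forward":
            position += heading      # assumes the move is never blocked
        elif command == "turnaround":
            heading = -heading
        for label, rel in sightings:
            # Relative position 1 is the cell immediately in front of the agent.
            world.setdefault(label, set()).add(position + heading * rel)
    return position, heading, world

steps = [
    ("start", [("grey flag", 1)]),
    ("forward", []),
    ("forward", [("wall", 3)]),
    ("turnaround", [("grey flag", 1)]),
]
final_position, final_heading, world = reconstruct_map(steps)
print(final_position, final_heading, world)
```

An object seen from both directions should map to the same absolute coordinate; a set with more than one entry for a label signals either several objects of that kind or an inconsistency in the log.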

In [129]:
import gymnasium as gym
import minigrid
import numpy as np
from collections import defaultdict
import google.generativeai as genai
import os
import time
import re
from minigrid.core.actions import Actions

# --- 0. Configure the Gemini API ---
# Make sure the GOOGLE_API_KEY environment variable is set;
# do not hard-code the key in the notebook.
import textworld.gym
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

# Try calling Gemini (the model selected here is gemini-2.5-pro)
model = genai.GenerativeModel('gemini-2.5-pro')
#response = model.generate_content(
#    contents=Prompt,  # use the Prompt defined above
#)
#print(response.text)
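
Once a response is available, the final map summary requested by the prompt can be turned into structured data for scoring. The sketch below is a hypothetical helper, assuming the model follows the example format from the prompt ("a red flag at coordinate 3, ..."); real responses may deviate from it, so the pattern may need loosening.

```python
import re

# Matches "a <optional color> <object> at coordinate <integer>".
SUMMARY_PATTERN = re.compile(
    r"\ba\s+((?:\w+\s+)?\w+)\s+at\s+coordinate\s+(-?\d+)",
    re.IGNORECASE,
)

def parse_map_summary(text):
    """Return a list of (label, absolute_coordinate) pairs found in text."""
    return [(label.lower(), int(coord)) for label, coord in SUMMARY_PATTERN.findall(text)]

example = (
    "a red flag at coordinate 3, a green flag at coordinate 4, "
    "a wall at coordinate 5, a wall at coordinate -1."
)
parsed = parse_map_summary(example)
print(parsed)
```

Applying the parser to the last line of `response.text` rather than to the whole response avoids picking up coordinates mentioned in the step-by-step reasoning.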

