In [1]:
import random
import re

import gymnasium as gym
import pygame
from minigrid.core.constants import OBJECT_TO_IDX, IDX_TO_OBJECT
from pkg_resources import resource_stream, resource_exists


In [None]:
class ObservationParser:
    """Turn an egocentric MiniGrid observation into a one-line text description.

    The parser indexes the observation image as ``image[lateral, forward, channel]``:
    the agent sits in the middle of the first axis and at the last index of the
    second axis, and the cell "in front" is one step lower on the second axis.
    Channel 0 holds object type indices and channel 2 holds object state.

    Attributes:
        OBJECT_TO_IDX: object name -> type index used in channel 0.
        IDX_TO_OBJECT: inverse of ``OBJECT_TO_IDX``.
        DIR_TO_STR: direction index -> compass-style name (not used for the
            description, because the view is already agent-relative).
        is_open_door: sticky flag, set to True the first time an open door is
            described and never cleared by the parser itself; callers reset it.
    """

    # Object types that are mentioned in the "sees:" part of the description.
    _REPORTED_TYPES = ("key", "door", "goal")

    def __init__(self):
        self.OBJECT_TO_IDX = {
            "unseen": 0,
            "empty": 1,
            "wall": 2,
            "floor": 3,
            "door": 4,
            "key": 5,
            "ball": 6,
            "box": 7,
            "goal": 8,
            "lava": 9,
            "agent": 10,
        }

        self.IDX_TO_OBJECT = {v: k for k, v in self.OBJECT_TO_IDX.items()}

        self.DIR_TO_STR = {0: "right", 1: "down", 2: "left", 3: "up"}

        self.is_open_door = False

    def parse_observation(self, obs):
        """Parse the observation to detailed text with spatial info.

        Args:
            obs: either a dict with an ``"image"`` entry (and optionally a
                ``"direction"`` entry), or the image array itself. The image
                must be 3-dimensional: two spatial axes plus a channel axis.

        Returns:
            The description string built by ``_format_description``.
        """
        if isinstance(obs, dict) and "image" in obs:
            image = obs["image"]
            # The direction is optional: the description is agent-relative and
            # does not depend on it.
            agent_dir_idx = obs.get("direction")
        else:
            # A bare image array carries no direction information.
            image = obs
            agent_dir_idx = None

        n_lateral, n_forward, _ = image.shape

        # Agent is at the bottom center of the view: middle of the lateral
        # (first) axis, last index of the forward (second) axis.
        agent_pos = (n_lateral // 2, n_forward - 1)

        # get observation channels
        obj_grid = image[:, :, 0]  # Object types
        state_grid = image[:, :, 2]  # State types

        return self._build_spatial_description(
            obj_grid, state_grid, agent_pos, agent_dir_idx
        )

    def _build_spatial_description(
        self, obj_grid, state_grid, agent_pos, agent_dir_idx
    ):
        """Build a detailed spatial description of the observation.

        Args:
            obj_grid: 2-D array of object type indices.
            state_grid: 2-D array of object states (0 means an open door).
            agent_pos: ``(lateral, forward)`` index of the agent's own cell.
            agent_dir_idx: forwarded to ``_get_relative_position``; may be None.

        Returns:
            The formatted description string.
        """
        object_info = []
        n_lateral, n_forward = obj_grid.shape

        for i in range(n_lateral):
            for j in range(n_forward):
                # The agent's own cell is inspected separately below (it shows
                # what the agent carries), so it is never listed as "seen".
                if (i, j) == agent_pos:
                    continue

                # Only keys, doors and goals are reported; unseen/empty cells,
                # walls and every other type are ignored.
                obj_type = self.IDX_TO_OBJECT.get(int(obj_grid[i, j]))
                if obj_type not in self._REPORTED_TYPES:
                    continue

                info = {
                    "type": obj_type,
                    # Position from the agent's perspective.
                    "relative_position": self._get_relative_position(
                        agent_pos, (i, j), agent_dir_idx
                    ),
                    # Manhattan distance, used only for ordering the output.
                    "distance": abs(i - agent_pos[0]) + abs(j - agent_pos[1]),
                }

                if obj_type == "door":
                    # State 0 is "open"; every other state is reported as closed.
                    info["is_open"] = bool(state_grid[i, j] == 0)

                object_info.append(info)

        # A key encoded in the agent's own cell means the agent is carrying it.
        if obj_grid[agent_pos] == self.OBJECT_TO_IDX["key"]:
            carrying = "key"
        else:
            carrying = "nothing"

        # The cell directly ahead is one step lower on the forward axis.
        front = (agent_pos[0], agent_pos[1] - 1)
        is_wall_in_front = obj_grid[front] == self.OBJECT_TO_IDX["wall"]

        return self._format_description(object_info, carrying, is_wall_in_front)

    def _get_relative_position(self, agent_pos, obj_pos, agent_dir):
        """Get object position relative to agent's facing direction.

        Args:
            agent_pos: ``(lateral, forward)`` index of the agent.
            obj_pos: ``(lateral, forward)`` index of the object.
            agent_dir: unused; kept for interface compatibility, since the
                offsets are computed directly in the agent-relative view.

        Returns:
            ``"is right infront"`` for the cell directly ahead,
            ``"at agent position"`` when no forward/left/right offset applies,
            otherwise parts such as ``"2 steps forward and 1 steps left"``.
        """
        # Raw offsets in the agent-relative view.
        dy = agent_pos[1] - obj_pos[1]  # positive means ahead of the agent
        dx = obj_pos[0] - agent_pos[0]  # positive is right, negative is left

        if dx == 0 and dy == 1:
            return "is right infront"

        position_parts = []
        if dy > 0:  # Object is ahead
            position_parts.append(f"{dy} steps forward")
        if dx > 0:  # Object is to the right
            position_parts.append(f"{dx} steps right")
        if dx < 0:  # Object is to the left
            position_parts.append(f"{-dx} steps left")

        if not position_parts:
            return "at agent position"

        return " and ".join(position_parts)

    def _format_description(self, objects_info, carrying, is_wall_in_front):
        """Format the complete description.

        Side effect: sets ``self.is_open_door`` to True when an open door is
        among ``objects_info``.

        Args:
            objects_info: list of dicts with ``type``, ``relative_position``,
                ``distance`` and, for doors, ``is_open``. Not modified.
            carrying: text for what the agent carries.
            is_wall_in_front: truthy when the cell ahead is a wall.

        Returns:
            A string of the form
            ``"Agent (...), sees: ..., carrying: ..."``.
        """
        if is_wall_in_front:
            parts = ["Agent (wall in front)"]
        else:
            parts = ["Agent (clear path ahead)"]

        # Objects in view, nearest first for consistent ordering (stable sort,
        # so ties keep their scan order).
        obj_descriptions = []
        for obj in sorted(objects_info, key=lambda item: item["distance"]):
            if obj["type"] == "door":
                if obj["is_open"]:
                    state = "open"
                    self.is_open_door = True
                else:
                    state = "closed"
                obj_descriptions.append(f"{state} door {obj['relative_position']}")
            else:
                obj_descriptions.append(f"{obj['type']} {obj['relative_position']}")

        if obj_descriptions:
            parts.append(f"sees: {'; '.join(obj_descriptions)}")
        else:
            parts.append("sees: nothing")

        # Carrying status
        parts.append(f"carrying: {carrying}")

        return ", ".join(parts)

In [67]:
# Keyboard bindings for manual play. The position of each key in the tuple is
# the action index passed to env.step, so the order below must not change.
ACTIONS = {
    key: action_id
    for action_id, key in enumerate(
        (
            pygame.K_LEFT,  # 0: turn left
            pygame.K_RIGHT,  # 1: turn right
            pygame.K_UP,  # 2: move forward
            pygame.K_SPACE,  # 3: pick up
            pygame.K_d,  # 4: drop
            pygame.K_RETURN,  # 5: toggle (open door)
            pygame.K_q,  # 6: done
        )
    )
}

# Shared parser used by main() to describe each observation.
parser = ObservationParser()


def main():
    """Play MiniGrid DoorKey-8x8 by keyboard and print a description per step.

    Each key listed in ACTIONS is translated to an environment action; after
    every step the action, reward, done flag and the parser's text description
    of the new observation are printed. A finished episode is reset with the
    same seed. The loop ends when the window is closed.

    The environment and pygame are always shut down on exit, including when
    the loop is left through an exception such as KeyboardInterrupt.
    """
    env = gym.make("MiniGrid-DoorKey-8x8-v0", render_mode="human")
    try:
        obs, _ = env.reset(seed=1)

        running = True
        while running:
            env.render()

            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                    break

                # Only mapped key presses drive the environment.
                if event.type != pygame.KEYDOWN or event.key not in ACTIONS:
                    continue

                action = ACTIONS[event.key]
                # Heuristic: moving forward after an open door has been seen
                # is reported as passing through it.
                if parser.is_open_door and action == 2:
                    print("Agent went through the door!")
                obs, reward, terminated, truncated, _info = env.step(action)
                print(
                    f"Action: {action}, Reward: {reward}, Done: {terminated or truncated}"
                )
                print(f"Description: {parser.parse_observation(obs)}")
                if terminated or truncated:
                    print("Episode finished. Resetting...")
                    obs, _ = env.reset(seed=1)
                    # The new episode starts with the door shut again, so the
                    # parser's sticky flag must not carry over.
                    parser.is_open_door = False
    finally:
        env.close()
        pygame.quit()


main()

Action: 2, Reward: 0, Done: False
Description: Agent (clear path ahead), sees: key 4 steps forward and 1 steps right; closed door 4 steps forward and 2 steps right, carrying: nothing
Action: 2, Reward: 0, Done: False
Description: Agent (clear path ahead), sees: key 3 steps forward and 1 steps right; closed door 3 steps forward and 2 steps right, carrying: nothing
Action: 2, Reward: 0, Done: False
Description: Agent (clear path ahead), sees: key 2 steps forward and 1 steps right; closed door 2 steps forward and 2 steps right, carrying: nothing
Action: 2, Reward: 0, Done: False
Description: Agent (clear path ahead), sees: key 1 steps forward and 1 steps right; closed door 1 steps forward and 2 steps right, carrying: nothing
Action: 1, Reward: 0, Done: False
Description: Agent (clear path ahead), sees: key 1 steps forward and 1 steps left; closed door 2 steps forward and 1 steps left, carrying: nothing
Action: 2, Reward: 0, Done: False
Description: Agent (wall in front), sees: key 1 steps

KeyboardInterrupt: 

In [None]:
class PromptManager:
    """Manages prompts for the LLM.

    Attributes:
        system_prompt: rules, action list and answer format sent as the system
            message.
        few_shot_examples: worked state -> reasoning -> action examples written
            in the same wording the observation parser emits. They are only
            added to a query when ``include_examples=True`` is requested.
    """

    def __init__(self):
        self.system_prompt = """You are an expert agent navigating a grid world in the DoorKey-8x8 environment.

ENVIRONMENT RULES:
- You control an agent that can move in a grid world
- You must: 1) Pick up the key, 2) Unlock the door, 3) Reach the goal
- You can only move to empty spaces or through open doors
- You cannot move through walls or closed doors or keys
- Remember you have to be in front of the object to interact with it
- If you don't see an object in your view, you should turn left/right or move to see more of the grid world
- If you already went through a door once, you must not go through it again. Instead you should explore by turning left or right or going forward to find the goal

Your goal is to:
1. Find and pick up the key
2. Use the key to unlock the door
3. Reach the goal

ACTIONS:
- 0 = Turn left (rotate 90° counter-clockwise)
- 1 = Turn right (rotate 90° clockwise)
- 2 = Move forward one cell
- 3 = Pick up key (the key must be directly in front of you)
- 5 = Toggle door (the door must be directly in front of you and you must carry the key)

Think step by step:
1. What is my current state?
2. What is my immediate objective?
3. What action gets me closer to that objective?

CRITICAL: You have to be in front of the objects to interact with it. You can not 1 step away from it to interact with it!

Respond in this format:
Reasoning: <your_reasoning>
Action: <action_number>."""

        # Each example's action must agree with its reasoning, and each state
        # line must use the wording the parser actually produces.
        self.few_shot_examples = """
Example 1:
State: Agent (clear path ahead), sees: key is right infront; closed door 3 steps forward and 3 steps left, carrying: nothing
Reasoning: I see the key and it is directly in front of me. I should pick it up.
Action: 3

Example 2:
State:   Agent (wall in front), sees: closed door 1 steps forward and 3 steps left, carrying: key
Reasoning: I have the key I should go to the door. But the wall is blocking my path to the door. So I should turn left.
Action: 0

Example 3:
State: Agent has just gone through a door once, Agent (clear path ahead), sees: open door 2 steps forward, carrying: key
Reasoning: I already opened the door, and went through it once, so I should not go through it again. Instead I should turn left or right to find the goal.
Action: 1

Example 4:
State: Agent (clear path ahead), sees: closed door 1 steps forward and 1 steps left, carrying: key
Reasoning: I see the door, but it is not in front of me, so I should prioritize going forward to reach it.
Action: 2
"""

    def create_query_prompt(
        self, state_description, is_went_through_door, include_examples=False
    ):
        """Create prompt for action query.

        Args:
            state_description: text description of the current observation.
            is_went_through_door: when truthy, the state is prefixed with a
                note that the agent has already gone through a door.
            include_examples: when True, ``few_shot_examples`` is placed before
                the state line. Defaults to False, which yields the state line
                only.

        Returns:
            The user-message text, starting with a newline and
            ``"Current state: "``.
        """
        print(f"Is agent went through door: {is_went_through_door}")
        if is_went_through_door:
            state_line = f"\nCurrent state: Agent has just gone through a door once, {state_description}"
        else:
            state_line = f"\nCurrent state: {state_description}"

        if include_examples:
            return f"{self.few_shot_examples}{state_line}"
        return state_line

In [None]:
class Agent:
    """Policy that asks an LLM for the next action in a grid-world environment.

    Attributes:
        llm: object with ``query(prompt, system_prompt=...)`` returning text.
        env: environment exposing ``reset``, ``step``, ``render`` and
            ``action_space.n``.
        prompt_manager: builds the query text and holds ``system_prompt``.
        obs_parser: converts observations to text and exposes ``is_open_door``.
        is_went_through_door: True once a forward move was issued after the
            parser reported an open door during the current trajectory.
    """

    # Actions chosen at random when the LLM gives no usable answer:
    # turn left, turn right, move forward.
    FALLBACK_ACTIONS = (0, 1, 2)

    def __init__(self, llm_interface, env, prompt_manager, obs_parser):
        self.llm = llm_interface
        self.env = env
        # Falsy arguments are replaced by default instances.
        self.prompt_manager = prompt_manager or PromptManager()
        self.obs_parser = obs_parser or ObservationParser()
        self.is_went_through_door = False

    def get_action(self, obs) -> int:
        """Get action from LLM given observation.

        Args:
            obs: the current environment observation.

        Returns:
            The action parsed from the LLM reply. If the query or the parsing
            raises, the error is printed and a random fallback action is
            returned, so the result is always a valid int.
        """
        # Convert observation to text
        state_desc = self.obs_parser.parse_observation(obs)

        # Create prompt
        prompt = self.prompt_manager.create_query_prompt(
            state_desc, self.is_went_through_door
        )

        # Query LLM
        try:
            print(f"Querying LLM with state: {state_desc}")

            response = self.llm.query(
                prompt, system_prompt=self.prompt_manager.system_prompt
            )

            print(f"Response: {response}")

            # Parse action from response
            return self._parse_action(response)

        except Exception as e:
            print(f"LLM query failed: {e}")
            # Never hand None to env.step: keep the episode going instead.
            return self._fallback_action()

    def _fallback_action(self):
        """Return a random turn-left / turn-right / forward action."""
        return random.choice(self.FALLBACK_ACTIONS)

    def _parse_action(self, response):
        """Parse action number from LLM response.

        Args:
            response: reply text; ``None`` is treated as an empty reply.

        Returns:
            The first ``Action: <n>`` number if it lies inside the
            environment's action space, otherwise a random fallback action.
        """
        # Action: number
        numbers = re.findall(r"Action:\s*(\d+)", response or "")
        print(f"Numbers: {numbers}")
        if numbers:
            action = int(numbers[0])
            # Validate action
            if 0 <= action < self.env.action_space.n:
                return action

        # No usable number: pick a random movement action.
        return self._fallback_action()

    def generate_trajectory(self, max_steps: int = 50):
        """Generate a single trajectory.

        Resets the environment and the door-tracking flags, then repeatedly
        asks the LLM for an action and steps the environment until the episode
        is terminated/truncated or ``max_steps`` steps were taken.

        Args:
            max_steps: maximum number of environment steps.

        Returns:
            A dict with per-step lists ``actions``, ``rewards``,
            ``next_observations`` (the ``"image"`` entry for dict
            observations), ``dones``, ``infos`` and the scalar
            ``total_reward``.
        """
        obs = self.env.reset()[0]

        actions = []
        rewards = []
        next_observations = []
        dones = []
        infos = []

        total_reward = 0
        # Door state belongs to a single episode; start every trajectory clean.
        self.obs_parser.is_open_door = False
        self.is_went_through_door = False

        for _ in range(max_steps):
            self.env.render()
            # Get action from LLM
            action = self.get_action(obs)
            # Heuristic: the first forward move after an open door was seen
            # counts as having gone through it.
            if (
                self.obs_parser.is_open_door
                and action == 2
                and not self.is_went_through_door
            ):
                print("Agent went through the door!")
                self.is_went_through_door = True

            # Execute action
            next_obs, reward, terminated, truncated, info = self.env.step(action)
            done = terminated or truncated

            actions.append(action)
            rewards.append(reward)
            next_observations.append(
                next_obs["image"] if isinstance(next_obs, dict) else next_obs
            )
            dones.append(done)
            infos.append(info)

            total_reward += reward
            obs = next_obs

            if done:
                break

        return {
            "actions": actions,
            "rewards": rewards,
            "next_observations": next_observations,
            "dones": dones,
            "infos": infos,
            "total_reward": total_reward,
        }

In [None]:
import gymnasium as gym
import openai
import re
from minigrid.wrappers import FlatObsWrapper
from openai import OpenAI
import os
import minigrid
import pprint
import json
import dotenv
import numpy as np
from dotenv import load_dotenv
from abc import ABC, abstractmethod


class LLMInterface(ABC):
    """Common contract for LLM backends: one text-in, text-out ``query`` call."""

    @abstractmethod
    def query(self, prompt: str, system_prompt: str = None) -> str:
        """Send ``prompt`` (optionally with ``system_prompt``) to the backend.

        Concrete subclasses must override this method; the base version has
        no behavior of its own.
        """


# Load environment variables from .env file.
# This runs before OpenAIInterface is instantiated, which reads
# OPENAI_API_KEY and OPENAI_BASE_URL through os.getenv.
load_dotenv()


class OpenAIInterface(LLMInterface):
    """OpenAI API interface.

    Credentials and endpoint come from the ``OPENAI_API_KEY`` and
    ``OPENAI_BASE_URL`` environment variables, read when the instance is
    created.
    """

    def __init__(self, model: str = "gpt-4"):
        """Create the API client.

        Args:
            model: model identifier sent with every chat completion request.
        """
        self.client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL")
        )
        self.model = model

    def query(self, prompt: str, system_prompt: str = None) -> str:
        """Send one chat completion request and return the reply text.

        Args:
            prompt: content of the user message.
            system_prompt: content of an optional leading system message;
                omitted from the request when empty or None.

        Returns:
            The content of the first choice, or ``""`` when the API returns no
            text content, so callers always receive a string.
        """
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})

        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.1,  # Low temperature for consistency
            max_tokens=200,
        )

        # message.content is nullable in the API response; normalise it so the
        # declared ``-> str`` contract holds for downstream regex parsing.
        content = response.choices[0].message.content
        return content if content is not None else ""

In [None]:
# Environment rendered in a window so the agent's moves can be watched.
env = gym.make("MiniGrid-DoorKey-8x8-v0", render_mode="human")

# LLM backend; the model name is passed through to the chat completions call.
llm = OpenAIInterface(model="meta-llama/llama-3.3-70b-instruct")

# Wire the environment, LLM, prompt builder and observation parser together.
agent = Agent(
    llm_interface=llm,
    env=env,
    prompt_manager=PromptManager(),
    obs_parser=ObservationParser(),
)

In [None]:
# Run one episode. Binding the result to a name keeps it available to later
# cells and prevents it from being echoed as this cell's output.
trajectory = agent.generate_trajectory()

Is agent went through door: False
Querying LLM with state: Agent (clear path ahead), sees: closed door 2 steps forward and 3 steps left, carrying: nothing
Response: Reasoning: The agent's current state is that it has a clear path ahead, sees a closed door 2 steps forward and 3 steps left, and is not carrying anything. The immediate objective is to find the key, as the agent cannot unlock the door without it. Since the agent does not see the key, it needs to explore its surroundings to locate it. The agent should move forward to get closer to the area where the door is, which may help in finding the key or getting a better view of the surroundings.

Action: 2.
Numbers: ['2']
Is agent went through door: False
Querying LLM with state: Agent (wall in front), sees: closed door 1 steps forward and 3 steps left, carrying: nothing
Response: Reasoning: The agent is currently facing a wall and sees a closed door 1 step forward and 3 steps left. The immediate objective is to explore the environme

KeyboardInterrupt: 

: 