In [1]:
import dspy
from utils.observation_encoder import ObservationEncoder
from utils.env import make_env
import utils
import gymnasium as gym
from minigrid.wrappers import FullyObsWrapper

  from .autonotebook import tqdm as notebook_tqdm


pygame 2.5.2 (SDL 2.28.3, Python 3.12.2)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
def configure_llm(llm_model_id='llama-3.3-70b-versatile', cache_llm_dspy=False, GROQ=True):
    """Create a ``dspy.LM`` and install it as the language model DSPy uses.

    Args:
        llm_model_id: Model identifier handed to ``dspy.LM``.
        cache_llm_dspy: Forwarded to ``dspy.LM`` as its ``cache`` argument.
        GROQ: When truthy, target the Groq OpenAI-compatible endpoint and
            authenticate with ``utils.GROQ_API_KEY``; otherwise target
            ``http://localhost:11434`` without an API key.

    Returns:
        None. The only effect is the call to ``dspy.configure``.
    """
    lm_options = {'cache': cache_llm_dspy}
    if not GROQ:
        lm_options['api_base'] = 'http://localhost:11434'
    else:
        # The key is only read on the Groq path, so local use needs no key in utils.
        lm_options['api_base'] = 'https://api.groq.com/openai/v1'
        lm_options['api_key'] = utils.GROQ_API_KEY
    dspy.configure(lm=dspy.LM(llm_model_id, **lm_options))

class NextStepSignature(dspy.Signature):
    """
    Predict the next primitive action for the agent from the task description, the current state and the previous actions.
    """
    # DSPy sends the docstring above to the LLM as the task instructions, so it
    # describes only what this signature asks for: a single primitive action.
    # NOTE(review): this class name is defined in more than one notebook cell;
    # the most recently executed definition is the one in effect.
    task_description: str = dspy.InputField(desc="Task description")
    current_state: str = dspy.InputField(desc="Textual encoding of the env state")
    # The action history is supplied as text (e.g. "Step-1: 1\n"), hence str, not int.
    previous_actions: str = dspy.InputField(desc="The previous actions taken by the agent", default=None)

    # Single output: the action index the agent should execute next.
    primitive_action: int = dspy.OutputField(desc="An integer between 0 and 6 where: 0 Turn left; 1 Turn right; 2 Move forward; 3 Pick up; 4 Drop; 5 Toggle; 6 Done", default=None)

class SubgoalPredictor(dspy.Module):
    """DSPy module that wraps a chain-of-thought predictor over NextStepSignature."""

    def __init__(self):
        super().__init__()
        # Which NextStepSignature is bound here depends on the cell executed last.
        self.predictor = dspy.ChainOfThought(NextStepSignature)

    def forward(self, task_description, current_state, previous_actions):
        """Run the predictor on the three inputs and return its prediction unchanged."""
        inputs = dict(
            task_description=task_description,
            current_state=current_state,
            previous_actions=previous_actions,
        )
        return self.predictor(**inputs)

In [10]:

class NextStepSignature(dspy.Signature):
    """
    Predict the next primitive action for the agent from the task description, the current state and the previous actions.
    """
    # DSPy sends the docstring above to the LLM as the task instructions, so it
    # describes only what this signature asks for: a single primitive action.
    # NOTE(review): this class name is defined in more than one notebook cell;
    # the most recently executed definition is the one in effect.
    task_description: str = dspy.InputField(desc="Task description")
    current_state: str = dspy.InputField(desc="Textual encoding of the env state")
    # The action history is supplied as text (e.g. "Step-1: 1\n"), hence str, not int.
    previous_actions: str = dspy.InputField(desc="The previous actions taken by the agent", default=None)

    # Single output: the action index the agent should execute next.
    primitive_action: int = dspy.OutputField(desc="An integer between 0 and 6 where: 0 Turn left; 1 Turn right; 2 Move forward; 3 Pick up; 4 Drop; 5 Toggle; 6 Done", default=None)

class SubgoalPredictor(dspy.Module):
    """DSPy module that wraps a chain-of-thought predictor over NextStepSignature."""

    def __init__(self):
        super().__init__()
        # Which NextStepSignature is bound here depends on the cell executed last.
        self.predictor = dspy.ChainOfThought(NextStepSignature)

    def forward(self, task_description, current_state, previous_actions):
        """Run the predictor on the three inputs and return its prediction unchanged."""
        inputs = dict(
            task_description=task_description,
            current_state=current_state,
            previous_actions=previous_actions,
        )
        return self.predictor(**inputs)


In [4]:
class NextStepSignature(dspy.Signature):
    """
    Predicts either primitive action or subgoal based on mode.
    """
    # Variant of the signature with a `mission` input and both an action and a
    # subgoal output. The docstring above and every `desc` string are left
    # untouched because DSPy uses them as prompt text for the LLM.
    # NOTE(review): this class name is defined in more than one notebook cell;
    # the most recently executed definition is the one in effect.
    task_description: str = dspy.InputField(desc="Expert's textual description")
    mission: str = dspy.InputField(desc="Overall mission description")
    current_state: str = dspy.InputField(desc="Textual encoding of the env state")

    # Both outputs are declared on the signature; SubgoalPredictor.forward
    # selects which one to hand back to the caller according to its mode.
    primitive_action: int = dspy.OutputField(desc="An integer between 0 and 6 where: 0 Turn left; 1 Turn right; 2 Move forward; 3 Pick up; 4 Drop; 5 Toggle; 6 Done", default=None)
    subgoal: str = dspy.OutputField(desc="One of: CloseSubgoal, OpenSubgoal, DropSubgoal, PickupSubgoal, GoNextToSubgoal, ExploreSubgoal, done, none", default=None)

class SubgoalPredictor(dspy.Module):
    """Predictor that returns either a primitive action or a subgoal.

    Args:
        mode: ``"action"`` makes ``forward`` return the primitive action; any
            other value makes it return the subgoal.
    """

    def __init__(self, mode="action"):
        super().__init__()
        self.mode = mode
        # Chain-of-thought predictor built from the typed signature class.
        self.predictor = dspy.ChainOfThought(NextStepSignature)

    def forward(self, task_description, mission, current_state):
        """Query the predictor and return only the output selected by ``self.mode``.

        Returns:
            ``{"primitive_action": ...}`` when ``self.mode == "action"``,
            otherwise ``{"subgoal": ...}``.
        """
        # Call the predictor created in __init__. The previous code called
        # `self.ChainOfThought`, an attribute this module never defines, so
        # every forward pass raised AttributeError.
        pred = self.predictor(task_description=task_description, mission=mission, current_state=current_state)

        # Return only the requested output based on mode
        if self.mode == "action":
            return {"primitive_action": pred.primitive_action}
        # subgoal mode
        return {"subgoal": pred.subgoal}

In [3]:
# Configure DSPy with configure_llm's default arguments, then build the agent.
# NOTE(review): SubgoalPredictor is defined in several cells of this notebook;
# which definition is instantiated here depends on cell execution order.
configure_llm()
llm_agent = SubgoalPredictor()

In [None]:
# Create the BabyAI environment, wrap it with FullyObsWrapper and reset it.
env = make_env(env_key="BabyAI-GoToObj-v0")
env = FullyObsWrapper(env)
encoder = ObservationEncoder()
obs, _ = env.reset()  # the second value returned by reset() is discarded

# Encode the initial observation; only the 'ascii' encoding is used below.
all_encodings = encoder.encode_all(obs)
# Action history is accumulated as text, one "Step-N: action" line per step.
previous_actions = ''
step_count = 0

print(all_encodings['ascii'])

Grid:
W W W W W W W W W W W W W W W W W W W W W W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W W W W W W W W W W BD W W W W W W W W W W W
W . . . . . . W . . . . . . RD . . . . . . W
W . . . . . . ZD . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . > . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W W W W W W W W W W W W W PD W W W W W W W W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W W W W W W W W W W W W W W W W W W W W W W

Legend:
> - agent fac

In [9]:
# Take one manual step in the environment and show the resulting grid.
# NOTE: this cell mutates step_count and previous_actions, so re-running it
# advances the episode and appends another history line each time.
action = 1  # 1 = turn right, per the action list in task_description

step_count += 1
previous_actions += f'Step-{step_count}: {str(action)}\n'

# Only the observation is kept; the other four values from step() are discarded.
obs, _,_,_,_ = env.step(action)

all_encodings = encoder.encode_all(obs)

print(all_encodings['ascii'])

Grid:
W W W W W W W W W W W W W W W W W W W W W W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W W W W W W W W W W BD W W W W W W W W W W W
W . . . . . . W . . . . . . RD . . . . . . W
W . . . . . . ZD . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . v . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W W W W W W W W W W W W W PD W W W W W W W W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W . . . . . . W . . . . . . W . . . . . . W
W W W W W W W W W W W W W W W W W W W W W W

Legend:
v - agent fac

In [10]:
# Inputs for the LLM agent: a fixed instruction string listing the action
# indices, and the ASCII encoding of the most recently encoded observation.
task_description = "You are an agent in a grid environment with access to actions like (0:Turn left, 1: Turn right, 2: move forward, 3: pickup, 4: drop, 5: toggle, 6:done). Following is the current state of the environment:"
current_state = all_encodings['ascii']


In [11]:
# Ask the agent for one decision from the current state and action history,
# then print the whole prediction (reasoning text plus chosen action).
response = llm_agent(task_description=task_description, current_state=current_state, previous_actions=previous_actions)
print(response)

Prediction(
    reasoning='The agent is currently facing south and its mission is to open the door on its right. Given that the agent has already turned right in the previous step, it should now move forward to approach the door. Since the door is closed, the agent will need to toggle it to open it. However, the first step is to move closer to the door.',
    primitive_action=2
)


In [7]:
# Inspect the accumulated action history (debugging aid).
previous_actions

''

In [13]:
# Roll out one episode with the LLM agent choosing every action, logging each
# state and response to debug_llm.txt for later inspection.
MAX_STEPS = 20        # the episode is cut off once step_count exceeds this
HISTORY_WINDOW = 5    # number of most recent actions shown to the LLM

env = make_env(env_key="BabyAI-GoToObj-v0")
env = FullyObsWrapper(env)
encoder = ObservationEncoder()
obs, _ = env.reset()
all_encodings = encoder.encode_all(obs)
current_state = all_encodings['ascii']


previous_actions = []
step_count = 0
done = False
# Explicit UTF-8: the LLM's reasoning text may contain non-ASCII characters,
# which can make a file opened with the platform-default encoding fail on write.
with open('debug_llm.txt', 'w', encoding='utf-8') as f:
    while not done:
        f.write(f'Step-{step_count}\n')
        print(f'Step-{step_count}')
        f.write(current_state)

        # Hand the history to the agent as newline-separated text, the same
        # shape the manual stepping cell builds, rather than as a Python list.
        recent_actions = previous_actions[-HISTORY_WINDOW:]
        response = llm_agent(task_description=task_description, current_state=current_state, previous_actions='\n'.join(recent_actions))
        f.write(f'Previous actions: {recent_actions}\n')
        f.write(f'\nResponse: {response}\n')
        f.write('--------------------------------\n')

        action = response.primitive_action
        obs, _,term,trunc,_ = env.step(action)

        # The history labels are 1-based: the action chosen at "Step-0" is
        # recorded as "Step-1", matching the manual stepping cell.
        step_count += 1
        previous_actions.append(f'Step-{step_count}: {str(action)}')

        # Stop on episode end, or once the step budget has been exceeded.
        if term or trunc or step_count > MAX_STEPS:
            done = True

        all_encodings = encoder.encode_all(obs)
        current_state = all_encodings['ascii']


Step-0
Step-1
Step-2
Step-3
Step-4
Step-5
Step-6
Step-7
Step-8
Step-9
Step-10
Step-11
Step-12
Step-13
Step-14
Step-15
Step-16
Step-17
Step-18
Step-19
Step-20
