# Learning Code as Policy for Metaworld


This notebook shows a basic example of using the optimizer to follow language feedback online during interaction.

In [1]:
# Run experiment

seed = 0
horizon = 30
env_name = "llf-metaworld-pick-place-v2"
stepsize = 1

In [2]:
import llfbench
import autogen.trace as trace
from autogen.trace.optimizers import FunctionOptimizer
import random
import numpy as np


class TracedEnv:

    def __init__(self, env_name, seed=0):
        random.seed(seed)
        np.random.seed(seed)
        self.env = llfbench.make(env_name)
        self.env.reset(seed=seed)
        self.env.action_space.seed(seed)

    @trace.trace_op()
    def reset(self):
        """
        Reset the environment and return the initial observation and info.
        """
        return self.env.reset()  # obs, info

    @trace.trace_op()
    def step(self, action):
        """
        Take action in the environment and return the next observation, reward, termination, truncation, and info.
        """
        return self.env.step(action)


def user_feedback(obs, action, next_obs):
    """
    Provide feedback from the user.
    """
    return f"Taking action {action.data} at observation {obs['observation'].data} resulted in next observation {next_obs['observation'].data}. Recieved feedback {next_obs['feedback'].data}."

In [3]:
# Run the expert policy to get an oracle performance


def expert_run(env, horizon):

    # Initialize the environment
    obs, info = env.reset()

    # Rollout
    sum_of_rewards = 0
    t = 0
    expert_action = None
    while t < horizon:
        action = env.env.action_space.sample() if expert_action is None else expert_action
        next_obs, reward, termination, truncation, info = env.step(action)
        expert_action = env.env.expert_action

        sum_of_rewards += reward.data  # not traced
        t += 1
        if termination or truncation or info.data["success"]:
            break

    print("Sum of rewards:", sum_of_rewards)
    print("Success:", info.data["success"])
    print("Termination:", termination.data)
    print("Truncation:", truncation.data)
    print("# of time steps:", t)

    return sum_of_rewards, info.data["success"], termination.data, truncation.data


env = TracedEnv(env_name, seed=seed)
sum_of_rewards, success, termination, truncation = expert_run(env, horizon=horizon)

  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")
  logger.warn(
  gym.logger.warn("Casting input x to numpy array.")
  logger.warn(f"{pre} is not within the observation space.")
  logger.warn(
  logger.warn(


Sum of rewards: 16.13824992438523
Success: True
Termination: False
Truncation: False
# of time steps: 7


In [4]:
def single_step(env, horizon, user_feedback, controller, optimizer, max_iter=None, delay=0):
    """Run optimizer step for every time step."""

    max_iter = max_iter or horizon * 2

    # Initialize the environment
    obs, info = env.reset()

    # Rollout
    sum_of_rewards = 0
    t = 0
    i = 0
    controller_input = obs["observation"]
    while t < horizon and i < max_iter:

        controller_input = controller_input.detach()  # Detach; otherwise, it would be back-propagated across time.
        error = None
        try:
            for _ in range(delay + 1):  # Delayed feedback
                action = controller(controller_input)
                next_obs, reward, termination, truncation, info = env.step(action)
                controller_input = next_obs["observation"]
                sum_of_rewards += reward.data  # not traced
                t += 1  # time step
            if termination or truncation or info.data["success"]:
                break
        except trace.TraceExecutionError as e:
            error = e

        if error is None:
            # only get feedback for the last observation
            feedback = user_feedback(obs, action, next_obs)  # not traced
            obs = next_obs
            target = next_obs["observation"]
        else:  # Self debugging
            feedback = str(error)
            target = error.exception_node

        # Optimization step
        optimizer.zero_feedback()
        optimizer.backward(target, feedback)  # obs = next obs
        optimizer.step(verbose="output")
        i += 1  # optimization iteration

        print(f"Time Step: {t} of {horizon}")
        print(f"Iteration: {i}")
        print(f"Feedback: {feedback}")
        print(f"Variable:\n {controller.parameter.data}")

    print("Termination:", termination.data)
    print("Truncation:", truncation.data)
    print("Sum of rewards:", sum_of_rewards)
    print("Success:", info.data["success"])
    print("# of optimization iterations:", i)
    print("# of time steps:", t)

    return sum_of_rewards, info.data["success"], termination.data, truncation.data, optimizer


@trace.trace_op()
def parse_obs(obs):
    """Parse the observation string into a dictionary of lists of floas."""
    import json

    obs = json.loads(obs)
    for key in obs:
        obs[key] = obs[key].replace("[", "").replace("]", "").split()
        obs[key] = [float(i) for i in obs[key]]
    return obs

In [5]:
# Create an optimizer (Delay=1)
env = TracedEnv(env_name, seed=seed)
action_space = env.env.action_space


@trace.trace_op(trainable=True)
def controller(obs: str):
    """
    The controller takes in an observation and returns an action.
    """
    return action_space.sample()


optimizer = trace.optimizers.FunctionOptimizer(controller.parameters(), stepsize=stepsize)
max_iter = None
delay = 1
sum_of_rewards, success, termination, truncation, optimizer = single_step(
    env,
    horizon=horizon,
    controller=controller,
    user_feedback=user_feedback,
    optimizer=optimizer,
    max_iter=max_iter,
    delay=delay,
)

LLM response:
 {
"reasoning": "Given the feedback and its high level of trustworthiness (stepsize = 1), it is clear that the action determined by the controller coded in __code0 is incorrect for achieving the desired goal state, as indicated by the provided feedback. This feedback suggests that the action [-0.07 0.68 0.12 0.] would have been more promising. The current controller function `def controller(obs: str): return action_space.sample()` appears to randomly sample an action from the action space, which in this context has resulted in a non-efficient action. To align with the feedback, a more deterministic or goal-directed action selection method is required, potentially one that could move towards the suggested promising action of [-0.07 0.68 0.12 0.]. Although the exact implementation would heavily depend on the available methods to interact with or analyze the `action_space`, a basic deterministic approach could be to return a fixed action that aligns more closely with the pro

In [6]:
# Create an optimizer (Delay=2)
env = TracedEnv(env_name, seed=seed)
action_space = env.env.action_space


@trace.trace_op(trainable=True)
def controller(obs: str):
    """
    The controller takes in an observation and returns an action.
    """
    return action_space.sample()


optimizer = trace.optimizers.FunctionOptimizer(controller.parameters(), stepsize=stepsize)
max_iter = None
delay = 2
sum_of_rewards, success, termination, truncation, optimizer = single_step(
    env,
    horizon=horizon,
    controller=controller,
    user_feedback=user_feedback,
    optimizer=optimizer,
    max_iter=max_iter,
    delay=delay,
)

LLM response:
 {
"reasoning": "The feedback suggests that the action resulting from the controller logic is leading the arm movement in the wrong direction, thus making the task of reaching the goal more distant than before. The current implementation of the controller does not take into account the desired next action as per feedback, which specifically recommends moving to [-0.07  0.68  0.12  0.  ]. Therefore, the controller logic needs to be hardcoded to return this recommended action sequence instead of using action_space.sample(), to ensure the arm moves in the correct direction towards the goal.",
"suggestion": {
    "__code1": "def controller(obs: str):\n    return [-0.07, 0.68, 0.12, 0.0]"
}
}
Time Step: 3 of 30
Iteration: 1
Feedback: Taking action [ 0.72635784  0.08292244 -0.40057622 -0.15462556] at observation {"hand_pos": "[0.006 0.6   0.194]", "gripper_distance_apart": "1.000", "puck_pos": "[-0.064  0.679  0.02 ]", "puck_rot": "[0. 0. 0. 1.]", "goal_pos": "[0.039 0.878 0.24

In [7]:
# Create an optimizer (Delay=3)
env = TracedEnv(env_name, seed=seed)
action_space = env.env.action_space


@trace.trace_op(trainable=True)
def controller(obs: str):
    """
    The controller takes in an observation and returns an action.
    """
    return action_space.sample()


optimizer = trace.optimizers.FunctionOptimizer(controller.parameters(), stepsize=stepsize)
max_iter = None
delay = 3
sum_of_rewards, success, termination, truncation, optimizer = single_step(
    env,
    horizon=horizon,
    controller=controller,
    user_feedback=user_feedback,
    optimizer=optimizer,
    max_iter=max_iter,
    delay=delay,
)

LLM response:
 {"reasoning": "The provided feedback suggests that the action taken by the controller (__code2) led to a positive outcome with a recommendation to continue on the same path but to consider a specific move [-0.07 0.68 0.12 0.]. However, the __code2 as it stands only samples randomly from the action space without considering this feedback or targeting a specific action. To align the controller's action with the feedback, the __code2 function needs to be modified to target or approximate the recommended action. Given the stepsize of 1, this feedback is fully trusted and should be incorporated directly into the action selection process by the controller.",
 "suggestion": {"__code2": "def controller(obs: str):\n    \"\"\"\n    The controller takes in an observation and returns an action.\n    Modified to target the recommended action based on feedback.\n    \"\"\"\n    return [-0.07, 0.68, 0.12, 0.0]"}}

Time Step: 4 of 30
Iteration: 1
Feedback: Taking action [-0.72980699  0.

In [8]:
# Create an optimizer (Delay=4)
env = TracedEnv(env_name, seed=seed)
action_space = env.env.action_space


@trace.trace_op(trainable=True)
def controller(obs: str):
    """
    The controller takes in an observation and returns an action.
    """
    return action_space.sample()


optimizer = trace.optimizers.FunctionOptimizer(controller.parameters(), stepsize=stepsize)
max_iter = None
delay = 4
sum_of_rewards, success, termination, truncation, optimizer = single_step(
    env,
    horizon=horizon,
    controller=controller,
    user_feedback=user_feedback,
    optimizer=optimizer,
    max_iter=max_iter,
    delay=delay,
)

LLM response:
 {
"reasoning": "The feedback indicates that the chosen action, based on the current observation led to poor performance and a minimal reward. Given the significant impact described by the feedback and a stepsize of 1, implying full trust in the feedback, it's clear the action chosen by the 'controller' function in __code3 variable needs to be directed towards the suggested new pose [-0.25 0.61 0.12 0.]. This implies that the controller logic should be redefined to consider the feedback direction to improve the outcomes. The suggested action involves adjusting the geometric positioning, which should influence the 'controller' function to select actions closer to the recommendation.",
 "suggestion": {
   "__code3": "def controller(obs: str):\n    \"\"\"\n    The controller takes in an observation and returns an action aimed towards the suggested pose [-0.25, 0.61, 0.12, 0.].\n    \"\"\"\n    return [-0.25, 0.61, 0.12, 0.]"
}
}
Time Step: 5 of 30
Iteration: 1
Feedback: Taki



LLM response:
 {
"reasoning": "The feedback indicates that the suggested pose of [-0.25, 0.61, 0.12, 0.0] has led to an undesired direction, making it harder to achieve the task's goal. The feedback further suggests aiming for a new pose, [-0.23, 0.59, 0.12, 0.0], instead. Given the step size of 1, this feedback is completely trusted, and hence, the action suggested by the controller function needs to be adjusted accordingly. Therefore, updating the controller function to return the new suggested pose will align the outcomes closer to the feedback's guidance.",
 "suggestion": {
    "__code3": "def controller(obs: str):\n    \"\"\"\n    The controller takes in an observation and returns an action aimed towards the suggested pose [-0.23, 0.59, 0.12, 0.].\n    \"\"\"\n    return [-0.23, 0.59, 0.12, 0.]"
}
}
Time Step: 10 of 30
Iteration: 2
Feedback: Taking action [-0.25, 0.61, 0.12, 0.0] at observation {"hand_pos": "[-0.109  0.519  0.089]", "gripper_distance_apart": "1.000", "puck_pos": "

In [9]:
# Create an optimizer (Delay=5)
env = TracedEnv(env_name, seed=seed)
action_space = env.env.action_space


@trace.trace_op(trainable=True)
def controller(obs: str):
    """
    The controller takes in an observation and returns an action.
    """
    return action_space.sample()


optimizer = trace.optimizers.FunctionOptimizer(controller.parameters(), stepsize=stepsize)
max_iter = None
delay = 5
sum_of_rewards, success, termination, truncation, optimizer = single_step(
    env,
    horizon=horizon,
    controller=controller,
    user_feedback=user_feedback,
    optimizer=optimizer,
    max_iter=max_iter,
    delay=delay,
)

  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")
  logger.warn(
  gym.logger.warn("Casting input x to numpy array.")
  logger.warn(f"{pre} is not within the observation space.")
  logger.warn(


LLM response:
 {"reasoning": "The feedback indicates that the current action taken based on the observation led to a positive result, signifying that the arm is moving in the correct direction as per the task's requirements. However, the next suggested action to take involves reaching a specific pose [-0.02 0.72 0.13 0. ]. Since the feedback is entirely trusted (stepsize of 1), and there's a specific pose suggested for the next step, the necessary change must optimize the controller function to achieve this pose. Given the limitations of the problem statement, and considering we cannot change the __code4 directly as per instructions, but instead work with given variables, it looks like we are supposed to understand the feedback in terms of whether our adjustments are correct. The __code4 function 'controller' should ideally be strategized to follow the feedback's advice more closely. However, since we are asked not to change the code but only the value of variables, and given our limit

In [10]:
# Create an optimizer (Delay=6)
env = TracedEnv(env_name, seed=seed)
action_space = env.env.action_space


@trace.trace_op(trainable=True)
def controller(obs: str):
    """
    The controller takes in an observation and returns an action.
    """
    return action_space.sample()


optimizer = trace.optimizers.FunctionOptimizer(controller.parameters(), stepsize=stepsize)
max_iter = None
delay = 6
sum_of_rewards, success, termination, truncation, optimizer = single_step(
    env,
    horizon=horizon,
    controller=controller,
    user_feedback=user_feedback,
    optimizer=optimizer,
    max_iter=max_iter,
    delay=delay,
)

LLM response:
 {
  "reasoning": "The feedback suggests that the action taken by the controller did not yield optimal results but did get the system closer to the task's completion. The constant advice throughout feedback is to adjust the pose towards [-0.07  0.68  0.12  0.  ], indicating that the controller's output action should ideally be mapped closer to these target values to improve task performance. Given that __code5 defines the controller's behavior, the controller's logic needs alteration to generate actions closer to the recommended pose. Since the feedback's stepsize is 1, we fully trust and apply this feedback, suggesting that the controller's method of determining actions likely needs a more targeted approach towards achieving the goal pose.",
  "suggestion": {
    "__code5": "def controller(obs: str):\n    \"\"\"\n    The controller takes in an observation and returns an action.\n    Adjust the controller to aim for specific pose values.\n    \"\"\"\n    return [-0.07, 0.

In [12]:
# Create an optimizer (Delay=7)
env = TracedEnv(env_name, seed=seed)
action_space = env.env.action_space


@trace.trace_op(trainable=True)
def controller(obs: str):
    """
    The controller takes in an observation and returns an action.
    """
    return action_space.sample()


optimizer = trace.optimizers.FunctionOptimizer(controller.parameters(), stepsize=stepsize)
max_iter = None
delay = 7
sum_of_rewards, success, termination, truncation, optimizer = single_step(
    env,
    horizon=horizon,
    controller=controller,
    user_feedback=user_feedback,
    optimizer=optimizer,
    max_iter=max_iter,
    delay=delay,
)

LLM response:
 {
"reasoning": "The feedback describes that the action taken based on the controller's decision led the arm away from completing its task, moving in the wrong direction compared to the desired pose [-0.07  0.68  0.12  0.  ]. Given the trust level (Stepsize) of 1, we completely trust this feedback and should adjust the controller code to reflect the suggested pose. However, since the controller function's implementation (`controller(obs: str)`) isn't directly shown or modifiable in inputs, it's interpreted that the controller's decision-making logic should lead towards this pose. Assuming action space sampling (`action_space.sample()`) is pseudo-random and leads to unsatisfactory outcomes, switching to a deterministic or target-driven strategy would be ideal, preferably aiming directly for the suggested coordinates. As such, adjusting the controller function to consider the desired pose may influence the actions generated towards achieving the goal more effectively.",
 "s