## Installations

We first must install all required packages necessary for `agent` to run. Packages here are managed by `poetry`. For ease of use, we convert the poetry lock to a `requirements.txt` file. We then `pip install` the packages within this file. Any errors encountered during installation of any particular package can usually be resolved by manually pip installng that package.

In [None]:
import os
import sys

# Create necessary directories for task files
directories = {
    "config": "./../configs/task",
    "simulator": "./../src/agent/environments",
    "task": "./../src/agent/tasks",
    "prompt_templates": "./../src/agent/prompts/templates/simplebot",
    "llm_config": "./../configs/llm",
}

for directory in directories.values():
    if not os.path.exists(directory):
        os.makedirs(directory)

# Building a Simple Bot with our Agent: A Step-by-Step Tutorial
This tutorial will guide you through the process of setting up and deploying a simple bot using the Agent environment. We'll cover everything from configuring your project to implementing a custom task and testing it.

## Step 1: Specify Task Config
Define your task configuration. This involves specifying the task's parameters and settings.

In [None]:
config_data = """
# @package _global_
agent:
  pre_action_flow: ???
  prompt_builder:
    template_paths:
      - simplebot
      - default
task:
  _target_: src.agent.tasks.simplebot.SimpleBot
  name: simplebot_test_env
  description: 
  subtask: null
  version: v0.1
"""

d = directories["config"]
with open(f"{d}/simple_bot_task.yaml", "w") as file:
    # Write some text into the file
    file.write(config_data)

## Step 2: Implement a Custom Pybullet Environment
This code defines a Python module that simulates a Panda robot arm in a PyBullet physics simulation environment. 

- MotionPlanner class: This class is responsible for generating smooth motion plans for the Panda robot arm to reach desired end-effector positions. It uses an optimization-based approach to find joint configurations (q-values) that minimize velocity and acceleration while reaching specified end-effector positions. The optimization problem is solved using the CasADi optimization framework with the IPOPT solver.

- Robot class: This class represents the Panda robot in the simulation environment. It provides methods for controlling the robot, such as resetting joint positions, opening and closing the gripper, getting joint positions, setting joint targets, and moving the end effector to a desired goal position.

- Environment class: This class sets up the simulation environment using PyBullet. It loads the Panda robot, a target box, and a ground plane. It provides methods for sampling box positions, getting box positions, setting the target box position, getting the gripper state, getting the end-effector position, calculating distances between the box and target, and resetting the environment.

In [None]:
%%writefile ./../src/agent/environments/panda_simulator.py
#%env LD_LIBRARY_PATH=

import os
import sys
import time
import warnings
import optas
import pybullet as p
import pybullet_data
import numpy as np
from pybullet_utils.bullet_client import BulletClient

pb_path = pybullet_data.getDataPath()
panda_urdf_filename = os.path.join(pb_path, "franka_panda/panda.urdf")

end_effector_link = "panda_grasptarget"

class MotionPlanner:

    def __init__(self):
        self.robot = optas.RobotModel(panda_urdf_filename, time_derivs=[0, 1, 2])
        self.name = self.robot.get_name()

        self.duration = 5.0
        T = 50
        dt = self.duration / float(T - 1)
        builder = optas.OptimizationBuilder(T, robots=self.robot, derivs_align=False)

        qc = builder.add_parameter("qc", self.robot.ndof)
        pg = builder.add_parameter("pg", 3)
        use_mid_pose = builder.add_parameter("use_mid_pose")
        xg = optas.DM([1.0, 0.0, 0.0])
        zg = optas.DM([0.0, 0.0, -1.0])
        qn = optas.deg2rad([0.0, 20.0, 0.0, -90.0, 0.0, 120.0, 45.0, 0.0, 0.0])

        builder.enforce_model_limits(self.name)

        builder.initial_configuration(self.name, qc)
        builder.initial_configuration(self.name, time_deriv=1)
        builder.initial_configuration(self.name, time_deriv=2)
        builder.fix_configuration(self.name, time_deriv=1)
        builder.fix_configuration(self.name, time_deriv=2)
        builder.integrate_model_states(self.name, time_deriv=1, dt=dt)
        builder.integrate_model_states(self.name, time_deriv=2, dt=dt)

        dQ = builder.get_model_states(self.name, time_deriv=1)
        ddQ = builder.get_model_states(self.name, time_deriv=2)
        builder.add_cost_term("minimize_velocity", 20 * optas.sumsqr(dQ))
        builder.add_cost_term("minimize_acceleration", 20 * optas.sumsqr(ddQ))

        t_mid = int(0.5 * T)
        qmid = builder.get_model_state(self.name, t_mid, time_deriv=0)
        builder.add_equality_constraint("mid", use_mid_pose * qmid, use_mid_pose * qn)

        for t in range(T):
            q = builder.get_model_state(self.name, t)

            Tf = self.robot.get_global_link_transform(end_effector_link, q)

            dp = Tf[:3, 3] - pg
            W = optas.diag([40.0, 40.0, 30.0 * float(t) / float(T - 1)])

            builder.add_cost_term(f"position_goal_{t}", dp.T @ W @ dp)

        builder.add_cost_term(
            f"orientation_goal_x_final", 500 * optas.sumsqr(Tf[:3, 0] - xg)
        )
        builder.add_cost_term(
            f"orientation_goal_z_final", 500 * optas.sumsqr(Tf[:3, 2] - zg)
        )

        builder.add_equality_constraint("final_position", Tf[:3, 3], pg)

        self.solver = optas.CasADiSolver(builder.build()).setup(
            "ipopt", {"ipopt.print_level": 0, "print_time": 0, "ipopt.sb": "yes"}
        )

    def plan(self, qc, pg, use_mid_pose=True):
        qc = np.concatenate((qc, [0.0, 0.0]))
        self.solver.reset_parameters(
            {"pg": pg, "qc": qc, "use_mid_pose": float(use_mid_pose)}
        )
        solution = self.solver.solve()
        if not self.solver.did_solve():
            return False, None
        qfun = self.solver.interpolate(solution[f"{self.name}/q"], self.duration)
        return True, qfun


class Robot:

    def __init__(self, client):
        self.client = client
        self.id = self.client.loadURDF(panda_urdf_filename, useFixedBase=1)
        self.actuated_joint_indices = [0, 1, 2, 3, 4, 5, 6]
        self.gripper_joint_indices = [9, 10]
        self.ee_joint_index = 11
        self.close_gripper()

    def reset_joint_position(self, position):
        for j, q in zip(self.actuated_joint_indices, position):
            self.client.resetJointState(self.id, j, q)

    def open_gripper(self):
        self.client.setJointMotorControlArray(
            self.id,
            self.gripper_joint_indices,
            p.POSITION_CONTROL,
            [0.1, 0.1],
        )

    def close_gripper(self):
        self.client.setJointMotorControlArray(
            self.id,
            self.gripper_joint_indices,
            p.POSITION_CONTROL,
            [0.0, 0.0],
        )

    def get_joint_position(self):
        joint_states = self.client.getJointStates(self.id, self.actuated_joint_indices)
        return [js[0] for js in joint_states]

    def set_joint_target(self, position):
        self.client.setJointMotorControlArray(
            self.id,
            self.actuated_joint_indices,
            p.POSITION_CONTROL,
            position,
        )

    def move_to_end_effector_goal(self, position, use_mid_pose=True):
        # Temporarily redirect stderr to suppress specific PyBullet warnings
        original_stderr = sys.stderr  # Save the original stderr
        sys.stderr = open(os.devnull, 'w')  # Redirect stderr to null device

        try:
            planner = MotionPlanner()
            qc = self.get_joint_position()
            success, plan = planner.plan(qc, position, use_mid_pose=use_mid_pose)

            # Restore stderr to its original state
            sys.stderr.close()
            sys.stderr = original_stderr

            if not success:
                print("Failed to plan joint motion.")
                return

            t0 = time.time()
            while True:
                t = time.time() - t0
                t_clipped = np.clip(t, 0, planner.duration)
                q = plan(t_clipped)
                self.set_joint_target(q[:-2])
                if t > planner.duration:
                    break
        except Exception as e:
            # Ensure stderr is restored even if an exception occurs
            sys.stderr.close()
            sys.stderr = original_stderr            

            raise e  # Re-raise the exception to handle it elsewhere or let it propagate

class Environment:

    def __init__(self):
        self.client = BulletClient(
            p.GUI,
            options="--background_color_red=1 --background_color_blue=1 --background_color_green=1",
        )
        self.client.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
        self.client.resetDebugVisualizerCamera(
            cameraDistance=1.5,
            cameraYaw=45.0,
            cameraPitch=-30.0,
            cameraTargetPosition=(0.0, 0.0, 0.0),
        )
        self.client.setGravity(0.0, 0.0, -9.81)
        self.box = self.client.loadURDF(
            os.path.join(pb_path, "cube.urdf"),
            self.sample_box_position(),
            globalScaling=0.05,
        )
        self.set_target_box_position(self.sample_box_position())
        self.client.loadURDF(os.path.join(pb_path, "plane.urdf"), useFixedBase=1)
        self.robot = Robot(self.client)
        self.client.setRealTimeSimulation(1)

    def sample_box_position(self):
        low = [0.4, -0.6, 0.1]
        high = [0.7, 0.6, 0.1]
        pos = np.random.uniform(low, high)
        return pos

    def get_box_position(self):
        p, _ = self.client.getBasePositionAndOrientation(self.box)
        return np.array(p)
    
    def set_target_box_position(self, position):
        self.target_box_position = position
    
    def get_gripper_state(self):
        gripper_pos = p.getJointState(self.robot.id, self.robot.gripper_joint_indices[0])[0]
        if gripper_pos < 0.5:
            return "closed"
        else:
            return "open"
        
    def get_ee_position(self):
        return np.array(p.getLinkState(self.robot.id, self.robot.ee_joint_index)[0])
    
    def get_box_distance_to_target(self):
        return np.linalg.norm(self.get_box_position() - self.target_box_position)
    
    def get_ee_distance_to_box(self):
        return np.linalg.norm(self.get_box_position() - self.get_ee_position())
        
    def reset(self):
        qn = np.deg2rad([0, 20, 0, -90, 0, 120, 45])
        self.robot.set_joint_target(qn)
        self.robot.reset_joint_position(qn)
        self.client.resetBasePositionAndOrientation(
            self.box, self.sample_box_position(), (0.0, 0.0, 0.0, 1.0)
        )
        self.set_target_box_position(self.sample_box_position())

    def close(self):
        if self.client.isConnected():
            self.client.disconnect()
            



## Step 3: Implement a Custom Agent Task
This code defines a SimpleBot task, which is a subclass of the Task class from agent-agent. The task involves controlling the simulated Panda robot arm to perform various actions based on feedback from LLM to manipulating a box in the previous defined environment.

In [None]:
%%writefile ./../src/agent/tasks/simplebot.py
from typing import Any, Dict
import numpy as np

from agent.memory import MemKey
from agent.tasks import ActionSpace
from agent.tasks import Task
from agent.utils import break_word_split
from agent.environments.panda_simulator import Environment

class SimpleBot(Task):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.observation_space = NotImplemented
        self.action_space = ActionSpace.DISCRETE
        self.args = kwargs
        self.episode_counter = 0

        self.env = Environment()

        self.possible_actions = [
            'open gripper',
            'close gripper',
            'move to pre grasp',
            'move to box position',
            'move to target box position',
        ]

    def reset(self, next_subtask: str | None = None):
        self.done = False
        self.env.reset()
        return self._return_observation(self.possible_actions)

    def _return_observation(self, available_actions: list[str]) -> Dict[str, Any]:
        """Return the observation dictionary."""

        self.box_distance_to_target = np.round(self.env.get_box_distance_to_target(), 3)
        self.ee_distance_to_box = np.round(self.env.get_ee_distance_to_box(), 3)
        self.gripper_state = self.env.get_gripper_state()
        obs = f"""
        - Box distance to target: {self.box_distance_to_target}, 
        - End effector distance to box: {self.ee_distance_to_box}
        - Gripper state: {self.gripper_state}
        """

        return {
            MemKey.OBSERVATION: obs,
            MemKey.AVAILABLE_ACTIONS: available_actions,
        }
    
    def answer_parser(self, raw_response: str):
        return break_word_split("Action", raw_response)

    def step(self, action):
        available_actions = [a for a in self.possible_actions if a != action]
        self.act(action)

        # calculate reward 
        reward = 1
        done = True if self.box_distance_to_target < 0.01 else False
        if done:
            self.open_gripper()

        return self._return_observation(available_actions), reward, done

    def open_gripper(self):
        self.env.robot.open_gripper()

    def close_gripper(self):
        self.env.robot.close_gripper()

    def move_to_pre_grasp(self):
        pre_grasp = self.env.get_box_position() + np.array([0., 0., 0.1])
        self.env.robot.move_to_end_effector_goal(pre_grasp, use_mid_pose=True)
    
    def move_to_box_position(self):
        box_position = self.env.get_box_position()
        self.env.robot.move_to_end_effector_goal(box_position, use_mid_pose=False)

    def move_to_target_box_position(self):
        target_box_position = self.env.target_box_position
        self.env.robot.move_to_end_effector_goal(target_box_position, use_mid_pose=True)

    def act(self, action):
        action = action.lower()
        if action == 'open gripper':
            self.open_gripper()
        elif action == 'close gripper':
            self.close_gripper()
        elif action == 'move to pre grasp':
            self.move_to_pre_grasp()
        elif action == 'move to box position':
            self.move_to_box_position()
        elif action == 'move to target box position':
            self.move_to_target_box_position()
        else:
            return
    

## Step 5: Define Prompt Template

We now create a new directory to save custom prompt templates for our example task. Within /src/agent/prompts/example_task we create .jinja files to represent prompt templates. Below we define three prompts: 
*    system_prompt.jinja: a prompt to define the task to the LLM
 *   context_example.jinja: a prompt that serves to give the agent contextual information as to how the environment works, used to direct the agent to generate related content.
  *  cot_prompt.jinja: used to get the agent to genrate a chain of thought and think step-by-step

In this example, we want to generate a behaviour tree using an LLM., and structure prompts to represent this.


In [None]:
system_prompt = """
    [[ SYSTEM ]]
    You are an agent tasked with controlling a robotic arm to pick and place a small box.
    The task of picking and placing the box can be broken down into a sequence of sub-tasks.
    You will interact with the environment in a step-by-step manner. On each step, you are tasked with choosing an action with the aim of completing a sub-task. Detailed instructions are below:

    * On each step, you will be given an observation of the current environment state. This includes the distance of the box to its target location, the state of the gripper on the arm, and the distance of the end effector to the box.
    * You will also be given a list of available actions to take. You must choose a single action from this list to take. You cannot create new actions, this is a strict requirement.
    * The robotic arm possesses a gripper. You may only grasp if the gripper is open.
    * You must navigate the end effector to the target object before attempting to grasp it.
    * Before attempting to grasp the box, you should move the arm to a pre-grasp position, in which the end effector is position is directly above the box.
    * The gripper should be open before you attempt to move the end effector to the box position, in order to avoid pushing the box away.
    * Occasionally the box may be in an awkward position that makes grasping difficult. If you fail to move the box, you should try again.
    
    {%- if cot_type in ["zero_shot_cot", "few_shot_cot"] %}
    When asked for an action, you should think step by step, and then finish by providing an available action from the list.
    Your response should be in the following format:
    Thought: ...
    Action: <action>
    {%- else %}
    When asked for an action, your response should be an available action from the list, using the following format:
    Action: <action>
    {%- endif %}
"""


cot_prompt = """
    Now please think step by step, and then finish by providing an action that will help you complete the task.
    Note: You can have multiple thoughts, but only generate a single action.
    Answer in the format
    Thought: ...
    Action: <action>
"""

reflect_prompt = """
    {% include "system_prompt.jinja" %}
    You will be given the history of a past experience in which you were placed in an environment and given a task to complete.
    Do not summarize your environment or past actions, but rather think about the strategy and path you took to attempt to complete the task.
    Devise a concise, new plan of action that accounts for any mistakes very briefly with reference to specific actions that you should have taken.
    For example, if you tried A and B but forgot C, then devise a plan to achieve C with environment-specific actions.
    Give your plan after seeing the past experience.
    [[ USER ]]
    {% include "trajectory.jinja" %}

    Do not provide an action or a thought, but a new plan of action.
    Answer in the format
    New plan: <plan>
"""

trajectory_prompt = """
    {%- set observations = memory.retrieve_all({memory.mem_keys.OBSERVATION: 1.0})[::-1] -%}
    {%- set actions = memory.retrieve_all({memory.mem_keys.EXTERNAL_ACTION: 1.0})[::-1] -%}

    Here is what happened in this episode so far:
    {%- for obs in observations -%}
    {%- if loop.index0 > 0 %}
    Action: {{actions[loop.index0-1]}}
    {%- endif %}
    {% if loop.index0 > 0 %}Observation: {% endif %}{{obs}}
    {% endfor %}
"""

context_example = """
    An example of solving a similar task is presented below:

    Observation:
    - Box distance to target: 0.387, 
    - End effector distance to box: 0.443
    - Gripper state: closed
    {%- if cot_type in ["few_shot_cot", "react"] %}
    Thought: The box is not at the target, and the end effector is not near the box. I should move the arm to the pre grasp position.
    {%- endif %}
    Action: move to pre grasp.
    Observation: 
    - Box distance to target: 0.387, 
    - End effector distance to box: 0.103
    - Gripper state: closed
    {%- if cot_type in ["few_shot_cot", "react"] %}
    Thought: The box is not at the target, but the arm is in the pre grasp position. I should open the gripper.
    {%- endif %}
    Action: open gripper.
    Observation: 
    - Box distance to target: 0.387, 
    - End effector distance to box: 0.103
    - Gripper state: open
    {%- if cot_type in ["few_shot_cot", "react"] %}
    Thought: The gripper is open, the arm is in the pre grasp position, the box is not at the target position. I should move the arm to the box position.
    {%- endif %}
    Action: move to box position.
    Observation: 
    - Box distance to target: 0.387, 
    - End effector distance to box: 0.02
    - Gripper state: open
    {%- if cot_type in ["few_shot_cot", "react"] %}
    Thought: The gripper is open, the end effector is at the box, the box is not at the target position. I should close the gripper in order to grasp the box.
    {%- endif %}
    Action:  close gripper.
    Observation: 
    - Box distance to target: 0.387, 
    - End effector distance to box: 0.02
    - Gripper state: closed
    {%- if cot_type in ["few_shot_cot", "react"] %}
    Thought: The gripper is closed, the end effector is at the box, the box is not at the target position. I should move the arm to the target box position.
    {%- endif %}
    Action: move to target box position.
"""

d = directories["prompt_templates"]

with open(f"{d}/system_prompt.jinja", "w") as file:
    file.write(system_prompt)

with open(f"{d}/cot_prompt.jinja", "w") as file:
    file.write(cot_prompt)

with open(f"{d}/reflect.jinja", "w") as file:
    file.write(reflect_prompt)

with open(f"{d}/trajectory.jinja", "w") as file:
    file.write(trajectory_prompt)

with open(f"{d}/context_example.jinja", "w") as file:
    file.write(context_example)

## Step 6: Config LLM server
We must define an LLM backend for the agent to use. Within /configs/llm we create a file example_task_llm.yaml. Within this file, we specify the LLM and server IP that we want the agent to call.

In [None]:
llm_config = """
_target_: src.agent.llms.llm.OpenAIBackend
_partial_: true
server_ip: http://127.0.0.1:8000/v1
model_id: deepseek-coder-6.7b-instruct
api_key: EMPTY 
"""

d = directories["llm_config"]
with open(f"{d}/deepseek_coder.yaml", "w") as file:
    file.write(llm_config)

## Step 7: Test Custom Example

In [None]:
!python ../src/agent/start.py task=simple_bot_task method=fs-cot-reflect llm@agent.llm=deepseek_coder

## Step 8: Clear files that has been created

In [None]:
file_paths_delete = [
    "./../src/agent/tasks/simplebot.py",
    "./../src/agent/environments/panda_simulator.py",
]
d = directories["config"]
file_paths_delete.append(f"{d}/simple_bot_task.yaml")
d = directories["prompt_templates"]
file_paths_delete.append(f"{d}/system_prompt.jinja")
file_paths_delete.append(f"{d}/cot_prompt.jinja")
file_paths_delete.append(f"{d}/reflect.jinja")
file_paths_delete.append(f"{d}/trajectory.jinja")
file_paths_delete.append(f"{d}/context_example.jinja")
d = directories["llm_config"]
file_paths_delete.append(f"{d}/deepseek_coder.yaml")
for file_path in file_paths_delete:
    try:
        # Attempt to delete the file
        os.remove(file_path)
        print(f"Deleted file: {file_path}")
    except OSError as e:
        # Handle any errors that occur during deletion
        print(f"Error deleting file {file_path}: {e}")