### Copyright 2022 Google LLC. SPDX-License-Identifier: Apache-2.0

Copyright 2022 Google LLC. SPDX-License-Identifier: Apache-2.0

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

# SayCan on a Robot Pick and Place Tabletop Environment

[SayCan](https://say-can.github.io/) is an algorithm that grounds large language models with robotic affordances for long-horizon planning. Given a set of low-level robotic skills (e.g., "put the green block in the red bowl") and a high-level instruction (e.g., "stack all the blocks"), it scores each skill twice: a language model scores how much the skill would help advance the high-level instruction, and a robotic affordance model scores how likely the skill is to be possible. Together, these scores pick a skill that is both useful and possible, and the robot executes it.

<img src="https://github.com/say-can/say-can.github.io/blob/main/img/saycan.png?raw=true" height="320px">

This colab runs an example of SayCan for a pick and place robot on a table top.

<img src="https://raw.githubusercontent.com/say-can/say-can.github.io/main/img/open_source_tabletop.png" height="320px">

Models used: [GPT-3](https://arxiv.org/abs/2005.14165) (InstructGPT), [CLIP](https://arxiv.org/abs/2103.00020) (ViT-B/32), [ViLD](https://arxiv.org/abs/2104.13921), and [CLIPort](https://cliport.github.io/) variant ([Transporter Nets](https://transporternets.github.io/))

### **Quick Start:**

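**Step 1.** Register for an [OpenAI API key](https://openai.com/blog/openai-api/) to use GPT-3 (there's a free trial) and save it as `OPENAI_API_KEY=<your key>` in a `.env` file in the working directory; the setup cell below loads it with `load_dotenv()`.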

**Step 2.** Menu > Change runtime type > Hardware accelerator > "GPU"

**Step 3.** Menu > Runtime > Run all

## Core Setup

### Setup and Installation
Installs required packages (CLIP, PyTorch, etc.) and imports necessary libraries for vision, robotics, and machine learning tasks.


In [None]:
#@markdown Setup and Installation

# Install required packages
%pip install ftfy regex tqdm fvcore imageio==2.4.1 imageio-ffmpeg==0.4.5
%pip install git+https://github.com/openai/CLIP.git
%pip install -U --no-cache-dir gdown --pre
%pip install pybullet moviepy flax==0.5.3 openai easydict imageio-ffmpeg
%pip install tensorflow==2.7.0  # Uncomment if error: UNIMPLEMENTED: DNN library is not found.

# Import required libraries
import collections
import datetime
import os
os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python'
import random
import threading
import time
from dotenv import load_dotenv

# Computer Vision and Image Processing
import cv2
import imageio
from PIL import Image
from moviepy.editor import ImageSequenceClip

# Machine Learning Libraries
import clip
import flax
from flax import linen as nn
# from flax.training import checkpoints
# from flax.metrics import tensorboard
import jax
import jax.numpy as jnp
import torch
import tensorflow.compat.v1 as tf
from torch.utils.tensorboard import SummaryWriter

# Scientific Computing and Data Processing
import numpy as np
import optax
from heapq import nlargest
import matplotlib.pyplot as plt

# Utilities and Others
from easydict import EasyDict
import openai
import pickle
import pybullet

import pybullet_data
from tqdm import tqdm
import IPython

# Download required assets if they don't exist.
# The presence of the UR5e URDF is used as the marker that all three archives
# (ur5e.zip, robotiq_2f_85.zip, bowl.zip) were already fetched and unpacked.
if not os.path.exists('ur5e/ur5e.urdf'):
    !gdown --id 1Cc_fDSBL6QiDvNT4dpfAEbhbALSVoWcc
    !gdown --id 1yOMEm-Zp_DL3nItG9RozPeJAmeOldekX
    !gdown --id 1GsqNLhEl9dd4Mc3BM0dX3MibOI1FVWNM
    !unzip ur5e.zip
    !unzip robotiq_2f_85.zip
    !unzip bowl.zip

# Download ViLD pretrained model weights (runs on every execution of this cell).
!gsutil cp -r gs://cloud-tpu-checkpoints/detection/projects/vild/colab/image_path_v2 ./

# Setup TensorBoard
%load_ext tensorboard


# Display GPU information
!nvidia-smi

# Check JAX backend
from jax.lib import xla_bridge
print("JAX backend:", xla_bridge.get_backend().platform)

# Configure OpenAI API: the key is read from the OPENAI_API_KEY environment
# variable, which load_dotenv() can populate from a .env file.
load_dotenv()  # Load environment variables from .env file
openai_api_key = os.getenv('OPENAI_API_KEY')
if openai_api_key is None:
    raise ValueError("Please set OPENAI_API_KEY in .env file")
openai.api_key = openai_api_key  # Set the API key for the openai package
ENGINE = "gpt-3.5-turbo-instruct"
# Note: for the scoring model, due to limitations of the GPT-3 API, each option
# requires a separate call, which can be expensive. Iterating with a cheaper
# engine (e.g. ada) is recommended.

In [2]:
#@markdown Global constants: pick and place objects, colors, workspace bounds

# Names of objects that can be picked. Values are unused placeholders (None);
# only the keys are consulted by the code in this file.
PICK_TARGETS = {
  "blue block": None,
  "red block": None,
  "green block": None,
  "yellow block": None,
}

# RGBA colors (each channel in [0, 1]) keyed by the color word that prefixes
# an object name such as "blue block".
COLORS = {
    "blue":   (78/255,  121/255, 167/255, 255/255),
    "red":    (255/255,  87/255,  89/255, 255/255),
    "green":  (89/255,  169/255,  79/255, 255/255),
    "yellow": (237/255, 201/255,  72/255, 255/255),
}

# Names of valid place targets. Objects map to None (their position comes from
# the simulation); fixed locations map to an (x, y, z) tuple.
PLACE_TARGETS = {
  "blue block": None,
  "red block": None,
  "green block": None,
  "yellow block": None,

  "blue bowl": None,
  "red bowl": None,
  "green bowl": None,
  "yellow bowl": None,

  # Corners are inset 0.05 from the X/Y limits given in BOUNDS below.
  "top left corner":     (-0.3 + 0.05, -0.2 - 0.05, 0),
  "top right corner":    (0.3 - 0.05,  -0.2 - 0.05, 0),
  "middle":              (0,           -0.5,        0),
  "bottom left corner":  (-0.3 + 0.05, -0.8 + 0.05, 0),
  "bottom right corner": (0.3 - 0.05,  -0.8 + 0.05, 0),
}

# Size of one heightmap pixel; with the 0.6-wide X/Y bounds below this yields
# a heightmap of about 224 x 224 pixels (0.6 / 0.00267857).
PIXEL_SIZE = 0.00267857
# Workspace limits, one row per axis as (min, max).
BOUNDS = np.float32([[-0.3, 0.3], [-0.8, -0.2], [0, 0.15]])  # X Y Z

### Environment Class + Initialization
Main PyBullet environment implementation including:
- Robot arm control
- Object spawning and physics
- Camera rendering
- State management
- Action execution

In [3]:
#@markdown Gripper (Robotiq 2F85) code

class Robotiq2F85:
  """Gripper handling for Robotiq 2F85.

  Loads the gripper URDF, fixes it to a link of the robot arm and starts a
  daemon thread (`step`) that keeps the remaining finger joints following the
  single motor joint.

  Args:
    robot: PyBullet body id of the arm the gripper is attached to.
    tool: link index on `robot` that the gripper base is constrained to.

  Attributes:
    body: PyBullet body id of the loaded gripper.
    activated: True after `activate()` (closing), False after `release()`.
    running: set to False to ask the constraint thread to exit.
    constraints_thread: the daemon thread executing `step`.
  """

  def __init__(self, robot, tool):
    self.robot = robot
    self.tool = tool
    pos = [0.1339999999999999, -0.49199999999872496, 0.5]
    rot = pybullet.getQuaternionFromEuler([np.pi, 0, np.pi])
    urdf = "robotiq_2f_85/robotiq_2f_85.urdf"
    self.body = pybullet.loadURDF(urdf, pos, rot)
    self.n_joints = pybullet.getNumJoints(self.body)
    self.activated = False

    # Connect gripper base to robot tool.
    pybullet.createConstraint(self.robot, tool, self.body, 0, jointType=pybullet.JOINT_FIXED, jointAxis=[0, 0, 0], parentFramePosition=[0, 0, 0], childFramePosition=[0, 0, -0.07], childFrameOrientation=pybullet.getQuaternionFromEuler([0, 0, np.pi / 2]))

    # Set friction coefficients for gripper fingers.
    for i in range(self.n_joints):
      pybullet.changeDynamics(self.body, i, lateralFriction=10.0, spinningFriction=1.0, rollingFriction=1.0, frictionAnchor=True)

    # Start thread to handle additional gripper constraints. `running` must be
    # set before the thread starts because `step` polls it.
    self.motor_joint = 1
    self.running = True
    self.constraints_thread = threading.Thread(target=self.step)
    self.constraints_thread.daemon = True
    self.constraints_thread.start()

  def step(self):
    """Thread body: make the follower joints mimic the motor joint.

    Control joint positions by enforcing hard constraints on gripper behavior.
    One joint (`motor_joint`) is the open/close motor; the joints listed in
    `indj` are driven to +/- its current position. The loop ends when
    `running` is set to False or when a PyBullet call fails (for example
    because the simulation holding this body no longer exists).
    """
    indj = [6, 3, 8, 5, 10]
    while self.running:
      try:
        currj = [pybullet.getJointState(self.body, i)[0] for i in range(self.n_joints)]
        motor = currj[self.motor_joint]
        targj = [motor, -motor, -motor, motor, motor]
        pybullet.setJointMotorControlArray(self.body, indj, pybullet.POSITION_CONTROL, targj, positionGains=np.ones(5))
      except Exception:  # Deliberate best-effort: any simulator error stops the thread.
        return
      time.sleep(0.001)

  def activate(self):
    """Close gripper fingers."""
    pybullet.setJointMotorControl2(self.body, self.motor_joint, pybullet.VELOCITY_CONTROL, targetVelocity=1, force=10)
    self.activated = True

  def release(self):
    """Open gripper fingers."""
    pybullet.setJointMotorControl2(self.body, self.motor_joint, pybullet.VELOCITY_CONTROL, targetVelocity=-1, force=10)
    self.activated = False

  def detect_contact(self):
    """Return whether the gripper (or what it holds) touches something.

    If activated and an object is in the gripper: check object contact.
    If activated and nothing is in the gripper: check gripper contact.
    If released: always False (a proximity check existed here but is disabled).

    Returns:
      bool. False as well when the proximity ray hits the gripper itself or
      body id 0.
    """
    if not self.activated:
      return False
    obj, _, _ = self.check_proximity()
    if obj == self.body or obj == 0:
      return False
    empty = self.grasp_width() < 0.01
    return self.external_contact(self.body if empty else obj)

  def external_contact(self, body=None):
    """Return if `body` (default: the gripper) touches anything but the gripper."""
    if body is None:
      body = self.body
    pts = pybullet.getContactPoints(bodyA=body)
    # pt[2] is compared against the gripper body id to drop gripper contacts.
    return any(pt[2] != self.body for pt in pts)

  def check_grasp(self):
    """Wait until the gripper stops moving, then report if the pads are apart.

    NOTE(review): `self.moving()` is not defined anywhere in this class as
    shown, so calling this method raises AttributeError unless it is provided
    elsewhere -- confirm before use.
    """
    while self.moving():
      time.sleep(0.001)
    success = self.grasp_width() > 0.01
    return success

  def grasp_width(self):
    """Return the distance between links 4 and 9 minus a fixed 0.047813 offset."""
    lpad = np.array(pybullet.getLinkState(self.body, 4)[0])
    rpad = np.array(pybullet.getLinkState(self.body, 9)[0])
    dist = np.linalg.norm(lpad - rpad) - 0.047813
    return dist

  def check_proximity(self):
    """Cast a unit-length ray from the robot tool link through the gripper base.

    Returns:
      (obj, link, ray_frac): the first three fields of the first
      `pybullet.rayTest` result.
    """
    ee_pos = np.array(pybullet.getLinkState(self.robot, self.tool)[0])
    tool_pos = np.array(pybullet.getLinkState(self.body, 0)[0])
    vec = (tool_pos - ee_pos) / np.linalg.norm((tool_pos - ee_pos))
    ee_targ = ee_pos + vec
    ray_data = pybullet.rayTest(ee_pos, ee_targ)[0]
    obj, link, ray_frac = ray_data[0], ray_data[1], ray_data[2]
    return obj, link, ray_frac

#@markdown Gym-style environment code

class PickPlaceEnv():

  def __init__(self):
    """Connect to PyBullet in DIRECT mode and store robot constants.

    No bodies are loaded here; call `reset(config)` to build the scene.
    """
    # Robot description used by `reset` and the motion primitives.
    self.gripper = None
    self.ee_link_id = 9  # Link ID of UR5 end effector.
    self.tip_link_id = 10  # Link ID of gripper finger tips.
    self.home_ee_euler = (np.pi, 0, np.pi)  # (RX, RY, RZ) rotation in Euler angles.
    self.home_joints = (np.pi / 2, -np.pi / 2, np.pi / 2, -np.pi / 2, 3 * np.pi / 2, 0)  # Joint angles: (J0, J1, J2, J3, J4, J5).

    # Simulation clock.
    self.sim_step = 0
    self.dt = 1/480

    # Configure and start PyBullet (swap DIRECT for pybullet.GUI to get a
    # local GUI).
    pybullet.connect(pybullet.DIRECT)
    pybullet.configureDebugVisualizer(pybullet.COV_ENABLE_GUI, 0)
    pybullet.setPhysicsEngineParameter(enableFileCaching=0)
    working_dir = os.path.abspath("")
    assets_path = os.path.dirname(working_dir)
    pybullet.setAdditionalSearchPath(assets_path)
    pybullet.setAdditionalSearchPath(pybullet_data.getDataPath())
    pybullet.setTimeStep(self.dt)

  def reset(self, config):
    """Rebuild the scene and return the first observation.

    Args:
      config: mapping with "pick" and "place" entries, each an iterable of
        object names of the form "<color> <type>" where <type> is "block" or
        "bowl". Names containing neither word are not spawned.

    Returns:
      The dict produced by `get_observation()`.

    Raises:
      ValueError: if a name contains "block"/"bowl" but its second word is
        neither exactly "block" nor "bowl".
    """
    pybullet.resetSimulation(pybullet.RESET_USE_DEFORMABLE_WORLD)
    pybullet.setGravity(0, 0, -9.8)
    self.cache_video = []

    # Temporarily disable rendering to load URDFs faster.
    pybullet.configureDebugVisualizer(pybullet.COV_ENABLE_RENDERING, 0)

    # Add robot.
    pybullet.loadURDF("plane.urdf", [0, 0, -0.001])
    self.robot_id = pybullet.loadURDF("ur5e/ur5e.urdf", [0, 0, 0], flags=pybullet.URDF_USE_MATERIAL_COLORS_FROM_MTL)
    self.ghost_id = pybullet.loadURDF("ur5e/ur5e.urdf", [0, 0, -10])  # For forward kinematics.
    joint_infos = [pybullet.getJointInfo(self.robot_id, i) for i in range(pybullet.getNumJoints(self.robot_id))]
    self.joint_ids = [j[0] for j in joint_infos if j[2] == pybullet.JOINT_REVOLUTE]

    # Move robot to home configuration.
    for i, joint_id in enumerate(self.joint_ids):
      pybullet.resetJointState(self.robot_id, joint_id, self.home_joints[i])

    # Add gripper. First wait for the previous gripper's constraint thread to
    # finish: signal it through its `running` flag and sleep between polls
    # instead of spinning (the thread also exits by itself when its PyBullet
    # calls start failing).
    if self.gripper is not None:
      self.gripper.running = False
      while self.gripper.constraints_thread.is_alive():
        time.sleep(0.001)
    self.gripper = Robotiq2F85(self.robot_id, self.ee_link_id)
    self.gripper.release()

    # Add workspace.
    plane_shape = pybullet.createCollisionShape(pybullet.GEOM_BOX, halfExtents=[0.3, 0.3, 0.001])
    plane_visual = pybullet.createVisualShape(pybullet.GEOM_BOX, halfExtents=[0.3, 0.3, 0.001])
    plane_id = pybullet.createMultiBody(0, plane_shape, plane_visual, basePosition=[0, -0.5, 0])
    pybullet.changeVisualShape(plane_id, -1, rgbaColor=[0.2, 0.2, 0.2, 1.0])

    # Load objects according to config.
    self.config = config
    self.obj_name_to_id = {}
    obj_names = list(self.config["pick"]) + list(self.config["place"])
    obj_xyz = np.zeros((0, 3))
    for obj_name in obj_names:
      if ("block" in obj_name) or ("bowl" in obj_name):

        # Get random position 15cm+ from other objects (rejection sampling,
        # 10cm inside the X/Y workspace bounds). There is no attempt limit.
        while True:
          rand_x = np.random.uniform(BOUNDS[0, 0] + 0.1, BOUNDS[0, 1] - 0.1)
          rand_y = np.random.uniform(BOUNDS[1, 0] + 0.1, BOUNDS[1, 1] - 0.1)
          rand_xyz = np.float32([rand_x, rand_y, 0.03]).reshape(1, 3)
          if len(obj_xyz) == 0:
            obj_xyz = np.concatenate((obj_xyz, rand_xyz), axis=0)
            break
          nn_dist = np.min(np.linalg.norm(obj_xyz - rand_xyz, axis=1)).squeeze()
          if nn_dist > 0.15:
            obj_xyz = np.concatenate((obj_xyz, rand_xyz), axis=0)
            break

        name_parts = obj_name.split(" ")
        object_color = COLORS[name_parts[0]]
        object_type = name_parts[1]
        object_position = rand_xyz.squeeze()
        if object_type == "block":
          object_shape = pybullet.createCollisionShape(pybullet.GEOM_BOX, halfExtents=[0.02, 0.02, 0.02])
          object_visual = pybullet.createVisualShape(pybullet.GEOM_BOX, halfExtents=[0.02, 0.02, 0.02])
          object_id = pybullet.createMultiBody(0.01, object_shape, object_visual, basePosition=object_position)
        elif object_type == "bowl":
          object_position[2] = 0
          object_id = pybullet.loadURDF("bowl/bowl.urdf", object_position, useFixedBase=1)
        else:
          # Without this guard a stale `object_id` from the previous object
          # would be recolored and registered under this name.
          raise ValueError(f"Unsupported object type in {obj_name!r}")
        pybullet.changeVisualShape(object_id, -1, rgbaColor=object_color)
        self.obj_name_to_id[obj_name] = object_id

    # Re-enable rendering.
    pybullet.configureDebugVisualizer(pybullet.COV_ENABLE_RENDERING, 1)

    # Step the simulation 200 times before taking the first observation.
    for _ in range(200):
      pybullet.stepSimulation()
    return self.get_observation()

  def servoj(self, joints):
    """Command the arm joints toward `joints` using position control.

    Args:
      joints: target positions, one per entry of `self.joint_ids`.
    """
    gains = [0.01]*6
    pybullet.setJointMotorControlArray(
      bodyIndex=self.robot_id,
      jointIndices=self.joint_ids,
      controlMode=pybullet.POSITION_CONTROL,
      targetPositions=joints,
      positionGains=gains)
  
  def movep(self, position):
    """Servo the arm so the finger-tip link heads toward `position`.

    Inverse kinematics is solved for `tip_link_id` with the fixed home
    end-effector orientation; the resulting joints are passed to `servoj`.
    """
    home_quat = pybullet.getQuaternionFromEuler(self.home_ee_euler)
    target_joints = pybullet.calculateInverseKinematics(
        bodyUniqueId=self.robot_id,
        endEffectorLinkIndex=self.tip_link_id,
        targetPosition=position,
        targetOrientation=home_quat,
        maxNumIterations=100)
    self.servoj(target_joints)

  def step(self, action=None):
    """Do pick and place motion primitive.

    Args:
      action: mapping with "pick" and "place" entries, each an xyz array
        (must support `.copy()` and item assignment). Only x/y are taken from
        the caller for the grasp and release heights, which are fixed below.

    Returns:
      (observation, reward, done, info) where `done` is always False and
      `info` is an empty dict.

    Raises:
      ValueError: if `action` is None.
    """
    if action is None:
      raise ValueError("step() requires an action with 'pick' and 'place' positions")
    pick_xyz, place_xyz = action["pick"].copy(), action["place"].copy()

    # Set fixed primitive z-heights. The hover point is computed before the
    # pick height is overwritten, so it sits 0.2 above the given pick z.
    hover_xyz = pick_xyz.copy() + np.float32([0, 0, 0.2])
    pick_xyz[2] = 0.03
    place_xyz[2] = 0.15

    # Move to object.
    self._servo_to(hover_xyz)
    self._servo_to(pick_xyz)

    # Pick up object: close the gripper, let it settle, then lift.
    self.gripper.activate()
    for _ in range(240):
      self.step_sim_and_render()
    self._servo_to(hover_xyz)

    # Move to place location.
    self._servo_to(place_xyz)

    # Place down object: lower 1 mm at a time until contact is detected or
    # the target height reaches 0.03.
    while (not self.gripper.detect_contact()) and (place_xyz[2] > 0.03):
      place_xyz[2] -= 0.001
      self.movep(place_xyz)
      for _ in range(3):
        self.step_sim_and_render()
    self.gripper.release()
    for _ in range(240):
      self.step_sim_and_render()

    # Retreat straight up, then return above the workspace center.
    place_xyz[2] = 0.2
    self._servo_to(place_xyz)
    self._servo_to(np.float32([0, -0.5, 0.2]))

    observation = self.get_observation()
    reward = self.get_reward()
    done = False
    info = {}
    return observation, reward, done, info

  def _servo_to(self, target_xyz):
    """Step the simulation until the finger-tip link is within 1 cm of target.

    There is no iteration limit: the loop runs until the distance criterion
    is met.
    """
    ee_xyz = np.float32(pybullet.getLinkState(self.robot_id, self.tip_link_id)[0])
    while np.linalg.norm(target_xyz - ee_xyz) > 0.01:
      self.movep(target_xyz)
      self.step_sim_and_render()
      ee_xyz = np.float32(pybullet.getLinkState(self.robot_id, self.tip_link_id)[0])

  def set_alpha_transparency(self, alpha: float) -> None:
    """Re-color robot and gripper links with the given alpha channel.

    For body ids 0..19, each visual shape's RGB is combined with `alpha` and
    applied to the robot and to the gripper at the link index equal to that
    shape's position in the list.

    NOTE(review): the color is taken from whichever body is being iterated,
    not necessarily from the robot/gripper link being changed, and the list
    position is used as the link index -- confirm this is intended.
    """
    for body_id in range(20):
      shapes = pybullet.getVisualShapeData(body_id)
      for shape_idx, shape in enumerate(shapes):
        # Each entry is unpacked as exactly eight fields; the last is RGBA.
        _, _, _, _, _, _, _, rgba = shape
        tinted = [*rgba[0:3], alpha]
        for target_body in (self.robot_id, self.gripper.body):
          pybullet.changeVisualShape(
              target_body, linkIndex=shape_idx, rgbaColor=tinted)

  def step_sim_and_render(self):
    """Advance the simulation one step and occasionally cache a video frame."""
    pybullet.stepSimulation()
    self.sim_step += 1

    # Only every 60th step is rendered (8 frames per simulated second at the
    # 1/480 s time step).
    if self.sim_step % 60 != 0:
      return
    frame = self.get_camera_image()
    self.cache_video.append(frame)

  def get_camera_image(self):
    """Return a 240x240 color image from the perspective side camera.

    Renders through `self.render_image`, so the method works for any
    instance rather than depending on a module-level variable named `env`.
    """
    image_size = (240, 240)
    intrinsics = (120., 0, 120., 0, 120., 120., 0, 0, 1)
    color, _, _, _, _ = self.render_image(image_size, intrinsics)
    return color

  def get_camera_image_top(self, 
                           image_size=(240, 240), 
                           intrinsics=(2000., 0, 2000., 0, 2000., 2000., 0, 0, 1),
                           position=(0, -0.5, 5),
                           orientation=(0, np.pi, -np.pi / 2),
                           zrange=(0.01, 1.),
                           set_alpha=True):
    """Return a color image from the overhead camera.

    Args:
      image_size, intrinsics, position, orientation, zrange: forwarded
        unchanged to `render_image_top`.
      set_alpha: when True, `set_alpha_transparency(0)` is applied before
        rendering and `set_alpha_transparency(1)` afterwards, so the arm is
        made transparent for the shot.

    Returns:
      The color image returned by `render_image_top`.
    """
    if set_alpha:
      self.set_alpha_transparency(0)
    try:
      # Render through `self` rather than a module-level `env` variable.
      color, _, _, _, _ = self.render_image_top(image_size,
                                                intrinsics,
                                                position,
                                                orientation,
                                                zrange)
    finally:
      # Restore opacity even if rendering fails.
      if set_alpha:
        self.set_alpha_transparency(1)
    return color

  def get_reward(self):
    return 0  # TODO: check did the robot follow text instructions?

  def get_observation(self):
    """Render the scene and build the top-down observation dict.

    Returns:
      dict with keys "image" (colormap from `get_heightmap`), "xyzmap",
      "pick" and "place" (lists copied from the current config).
    """
    # Render current image.
    color, depth, cam_position, cam_orientation, intrinsics = self.render_image()

    # Back-project depth into a point cloud in the camera frame.
    cloud = self.get_pointcloud(depth, intrinsics)

    # Camera pose as a 4x4 matrix: rotation from the quaternion, translation
    # from the camera position.
    cam_translation = np.float32(cam_position).reshape(3, 1)
    cam_rotation = np.float32(pybullet.getMatrixFromQuaternion(cam_orientation)).reshape(3, 3)
    cam_pose = np.eye(4)
    cam_pose[:3, :] = np.hstack((cam_rotation, cam_translation))
    cloud = self.transform_pointcloud(cloud, cam_pose)

    # Get heightmaps and colormaps (the heightmap itself is not returned).
    _, colormap, xyzmap = self.get_heightmap(cloud, color, BOUNDS, PIXEL_SIZE)

    return {
        "image": colormap,
        "xyzmap": xyzmap,
        "pick": list(self.config["pick"]),
        "place": list(self.config["place"]),
    }

  def render_image(self, image_size=(720, 720), intrinsics=(360., 0, 360., 0, 360., 360., 0, 0, 1)):
    """Render the perspective side-view camera.

    The rendering steps are the same as in `render_image_top`; only the
    camera pose differs, so this method delegates to it with the side-view
    pose instead of repeating the rendering code.

    Args:
      image_size: (height, width) of the rendered image.
      intrinsics: 9 values of the row-major 3x3 camera intrinsics matrix.

    Returns:
      (color, depth, position, orientation, intrinsics) exactly as returned
      by `render_image_top`: `position` is the camera position tuple below
      and `orientation` is the quaternion for the Euler angles below.
    """
    # Camera parameters.
    position = (0, -0.85, 0.4)
    orientation = (np.pi / 4 + np.pi / 48, np.pi, np.pi)
    zrange = (0.01, 10.)
    return self.render_image_top(image_size=image_size,
                                 intrinsics=intrinsics,
                                 position=position,
                                 orientation=orientation,
                                 zrange=zrange)

  def render_image_top(self, 
                       image_size=(240, 240), 
                       intrinsics=(2000., 0, 2000., 0, 2000., 2000., 0, 0, 1),
                       position=(0, -0.5, 5),
                       orientation=(0, np.pi, -np.pi / 2),
                       zrange=(0.01, 1.)):
    """Render a noisy color and depth image from a camera at the given pose.

    Args:
      image_size: (height, width) of the rendered image.
      intrinsics: 9 values of the row-major 3x3 intrinsics; element 0 is used
        as the focal length.
      position: camera position.
      orientation: camera orientation as Euler angles.
      zrange: accepted but not used; the clipping planes are fixed at
        (0.01, 10.) below.

    Returns:
      (color, depth, position, orientation, intrinsics) where `color` is a
      HxWx3 uint8 array, `depth` a HxW float32 array, `orientation` the
      quaternion of the given Euler angles and `intrinsics` a 3x3 float32
      array. Gaussian noise is always added to color and depth.
    """
    img_h, img_w = image_size[0], image_size[1]
    add_noise = True

    # Camera parameters.
    orientation = pybullet.getQuaternionFromEuler(orientation)
    rotm = np.float32(pybullet.getMatrixFromQuaternion(orientation)).reshape(3, 3)

    # OpenGL camera settings: rotate the canonical look/up axes into the
    # camera orientation.
    lookdir = (rotm @ np.float32([0, 0, 1]).reshape(3, 1)).reshape(-1)
    updir = (rotm @ np.float32([0, -1, 0]).reshape(3, 1)).reshape(-1)
    lookat = position + lookdir
    viewm = pybullet.computeViewMatrix(position, lookat, updir)
    znear, zfar = (0.01, 10.)
    focal_len = intrinsics[0]
    half_extent_ratio = (img_h / 2) / focal_len
    fovh = 180 * np.arctan(half_extent_ratio) * 2 / np.pi

    # Notes: 1) FOV is vertical FOV 2) aspect must be float
    aspect_ratio = img_w / img_h
    projm = pybullet.computeProjectionMatrixFOV(fovh, aspect_ratio, znear, zfar)

    # Render with OpenGL camera settings.
    _, _, raw_color, raw_depth, _ = pybullet.getCameraImage(
        width=img_w,
        height=img_h,
        viewMatrix=viewm,
        projectionMatrix=projm,
        shadow=1,
        flags=pybullet.ER_SEGMENTATION_MASK_OBJECT_AND_LINKINDEX,
        renderer=pybullet.ER_BULLET_HARDWARE_OPENGL)

    # Get color image (alpha channel removed).
    color = np.array(raw_color, dtype=np.uint8).reshape((img_h, img_w, 4))[:, :, :3]
    if add_noise:
      # Work in int32 so the noise can push values outside 0..255 before
      # clipping.
      color = np.int32(color)
      color += np.int32(np.random.normal(0, 3, color.shape))
      color = np.uint8(np.clip(color, 0, 255))

    # Get depth image: convert the z-buffer to metric depth using the
    # near/far planes.
    zbuffer = np.float32(raw_depth).reshape((img_h, img_w))
    depth = (2 * znear * zfar) / (zfar + znear - (2 * zbuffer - 1) * (zfar - znear))
    if add_noise:
      depth += np.random.normal(0, 0.003, depth.shape)

    intrinsics = np.float32(intrinsics).reshape(3, 3)
    return color, depth, position, orientation, intrinsics

  def get_pointcloud(self, depth, intrinsics):
    """Back-project a perspective depth image into a 3D point cloud.

    Args:
      depth: HxW float array of perspective depth in meters.
      intrinsics: 3x3 float array of camera intrinsics matrix.

    Returns:
      points: HxWx3 float32 array of 3D points in camera coordinates.
    """
    n_rows, n_cols = depth.shape
    # Pixel coordinates as a broadcastable row (columns) and column (rows).
    cols = np.arange(n_cols, dtype=np.float64)[np.newaxis, :]
    rows = np.arange(n_rows, dtype=np.float64)[:, np.newaxis]
    # Pinhole model: offset from the principal point, scaled by depth/focal.
    cam_x = (cols - intrinsics[0, 2]) * (depth / intrinsics[0, 0])
    cam_y = (rows - intrinsics[1, 2]) * (depth / intrinsics[1, 1])
    return np.stack((cam_x, cam_y, depth), axis=-1).astype(np.float32)

  def transform_pointcloud(self, points, transform):
    """Apply a rigid transformation to a 3D point cloud, in place.

    Args:
      points: HxWx3 float array of 3D points in camera coordinates. This
        array is overwritten with the result.
      transform: 4x4 float array representing a rigid transformation matrix.

    Returns:
      points: the same HxWx3 array object, now holding the transformed points.
    """
    # Homogeneous copy of the input: append a constant 1 to every point.
    ones = np.ones(points.shape[:-1] + (1,), dtype=points.dtype)
    homogeneous = np.concatenate((points, ones), axis=-1)
    # Only the first three rows of the transform produce output coordinates.
    for axis, row in enumerate(transform[:3]):
      points[Ellipsis, axis] = (row * homogeneous).sum(axis=-1)
    return points

  def get_heightmap(self, points, colors, bounds, pixel_size):
    """Get top-down (z-axis) orthographic heightmap image from 3D pointcloud.

    Args:
      points: HxWx3 float array of 3D points in world coordinates.
      colors: HxWx3 uint8 array of values in range 0-255 aligned with points.
      bounds: 3x2 float array of values (rows: X,Y,Z; columns: min,max) defining
        region in 3D space to generate heightmap in world coordinates.
      pixel_size: float defining size of each pixel in meters.

    Returns:
      heightmap: HxW float array of height (from lower z-bound) in meters.
      colormap: HxWx3 uint8 array of backprojected color aligned with heightmap.
      xyzmap: HxWx3 float array of XYZ points in world coordinates. X and Y are
        a regular grid spanning `bounds`; Z comes from the point cloud.
      All three are flipped up-down before being returned.
    """
    width = int(np.round((bounds[0, 1] - bounds[0, 0]) / pixel_size))
    height = int(np.round((bounds[1, 1] - bounds[1, 0]) / pixel_size))
    heightmap = np.zeros((height, width), dtype=np.float32)
    colormap = np.zeros((height, width, colors.shape[-1]), dtype=np.uint8)
    xyzmap = np.zeros((height, width, 3), dtype=np.float32)

    # Filter out 3D points that are outside of the predefined bounds.
    ix = (points[Ellipsis, 0] >= bounds[0, 0]) & (points[Ellipsis, 0] < bounds[0, 1])
    iy = (points[Ellipsis, 1] >= bounds[1, 0]) & (points[Ellipsis, 1] < bounds[1, 1])
    iz = (points[Ellipsis, 2] >= bounds[2, 0]) & (points[Ellipsis, 2] < bounds[2, 1])
    valid = ix & iy & iz
    points = points[valid]
    colors = colors[valid]

    # Sort 3D points by z-value, which works with array assignment to simulate
    # z-buffering for rendering the heightmap image (the highest point written
    # to a pixel is written last).
    order = np.argsort(points[:, -1])
    points, colors = points[order], colors[order]
    px = np.int32(np.floor((points[:, 0] - bounds[0, 0]) / pixel_size))
    py = np.int32(np.floor((points[:, 1] - bounds[1, 0]) / pixel_size))
    px = np.clip(px, 0, width - 1)
    py = np.clip(py, 0, height - 1)
    heightmap[py, px] = points[:, 2] - bounds[2, 0]
    colormap[py, px] = colors
    xyzmap[py, px] = points

    # Replace X/Y with a regular grid over the requested bounds. meshgrid
    # returns arrays of shape (len(y), len(x)), so X must be sampled `width`
    # times and Y `height` times to match the (height, width) maps; this also
    # uses the `bounds` argument rather than a module-level constant.
    xv, yv = np.meshgrid(np.linspace(bounds[0, 0], bounds[0, 1], width),
                         np.linspace(bounds[1, 0], bounds[1, 1], height))
    xyzmap[:, :, 0] = xv
    xyzmap[:, :, 1] = yv

    colormap = colormap[::-1, :, :]  # Flip up-down.
    xyzmap = xyzmap[::-1, :, :]  # Flip up-down.
    heightmap = heightmap[::-1, :]  # Flip up-down.
    return heightmap, colormap, xyzmap

In [None]:
#@markdown Initialize environment 

if 'env' in locals() and env.gripper is not None:
  # Safely exit gripper threading before re-initializing environment.
  # `gripper` is None until `reset()` has been called, hence the guard above.
  env.gripper.running = False
  # Thread.isAlive() was removed in Python 3.9; join() with a timeout waits
  # for the thread without hanging forever if it never stops.
  env.gripper.constraints_thread.join(timeout=1.0)
env = PickPlaceEnv()
#@markdown Render images.

# Define and reset environment.
# `config` lists which objects to spawn; `reset` places them at random
# positions, so the seed is fixed first to make the layout reproducible.
config = {'pick':  ['yellow block', 'green block', 'blue block'],
          'place': ['yellow bowl', 'green bowl', 'blue bowl']}

np.random.seed(42)
obs = env.reset(config)

# Left panel: perspective side camera.
plt.subplot(1, 2, 1)
img = env.get_camera_image()
plt.title('Perspective side-view')
plt.imshow(img)
# Right panel: overhead camera, transposed and flipped for display.
plt.subplot(1, 2, 2)
img = env.get_camera_image_top()
img = np.flipud(img.transpose(1, 0, 2))
plt.title('Orthographic top-view')
plt.imshow(img)
plt.show()

# Note: orthographic cameras do not exist. But we can approximate them by
# projecting a 3D point cloud from an RGB-D camera, then unprojecting that onto
# an orthographic plane. Orthographic views are useful for spatial action maps.
# obs['image'] is the colormap built that way by get_observation().
plt.title('Unprojected orthographic top-view')
plt.imshow(obs['image'])
plt.show()

### Direct Control Implementation

In [5]:
class DirectExecutor:
    """Handles direct execution of pick and place actions.

    Parses an instruction of the form "<pick phrase> and <place phrase>",
    looks up the positions of the named targets and runs one `env.step`.

    Args:
        env: environment exposing `obj_name_to_id`, `step(action)` and
            `get_camera_image()`.
    """

    def __init__(self, env):
        self.env = env

    def get_object_position(self, obj_name):
        """Get the position of an object in the environment.

        Returns:
            A float32 xyz array: the simulated base position for spawned
            objects, or the fixed coordinates from PLACE_TARGETS. None when
            the name is unknown or is a PLACE_TARGETS entry without
            coordinates that is not present in the scene.
        """
        if obj_name in self.env.obj_name_to_id:
            obj_id = self.env.obj_name_to_id[obj_name]
            position, _ = pybullet.getBasePositionAndOrientation(obj_id)
            return np.float32(position)
        fixed_xyz = PLACE_TARGETS.get(obj_name)
        if fixed_xyz is None:
            # np.float32(None) would yield a NaN scalar that cannot be sliced.
            return None
        return np.float32(fixed_xyz)

    @staticmethod
    def _first_match(candidates, text):
        """Return the first candidate name contained in `text`, else None."""
        return next((name for name in candidates if name in text), None)

    def execute_action(self, obs, instruction):
        """Execute action without showing initial/final states.

        Args:
            obs: current observation, returned unchanged on any failure.
            instruction: text containing exactly one standalone word "and"
                separating the pick phrase from the place phrase.

        Returns:
            The observation from `env.step` on success, otherwise `obs`.
        """
        import re

        # Split on the whole word "and" only, so words that merely contain
        # those letters do not break the instruction apart.
        split_text = re.split(r"\band\b", instruction.lower())
        if len(split_text) != 2:
            print("Invalid instruction format")
            return obs

        pick_text, place_text = split_text

        pick_target = self._first_match(PICK_TARGETS, pick_text)
        place_target = self._first_match(PLACE_TARGETS, place_text)
        if not pick_target or not place_target:
            print("Could not identify pick or place targets")
            return obs

        # Get object positions
        pick_pos = self.get_object_position(pick_target)
        place_pos = self.get_object_position(place_target)
        if pick_pos is None or place_pos is None:
            print("Could not locate objects")
            return obs

        # Add some random noise to positions for robustness. Each call draws a
        # single scalar, so x and y of a position receive the same offset.
        pick_pos[:2] += np.random.normal(scale=0.01)
        place_pos[:2] += np.random.normal(scale=0.01)

        # Execute action. Failures are reported and the previous observation
        # is returned so the caller can continue.
        try:
            action = {'pick': pick_pos, 'place': place_pos}
            new_obs, _reward, _done, _info = self.env.step(action)
        except Exception as e:
            print(f"Execution failed: {e}")
            return obs
        return new_obs

    def run(self, obs, text):
        """Main function to run direct control execution.

        Shows the camera image before and after executing `text` and returns
        the resulting observation.
        """
        before = self.env.get_camera_image()
        print("\nInitial state:")
        plt.imshow(before)
        plt.show()

        new_obs = self.execute_action(obs, text)

        print("\nFinal state:")
        plt.imshow(self.env.get_camera_image())
        plt.show()

        return new_obs

## **Demo:** ViLD
Run zero-shot open-vocabulary object detection with [ViLD](https://arxiv.org/abs/2104.13921) to generate a list of objects as a scene description for a large language model.


### ViLD Setup
Configures Vision-Language Detection (ViLD) model for:
- Object detection
- Scene understanding
- Visual grounding of language commands

In [None]:
# Define and reset environment.
# Same object set and seed as the rendering cell above, so the random layout
# is reproduced.
config = {'pick':  ['yellow block', 'green block', 'blue block'],
          'place': ['yellow bowl', 'green bowl', 'blue bowl']}

np.random.seed(42)
obs = env.reset(config)
# Overhead image, transposed and flipped the same way as in the preview plot.
img = env.get_camera_image_top()
img = np.flipud(img.transpose(1, 0, 2))
plt.title('ViLD Input Image')
plt.imshow(img)
plt.show()
# Save the image to disk as tmp.jpg.
imageio.imwrite('tmp.jpg', img)

### CLIP Model Configuration
Sets up CLIP model for:
- Text-to-image understanding
- Visual feature extraction
- Language understanding

In [None]:
#@markdown Load CLIP model with memory management

# Clear CUDA cache before loading model
torch.cuda.empty_cache()

# Define device globally: first CUDA device when available, otherwise CPU.
DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"
if torch.cuda.is_available():
    # Make DEVICE the current CUDA device for subsequent allocations.
    torch.cuda.set_device(DEVICE)

def load_clip_model(max_retries=3, initial_batch_size=32):
    """Load CLIP ViT-B/32 on DEVICE, retrying and finally falling back to CPU.

    Args:
        max_retries: Number of load attempts on DEVICE before the CPU
            fallback is tried. Must be at least 1.
        initial_batch_size: Batch size returned on success; halved (never
            below 1) after each failed attempt.

    Returns:
        Tuple `(clip_model, clip_preprocess, batch_size)`. After the CPU
        fallback the batch size is always 16.

    Raises:
        ValueError: If `max_retries` is smaller than 1.
        RuntimeError: If the final CPU fallback load fails as well.
    """
    if max_retries < 1:
        # Previously the loop body never ran and the function returned None,
        # which only surfaced later as an unpacking error in the caller.
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    batch_size = initial_batch_size
    for attempt in range(max_retries):
        try:
            # Clear cache before loading
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            clip_model, clip_preprocess = clip.load("ViT-B/32", device=DEVICE)
            clip_model.eval()

            # Print model info
            print("Model parameters:", f"{np.sum([int(np.prod(p.shape)) for p in clip_model.parameters()]):,}")
            print("Input resolution:", clip_model.visual.input_resolution)
            print("Context length:", clip_model.context_length)
            print("Vocab size:", clip_model.vocab_size)

            return clip_model, clip_preprocess, batch_size

        except RuntimeError as e:
            # Not every RuntimeError is an out-of-memory condition, so report
            # the failure neutrally and let the message speak for itself.
            print(f"Failed to load CLIP on {DEVICE} (attempt {attempt + 1}/{max_retries}). Error: {e}")
            if attempt < max_retries - 1:
                # Reduce batch size for next attempt, but never down to zero.
                batch_size = max(1, batch_size // 2)
                print(f"Reducing batch size to {batch_size} and retrying...")
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                time.sleep(1)  # Give GPU a moment to free memory

    # Every attempt on DEVICE failed: last resort is the CPU.
    print("Failed all attempts to load model. Trying CPU fallback...")
    clip_model, clip_preprocess = clip.load("ViT-B/32", device="cpu")
    clip_model.eval()
    return clip_model, clip_preprocess, 16  # Fixed batch size used for CPU

# Load the model with automatic memory management
try:
    clip_model, clip_preprocess, batch_size = load_clip_model()
except Exception as e:
    print(f"Failed to load CLIP model: {e}")
    raise
else:
    # load_clip_model may fall back to CPU, so report the device the weights
    # actually live on instead of the requested DEVICE.
    print(f"Successfully loaded CLIP model on {next(clip_model.parameters()).device} with batch size {batch_size}")

In [8]:
#@markdown Define ViLD hyperparameters.
# Switches read by the ViLD cells below (prompt building and score computation).
FLAGS = EasyDict({
    'prompt_engineering': True,  # use `multiple_templates` instead of `single_template`
    'this_is': True,             # prefix prompts starting with "a"/"the" with "This is "
    'temperature': 100.0,        # multiplier applied to raw scores before softmax
    'use_softmax': False,        # when False, raw similarity scores are used directly
})


# # Global matplotlib settings
# SMALL_SIZE = 16#10
# MEDIUM_SIZE = 18#12
# BIGGER_SIZE = 20#14

# plt.rc('font', size=MEDIUM_SIZE)         # controls default text sizes
# plt.rc('axes', titlesize=SMALL_SIZE)     # fontsize of the axes title
# plt.rc('axes', labelsize=MEDIUM_SIZE)    # fontsize of the x and y labels
# plt.rc('xtick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
# plt.rc('ytick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
# plt.rc('legend', fontsize=MEDIUM_SIZE)   # legend fontsize
# plt.rc('figure', titlesize=BIGGER_SIZE)  # fontsize of the figure title


# Figure sizes used when displaying images.
display_input_size = (10, 10)
overall_fig_size = (18, 24)

# Drawing defaults for boxes and masks.
line_thickness = 1
fig_size_w = 35
# fig_size_h = min(max(5, int(len(category_names) / 2.5) ), 10)
mask_color = 'red'
alpha = 0.5

### 1. ViLD Prompt Engineering Cell
- **Purpose**: Create language templates for object detection
- **Key Functions**: 
 - `article()`: Determines "a" vs "an"
 - `processed_name()`: Cleans object names
 - `build_text_embedding()`: Creates CLIP text embeddings
- **Templates**: Multiple prompt formats like:
 - "a photo of {}"
 - "This is a {}"
 - Various quality descriptions (good/bad/bright/dark)

In [9]:
#@markdown ViLD prompt engineering.
# Device string for CLIP inputs, computed with the same expression as DEVICE in
# the CLIP loading cell.
# NOTE(review): if the CLIP loader fell back to CPU while CUDA is available,
# this will not match the device the model is on — confirm before relying on it.
device = "cuda:0" if torch.cuda.is_available() else "cpu"

def article(name):
  """Returns the indefinite article ("a" or "an") for `name`.

  The choice is based on the first character only: names starting with a
  vowel letter, in either case, get "an"; everything else, including an
  empty name, gets "a".

  Args:
    name: category name, e.g. "orange block".

  Returns:
    The string "an" or "a".
  """
  # Guard against the empty string (indexing would raise) and compare
  # case-insensitively so "Apple" is treated like "apple".
  return "an" if name and name[0].lower() in "aeiou" else "a"

def processed_name(name, rm_dot=False):
  """Normalizes a category name for insertion into a prompt template.

  Underscores become spaces, slashes become " or ", and the result is
  lowercased.

  Args:
    name: raw category name.
    rm_dot: if True, also strip trailing "." characters.

  Returns:
    The cleaned-up name.
  """
  # "_" is the separator used for lvis names, "/" the one used for obj365.
  cleaned = name.replace("_", " ").replace("/", " or ").lower()
  return cleaned.rstrip(".") if rm_dot else cleaned

# Prompt templates. Each one is filled in by build_text_embedding with
# `template.format(<processed name>, article=<"a" or "an">)`, so `{}` is the
# category name and `{article}` is its indefinite article.

# Used when FLAGS.prompt_engineering is False: one plain prompt per category.
single_template = [
    "a photo of {article} {}."
]

# multiple_templates = [
#     "There is {article} {} in the scene.",
#     "a painting of a {}.",
# ]

# Used when FLAGS.prompt_engineering is True: the embeddings of all these
# prompts are averaged into one embedding per category.
multiple_templates = [
    'There is {article} {} in the scene.',
    'There is the {} in the scene.',
    'a photo of {article} {} in the scene.',
    'a photo of the {} in the scene.',
    'a photo of one {} in the scene.',


    'itap of {article} {}.',
    'itap of my {}.',  # itap: I took a picture of
    'itap of the {}.',
    'a photo of {article} {}.',
    'a photo of my {}.',
    'a photo of the {}.',
    'a photo of one {}.',
    'a photo of many {}.',

    'a good photo of {article} {}.',
    'a good photo of the {}.',
    'a bad photo of {article} {}.',
    'a bad photo of the {}.',
    'a photo of a nice {}.',
    'a photo of the nice {}.',
    'a photo of a cool {}.',
    'a photo of the cool {}.',
    'a photo of a weird {}.',
    'a photo of the weird {}.',

    'a photo of a small {}.',
    'a photo of the small {}.',
    'a photo of a large {}.',
    'a photo of the large {}.',

    'a photo of a clean {}.',
    'a photo of the clean {}.',
    'a photo of a dirty {}.',
    'a photo of the dirty {}.',

    'a bright photo of {article} {}.',
    'a bright photo of the {}.',
    'a dark photo of {article} {}.',
    'a dark photo of the {}.',

    'a photo of a hard to see {}.',
    'a photo of the hard to see {}.',
    'a low resolution photo of {article} {}.',
    'a low resolution photo of the {}.',
    'a cropped photo of {article} {}.',
    'a cropped photo of the {}.',
    'a close-up photo of {article} {}.',
    'a close-up photo of the {}.',
    'a jpeg corrupted photo of {article} {}.',
    'a jpeg corrupted photo of the {}.',
    'a blurry photo of {article} {}.',
    'a blurry photo of the {}.',
    'a pixelated photo of {article} {}.',
    'a pixelated photo of the {}.',

    'a black and white photo of the {}.',
    'a black and white photo of {article} {}.',

    'a plastic {}.',
    'the plastic {}.',

    'a toy {}.',
    'the toy {}.',
    'a plushie {}.',
    'the plushie {}.',
    'a cartoon {}.',
    'the cartoon {}.',

    'an embroidered {}.',
    'the embroidered {}.',

    'a painting of the {}.',
    # NOTE(review): this template hard-codes "a" directly before the name
    # instead of using {article}, so vowel-initial names read "a orange ..." —
    # confirm whether that is intentional before changing it.
    'a painting of a {}.',
]

def build_text_embedding(categories):
    """Build one normalized CLIP text embedding per category.

    Every category name is inserted into each prompt template
    (`multiple_templates` when FLAGS.prompt_engineering is set, otherwise
    `single_template`). The prompts are encoded with `clip_model`, each prompt
    embedding is L2-normalized, the embeddings are averaged, and the mean is
    normalized again.

    Args:
        categories: Sequence of dicts that each have a "name" key.

    Returns:
        NumPy array with one row per category, in input order.

    Raises:
        RuntimeError: If encoding fails for a reason other than running out of
            memory, or if the reduced-size retry fails as well.
    """
    templates = multiple_templates if FLAGS.prompt_engineering else single_template

    # Tokens must be on the same device as the model. The CLIP loader can fall
    # back to CPU even when CUDA is available, so ask the model itself.
    model_device = next(clip_model.parameters()).device

    def encode_mean(tokens):
        """Encode tokenized prompts and return their normalized mean embedding."""
        embeddings = clip_model.encode_text(tokens)
        embeddings /= embeddings.norm(dim=-1, keepdim=True)
        mean_embedding = embeddings.mean(dim=0)
        mean_embedding /= mean_embedding.norm()
        return mean_embedding

    print("Building text embeddings...")
    category_embeddings = []
    with torch.no_grad():
        for category in tqdm(categories):
            name = category["name"]
            texts = [
                template.format(processed_name(name, rm_dot=True),
                                article=article(name))
                for template in templates
            ]
            if FLAGS.this_is:
                texts = [
                    "This is " + text if text.startswith(("a", "the")) else text
                    for text in texts
                ]
            tokens = clip.tokenize(texts).to(model_device)

            try:
                embedding = encode_mean(tokens)
            except RuntimeError as e:
                # Only an out-of-memory failure can be helped by a smaller
                # input; anything else is a real error and must surface.
                if "out of memory" not in str(e).lower():
                    raise
                print(f"CUDA out of memory. Error: {e}")
                torch.cuda.empty_cache()
                # Best-effort retry on the first half of the prompts. The
                # resulting embedding averages fewer templates.
                embedding = encode_mean(tokens[:len(tokens) // 2])
            category_embeddings.append(embedding)

            # Clear cache periodically
            if len(category_embeddings) % 8 == 0:
                torch.cuda.empty_cache()

        torch.cuda.empty_cache()
        # Stack as (dim, num_categories); the transpose below yields one row
        # per category.
        stacked = torch.stack(category_embeddings, dim=1)

    return stacked.cpu().numpy().T

In [None]:
#@markdown Load ViLD model.

# Directory holding the exported ViLD SavedModel.
saved_model_dir = "./image_path_v2"

# Cap TensorFlow at 20% of GPU memory.
gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.2)
session = tf.Session(
    graph=tf.Graph(),
    config=tf.ConfigProto(gpu_options=gpu_options),
)
_ = tf.saved_model.loader.load(session, ["serve"], saved_model_dir)

# Placeholder categories named "0".."49", plus a lookup of them by id.
numbered_categories = [{"name": str(idx), "id": idx} for idx in range(50)]
numbered_category_indices = {cat["id"]: cat for cat in numbered_categories}

In [11]:
#@markdown Non-maximum suppression (NMS).
def nms(dets, scores, thresh, max_dets=1000):
  """Greedy non-maximum suppression.

  Boxes are visited from highest to lowest score; each selected box removes
  all remaining boxes whose IoU with it exceeds `thresh`.

  Args:
    dets: [N, 4] array of boxes, columns read as (y1, x1, y2, x2).
    scores: [N,] array of box scores.
    thresh: iou threshold. Float
    max_dets: int. Maximum number of boxes to keep.

  Returns:
    List of indices into `dets` of the kept boxes, highest score first.
  """
  top, left, bottom, right = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3]
  box_areas = (right - left) * (bottom - top)

  # Candidate indices, best score first.
  remaining = scores.argsort()[::-1]

  selected = []
  while remaining.size > 0 and len(selected) < max_dets:
    best, rest = remaining[0], remaining[1:]
    selected.append(best)

    # Intersection rectangle of the best box with every other candidate.
    inter_left = np.maximum(left[best], left[rest])
    inter_top = np.maximum(top[best], top[rest])
    inter_right = np.minimum(right[best], right[rest])
    inter_bottom = np.minimum(bottom[best], bottom[rest])

    inter_w = np.maximum(0.0, inter_right - inter_left)
    inter_h = np.maximum(0.0, inter_bottom - inter_top)
    intersection = inter_w * inter_h
    # Small epsilon keeps the division defined for zero-area boxes.
    iou = intersection / (box_areas[best] + box_areas[rest] - intersection + 1e-12)

    # Only candidates that do not overlap too much survive to the next round.
    remaining = rest[iou <= thresh]
  return selected

### 2. ViLD Result Visualization Cell
- **Purpose**: Visualization tools for detection results
- **Key Functions**:
 - `draw_bounding_box_on_image()`
 - `draw_mask_on_image_array()`
 - `visualize_boxes_and_labels_on_image_array()`
- **Features**:
 - Bounding box drawing
 - Instance mask visualization 
 - Label and score display

In [12]:
#@markdown ViLD Result Visualization
import PIL.ImageColor as ImageColor
import PIL.ImageDraw as ImageDraw
import PIL.ImageFont as ImageFont

# Palette indexed by `class_id % len(STANDARD_COLORS)` when drawing boxes.
# With a single entry every class is drawn in white; the full palette is kept
# below, commented out, for reference.
STANDARD_COLORS = ["White"]
# STANDARD_COLORS = [
#     "AliceBlue", "Chartreuse", "Aqua", "Aquamarine", "Azure", "Beige", "Bisque",
#     "BlanchedAlmond", "BlueViolet", "BurlyWood", "CadetBlue", "AntiqueWhite",
#     "Chocolate", "Coral", "CornflowerBlue", "Cornsilk", "Cyan",
#     "DarkCyan", "DarkGoldenRod", "DarkGrey", "DarkKhaki", "DarkOrange",
#     "DarkOrchid", "DarkSalmon", "DarkSeaGreen", "DarkTurquoise", "DarkViolet",
#     "DeepPink", "DeepSkyBlue", "DodgerBlue", "FloralWhite",
#     "ForestGreen", "Fuchsia", "Gainsboro", "GhostWhite", "Gold", "GoldenRod",
#     "Salmon", "Tan", "HoneyDew", "HotPink", "Ivory", "Khaki",
#     "Lavender", "LavenderBlush", "LawnGreen", "LemonChiffon", "LightBlue",
#     "LightCoral", "LightCyan", "LightGoldenRodYellow", "LightGray", "LightGrey",
#     "LightGreen", "LightPink", "LightSalmon", "LightSeaGreen", "LightSkyBlue",
#     "LightSlateGray", "LightSlateGrey", "LightSteelBlue", "LightYellow", "Lime",
#     "LimeGreen", "Linen", "Magenta", "MediumAquaMarine", "MediumOrchid",
#     "MediumPurple", "MediumSeaGreen", "MediumSlateBlue", "MediumSpringGreen",
#     "MediumTurquoise", "MediumVioletRed", "MintCream", "MistyRose", "Moccasin",
#     "NavajoWhite", "OldLace", "Olive", "OliveDrab", "Orange",
#     "Orchid", "PaleGoldenRod", "PaleGreen", "PaleTurquoise", "PaleVioletRed",
#     "PapayaWhip", "PeachPuff", "Peru", "Pink", "Plum", "PowderBlue", "Purple",
#     "RosyBrown", "RoyalBlue", "SaddleBrown", "Green", "SandyBrown",
#     "SeaGreen", "SeaShell", "Sienna", "Silver", "SkyBlue", "SlateBlue",
#     "SlateGray", "SlateGrey", "Snow", "SpringGreen", "SteelBlue", "GreenYellow",
#     "Teal", "Thistle", "Tomato", "Turquoise", "Violet", "Wheat", "White",
#     "WhiteSmoke", "Yellow", "YellowGreen"
# ]

def _text_size(font, text):
  """Returns (width, height) of `text` rendered with `font`.

  `font.getsize` was removed in Pillow 10, so `getbbox` is preferred when the
  font provides it; the right/bottom edges of that box are used as the size.
  Older Pillow versions without `getbbox` fall back to `getsize`.
  """
  if hasattr(font, "getbbox"):
    _, _, right, bottom = font.getbbox(text)
    return right, bottom
  return font.getsize(text)


def draw_bounding_box_on_image(image,
                               ymin,
                               xmin,
                               ymax,
                               xmax,
                               color="red",
                               thickness=4,
                               display_str_list=(),
                               use_normalized_coordinates=True):
  """Adds a bounding box to an image.

  Bounding box coordinates can be specified in either absolute (pixel) or
  normalized coordinates by setting the use_normalized_coordinates argument.

  Each string in display_str_list is displayed on a separate line above the
  bounding box in black text on a rectangle filled with the input "color".
  If the top of the bounding box extends to the edge of the image, the strings
  are displayed below the bounding box.

  The image is modified in place; nothing is returned.

  Args:
    image: a PIL.Image object.
    ymin: ymin of bounding box.
    xmin: xmin of bounding box.
    ymax: ymax of bounding box.
    xmax: xmax of bounding box.
    color: color to draw bounding box. Default is red.
    thickness: line thickness. Default value is 4.
    display_str_list: list of strings to display in box
                      (each to be shown on its own line).
    use_normalized_coordinates: If True (default), treat coordinates
      ymin, xmin, ymax, xmax as relative to the image.  Otherwise treat
      coordinates as absolute.
  """
  draw = ImageDraw.Draw(image)
  im_width, im_height = image.size
  if use_normalized_coordinates:
    (left, right, top, bottom) = (xmin * im_width, xmax * im_width,
                                  ymin * im_height, ymax * im_height)
  else:
    (left, right, top, bottom) = (xmin, xmax, ymin, ymax)
  draw.line([(left, top), (left, bottom), (right, bottom),
             (right, top), (left, top)], width=thickness, fill=color)
  try:
    font = ImageFont.truetype("arial.ttf", 24)
  except IOError:
    # arial.ttf is not installed everywhere; use PIL's built-in font instead.
    font = ImageFont.load_default()

  # If the total height of the display strings added to the top of the bounding
  # box exceeds the top of the image, stack the strings below the bounding box
  # instead of above.
  display_str_heights = [_text_size(font, ds)[1] for ds in display_str_list]
  # Each display_str has a top and bottom margin of 0.05x.
  total_display_str_height = (1 + 2 * 0.05) * sum(display_str_heights)

  if top > total_display_str_height:
    text_bottom = top
  else:
    text_bottom = bottom + total_display_str_height
  # Reverse list and print from bottom to top.
  for display_str in display_str_list[::-1]:
    text_width, text_height = _text_size(font, display_str)
    margin = np.ceil(0.05 * text_height)
    draw.rectangle(
        [(left, text_bottom - text_height - 2 * margin), (left + text_width,
                                                          text_bottom)],
        fill=color)
    draw.text(
        (left + margin, text_bottom - text_height - margin),
        display_str,
        fill="black",
        font=font)
    # Each label rectangle is text_height + 2 * margin tall, so move up by
    # exactly that much; subtracting less made stacked labels overlap.
    text_bottom -= text_height + 2 * margin

def draw_bounding_box_on_image_array(image,
                                     ymin,
                                     xmin,
                                     ymax,
                                     xmax,
                                     color="red",
                                     thickness=4,
                                     display_str_list=(),
                                     use_normalized_coordinates=True):
  """Draws a bounding box directly onto a numpy image.

  The array is converted to a PIL image, the box is drawn with
  `draw_bounding_box_on_image`, and the result is copied back into `image`,
  so the input array is modified in place.

  Args:
    image: a numpy array with shape [height, width, 3].
    ymin: ymin of bounding box.
    xmin: xmin of bounding box.
    ymax: ymax of bounding box.
    xmax: xmax of bounding box.
    color: color to draw bounding box. Default is red.
    thickness: line thickness. Default value is 4.
    display_str_list: list of strings to display in box
                      (each to be shown on its own line).
    use_normalized_coordinates: If True (default), treat coordinates
      ymin, xmin, ymax, xmax as relative to the image.  Otherwise treat
      coordinates as absolute.
  """
  canvas = Image.fromarray(np.uint8(image)).convert("RGB")
  draw_bounding_box_on_image(
      canvas,
      ymin,
      xmin,
      ymax,
      xmax,
      color,
      thickness,
      display_str_list,
      use_normalized_coordinates)
  # Write the rendered pixels back into the caller's array.
  rendered = np.array(canvas)
  np.copyto(image, rendered)


def draw_mask_on_image_array(image, mask, color="red", alpha=0.4):
  """Blends a binary mask onto an image, modifying the image in place.

  Args:
    image: uint8 numpy array with shape (img_height, img_width, 3)
    mask: a uint8 numpy array of shape (img_height, img_width) whose values
      are all either 0 or 1.
    color: color to draw the mask with. Default is red.
    alpha: transparency value between 0 and 1. (default: 0.4)

  Raises:
    ValueError: On incorrect data type for image or masks, on mask values
      other than 0 and 1, or when image and mask spatial sizes differ.
  """
  # Validate inputs first so nothing is drawn on a bad call.
  if image.dtype != np.uint8:
    raise ValueError("`image` not of type np.uint8")
  if mask.dtype != np.uint8:
    raise ValueError("`mask` not of type np.uint8")
  is_binary = np.all(np.logical_or(mask == 1, mask == 0))
  if not is_binary:
    raise ValueError("`mask` elements should be in [0, 1]")
  spatial_shape = image.shape[:2]
  if spatial_shape != mask.shape:
    raise ValueError(
        "The image has spatial dimensions %s but the mask has dimensions %s"
        % (spatial_shape, mask.shape))

  # A solid-color layer with the same height and width as the mask.
  rgb = ImageColor.getrgb(color)
  color_row = np.reshape(list(rgb), [1, 1, 3])
  color_layer = np.expand_dims(np.ones_like(mask), axis=2) * color_row
  pil_color_layer = Image.fromarray(np.uint8(color_layer)).convert("RGBA")

  # The mask, scaled by alpha, decides how much of the color layer shows.
  blend_mask = Image.fromarray(np.uint8(255.0*alpha*mask)).convert("L")
  blended = Image.composite(pil_color_layer, Image.fromarray(image), blend_mask)
  np.copyto(image, np.array(blended.convert("RGB")))

def visualize_boxes_and_labels_on_image_array(
    image,
    boxes,
    classes,
    scores,
    category_index,
    instance_masks=None,
    instance_boundaries=None,
    use_normalized_coordinates=False,
    max_boxes_to_draw=20,
    min_score_thresh=.5,
    agnostic_mode=False,
    line_thickness=1,
    groundtruth_box_visualization_color="black",
    skip_scores=False,
    skip_labels=False,
    mask_alpha=0.4,
    plot_color=None,
):
  """Overlay labeled boxes on an image with formatted scores and label names.

  This function groups boxes that correspond to the same location
  and creates a display string for each detection and overlays these
  on the image. Note that this function modifies the image in place, and returns
  that same image.

  When scores are recorded, boxes are drawn in ascending score order, so the
  highest-scoring box is drawn last.

  Args:
    image: uint8 numpy array with shape (img_height, img_width, 3)
    boxes: a numpy array of shape [N, 4]; each row is unpacked as
      (ymin, xmin, ymax, xmax).
    classes: a numpy array of shape [N]. Entries are looked up in
      `category_index` and are also used to pick a color from
      STANDARD_COLORS.
    scores: a numpy array of shape [N] or None.  If scores=None, then
      this function assumes that the boxes to be plotted are groundtruth
      boxes and plot all boxes as black with no classes or scores.
    category_index: a dict containing category dictionaries (each holding
      category index `id` and category name `name`) keyed by category indices.
    instance_masks: a numpy array of shape [N, image_height, image_width] with
      values ranging between 0 and 1, can be None.
    instance_boundaries: a numpy array of shape [N, image_height, image_width]
      with values ranging between 0 and 1, can be None.
    use_normalized_coordinates: whether boxes is to be interpreted as
      normalized coordinates or not.
    max_boxes_to_draw: maximum number of boxes to visualize.  If None, draw
      all boxes.
    min_score_thresh: minimum score threshold for a box to be visualized
    agnostic_mode: boolean (default: False) controlling whether to evaluate in
      class-agnostic mode or not.  This mode will display scores but ignore
      classes.
    line_thickness: integer (default: 1) controlling line width of the boxes.
    groundtruth_box_visualization_color: box color for visualizing groundtruth
      boxes
    skip_scores: whether to skip score when drawing a single detection
    skip_labels: whether to skip label when drawing a single detection
    mask_alpha: transparency (default: 0.4) passed to
      `draw_mask_on_image_array` when drawing instance masks.
    plot_color: if not None, this color is used for every detection box
      instead of the agnostic-mode or per-class color.

  Returns:
    uint8 numpy array with shape (img_height, img_width, 3) with overlaid boxes.
  """
  # Create a display string (and color) for every box location, group any boxes
  # that correspond to the same location.
  box_to_display_str_map = collections.defaultdict(list)
  box_to_color_map = collections.defaultdict(str)
  box_to_instance_masks_map = {}
  box_to_score_map = {}
  box_to_instance_boundaries_map = {}
  
  # A falsy max_boxes_to_draw (None or 0) means "draw every box".
  if not max_boxes_to_draw:
    max_boxes_to_draw = boxes.shape[0]
  for i in range(min(max_boxes_to_draw, boxes.shape[0])):
    if scores is None or scores[i] > min_score_thresh:
      # The box coordinates (as a tuple) are the key of every per-box map, so
      # boxes with identical coordinates share one entry.
      box = tuple(boxes[i].tolist())
      if instance_masks is not None:
        box_to_instance_masks_map[box] = instance_masks[i]
      if instance_boundaries is not None:
        box_to_instance_boundaries_map[box] = instance_boundaries[i]
      if scores is None:
        # Groundtruth mode: fixed color, no label or score text.
        box_to_color_map[box] = groundtruth_box_visualization_color
      else:
        display_str = ""
        if not skip_labels:
          if not agnostic_mode:
            if classes[i] in list(category_index.keys()):
              class_name = category_index[classes[i]]["name"]
            else:
              class_name = "N/A"
            display_str = str(class_name)
        if not skip_scores:
          if not display_str:
            # Score only: shown as an integer percentage.
            display_str = "{}%".format(int(100*scores[i]))
          else:
            # Label plus score: two decimals with the leading zero stripped.
            float_score = ("%.2f" % scores[i]).lstrip("0")
            display_str = "{}: {}".format(display_str, float_score)
          box_to_score_map[box] = int(100*scores[i])

        box_to_display_str_map[box].append(display_str)
        # Color priority: explicit plot_color, then agnostic mode, then the
        # per-class palette.
        if plot_color is not None:
          box_to_color_map[box] = plot_color
        elif agnostic_mode:
          box_to_color_map[box] = "DarkOrange"
        else:
          box_to_color_map[box] = STANDARD_COLORS[
              classes[i] % len(STANDARD_COLORS)]

  # Handle the case when box_to_score_map is empty.
  if box_to_score_map:
    # Ascending by score, so higher-scoring boxes are drawn later.
    box_color_iter = sorted(
        box_to_color_map.items(), key=lambda kv: box_to_score_map[kv[0]])
  else:
    box_color_iter = box_to_color_map.items()

  # Draw all boxes onto image.
  for box, color in box_color_iter:
    ymin, xmin, ymax, xmax = box
    if instance_masks is not None:
      draw_mask_on_image_array(
          image,
          box_to_instance_masks_map[box],
          color=color,
          alpha=mask_alpha
      )
    if instance_boundaries is not None:
      draw_mask_on_image_array(
          image,
          box_to_instance_boundaries_map[box],
          color="red",
          alpha=1.0
      )
    draw_bounding_box_on_image_array(
        image,
        ymin,
        xmin,
        ymax,
        xmax,
        color=color,
        thickness=line_thickness,
        display_str_list=box_to_display_str_map[box],
        use_normalized_coordinates=use_normalized_coordinates)
    
  return image


def paste_instance_masks(masks,
                         detected_boxes,
                         image_height,
                         image_width):
  """Paste instance masks to generate the image segmentation results.

  Each box-relative mask is zero-padded by one pixel, resized to its
  (slightly expanded) reference box, thresholded at 0.5 and written into a
  full-size canvas at the box location, clipped to the image bounds.

  Args:
    masks: a numpy array of shape [N, mask_height, mask_width] representing the
      instance masks w.r.t. the `detected_boxes`.
    detected_boxes: a numpy array of shape [N, 4] representing the reference
      bounding boxes, in [x1, y1, w, h] form.
    image_height: an integer representing the height of the image.
    image_width: an integer representing the width of the image.

  Returns:
    segms: a uint8 numpy array of shape [N, image_height, image_width]
      representing the instance masks *pasted* on the image canvas. The shape
      and dtype hold for N == 0 as well.
  """

  def expand_boxes(boxes, scale):
    """Expands an array of boxes by a given scale."""
    # Reference: https://github.com/facebookresearch/Detectron/blob/master/detectron/utils/boxes.py#L227  # pylint: disable=line-too-long
    # The `boxes` in the reference implementation is in [x1, y1, x2, y2] form,
    # whereas `boxes` here is in [x1, y1, w, h] form
    w_half = boxes[:, 2] * .5
    h_half = boxes[:, 3] * .5
    x_c = boxes[:, 0] + w_half
    y_c = boxes[:, 1] + h_half

    w_half *= scale
    h_half *= scale

    # The result is in [x1, y1, x2, y2] corner form.
    boxes_exp = np.zeros(boxes.shape)
    boxes_exp[:, 0] = x_c - w_half
    boxes_exp[:, 2] = x_c + w_half
    boxes_exp[:, 1] = y_c - h_half
    boxes_exp[:, 3] = y_c + h_half

    return boxes_exp

  # Reference: https://github.com/facebookresearch/Detectron/blob/master/detectron/core/test.py#L812  # pylint: disable=line-too-long
  # To work around an issue with cv2.resize (it seems to automatically pad
  # with repeated border values), we manually zero-pad the masks by 1 pixel
  # prior to resizing back to the original image resolution. This prevents
  # "top hat" artifacts. We therefore need to expand the reference boxes by an
  # appropriate factor.
  num_masks, mask_height, mask_width = masks.shape
  scale = max((mask_width + 2.0) / mask_width,
              (mask_height + 2.0) / mask_height)

  ref_boxes = expand_boxes(detected_boxes, scale).astype(np.int32)
  # The border of this buffer stays zero; only the interior is overwritten.
  padded_mask = np.zeros((mask_height + 2, mask_width + 2), dtype=np.float32)

  # Preallocating fixes the shape and dtype of the result even when there are
  # no masks at all (stacking an empty list gave a float array of shape (0,)).
  segms = np.zeros((num_masks, image_height, image_width), dtype=np.uint8)
  for mask_ind, mask in enumerate(masks):
    # Process mask inside bounding boxes.
    padded_mask[1:-1, 1:-1] = mask[:, :]

    box_x0, box_y0, box_x1, box_y1 = (int(v) for v in ref_boxes[mask_ind, :])
    w = max(box_x1 - box_x0 + 1, 1)
    h = max(box_y1 - box_y0 + 1, 1)

    resized = cv2.resize(padded_mask, (w, h))
    binary = np.array(resized > 0.5, dtype=np.uint8)

    # Clip the paste region to the image canvas.
    x_0 = min(max(box_x0, 0), image_width)
    x_1 = min(max(box_x1 + 1, 0), image_width)
    y_0 = min(max(box_y0, 0), image_height)
    y_1 = min(max(box_y1 + 1, 0), image_height)

    segms[mask_ind, y_0:y_1, x_0:x_1] = binary[
        (y_0 - box_y0):(y_1 - box_y0),
        (x_0 - box_x0):(x_1 - box_x0)
    ]

  return segms

In [13]:
#@markdown Plot instance masks.
def plot_mask(color, alpha, original_image, mask):
  """Returns a copy of `original_image` with `mask` blended in as `color`.

  Unlike `draw_mask_on_image_array`, the input image is not modified and no
  input validation is performed.

  Args:
    color: color name understood by PIL's ImageColor.
    alpha: scale applied to the mask before it is used as the blend weight.
    original_image: numpy image array to draw on.
    mask: 2-D numpy mask array; assumed to match the image's height and
      width — TODO confirm against callers.

  Returns:
    RGB numpy array containing the image with the mask overlaid.
  """
  # Solid-color layer covering the mask's spatial extent.
  color_row = np.reshape(list(ImageColor.getrgb(color)), [1, 1, 3])
  color_layer = np.expand_dims(np.ones_like(mask), axis=2) * color_row
  pil_color_layer = Image.fromarray(np.uint8(color_layer)).convert("RGBA")

  # The alpha-scaled mask selects how strongly the color layer shows through.
  blend_mask = Image.fromarray(np.uint8(255.0*alpha*mask)).convert("L")
  base = Image.fromarray(original_image)
  blended = Image.composite(pil_color_layer, base, blend_mask)
  return np.array(blended.convert("RGB"))

%matplotlib inline
def display_image(path_or_array, size=(10, 10)):
  """Shows an image with matplotlib, without axes.

  Args:
    path_or_array: either a path (str) to an image file, which is loaded and
      converted to RGB, or an image array that is displayed as is.
    size: matplotlib figure size as (width, height).
  """
  if isinstance(path_or_array, str):
    # Load from the path that was passed in (not from a global), and close
    # the file as soon as the pixels have been read.
    with Image.open(path_or_array) as pil_image:
      image = np.asarray(pil_image.convert("RGB"))
  else:
    image = path_or_array

  plt.figure(figsize=size)
  plt.imshow(image)
  plt.axis("off")
  plt.show()

### 3. ViLD Forward Pass Cell
- **Purpose**: Main model execution logic
- **Key Steps**:
 1. Preprocess categories and parameters
 2. Run object detection
 3. Filter and process boxes
 4. Compute text embeddings
 5. Match detections with categories
 6. Return found objects and visualizations


In [14]:
#@markdown Define ViLD forward pass.

def vild(image_path, category_name_string, params, plot_on=True, prompt_swaps=()):
  """Runs ViLD on an image and returns the names of the detected categories.

  Region proposals and their visual features come from the loaded ViLD graph
  (`session`). Proposals are filtered by NMS, RPN score and box area, and each
  remaining region is matched to the category whose text embedding gives the
  highest score.

  Args:
    image_path: path of the image file to run detection on.
    category_name_string: category names separated by ";".
    params: tuple (max_boxes_to_draw, nms_threshold, min_rpn_score_thresh,
      min_box_area, max_box_area).
    plot_on: if True, also plot the detections on the image.
    prompt_swaps: iterable of (a, b) pairs. Every `a` is replaced by `b` in the
      category names before prompts are built, and swapped back in the
      reported names.

  Returns:
    List of detected category names, ordered by decreasing best score.
    Regions whose best match is the implicit "background" category are
    skipped.
  """
  #################################################################
  # Preprocessing categories and get params
  for a, b in prompt_swaps:
    category_name_string = category_name_string.replace(a, b)
  # Index 0 is an implicit "background" category.
  category_names = ["background"] + [
      x.strip() for x in category_name_string.split(";")]
  categories = [{"name": item, "id": idx + 1}
                for idx, item in enumerate(category_names)]

  max_boxes_to_draw, nms_threshold, min_rpn_score_thresh, min_box_area, max_box_area = params


  #################################################################
  # Obtain results and read image
  roi_boxes, roi_scores, detection_boxes, _scores_unused, _box_outputs, detection_masks, visual_features, image_info = session.run(
        ["RoiBoxes:0", "RoiScores:0", "2ndStageBoxes:0", "2ndStageScoresUnused:0", "BoxOutputs:0", "MaskOutputs:0", "VisualFeatOutputs:0", "ImageInfo:0"],
        feed_dict={"Placeholder:0": [image_path,]})

  # Drop the leading size-1 axis of every output that is used below.
  roi_boxes = np.squeeze(roi_boxes, axis=0)
  # no need to clip the boxes, already done
  roi_scores = np.squeeze(roi_scores, axis=0)

  detection_boxes = np.squeeze(detection_boxes, axis=(0, 2))
  detection_masks = np.squeeze(detection_masks, axis=0)
  visual_features = np.squeeze(visual_features, axis=0)

  image_info = np.squeeze(image_info, axis=0)  # obtain image info
  image_scale = np.tile(image_info[2:3, :], (1, 2))
  image_height = int(image_info[0, 0])
  image_width = int(image_info[0, 1])

  rescaled_detection_boxes = detection_boxes / image_scale # rescale

  # Read image; the context manager closes the file once pixels are loaded.
  with Image.open(image_path) as pil_image:
    image = np.asarray(pil_image.convert("RGB"))
  assert image_height == image.shape[0]
  assert image_width == image.shape[1]


  #################################################################
  # Filter boxes

  # Apply non-maximum suppression to detected boxes with nms threshold.
  nmsed_indices = nms(
      detection_boxes,
      roi_scores,
      thresh=nms_threshold
      )

  # Compute RPN box size.
  box_sizes = (rescaled_detection_boxes[:, 2] - rescaled_detection_boxes[:, 0]) * (rescaled_detection_boxes[:, 3] - rescaled_detection_boxes[:, 1])

  # Filter out invalid rois (nmsed rois). A region is kept only when it
  # survived NMS, is not an all-zero box, scores high enough, and has an
  # area strictly inside (min_box_area, max_box_area).
  survived_nms = np.isin(
      np.arange(len(roi_scores), dtype=np.int32), nmsed_indices)
  is_nonzero_roi = np.logical_not(np.all(roi_boxes == 0., axis=-1))
  score_ok = roi_scores >= min_rpn_score_thresh
  area_ok = np.logical_and(box_sizes > min_box_area, box_sizes < max_box_area)
  valid_indices = np.where(
      survived_nms & is_nonzero_roi & score_ok & area_ok)[0]

  detection_roi_scores = roi_scores[valid_indices][:max_boxes_to_draw, ...]
  detection_boxes = detection_boxes[valid_indices][:max_boxes_to_draw, ...]
  detection_masks = detection_masks[valid_indices][:max_boxes_to_draw, ...]
  detection_visual_feat = visual_features[valid_indices][:max_boxes_to_draw, ...]
  rescaled_detection_boxes = rescaled_detection_boxes[valid_indices][:max_boxes_to_draw, ...]


  #################################################################
  # Compute text embeddings and detection scores, and rank results
  text_features = build_text_embedding(categories)

  raw_scores = detection_visual_feat.dot(text_features.T)
  if FLAGS.use_softmax:
    scores_all = softmax(FLAGS.temperature * raw_scores, axis=-1)
  else:
    scores_all = raw_scores

  indices = np.argsort(-np.max(scores_all, axis=1))  # Results are ranked by scores
  # Foreground detections: best-matching category is not "background".
  indices_fg = np.array(
      [i for i in indices if np.argmax(scores_all[i]) != 0], dtype=np.int64)


  #################################################################
  # Print found_objects
  found_objects = []
  for a, b in prompt_swaps:
    category_names = [name.replace(b, a) for name in category_names]  # Extra prompt engineering.
  for anno_idx in indices:
    scores = scores_all[anno_idx]
    best_category = np.argmax(scores)
    if best_category == 0:
      continue
    found_object = category_names[best_category]
    if found_object == "background":
      continue
    print("Found a", found_object, "with score:", np.max(scores))
    found_objects.append(found_object)
  if not plot_on:
    return found_objects


  #################################################################
  # Plot detected boxes on the input image.
  ymin, xmin, ymax, xmax = np.split(rescaled_detection_boxes, 4, axis=-1)
  # paste_instance_masks expects boxes in [x1, y1, w, h] form.
  processed_boxes = np.concatenate([xmin, ymin, xmax - xmin, ymax - ymin], axis=-1)
  segmentations = paste_instance_masks(detection_masks, processed_boxes, image_height, image_width)

  if len(indices_fg) == 0:
    display_image(np.array(image), size=overall_fig_size)
    print("ViLD does not detect anything belong to the given category")

  else:
    image_with_detections = visualize_boxes_and_labels_on_image_array(
        np.array(image),
        rescaled_detection_boxes[indices_fg],
        valid_indices[:max_boxes_to_draw][indices_fg],
        detection_roi_scores[indices_fg],
        numbered_category_indices,
        instance_masks=segmentations[indices_fg],
        use_normalized_coordinates=False,
        max_boxes_to_draw=max_boxes_to_draw,
        min_score_thresh=min_rpn_score_thresh,
        skip_scores=False,
        skip_labels=True)

    # plt.figure(figsize=overall_fig_size)
    plt.imshow(image_with_detections)
    # plt.axis("off")
    plt.title("ViLD detected objects and RPN scores.")
    plt.show()

  return found_objects

In [None]:
# Candidate vocabulary for the detector: ten block colors and ten bowl colors.
category_names = ['blue block',
                  'red block',
                  'green block',
                  'orange block',
                  'yellow block',
                  'purple block',
                  'pink block',
                  'cyan block',
                  'brown block',
                  'gray block',

                  'blue bowl',
                  'red bowl',
                  'green bowl',
                  'orange bowl',
                  'yellow bowl',
                  'purple bowl',
                  'pink bowl',
                  'cyan bowl',
                  'brown bowl',
                  'gray bowl']
# Image file to run ViLD on.
image_path = 'tmp.jpg'

#@markdown ViLD settings.
# vild() takes the categories as one ";"-separated string.
category_name_string = ";".join(category_names)
max_boxes_to_draw = 8 #@param {type:"integer"}

# Extra prompt engineering: swap A with B for every (A, B) in list.
# vild() applies the swap when building prompts and swaps the names back in
# the objects it reports.
prompt_swaps = [('block', 'cube')]

nms_threshold = 0.4 #@param {type:"slider", min:0, max:0.9, step:0.05}
min_rpn_score_thresh = 0.4  #@param {type:"slider", min:0, max:1, step:0.01}
min_box_area = 10 #@param {type:"slider", min:0, max:10000, step:1.0}
max_box_area = 3000  #@param {type:"slider", min:0, max:10000, step:1.0}
# The order of this tuple must match how vild() unpacks `params`.
vild_params = max_boxes_to_draw, nms_threshold, min_rpn_score_thresh, min_box_area, max_box_area
found_objects = vild(image_path, category_name_string, vild_params, plot_on=True, prompt_swaps=prompt_swaps)

### **Scripted Expert**
Scripted pick and place oracle to collect expert demonstrations.

In [16]:
class ScriptedPolicy():
  """Scripted pick-and-place oracle driven by simulator object poses.

  Given an instruction of the form "Pick the <obj> and place it on the
  <target>.", it looks up where the named objects are and returns the
  corresponding pick and place positions.
  """

  def __init__(self, env):
    # Environment exposing `obj_name_to_id` (object name -> pybullet body id).
    self.env = env

  def step(self, text, obs):
    """Translate a language instruction into a pick/place action.

    Args:
      text: Instruction containing a pick phrase and a place phrase separated
        by the word 'and'.
      obs: Current observation. Not used by this policy; accepted so the
        signature matches other policies.

    Returns:
      Dict with float32 arrays under 'pick' and 'place'.

    Raises:
      ValueError: If `text` contains no 'and' to split on.
      AssertionError: If no known pick or place target name occurs in `text`.
    """
    print(f'Input: {text}')

    # Parse pick and place targets. Split on the first 'and' only, so a later
    # 'and' inside the place phrase cannot break the two-way unpacking.
    pick_text, place_text = text.split('and', 1)
    # First target name (in dict order) that occurs in each phrase.
    pick_target = next((name for name in PICK_TARGETS if name in pick_text), None)
    place_target = next((name for name in PLACE_TARGETS if name in place_text), None)

    # Admissible targets only.
    assert pick_target is not None, f'No known pick target in: {pick_text!r}'
    assert place_target is not None, f'No known place target in: {place_text!r}'

    pick_id = self.env.obj_name_to_id[pick_target]
    pick_pose = pybullet.getBasePositionAndOrientation(pick_id)
    pick_position = np.float32(pick_pose[0])

    if place_target in self.env.obj_name_to_id:
      # Place target is an object in the scene: use its current pose.
      place_id = self.env.obj_name_to_id[place_target]
      place_pose = pybullet.getBasePositionAndOrientation(place_id)
      place_position = np.float32(place_pose[0])
    else:
      # Otherwise use the value stored in PLACE_TARGETS for that name.
      place_position = np.float32(PLACE_TARGETS[place_target])

    # Add some noise to pick and place positions.
    # pick_position[:2] += np.random.normal(scale=0.01)
    # NOTE(review): a single scalar sample is added to both of the first two
    # coordinates, so they shift by the same amount - confirm this is intended.
    place_position[:2] += np.random.normal(scale=0.01)

    act = {'pick': pick_position, 'place': place_position}
    return act

### **Dataset**

In [17]:
#@markdown Collect demonstrations with a scripted expert, or download a pre-generated dataset.
load_pregenerated = True  #@param {type:"boolean"}

# Load pre-existing dataset.
if load_pregenerated:
  if not os.path.exists('dataset-9999.pkl'):
    # !gdown --id 1TECwTIfawxkRYbzlAey0z1mqXKcyfPc-
    !gdown --id 1yCz6C-6eLWb4SFYKdkM-wz5tlMjbG2h8
  dataset = pickle.load(open('dataset-9999.pkl', 'rb'))  # ~10K samples.
  dataset_size = len(dataset['text'])

# Generate new dataset.
else:
  dataset = {}
  dataset_size = 2  # Size of new dataset.
  dataset['image'] = np.zeros((dataset_size, 224, 224, 3), dtype=np.uint8)
  dataset['pick_yx'] = np.zeros((dataset_size, 2), dtype=np.int32)
  dataset['place_yx'] = np.zeros((dataset_size, 2), dtype=np.int32)
  dataset['text'] = []
  policy = ScriptedPolicy(env)
  data_idx = 0
  while data_idx < dataset_size:
    np.random.seed(data_idx)
    num_pick, num_place = 3, 3

    # Select random objects for data collection.
    pick_items = list(PICK_TARGETS.keys())
    pick_items = np.random.choice(pick_items, size=num_pick, replace=False)
    place_items = list(PLACE_TARGETS.keys())
    for pick_item in pick_items:  # For simplicity: place items != pick items.
      place_items.remove(pick_item)
    place_items = np.random.choice(place_items, size=num_place, replace=False)
    config = {'pick': pick_items, 'place': place_items}

    # Initialize environment with selected objects.
    obs = env.reset(config)

    # Create text prompts.
    prompts = []
    for i in range(len(pick_items)):
      pick_item = pick_items[i]
      place_item = place_items[i]
      prompts.append(f'Pick the {pick_item} and place it on the {place_item}.')

    # Execute 3 pick and place actions.
    for prompt in prompts:
      act = policy.step(prompt, obs)
      dataset['text'].append(prompt)
      dataset['image'][data_idx, ...] = obs['image'].copy()
      dataset['pick_yx'][data_idx, ...] = xyz_to_pix(act['pick'])
      dataset['place_yx'][data_idx, ...] = xyz_to_pix(act['place'])
      data_idx += 1
      obs, _, _, _ = env.step(act)
      debug_clip = ImageSequenceClip(env.cache_video, fps=25)
      display(debug_clip.ipython_display(autoplay=1, loop=1))
      env.cache_video = []
      if data_idx >= dataset_size:
        break

  pickle.dump(dataset, open(f'dataset-{dataset_size}.pkl', 'wb'))

In [None]:
#@markdown Show a demonstration example from the dataset.

# Pull every field of the first recorded demonstration out of the dataset.
demo_index = 0
img, pick_yx, place_yx, text = (
    dataset[field][demo_index]
    for field in ('image', 'pick_yx', 'place_yx', 'text'))

# The arrow runs from the pick pixel to the place pixel. The stored pairs are
# indexed [0] then [1], while plt.arrow takes the second component first.
pick_row, pick_col = pick_yx
place_row, place_col = place_yx

plt.title(text)
plt.imshow(img)
plt.arrow(
    pick_col,
    pick_row,
    place_col - pick_col,
    place_row - pick_row,
    color='w',
    head_starts_at_zero=False,
    head_width=7,
    length_includes_head=True,
)
plt.show()

## SayCan Implementation

### Performance Monitoring Setup

This section sets up tracking for:
- API call latency
- Token usage and costs
- Success/failure rates
- Resource utilization

The PerformanceMonitor class handles metrics collection for the language model interactions.

In [19]:
from collections import defaultdict # Default dictionary for metrics

#@title Performance Monitoring Setup
class PerformanceMonitor:
    """Collects latency, token usage and estimated cost for LLM API calls."""

    def __init__(self):
        # Per-call series keyed by metric name; each logged call appends one
        # entry to every series.
        self.metrics = defaultdict(list)
        self.total_cost = 0
        # GPT-3.5-turbo-instruct pricing, stored as cost per single token
        # (the per-1K-token rate divided by 1000).
        self.price_per_token = {
            'prompt': 0.0015 / 1000,  # Input tokens
            'completion': 0.002 / 1000 # Output tokens
        }

    def log_api_call(self, start_time, response, call_type="scoring"):
        """Record one API call.

        Args:
            start_time: `time.time()` value taken just before the call.
            response: Mapping with `response['usage']['prompt_tokens']` and
                `response['usage']['completion_tokens']`.
            call_type: Free-form label stored alongside the metrics.
        """
        duration = time.time() - start_time
        prompt_tokens = response['usage']['prompt_tokens']
        completion_tokens = response['usage']['completion_tokens']
        total_tokens = prompt_tokens + completion_tokens

        # Calculate cost
        call_cost = (prompt_tokens * self.price_per_token['prompt'] +
                     completion_tokens * self.price_per_token['completion'])
        self.total_cost += call_cost

        self.metrics['duration'].append(duration)
        self.metrics['prompt_tokens'].append(prompt_tokens)
        self.metrics['completion_tokens'].append(completion_tokens)
        self.metrics['total_tokens'].append(total_tokens)
        self.metrics['cost'].append(call_cost)
        self.metrics['call_type'].append(call_type)

    def get_summary(self):
        """Return aggregate statistics over all logged calls.

        Averages are reported as 0 when nothing has been logged yet, instead
        of raising ZeroDivisionError.
        """
        durations = self.metrics['duration']
        token_counts = self.metrics['total_tokens']
        total_tokens = sum(token_counts)
        return {
            'total_calls': len(durations),
            'total_cost': self.total_cost,
            'avg_duration': sum(durations) / len(durations) if durations else 0,
            'total_tokens': total_tokens,
            'avg_tokens_per_call': total_tokens / len(token_counts) if token_counts else 0
        }

    def print_summary(self):
        """Print the statistics from `get_summary` in human-readable form."""
        summary = self.get_summary()
        print(f"Performance Summary:")
        print(f"Total API Calls: {summary['total_calls']}")
        print(f"Total Cost: ${summary['total_cost']:.4f}")
        print(f"Average Duration: {summary['avg_duration']:.2f} seconds")
        print(f"Total Tokens Used: {summary['total_tokens']}")
        print(f"Average Tokens per Call: {summary['avg_tokens_per_call']:.1f}")

performance_monitor = PerformanceMonitor()

### Execution Tracking Setup

This section implements tracking for:
- Plan generation success rates
- Execution success rates
- Historical record of plans and executions
- Plan-to-execution correlation

The ExecutionTracker class maintains metrics about the robot's performance in carrying out generated plans.

In [20]:
#@title Execution Tracking Setup
class ExecutionTracker:
    """Keeps counters and a history of generated plans and executed plans."""

    def __init__(self):
        # Counters start at zero; the two history lists receive one record
        # per logged plan / execution.
        self.metrics = dict(
            total_plans=0,
            successful_plans=0,
            total_executions=0,
            successful_executions=0,
            plan_history=[],
            execution_history=[],
        )

    def log_plan(self, instruction, plan, success):
        """Count one planning attempt and append it to the plan history."""
        stats = self.metrics
        stats['total_plans'] += 1
        stats['successful_plans'] += 1 if success else 0
        stats['plan_history'].append(
            dict(instruction=instruction, plan=plan, success=success))

    def log_execution(self, instruction, plan, success):
        """Count one execution attempt and append it to the execution history."""
        stats = self.metrics
        stats['total_executions'] += 1
        stats['successful_executions'] += 1 if success else 0
        stats['execution_history'].append(
            dict(instruction=instruction, plan=plan, success=success))

    def print_summary(self):
        """Print planning and execution success rates as percentages."""
        stats = self.metrics
        # max(1, ...) keeps the rate at 0% when nothing has been logged yet.
        plan_rate = stats['successful_plans'] / max(1, stats['total_plans']) * 100
        exec_rate = stats['successful_executions'] / max(1, stats['total_executions']) * 100
        print("Execution Summary:")
        print(f"Planning Success Rate: {plan_rate:.1f}%")
        print(f"Execution Success Rate: {exec_rate:.1f}%")

execution_tracker = ExecutionTracker()

### LLM Scoring Functions
Implements GPT-3 based scoring for action selection:
- Option generation
- Action scoring
- Planning

In [21]:
from openai import OpenAI

#@title LLM Cache
# When True, re-running this cell discards all cached LLM responses.
# When False, LLM_CACHE is left untouched and must already exist from an
# earlier run, since nothing in this cell creates it in that case.
overwrite_cache = True
if overwrite_cache:
  LLM_CACHE = {}

# Initialize OpenAI client
# NOTE(review): reads the key from the `openai` module attribute, which is
# presumably set in an earlier cell - confirm.
client = OpenAI(api_key=openai.api_key)

def gpt3_call(engine="gpt-3.5-turbo-instruct", prompt="", max_tokens=128, temperature=0):
    """Query the completions endpoint, memoizing results in LLM_CACHE.

    Args:
        engine: Model name passed as `model` to the API.
        prompt: A single prompt string, or a sequence of prompt strings sent
            as one batched request.
        max_tokens: Completion length limit forwarded to the API.
        temperature: Sampling temperature forwarded to the API.

    Returns:
        Dict with "choices" (a list of {"text": ...}) and "usage" (prompt,
        completion and total token counts). A cache hit returns the stored
        dict itself and is not logged to the performance monitor.
    """
    # Key a batch by the tuple of its prompts rather than their concatenation,
    # so two different batches whose texts happen to join to the same string
    # cannot be mistaken for each other.
    prompt_key = prompt if isinstance(prompt, str) else tuple(prompt)
    cache_key = (engine, prompt_key, max_tokens, temperature)

    if cache_key in LLM_CACHE:
        print('cache hit, returning')
        return LLM_CACHE[cache_key]

    start_time = time.time()
    response = client.completions.create(
        model=engine,
        prompt=prompt,
        max_tokens=max_tokens,
        temperature=temperature
    )
    # Convert response to dict for compatibility with existing code
    response_dict = {
        "choices": [{"text": choice.text} for choice in response.choices],
        "usage": {
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens,
            "total_tokens": response.usage.total_tokens
        }
    }
    performance_monitor.log_api_call(start_time, response_dict)
    LLM_CACHE[cache_key] = response_dict
    return response_dict

def _parse_pick_and_place(option):
    """Split 'robot.pick_and_place(a, b)' into (a, b); return None if it does not parse."""
    try:
        if "pick_and_place" not in option:
            return None
        parts = option.split("(")[1].strip(")").split(",")
    except (AttributeError, IndexError, TypeError):
        return None
    if len(parts) != 2:
        return None
    return parts[0].strip(), parts[1].strip()


def _build_scoring_prompt(query, option, termination_string="done()"):
    """Return the 0-10 rating prompt for one candidate option."""
    # Handle done() case first
    if option == termination_string:
        return f"""Task: {query}
Current action to evaluate: {option}
Rate if task is complete (0-10):
0: Task is far from complete
10: All required steps are done
Provide score (0-10):"""

    parsed = _parse_pick_and_place(option)
    if parsed is None:
        # Fallback prompt for unparseable options
        return f"""Task: {query}
Action: {option}
Rate how this action helps the task (0-10):"""

    pick_obj, place_target = parsed
    query_lower = str(query).lower()

    # The rubric is chosen from keywords in the task text.
    if "corner" in query_lower:
        return f"""Task: {query}
Action: Pick {pick_obj} and place at {place_target}

Rate this specific action considering:
1. Does it place a block in a corner? (necessary for task)
2. Is this the most efficient corner to use next?
3. Will this placement block other required corner placements?

Score meaning:
0: Invalid or impossible action
2: Places block in non-corner location
4: Places in corner but creates inefficient distribution
6: Valid corner placement but not optimal sequence
8: Good corner placement enabling efficient completion
10: Perfect next move for corner distribution

Provide score (0-10):"""

    if "matching" in query_lower or "colored" in query_lower:
        return f"""Task: {query}
Action: Pick {pick_obj} and place at {place_target}

Rate this specific action considering:
1. Does it match block color with bowl color?
2. Is this the best color match to make now?
3. Is this placement efficient for overall task?

Score meaning:
0: Invalid action
2: Non-matching colors or wrong target
4: Valid but inefficient color match
6: Good color match but suboptimal timing
8: Efficient color match
10: Perfect color match and sequence

Provide score (0-10):"""

    return f"""Task: {query}
Action: Pick {pick_obj} and place at {place_target}

Rate this specific action considering:
1. Does it directly help achieve the task?
2. Is this the optimal next step?
3. Could other actions be better right now?

Score meaning:
0: Invalid or counterproductive
2: Valid but unhelpful
4: Somewhat helps but inefficient
6: Helpful but not optimal timing
8: Very good next step
10: Perfect action for current state

Provide score (0-10):"""


def _parse_score(choice):
    """Return the first numeric token of a completion as a float, or 0 if there is none."""
    try:
        tokens = choice["text"].strip().split()
    except (KeyError, TypeError, AttributeError):
        return 0
    for token in tokens:
        # Accept plain digits with at most one decimal point.
        if token.replace('.', '', 1).isdigit():
            try:
                return float(token)
            except ValueError:
                return 0
    return 0


def gpt3_scoring(query, options, engine="gpt-3.5-turbo-instruct", limit_num_options=None, 
                option_start="\n", verbose=False, print_tokens=False,
                termination_string="done()"):
    """Ask the LLM to rate each candidate option for the task on a 0-10 scale.

    Args:
        query: Task text embedded in every rating prompt.
        options: Sequence of option strings, either
            'robot.pick_and_place(<pick>, <place>)' or the termination string.
        engine: Model name forwarded to `gpt3_call`.
        limit_num_options: If truthy, only the first this-many options are scored.
        option_start, verbose, print_tokens: Accepted for backward
            compatibility; not used.
        termination_string: Option that receives the "is the task complete"
            prompt instead of an action-rating prompt.

    Returns:
        (scores, response): `scores` maps each option to the parsed score
        (0 when no number could be parsed or no completion was returned for
        it); `response` is the raw response of the last batch, or None when
        there were no options to score.
    """
    if limit_num_options:
        options = options[:limit_num_options]

    gpt3_prompt_options = [
        _build_scoring_prompt(query, option, termination_string)
        for option in options
    ]

    # Process in batches
    BATCH_SIZE = 20
    scores = {}
    response = None  # Stays None when there is nothing to score.
    for i in range(0, len(gpt3_prompt_options), BATCH_SIZE):
        batch = gpt3_prompt_options[i:i + BATCH_SIZE]
        batch_options = options[i:i + BATCH_SIZE]

        response = gpt3_call(
            engine=engine,
            prompt=batch,
            max_tokens=5,
            temperature=0.3)

        for option, choice in zip(batch_options, response["choices"]):
            scores[option] = _parse_score(choice)
        # Guarantee an entry for every option, even if fewer choices than
        # prompts came back, so downstream lookups cannot raise KeyError.
        for option in batch_options:
            scores.setdefault(option, 0)

        # Short pause between batches.
        time.sleep(0.1)

    return scores, response

def make_options(pick_targets=None, place_targets=None, options_in_api_form=True, termination_string="done()"):
  """Enumerate every pick/place combination followed by the termination option.

  Falls back to PICK_TARGETS / PLACE_TARGETS when the corresponding argument
  is missing or empty. Options are rendered either as API calls or as
  natural-language sentences, depending on `options_in_api_form`.
  """
  pick_targets = pick_targets or PICK_TARGETS
  place_targets = place_targets or PLACE_TARGETS

  if options_in_api_form:
    template = "robot.pick_and_place({}, {})"
  else:
    template = "Pick the {} and place it on the {}."

  # Pick target varies slowest, place target fastest.
  options = [template.format(pick, place)
             for pick in pick_targets
             for place in place_targets]
  options.append(termination_string)

  print("Considering", len(options), "options")
  return options

In [None]:
# Smoke test of the LLM scorer: rate the first five candidate options for a
# single stacking instruction.
query = "To pick the blue block and put it on the red block, I should:\n"
options = make_options(PICK_TARGETS, PLACE_TARGETS)
# NOTE(review): gpt3_scoring accepts option_start and verbose but never reads
# them in its body, so they have no effect here.
scores, response = gpt3_scoring(query, options, engine=ENGINE, limit_num_options=5, option_start='\n', verbose=True)


### Helper Functions
Utility functions for:
- Scene description
- Score normalization
- Visualization
- Step conversion


In [23]:
#@title Helper Functions

def build_scene_description(found_objects, block_name="box", bowl_name="circle"):
  """Render detected objects as an 'objects = [...]' line for the LLM prompt.

  Detector category words are mapped back to "block" / "bowl" and the quote
  characters from the list repr are removed.
  """
  # Applied in order: block word, bowl word, then quote stripping.
  substitutions = (
      (block_name, "block"),
      (bowl_name, "bowl"),
      ("'", ""),
  )
  description = f"objects = {found_objects}"
  for old, new in substitutions:
    description = description.replace(old, new)
  return description

def step_to_nlp(step):
  """Convert 'robot.pick_and_place(a, b)' into 'Pick the a and place it on the b.'."""
  # Strip the call syntax, leaving just "a, b".
  arguments = step.replace("robot.pick_and_place(", "").replace(")", "")
  pick, place = arguments.split(", ")
  return f"Pick the {pick} and place it on the {place}."

def normalize_scores(scores):
  """Scale scores by their maximum and clip the result to [0, 1].

  Args:
    scores: Mapping from option to numeric score.

  Returns:
    New dict with the same keys. An empty input yields an empty dict, and
    when the maximum score is 0 every option is normalized to 0.0 rather
    than dividing by zero.
  """
  if not scores:
    return {}
  max_score = max(scores.values())
  if max_score == 0:
    # No option scored above zero; nothing to scale by.
    return {key: 0.0 for key in scores}
  return {key: np.clip(value / max_score, 0, 1) for key, value in scores.items()}

def plot_saycan(llm_scores, affordance_scores, combined_scores, step, show_top=10):
    """Print and plot the score breakdown of the best options for one step.

    The `show_top` options with the highest combined score are listed with
    their LLM, affordance and combined scores, then drawn as a grouped bar
    chart titled with `step`.
    """
    best_options = nlargest(show_top, combined_scores, key=combined_scores.get)

    # One record per option, holding all three score components.
    scores_data = [
        {
            'option': name,
            'llm': llm_scores[name],
            'affordance': affordance_scores[name],
            'combined': combined_scores[name],
        }
        for name in best_options
    ]

    print("\nDetailed Scores for Step:")
    for row in scores_data:
        print(f"\nOption: {row['option']}")
        print(f"LLM Score: {row['llm']:.3f}")
        print(f"Affordance Score: {row['affordance']:.3f}")
        print(f"Combined Score: {row['combined']:.3f}")

    plt.figure(figsize=(15, 6))
    positions = np.arange(len(scores_data))
    width = 0.25

    # (x offset, record key, legend label, bar colour) for each bar series,
    # drawn left to right within every group.
    series = (
        (-width, 'affordance', 'Affordance', '#ea9999ff'),
        (0, 'llm', 'Language', '#a4c2f4ff'),
        (width, 'combined', 'Combined', '#93CE8E'),
    )

    for offset, key, label, color in series:
        plt.bar(positions + offset, [row[key] for row in scores_data],
                width, label=label, color=color)

    # Numeric label on top of every bar.
    for i, row in enumerate(scores_data):
        for offset, key, _label, _color in series:
            plt.text(i + offset, row[key], f"{row[key]:.2f}",
                     ha='center', va='bottom')

    # Tick labels show "pick\n→place" instead of the full API call.
    tick_labels = []
    for row in scores_data:
        short = row['option'].replace('robot.pick_and_place(', '')
        short = short.replace(')', '').replace(', ', '\n→')
        tick_labels.append(short)

    plt.xticks(positions, tick_labels, rotation=45)
    plt.ylabel('Score')
    plt.title(f"{step}\nScore Components")
    plt.legend(bbox_to_anchor=(1.05, 1))
    plt.tight_layout()
    plt.show()

### Affordance Scoring
Implements object-detection-based affordance scoring, without an RL policy.

In [24]:
def affordance_scoring(options, found_objects, verbose=False, block_name="box", bowl_name="circle", termination_string="done()"):
    """Score how feasible each option is in the current scene.

    An option gets a base score of 1.0 when its pick object was detected and
    its place target is detected or is a known place target; the score is
    then scaled by penalties/bonuses derived from simulator object positions
    and clipped to [0, 1].

    Reads the module-level `env`, `pybullet` and `PLACE_TARGETS`.

    Args:
        options: Iterable of option strings of the form
            "robot.pick_and_place(<pick>, <place>)", plus optionally the
            termination string.
        found_objects: List of detected object names (must be a list, as it
            is concatenated with another list).
        verbose: If truthy, print the processed detections and per-option scores.
        block_name: Substring in detected names that is replaced by "block".
        bowl_name: Substring in detected names that is replaced by "bowl".
        termination_string: Option that receives a fixed score of 0.2.

    Returns:
        Dict mapping each option to its affordance score.
    """
    affordance_scores = {}
    
    # Process found objects
    # The last five PLACE_TARGETS keys are always treated as present
    # (presumably the fixed table locations - TODO confirm against PLACE_TARGETS).
    found_objects = [
        found_object.replace(block_name, "block").replace(bowl_name, "bowl")
        for found_object in found_objects + list(PLACE_TARGETS.keys())[-5:]
    ]
    verbose and print("found_objects", found_objects)
    
    # Get current object positions for physics-aware scoring
    object_positions = {}
    for obj_name, obj_id in env.obj_name_to_id.items():
        if obj_id is not None:
            pos, _ = pybullet.getBasePositionAndOrientation(obj_id)
            object_positions[obj_name] = np.array(pos)
    
    for option in options:
        # Terminating is always possible; it gets a fixed, low score.
        if option == termination_string:
            affordance_scores[option] = 0.2
            continue
            
        # Parse option
        pick, place = option.replace("robot.pick_and_place(", "").replace(")", "").split(", ")
        base_score = 0
        
        # Check basic existence
        # One occurrence of the pick object is removed before the place target
        # is looked up among the detections.
        # NOTE(review): the `place in PLACE_TARGETS` alternative accepts any
        # known place name whether or not it was detected - confirm intended.
        found_objects_copy = found_objects.copy()
        if pick in found_objects_copy:
            found_objects_copy.remove(pick)
            if place in found_objects_copy or place in PLACE_TARGETS:
                base_score = 1.0
        
        if base_score == 0:
            affordance_scores[option] = 0
            continue
            
        # Apply physics-aware modifications
        score_multiplier = 1.0
        
        # Check if pick object is accessible (not underneath something)
        if pick in object_positions:
            pick_pos = object_positions[pick]
            for other_obj, other_pos in object_positions.items():
                if other_obj != pick:
                    # If another object is significantly above and close horizontally
                    # (the 0.2 factor is applied once per such object).
                    if (other_pos[2] > pick_pos[2] + 0.02 and 
                        np.linalg.norm(other_pos[:2] - pick_pos[:2]) < 0.05):
                        score_multiplier *= 0.2  # Heavy penalty for blocked objects
        
        # Penalize stacking on unstable bases
        # NOTE(review): unlike the accessibility check above, this compares
        # heights only - any object anywhere that is more than 0.02 higher
        # than the place block triggers the penalty. Confirm intended.
        if ("block" in place and place in object_positions and 
            any(pos[2] > object_positions[place][2] + 0.02 
                for pos in object_positions.values())):
            score_multiplier *= 0.5  # Penalty for stacking on already stacked blocks
        
        # Bonus for matching colors in color tasks
        # NOTE(review): `option` is the action string, not the task text, so
        # this only fires if the action string itself contains "matching" or
        # "color"; no task description is passed to this function.
        if "matching" in option.lower() or "color" in option.lower():
            pick_color = pick.split()[0]
            place_color = place.split()[0]
            if pick_color == place_color:
                score_multiplier *= 1.2
        
        # Bonus for corners in corner tasks
        # NOTE(review): also keyed on the action string, so it applies to any
        # option whose text contains "corner", whatever the task is.
        if "corner" in option.lower() and "corner" in place:
            score_multiplier *= 1.1
        
        # Calculate final score
        # base_score is 1.0 here, so after clipping a bonus can only offset a
        # penalty; it never lifts a score above 1.
        final_score = base_score * score_multiplier
        affordance_scores[option] = np.clip(final_score, 0, 1)
        
        # Prints the unclipped value.
        verbose and print(f"{final_score:.2f} \t {option}")
    
    return affordance_scores



### Test Cell
Demonstrates basic SayCan functionality with test case.

In [25]:
# #@title Test
# termination_string = "done()"
# query = "To pick the blue block and put it on the red block, I should:\n"

# options = make_options(PICK_TARGETS, PLACE_TARGETS, termination_string=termination_string)
# llm_scores, _ = gpt3_scoring(query, options, verbose=True, engine=ENGINE)

# affordance_scores = affordance_scoring(options, found_objects, block_name="box", bowl_name="circle", verbose=False, termination_string=termination_string)

# combined_scores = {option: np.exp(llm_scores[option]) * affordance_scores[option] for option in options}
# combined_scores = normalize_scores(combined_scores)
# selected_task = max(combined_scores, key=combined_scores.get)
# print("Selecting: ", selected_task)

# # Added performance summary
# print("\nPerformance for test run:")
# performance_monitor.print_summary()

### SayCan Demo (with affordance function)
Final demo cells showing:
1. Full SayCan implementation
2. Socratic Model variant

Each demo includes:
- Prompt setup
- Task configuration
- Scene setup
- Execution with direct control

In [None]:
#@title Prompt

# Option string that ends a plan.
termination_string = "done()"

# Few-shot examples: a scene line, a commented instruction, then the plan.
gpt3_context = """
objects = [red block, yellow block, blue block, green bowl]
# move all the blocks to the top left corner.
robot.pick_and_place(blue block, top left corner)
robot.pick_and_place(red block, top left corner)
robot.pick_and_place(yellow block, top left corner)
done()

objects = [red block, yellow block, blue block, green bowl]
# put the yellow one the green thing.
robot.pick_and_place(yellow block, green bowl)
done()

objects = [yellow block, blue block, red block]
# move the light colored block to the middle.
robot.pick_and_place(yellow block, middle)
done()

objects = [blue block, green bowl, red block, yellow bowl, green block]
# stack the blocks.
robot.pick_and_place(green block, blue block)
robot.pick_and_place(red block, green block)
done()

objects = [red block, blue block, green bowl, blue bowl, yellow block, green block]
# group the blue objects together.
robot.pick_and_place(blue block, blue bowl)
done()

objects = [green bowl, red block, green block, red bowl, yellow bowl, yellow block]
# sort all the blocks into their matching color bowls.
robot.pick_and_place(green block, green bowl)
robot.pick_and_place(red block, red bowl)
robot.pick_and_place(yellow block, yellow bowl)
done()
"""

use_environment_description = False
gpt3_context_lines = gpt3_context.split("\n")
# Keep the "objects = [...]" scene lines only when the environment
# description is part of the prompt; every other line is always kept.
gpt3_context_lines_keep = [
    line for line in gpt3_context_lines
    if use_environment_description or "objects =" not in line
]

gpt3_context = "\n".join(gpt3_context_lines_keep)
print(gpt3_context)

In [27]:
#@title Task and Config
# only_plan = False

# raw_input = "put all the blocks in different corners." 
# config = {"pick":  ["red block", "yellow block", "green block", "blue block"],
#           "place": ["red bowl"]}

# raw_input = "move the block to the bowl."
# config = {'pick':  ['red block'],
#           'place': ['green bowl']}

# raw_input = "put any blocks on their matched colored bowls."
# config = {'pick':  ['yellow block', 'green block', 'blue block'],
#           'place': ['yellow bowl', 'green bowl', 'blue bowl']}
          
# raw_input = "put all the blocks in the green bowl."
# config = {'pick':  ['yellow block', 'green block', 'red block'],
#           'place': ['yellow bowl', 'green bowl']}

# raw_input = "stack all the blocks."
# config = {'pick':  ['yellow block', 'blue block', 'red block'],
#           'place': ['blue bowl', 'red bowl']}

# raw_input = "make the highest block stack."
# config = {'pick':  ['yellow block', 'blue block', 'red block'],
#           'place': ['blue bowl', 'red bowl']}

# raw_input = "stack all the blocks."
# config = {'pick':  ['green block', 'blue block', 'red block'],
#           'place': ['yellow bowl', 'green bowl']}

# raw_input = "put the block in all the corners." 
# config = {'pick':  ['red block'],
#           'place': ['red bowl', 'green bowl']}

# raw_input = "clockwise, move the block through all the corners."
# config = {'pick':  ['red block'],
#           'place': ['red bowl', 'green bowl', 'yellow bowl']}

In [28]:
#@title Setup Scene
# File used to hand the top-down scene image to the detector.
image_path = "./2db.png"
# Fixed seed so the random scene below is reproducible; the order of the
# np.random calls that follow therefore matters.
np.random.seed(2)
# NOTE(review): `config` must already be bound when this cell runs. Every
# assignment in the preceding "Task and Config" cell is commented out, so
# unless another cell defines it this line raises NameError - confirm.
if config is None:
  # Sample 1-4 distinct pick objects (randint's upper bound is exclusive).
  pick_items = list(PICK_TARGETS.keys())
  pick_items = np.random.choice(pick_items, size=np.random.randint(1, 5), replace=False)

  # Candidate place objects: all PLACE_TARGETS keys except the last nine
  # (presumably the non-object entries - TODO confirm against PLACE_TARGETS).
  place_items = list(PLACE_TARGETS.keys())[:-9]
  # The number of place items shrinks as the number of pick items grows.
  place_items = np.random.choice(place_items, size=np.random.randint(1, 6 - len(pick_items)), replace=False)
  config = {"pick":  pick_items,
            "place": place_items}
  print(pick_items, place_items)

# obs = env.reset(config)

# img_top = env.get_camera_image_top()
# img_top_rgb = cv2.cvtColor(img_top, cv2.COLOR_BGR2RGB)
# plt.imshow(img_top)

# imageio.imsave(image_path, img_top)

In [29]:
# #@title Runner
# plot_on = True
# max_tasks = 5

# try:
#     # Initialize scene and options
#     options = make_options(PICK_TARGETS, PLACE_TARGETS, termination_string=termination_string)
#     found_objects = vild(image_path, category_name_string, vild_params, plot_on=False)
#     scene_description = build_scene_description(found_objects)
#     env_description = scene_description
#     print(scene_description)

#     # Initialize prompts
#     gpt3_prompt = gpt3_context
#     if use_environment_description:
#         gpt3_prompt += "\n" + env_description
#     gpt3_prompt += "\n# " + raw_input + "\n"

#     # Initialize tracking lists
#     all_llm_scores = []
#     all_affordance_scores = []
#     all_combined_scores = []
#     num_tasks = 0
#     selected_task = ""
#     steps_text = []

#     # Initialize executor if executing actions
#     if not only_plan:
#         executor = DirectExecutor(env)
#         print('Initial state:')
#         plt.imshow(env.get_camera_image())
#         plt.show()

#     while not selected_task == termination_string:
#         num_tasks += 1
#         if num_tasks > max_tasks:
#             break

#         # Update scene understanding and affordances
#         found_objects = vild(image_path, category_name_string, vild_params, plot_on=False)
#         affordance_scores = affordance_scoring(options, found_objects, block_name="box",
#                                             bowl_name="circle", verbose=False)

#         # Get language scores
#         llm_scores, _ = gpt3_scoring(gpt3_prompt, options, verbose=True,
#                                    engine=ENGINE, print_tokens=False)

#         # Combine scores and select action
#         combined_scores = {option: np.exp(llm_scores[option]) * affordance_scores[option]
#                          for option in options}
#         combined_scores = normalize_scores(combined_scores)
#         selected_task = max(combined_scores, key=combined_scores.get)

#         if selected_task and selected_task != termination_string:
#             steps_text.append(selected_task)
#             print(f"{num_tasks} Selecting: {selected_task}")
#             gpt3_prompt += selected_task + "\n"

#             # Store scores for visualization
#             all_llm_scores.append(llm_scores)
#             all_affordance_scores.append(affordance_scores)
#             all_combined_scores.append(combined_scores)

#             # Execute action if not just planning
#             if not only_plan:
#                 nlp_step = step_to_nlp(selected_task)
#                 print(f'Executing: {nlp_step}')
#                 obs = executor.run(obs, nlp_step)
#                 if obs is None:
#                     print("Failed to execute step, stopping execution")
#                     break
                
#                 # Update image for next iteration
#                 img_top = env.get_camera_image_top()
#                 imageio.imwrite(image_path, img_top)

#     # Visualization section
#     if plot_on:
#         for llm_scores, affordance_scores, combined_scores, step in zip(
#                 all_llm_scores, all_affordance_scores, all_combined_scores, steps_text):
#             plot_saycan(llm_scores, affordance_scores, combined_scores, step, show_top=10)

#     # Print solution
#     print('**** Solution ****')
#     print(env_description)
#     print('# ' + raw_input)
#     for i, step in enumerate(steps_text):
#         if step == '' or step == termination_string:
#             break
#         print('Step ' + str(i) + ': ' + step)

#     # Show final state if executing
#     if not only_plan:
#         print('Final state:')
#         plt.imshow(env.get_camera_image())
#         plt.show()

#         # Print performance metrics
#         print("\nOverall Performance Metrics:")
#         performance_monitor.print_summary()
#         execution_tracker.print_summary()

# except Exception as e:
#     print(f"Error in main execution: {e}")
#     import traceback
#     traceback.print_exc()

### Socratic Model: VILD, GPT3, CLIPort Demo

This implements a version of LLM planning shown in [Socratic Models](https://socraticmodels.github.io/), without the grounding, but with a scene description. For this relatively simple environment with clear robotic affordances, the scene description is generally sufficient. This mirrors the implementation attached to the paper [here](https://github.com/google-research/google-research/tree/master/socraticmodels).

In [30]:
#@title Prompt

# Few-shot prompt for the Socratic Model variant. This rebinds gpt3_context,
# replacing the filtered version built by the SayCan prompt cell; here the
# "objects = [...]" scene lines are kept. Each example is a scene line, a
# commented instruction, and the plan ending in done().
gpt3_context = """
objects = [red block, yellow block, blue block, green bowl]
# move all the blocks to the top left corner.
robot.pick_and_place(blue block, top left corner)
robot.pick_and_place(red block, top left corner)
robot.pick_and_place(yellow block, top left corner)
done()

objects = [red block, yellow block, blue block, green bowl]
# put the yellow one the green thing.
robot.pick_and_place(yellow block, green bowl)
done()

objects = [yellow block, blue block, red block]
# move the light colored block to the middle.
robot.pick_and_place(yellow block, middle)
done()

objects = [blue block, green bowl, red block, yellow bowl, green block]
# stack the blocks.
robot.pick_and_place(green block, blue block)
robot.pick_and_place(red block, green block)
done()

objects = [red block, blue block, green bowl, blue bowl, yellow block, green block]
# group the blue objects together.
robot.pick_and_place(blue block, blue bowl)
done()

objects = [green bowl, red block, green block, red bowl, yellow bowl, yellow block]
# sort all the blocks into their matching color bowls.
robot.pick_and_place(green block, green bowl)
robot.pick_and_place(red block, red bowl)
robot.pick_and_place(yellow block, yellow bowl)
done()
"""

In [31]:
# #@title Queries and Configs

# only_plan = False

# raw_input = "put all the blocks in different corners." 
# config = {'pick':  ['red block', 'yellow block', 'green block', 'blue block'],
#           'place': ['red bowl']}

In [32]:
# #@title Runner

# env_description = ''
# image_path = './2db.png'

# np.random.seed(2)

# if config is None:
#   pick_items = list(PICK_TARGETS.keys())
#   pick_items = np.random.choice(pick_items, size=np.random.randint(1, 5), replace=False)

#   place_items = list(PLACE_TARGETS.keys())[:-9]
#   place_items = np.random.choice(place_items, size=np.random.randint(1, 6 - len(pick_items)), replace=False)
#   config = {'pick':  pick_items,
#             'place': place_items}
#   print(pick_items, place_items)
# obs = env.reset(config)

# img_top = env.get_camera_image_top()
# img_top_rgb = cv2.cvtColor(img_top, cv2.COLOR_BGR2RGB)
# plt.imshow(img_top_rgb)

# imageio.imsave(image_path, img_top)

# found_objects = vild(image_path, category_name_string, vild_params, plot_on=False)
# scene_description = build_scene_description(found_objects)
# print(scene_description)

# env_description = scene_description

# gpt3_prompt = gpt3_context
# gpt3_prompt += "\n" + env_description + "\n"
# gpt3_prompt += "# " + raw_input
# response = gpt3_call(engine=ENGINE, prompt=gpt3_prompt, max_tokens=128, temperature=0)
# steps_text = [text.strip().strip() for text in response["choices"][0]["text"].strip().split("#")[0].split("\n")][:-1]
# print('**** Solution ****')
# print(env_description)
# print('# ' + raw_input)
# for i, step in enumerate(steps_text):
#   if step == '' or step == termination_string:
#     break
#   print('Step ' + str(i) + ': ' + step)
#   nlp_step = step_to_nlp(step)

# if not only_plan:
#   print('Initial state:')
#   plt.imshow(env.get_camera_image())

#   # Initialize executor
#   executor = DirectExecutor(env)

#   for i, step in enumerate(steps_text):
#     if step == '' or step == termination_string:
#       break
#     nlp_step = step_to_nlp(step)
#     print('GPT-3 says next step:', nlp_step)
#     obs = executor.run(obs, nlp_step)

#   # Show camera image after task.
#   print('Final state:')
#   plt.imshow(env.get_camera_image())

## Josh's SayCan Demo Cell

### Overview
This demonstration shows the complete SayCan pipeline converting natural language instructions into robot actions through LLM planning and affordance scoring.

### Pipeline Steps
1. **Vision Analysis**
   * Scene detection using ViLD
   * Object identification and location mapping

2. **Language Planning**
   * GPT-3 converts high-level instruction to steps
   * Each possible action gets scored for relevance

3. **Action Selection**
   * Combines language scores with physical affordances 
   * Selects optimal action at each step

4. **Execution**
   * Runs selected action through direct control 
   * Shows visual confirmation of results

### Example Usage

```python
# Simple Instructions
instruction = "stack the red block on the blue block"
run_saycan_demo_v2(instruction)

# Complex Instructions
instruction = "put all blocks in their matching colored bowls"
run_saycan_demo_v2(instruction)
```

### Visualization Features
* Initial scene inspection
* Action scoring visualization
* Before/after images per step
* Performance metrics display

### Try These Instructions
* `"pick up the red block"`
* `"put the blue block in the green bowl"`
* `"sort blocks into matching colored bowls"`
* `"put blocks in different corners"`

### Notes
* Each step shows both scoring and execution
* Visual confirmation helps track progress
* Performance metrics show timing and success rates

---

In [33]:
# #@title Josh's SayCan Demo - With Scoring
# def run_saycan_with_scoring(instruction):
#     """SayCan with explicit scoring of each possible action"""
#     print(f"🤖 Running SayCan for: {instruction}")
    
#     # Reset environment
#     config = {
#         'pick': ['red block', 'blue block', 'green block'],
#         'place': ['red bowl', 'blue bowl', 'green bowl']
#     }
#     obs = env.reset(config)

#     executor = DirectExecutor(env)
    
#     # Show initial scene
#     plt.figure(figsize=(10, 6))
#     plt.imshow(env.get_camera_image())
#     plt.title("Initial Scene")
#     plt.show()

#     # Get scene description
#     image_path = 'tmp.jpg'
#     img_top = env.get_camera_image_top()
#     imageio.imwrite(image_path, img_top)
#     found_objects = vild(image_path, category_name_string, vild_params, plot_on=False)
#     scene_description = build_scene_description(found_objects)
#     print("\n🔍 Scene Description:", scene_description)

#     # Generate and execute plan with scoring
#     options = make_options(PICK_TARGETS, PLACE_TARGETS, termination_string="done()")
#     gpt3_prompt = gpt3_context + "\n" + scene_description + "\n# " + instruction + "\n"
    
#     num_tasks = 0
#     selected_task = ""
#     while not selected_task == "done()":
#         num_tasks += 1
#         if num_tasks > 5:  # Limit number of steps
#             break
            
#         # Score all possible actions
#         print(f"\n🎯 Step {num_tasks}:")
#         llm_scores, _ = gpt3_scoring(gpt3_prompt, options, verbose=False, engine=ENGINE)
#         affordance_scores = affordance_scoring(options, found_objects)
        
#         # Combine scores and select best action
#         combined_scores = {option: np.exp(llm_scores[option]) * affordance_scores[option] 
#                          for option in options}
#         combined_scores = normalize_scores(combined_scores)
#         selected_task = max(combined_scores, key=combined_scores.get)

        
        
#         if selected_task and selected_task != "done()":
#             print(f"Selected action: {selected_task}")
            
#             # Plot scores for this step
#             plot_saycan(llm_scores, affordance_scores, combined_scores, 
#                        f"Step {num_tasks} Scores", show_top=5)
            
#             # Execute action
#             nlp_step = step_to_nlp(selected_task)
#             print(f"Executing: {nlp_step}")
#             obs = executor.run(obs, nlp_step)
#             gpt3_prompt += selected_task + "\n"
            
#     # Show final scene
#     plt.figure(figsize=(10, 6))
#     plt.imshow(env.get_camera_image())
#     plt.title("Final Scene")
#     plt.show()


## Experiment 1: SayCan vs Socratic Comparison

This experiment compares the performance of SayCan and Socratic approaches across several key tasks:
1. Spatial reasoning (corners placement)
2. Color matching
3. Stacking
4. Complex multi-step tasks

We measure:
- Success rates
- Number of steps taken
- Execution time
- LLM scoring patterns

In [34]:
import time
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
import numpy as np
from collections import defaultdict
import traceback 


def setup_gpt3_context(use_environment_description=False):
    """Build the few-shot GPT-3 context used by the comparison experiment.

    The context holds a short list of rules followed by three worked
    examples (corners, stacking, matching bowls). Every example ends with
    ``done()``.

    The literal below is indented to match this function body, so it is
    dedented before use. Without that, every prompt line would start with
    four spaces while the instruction and the scored options appended by
    callers are flush-left.

    Args:
        use_environment_description: When False (default), lines containing
            ``objects =`` are dropped so the examples carry no scene listing.
            When True, the examples are returned unchanged.

    Returns:
        The prompt text as a single newline-joined string.
    """
    # Local import keeps this block self-contained in the notebook cell.
    import textwrap

    experiment_gpt3_context = textwrap.dedent("""
    # Rules for robot actions:
    - Blocks can only be placed on valid targets
    - Each corner can hold one block
    - Corners are: top left, top right, bottom left, bottom right

    objects = [red block, blue block, green block]
    # put blocks in different corners
    robot.pick_and_place(red block, top left corner)
    robot.pick_and_place(blue block, top right corner)
    robot.pick_and_place(green block, bottom left corner)
    done()

    objects = [red block, blue block, green block]
    # stack all blocks
    robot.pick_and_place(red block, middle)
    robot.pick_and_place(blue block, red block)
    robot.pick_and_place(green block, blue block)
    done()

    objects = [red block, blue block, green block, red bowl, blue bowl, green bowl]
    # sort blocks into matching bowls
    robot.pick_and_place(red block, red bowl)
    robot.pick_and_place(blue block, blue bowl)
    robot.pick_and_place(green block, green bowl)
    done()
    """)

    if use_environment_description:
        return experiment_gpt3_context

    # Strip the scene listings so the examples match a prompt that is built
    # without a scene description.
    kept_lines = [
        line
        for line in experiment_gpt3_context.split("\n")
        if "objects =" not in line
    ]
    return "\n".join(kept_lines)


@dataclass
class TaskResult:
    """Outcome record for one instruction run with one planning method.

    Attributes:
        method: Planner identifier, 'saycan' or 'socratic'.
        task: The natural-language instruction that was run.
        success: Whether the run was judged successful.
        num_steps: Number of steps that were executed.
        planned_steps: Steps chosen by the planner, in order.
        executed_steps: Steps actually handed to the executor, in order.
        execution_times: Elapsed seconds per executed step.
        llm_scores: One mapping of option -> language-model score per step.
        total_time: Elapsed seconds for the whole run.
        error: Exception message when the run aborted, otherwise None.
    """

    # Identification
    method: str
    task: str

    # Outcome
    success: bool
    num_steps: int

    # Step-level traces
    planned_steps: List[str]
    executed_steps: List[str]
    execution_times: List[float]
    llm_scores: List[Dict[str, float]]

    # Totals and failure details
    total_time: float
    error: Optional[str] = None

class ExperimentRunner:
    """Handles running comparative experiments between SayCan and Socratic approaches"""

    
    def __init__(self, env, model="gpt-3.5-turbo-instruct"):
        """Bind the runner to an environment and a language-model engine.

        Args:
            env: Environment that tasks are reset and executed in.
            model: Engine name forwarded to the GPT-3 helper calls.
        """
        self.env = env
        self.model = model

        # Every TaskResult produced by this runner, in execution order.
        self.results: List[TaskResult] = []
        self.executor = DirectExecutor(env)

        # Instruction currently being run; the affordance scorer reads it.
        self.current_task = None

        # Per-step score traces for the task in progress.
        self.current_task_scores = {
            bucket: [] for bucket in ('llm', 'affordance', 'combined', 'steps')
        }

        # Few-shot context without scene listings.
        self.gpt3_context = setup_gpt3_context(use_environment_description=False)




    
    def get_enhanced_affordance_score(self, option: str, found_objects: list) -> float:
        """Enhanced affordance scoring with spatial awareness"""
        if option == "done()":
            return 0.2
            
        try:
            pick, place = option.replace("robot.pick_and_place(", "").replace(")", "").split(", ")
        except:
            return 0.0
        
        # Base validity checks
        if pick == place or pick not in found_objects:
            return 0.0
        if place not in found_objects and place not in PLACE_TARGETS:
            return 0.0
            
        score = 1.0
        
        # Task-specific scoring
        if self.current_task and "corner" in self.current_task.lower():
            # Penalize non-corner targets
            if "corner" not in place:
                score *= 0.2
                
            # Check if corner is already occupied
            if place in PLACE_TARGETS:  # If it's a corner
                target_pos = PLACE_TARGETS[place]
                if target_pos is not None:  # Make sure we have valid coordinates
                    target_pos = np.array(target_pos)
                    for obj_name, obj_id in self.env.obj_name_to_id.items():
                        if obj_id is not None:
                            obj_pos = np.array(pybullet.getBasePositionAndOrientation(obj_id)[0])
                            # If any object is close to this corner
                            if np.linalg.norm(obj_pos[:2] - target_pos[:2]) < 0.1:
                                score *= 0.1  # Heavily penalize already occupied corners
        
        # Accessibility check
        if pick in self.env.obj_name_to_id:
            pick_id = self.env.obj_name_to_id[pick]
            if pick_id is not None:
                pick_pos = np.array(pybullet.getBasePositionAndOrientation(pick_id)[0])
                for obj_name, obj_id in self.env.obj_name_to_id.items():
                    if obj_id is not None and obj_name != pick:
                        obj_pos = np.array(pybullet.getBasePositionAndOrientation(obj_id)[0])
                        if (obj_pos[2] > pick_pos[2] + 0.02 and 
                            np.linalg.norm(obj_pos[:2] - pick_pos[:2]) < 0.05):
                            score *= 0.1  # Severe penalty for blocked objects
        
        return np.clip(score, 0, 1)

    def correct_scene_description(self, found_objects):
        """Correct color detection issues in scene description"""
        corrections = {
            'yellow bowl': 'red bowl',
            'yellow block': 'red block'
        }
        corrected_objects = []
        for obj in found_objects:
            obj = obj.strip()
            if obj in corrections:
                corrected_objects.append(corrections[obj])
            else:
                corrected_objects.append(obj)
        return corrected_objects

    def clean_step(self, step: str) -> str:
        """Clean and validate a step string"""
        step = step.strip()
        if step.startswith('.'):
            step = step.lstrip('.')
        step = step.strip()
        if ',' not in step or 'robot.pick_and_place' not in step:
            return ""
        return step
        
    def print_task_summary(self, task: str, method: str, planned_steps: List[str], executed_steps: List[str]):
        """Print detailed summary of task execution with clear method identification"""
        print("\n" + "="*50)
        print(f"TASK EXECUTION SUMMARY - {method.upper()}")
        print("="*50)
        print(f"Task: {task}")
        print("\nPlanned Steps:")
        for i, step in enumerate(planned_steps, 1):
            print(f"{i}. {step}")
        print("\nExecuted Steps:")
        for i, step in enumerate(executed_steps, 1):
            print(f"{i}. {step}")
            
    def visualize_step_scores(self, step_num: int):
        """Visualize scores for a single step"""
        if not self.current_task_scores['steps']:
            return
            
        idx = step_num - 1
        if idx >= len(self.current_task_scores['steps']):
            return
            
        plot_saycan(
            self.current_task_scores['llm'][idx],
            self.current_task_scores['affordance'][idx],
            self.current_task_scores['combined'][idx],
            f"Step {step_num}: {self.current_task_scores['steps'][idx]}",
            show_top=10
        )
        
    def run_single_task(self, task: str, method: str, config: Dict) -> TaskResult:
        """Run one instruction with one planning method and collect metrics.

        Args:
            task: Natural-language instruction.
            method: ``"saycan"`` selects step-by-step scored planning; any
                other value runs the full-plan ("Socratic") branch.
            config: Environment configuration passed to ``env.reset``.

        Returns:
            A TaskResult. A run counts as successful when at least one step
            was executed, every planned step was executed, and the last
            observation is not None. Any exception is caught, printed with
            its traceback, and reported as a failed result whose ``error``
            field holds the message.
        """
        self.current_task = task  # Read by get_enhanced_affordance_score
        self.current_task_scores = {
            'llm': [],
            'affordance': [],
            'combined': [],
            'steps': []
        }

        start_time = time.time()

        try:
            obs = self.env.reset(config)

            # Print task header
            print(f"\n{'='*50}")
            print(f"Task: {task}")
            print(f"Method: {method.upper()}")
            print(f"{'='*50}\n")

            if method == "saycan":
                outcome = self._run_saycan(task, obs)
            else:
                outcome = self._run_socratic(task, obs)
            obs, planned_steps, executed_steps, execution_times, llm_scores = outcome

            self.print_task_summary(task, method, planned_steps, executed_steps)

            success = (len(executed_steps) == len(planned_steps) and
                       obs is not None and
                       len(executed_steps) > 0)
            print(f"\nTask completed. Success: {success}")

            return TaskResult(
                method=method,
                task=task,
                success=success,
                num_steps=len(executed_steps),
                planned_steps=planned_steps,
                executed_steps=executed_steps,
                execution_times=execution_times,
                llm_scores=llm_scores,
                total_time=time.time() - start_time
            )

        except Exception as e:
            # Experiment boundary: one failing task must not abort the whole
            # comparison, so the failure is reported in the result instead.
            print(f"\nError during execution: {str(e)}")
            traceback.print_exc()
            return TaskResult(
                method=method,
                task=task,
                success=False,
                num_steps=0,
                planned_steps=[],
                executed_steps=[],
                execution_times=[],
                llm_scores=[],
                total_time=time.time() - start_time,
                error=str(e)
            )

    def _describe_scene(self):
        """Detect objects in "tmp.jpg" and return (found_objects, scene_description)."""
        # NOTE(review): nothing in this class writes "tmp.jpg" after
        # env.reset(), so detection may run on an image saved by an earlier
        # cell -- confirm the file is refreshed before each task.
        found_objects = vild("tmp.jpg", category_name_string, vild_params, plot_on=False)
        found_objects = self.correct_scene_description(found_objects)
        scene_description = build_scene_description(found_objects)
        print("Scene:", scene_description)
        return found_objects, scene_description

    def _run_saycan(self, task: str, obs, max_steps: int = 5):
        """Select and execute one scored action at a time.

        Returns:
            Tuple of (last observation, planned steps, executed steps,
            per-step execution times, per-step LLM score dicts).
        """
        options = make_options(PICK_TARGETS, PLACE_TARGETS)
        found_objects, scene_description = self._describe_scene()
        print("\nStarting execution:")
        self.show_execution_state()

        gpt3_prompt = self.gpt3_context
        # Add the scene listing only when the few-shot examples carry one
        # too, so the query always has the same layout as the examples.
        if "objects =" in self.gpt3_context:
            gpt3_prompt += "\n" + scene_description
        gpt3_prompt += "\n# " + task + "\n"

        planned_steps = []
        executed_steps = []
        execution_times = []
        llm_scores = []
        prev_selected_tasks = set()
        selected_task = ""
        num_steps = 0

        while selected_task != "done()" and num_steps < max_steps:
            step_start = time.time()
            num_steps += 1

            # Get scores with enhanced affordance
            llm_scores_step, _ = gpt3_scoring(gpt3_prompt, options, engine=self.model, verbose=False)
            affordance_scores = {opt: self.get_enhanced_affordance_score(opt, found_objects)
                                 for opt in options}
            raw_combined = {opt: np.exp(llm_scores_step[opt]) * affordance_scores[opt]
                            for opt in options}
            # Keep the per-step LLM scores so they end up in the TaskResult.
            llm_scores.append(llm_scores_step)

            # Print detailed scoring breakdown
            print(f"\nScoring Breakdown for Step {num_steps}:")
            for option in options:
                if option == "done()":
                    continue
                pick, place = option.replace("robot.pick_and_place(", "").replace(")", "").split(", ")
                llm_score = llm_scores_step[option]
                affordance_score = affordance_scores[option]
                combined = raw_combined[option]

                print(f"\nOption: {option}")
                # NOTE(review): the scores are exponentiated above, which
                # suggests log-probabilities; if so "> 5" never holds --
                # confirm the intended threshold.
                print(f"  LLM Score: {llm_score:.3f} - {'Valid for task' if llm_score > 5 else 'Less relevant for task'}")
                print(f"  Affordance Score: {affordance_score:.3f} - Factors:")
                print(f"    - Object exists: {1 if pick in found_objects else 0}")
                print(f"    - Target valid: {1 if place in PLACE_TARGETS or place in found_objects else 0}")
                if "corner" in task.lower():
                    print(f"    - Corner target: {'Yes' if 'corner' in place else 'No'}")
                print(f"  Combined Score: {combined:.3f}")

            # Store scores for visualization
            combined_scores = normalize_scores(raw_combined)
            self.current_task_scores['llm'].append(llm_scores_step)
            self.current_task_scores['affordance'].append(affordance_scores)
            self.current_task_scores['combined'].append(combined_scores)

            # Select best non-repeated, physically possible action.
            # NOTE(review): "done()" is excluded from the candidates, so the
            # loop ends only at max_steps or when no candidate is left --
            # confirm this is intended.
            valid_options = [opt for opt in combined_scores.keys()
                             if opt not in prev_selected_tasks and opt != "done()"
                             and affordance_scores[opt] > 0]
            if not valid_options:
                selected_task = "done()"
                continue

            selected_task = max(valid_options, key=lambda x: combined_scores[x])

            print(f"\nStep {num_steps}: {selected_task}")
            planned_steps.append(selected_task)
            self.current_task_scores['steps'].append(selected_task)
            prev_selected_tasks.add(selected_task)

            # Visualize scores for this step
            self.visualize_step_scores(num_steps)

            # Execute
            nlp_step = step_to_nlp(selected_task)
            print(f"Executing: {nlp_step}")
            obs = self.executor.execute_action(obs, nlp_step)
            if obs is None:
                # Same failure handling as the full-plan branch: stop, and
                # leave the step out of executed_steps so success is False.
                print(f"Failed to execute step {num_steps}")
                break
            executed_steps.append(nlp_step)
            self.show_execution_state()

            execution_times.append(time.time() - step_start)
            gpt3_prompt += selected_task + "\n"

        return obs, planned_steps, executed_steps, execution_times, llm_scores

    def _run_socratic(self, task: str, obs):
        """Generate a full plan in one completion, then execute it in order.

        Returns:
            Tuple of (last observation, planned steps, executed steps,
            per-step execution times, empty list of LLM scores).
        """
        _, scene_description = self._describe_scene()

        # Generate full plan from the module-level few-shot prompt.
        gpt3_prompt = gpt3_context + f"\n{scene_description}\n# {task}"
        response = gpt3_call(engine=self.model, prompt=gpt3_prompt, max_tokens=128)
        raw_steps = response["choices"][0]["text"].strip().split("\n")

        # Keep only lines that clean_step accepts as pick-and-place actions.
        planned_steps = [
            cleaned
            for cleaned in (self.clean_step(raw_step) for raw_step in raw_steps)
            if cleaned
        ]

        print("\nGenerated plan:")
        for i, step in enumerate(planned_steps, 1):
            print(f"{i}. {step}")

        print("\nStarting execution:")
        self.show_execution_state()

        executed_steps = []
        execution_times = []
        for i, step in enumerate(planned_steps, 1):
            step_start = time.time()
            nlp_step = step_to_nlp(step)
            print(f"\nExecuting step {i}: {nlp_step}")
            obs = self.executor.execute_action(obs, nlp_step)
            if obs is None:
                print(f"Failed to execute step {i}")
                break
            executed_steps.append(nlp_step)
            self.show_execution_state()
            execution_times.append(time.time() - step_start)

        # This branch never scores options, so there are no LLM scores.
        return obs, planned_steps, executed_steps, execution_times, []

    def show_execution_state(self):
        """Display the environment's current camera image in its own figure."""
        canvas_size = (8, 8)
        plt.figure(figsize=canvas_size)
        frame = self.env.get_camera_image()
        plt.imshow(frame)
        plt.axis('off')  # Image only, no tick marks
        plt.show()
        # Close after showing so figures do not pile up across steps.
        plt.close()
    
    def run_experiment(self, tasks: List[Dict], random_seed: int = 42) -> Dict[str, Any]:
        """Run full experiment comparing both methods across multiple tasks"""
        np.random.seed(random_seed)
        
        results = defaultdict(list)
        for task_config in tasks:
            for method in ['saycan', 'socratic']:
                result = self.run_single_task(
                    task=task_config['instruction'],
                    method=method,
                    config=task_config['config']
                )
                self.results.append(result)
                results[method].append(result)
                
        # Calculate summary metrics
        summary = {
            'saycan': {
                'success_rate': np.mean([r.success for r in results['saycan']]),
                'avg_steps': np.mean([r.num_steps for r in results['saycan']]),
                'avg_time': np.mean([r.total_time for r in results['saycan']])
            },
            'socratic': {
                'success_rate': np.mean([r.success for r in results['socratic']]),
                'avg_steps': np.mean([r.num_steps for r in results['socratic']]),
                'avg_time': np.mean([r.total_time for r in results['socratic']])
            }
        }
        
        return summary

    def visualize_results(self):
        """Create visualization of experiment results"""
        if not self.results:
            print("No results to visualize")
            return
            
        plt.figure(figsize=(15, 5))
        
        # Success rates
        plt.subplot(131)
        success_rates = {
            'SayCan': np.mean([r.success for r in self.results if r.method == 'saycan']),
            'Socratic': np.mean([r.success for r in self.results if r.method == 'socratic'])
        }

In [None]:
# Benchmark instructions for the SayCan vs. Socratic comparison.
# Every task starts from the same scene: three blocks and three bowls.
test_tasks = [
    {
        'instruction': instruction,
        # Built inside the comprehension so each task owns its own
        # config dict and lists (no aliasing between tasks).
        'config': {
            'pick': ['red block', 'blue block', 'green block'],
            'place': ['red bowl', 'blue bowl', 'green bowl'],
        },
    }
    for instruction in (
        'put all the blocks in different corners',
        'put blocks in their matching colored bowls',
        'stack all the blocks',
        'put the red block between the blue and green blocks',
    )
]

# Run experiment
# experiment = ExperimentRunner(env, model="gpt-3.5-turbo-instruct")
# summary = experiment.run_experiment(test_tasks)

# # Print results
# print("\nExperiment Summary:")
# print("\nSayCan Results:")
# for metric, value in summary['saycan'].items():
#     print(f"{metric}: {value:.3f}")

# print("\nSocratic Results:")
# for metric, value in summary['socratic'].items():
#     print(f"{metric}: {value:.3f}")

# # Visualize results
# experiment.visualize_results()

# # Print detailed task breakdown
# print("\nDetailed Task Results:")
# for result in experiment.results:
#     print(f"\nMethod: {result.method}")
#     print(f"Task: {result.task}")
#     print(f"Success: {result.success}")
#     print(f"Steps Planned: {len(result.planned_steps)}")
#     print(f"Steps Executed: {len(result.executed_steps)}")
#     print(f"Total Time: {result.total_time:.2f}s")
#     if result.error:
#         print(f"Error: {result.error}")

## Experiment 2: Pipeline Performance & Optimization 

This experiment analyzes how different configurations affect SayCan's performance and efficiency:

### 1. Processing & Memory
   - Processing in batches vs one at a time
   - Checking memory usage 
   - Looking at API costs

### 2. Model Behavior
   - How fast different parts work
   - Response times from GPT model
   - If caching helps speed things up

### 3. Computer Resources
   - Running multiple things at once
   - Where things slow down
   - How GPU and CPU work together

### We measure:
- How long actions take
- How many actions we can do per second 
- How much it costs to run
- How much memory it uses
- If saved results help speed things up

In [None]:
import time
from dataclasses import dataclass
from typing import Dict, List, Tuple
import numpy as np
import matplotlib.pyplot as plt

@dataclass
class PipelineConfig:
    """Settings for a single pipeline benchmark run.

    Attributes:
        batch_size: Number of options grouped into one batch.
        model_name: Language-model engine name.
        use_caching: Caching toggle.
        max_concurrent: Concurrency limit.
    """
    # NOTE(review): how use_caching and max_concurrent are consumed is not
    # visible near this definition -- confirm against the benchmark code.

    # Batching and model selection
    batch_size: int
    model_name: str

    # Runtime toggles
    use_caching: bool
    max_concurrent: int

class PipelineOptimizer:
    """Benchmark the SayCan scoring pipeline over a grid of configurations.

    A benchmark trial resets the environment, splits the candidate options
    into batches of ``config.batch_size`` and, for every batch, runs ViLD
    detection, GPT scoring and affordance scoring while timing the trial.

    The class relies on names defined elsewhere in the notebook:
    ``make_options``, ``PICK_TARGETS``, ``PLACE_TARGETS``, ``vild``,
    ``category_name_string``, ``vild_params``, ``gpt3_scoring``,
    ``affordance_scoring``, ``performance_monitor`` and ``PipelineConfig``.

    Attributes:
        env: Environment object; only ``env.reset(config)`` is used here.
        base_config: Scene description handed to ``env.reset``.
        latencies: Per-trial wall-clock time in seconds (successful trials).
        throughputs: Per-trial options scored per second.
        memory_usage: Per-trial peak resident memory in MB (NaN if unknown).
        api_costs: Per-trial change in ``performance_monitor.total_cost``.
    """

    # Keys of the dict returned by ``run_pipeline_benchmark``.
    METRIC_NAMES = ('latency', 'throughput', 'memory', 'cost')

    def __init__(self, env, base_config: Dict = None):
        """Store the environment and the scene used for every trial.

        Args:
            env: Environment exposing ``reset(config)``.
            base_config: Scene description with ``'pick'`` and ``'place'``
                lists. A falsy value (``None`` or an empty dict) selects the
                default three-block / three-bowl scene.
        """
        self.env = env
        self.base_config = base_config or {
            'pick': ['red block', 'blue block', 'green block'],
            'place': ['red bowl', 'blue bowl', 'green bowl']
        }

        # Raw per-trial measurements, appended by run_pipeline_benchmark.
        self.latencies = []
        self.throughputs = []
        self.memory_usage = []
        self.api_costs = []

    @staticmethod
    def _peak_memory_mb() -> float:
        """Return the process's peak resident set size in MB, or NaN.

        This is a process-wide high-water mark, so it never decreases between
        trials and does not include GPU memory.
        """
        try:
            import resource  # Unix-only standard-library module
        except ImportError:
            return float('nan')
        import sys

        peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        # ru_maxrss is reported in bytes on macOS and in kilobytes on Linux.
        divisor = 1024 * 1024 if sys.platform == 'darwin' else 1024
        return peak / divisor

    def run_pipeline_benchmark(self, config: "PipelineConfig", num_trials: int = 5):
        """Run ``num_trials`` timed pipeline trials for one configuration.

        Args:
            config: Configuration to benchmark. Only ``batch_size`` and
                ``model_name`` are used; ``use_caching`` and
                ``max_concurrent`` are currently not acted upon.
            num_trials: Number of trials to attempt.

        Returns:
            Dict with keys ``'latency'`` (s), ``'throughput'`` (options
            scored per second), ``'memory'`` (MB) and ``'cost'``, each the
            mean over the successful trials, or NaN when no trial succeeded.

        Raises:
            ValueError: If ``config.batch_size`` is smaller than 1.
        """
        if config.batch_size < 1:
            raise ValueError(
                f"batch_size must be >= 1, got {config.batch_size}")

        metrics = {name: [] for name in self.METRIC_NAMES}

        # Standard test task, identical for every trial.
        instruction = "put all blocks in their matching colored bowls"

        for _ in range(num_trials):
            # perf_counter is monotonic and high-resolution, unlike time.time.
            start_time = time.perf_counter()

            # Reset environment (the returned observation is not needed here).
            self.env.reset(self.base_config)

            # Configure batch processing. The flat option list is kept under
            # its own name so throughput can count options, not batches.
            options = make_options(PICK_TARGETS, PLACE_TARGETS)
            batches = [options[i:i + config.batch_size]
                       for i in range(0, len(options), config.batch_size)]

            # Track API usage
            initial_cost = performance_monitor.total_cost

            try:
                # Run pipeline stages
                for batch in batches:
                    # Vision processing.
                    # NOTE(review): nothing in this method writes "tmp.jpg";
                    # presumably it is produced elsewhere -- confirm that it
                    # reflects the freshly reset scene.
                    found_objects = vild("tmp.jpg", category_name_string, vild_params, plot_on=False)

                    # Language model scoring
                    llm_scores, _ = gpt3_scoring(instruction, batch, engine=config.model_name)

                    # Affordance scoring
                    affordance_scores = affordance_scoring(batch, found_objects)

                    # Action selection; the result is discarded, it is
                    # computed only so that its cost is part of the timing.
                    combined_scores = {opt: np.exp(llm_scores[opt]) * affordance_scores[opt]
                                       for opt in batch}

                total_time = time.perf_counter() - start_time
                # Compute every value before recording any of them so the
                # metric lists always stay the same length.
                trial = {
                    'latency': total_time,
                    'throughput': len(options) / total_time,
                    'memory': self._peak_memory_mb(),
                    'cost': performance_monitor.total_cost - initial_cost,
                }
            except Exception as e:
                # Best effort: a failed trial is reported and left out of the
                # averages instead of aborting the whole benchmark.
                print(f"Error in pipeline run: {e}")
                continue

            for name, value in trial.items():
                metrics[name].append(value)
            self.latencies.append(trial['latency'])
            self.throughputs.append(trial['throughput'])
            self.memory_usage.append(trial['memory'])
            self.api_costs.append(trial['cost'])

        # np.mean([]) would emit a RuntimeWarning; report NaN explicitly.
        return {name: (np.mean(values) if values else np.nan)
                for name, values in metrics.items()}

    def optimize_pipeline(self, param_grid: Dict[str, List]):
        """Run grid search over pipeline parameters.

        Args:
            param_grid: Dict with the keys ``'batch_sizes'``, ``'models'``,
                ``'use_cache'`` and ``'max_concurrent'``, each a list of
                values to try.

        Returns:
            List of ``(PipelineConfig, metrics)`` tuples, one per combination
            of grid values, in grid order.
        """
        # Generate configurations
        configs = []
        for batch_size in param_grid['batch_sizes']:
            for model in param_grid['models']:
                for cache in param_grid['use_cache']:
                    for concurrent in param_grid['max_concurrent']:
                        configs.append(PipelineConfig(
                            batch_size=batch_size,
                            model_name=model,
                            use_caching=cache,
                            max_concurrent=concurrent
                        ))

        # Run benchmarks
        return [(config, self.run_pipeline_benchmark(config))
                for config in configs]

    @staticmethod
    def _plot_vs_batch_size(ax, batch_sizes, values, title, ylabel):
        """Draw one metric against batch size on ``ax``.

        Every configuration is drawn as a point. Because several
        configurations can share a batch size, the connecting line goes
        through the per-batch-size mean rather than through the raw points.
        """
        grouped = {}
        for size, value in zip(batch_sizes, values):
            if not np.isnan(value):
                grouped.setdefault(size, []).append(value)

        ax.plot(batch_sizes, values, 'o', alpha=0.4)
        if grouped:
            sizes = sorted(grouped)
            ax.plot(sizes, [np.mean(grouped[size]) for size in sizes], '-')
        ax.set_title(title)
        ax.set_xlabel('Batch Size')
        ax.set_ylabel(ylabel)

    def plot_results(self, results: List[Tuple["PipelineConfig", Dict]]):
        """Visualize optimization results in a 2x2 grid of plots.

        Args:
            results: Output of ``optimize_pipeline``.
        """
        _fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

        # Extract data
        batch_sizes = [config.batch_size for config, _ in results]
        latencies = [metrics['latency'] for _, metrics in results]
        throughputs = [metrics['throughput'] for _, metrics in results]
        costs = [metrics['cost'] for _, metrics in results]
        # Read memory from the results so it lines up with batch_sizes.
        memory = [metrics.get('memory', np.nan) for _, metrics in results]

        self._plot_vs_batch_size(ax1, batch_sizes, latencies,
                                 'Latency vs Batch Size', 'Latency (s)')
        self._plot_vs_batch_size(ax2, batch_sizes, throughputs,
                                 'Throughput vs Batch Size',
                                 'Throughput (actions/s)')

        # Cost Analysis
        ax3.bar(range(len(costs)), costs)
        ax3.set_title('Cost per Configuration')
        ax3.set_xlabel('Configuration Index')
        ax3.set_ylabel('Cost ($)')

        # Memory Usage (if available)
        if any(not np.isnan(value) for value in memory):
            self._plot_vs_batch_size(ax4, batch_sizes, memory,
                                     'Memory Usage vs Batch Size',
                                     'Memory (MB)')
        else:
            ax4.set_axis_off()
            ax4.text(0.5, 0.5, 'Memory usage not available',
                     ha='center', va='center', transform=ax4.transAxes)

        plt.tight_layout()
        plt.show()

# Example usage: sweep the optimizer over a small parameter grid, then plot.
optimizer = PipelineOptimizer(env)

# 5 batch sizes x 1 model x 2 cache settings x 3 concurrency levels
# = 30 configurations.
param_grid = dict(
    batch_sizes=[2 ** exponent for exponent in range(5)],  # 1, 2, 4, 8, 16
    models=['gpt-3.5-turbo-instruct'],
    use_cache=[True, False],
    max_concurrent=[1, 2, 4],
)

results = optimizer.optimize_pipeline(param_grid)
optimizer.plot_results(results)