In [None]:
import sys
import importlib
def reload_modules(*modules):
    for module in modules:
        if module.__name__ in sys.modules:
            importlib.reload(module)
        else:
            print(f"Module {module.__name__} is not currently imported.")

import numpy as np
import time
import copy
from PIL import Image
import cv2
import open3d as o3d
import ast
import importlib
import imutils
import matplotlib.pyplot as plt
# reload_modules(pcd, utils, real, ur5, gripper, control_utils)
from magpie_perception import pcd
from magpie_perception import utils
from magpie_perception.utils import label_wrist_image, find_object, label_wrist_image_rotation_axes, optimize_and_correct_frame, plot_frames_3d
from magpie_control import realsense_wrapper as real
from magpie_control import ur5
from magpie_control import gripper
from magpie_control import poses
from magpie_control import utils as control_utils
import spatialmath as sm
reload_modules(pcd, utils, real, ur5, gripper, control_utils)
from magpie_control.utils import transform_6d

# rerun viz
# import rerun as rr
# from rerun_rlds_ur5.rlds import RLDSDataset, DeliGraspTrajectory, LiveRobotData
# from rerun_rlds_ur5.rerun_loader_urdf import URDFLogger, get_urdf_paths, update_urdf
# reload_modules(LiveRobotData, URDFLogger)
# ur5_urdf, _, _= get_urdf_paths("ur5")
# rr.init("ScalingForce-visualized", spawn=True)
# urdf_logger = URDFLogger(ur5_urdf)
# urdf_logger.log()
# rr.log("annotation", rr.TextDocument("annotation_1",media_type="text/markdown"))
# viz_joints = False
# rerun_viz = LiveRobotData(entity_to_transform=urdf_logger.entity_to_transform, visualize_joints=viz_joints)
# rr.send_blueprint(rerun_viz.blueprint())
rerun_viz = None
# with help from: https://github.com/google-gemini/cookbook/blob/main/examples/Spatial_understanding_3d.ipynb
import os
from google import genai
import openai
from dotenv import load_dotenv
import instructor
from magpie_prompts.prompts import sf_force_thinker, sf_grasp_selection, sf_force_reflection, sf_position_thinker
from magpie_prompts import conversation
from magpie_prompts.conversation import openai_encode_image, build_messages, send_message
reload_modules(sf_force_thinker, sf_grasp_selection, sf_position_thinker, conversation, sf_force_reflection)

load_dotenv()
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')
OPENAI_API_KEY = os.getenv('CORRELL_API_KEY')
GEMINI_MODEL_ID = "gemini-2.0-flash" # @param ["gemini-1.5-flash-latest","gemini-2.0-flash-lite","gemini-2.0-flash","gemini-2.5-pro-exp-03-25"] {"allow-input":true}
# OPENAI_MODEL_ID = "gpt-4o-mini"
OPENAI_MODEL_ID = "gpt-4.1-mini"

gemini_client = genai.Client(api_key=GEMINI_API_KEY)
openai_client = openai.OpenAI(api_key=OPENAI_API_KEY)
force_thinker = sf_force_thinker.PromptForceThinker()

In [None]:
devices = real.poll_devices()
print(devices)
wrist = real.RealSense(fps=15, w=640, h=480, device_name="D405")
wrist.initConnection(device_serial=devices['D405'])
wkspc = real.RealSense(zMax=5, fps=6, w=640, h=480, device_name="D435")
wkspc.initConnection(device_serial=devices['D435'])

In [None]:
user_query = "pick up the drill"
# user_query = "push the bottle to the green duck"
# user_query = "tip the bottle over onto its front side"
grasp_phrase = "the drill handle"

pcd_wrist, rgbd_wrist = wrist.getPCD()
pcd_wkspc, rgbd_wkspc = wkspc.getPCD()

img_wrist = np.array(rgbd_wrist.color).copy()
img_wkspc = np.array(rgbd_wkspc.color)
img = img_wrist.copy()
H0, W0 = img.shape[:2]
img = Image.fromarray(img).copy()
W1 = 800
H1 = int(800 * H0 / W0)
# img = Image.fromarray(img_wrist)

img = img.resize((W1, H1), Image.Resampling.LANCZOS)
# img_wkspc_resized = imutils.resize(img, width=1000)

grasp_prompt = sf_grasp_selection.grasp_axis
# grasp_prompt = sf_grasp_selection.antipodal_grasp
image_response = gemini_client.models.generate_content(
    model=GEMINI_MODEL_ID,
    contents=[grasp_prompt, f"task: {user_query} grasp phrase: {grasp_phrase}", img],
)
grasp_selection = ast.literal_eval(image_response.text.split("\n")[1])
points = grasp_selection['points']
pt1, pt2 = points
image_response.text

def plot_norm_points_on_image(img, points, label="grasp", color='red'):
    plt.imshow(img)
    for pt in points:
        y_norm, x_norm = pt
        y_px = (y_norm / 1000) * H1
        x_px = (x_norm / 1000) * W1
        scale_x = W0 / W1
        scale_y = H0 / H1
        x_orig = x_px * scale_x
        y_orig = y_px * scale_y
        plt.scatter(x_orig, y_orig, c=color, s=100, marker='o', edgecolors='black', linewidths=1.5)
        # plt.text(x + 5, y - 5, label, color=color, fontsize=12, weight='bold')
    plt.axis('off')
    plt.show()

plot_norm_points_on_image(img_wrist, points, label="grasp")

In [None]:
pcd_wrist, rgbd_wrist = wrist.getPCD()
pcd_wkspc, rgbd_wkspc = wkspc.getPCD()

img_wrist = np.array(rgbd_wrist.color).copy()
img_wkspc = np.array(rgbd_wkspc.color)
H0, W0 = img_wkspc.shape[:2]
img = Image.fromarray(img_wkspc).copy()
W1 = 800
H1 = int(800 * H0 / W0)
# img = Image.fromarray(img_wrist)

img = img.resize((W1, H1), Image.Resampling.LANCZOS)
img_wkspc_resized = imutils.resize(img_wkspc, width=1000)


grasp_prompt = sf_grasp_selection.grasp_prompt
image_response = gemini_client.models.generate_content(
    model=GEMINI_MODEL_ID,
    contents=[grasp_prompt, f"task: {user_query} grasp phrase: {grasp_phrase}", img],
)
grasp_selection = ast.literal_eval(image_response.text.split("\n")[1])
point = grasp_selection['point']
y_norm, x_norm = point

# Step 1: [0, 1000] → resized pixel coordinates
y_px = (y_norm / 1000) * H1
x_px = (x_norm / 1000) * W1
# Step 2: resized pixels → original pixels
scale_x = W0 / W1
scale_y = H0 / H1
x_orig = x_px * scale_x
y_orig = y_px * scale_y
def plot_point_on_image(img, x, y, label="grasp", color='red'):
    plt.imshow(img)
    plt.scatter([x], [y], c=color, s=100, marker='o', edgecolors='black', linewidths=1.5)
    plt.text(x + 5, y - 5, label, color=color, fontsize=12, weight='bold')
    plt.axis('off')
    plt.show()

# origin = (x_orig, y_orig)
plot_point_on_image(img_wkspc, x_orig, y_orig, label="grasp")