## Pixel coordinates extraction

### Handle click event

In [9]:
pixel_coordinates = []

# Track mouse clicks
def click_event(event, x, y, flags, param):
    if event == cv2.EVENT_LBUTTONDOWN:
        pixel_coordinates.append((x, y))
        print(f"Clicked at: ({x}, {y})")

In [1]:
import cv2
import numpy as np
cap = cv2.VideoCapture(0)
pixel_coordinates = []

# Track mouse clicks
def click_event(event, x, y, flags, param):
    if event == cv2.EVENT_LBUTTONDOWN:
        pixel_coordinates.append((x, y))
        print(f"Clicked at: ({x}, {y})")

cv2.namedWindow("Pixel Coordinates")
cv2.setMouseCallback("Pixel Coordinates", click_event)

# Display video feed
while True:
    ret, frame = cap.read()
    if not ret:
        break

    cv2.imshow("Pixel Coordinates", frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

: 

## Calculate homgeneous transform matrix for transformation from camera's frame of reference to end effector frame of reference

In [1]:
import numpy as np
import cv2

pixel_coordinates = [[197,32], [328,43], [256,83], [402,77], [231,137], [331,153], [316,166], [260,224], [405,220], [245,265], [333,260]]
robot_coordinates = [[393.6,-47.6], [245.8,-29.9], [325.3,11.4], [164.9,7.9], [356.9,73.4], [244.5,92.9], [262.6,107.8], [323.9,170.4], [164.6,168.5], [343.1,215.1], [243.9,212.2]]
pixel_points = np.array(pixel_coordinates, dtype=np.float32)
robot_points = np.array(robot_coordinates, dtype=np.float32)

homography_matrix, _ = cv2.findHomography(pixel_points, robot_points)
print("Homography Matrix:", homography_matrix)

Homography Matrix: [[-1.12652760e+00  3.40585199e-02  6.19708826e+02]
 [ 2.72743826e-02  1.15644498e+00 -9.04650194e+01]
 [ 6.52525716e-05  5.57229232e-05  1.00000000e+00]]


In [2]:
def pixel_to_robot(x, y, matrix):
    pixel = np.array([x, y, 1]).reshape(3, 1)
    robot_coords = np.dot(matrix, pixel)
    robot_coords /= robot_coords[2]  
    return robot_coords[0][0], robot_coords[1][0]

## Box coordinates from camera reference to end effector reference frame

In [3]:
box = pixel_to_robot(306, 303, homography_matrix)
box_x, box_y = box

## Initialize arm

In [4]:
import cv2
import numpy as np
from xarm.wrapper import XArmAPI


arm = XArmAPI('192.168.1.155')
arm.motion_enable(enable=True)
arm.set_mode(0)
arm.set_state(0)
arm.connect()
arm.move_gohome()

SDK_VERSION: 1.14.8
ROBOT_IP: 192.168.1.155, VERSION: v2.2.0, PROTOCOL: V1, DETAIL: 6,9,LI1006,DL1000,v2.2.0, TYPE1300: [0, 0]
change protocol identifier to 3


0

## Start window listener stream
1. Click object to pick
2. Transform pixel to end effector coordinate frame (P)
3. Move arm to the point (P) with a fixed height
4. Pick the object
5. Go to dropping destination
6. Lower arm and drop the object
7. Go back to home

## Pick up and drop sub-routine

In [5]:
pixel_coordinates = [] # Reinitialize pixel coordinates

In [7]:
import time
def pick_up_and_drop(x_robot, y_robot):
    arm.set_position(x_robot, y_robot, 70)  
    arm.set_position(x_robot, y_robot, 17, wait=True)  
    arm.set_suction_cup(False)
    time.sleep(1)
    arm.set_position(x_robot, y_robot, 16.5, wait=True)  
    arm.set_position(x_robot, y_robot, 70, wait=True)  
    arm.set_position(box_x, box_y, 200)
    arm.set_position(box_x, box_y, 100)
    arm.set_suction_cup(True)
    arm.set_position(box_x, box_y, 200)

### Main pick up routine
1. Start stream
2. User selects object
3. Arm moves and places it's end effector over the center of the object
4. Arm lowers and picks up the object
5. Arm goes to box location
6. Arm drops the object

In [10]:
import time

cap = cv2.VideoCapture(1)
cv2.namedWindow("Click to Move")
cv2.setMouseCallback("Click to Move", click_event)

while True:
    ret, frame = cap.read()
    if not ret:
        break
    cv2.imshow("Click to Move", frame)
    
    if len(pixel_coordinates) > 0:
        x_pixel, y_pixel = pixel_coordinates.pop(0)
        x_robot, y_robot = pixel_to_robot(x_pixel, y_pixel, homography_matrix)
        print(f"Moving to: ({x_robot}, {y_robot})")
        pick_up_and_drop(x_robot, y_robot)
        # arm.set_position(x_robot, y_robot, 70, speed=100)  
        # arm.set_position(x_robot, y_robot, 27, speed=100)  
        # arm.set_suction_cup(False)
        # arm.set_position(box_x, box_y, 200, speed=100)
        # arm.set_position(box_x, box_y, 100, speed=100)
        # arm.set_suction_cup(True)
        # arm.set_position(box_x, box_y, 200, speed=100)
        # arm.move_gohome()
        
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

Clicked at: (257, 157)
Moving to: (327.1890762621443, 95.66513696902115)
Clicked at: (260, 165)
Moving to: (323.956617351429, 104.70077500758353)
Clicked at: (239, 201)
Moving to: (347.9898703731883, 144.62370994077784)
Clicked at: (232, 122)
Moving to: (354.7279688698958, 55.72646526077061)
Clicked at: (252, 151)
Moving to: (332.6965994095523, 88.82336310257618)
Clicked at: (248, 144)
Moving to: (337.07492422482045, 80.86951765030584)
Clicked at: (176, 96)
Moving to: (417.6784492829194, 24.934250731439587)
Clicked at: (276, 95)
Moving to: (304.9171657381117, 26.311828789104666)
Clicked at: (196, 185)
Moving to: (396.06190836446785, 125.91467334432974)
Clicked at: (348, 147)
Moving to: (225.7095868363908, 86.35556337292725)


: 

## Gemini API

In [2]:
import cv2

cap = cv2.VideoCapture(1)

while True:
    ret, frame = cap.read()

    if not ret:
        print("Can't receive frame. Exiting...")
        break

    cv2.imshow('Webcam Preview - Press Q to capture', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        cv2.imwrite('captured_image.jpg', frame)
        print("Image saved as captured_image.jpg")
        break

cap.release()
cv2.destroyAllWindows()

Image saved as captured_image.jpg


In [3]:
import dotenv
import os
from google import genai
from google.genai import types
from PIL import Image
import io
import os
import requests
from io import BytesIO

In [4]:
dotenv.load_dotenv()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

model_name = "gemini-2.0-flash" 

bounding_box_system_instructions = """
    Return bounding boxes as a JSON array with labels. Never return masks or code fencing. Limit to 25 objects.
    If an object is present multiple times, name them according to their unique characteristic (colors, size, position, unique characteristics, etc..).
    """

client = genai.Client(api_key=GEMINI_API_KEY)

In [5]:
safety_settings = [
    types.SafetySetting(
        category="HARM_CATEGORY_DANGEROUS_CONTENT",
        threshold="BLOCK_ONLY_HIGH",
    ),
]

In [6]:
import json
import random
import io
from PIL import Image, ImageDraw, ImageFont
from PIL import ImageColor

additional_colors = [colorname for (colorname, colorcode) in ImageColor.colormap.items()]

def plot_bounding_boxes(im, bounding_boxes):
    """
    Plots bounding boxes on an image with markers for each a name, using PIL, normalized coordinates, and different colors.

    Args:
        img_path: The path to the image file.
        bounding_boxes: A list of bounding boxes containing the name of the object
         and their positions in normalized [y1 x1 y2 x2] format.
    """

    # Load the image
    img = im
    width, height = img.size
    print(img.size)
    # Create a drawing object
    draw = ImageDraw.Draw(img)

    # Define a list of colors
    colors = [
    'red',
    'green',
    'blue',
    'yellow',
    'orange',
    'pink',
    'purple',
    'brown',
    'gray',
    'beige',
    'turquoise',
    'cyan',
    'magenta',
    'lime',
    'navy',
    'maroon',
    'teal',
    'olive',
    'coral',
    'lavender',
    'violet',
    'gold',
    'silver',
    ] + additional_colors

    # Parsing out the markdown fencing
    bounding_boxes = parse_json(bounding_boxes)

    font = ImageFont.load_default()

    # Iterate over the bounding boxes
    for i, bounding_box in enumerate(json.loads(bounding_boxes)):
      # Select a color from the list
      color = colors[i % len(colors)]

      # Convert normalized coordinates to absolute coordinates
      abs_y1 = int(bounding_box["box_2d"][0]/1000 * height)
      abs_x1 = int(bounding_box["box_2d"][1]/1000 * width)
      abs_y2 = int(bounding_box["box_2d"][2]/1000 * height)
      abs_x2 = int(bounding_box["box_2d"][3]/1000 * width)

      if abs_x1 > abs_x2:
        abs_x1, abs_x2 = abs_x2, abs_x1

      if abs_y1 > abs_y2:
        abs_y1, abs_y2 = abs_y2, abs_y1

      # Draw the bounding box
      draw.rectangle(
          ((abs_x1, abs_y1), (abs_x2, abs_y2)), outline=color, width=4
      )

      # Draw the text
      if "label" in bounding_box:
        draw.text((abs_x1 + 8, abs_y1 + 6), bounding_box["label"], fill=color, font=font)

    # Display the image
    img.save("output_image.jpg")  # or "output_image.png"
    print("Image saved as output_image.jpg")

    img.show()

In [7]:
def parse_json(json_output):
    # Parsing out the markdown fencing
    lines = json_output.splitlines()
    for i, line in enumerate(lines):
        if line == "```json":
            json_output = "\n".join(lines[i+1:])  # Remove everything before "```json"
            json_output = json_output.split("```")[0]  # Remove everything after the closing "```"
            break  # Exit the loop once "```json" is found
    return json_output

In [1]:
prompt = "Detect the 2d bounding boxes of the small cubes (with “label” as topping description”)"  # @param {type:"string"}

image = "captured_image.jpg"
# Load and resize image
im = Image.open(io.BytesIO(open(image, "rb").read()))
im.thumbnail([1024,1024], Image.Resampling.LANCZOS)

# Run model to find bounding boxes
response = client.models.generate_content(
    model=model_name,
    contents=[prompt, im],
    config = types.GenerateContentConfig(
        system_instruction=bounding_box_system_instructions,
        temperature=0.5,
    )
)

# Check output
print(response.text)

NameError: name 'Image' is not defined

In [9]:
plot_bounding_boxes(im, response.text)

(640, 480)
Image saved as output_image.jpg


In [10]:
import cv2
import numpy as np
from xarm.wrapper import XArmAPI


arm = XArmAPI('192.168.1.155')
arm.motion_enable(enable=True)
arm.set_mode(0)
arm.set_state(0)
arm.connect()
arm.move_gohome()

ROBOT_IP: 192.168.1.155, VERSION: v2.2.0, PROTOCOL: V1, DETAIL: 6,9,LI1006,DL1000,v2.2.0, TYPE1300: [0, 0]
change protocol identifier to 3


0

In [11]:
bounding_boxes = parse_json(response.text)

In [18]:
for i, bounding_box in enumerate(json.loads(bounding_boxes)):

    width, height = im.size
    # Convert normalized coordinates to absolute coordinates
    abs_y1 = int(bounding_box["box_2d"][0]/1000 * height)
    abs_x1 = int(bounding_box["box_2d"][1]/1000 * width)
    abs_y2 = int(bounding_box["box_2d"][2]/1000 * height)
    abs_x2 = int(bounding_box["box_2d"][3]/1000 * width)

    if abs_x1 > abs_x2:
        abs_x1, abs_x2 = abs_x2, abs_x1

    if abs_y1 > abs_y2:
        abs_y1, abs_y2 = abs_y2, abs_y
    
    c1 = (abs_x1 + abs_x2) / 2
    c2 = (abs_y1 + abs_y2) / 2

    c1_robot, c2_robot = pixel_to_robot(c1, c2, homography_matrix)
    pick_up_and_drop(c1_robot, c2_robot)

NameError: name 'box_x' is not defined

## Get which bounding box to pick from llama

In [None]:
import dotenv
import os
import requests
import json
from groq import Groq

def pick_and_place_all_blocks(centroids, homography_matrix, drop_location=None):
    """
    Pick and place all blocks at the specified centroids
    
    Args:
        centroids: List of dictionaries, each with a 'centroid' key containing [x, y] coordinates
        homography_matrix: 3x3 homography matrix that maps pixel coordinates to robot coordinates
        drop_location: Optional tuple (x, y) for the drop location in robot coordinates
                      If None, defaults to (250, 0)
    """
    # Default drop location if not specified
    if drop_location is None:
        drop_location = (250, 0)
    
    box_x, box_y = drop_location
    
    # Initialize the XArm
    arm = XArmAPI('192.168.1.155')
    arm.motion_enable(enable=True)
    arm.set_mode(0)
    arm.set_state(0)
    arm.connect()
    
    # Move to home position to start
    print("Moving to home position...")
    arm.move_gohome(wait=True)
    
    # Process each centroid
    for i, centroid_obj in enumerate(centroids):
        print(f"Processing block {i+1}/{len(centroids)}")
        
        # Extract the centroid coordinates
        pixel_x, pixel_y = centroid_obj['centroid']
        
        # Convert pixel coordinates to robot coordinates
        x_robot, y_robot = pixel_to_robot(pixel_x, pixel_y, homography_matrix)
        print(f"Pixel coordinates: ({pixel_x}, {pixel_y}) -> Robot coordinates: ({x_robot}, {y_robot})")
        
        # Pick up the block
        print(f"Picking up block at ({x_robot}, {y_robot})...")
        
        # Move above the block
        arm.set_position(x=x_robot, y=y_robot, z=70, wait=True)
        
        # Lower to the block
        arm.set_position(z=17, wait=True)
        
        # Turn off suction (to create suction)
        arm.set_suction_cup(False)
        time.sleep(1)  # Wait for suction to establish
        
        # Fine adjustment for better grip
        arm.set_position(z=16.5, wait=True)
        
        # Lift the block
        arm.set_position(z=70, wait=True)
        
        # Move to the drop location
        print(f"Moving to drop location ({box_x}, {box_y})...")
        arm.set_position(x=box_x, y=box_y, z=200, wait=True)
        
        # Lower to drop height
        arm.set_position(z=100, wait=True)
        
        # Release the block
        arm.set_suction_cup(True)  # Turn on suction (to release)
        time.sleep(0.5)  # Wait for release
        
        # Move up
        arm.set_position(z=200, wait=True)
        
        print(f"Block {i+1} placed successfully")
    
    # Return to home position
    print("All blocks processed. Returning to home position...")
    arm.move_gohome(wait=True)
    
    print("Operation completed successfully")

# Load environment variables from .env file
dotenv.load_dotenv()

# Your Groq API client setup
client = Groq(
    api_key=os.environ.get("GROQ_API_KEY")  # Make sure this is set in your .env file
)

# Sample data with object bounding boxes and labels
data = [
    {"box_2d": [237, 390, 300, 436], "label": "green cube"},
    {"box_2d": [193, 488, 262, 535], "label": "green cube"},
    {"box_2d": [368, 415, 425, 458], "label": "red cube"},
    {"box_2d": [270, 548, 331, 593], "label": "green cube"},
    {"box_2d": [275, 301, 343, 355], "label": "blue cube"},
    {"box_2d": [268, 488, 331, 530], "label": "yellow cube"},
    {"box_2d": [335, 470, 402, 520], "label": "green cube"},
    {"box_2d": [231, 435, 306, 490], "label": "blue cube"},
    {"box_2d": [295, 403, 362, 455], "label": "red cube"},
    {"box_2d": [418, 498, 487, 548], "label": "green cube"}
]

# Calculate centroids from the bounding boxes
objects = []
for item in data:
    x1, y1, x2, y2 = item["box_2d"]
    centroid_x = (x1 + x2) / 2
    centroid_y = (y1 + y2) / 2
    objects.append({
        "label": item["label"],
        "centroid": [centroid_x, centroid_y]
    })

# User command
custom_prompt = "pick the yellow block"

# Send the objects and custom prompt to the Groq API
response = client.chat.completions.create(
    messages=[
        {
            "role": "system", 
            "content": "You are an assistant tasked with processing object data and returning only the centroids that match the user's request in JSON format."
        },
        {
            "role": "user", 
            "content": f"""
Here is a list of objects with their labels and centroids:
{json.dumps(objects, indent=2)}

The command is: "{custom_prompt}"

Please return only the centroids of objects that match this command in this exact JSON format:
{{
    "centroids": [
        {{
            "centroid": [x, y]
        }},
        ...
    ]
}}
Do not include any explanation, only the JSON.
"""
        }
    ],
    model="llama3-8b-8192"
)

# Debugging: Print the raw response
print("Raw response:", response)

# Handle the response from the API
try:
    # Check if the response contains content
    if response and hasattr(response, 'choices') and len(response.choices) > 0:
        # Access the message content
        response_content = response.choices[0].message.content
        print("Groq API response:", response_content)

        # Try parsing the response content as JSON
        try:
            result = json.loads(response_content)
            # Assuming the Groq API returns the centroid values
            centroids_from_response = result.get('centroids', [])
            
            if centroids_from_response:
                print("Returned centroids:", centroids_from_response)
                
                # Here you would typically use the centroids to control the robotic arm
                # For example:
                # for centroid_obj in centroids_from_response:
                #     centroid = centroid_obj['centroid']
                #     # Move arm to this centroid
                #     # arm.set_position(x=centroid[0], y=centroid[1], z=some_z_value)
            else:
                print("No matching centroids found.")
                
        except json.JSONDecodeError as e:
            print(f"Response is not valid JSON: {e}")
            print("Raw content:", response_content)
            
    else:
        print("No valid response or content in response.")

except Exception as e:
    print(f"Error processing response: {str(e)}")

Raw response: ChatCompletion(id='chatcmpl-89c7db2e-34b2-4900-b69e-342084abef90', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='{\n    "centroids": [\n        {\n            "centroid": [\n                299.5,\n                509.0\n            ]\n        }\n    ]\n}', role='assistant', function_call=None, reasoning=None, tool_calls=None))], created=1742563057, model='llama3-8b-8192', object='chat.completion', system_fingerprint='fp_dadc9d6142', usage=CompletionUsage(completion_tokens=34, prompt_tokens=425, total_tokens=459, completion_time=0.028333333, prompt_time=0.080756094, queue_time=0.23951559700000002, total_time=0.109089427), x_groq={'id': 'req_01jpwdm7bef5tbrkg4jzvw1gz4'})
Groq API response: {
    "centroids": [
        {
            "centroid": [
                299.5,
                509.0
            ]
        }
    ]
}
Returned centroids: [{'centroid': [299.5, 509.0]}]
