In [1]:
# !pip install qwen-vl-utils==0.0.10
# !pip install httpx==0.27.2
# !pip install openai==1.40.0

In [1]:
# Natural-language instruction for the robot arm, read interactively.
# The text is appended verbatim to the end of the vision prompt built in the next cell.
command = input("Please input your instruction: ")
# Example instruction: pick up the Paper tube on top of the cell phone

Please input your instruction:  pick up the Paper tube on top of the cell phone


In [3]:
# Your Qwen API_KEY
import yaml

# Read the key inside a context manager so the config file handle is closed
# as soon as parsing finishes instead of lingering until garbage collection.
with open('env/configs.yaml', 'r') as config_file:
    apikey = yaml.safe_load(config_file)["qwen_apikey"]


# Full English prompt sent to the vision model together with the camera frame.
# It asks for a JSON object with the keys "start", "start_xyxy", "end" and
# "end_xyxy", where each *_xyxy value is [[x1, y1], [x2, y2]] (upper-left and
# lower-right pixel corners). The user's instruction is appended at the end.
prompt = """I am about to give a command to the robot arm. Please help me extract the starting object and the ending object from this sentence, find the pixel coordinates of the upper left corner and the lower right corner of the two objects from this picture, and output the json data structure.

For example, if my command is: Please help me put the red square on the house sketch.
You output the following format:
{
"start":"red square",
"start_xyxy":[[102,505],[324,860]],
"end":"house sketch",
"end_xyxy":[[300,150],[476,310]]
}

Just reply to the json itself, don't reply to other content

My current command is: """ + command

In [4]:
import os
# Export the Qwen key as DASHSCOPE_API_KEY; inference_with_api reads it back
# through os.getenv when it builds the OpenAI-compatible client.
os.environ['DASHSCOPE_API_KEY'] = apikey

# Get the Noto JP font to display Japanese characters
# !apt-get install fonts-noto-cjk  # For Noto Sans CJK JP

#!apt-get install fonts-source-han-sans-jp # For Source Han Sans (Japanese)

import json
import random
import io
import ast
from PIL import Image, ImageDraw, ImageFont
from PIL import ImageColor

# Every colour name known to PIL's ImageColor table, in the table's own order.
additional_colors = list(ImageColor.colormap.keys())

def parse_json(json_output):
    """Strip a Markdown ```json fence from a model reply.

    Args:
        json_output: Raw text returned by the model.

    Returns:
        The text between the first ```json line and the next ``` marker
        (including the newline that precedes the closing fence). If no
        ```json line is found, the input is returned unchanged.
    """
    lines = json_output.splitlines()
    for i, line in enumerate(lines):
        # Compare on the stripped, lower-cased line so an indented fence, a
        # fence followed by trailing spaces, or "```JSON" is still recognised.
        if line.strip().lower() == "```json":
            # Keep only what follows the opening fence ...
            fenced = "\n".join(lines[i + 1:])
            # ... and drop the closing fence plus anything after it.
            return fenced.split("```")[0]
    return json_output

from openai import OpenAI
import base64
#  base 64 encode format image
def encode_image(fra):
    """Encode an image array as JPEG and return it as a base64 text string.

    Args:
        fra: Image passed straight to cv2.imencode (the notebook passes the
            frame read from the camera).

    Returns:
        The JPEG bytes, base64-encoded and decoded to a UTF-8 str.

    Raises:
        ValueError: If cv2.imencode reports that encoding did not succeed.
    """
    ok, buffer = cv2.imencode(".jpg", fra)
    # imencode signals failure through its first return value; without this
    # check a failed encode would be sent to the API as if it were an image.
    if not ok:
        raise ValueError("cv2.imencode failed to encode the frame as JPEG")
    return base64.b64encode(buffer).decode("utf-8")


def inference_with_api(frame, prompt, sys_prompt="You are a helpful assistant.", model_id="qwen2.5-vl-72b-instruct", min_pixels=512*28*28, max_pixels=2048*28*28):
    """Send one image plus a text prompt to a chat-completions endpoint.

    Args:
        frame: Image handed to encode_image to obtain base64 JPEG data.
        prompt: Text of the user message.
        sys_prompt: Text of the system message.
        model_id: Model name forwarded to the API.
        min_pixels: Forwarded unchanged in the image part of the request.
        max_pixels: Forwarded unchanged in the image part of the request.

    Returns:
        The message content of the first choice in the completion.
    """
    base64_image = encode_image(frame)

    # If the environment variable is not configured, pass the Dashscope API
    # key directly instead, e.g. api_key="sk-xxx".
    client = OpenAI(
        api_key=os.getenv('DASHSCOPE_API_KEY'),
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    )

    # The data URL's MIME type must match the actual encoding; encode_image
    # produces JPEG. (PNG would be data:image/png, WEBP data:image/webp.)
    image_part = {
        "type": "image_url",
        "min_pixels": min_pixels,
        "max_pixels": max_pixels,
        "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"},
    }
    text_part = {"type": "text", "text": prompt}

    system_message = {
        "role": "system",
        "content": [{"type": "text", "text": sys_prompt}],
    }
    user_message = {
        "role": "user",
        "content": [image_part, text_part],
    }

    completion = client.chat.completions.create(
        model=model_id,
        messages=[system_message, user_message],
    )
    first_choice = completion.choices[0]
    return first_choice.message.content


# Use an API-based approach to inference. Apply API key here: https://bailian.console.alibabacloud.com/?apiKey=1
from qwen_vl_utils import smart_resize


# Pixel bounds shared by smart_resize and inference_with_api further down.
min_pixels = 28 * 28 * 512
max_pixels = 28 * 28 * 2048

# Palette for drawing: a hand-picked set first, then every PIL colour name.
colors = [
    'red', 'green', 'blue', 'yellow', 'orange', 'pink',
    'purple', 'brown', 'gray', 'beige', 'turquoise', 'cyan',
    'magenta', 'lime', 'navy', 'maroon', 'teal', 'olive',
    'coral', 'lavender', 'violet', 'gold', 'silver',
] + additional_colors



In [5]:
# import cv2
# _, i = cv2.VideoCapture(1).read()
# cv2.imwrite("captured_image.jpg", i)

In [None]:
# take picture and send to Qwen model
import cv2
from PIL import Image

# turn on camera
cap = cv2.VideoCapture(1)
try:
    # Raise instead of calling exit(): in a notebook exit() acts on the kernel
    # session rather than simply aborting this cell.
    if not cap.isOpened():
        raise RuntimeError("Unable to open camera")

    ret, frame = cap.read()
    # Stop here on a failed read; continuing would pass an invalid frame to
    # cvtColor below and fail with a far less helpful error.
    if not ret:
        raise RuntimeError("Unable to read frame")
finally:
    # Only a single frame is needed, so free the device in every case.
    cap.release()

# OpenCV delivers BGR; PIL expects RGB.
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
width, height = image.size
print(width, height)

# Size computed with the same pixel bounds that are sent to the model; the
# next step uses it to rescale the returned boxes to the captured frame.
input_height, input_width = smart_resize(height, width, min_pixels=min_pixels, max_pixels=max_pixels)
response = inference_with_api(frame, prompt, min_pixels=min_pixels, max_pixels=max_pixels)
print(response)


im = image
bounding_boxes = response
# Load the image
# img = im


# Parsing out the markdown fencing
bounding_boxes = parse_json(bounding_boxes)

# font = ImageFont.truetype("NotoSansCJK-Regular.ttc", size=14)

# The prompt asks for a JSON object, so decode it as JSON first. Fall back to
# ast.literal_eval only for replies written as a Python literal (for example
# single-quoted keys). If neither works, fail with the offending text instead
# of attempting to patch the string.
try:
    json_output = json.loads(bounding_boxes)
except json.JSONDecodeError:
    try:
        json_output = ast.literal_eval(bounding_boxes)
    except (ValueError, TypeError, SyntaxError) as e:
        raise ValueError(
            f"Could not parse model response as JSON: {bounding_boxes!r}"
        ) from e

start, start_box = json_output["start"], json_output["start_xyxy"]
end, end_box = json_output["end"], json_output["end_xyxy"]


def _to_frame_box(box, input_size, frame_size):
    """Rescale one [[x1, y1], [x2, y2]] box to frame pixels.

    Args:
        box: Two corner points as returned by the model.
        input_size: (width, height) the box coordinates are expressed in.
        frame_size: (width, height) of the captured frame.

    Returns:
        (x1, y1, x2, y2, x_center, y_center) as ints, with x1 <= x2 and
        y1 <= y2.
    """
    input_w, input_h = input_size
    frame_w, frame_h = frame_size
    y1 = int(box[0][1] / input_h * frame_h)
    x1 = int(box[0][0] / input_w * frame_w)
    y2 = int(box[1][1] / input_h * frame_h)
    x2 = int(box[1][0] / input_w * frame_w)
    x_center = int((x1 + x2) / 2)
    y_center = int((y1 + y2) / 2)
    # Normalise corner order in case the model returned them swapped.
    if x1 > x2:
        x1, x2 = x2, x1
    if y1 > y2:
        y1, y2 = y2, y1
    return x1, y1, x2, y2, x_center, y_center


# Rescale both boxes from the resized-input resolution to the captured frame.
# start object
(start_abs_x1, start_abs_y1, start_abs_x2, start_abs_y2,
 start_x_center, start_y_center) = _to_frame_box(
    start_box, (input_width, input_height), (width, height)
)

# end object
(end_abs_x1, end_abs_y1, end_abs_x2, end_abs_y2,
 end_x_center, end_y_center) = _to_frame_box(
    end_box, (input_width, input_height), (width, height)
)

print(f"start position: ({start_abs_x1}, {start_abs_y1}) ({start_abs_x2}, {start_abs_y2}), center coordinate: ({start_x_center}, {start_y_center})")
print(f"end position: ({end_abs_x1}, {end_abs_y1}) ({end_abs_x2}, {end_abs_y2}), center coordinate: ({end_x_center}, {end_y_center})")



In [None]:
draw_flag = True
if draw_flag:
    # Create a drawing object (draws directly onto `image`)
    draw = ImageDraw.Draw(image)

    # One entry per detected object: box corners, centre point, colour.
    overlays = (
        ((start_abs_x1, start_abs_y1, start_abs_x2, start_abs_y2),
         (start_x_center, start_y_center), colors[0]),
        ((end_abs_x1, end_abs_y1, end_abs_x2, end_abs_y2),
         (end_x_center, end_y_center), colors[1]),
    )
    for (x1, y1, x2, y2), (cx, cy), color in overlays:
        # Draw the bounding box
        draw.rectangle(((x1, y1), (x2, y2)), outline=color, width=4)
        # Draw the centre of the object as a filled dot
        draw.ellipse((cx - 10, cy - 10, cx + 10, cy + 10), fill=color)
    # Text labels are not drawn because no font is loaded in this notebook.

# Jupyter only renders a bare expression when it is the last TOP-LEVEL
# statement of the cell; nested inside the `if` it was silently discarded.
# With draw_flag False this shows the frame without overlays.
image

In [None]:
# connect mycobot 320
from pymycobot import MyCobot320Socket
import time
def get_ip_config():
    """Return the (ip, port) pair stored in env/configs.yaml."""
    # Read the YAML file
    with open('env/configs.yaml', 'r') as config_file:
        settings = yaml.safe_load(config_file)

    # Pull out the IP address and port entries
    address = settings['ip']
    port = settings['port']
    return address, port


# Connect to the arm using the address and port from env/configs.yaml.
ip_address, netport = get_ip_config()
mc = MyCobot320Socket(ip_address, netport)
time.sleep(1)

# NOTE(review): presumably engages all joint servos before moving — confirm
# against the pymycobot documentation.
mc.focus_all_servos()
time.sleep(1)

# Select gripper mode 0 (announced as "pass-through" in the log line) and
# print whatever the call returns.
print("\n---> set_gripper_mode(0) => pass-through")
ret_mode = mc.set_gripper_mode(0)
print("     Return code:", ret_mode)
time.sleep(1)

# Send all six joints to angle 0; the fixed sleep gives the move time to
# finish (no completion check is performed).
home_angles = [0, 0, 0, 0, 0, 0]
print("\n---> Move to home position:", home_angles)
mc.send_angles(home_angles, 30)
time.sleep(3)

# Speed value passed to the send_coords calls in the pick-and-place cell.
speed = 30

# State 0 is the value this notebook uses wherever it announces "open".
print("\n---> Open gripper")
mc.set_gripper_state(0, 100)
time.sleep(2)

In [7]:
# Echo the pixel centres produced by the vision cell before converting them.
print(f"Captured camera start coordinate from vision: ({start_x_center},{start_y_center})" )
print(f"Captured camera end coordinate from vision: ({end_x_center},{end_y_center})" )

import numpy as np

# 3x3 matrix applied to homogeneous pixel coordinates [u, v, 1] by
# convert_camera_to_robot below.
# NOTE(review): the values look like a calibration result for one specific
# camera/arm placement — confirm how they were obtained and recalibrate if
# either device is moved.
H = np.array([
    [6.60782927e-04,  2.48469514e+00, -5.96091742e+02],
    [3.82506417e-01,  4.06164160e-01, -2.18163280e+02],
    [9.21284300e-05, -5.55189057e-03,  1.00000000e+00]
])

def convert_camera_to_robot(camera_coord, H):
    """Map a camera pixel through the 3x3 matrix H.

    Args:
        camera_coord: (u, v) pixel coordinate.
        H: 3x3 numpy array applied to the homogeneous point [u, v, 1].

    Returns:
        (x, y) obtained by dividing the first two components of the
        transformed point by its third component.
    """
    pixel_u, pixel_v = camera_coord
    # Lift the pixel to homogeneous coordinates and apply the matrix.
    x_h, y_h, w_h = H.dot(np.array([pixel_u, pixel_v, 1.0]))
    # Dividing by the third component brings the point back to 2-D.
    return (x_h / w_h, y_h / w_h)

# Convert both pixel centres with the same matrix; the resulting (x, y)
# pairs are used as the first two entries of the pick and place poses.
start_robot_xy = convert_camera_to_robot((start_x_center, start_y_center), H)
print("Converted robot start (x, y):", start_robot_xy)
end_robot_xy = convert_camera_to_robot((end_x_center, end_y_center), H)
print("Converted robot end (x, y):", end_robot_xy)

Captured camera start coordinate from vision: (474,305)
Captured camera end coordinate from vision: (112,328)
Converted robot start (x, y): (-249.444399453102, -133.9548873570601)
Converted robot end (x, y): (-270.08980670667944, 51.93120432826099)


In [8]:
# Fixed z value and tool orientation used for every pose in this sequence.
pick_z = 165
pick_orientation = [-179.46, -6.69, 95.57]
# Offset (same units as pick_z) added for the lift-after-grasp pose and for
# the release pose, so the log message below always matches the real value.
lift_offset = 50

pick_coords = [start_robot_xy[0], start_robot_xy[1], pick_z] + pick_orientation
print("Pick coordinates:", pick_coords)

# The object is released from lift_offset above pick_z at the target.
place_coords = [end_robot_xy[0], end_robot_xy[1], pick_z + lift_offset] + pick_orientation


print("\n---> Move to pick coordinates")
print(pick_coords)
mc.send_coords(pick_coords, speed, 1)
time.sleep(3)


print("\n---> Close gripper to grasp block")
mc.set_gripper_state(1, 100)
time.sleep(2)

# Lift straight up before travelling sideways to the place position.
pick_coords_ascend = [pick_coords[0], pick_coords[1], pick_coords[2] + lift_offset] + pick_orientation
# The old message claimed "z + 150" although the code lifts by 50.
print(f"\n---> Ascend after grasping (z + {lift_offset})")
print(pick_coords_ascend)
mc.send_coords(pick_coords_ascend, speed, 1)
time.sleep(3)

print("\n---> Move to place coordinates")
print(place_coords)
mc.send_coords(place_coords, speed, 1)
time.sleep(3)

print("\n---> Open gripper to release block")
mc.set_gripper_state(0, 100)
time.sleep(2)

print("\n---> Return to home position")
mc.send_angles(home_angles, 30)
time.sleep(3)

print("\n---> Close gripper (final state)")
mc.set_gripper_state(1, 100)
time.sleep(2)

print("\nPick & Place sequence completed.\n")

Pick coordinates: [-249.444399453102, -133.9548873570601, 165, -179.46, -6.69, 95.57]

---> set_gripper_mode(0) => pass-through
     Return code: -1

---> Move to home position: [0, 0, 0, 0, 0, 0]

---> Open gripper

---> Move to pick coordinates
[-249.444399453102, -133.9548873570601, 165, -179.46, -6.69, 95.57]

---> Close gripper to grasp block

---> Ascend after grasping (z + 150)
[-249.444399453102, -133.9548873570601, 215, -179.46, -6.69, 95.57]

---> Move to place coordinates
[-270.08980670667944, 51.93120432826099, 215, -179.46, -6.69, 95.57]

---> Open gripper to release block

---> Return to home position

---> Close gripper (final state)

Pick & Place sequence completed.



In [9]:
# Drop this notebook's reference to the robot connection and run a
# garbage-collection pass.
# NOTE(review): this relies on object finalisation to tear down the
# connection — confirm whether MyCobot320Socket offers an explicit close.
del mc
import gc
gc.collect()

0