In [1]:
import sys
import time
from typing import Optional, Tuple

import cv2
from ultralytics import YOLO

from pydobot.dobot import MODE_PTP
import pydobot
from pydantic import BaseModel

In [2]:

# Robot poses - UPDATE THESE COORDINATES FOR YOUR SETUP
HOME: Tuple[float, float, float, float] = (233.1162, -1.3118, 150.7647, -0.3224)

# Fixed position where cardboard pieces with object pictures are stacked
CARDBOARD_STACK: Tuple[float, float, float, float] = (297.28350830078125, 51.12328338623047, -45.1676025390625, 9.757606506347656)

# Two pallets for different object categories
PALLET_A: Tuple[float, float, float, float] = (254.5684814453125, -46.615028381347656, -21.938823699951172, -10.376693725585938)  # Food items
PALLET_B: Tuple[float, float, float, float] = (352, -46.615028381347656, -21.938823699951172, -10.376693725585938)  # Vehicle items

# Z heights for suction
SAFE_Z: float = -20.0  # Safe height above objects
Z_SUCK_START: float = -46  # Starting suction height
Z_SUCK_INCREMENT: float = -2.0  # How much to lower each attempt
MAX_Z_SUCK: float = -55.0  # Maximum depth to try


FOOD_LABELS = {
    "banana",
    "apple",
    "pizza",
}

VEHICLE_LABELS = {
    "bicycle",
    "car",
    "airplane",
}


In [2]:

def connect_robot(serial_port: str) -> pydobot.Dobot:
    device = pydobot.Dobot(port=serial_port)
    device.speed(1000, 1000)
    return device


def move_linear(device: pydobot.Dobot, x: float, y: float, z: float, r: float, wait: bool = True) -> None:
    device.move_to(mode=int(MODE_PTP.MOVJ_XYZ), x=x, y=y, z=z, r=r)


def home(device: pydobot.Dobot) -> None:
    device.home()

In [5]:
device = connect_robot("/dev/ttyACM0")

Clearing alarms: 0.


In [10]:
device.get_pose()

Pose(position=Position(x=178.2554931640625, y=45.18132019042969, z=60.5149040222168, r=14.222879409790039), joints=Joints(j1=14.222879409790039, j2=-10.74116325378418, j3=36.01646423339844, j4=0.0))

# Util Functions

In [6]:
def detect_object_category(model: YOLO, frame) -> Optional[str]:
    """
    Run YOLO and return the category (A or B) of the detection closest to image center.
    Returns None if no valid detections.
    """
    results = model(frame, verbose=False)
    r = results[0]
    if r.boxes is None or len(r.boxes) == 0:
        return None

    xyxy = r.boxes.xyxy.cpu().numpy()
    conf = r.boxes.conf.cpu().numpy()
    cls = r.boxes.cls.cpu().numpy().astype(int)
    names = r.names

    h, w = frame.shape[:2]
    cx_img, cy_img = w / 2.0, h / 2.0

    best_idx = -1
    best_dist = 1e9
    for i, ((x1, y1, x2, y2), c, k) in enumerate(zip(xyxy, conf, cls)):
        mx = (x1 + x2) / 2.0
        my = (y1 + y2) / 2.0
        dist = (mx - cx_img) ** 2 + (my - cy_img) ** 2
        if dist < best_dist or (dist == best_dist and c > conf[best_idx]):
            best_dist = dist
            best_idx = i

    if best_idx == -1:
        return None

    label_idx = cls[best_idx]
    label_name = names[label_idx].lower()
    
    if label_name in FOOD_LABELS:
        return "A"
    elif label_name in VEHICLE_LABELS:
        return "B"
    else:
        return None


def pick_from_stack(device: pydobot.Dobot) -> bool:
    """
    Pick up an object from the cardboard stack with incremental Z lowering.
    Returns True if successful, False if failed after all attempts.
    """
    x, y, _, r = CARDBOARD_STACK
    
    # Move to safe height above stack
    move_linear(device, x, y, SAFE_Z, r, wait=True)
    
    # Try suction at different Z heights
    current_z = Z_SUCK_START
    while current_z >= MAX_Z_SUCK:
        print(f"Trying suction at Z = {current_z}")
        
        # Move to current Z height
        move_linear(device, x, y, current_z, r, wait=True)
        
        # Activate suction
        device.suck(True)
        time.sleep(0.5)  # Give suction time to engage
        
        # Try to lift - if successful, we got something
        move_linear(device, x, y, SAFE_Z, r, wait=True)
        
        # Check if we actually picked something up by trying to move slightly
        # If the object is heavy enough, it will resist movement
        original_pose, _ = device.get_pose()
        move_linear(device, x + 5, y, SAFE_Z, r, wait=True)
        new_pose, _ = device.get_pose()
        
        # If we moved successfully, we probably picked something up
        if abs(new_pose[0] - (x + 5)) < 2.0:  # We moved as expected
            print(f"Successfully picked up object at Z = {current_z}")
            return True
        else:
            # Release suction and try lower
            device.suck(False)
            current_z += Z_SUCK_INCREMENT
            time.sleep(0.2)
    
    print("Failed to pick up object after all attempts")
    device.suck(False)
    return False


def place_in_pallet(device: pydobot.Dobot, category: str) -> None:
    """Place the picked object in the appropriate pallet"""
    if category == "A":
        target = PALLET_A
        pallet_name = "A (Food)"
    else:
        target = PALLET_B
        pallet_name = "B (Vehicle)"
    
    tx, ty, tz, tr = target
    print(f"Placing object in pallet {pallet_name}")
    
    # Move to pallet location
    move_linear(device, tx, ty, SAFE_Z, tr, wait=True)
    move_linear(device, tx, ty, tz, tr, wait=True)
    
    # Release suction
    device.suck(False)
    time.sleep(0.5)
    
    # Lift up
    move_linear(device, tx, ty, SAFE_Z, tr, wait=True)

# Run Code

In [7]:
class Args(BaseModel):
    camera_index: int
    model: str
    port: str
    loop: int
    show: bool

def run(args: Args) -> None:
    # Initialize camera
    cap = cv2.VideoCapture(args.camera_index)
    if not cap.isOpened():
        raise RuntimeError("Cannot open camera. Check --camera-index and permissions.")

    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    # Load model
    model = YOLO(args.model)

    # Connect robot
    device = connect_robot(args.port)
    home(device)

    prev_t = time.time()
    cycles_run = 0
    win_name = "Palletizing View"

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Can't receive frame. Exiting...")
                break

            # Detect object category
            category = detect_object_category(model, frame)

            if args.show:
                # Show overlay information
                now = time.time()
                fps = 1.0 / max(1e-6, (now - prev_t))
                prev_t = now
                h, w = frame.shape[:2]
                cv2.circle(frame, (w // 2, h // 2), 8, (0, 255, 255), 2)
                
                category_text = f"Category: {category or 'None'}"
                cv2.putText(frame, category_text, (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv2.putText(frame, f"FPS: {fps:.1f}", (10, 55), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                cv2.imshow(win_name, frame)

            # If a valid category is detected, perform pick-and-place
            if category is not None:
                print(f"Detected category {category} -> executing pick/place")
                
                # Pick from stack
                if pick_from_stack(device):
                    # Place in appropriate pallet
                    place_in_pallet(device, category)
                    home(device)
                    cycles_run += 1
                    print(f"Completed cycle {cycles_run}")
                else:
                    print("Failed to pick up object, skipping this cycle")
                    home(device)

                if args.loop > 0 and cycles_run >= args.loop:
                    print("Completed requested number of cycles. Exiting.")
                    break

            # UI + quit handling
            if args.show:
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            else:
                # Without UI, modest sleep to avoid busy-looping the camera
                time.sleep(0.02)
    finally:
        # Cleanup
        try:
            cap.release()
        except Exception:
            pass
        try:
            cv2.destroyAllWindows()
        except Exception:
            pass
        try:
            # Release suction and go home for safety
            device.suck(False)
            home(device)
            device.close()
        except Exception:
            pass


# RUN

In [8]:
input_args = Args(camera_index=2, model="yolov8n.pt", port="/dev/ttyACM1", loop=0, show=True)

In [9]:
import traceback
try:
    run(input_args)
except KeyboardInterrupt:
    print("Interrupted by user.")
    sys.exit(0)
except Exception as exc:
    traceback.print_exc()
    sys.exit(1)

[ WARN:0@1.211] global cap_v4l.cpp:913 open VIDEOIO(V4L2:/dev/video2): can't open camera by index
Traceback (most recent call last):
[ERROR:0@1.434] global obsensor_uvc_stream_channel.cpp:158 getStreamChannelGroup Camera index out of range
  File "/tmp/ipykernel_43659/1949895717.py", line 3, in <module>
    run(input_args)
    ~~~^^^^^^^^^^^^
  File "/tmp/ipykernel_43659/1066124544.py", line 12, in run
    raise RuntimeError("Cannot open camera. Check --camera-index and permissions.")
RuntimeError: Cannot open camera. Check --camera-index and permissions.


SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [None]:
import numpy as np

np.matrix([[1, 1,1 ], [1, 2,3]])