# ycb_objects

In [17]:
!pip install pybullet imageio-ffmpeg
!pip install -q transformers torch pillow gradio
!apt-get -y install xvfb ffmpeg > /dev/null
!pip install -q pybullet imageio-ffmpeg transformers torch pillow
!git clone https://github.com/eleramp/pybullet-object-models.git
!pip install -q -e pybullet-object-models
import os
import time
import math
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import pybullet as p
import pybullet_data
import cv2
import imageio_ffmpeg
from base64 import b64encode
from IPython.display import HTML
import os, glob, random, pybullet_data
import gradio as gr
from transformers import AutoImageProcessor
from transformers import SiglipForImageClassification
from transformers.image_utils import load_image
from PIL import Image
import torch
import os, sys, importlib
REPO = "/content/pybullet-object-models"
if not os.path.isdir(REPO):
    !git clone -q https://github.com/aleramp/pybullet-object-models.git $REPO
!pip install -q -e $REPO
importlib.invalidate_caches()
sys.path.append(REPO)
import pybullet as p, pybullet_data
import numpy as np, math, random, glob
from PIL import Image
import imageio_ffmpeg
import torch
from transformers import AutoImageProcessor, SiglipForImageClassification
from pybullet_object_models import ycb_objects

fatal: destination path 'pybullet-object-models' already exists and is not an empty directory.
  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone


In [121]:
# Waste-image classifier: checkpoint id, its image processor and the model,
# moved to the GPU when one is available.
MODEL_ID = "prithivMLmods/Augmented-Waste-Classifier-SigLIP2"
processor = AutoImageProcessor.from_pretrained(MODEL_ID)
clf_model = SiglipForImageClassification.from_pretrained(MODEL_ID)
clf_model = clf_model.to("cuda" if torch.cuda.is_available() else "cpu")
# Class index -> label name, as stored in the model config.
id2label = clf_model.config.id2label

# Label names routed to the burnable bin; every other label is treated as
# non-burnable by classify_trash.
BURNABLE = {
    "Biological",
    "Cardboard",
    "Paper",
    "Clothes",
    "Trash",
}
# Label names considered non-burnable.
NONBURNABLE = {
    "Glass",
    "Metal",
    "Plastic",
    "Battery",
    "Shoes",
}

def classify_trash(rgb, seg, uid):
    """Classify the object `uid` visible in a camera frame.

    Args:
        rgb: H×W×3 np.uint8 colour image.
        seg: H×W segmentation mask (object UIDs).
        uid: UID of the target object.

    Returns:
        A tuple ``(is_burnable, label, confidence)``. All three entries are
        ``None`` when no pixel of ``seg`` equals ``uid``.
    """
    target_pixels = (seg == uid)
    if not target_pixels.any():
        return None, None, None

    # Tight bounding box around the object's pixels.
    rows, cols = np.nonzero(target_pixels)
    top, bottom = rows.min(), rows.max() + 1
    left, right = cols.min(), cols.max() + 1
    patch = Image.fromarray(rgb[top:bottom, left:right]).convert("RGB")

    batch = processor(images=patch, return_tensors="pt").to(clf_model.device)
    with torch.no_grad():
        scores = torch.softmax(clf_model(**batch).logits, dim=-1).squeeze()

    best = int(scores.argmax())
    label = id2label[best]
    return (label in BURNABLE), label, float(scores[best])

# YCB mapping: short object key -> (source tag, model folder name).
# Every entry uses the "ycb" source; insertion order is kept as listed.
OBJ = {
    name: ("ycb", folder)
    for name, folder in (
        # burnable
        ("paper_box", "YcbCrackerBox"),
        ("thin_box", "YcbGelatinBox"),
        ("banana", "YcbBanana"),
        ("pear", "YcbPear"),
        # non-burnable
        ("can", "YcbMasterChefCan"),
        ("mustard_bottle", "YcbMustardBottle"),
        ("hammer", "YcbHammer"),
        ("tomatosoup_can", "YcbTomatoSoupCan"),
    )
}

def get_urdf(key):
    """Return the path of a URDF file for the object registered as `key` in OBJ.

    OBJ maps each key to ``(src, rel)``:

    * ``src == "ycb"``: searches ``<ycb_objects.getDataPath()>/<rel>``
      recursively for ``*.urdf``.
    * ``src == "rnd"``: searches ``<pybullet_data.getDataPath()>/random_urdfs/<rel>``
      (non-recursive) for ``*.urdf``.

    When several URDF files match, the lexicographically smallest path is
    returned. glob yields matches in arbitrary, filesystem-dependent order, so
    taking the first raw match would not be reproducible.

    Raises:
        KeyError: `key` is not in OBJ.
        FileNotFoundError: the YCB folder is missing, or no URDF was found.
        ValueError: the source tag is neither "ycb" nor "rnd".
    """
    src, rel = OBJ[key]
    if src == "ycb":
        folder = os.path.join(ycb_objects.getDataPath(), rel)
        if not os.path.isdir(folder):
            raise FileNotFoundError(f"[YCB] Not a dir: {folder}")
        urdfs = glob.glob(os.path.join(folder, "**/*.urdf"), recursive=True)
        if not urdfs:
            raise FileNotFoundError(f"[YCB] No URDF in {folder}")
    elif src == "rnd":
        folder = os.path.join(pybullet_data.getDataPath(), "random_urdfs", rel)
        urdfs = glob.glob(os.path.join(folder, "*.urdf"))
        if not urdfs:
            raise FileNotFoundError(f"[rnd] No URDF in {folder}")
    else:
        raise ValueError(f"unsupported src={src}")
    # Deterministic pick, independent of directory listing order.
    return min(urdfs)

# PyBullet environment setup (headless; use p.GUI instead of p.DIRECT for a window)
p.connect(p.DIRECT)
p.setAdditionalSearchPath(pybullet_data.getDataPath())
p.setGravity(0, 0, -10)
p.loadURDF("plane.urdf")
p.loadURDF("table/table.urdf", [1.0, -0.2, 0.0], [0, 0, 0.7071, 0.7071])

# KUKA arm + WSG-50 gripper
kuka = p.loadURDF("kuka_iiwa/model_vr_limits.urdf", 1.4, -0.2, 0.6, 0, 0, 0, 1)
gripper = p.loadSDF("gripper/wsg50_one_motor_gripper_new_free_base.sdf")[0]
# Fix the gripper base (link 0) to arm link 6.
p.createConstraint(
    kuka, 6, gripper, 0, p.JOINT_FIXED,
    [0, 0, 0], [0, 0, 0.05], [0, 0, 0],
)
# Gear constraint between gripper links 4 and 6, with gear ratio -1.
cid = p.createConstraint(
    gripper, 4, gripper, 6, p.JOINT_GEAR,
    [1, 1, 1], [0, 0, 0], [0, 0, 0],
)
p.changeConstraint(cid, gearRatio=-1, erp=0.5, maxForce=100)

# Reset the arm joints to their initial pose
init = [0, 0, 0, 1.5708, 0, -1.0367, 0]
for joint_idx, angle in enumerate(init):
    p.resetJointState(kuka, joint_idx, angle)
# Zero both finger joints and hold them there with position control.
for finger in (4, 6):
    p.resetJointState(gripper, finger, 0)
    p.setJointMotorControl2(
        gripper, finger, p.POSITION_CONTROL, targetPosition=0, force=200
    )

# Bins (burnable: red, non-burnable: blue)
burn_bin, nonb_bin = [
    p.loadURDF("tray/traybox.urdf", [0.85, bin_y, 0.65]) for bin_y in (0.25, -0.65)
]
for bin_id, rgba in ((burn_bin, [1, 0, 0, 1]), (nonb_bin, [0, 0, 1, 1])):
    p.changeVisualShape(bin_id, -1, rgbaColor=rgba)

# Spawn one randomly chosen piece of trash
key = random.choice(list(OBJ))
path = get_urdf(key)
uid = p.loadURDF(path, [0.85, -0.2, 0.65])
print(f"[Spawn] {key} -> {os.path.basename(path)}")

# Camera & video settings
W, H = 480, 360
cam_pos = [.95, -0.2, 0.2]
cam_dist = 2.05
cam_yaw, cam_pitch = -50, -40
vid = imageio_ffmpeg.write_frames("vid.mp4", (W, H), fps=30)
vid.send(None)  # prime the writer generator before frames are sent

# Pick & place driven by a time-indexed state machine
PICK_Z, CARRY_Z, LIFT = 0.97, 1.25, 150
# Phase boundaries in simulation steps. MOVE_START is derived from LIFT so the
# sideways transport always begins exactly where the lift ends; it used to be a
# hard-coded 400 that only matched LIFT == 150.
GRASP_START, LIFT_START, MOVE_END, RELEASE_START = 150, 250, 600, 700
MOVE_START = LIFT_START + LIFT

dest = None
try:
    # Loop invariants: camera matrices and the tool orientation, computed once.
    vm = p.computeViewMatrixFromYawPitchRoll(cam_pos, cam_dist, cam_yaw, cam_pitch, 0, 2)
    pm = p.computeProjectionMatrixFOV(60, W / H, 0.01, 100)
    orn = p.getQuaternionFromEuler([0, math.pi, 0])

    for t in range(1000):
        # Grab a frame every 8th step (30 fps assuming PyBullet's default
        # 240 Hz time step; the step size is not set explicitly here).
        if t % 8 == 0:
            # The client is connected in DIRECT mode, which has no OpenGL
            # context, so the CPU TinyRenderer is requested explicitly.
            w, h, rgb, _, seg = p.getCameraImage(W, H, vm, pm, renderer=p.ER_TINY_RENDERER)
            # Force uint8 so the video writer and PIL get byte pixels even if
            # PyBullet hands back a plain sequence of ints.
            img = np.asarray(rgb, dtype=np.uint8).reshape(h, w, 4)[:, :, :3]
            mask = np.asarray(seg).reshape(h, w)
            vid.send(np.ascontiguousarray(img))
            # Classify until the object has been seen once; then keep the bin.
            if dest is None:
                burn, label, conf = classify_trash(img, mask, uid)
                if burn is not None:
                    dest = burn_bin if burn else nonb_bin
                    print(f"[Detect] {label}({conf:.2f}) → {'赤' if burn else '青'}箱")

        # Y coordinate of the chosen bin. If nothing was detected, this falls
        # back to the non-burnable bin.
        drop_y = 0.25 if dest == burn_bin else -0.65

        # Motion phases. The gripper flag is named `grip`, not `gr`, so the
        # `gradio as gr` import at the top of the notebook is not overwritten.
        if t < GRASP_START:
            # Move to the pick position with the gripper flag at 0.
            target, grip = [0.85, -0.2, PICK_Z], 0
        elif t < LIFT_START:
            # Stay at the pick position and switch the gripper flag to 1.
            target, grip = [0.85, -0.2, PICK_Z], 1
        elif t < MOVE_START:
            # Raise linearly from PICK_Z to CARRY_Z over LIFT steps.
            z = PICK_Z + (CARRY_Z - PICK_Z) * (t - LIFT_START) / LIFT
            target, grip = [0.85, -0.2, z], 1
        elif t < MOVE_END:
            # Slide linearly in Y from the pick position to the bin.
            y = -0.2 + (drop_y + 0.2) * (t - MOVE_START) / (MOVE_END - MOVE_START)
            target, grip = [0.85, y, CARRY_Z], 1
        elif t < RELEASE_START:
            # Hold above the bin.
            target, grip = [0.85, drop_y, CARRY_Z], 1
        else:
            # Set the gripper flag back to 0 above the bin.
            target, grip = [0.85, drop_y, CARRY_Z], 0

        # IK & motors
        joints = p.calculateInverseKinematics(kuka, 6, target, orn)
        for j, v in enumerate(joints):
            p.setJointMotorControl2(kuka, j, p.POSITION_CONTROL, targetPosition=v)
        for j in (4, 6):
            p.setJointMotorControl2(gripper, j, p.POSITION_CONTROL, targetPosition=grip * 0.05, force=100)
        p.stepSimulation()
finally:
    # Always finalise the video and release the physics client, even when the
    # loop raises, so a re-run of the cell starts clean.
    try:
        vid.close()
    finally:
        p.disconnect()

print("動画保存: vid.mp4")



[Spawn] soup_can -> model.urdf
[Detect] Biological(1.00) → 赤箱
動画保存: vid.mp4


In [122]:
# Play recorded video

# Read through a context manager so the file handle is closed right away
# instead of being left to the garbage collector.
with open('vid.mp4', 'rb') as video_file:
    mp4 = video_file.read()
# Embed the clip inline as a base64 data URL.
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML('<video width=480 controls><source src="%s" type="video/mp4"></video>' % data_url)

In [123]:
import shutil

# Keep a copy of vid.mp4 under a name of your choice
# (for example 'robot_sorting_demo.mp4').
shutil.copyfile(
    src='vid.mp4',
    dst='tomatosoupcan_demo.mp4',
)
print("保存完了")

保存完了
