In [1]:
import logging
import can
from time import sleep

# Initialize logging
logging.basicConfig(
    level=logging.ERROR, format="%(asctime)s - %(levelname)s - %(message)s"
)

# 初始化CAN总线
bus = can.interface.Bus(interface="pcan", channel="PCAN_USBBUS1", bitrate=1000000)
########################################
# 导入机械爪和小米微电机CAN总线库
########################################
from gripper_control import GripperControl
from pcan_cybergear import CANMotorController

In [2]:
# 机械爪驱动器CAN ID为1
gripper = GripperControl(bus, addr=1)

# 修改闭环模式最大输出电压，从而调整最大输出加持力量，单位mV，最大可以是 5000mV
gripper.set_max_output_voltage(5000)

# 自动初始化爪开合位置
gripper.init_position()

# 合
# gripper.close_gripper()

# sleep(3)

# 自动调整位置以解除卡顿
# gripper.clear_clog()

# 开
# gripper.open_gripper()

In [3]:
gripper.set_max_output_voltage(3000)

In [4]:
gripper.close_gripper()

In [5]:
gripper.open_gripper()

In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
from cv2 import getTickCount, getTickFrequency
import ollama
import re
import time
import json  # 导入 json 模块
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
# 加载 YOLOv11 模型
model = YOLO("weights/yolo11s.pt")

# 获取摄像头内容
cap = cv2.VideoCapture(0)

# 全局变量
force_cache = {}  # 缓存力度值
last_query_time = {}  # 记录每个物体最后查询时间
QUERY_INTERVAL = 5  # 每个物体的查询间隔（秒）
current_force = None  # 当前使用的力度值
executor = ThreadPoolExecutor(max_workers=1)  # 用于异步执行力度查询

# 定义我们感兴趣的类别ID列表
INTERESTED_CLASSES = {
    39: 'bottle',     # LayerD
    41: 'cup',        # LayerD
    44: 'spoon',      # LayerD
    47: 'apple',      # LayerC
    49: 'orange',     # LayerB
    50: 'broccoli',   # LayerC
    51: 'carrot',     # LayerC
    65: 'remote',     # LayerD
    67: 'cell phone', # LayerD
    73: 'book',       # LayerD
    76: 'scissors',   # LayerD
    79: 'toothbrush', # LayerD
}

def get_force_value(object_name):
    message_content = f"""
    Based on the following dictionaries, determine which level the object '[object_name]' is most similar to, and output only the corresponding value of that level as a single number. If the object is not listed, use your best judgment to determine the closest match and output the value for that level. Do not include any code or text, only a number.

    Dictionary of hierarchical structures and corresponding object arrays:

    "LayerA": ["cup"],  # Soft Texture Items
    "LayerB": ["orange"],  # Moderately Soft Texture Items
    "LayerC": ["broccoli", "carrot", "apple"],  # Hard Texture Items
    "LayerD": ["bottle", "remote", "cell phone", "book"],  # Hard items
    "LayerE": ["spoon", "scissors", "toothbrush"]  # Very Hard Items

    Dictionary of hierarchical structures and corresponding values:

        'LayerA': 1000,
        'LayerB': 2000,
        'LayerC': 3000,
        'LayerD': 4000,
        'LayerE': 5000


    Based on the typical weight characteristics of '{object_name}', determine which level it belongs to and output only the corresponding value.
    """

    try:
        response = ollama.chat(model="llama3.2", stream=False, messages=[{
            "role": "assistant",
            "content": message_content
        }], options={"temperature": 0.5})
        content = response['message']['content']
        numbers = re.findall(r'\d+', content)
        return int(numbers[0]) if numbers else 5000
    except Exception as e:
        print(f"Force value query error: {e}")
        return 5000  # Return default value when error occurs

def update_force_value(object_name):
    """异步更新力度值的包装函数"""
    force_value = get_force_value(object_name)
    force_cache[object_name] = force_value
    last_query_time[object_name] = time.time()
    return force_value

def get_current_force(detected_objects):
    """获取当前应该使用的力度值"""
    global current_force
    
    if not detected_objects:
        current_force = None
        return None

    current_time = time.time()
    for obj_name in detected_objects:
        # 如果物体不在缓存中
        if obj_name not in force_cache:
            # 同步更新力度值（仅在第一次检测到物体时）
            force_value = get_force_value(obj_name)
            force_cache[obj_name] = force_value
            last_query_time[obj_name] = time.time()
        # 如果物体在缓存中但超过查询间隔
        elif current_time - last_query_time.get(obj_name, 0) > QUERY_INTERVAL:
            # 异步更新力度值
            executor.submit(update_force_value, obj_name)    
    # # # 遍历检测到的物体，更新力度值
    # for obj_name in detected_objects:
    #     # 如果物体不在缓存中，或者超过查询间隔
    #     if obj_name not in force_cache or current_time - last_query_time.get(obj_name, 0) > QUERY_INTERVAL:
    #         # 异步更新力度值
    #         executor.submit(update_force_value, obj_name)
    
    # 使用缓存中的最大力度值
    max_force = max([force_cache.get(obj, 5000) for obj in detected_objects])
    current_force = max_force
    return current_force

def write_to_file(detected_objects, force):
    """将检测到的物体名称和force以JSON数组格式追加写入文本文件"""
    # 读取文件中已有的数据（如果文件存在）
    try:
        with open("detected_objects.json", "r") as file:
            data = json.load(file)  # 读取已有的JSON数据
    except (FileNotFoundError, json.JSONDecodeError):
        # 如果文件不存在或文件内容不是有效的JSON，初始化一个空列表
        data = []
    
    # 将新的检测结果追加到数据中
    for obj in detected_objects:
        data.append({"object": obj, "force": force})  # 将每个物体和force存储为字典
    
    # 将更新后的数据以JSON格式写回文件
    with open("detected_objects.json", "w") as file:
        json.dump(data, file, indent=4)  # 使用indent=4使文件更易读

# 记录程序开始时间
start_time = time.time()

while cap.isOpened():
    loop_start = getTickCount()
    success, frame = cap.read()
    
    if success:
        # 添加类别过滤
        results = model.predict(source=frame, classes=list(INTERESTED_CLASSES.keys()))
        result = results[0]
        boxes = result.boxes
        
        # 获取当前帧检测到的所有物体（只包含感兴趣的类别）
        detected_objects = []
        for cls_id in boxes.cls:
            cls_id = int(cls_id)
            if cls_id in INTERESTED_CLASSES:
                detected_objects.append(INTERESTED_CLASSES[cls_id])
        
        # 更新力度值
        force = get_current_force(detected_objects)
        
        # 将检测到的物体名称和force以JSON格式写入文本文件
        if detected_objects and force:
            write_to_file(detected_objects, force)
        
        # 在画面上显示检测到的物体和力度值
        annotated_frame = results[0].plot()
        
        # 计算FPS
        loop_time = getTickCount() - loop_start
        fps = int(getTickFrequency() / loop_time)
        
        # 显示信息
        font = cv2.FONT_HERSHEY_SIMPLEX
        cv2.putText(annotated_frame, f"FPS: {fps}", (10, 30), font, 1, (0, 0, 255), 2)
        if force:
            cv2.putText(annotated_frame, f"Force: {force}", (10, 70), font, 1, (0, 255, 0), 2)
        
        # 显示检测到的物体名称
        y_offset = 110
        for obj in detected_objects:
            cv2.putText(annotated_frame, f"Detected: {obj}", (10, y_offset), font, 0.7, (255, 0, 0), 2)
            y_offset += 30
        
        cv2.imshow('Object Detection', annotated_frame)
    
    # 检查是否超过20秒
    # if time.time() - start_time > 20:
    #     break

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

executor.shutdown(wait=False)
cap.release()
cv2.destroyAllWindows()


0: 480x640 (no detections), 175.6ms
Speed: 1.8ms preprocess, 175.6ms inference, 54.5ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 13.4ms
Speed: 2.8ms preprocess, 13.4ms inference, 0.8ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 16.5ms
Speed: 0.0ms preprocess, 16.5ms inference, 2.6ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 14.8ms
Speed: 0.0ms preprocess, 14.8ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 18.9ms
Speed: 0.0ms preprocess, 18.9ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 14.8ms
Speed: 1.4ms preprocess, 14.8ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 24.1ms
Speed: 2.3ms preprocess, 24.1ms inference, 2.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 20.5ms
Speed: 2.2ms preprocess, 20.5m

In [2]:
import json

def print_latest_force():
    """打印 detected_objects.json 文件中最新一条记录的 force 值"""
    try:
        # 读取 JSON 文件
        with open("detected_objects.json", "r") as file:
            data = json.load(file)  # 读取 JSON 数据
        
        # 检查数据是否为空
        if not data:
            print("No data found in detected_objects.json.")
            return
        
        # 获取最新一条记录
        latest_entry = data[-1]  # 列表的最后一个元素是最新记录
        
        # 提取 force 值
        force = latest_entry.get("force")
        
        # 打印 force 值
        if force is not None:
            print(f"Latest force value: {force}")
            
        else:
            print("No force value found in the latest entry.")
    
    except FileNotFoundError:
        print("File detected_objects.json not found.")
    except json.JSONDecodeError:
        print("Invalid JSON format in detected_objects.json.")
    except Exception as e:
        print(f"An error occurred: {e}")

# 调用函数打印最新 force 值
print_latest_force()

Latest force value: 4000


In [4]:
print(force)

None
