In [None]:
import Arm_Lib
import cv2 as cv
import threading
import time
from time import sleep
from dofbot_config import Arm_Calibration, read_XYT, write_XYT
import ipywidgets as widgets
from IPython.display import display
from gem_identify import gem_identify
import rclpy
from rclpy.executors import MultiThreadedExecutor
import sys
import os
from logger import log_manager

### 初始化参数和机械臂位置

In [None]:
# 全局变量
num = 0
dp = []
msg = {}
xy = [90, 135]
threshold = 140
model = "General"
XYT_path = "/home/sunrise/dofbot_ws/dofbot_gem_yolov5/XYT_config.txt"

# 创建目标检测节点
target = gem_identify()

# 尝试读取XYT配置
try:
    xy, threshold = read_XYT(XYT_path)
    log_manager.add_log(f"读取XYT配置: X={xy[0]}, Y={xy[1]}, Threshold={threshold}", "SUCCESS")
except Exception as e:
    log_manager.add_log(f"读取XYT配置错误: {e}", "ERROR")

# 初始化机械臂
arm = Arm_Lib.Arm_Device()
joints_0 = [xy[0], xy[1], 0, 0, 90, 30]
arm.Arm_serial_servo_write6_array(joints_0, 1000)
log_manager.add_log("机械臂初始化完成", "SUCCESS")

# 创建标定对象
calibration = Arm_Calibration()
log_manager.add_log("标定对象创建完成", "SUCCESS")

# 摄像头状态
camera_retry_count = 0
camera_max_retries = 5
capture = None

### 创建控件

In [3]:
# 全屏布局设置
full_layout = widgets.Layout(
    width='100%',
    height='100vh',
    border='1px solid #ccc'
)

# 增加按钮宽度并添加图标
button_layout = widgets.Layout(width='110px', height='32px', margin='1px')

target_detection = widgets.Button(
    description='目标检测', 
    button_style='info', 
    layout=button_layout,
    icon='search')
grap = widgets.Button(
    description='抓取', 
    button_style='success', 
    layout=button_layout,
    icon='hand-o-up')
exit_button = widgets.Button(
    description='退出', 
    button_style='danger', 
    layout=button_layout,
    icon='sign-out')
auto_mode_checkbox = widgets.Checkbox(
    description='自动抓取',
    value=False,
    style={'description_width': 'auto'},
    layout=widgets.Layout(
        width='110px', 
        height='30px',
        padding='0px',
        margin='0px',
        border='none'
    ))
# 图像显示区域
imgbox = widgets.Image(
    format='jpg', 
    layout=widgets.Layout(
        width='450px', 
        max_width='450px',
        height='370px',
        max_height='370px',
        border='1px solid #333'
    )
)

# 状态显示
status_label = widgets.Label(
    value="状态: 准备就绪",
    layout=widgets.Layout(width='450px', height='25px'),
    style={'font_weight': 'bold'}
)
action_buttons = widgets.VBox([target_detection, grap, exit_button], 
                             layout=widgets.Layout(justify_content='flex-start', padding='0px', margin='0px'))
# 控制面板
control_panel = widgets.VBox([
    widgets.HTML("<h4>系统控制</h4>"),
    action_buttons,
    auto_mode_checkbox
], layout=widgets.Layout(width='140px', padding='0px', margin='0px', overflow='hidden'))

# 主显示区域
main_display = widgets.VBox([
    status_label,
    imgbox
], layout=widgets.Layout(width='100%', align_items='center'))

# 日志区域
log_panel = widgets.VBox([
    widgets.HTML("<h5 style='margin:2px 0'>系统日志</h5>"),
    log_manager.log_widget
], layout=widgets.Layout(width='290px', height='100%', padding='0px', margin='0px', overflow='hidden'))

main_layout = widgets.HBox([
    control_panel,
    main_display,
    log_panel
], layout=widgets.Layout(
    width='900px',  # 120px + 510px + 280px + 10px边距
    max_width='900px',
    height='600px',
    max_height='600px',
    justify_content='flex-start',
    align_items='flex-start',
    padding='0px',
    margin='0 auto',
    overflow='hidden'
))

settings_tab = widgets.VBox([
    widgets.HTML("<h4>系统设置</h4>"),
    widgets.HTML("<p>TODO:这里可以添加更多设置选项</p>")
], layout=widgets.Layout(padding='5px'))

# 使用 Tab 布局
tab = widgets.Tab(children=[main_layout, settings_tab])
tab.set_title(0, '主界面')
tab.set_title(1, '设置')

# 最终的容器 - 限制最大宽度为920px，确保不会溢出
full_screen_app = widgets.VBox([
    widgets.HTML("<h3 style='text-align:center; margin:2px 0'>DOFBot 矿石自动分类系统</h3>"),
    tab
], layout=widgets.Layout(
    width='1000px',
    max_width='1000px',
    margin='0 auto',
    padding='0px',
    overflow='hidden'
))

### 模式切换回调函数

In [4]:
def target_detection_callback(btn):
    global model
    model = 'Detection'
    status_label.value = "状态: 目标检测中"
    log_manager.add_log(f"模式切换: {model}")

def grap_callback(btn):
    global model
    model = 'Grap'
    status_label.value = "状态: 抓取中"
    log_manager.add_log(f"模式切换: {model}")

def exit_callback(btn):
    global model, ros_running
    model = 'Exit'
    status_label.value = "状态: 正在退出..."
    log_manager.add_log("正在退出...")
    ros_running = False
    
def auto_mode_callback(change):
    """自动模式切换回调函数"""
    global model
    if change['new']:  # 如果复选框被选中
        model = 'auto'  # 切换到自动模式
        log_manager.add_log("自动模式已启用", "INFO")
        status_label.value = "状态: 自动模式已启用"
    else:  # 如果复选框被取消选中
        model = 'Detection'  # 切换回目标检测模式
        log_manager.add_log("自动模式已禁用", "INFO")
        status_label.value = "状态: 自动模式已禁用"

# 设置按钮回调
target_detection.on_click(target_detection_callback)
grap.on_click(grap_callback)
exit_button.on_click(exit_callback)
auto_mode_checkbox.observe(auto_mode_callback,names='value')

### 摄像头处理函数

In [5]:
def open_camera():
    global capture, camera_retry_count
    if camera_retry_count >= camera_max_retries:
        status_label.value = "摄像头错误: 超过最大重试次数"
        return False
    
    try:
        capture = cv.VideoCapture(0)
        if not capture.isOpened():
            # 尝试其他可能的设备索引
            for i in range(1, 4):
                capture = cv.VideoCapture(i)
                if capture.isOpened():
                    log_manager.add_log(f"使用备用摄像头设备 /dev/video{i}")
                    break
        
        if capture.isOpened():
            # 设置摄像头参数
            capture.set(cv.CAP_PROP_FOURCC, cv.VideoWriter_fourcc('M', 'J', 'P', 'G'))
            capture.set(cv.CAP_PROP_FPS, 30)
            capture.set(cv.CAP_PROP_FRAME_WIDTH, 640)
            capture.set(cv.CAP_PROP_FRAME_HEIGHT, 480)
            camera_retry_count = 0
            status_label.value = "状态: 摄像头已连接"
            log_manager.add_log("摄像头已打开")
            return True
        else:
            camera_retry_count += 1
            log_manager.add_log(f"摄像头打开失败 ({camera_retry_count}/{camera_max_retries})", "ERROR")
            capture = None
            return False
    except Exception as e:
        log_manager.add_log(f"打开摄像头异常: {e}")
        capture = None
        camera_retry_count += 1
        return False

def cleanup():
    """释放资源"""
    global capture
    # 关闭摄像头
    if capture is not None and capture.isOpened():
        capture.release()
        log_manager.add_log("摄像头已释放")
    
    # 关闭ROS节点
    if target in executor.get_nodes():
        executor.remove_node(target)
        target.destroy_node()
        log_manager.add_log("ROS节点已销毁")
    
    # 注意: 不关闭全局ROS上下文
    log_manager.add_log("资源已释放，程序退出")
    status_label.value = "状态: 程序已退出"

def camera_loop():
    """摄像头处理主循环"""
    global capture, dp, msg, model, xy
    
    while model != 'Exit':
        try:
            # 尝试打开摄像头
            if capture is None or not capture.isOpened():
                if not open_camera():
                    status_label.value = f"状态: 摄像头错误 ({camera_retry_count}/{camera_max_retries})"
                    time.sleep(2)
                    continue
            
            # 读取帧
            ret, img = capture.read()
            if not ret:
                log_manager.add_log("摄像头读取失败")
                status_label.value = "状态: 摄像头读取失败"
                capture.release()
                capture = None
                time.sleep(0.5)
                continue
            
            img = cv.resize(img, (640, 480))
            
            if model == 'Detection':  # 目标检测模式
                img, msg = target.gem_run(img)
                # msg:字典 {宝石名：坐标值}
                if msg:
                    # 获取第一个宝石类别的名称
                    first_gem_name = list(msg.keys())[0]
                    status_label.value = f"检测到: {first_gem_name}"
                else:
                    status_label.value = "未检测到目标"
                
            
            if len(msg)!=0 and model == 'Grap':  # 抓取模式
                log_manager.add_log(f"开始抓取: {msg}", "INFO")
                
                # 获取第一个宝石类别的名称
                if msg:
                    first_gem_name = list(msg.keys())[0]
                    status_label.value = f"抓取中: {first_gem_name}"
                else:
                    status_label.value = "抓取中: 未知目标"
                
                try:
                    print("创建抓取线程")
                    grab_thread = threading.Thread(
                        target=target.gem_grap, 
                        args=(msg.copy(), xy)  # 使用副本避免数据竞争
                    )
                    grab_thread.daemon = True
                    grab_thread.start()
                    print("抓取线程已启动")
                    
                    # 等待线程启动（非阻塞）
                    sleep(0.1)
                except Exception as e:
                    print(f"启动抓取线程失败: {e}")
                    status_label.value = f"抓取失败: {str(e)}"
                
                msg = {}
                status_label.value = f"状态: 抓取中 - {list(msg.keys())[0] if msg else '未知目标'}"
            
            

            if model=='auto':
                img,msg=target.gem_run(img)
                if msg:
                    first_gem_name=list(msg.keys())[0]
                    status_label.value=f"自动模式检测到:{first_gem_name}"
                    try:
                        grab_thread=threading.Thread(
                            target=target.gem_grap,
                            args=(msg.copy(),xy)
                        )
                        grab_thread.daemon=True
                        grab_thread.start()
                        sleep(0.1)
                        status_label.value=f"状态：自动抓取中-{first_gem_name}"
                        log_manager.add_log(f"已启动自动抓取线程:{first_gem_name}","SUCCESS")
                    except Exception as e:
                        log_manager.add_log(f"自动抓取启动失败：{e}","ERROR")
                        status_label.value=f"grap error:{str(e)}"
                    time.sleep(20)
                else:
                    status_label.value="状态：自动检测中（未发现目标）"
                    
                msg={}
                status_label.value=f"graping-{list(msg.keys())[0] if msg else 'unknown target'}"
                    
            # 更新图像显示
            _, jpeg = cv.imencode('.jpg', img)
            imgbox.value = jpeg.tobytes() 
            
        except Exception as e:
            print(f"处理错误: {e}")
            status_label.value = f"错误: {str(e)}"
            if capture is not None:
                capture.release()
                capture = None
            time.sleep(1)
    
    # 清理资源
    cleanup()

### 启动应用程序

In [None]:
# 显示控件
display(full_screen_app)

# 启动摄像头线程
camera_thread = threading.Thread(target=camera_loop, daemon=True)
camera_thread.start()
print("摄像头线程已启动")