# CTE (Cross-Track Error) 估算调试工具

这个 Notebook 帮助你调试实车上的中心线偏差 (CTE) 估算。

## 功能
1. 实时查看摄像头画面
2. 调整 HSV 颜色阈值
3. 可视化边缘检测 / 中心线追踪结果
4. 测试不同的 CTE 估算方法

## 使用步骤
1. 运行"初始化"部分
2. 用 HSV 阈值调节器找到合适的颜色参数
3. 测试 CTE 估算效果
4. 导出配置用于实际训练


## 1. 导入依赖


In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import ipywidgets as widgets
from ipywidgets import interact, interactive
import time

# 设置 matplotlib
%matplotlib inline
plt.rcParams['figure.figsize'] = [12, 6]

print("✅ 依赖导入完成！")


## 2. 初始化摄像头

修改下面的配置来匹配你的摄像头类型


In [None]:
# ============ 配置 ============
CAM_TYPE = "csi"  # "csi" 或 "usb"
CAM_WIDTH = 320
CAM_HEIGHT = 240
# ==============================

camera = None

def init_camera():
    """初始化摄像头"""
    global camera
    
    if camera is not None:
        print("摄像头已初始化")
        return True
    
    try:
        if CAM_TYPE == "csi":
            from jetcam.csi_camera import CSICamera
            camera = CSICamera(
                width=CAM_WIDTH, height=CAM_HEIGHT,
                capture_width=1280, capture_height=720, capture_fps=30,
            )
        else:
            from jetcam.usb_camera import USBCamera
            camera = USBCamera(width=CAM_WIDTH, height=CAM_HEIGHT)
        
        camera.read()  # 预热
        print(f"✅ 摄像头初始化成功: {CAM_TYPE}, {CAM_WIDTH}x{CAM_HEIGHT}")
        return True
    except Exception as e:
        print(f"⚠️ 摄像头初始化失败: {e}")
        print("将使用测试图像代替")
        camera = None
        return False

def get_frame():
    """获取一帧图像"""
    if camera is not None:
        return camera.read()
    else:
        # 返回模拟测试图像
        img = np.zeros((CAM_HEIGHT, CAM_WIDTH, 3), dtype=np.uint8)
        img[100:, :] = (50, 50, 50)  # 灰色路面
        cv2.line(img, (50, 100), (50, 240), (255, 255, 255), 3)   # 左白线
        cv2.line(img, (270, 100), (270, 240), (255, 255, 255), 3) # 右白线
        cv2.line(img, (160, 100), (160, 240), (0, 255, 255), 2)   # 黄色中心线
        return img

# 初始化
init_camera()


## 3. 查看原始图像


In [None]:
# 获取并显示一帧图像
frame = get_frame()

fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# 原图
axes[0].imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
axes[0].set_title(f'原始图像 ({frame.shape[1]}x{frame.shape[0]})')
axes[0].axis('off')

# HSV 图像
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
axes[1].imshow(hsv)
axes[1].set_title('HSV 图像')
axes[1].axis('off')

plt.tight_layout()
plt.show()

print(f"图像尺寸: {frame.shape}, 数据类型: {frame.dtype}")


## 4. 交互式 HSV 阈值调节器

拖动滑块调整 HSV 阈值，实时查看检测效果。

**常用颜色参考:**
- 白色: H=[0,180], S=[0,30], V=[200,255]
- 黄色: H=[20,35], S=[100,255], V=[100,255]
- 黑色: H=[0,180], S=[0,255], V=[0,50]


In [None]:
# 存储当前阈值
current_hsv = {'h_low': 0, 'h_high': 180, 's_low': 0, 's_high': 30, 'v_low': 200, 'v_high': 255}

def hsv_threshold_test(h_low, h_high, s_low, s_high, v_low, v_high, refresh=False):
    """测试 HSV 阈值"""
    global current_hsv
    current_hsv = {'h_low': h_low, 'h_high': h_high, 's_low': s_low, 's_high': s_high, 'v_low': v_low, 'v_high': v_high}
    
    frame = get_frame()
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    
    lower = np.array([h_low, s_low, v_low])
    upper = np.array([h_high, s_high, v_high])
    mask = cv2.inRange(hsv, lower, upper)
    result = cv2.bitwise_and(frame, frame, mask=mask)
    
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))
    
    axes[0].imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    axes[0].set_title('原始图像')
    axes[0].axis('off')
    
    axes[1].imshow(mask, cmap='gray')
    axes[1].set_title(f'Mask ({np.sum(mask > 0)} 像素)')
    axes[1].axis('off')
    
    axes[2].imshow(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
    axes[2].set_title('检测结果')
    axes[2].axis('off')
    
    plt.tight_layout()
    plt.show()
    
    print(f"当前阈值: H=[{h_low}, {h_high}], S=[{s_low}, {s_high}], V=[{v_low}, {v_high}]")

# 创建交互式控件
interact(
    hsv_threshold_test,
    h_low=widgets.IntSlider(min=0, max=180, value=0, description='H 下限'),
    h_high=widgets.IntSlider(min=0, max=180, value=180, description='H 上限'),
    s_low=widgets.IntSlider(min=0, max=255, value=0, description='S 下限'),
    s_high=widgets.IntSlider(min=0, max=255, value=30, description='S 上限'),
    v_low=widgets.IntSlider(min=0, max=255, value=200, description='V 下限'),
    v_high=widgets.IntSlider(min=0, max=255, value=255, description='V 上限'),
    refresh=widgets.Checkbox(value=False, description='刷新图像'),
);


## 5. CTE 估算器

三种估算方法:
1. **edge_detection**: 通过颜色检测左右边缘，计算中心
2. **centerline_tracking**: 追踪彩色中心线（如黄线）
3. **canny_edges**: 使用 Canny 边缘检测


In [None]:
class CTEEstimator:
    """CTE 估算器"""
    
    def __init__(self, method='edge_detection', max_cte=3.0):
        self.method = method
        self.max_cte = max_cte
        self.last_debug_image = None
        
        # 默认阈值（可通过 set_*_thresholds 方法修改）
        self.edge_lower = np.array([0, 0, 200])       # 白色边线
        self.edge_upper = np.array([180, 30, 255])
        self.centerline_lower = np.array([20, 100, 100])  # 黄色中心线
        self.centerline_upper = np.array([35, 255, 255])
    
    def set_edge_thresholds(self, h_low, h_high, s_low, s_high, v_low, v_high):
        self.edge_lower = np.array([h_low, s_low, v_low])
        self.edge_upper = np.array([h_high, s_high, v_high])
    
    def set_centerline_thresholds(self, h_low, h_high, s_low, s_high, v_low, v_high):
        self.centerline_lower = np.array([h_low, s_low, v_low])
        self.centerline_upper = np.array([h_high, s_high, v_high])
    
    def estimate(self, frame_bgr):
        """估算 CTE，返回 (cte, confidence, mask)"""
        if self.method == 'edge_detection':
            return self._by_edges(frame_bgr)
        elif self.method == 'centerline_tracking':
            return self._by_centerline(frame_bgr)
        elif self.method == 'canny_edges':
            return self._by_canny(frame_bgr)
        return 0.0, 0.0, None
    
    def _by_edges(self, frame_bgr):
        """通过颜色检测边缘"""
        h, w = frame_bgr.shape[:2]
        roi = frame_bgr[int(h * 0.6):, :]  # 下 40%
        
        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
        mask = cv2.inRange(hsv, self.edge_lower, self.edge_upper)
        
        scan_row = max(0, mask.shape[0] - 10)
        edge_pixels = np.where(mask[scan_row, :] > 0)[0]
        
        debug_img = roi.copy()
        cv2.line(debug_img, (0, scan_row), (w, scan_row), (255, 0, 0), 1)
        
        if len(edge_pixels) < 2:
            cv2.putText(debug_img, "No edges", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
            self.last_debug_image = debug_img
            return 0.0, 0.0, mask
        
        left, right = edge_pixels[0], edge_pixels[-1]
        lane_center = (left + right) // 2
        img_center = w // 2
        
        cte = ((lane_center - img_center) / (w / 2)) * self.max_cte
        confidence = min(1.0, (right - left) / (w * 0.4))
        
        cv2.circle(debug_img, (left, scan_row), 5, (0, 255, 0), -1)
        cv2.circle(debug_img, (right, scan_row), 5, (0, 255, 0), -1)
        cv2.circle(debug_img, (lane_center, scan_row), 8, (0, 0, 255), -1)
        cv2.line(debug_img, (img_center, 0), (img_center, debug_img.shape[0]), (255, 255, 0), 2)
        cv2.putText(debug_img, f"CTE: {cte:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        
        self.last_debug_image = debug_img
        return float(cte), float(confidence), mask
    
    def _by_centerline(self, frame_bgr):
        """追踪彩色中心线"""
        h, w = frame_bgr.shape[:2]
        roi = frame_bgr[int(h * 0.5):, :]
        
        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
        mask = cv2.inRange(hsv, self.centerline_lower, self.centerline_upper)
        
        moments = cv2.moments(mask)
        debug_img = roi.copy()
        img_center = w // 2
        cv2.line(debug_img, (img_center, 0), (img_center, debug_img.shape[0]), (255, 255, 0), 2)
        
        if moments["m00"] < 100:
            cv2.putText(debug_img, "No centerline", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
            self.last_debug_image = debug_img
            return 0.0, 0.0, mask
        
        cx = int(moments["m10"] / moments["m00"])
        cy = int(moments["m01"] / moments["m00"])
        
        cte = ((cx - img_center) / (w / 2)) * self.max_cte
        confidence = min(1.0, moments["m00"] / (w * roi.shape[0] * 0.03))
        
        cv2.circle(debug_img, (cx, cy), 10, (0, 0, 255), -1)
        cv2.putText(debug_img, f"CTE: {cte:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        
        self.last_debug_image = debug_img
        return float(cte), float(confidence), mask
    
    def _by_canny(self, frame_bgr):
        """使用 Canny 边缘检测"""
        h, w = frame_bgr.shape[:2]
        roi = frame_bgr[int(h * 0.6):, :]
        
        gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        edges = cv2.Canny(blurred, 50, 150)
        
        scan_row = max(0, edges.shape[0] - 10)
        edge_pixels = np.where(edges[scan_row, :] > 0)[0]
        
        debug_img = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
        img_center = w // 2
        cv2.line(debug_img, (img_center, 0), (img_center, debug_img.shape[0]), (255, 255, 0), 2)
        
        if len(edge_pixels) < 2:
            cv2.putText(debug_img, "No edges", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
            self.last_debug_image = debug_img
            return 0.0, 0.0, edges
        
        left, right = edge_pixels[0], edge_pixels[-1]
        lane_center = (left + right) // 2
        
        cte = ((lane_center - img_center) / (w / 2)) * self.max_cte
        confidence = min(1.0, (right - left) / (w * 0.4))
        
        cv2.circle(debug_img, (left, scan_row), 5, (0, 255, 0), -1)
        cv2.circle(debug_img, (right, scan_row), 5, (0, 255, 0), -1)
        cv2.circle(debug_img, (lane_center, scan_row), 8, (0, 0, 255), -1)
        cv2.putText(debug_img, f"CTE: {cte:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        
        self.last_debug_image = debug_img
        return float(cte), float(confidence), edges

# 创建估算器
estimator = CTEEstimator(method='edge_detection', max_cte=3.0)
print("✅ CTE 估算器创建完成")


## 6. 交互式 CTE 测试

选择估算方法，调整阈值，实时查看 CTE 估算效果


In [None]:
def test_cte(method, h_low, h_high, s_low, s_high, v_low, v_high, refresh=False):
    """交互式 CTE 测试"""
    estimator.method = method
    
    if method == 'edge_detection':
        estimator.set_edge_thresholds(h_low, h_high, s_low, s_high, v_low, v_high)
    elif method == 'centerline_tracking':
        estimator.set_centerline_thresholds(h_low, h_high, s_low, s_high, v_low, v_high)
    
    frame = get_frame()
    cte, confidence, mask = estimator.estimate(frame)
    
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))
    
    axes[0].imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    axes[0].set_title('原始图像')
    axes[0].axis('off')
    
    if mask is not None:
        axes[1].imshow(mask, cmap='gray')
        axes[1].set_title('检测 Mask')
        axes[1].axis('off')
    
    if estimator.last_debug_image is not None:
        axes[2].imshow(cv2.cvtColor(estimator.last_debug_image, cv2.COLOR_BGR2RGB))
        axes[2].set_title(f'CTE: {cte:.2f}, 置信度: {confidence:.2f}')
        axes[2].axis('off')
    
    plt.tight_layout()
    plt.show()
    
    print(f"方法: {method} | CTE: {cte:+.3f} (正=偏右, 负=偏左) | 置信度: {confidence:.2f}")

interact(
    test_cte,
    method=widgets.Dropdown(options=['edge_detection', 'centerline_tracking', 'canny_edges'], value='edge_detection', description='方法'),
    h_low=widgets.IntSlider(min=0, max=180, value=0, description='H 下限'),
    h_high=widgets.IntSlider(min=0, max=180, value=180, description='H 上限'),
    s_low=widgets.IntSlider(min=0, max=255, value=0, description='S 下限'),
    s_high=widgets.IntSlider(min=0, max=255, value=30, description='S 上限'),
    v_low=widgets.IntSlider(min=0, max=255, value=200, description='V 下限'),
    v_high=widgets.IntSlider(min=0, max=255, value=255, description='V 上限'),
    refresh=widgets.Checkbox(value=False, description='刷新'),
);


## 7. 实时监控与导出配置


In [None]:
def realtime_monitor(duration=10, fps=5):
    """实时监控 CTE (按 Kernel->Interrupt 停止)"""
    print(f"开始监控 {duration} 秒... (Kernel->Interrupt 停止)")
    
    cte_history, conf_history = [], []
    start = time.time()
    
    try:
        while time.time() - start < duration:
            frame = get_frame()
            cte, conf, _ = estimator.estimate(frame)
            cte_history.append(cte)
            conf_history.append(conf)
            
            clear_output(wait=True)
            
            fig, axes = plt.subplots(1, 2, figsize=(14, 4))
            
            if estimator.last_debug_image is not None:
                axes[0].imshow(cv2.cvtColor(estimator.last_debug_image, cv2.COLOR_BGR2RGB))
            axes[0].set_title(f'CTE: {cte:.2f}')
            axes[0].axis('off')
            
            axes[1].plot(cte_history, 'b-', linewidth=2)
            axes[1].axhline(y=0, color='g', linestyle='--', alpha=0.5)
            axes[1].axhline(y=3, color='r', linestyle='--', alpha=0.5)
            axes[1].axhline(y=-3, color='r', linestyle='--', alpha=0.5)
            axes[1].set_ylim(-4, 4)
            axes[1].set_title('CTE 历史')
            axes[1].set_xlabel('帧')
            axes[1].grid(True, alpha=0.3)
            
            plt.tight_layout()
            plt.show()
            
            print(f"CTE: {cte:+.2f} | 置信度: {conf:.2f} | 帧: {len(cte_history)}")
            time.sleep(1.0 / fps)
    except KeyboardInterrupt:
        print("已停止")
    
    if cte_history:
        print(f"\n统计: 平均CTE={np.mean(cte_history):.3f}, 标准差={np.std(cte_history):.3f}")

# 取消注释运行实时监控 (默认10秒)
# realtime_monitor(duration=10, fps=5)


## 8. 导出配置

调试完成后，运行下面的单元格导出配置，用于 `real_car_env.py`


In [None]:
def export_config():
    """导出当前配置"""
    print("=" * 60)
    print("当前配置 (复制到 real_car_env.py 中使用):")
    print("=" * 60)
    print(f"""
# CTE 估算器配置
from real_car_env import VisualCTEEstimator

cte_estimator = VisualCTEEstimator(
    method='{estimator.method}',
    image_width={CAM_WIDTH},
    image_height={CAM_HEIGHT},
    max_cte={estimator.max_cte},
    # 边缘检测阈值 (HSV)
    track_lower=tuple({estimator.edge_lower.tolist()}),
    track_upper=tuple({estimator.edge_upper.tolist()}),
    # 中心线检测阈值 (HSV)
    centerline_lower=tuple({estimator.centerline_lower.tolist()}),
    centerline_upper=tuple({estimator.centerline_upper.tolist()}),
)
""")
    print("=" * 60)

# 导出配置
export_config()


## 9. 清理资源

完成后运行此单元格释放摄像头


In [None]:
def cleanup():
    """清理摄像头资源"""
    global camera
    if camera is not None:
        if hasattr(camera, 'running'):
            camera.running = False
        time.sleep(0.2)
        camera = None
        print("✅ 摄像头已释放")
    else:
        print("摄像头未初始化")

# 取消注释释放摄像头
# cleanup()
