###  阶段一：确认机械臂和摄像头是否正确连接

In [None]:
# ✅ 初始化路径与配置导入
import sys
import os

project_root = "/home/wjw/lerobot"
sys.path.insert(0, project_root)
os.chdir(project_root)

print("✅ 已切换至项目路径：", os.getcwd())



In [None]:
# Cell 2: 检查 /dev/ttyACM* 端口
print("当前可用的 /dev/ttyACM 端口:")
!ls /dev/ttyACM*

print("\n查看 /dev/serial/by-id/ 绑定:")
!ls -l /dev/serial/by-id/

input("请确认上面输出是否与 configs.py 中配置的端口匹配；若不匹配，请手动更新后再按 Enter 继续...")

In [None]:
print("✅ 当前串口设备：")
!ls /dev/ttyACM*

print("\n✅ 当前摄像头设备：")
!ls /dev/video*

print("\n📍 查看摄像头绑定设备名称（识别 E22S / E12）：")
!ls -l /dev/serial/by-id/



In [None]:
!v4l2-ctl --list-devices

In [None]:
#测试使用机械臂的Type-C接口进行连接）

import os
import time

def list_serial_ports():
    return set(p for p in os.listdir('/dev') if p.startswith('ttyACM') or p.startswith('ttyUSB'))

print("🔌 请 **先拔掉你要测试的 Type-C 机械臂**，然后按回车键继续")
input()
before = list_serial_ports()

print("\n✅ 现在 **插上 Type-C 机械臂**，等待 3 秒后继续检测...")
time.sleep(3)

after = list_serial_ports()
new_ports = after - before

if new_ports:
    print(f"\n🎯 检测到新串口设备: {', '.join('/dev/' + p for p in new_ports)}")
    print("✅ Type-C 接口识别为串口设备，说明 **可以直接用于机器人通信**！")
else:
    print("\n⚠️ 未检测到新串口设备！")
    print("❌ 可能原因：Type-C 接口不支持数据通信，或系统未正确识别")
    print("👉 建议尝试更换 Type-C 口或回退至 USB 转接方案")


In [None]:
# # ✅ （可选）删除旧标定缓存文件，慎用
# import shutil

# calib_path = "/home/wjw/lerobot/.cache/calibration/koch/"
# if os.path.exists(calib_path):
#     shutil.rmtree(calib_path)
#     print("⚠️ 旧标定缓存已删除")
# else:
#     print("✅ 无旧缓存，无需清除")

# print("✅ Step 3：已执行缓存检查与清除")   


In [None]:
# Cell 4: 实例化 KochRobotConfig 并触发标定 -3月24日修改：添加了对摄像头的初始化（2个）
from lerobot.common.robot_devices.robots.configs import KochRobotConfig
from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot
from lerobot.common.robot_devices.cameras.configs import OpenCVCameraConfig

# 不考虑摄像头，清空 cameras
robot_cfg = KochRobotConfig()
#robot_cfg.cameras = {}

# ✅ 加入摄像头配置
robot_cfg.cameras = {
    "e22s_side": OpenCVCameraConfig(camera_index=2, fps=30, width=640, height=480),
    "e12_top": OpenCVCameraConfig(camera_index=4, fps=30, width=640, height=480),
}

robot = ManipulatorRobot(robot_cfg)

print("开始连接机器人（若无标定文件，将自动进入标定流程）...")
robot.connect()
print("✅ 标定完成，标定文件保存至:", robot_cfg.calibration_dir)

In [None]:
# Cell 5: 查看标定结果
print("当前标定文件夹内容:")
!ls -l /home/wjw/lerobot/.cache/calibration/koch/

In [None]:
# 读取主臂的状态
leader_status = robot.leader_arms["main"].read("Hardware_Error_Status")
leader_torque = robot.leader_arms["main"].read("Torque_Enable")
leader_position = robot.leader_arms["main"].read("Present_Position")

# 读取从臂的状态
follower_status = robot.follower_arms["main"].read("Hardware_Error_Status")
follower_torque = robot.follower_arms["main"].read("Torque_Enable")
follower_position = robot.follower_arms["main"].read("Present_Position")

# 打印结果
print("✅ 主臂硬件错误状态:", leader_status)
print("✅ 主臂扭矩状态:", leader_torque)
print("✅ 主臂当前关节位置:", leader_position)
print("✅ 从臂硬件错误状态:", follower_status)
print("✅ 从臂扭矩状态:", follower_torque)
print("✅ 从臂当前关节位置:", follower_position)

from lerobot.common.robot_devices.motors.dynamixel import TorqueMode

### 阶段二：进行同步控制和数据采集


In [None]:

# 🟢 Cell 1 - 创建自动编号的 clip 目录
import os
from datetime import datetime

base_dir = "results/demos/task_01_redblock"
os.makedirs(base_dir, exist_ok=True)

# 自动编号 clip 目录
existing = [d for d in os.listdir(base_dir) if d.startswith("clip_")]
clip_id = len(existing) + 1
clip_name = f"clip_{clip_id:05d}"
clip_path = os.path.join(base_dir, clip_name)
os.makedirs(os.path.join(clip_path, "images_cam1"), exist_ok=True)
os.makedirs(os.path.join(clip_path, "images_cam2"), exist_ok=True)
os.makedirs(os.path.join(clip_path, "joint_states"), exist_ok=True)
print(f"📁 本轮采集目录已创建: {clip_path}")


In [None]:
# 🟢 Cell 2 - 初始化采集变量
import time

cap_cam1 = cv2.VideoCapture("/dev/video2")
cap_cam2 = cv2.VideoCapture("/dev/video4")

fps = 30
frame_id = 0
timestamp_log = []
print("🎥 准备采集图像与关节角度，按 ESC 可随时中止")

In [None]:
# ✅ 采集主循环：采集图像 + 关节状态 + 保存同步帧
import cv2
import time
import numpy as np
from datetime import datetime

frame_id = 0
joint_data_list = []

print("🟢 正在采集图像与关节角度数据，按 ESC 键中止")

while True:
    start_time = time.time()
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    
    # 读取图像帧
    ret1, frame1 = cap_cam1.read()
    ret2, frame2 = cap_cam2.read()
    
    if not (ret1 and ret2):
        print("⚠️ 图像读取失败，跳过当前帧")
        continue

    # 获取主臂和从臂的当前关节位置（长度 6）
    leader_joint_pos = robot.leader_arms["main"].read("Present_Position")
    follower_joint_pos = robot.follower_arms["main"].read("Present_Position")

    # 保存图像
    img_name1 = os.path.join(clip_path, "images_cam1", f"frame_{frame_id:05d}_e22s.jpg")
    img_name2 = os.path.join(clip_path, "images_cam2", f"frame_{frame_id:05d}_e12.jpg")
    cv2.imwrite(img_name1, frame1)
    cv2.imwrite(img_name2, frame2)

    # 保存 joint 数据（时间戳，主臂关节列表，从臂关节列表）
    joint_data_list.append((timestamp, leader_joint_pos, follower_joint_pos))

    # 可选：在窗口预览（不需要可注释）
    cv2.imshow("E22S Side", frame1)
    cv2.imshow("E12 Top", frame2)

    # 退出检测：按下 ESC 键退出（ASCII 27）
    if cv2.waitKey(1) & 0xFF == 27:
        print("🛑 采集中止，正在保存关节数据...")
        break

    frame_id += 1
    # 控制采样频率约为 30Hz
    elapsed = time.time() - start_time
    time.sleep(max(1.0 / fps - elapsed, 0))

# 保存关节数据
joint_data_array = np.array(joint_data_list, dtype=object)
np.save(os.path.join(clip_path, "joint_states", "joint_data.npy"), joint_data_array)

# 释放资源
cap_cam1.release()
cap_cam2.release()
cv2.destroyAllWindows()

print(f"✅ 采集完成，共保存 {frame_id} 帧图像与关节数据")


In [None]:
# 🟢 Cell 3 - 开始主循环采集数据
while True:
    ret1, frame1 = cap_cam1.read()
    ret2, frame2 = cap_cam2.read()
    timestamp = time.time()

    leader_angles = robot.leader_arm.get_joint_angles()
    follower_angles = robot.follower_arm.get_joint_angles()
    timestamp_log.append((timestamp, leader_angles, follower_angles))

    if ret1:
        fname1 = f"frame_{frame_id:05d}_cam1.jpg"
        path1 = os.path.join(clip_path, "images_cam1", fname1)
        cv2.imwrite(path1, frame1)
        cv2.imshow("Camera 1 (E22S)", frame1)

    if ret2:
        fname2 = f"frame_{frame_id:05d}_cam2.jpg"
        path2 = os.path.join(clip_path, "images_cam2", fname2)
        cv2.imwrite(path2, frame2)
        cv2.imshow("Camera 2 (E12)", frame2)

    frame_id += 1
    key = cv2.waitKey(1) & 0xFF
    if key == 27:  # ESC 键退出
        print("⛔ 手动停止采集...")
        break

    time.sleep(1.0 / fps)

cap_cam1.release()
cap_cam2.release()
cv2.destroyAllWindows()



In [None]:

# 🟢 Cell 4 - 保存 timestamp + joint 数据
import numpy as np
import yaml

# 保存时间戳日志
ts_file = os.path.join(clip_path, "timestamp_log.txt")
with open(ts_file, 'w') as f:
    for ts, la, fa in timestamp_log:
        line = f"{ts},{','.join([str(x) for x in la])},{','.join([str(x) for x in fa])}\n"
        f.write(line)

# 保存为 .npy 格式
np.save(os.path.join(clip_path, "joint_states", "joint_data.npy"), timestamp_log)

# 保存参数设置
settings = {"fps": fps, "cam1": "/dev/video2", "cam2": "/dev/video4"}
with open(os.path.join(clip_path, "settings.yaml"), 'w') as f:
    yaml.dump(settings, f)

print(f"✅ 本轮 clip 采集完成，共 {frame_id} 帧，数据已保存。")


In [None]:
# ✅ 第三阶段：读取采集内容（不变，语言优化）
import matplotlib.pyplot as plt

data = np.load(os.path.join(clip_path, "joint_states", "joint_data.npy"), allow_pickle=True)
print(f"读取成功，共采集 {len(data)} 帧")

# 可视化主臂第一个关节的轨迹
times = [d[0] for d in data]
leader_joint_0 = [d[1][0] for d in data]
plt.plot(times, leader_joint_0)
plt.xlabel("时间戳")
plt.ylabel("主臂关节 1 角度")
plt.title("主臂第1关节轨迹")
plt.show()



### 阶段三：对于采集到的数据进行评估

In [None]:
#3月24日，以下是完整的标定，初始化，同步控制和数据采集的代码+后续的评估内容基本都是对机械臂的同步控制的评估

### 评估采集数据的过程，内容很多，可以水很多报告

In [None]:
#评估采集数据的过程，内容很多，可以水很多报告

In [None]:
#数据浏览器

from pathlib import Path
from IPython.display import display
from PIL import Image
import numpy as np

# ✅ 设置数据路径
data_root = Path("results/results/multimodal_data")

# ✅ 获取文件列表（使用 exists 判断路径是否存在）
cam0_files = sorted((data_root / "cam0").glob("*.jpg"))
cam1_files = sorted((data_root / "cam1").glob("*.jpg"))
joint_files = sorted((data_root / "joint").glob("*_leader.txt"))

# ✅ 获取可用帧数（以最短的为准）
num_frames = min(len(cam0_files), len(cam1_files), len(joint_files))
print(f"📂 Total frames collected: {num_frames}")

# ✅ 防止列表越界（只展示最多 3 帧）
for i in range(min(3, num_frames)):
    print(f"\n🖼️ Frame {i:05d}:")
    print(f"    cam0: {cam0_files[i].name}")
    print(f"    cam1: {cam1_files[i].name}")
    print(f"    joint: {joint_files[i].name}")



In [None]:
# ✅ 加载图像和关节数据（预览第 0 帧）
idx = 0

if num_frames > idx:
    img0 = Image.open(cam0_files[idx])
    img1 = Image.open(cam1_files[idx])
    q_leader = np.loadtxt(data_root / "joint" / f"frame_{idx:05d}_leader.txt")
    q_follower = np.loadtxt(data_root / "joint" / f"frame_{idx:05d}_follower.txt")

    # ✅ 显示图像
    print("📸 cam0 view (E22S - side):")
    display(img0)
    print("📸 cam1 view (E12 - top):")
    display(img1)

    # ✅ 打印关节数据
    print("🤖 Leader joint angles:", q_leader)
    print("🤖 Follower joint angles:", q_follower)
else:
    print(f"⚠️ Not enough frames to preview idx = {idx}")


In [None]:
#帧同步检测

from pathlib import Path

# 设置数据路径 可能后续我会改一些路径
data_root = Path("results/results/multimodal_data")

# 获取文件列表（按帧编号排序）
cam0_files = sorted((data_root / "cam0").glob("*.jpg"))
cam1_files = sorted((data_root / "cam1").glob("*.jpg"))
leader_files = sorted((data_root / "joint").glob("*_leader.txt"))
follower_files = sorted((data_root / "joint").glob("*_follower.txt"))

# 提取帧编号的辅助函数
def extract_index(file_path, part):
    stem = file_path.stem
    if part in stem:
        return stem.replace(f"frame_", "").replace(f"_{part}", "")
    return None

# 构建帧编号集合
cam0_indices = set(extract_index(p, "e22s") for p in cam0_files)
cam1_indices = set(extract_index(p, "e12") for p in cam1_files)
leader_indices = set(p.stem.replace("frame_", "").replace("_leader", "") for p in leader_files)
follower_indices = set(p.stem.replace("frame_", "").replace("_follower", "") for p in follower_files)

# 统计交集与差集
all_indices = cam0_indices & cam1_indices & leader_indices & follower_indices
missing_cam0 = sorted(cam1_indices - cam0_indices)
missing_cam1 = sorted(cam0_indices - cam1_indices)
missing_leader = sorted(cam0_indices - leader_indices)
missing_follower = sorted(cam0_indices - follower_indices)

# 显示结果
import pandas as pd

summary = {
    "Total Frames (Aligned)": [len(all_indices)],
    "Missing cam0": [len(missing_cam0)],
    "Missing cam1": [len(missing_cam1)],
    "Missing leader.txt": [len(missing_leader)],
    "Missing follower.txt": [len(missing_follower)],
}

df = pd.DataFrame(summary)
# ✅ 本地显示同步检测结果表格
print("✅ 帧同步检测结果如下：")
display(df)  # 或 df.head()，根据你 Notebook 支持情况

#import ace_tools as tools; tools.display_dataframe_to_user(name="Frame Synchronization Summary", dataframe=df)


In [None]:
#轨迹可视化

import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

# 设置数据路径
data_root = Path("results/results/multimodal_data/joint")

# 获取所有 leader 和 follower 的关节数据文件
leader_files = sorted(data_root.glob("frame_*_leader.txt"))
follower_files = sorted(data_root.glob("frame_*_follower.txt"))

# 加载所有关节数据
leader_trajectories = [np.loadtxt(f) for f in leader_files]
follower_trajectories = [np.loadtxt(f) for f in follower_files]

# 转换为 numpy 数组（帧数 × 关节数）
leader_trajectories = np.array(leader_trajectories)
follower_trajectories = np.array(follower_trajectories)

# 获取关节数量
num_joints = leader_trajectories.shape[1]
time = np.arange(len(leader_trajectories))

# 绘图：每个子图表示一个关节的主从臂角度变化
fig, axes = plt.subplots(num_joints, 1, figsize=(12, 2.5 * num_joints), sharex=True)
for j in range(num_joints):
    axes[j].plot(time, leader_trajectories[:, j], label="Leader Arm", linestyle='--')
    axes[j].plot(time, follower_trajectories[:, j], label="Follower Arm", alpha=0.8)
    axes[j].set_ylabel(f"Joint {j+1} (rad)")
    axes[j].legend(loc="upper right")
    axes[j].grid(True)

axes[-1].set_xlabel("Frame Index")
fig.suptitle("Trajectory Visualization: Leader vs Follower Arm Joint Angles", fontsize=16)
fig.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()


In [None]:
#✅ 1. 主从误差分析（Position Error Analysis）

import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import pandas as pd

# 设置数据路径
data_root = Path("results/results/multimodal_data/joint")
leader_files = sorted(data_root.glob("frame_*_leader.txt"))
follower_files = sorted(data_root.glob("frame_*_follower.txt"))

# 加载轨迹数据
leader_trajectories = [np.loadtxt(f) for f in leader_files]
follower_trajectories = [np.loadtxt(f) for f in follower_files]

# 确保数据有效且维度一致
if len(leader_trajectories) == 0 or len(leader_trajectories) != len(follower_trajectories):
    print("❌ 数据加载失败，数量不一致或为空！")
else:
    leader_trajectories = np.array(leader_trajectories)
    follower_trajectories = np.array(follower_trajectories)

    if leader_trajectories.ndim != 2:
        print("❌ 加载数据维度异常！")
    else:
        # 误差计算
        errors = leader_trajectories - follower_trajectories

        # 计算统计量
        error_summary = {
            "Joint": [f"Joint {i+1}" for i in range(errors.shape[1])],
            "Mean Error (rad)": np.mean(errors, axis=0),
            "Max Error (rad)": np.max(np.abs(errors), axis=0),
            "Std Error (rad)": np.std(errors, axis=0)
        }
        error_df = pd.DataFrame(error_summary)

        # 可视化误差曲线
        time = np.arange(errors.shape[0])
        fig, axes = plt.subplots(errors.shape[1], 1, figsize=(12, 2.5 * errors.shape[1]), sharex=True)
        for j in range(errors.shape[1]):
            axes[j].plot(time, errors[:, j], label="Error", color="red")
            axes[j].axhline(0, color='black', linestyle='--', linewidth=0.8)
            axes[j].set_ylabel(f"Joint {j+1} Error (rad)")
            axes[j].grid(True)
        axes[-1].set_xlabel("Frame Index")
        fig.suptitle("Leader-Follower Joint Angle Error (per joint)", fontsize=16)
        fig.tight_layout(rect=[0, 0.03, 1, 0.95])
        plt.show()

        # 展示误差统计表格
        #import ace_tools as tools; tools.display_dataframe_to_user(name="误差统计表", dataframe=error_df)
from IPython.display import display
display(error_df)


In [None]:
#✅ 【角速度分析脚本】Velocity Analysis（单位：rad/frame）

import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

# ✅ 设置数据路径
data_root = Path("results/results/multimodal_data/joint")

# ✅ 读取主从臂数据
leader_files = sorted(data_root.glob("frame_*_leader.txt"))
follower_files = sorted(data_root.glob("frame_*_follower.txt"))
leader_trajectories = np.array([np.loadtxt(f) for f in leader_files])
follower_trajectories = np.array([np.loadtxt(f) for f in follower_files])

# ✅ 计算角速度（速度 = 相邻帧之差）
leader_velocity = np.diff(leader_trajectories, axis=0)
follower_velocity = np.diff(follower_trajectories, axis=0)
num_joints = leader_velocity.shape[1]
time = np.arange(len(leader_velocity))  # 因为 diff 后帧数减少了 1

# ✅ 绘图：每个关节的主从角速度比较
fig, axes = plt.subplots(num_joints, 1, figsize=(12, 2.5 * num_joints), sharex=True)
for j in range(num_joints):
    axes[j].plot(time, leader_velocity[:, j], label="Leader Velocity", linestyle="--", color="blue")
    axes[j].plot(time, follower_velocity[:, j], label="Follower Velocity", linestyle="-", color="orange")
    axes[j].set_ylabel(f"Joint {j+1} vel (rad/frame)")
    axes[j].legend(loc="upper right")
    axes[j].grid(True)

axes[-1].set_xlabel("Frame Index")
fig.suptitle("Joint Velocity Analysis: Leader vs Follower", fontsize=16)
fig.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()


In [None]:
#✅ 【误差均方统计模块】MSE over Time

import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

# ✅ 设置数据路径
data_root = Path("results/results/multimodal_data/joint")

# ✅ 读取主从臂关节角数据
leader_files = sorted(data_root.glob("frame_*_leader.txt"))
follower_files = sorted(data_root.glob("frame_*_follower.txt"))
leader_trajectories = np.array([np.loadtxt(f) for f in leader_files])
follower_trajectories = np.array([np.loadtxt(f) for f in follower_files])

# ✅ 计算误差：每一帧的所有关节误差平方的平均值
mse_per_frame = np.mean((leader_trajectories - follower_trajectories)**2, axis=1)

# ✅ 绘图：误差随时间的变化
plt.figure(figsize=(12, 4))
plt.plot(mse_per_frame, label="Mean Squared Error (MSE)", color='red')
plt.xlabel("Frame Index")
plt.ylabel("MSE (rad²)")
plt.title("Mean Squared Error over Time")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

# ✅ 统计整体误差均值
overall_mse = np.mean(mse_per_frame)
print(f"📊 全局平均 MSE（主从臂角度误差）: {overall_mse:.6f} rad²")


In [None]:
import numpy as np
import pandas as pd
from pathlib import Path

# 设置数据路径
data_root = Path("results/results/multimodal_data/joint")

# 读取主从臂数据
leader_files = sorted(data_root.glob("frame_*_leader.txt"))
follower_files = sorted(data_root.glob("frame_*_follower.txt"))

# 加载数据
leader_data = np.array([np.loadtxt(f) for f in leader_files])
follower_data = np.array([np.loadtxt(f) for f in follower_files])
num_frames = len(leader_data)
num_joints = leader_data.shape[1]

# 构建列名
columns = ["Frame"]
columns += [f"Leader_J{j+1}" for j in range(num_joints)]
columns += [f"Follower_J{j+1}" for j in range(num_joints)]

# 构建 DataFrame
frame_indices = np.arange(num_frames).reshape(-1, 1)
joint_data = np.hstack([frame_indices, leader_data, follower_data])
df = pd.DataFrame(joint_data, columns=columns)

# 保存为 CSV 文件
csv_path = data_root.parent / "joint_data.csv"
df.to_csv(csv_path, index=False)
print(f"✅ 关节数据成功导出为 CSV: {csv_path.resolve()}")


In [None]:
#✅ 扩展模块：「导出完整同步数据 CSV（包含时间戳 + 图像路径）」

import numpy as np
import pandas as pd
from pathlib import Path

# 设置数据路径
root = Path("results/results/multimodal_data")
joint_path = root / "joint"
cam0_path = root / "cam0"
cam1_path = root / "cam1"
timestamp_file = root / "timestamp_log.txt"

# 读取关节数据
leader_files = sorted(joint_path.glob("frame_*_leader.txt"))
follower_files = sorted(joint_path.glob("frame_*_follower.txt"))
leader_data = np.array([np.loadtxt(f) for f in leader_files])
follower_data = np.array([np.loadtxt(f) for f in follower_files])
num_frames = len(leader_data)

# 读取时间戳
timestamps = []
with open(timestamp_file, "r") as f:
    for line in f:
        _, ts = line.strip().split()
        timestamps.append(float(ts))

# 构建图像路径
cam0_images = [str((cam0_path / f"frame_{i:05d}_e22s.jpg").resolve()) for i in range(num_frames)]
cam1_images = [str((cam1_path / f"frame_{i:05d}_e12.jpg").resolve()) for i in range(num_frames)]

# 拼接成 DataFrame
columns = ["Frame", "Timestamp", "cam0_path", "cam1_path"]
columns += [f"Leader_J{j+1}" for j in range(6)]
columns += [f"Follower_J{j+1}" for j in range(6)]

all_data = []
for i in range(num_frames):
    row = [i, timestamps[i], cam0_images[i], cam1_images[i]]
    row += leader_data[i].tolist()
    row += follower_data[i].tolist()
    all_data.append(row)

df = pd.DataFrame(all_data, columns=columns)

# 导出为 CSV
csv_path = root / "synchronized_multimodal_data.csv"
df.to_csv(csv_path, index=False)
print(f"✅ 扩展数据成功导出为 CSV: {csv_path.resolve()}")


In [None]:
#清洗 + 插值修复 + 可视化」模块

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# ✅ 读取导出的同步数据（包含主从关节角度）
df = pd.read_csv("results/results/multimodal_data/synchronized_multimodal_data.csv")


# ✅ 打印基本信息
print("📊 数据维度:", df.shape)
print("🔍 是否存在缺失值？\n", df.isnull().any())

# ✅ 找出异常值（例如全为 0，或者过大过小的角度）
angle_cols = [col for col in df.columns if "Leader_J" in col or "Follower_J" in col]
abnormal = df[angle_cols].applymap(lambda x: abs(x) > 1e3 or x == 0)
print("⚠️ 每列异常值数量：\n", abnormal.sum())


In [None]:
# ✅ 使用线性插值法修复缺失值
df_interp = df.copy()
df_interp[angle_cols] = df[angle_cols].interpolate(method="linear", limit_direction="both")

# ✅ 滑动平均滤波（可选）
df_smooth = df_interp.copy()
df_smooth[angle_cols] = df_interp[angle_cols].rolling(window=3, center=True).mean()

# ✅ 绘制修复前后对比（以 Joint1 为例）
plt.figure(figsize=(12, 4))
plt.plot(df["Leader_J1"], label="Raw Leader_J1", linestyle='--', alpha=0.6)
plt.plot(df_smooth["Leader_J1"], label="Smoothed Leader_J1", linewidth=2)
plt.title(" Leader_J1 before and after")#清洗前后对比
plt.legend()
plt.grid(True)
plt.xlabel("Frame Index")
plt.ylabel("Angle (rad)")
plt.tight_layout()
plt.show()
#2条线是重合的，说明几乎没有误差，不需要插值修复

In [None]:
# ✅ 保存修复和平滑后的数据
df_smooth.to_csv("results/results/multimodal_data/synchronized_cleaned_data.csv", index=False)
print("✅ 已保存清洗后的数据为: synchronized_cleaned_data.csv")


In [None]:
#模仿学习数据格式转换（State-Action），这个保存的CGV文件的列名有问题，还是参考synchronized_cleaned_data.csv吧
import pandas as pd
import numpy as np
from pathlib import Path

# ✅ 设置数据路径
base_dir = Path("results/results/multimodal_data")
csv_path = base_dir / "synchronized_cleaned_data.csv"

# ✅ 设置保存路径
save_dir = base_dir / "processed"
save_dir.mkdir(parents=True, exist_ok=True)

# ✅ 读取清洗后的数据
df = pd.read_csv(csv_path)

# ✅ 提取 State（Leader 关节）和 Action（Follower 关节）
state_cols = [f"Leader_J{i}" for i in range(1, 7)]
action_cols = [f"Follower_J{i}" for i in range(1, 7)]

states = df[state_cols].values
actions = df[action_cols].values

# ✅ 构建 state-action 对
state_action_pairs = np.hstack([states, actions])  # 每行 [state_1..6, action_1..6]

# ✅ 构建列名
columns = [f"state_J{i}" for i in range(1, 7)] + [f"action_J{i}" for i in range(1, 7)]

# ✅ 保存为 CSV 文件（带列名）
df_out = pd.DataFrame(state_action_pairs, columns=columns)
csv_save_path = save_dir / "imitation_dataset_state_action.csv"
df_out.to_csv(csv_save_path, index=False)
print(f"✅ 已保存 CSV 格式模仿学习数据集：{csv_save_path.resolve()}")

# ✅ 保存为 NumPy 格式
npz_save_path = save_dir / "imitation_dataset_state_action.npz"
np.savez(npz_save_path, states=states, actions=actions)
print(f"✅ 已保存 NumPy 格式数据集：{npz_save_path.resolve()}")





In [None]:
# #🎞️ 图像帧组合为 MP4，并叠加主从臂状态，
# #仅仅生成了cam0-e12的俯视
# import cv2
# import pandas as pd
# from pathlib import Path

# # ✅ 设置路径
# data_root = Path("results/results/multimodal_data")
# cam0_dir = data_root / "cam0"
# joint_csv = data_root / "synchronized_cleaned_data.csv"
# save_path = data_root / "processed" / "demo_video_with_joint_overlay.mp4"
# save_path.parent.mkdir(parents=True, exist_ok=True)

# # ✅ 读取关节数据
# df = pd.read_csv(joint_csv)

# # ✅ 视频参数
# fps = 30
# frame_size = (640, 480)
# fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # 如播放失败可改为 'XVID' 或 'avc1'
# video_writer = cv2.VideoWriter(str(save_path), fourcc, fps, frame_size)

# font = cv2.FONT_HERSHEY_SIMPLEX
# font_scale = 0.5
# font_color = (0, 255, 0)
# thickness = 1

# written = 0  # 写入帧计数器

# # ✅ 遍历数据逐帧写入
# for i, row in df.iterrows():
#     frame_name = f"frame_{i:05d}_e22s.jpg"
#     img_path = cam0_dir / frame_name

#     if not img_path.exists():
#         print(f"❌ 图像缺失 @ frame {i}: {img_path.name}")
#         continue

#     img = cv2.imread(str(img_path))
#     if img is None:
#         print(f"❌ 图像读取失败 @ frame {i}: {img_path.name}")
#         continue

#     # ✅ 自动调整图像大小
#     if img.shape[:2] != (480, 640):  # 高宽判断
#         img = cv2.resize(img, frame_size)

#     # ✅ 叠加主从臂关节数据
#     lines = [
#         f"Frame {i:03d}",
#         f"L-J1: {row['Leader_J1']:.1f}, F-J1: {row['Follower_J1']:.1f}",
#         f"L-J2: {row['Leader_J2']:.1f}, F-J2: {row['Follower_J2']:.1f}",
#         f"L-J3: {row['Leader_J3']:.1f}, F-J3: {row['Follower_J3']:.1f}",
#     ]

#     for j, text in enumerate(lines):
#         cv2.putText(img, text, (10, 25 + j * 20), font, font_scale, font_color, thickness, cv2.LINE_AA)

#     video_writer.write(img)
#     written += 1

# video_writer.release()

# if written == 0:
#     print("⚠️ 没有成功写入任何帧，请检查图像路径或格式")
# else:
#     print(f"✅ 成功写入视频帧数: {written}")
#     print(f"📦 视频保存至: {save_path.resolve()}")

import cv2
import pandas as pd
from pathlib import Path

# ✅ 设置路径
data_root = Path("results/results/multimodal_data")
cam0_dir = data_root / "cam0"  # 側視視角 (E22S)
cam1_dir = data_root / "cam1"  # 俯視視角 (E12)
joint_csv = data_root / "synchronized_cleaned_data.csv"
save_path = data_root / "processed" / "dual_view_joint_overlay.mp4"
save_path.parent.mkdir(parents=True, exist_ok=True)

# ✅ 读取关节数据
df = pd.read_csv(joint_csv)

# ✅ 视频参数
fps = 30  # 修改为你需要的帧速率，例如 10 / 15 / 30 / 60
frame_size = (640, 480)
combined_size = (640 * 2, 480)  # 拼接后为 1280 x 480
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(str(save_path), fourcc, fps, combined_size)

# ✅ 字体参数
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.5
font_color = (0, 255, 0)
thickness = 1

written = 0

# ✅ 遍历每帧数据
for i, row in df.iterrows():
    frame_name_cam0 = f"frame_{i:05d}_e22s.jpg"
    frame_name_cam1 = f"frame_{i:05d}_e12.jpg"

    img_path_cam0 = cam0_dir / frame_name_cam0
    img_path_cam1 = cam1_dir / frame_name_cam1

    if not img_path_cam0.exists() or not img_path_cam1.exists():
        print(f"❌ 缺失图像 @ frame {i}")
        continue

    img0 = cv2.imread(str(img_path_cam0))
    img1 = cv2.imread(str(img_path_cam1))

    if img0 is None or img1 is None:
        print(f"❌ 图像读取失败 @ frame {i}")
        continue

    # ✅ 自动缩放到统一大小
    img0 = cv2.resize(img0, frame_size)
    img1 = cv2.resize(img1, frame_size)

    # ✅ 拼接图像（左右）
    combined_img = cv2.hconcat([img0, img1])

    # ✅ 构建叠加文字内容
    # lines = [
    #     f"Frame {i:03d}",
    #     f"L-J1: {row['Leader_J1']:.1f}, F-J1: {row['Follower_J1']:.1f}",
    #     f"L-J2: {row['Leader_J2']:.1f}, F-J2: {row['Follower_J2']:.1f}",
    #     f"L-J3: {row['Leader_J3']:.1f}, F-J3: {row['Follower_J3']:.1f}",
    # ]
# ✅ 构建叠加文字：全6个关节
    lines = [f"Frame {i:03d}"]
    for j in range(6):
        lj = row[f'Leader_J{j+1}']
        fj = row[f'Follower_J{j+1}']
        lines.append(f"L-J{j+1}: {lj:.1f}, F-J{j+1}: {fj:.1f}")

    for j, text in enumerate(lines):
        cv2.putText(combined_img, text, (10, 25 + j * 20), font, font_scale, font_color, thickness, cv2.LINE_AA)

    video_writer.write(combined_img)
    written += 1

video_writer.release()

if written == 0:
    print("⚠️ 未写入任何帧，请检查图像路径")
else:
    print(f"✅ 成功写入双视角视频帧数: {written}")
    print(f"📦 视频保存路径: {save_path.resolve()}")




In [None]:
# from IPython.display import Video
#Jupyter Notebook 的视频插件兼容性问题，请直接在视频文件地址打开吧
# Video("results/results/multimodal_data/processed/demo_video_with_joint_overlay.mp4", embed=True, width=640)
# from IPython.display import HTML

# HTML(f"""<video width="720" controls>
#   <source src="file:///home/wjw/lerobot/results/results/multimodal_data/processed/demo_video_with_joint_overlay.mp4" type="video/mp4">
# </video>""")
# import webbrowser
# webbrowser.open("/home/wjw/lerobot/results/results/multimodal_data/processed/demo_video_with_joint_overlay.mp4")
#🔧 方法 1：保存为 GIF（帧数适中推荐）
import imageio
from pathlib import Path
import cv2
from IPython.display import Image, display

# ✅ 读取视频并保存为 GIF
video_path = Path("results/results/multimodal_data/processed/dual_view_joint_overlay.mp4")
gif_path = video_path.with_suffix(".gif")

reader = cv2.VideoCapture(str(video_path))
frames = []
success, frame = reader.read()
while success:
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    resized = cv2.resize(rgb, (640, 240))  # 缩小尺寸
    frames.append(resized)
    success, frame = reader.read()
reader.release()

imageio.mimsave(gif_path, frames, fps=10)
print(f"✅ GIF 已保存为: {gif_path.resolve()}")

# ✅ 在 Notebook 中显示 GIF
display(Image(filename=str(gif_path)))


#还想实现 自动删除临时帧 或导出压缩版 GIF，我也可以继续帮你拓展～（仍未实现）

In [None]:
#误差动态热图或动画回放

# import pandas as pd
# import numpy as np
# import matplotlib.pyplot as plt
# import matplotlib.animation as animation
# from pathlib import Path

# # ✅ 读取关节数据
# csv_path = Path("results/results/multimodal_data/synchronized_cleaned_data.csv")
# df = pd.read_csv(csv_path)

# # ✅ 计算每帧误差（Leader vs Follower）
# joint_cols = [f"J{i}" for i in range(1, 7)]
# errors = np.abs(df[[f"Leader_{j}" for j in joint_cols]].values -
#                 df[[f"Follower_{j}" for j in joint_cols]].values)

# # ✅ 设置热图动画保存路径
# save_path = Path("results/results/multimodal_data/processed/error_heatmap_video.mp4")
# save_path.parent.mkdir(parents=True, exist_ok=True)

# # ✅ 初始化动画绘图
# fig, ax = plt.subplots(figsize=(8, 4))
# cax = ax.imshow(errors[0:1], aspect='auto', cmap='hot', vmin=0, vmax=np.max(errors))
# cb = fig.colorbar(cax)
# cb.set_label("Joint Error (rad)")
# ax.set_yticks(range(6))
# ax.set_yticklabels([f"J{i}" for i in range(1, 7)])
# ax.set_xlabel("Frame Index")

# # ✅ 更新函数：动态刷新热图
# def update(frame):
#     ax.clear()
#     ax.imshow(errors[:frame + 1].T, aspect='auto', cmap='hot', vmin=0, vmax=np.max(errors))
#     ax.set_title(f"Frame {frame:03d} - Dynamic Joint Error Heatmap")
#     ax.set_yticks(range(6))
#     ax.set_yticklabels([f"J{i}" for i in range(1, 7)])
#     ax.set_xlabel("Frame Index")

# # ✅ 创建动画
# ani = animation.FuncAnimation(fig, update, frames=len(errors), interval=50)

# # ✅ 保存为 MP4 视频
# ani.save(save_path, fps=20)
# print(f"✅ 误差动态热图视频已保存：{save_path.resolve()}")

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
from pathlib import Path

# ✅ 加载数据
df = pd.read_csv("results/results/multimodal_data/synchronized_cleaned_data.csv")
joint_cols = [f"J{i}" for i in range(1, 7)]
errors = np.abs(df[[f"Leader_{j}" for j in joint_cols]].values -
                df[[f"Follower_{j}" for j in joint_cols]].values)

# ✅ 初始化图形
fig, ax = plt.subplots(figsize=(8, 4))

# 初始化热图
cax = ax.imshow(errors[0:1].T, aspect='auto', cmap='hot', vmin=0, vmax=np.max(errors))
cb = fig.colorbar(cax)
cb.set_label("Joint Error (rad)")
ax.set_yticks(range(6))
ax.set_yticklabels([f"J{i}" for i in range(1, 7)])
ax.set_xlabel("Frame Index")

# ✅ 更新帧函数
def update(frame):
    ax.clear()
    ax.imshow(errors[:frame + 1].T, aspect='auto', cmap='hot', vmin=0, vmax=np.max(errors))
    ax.set_title(f"Frame {frame:03d} - Dynamic Joint Error Heatmap")
    ax.set_yticks(range(6))
    ax.set_yticklabels([f"J{i}" for i in range(1, 7)])
    ax.set_xlabel("Frame Index")

# ✅ 创建动画
ani = animation.FuncAnimation(fig, update, frames=len(errors), interval=50)

# ✅ Notebook 内嵌显示动画
HTML(ani.to_jshtml())

# #❶ 所有误差接近于 0
# 说明主臂和从臂的轨迹几乎完全一致，误差非常小（例如：都小于 0.1 rad），因此热图颜色对比度极低，看起来就是一块“黑色板”。

# ❷ 颜色条（colorbar）自动范围被限制住了
# 你设置了 vmin=0, vmax=np.max(errors)，如果误差整体很小，比如最大值 < 0.01，那么热图就全黑了。
#帮你补一版“动态热图增强对比 + 彩色可视化”的优化代码？


In [None]:
# 重新执行图表导出流程，因为执行环境已重置
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from pathlib import Path

# ✅ 设置路径
base_dir = Path("/home/wjw/lerobot/results/results/multimodal_data")
data_path = base_dir / "synchronized_cleaned_data.csv"
save_dir = base_dir / "figures"
save_dir.mkdir(parents=True, exist_ok=True)

# ✅ 读取数据
df = pd.read_csv(data_path)
time = np.arange(len(df))

# ✅ 提取关节数据
leader_cols = [f"Leader_J{i}" for i in range(1, 7)]
follower_cols = [f"Follower_J{i}" for i in range(1, 7)]

leader_data = df[leader_cols].values
follower_data = df[follower_cols].values

# ✅ 保存轨迹对比图
fig, axes = plt.subplots(6, 1, figsize=(8.27, 11.69), sharex=True)  # A4 纵向尺寸
for i in range(6):
    axes[i].plot(time, leader_data[:, i], label="Leader", linestyle='--')
    axes[i].plot(time, follower_data[:, i], label="Follower", alpha=0.8)
    axes[i].set_ylabel(f"J{i+1} (rad)")
    axes[i].legend(loc='upper right')
    axes[i].grid(True)
axes[-1].set_xlabel("Frame Index")
fig.suptitle("Joint Trajectory Comparison", fontsize=14)
fig.tight_layout(rect=[0, 0.03, 1, 0.97])
traj_pdf = save_dir / "trajectory_comparison.pdf"
fig.savefig(traj_pdf)
plt.close(fig)

# ✅ 保存误差曲线图
error = leader_data - follower_data
fig, axes = plt.subplots(6, 1, figsize=(8.27, 11.69), sharex=True)
for i in range(6):
    axes[i].plot(time, error[:, i], color='red')
    axes[i].axhline(0, linestyle='--', color='gray')
    axes[i].set_ylabel(f"J{i+1} Error (rad)")
    axes[i].grid(True)
axes[-1].set_xlabel("Frame Index")
fig.suptitle("Joint Angle Error", fontsize=14)
fig.tight_layout(rect=[0, 0.03, 1, 0.97])
err_pdf = save_dir / "joint_angle_error.pdf"
fig.savefig(err_pdf)
plt.close(fig)

# ✅ 保存角速度曲线图
leader_vel = np.gradient(leader_data, axis=0)
follower_vel = np.gradient(follower_data, axis=0)
fig, axes = plt.subplots(6, 1, figsize=(8.27, 11.69), sharex=True)
for i in range(6):
    axes[i].plot(time, leader_vel[:, i], label="Leader Velocity", linestyle='--')
    axes[i].plot(time, follower_vel[:, i], label="Follower Velocity", alpha=0.8)
    axes[i].set_ylabel(f"J{i+1} Vel")
    axes[i].legend()
    axes[i].grid(True)
axes[-1].set_xlabel("Frame Index")
fig.suptitle("Joint Velocity Comparison", fontsize=14)
fig.tight_layout(rect=[0, 0.03, 1, 0.97])
vel_pdf = save_dir / "joint_velocity_comparison.pdf"
fig.savefig(vel_pdf)
plt.close(fig)

# ✅ 保存 MSE 曲线图
mse = np.mean(error**2, axis=1)
fig = plt.figure(figsize=(8.27, 5))
plt.plot(time, mse, color='red')
plt.xlabel("Frame Index")
plt.ylabel("MSE (rad²)")
plt.title("Mean Squared Error over Time")
plt.grid(True)
mse_pdf = save_dir / "mse_curve.pdf"
fig.savefig(mse_pdf)
plt.close(fig)

import ace_tools as tools; tools.display_dataframe_to_user(name="保存成功的图表", dataframe=pd.DataFrame({
    "PDF 图名": ["trajectory_comparison.pdf", "joint_angle_error.pdf", "joint_velocity_comparison.pdf", "mse_curve.pdf"],
    "保存路径": [str(traj_pdf), str(err_pdf), str(vel_pdf), str(mse_pdf)]
}))


In [None]:
#结束实验

In [None]:
from lerobot.common.robot_devices.motors.dynamixel import TorqueMode

# 关闭主臂的所有关节扭矩
robot.leader_arms["main"].write("Torque_Enable", TorqueMode.DISABLED.value)

# 关闭从臂的所有关节扭矩
robot.follower_arms["main"].write("Torque_Enable", TorqueMode.DISABLED.value)

print("✅  主臂和从臂扭矩已关闭！")

leader_torque = robot.leader_arms["main"].read("Torque_Enable")
follower_torque = robot.follower_arms["main"].read("Torque_Enable")

print("🔍  主臂扭矩状态:", leader_torque)
print("🔍  从臂扭矩状态:", follower_torque)

assert all(t == 0 for t in leader_torque), "❌  主臂扭矩未正确关闭！"
assert all(t == 0 for t in follower_torque), "❌  从臂扭矩未正确关闭！"

print("✅  主从臂扭矩已确认关闭！")

In [None]:
robot.disconnect()
print("⚠️ 断开连接")