In [1]:
from dynamixel_sdk import *  # ROBOTIS Dynamixel SDK
import time
import math
from dataclasses import dataclass
from typing import Dict, List, Optional, Any

# ===== 你需要改的 =====
DEVICENAME = "COM3"
BAUDRATE = 57600
DXL_IDS = [8,9]
PROTOCOL_VERSION = 2.0

# ===== XL330 / X-series 常用控制表地址 =====
ADDR_OPERATING_MODE    = 11
ADDR_TORQUE_ENABLE     = 64
ADDR_GOAL_POSITION     = 116

ADDR_PRESENT_CURRENT   = 126
ADDR_PRESENT_VELOCITY  = 128
ADDR_PRESENT_POSITION  = 132

LEN_4 = 4
LEN_2 = 2

# ===== Operating Mode =====
MODE_EXTENDED_POSITION = 4   # 多圈位置模式

TORQUE_ENABLE  = 1
TORQUE_DISABLE = 0

# ===== 角度-刻度换算（XL330 通常是 4096 ticks / rev）=====
TICKS_PER_REV = 4096

# ===== controller_state：核心状态 =====
controller_state = {
    "ticks_per_rev": TICKS_PER_REV,
    "cmd_ticks": {i: 0 for i in DXL_IDS},   # 我们的“命令坐标系”（动作序列在此基础上规划）
    "connected": False,
}


In [2]:
class AxisServo:
    def __init__(self, state: dict):
        self.state = state
        self.port = PortHandler(DEVICENAME)
        self.packet = PacketHandler(PROTOCOL_VERSION)

    # ---------- 基础工具 ----------
    @staticmethod
    def _int32(x: int) -> int:
        x &= 0xFFFFFFFF
        return x - 0x100000000 if x & 0x80000000 else x

    @staticmethod
    def _u32(x: int) -> int:
        return x & 0xFFFFFFFF

    @staticmethod
    def _int16(x: int) -> int:
        x &= 0xFFFF
        return x - 0x10000 if x & 0x8000 else x

    def deg_to_ticks(self, deg: float) -> int:
        return int(round(deg * self.state["ticks_per_rev"] / 360.0))

    def ticks_to_deg(self, ticks: int) -> float:
        return ticks * 360.0 / self.state["ticks_per_rev"]

    # ---------- 连接 ----------
    def connect(self):
        if not self.port.openPort():
            raise RuntimeError("Failed to open port")
        if not self.port.setBaudRate(BAUDRATE):
            raise RuntimeError("Failed to set baudrate")
        self.state["connected"] = True

    def close(self):
        try:
            self.port.closePort()
        finally:
            self.state["connected"] = False

    # ---------- 扫描 ----------
    def scan_ids(self, ids=None, timeout_ms=10) -> List[int]:
        if ids is None:
            ids = DXL_IDS
        online = []
        for dxl_id in ids:
            model, comm, err = self.packet.ping(self.port, dxl_id)
            if comm == COMM_SUCCESS:
                online.append(dxl_id)
        return online

    # ---------- 写寄存器 ----------
    def write1(self, dxl_id: int, addr: int, val: int):
        comm, err = self.packet.write1ByteTxRx(self.port, dxl_id, addr, val)
        if comm != COMM_SUCCESS:
            raise RuntimeError(self.packet.getTxRxResult(comm))

    def write4(self, dxl_id: int, addr: int, val: int):
        comm, err = self.packet.write4ByteTxRx(self.port, dxl_id, addr, self._u32(val))
        if comm != COMM_SUCCESS:
            raise RuntimeError(self.packet.getTxRxResult(comm))

    # ---------- SyncWrite Goal Position ----------
    def sync_write_goal_position(self, ticks_by_id: Dict[int, int]):
        sw = GroupSyncWrite(self.port, self.packet, ADDR_GOAL_POSITION, LEN_4)
        for dxl_id, v in ticks_by_id.items():
            u = self._u32(v)
            param = [
                DXL_LOBYTE(DXL_LOWORD(u)),
                DXL_HIBYTE(DXL_LOWORD(u)),
                DXL_LOBYTE(DXL_HIWORD(u)),
                DXL_HIBYTE(DXL_HIWORD(u)),
            ]
            if not sw.addParam(dxl_id, bytearray(param)):
                raise RuntimeError(f"SyncWrite addParam failed: ID {dxl_id}")
        comm = sw.txPacket()
        sw.clearParam()
        if comm != COMM_SUCCESS:
            raise RuntimeError(self.packet.getTxRxResult(comm))

    # ---------- BulkRead（Position + Current，可选 Velocity） ----------
    def bulk_read(self, ids=None, want_current=True, want_velocity=False):
        if ids is None:
            ids = DXL_IDS

        br = GroupBulkRead(self.port, self.packet)

        for dxl_id in ids:
            if not br.addParam(dxl_id, ADDR_PRESENT_POSITION, LEN_4):
                raise RuntimeError(f"BulkRead addParam failed: pos ID {dxl_id}")
            if want_velocity:
                if not br.addParam(dxl_id, ADDR_PRESENT_VELOCITY, LEN_4):
                    raise RuntimeError(f"BulkRead addParam failed: vel ID {dxl_id}")
            if want_current:
                if not br.addParam(dxl_id, ADDR_PRESENT_CURRENT, LEN_2):
                    raise RuntimeError(f"BulkRead addParam failed: cur ID {dxl_id}")

        comm = br.txRxPacket()
        if comm != COMM_SUCCESS:
            raise RuntimeError(self.packet.getTxRxResult(comm))

        pos = {}
        vel = {}
        cur = {}

        for dxl_id in ids:
            raw_pos = br.getData(dxl_id, ADDR_PRESENT_POSITION, LEN_4)
            pos[dxl_id] = self._int32(raw_pos)

            if want_velocity:
                raw_vel = br.getData(dxl_id, ADDR_PRESENT_VELOCITY, LEN_4)
                vel[dxl_id] = self._int32(raw_vel)
            else:
                vel[dxl_id] = None

            if want_current:
                raw_cur = br.getData(dxl_id, ADDR_PRESENT_CURRENT, LEN_2)
                cur[dxl_id] = self._int16(raw_cur)
            else:
                cur[dxl_id] = None

        br.clearParam()
        return pos, vel, cur

    # ---------- 模式设置：Extended Position ----------
    def ensure_extended_mode(self, ids=None):
        if ids is None:
            ids = DXL_IDS
        for dxl_id in ids:
            # torque off -> set mode -> torque on
            self.write1(dxl_id, ADDR_TORQUE_ENABLE, TORQUE_DISABLE)
            self.write1(dxl_id, ADDR_OPERATING_MODE, MODE_EXTENDED_POSITION)
            self.write1(dxl_id, ADDR_TORQUE_ENABLE, TORQUE_ENABLE)

    # ---------- 软重置：把 cmd_ticks 贴到当前实测位置（避免一上来跳） ----------
    def soft_reset_cmd_to_present(self, ids=None):
        if ids is None:
            ids = DXL_IDS
        pos, _, _ = self.bulk_read(ids, want_current=False)
        for dxl_id in ids:
            self.state["cmd_ticks"][dxl_id] = int(pos[dxl_id])

    # ---------- 设置目标（绝对角度 / 相对角度） ----------
    def set_targets_abs_deg(self, abs_deg_by_id: Dict[int, float]):
        for dxl_id, deg in abs_deg_by_id.items():
            self.state["cmd_ticks"][dxl_id] = self.deg_to_ticks(deg)

    def nudge_rel_deg(self, rel_deg_by_id: Dict[int, float]):
        for dxl_id, deg in rel_deg_by_id.items():
            self.state["cmd_ticks"][dxl_id] += self.deg_to_ticks(deg)


In [3]:
@dataclass
class Segment:
    t0: float
    t1: float
    p0: int
    p1: int

def _lerp(a: float, b: float, u: float) -> float:
    return a + (b - a) * u

def _smoothstep(u: float) -> float:
    # 0..1 -> 0..1，减冲击
    return u*u*(3 - 2*u)

def build_segments_for_motor(
    motor_id: int,
    actions: List[Dict[str, Any]],
    base_tick: int,
    servo: AxisServo
) -> List[Segment]:
    """把某个 motor 的 actions 转成不重叠的 Segment 列表"""
    acts = [a for a in actions if int(a["id"]) == motor_id]
    acts.sort(key=lambda x: float(x["start_s"]))

    segs: List[Segment] = []
    current_base = base_tick

    for a in acts:
        t0 = float(a["start_s"])
        dur = float(a["duration_s"])
        t1 = t0 + max(1e-6, dur)

        if segs and t0 < segs[-1].t1 - 1e-6:
            raise ValueError(f"Motor {motor_id} actions overlap: start {t0} < prev_end {segs[-1].t1}")

        # 计算该动作的起点 tick：如果前面已有 segment，那么以前一段终点为起点；否则用 base
        p0 = segs[-1].p1 if segs else current_base

        typ = a["type"]
        val = float(a["value"])

        if typ == "abs_deg":
            p1 = servo.deg_to_ticks(val)
        elif typ == "rel_deg":
            p1 = p0 + servo.deg_to_ticks(val)
        else:
            raise ValueError(f"Unknown action type: {typ}")

        segs.append(Segment(t0=t0, t1=t1, p0=p0, p1=p1))

    return segs

def desired_tick_at_time(t: float, base_tick: int, segs: List[Segment]) -> int:
    """给定时间 t，返回该 motor 在该时刻应该写入的 goal tick"""
    if not segs:
        return base_tick

    if t <= segs[0].t0:
        return base_tick

    # 在段之间：保持上一段终点
    last_p = base_tick
    for s in segs:
        if t < s.t0:
            return last_p
        if s.t0 <= t <= s.t1:
            u = (t - s.t0) / (s.t1 - s.t0)
            u = max(0.0, min(1.0, u))
            u = _smoothstep(u)
            return int(round(_lerp(s.p0, s.p1, u)))
        last_p = s.p1

    # 超过最后一段：保持终点
    return segs[-1].p1

def run_action_sequence(
    servo: AxisServo,
    actions: List[Dict[str, Any]],
    poll_hz: int = 50,
    ids=None,
    reset_cmd_to_present: bool = True,
    write_all_ids_each_tick: bool = True,
):
    """
    - 支持不同电机不同 start_s / duration_s
    - 全程持续 bulk_read current，并记录日志
    - 默认：开跑前把 cmd_ticks 贴到 present（避免突然跳）
    """
    if ids is None:
        ids = DXL_IDS

    # 0) 可选：先把 cmd_ticks 贴到当前实测位置
    if reset_cmd_to_present:
        servo.soft_reset_cmd_to_present(ids)

    # 1) 以“跑开始时的 cmd_tick”为 base，构建每个电机的 segments
    base_ticks = {i: int(servo.state["cmd_ticks"][i]) for i in ids}
    segs_by_id = {i: build_segments_for_motor(i, actions, base_ticks[i], servo) for i in ids}

    # 2) 总时长 = max(t1)
    total_t = 0.0
    for i in ids:
        if segs_by_id[i]:
            total_t = max(total_t, segs_by_id[i][-1].t1)
    total_t = max(total_t, 0.0)

    dt = 1.0 / max(1, int(poll_hz))
    steps = max(1, int(math.ceil(total_t / dt)))

    # 3) 记录日志：每行包含 time + goal/pos/current（每个 id）
    log_rows: List[Dict[str, Any]] = []

    t_start = time.perf_counter()

    # 4) 主循环
    for k in range(steps + 1):
        now = time.perf_counter()
        t = now - t_start

        # 4.1 计算 goal ticks
        goal_ticks = {}
        for i in ids:
            goal_ticks[i] = desired_tick_at_time(t, base_ticks[i], segs_by_id[i])

        # 4.2 写入 goal（一般建议每 tick 都写全量，更稳）
        if write_all_ids_each_tick:
            servo.sync_write_goal_position(goal_ticks)
        else:
            # 只写有变化的（可选优化）
            servo.sync_write_goal_position(goal_ticks)

        # 4.3 读取 present（全程 current）
        pos, vel, cur = servo.bulk_read(ids, want_current=True, want_velocity=False)

        row = {"t_s": t}
        for i in ids:
            row[f"goal_tick_{i}"] = int(goal_ticks[i])
            row[f"pos_tick_{i}"]  = int(pos[i])
            row[f"cur_raw_{i}"]   = int(cur[i]) if cur[i] is not None else None
        log_rows.append(row)

        # 4.4 节奏控制
        target = k * dt
        spent = time.perf_counter() - t_start
        sleep_s = target - spent
        if sleep_s > 0:
            time.sleep(sleep_s)

    # 5) 跑完后，把 cmd_ticks 更新为最后的目标（方便你继续追加动作）
    for i in ids:
        servo.state["cmd_ticks"][i] = desired_tick_at_time(total_t, base_ticks[i], segs_by_id[i])

    result = {
        "total_s": total_t,
        "poll_hz": poll_hz,
        "rows": log_rows,
        "segments": segs_by_id,
        "base_ticks": base_ticks,
    }
    return result

def save_log_csv(result: dict, path: str):
    import csv
    rows = result["rows"]
    if not rows:
        raise ValueError("Empty log")
    keys = list(rows[0].keys())
    with open(path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=keys)
        w.writeheader()
        w.writerows(rows)
    return path


In [4]:
servo = AxisServo(controller_state)
servo.connect()

online = servo.scan_ids()
print("Online IDs:", online)

# 强烈建议：统一设成 Extended Position（多圈）
servo.ensure_extended_mode(DXL_IDS)

# 建议：启动时先把 cmd_ticks 贴到当前实测位置，避免一上来跳到 0
servo.soft_reset_cmd_to_present(DXL_IDS)
print("soft_reset_cmd_to_present done.")

# ===== 定义动作序列 =====
# 说明：
# - 每个 action 都有 start_s（相对本序列开始）
# - duration_s 内用平滑插值走到目标（多圈完全OK）
actions = [
    {"id": 8, "type": "rel_deg", "value": 10, "start_s": 0.0, "duration_s": 1.5},  # 电机1：+2圈，从0秒开始
    {"id": 9, "type": "rel_deg", "value": 360, "start_s": 0.0, "duration_s": 1.5},  # 电机2：-1圈，从0.6秒开始
    # 你可以继续加：同一个电机多个动作（注意不要重叠 start/duration）
    #{"id": 1, "type": "rel_deg", "value": -180, "start_s": 1.5, "duration_s": 0.6},  # 电机1：再回一点，从1.5秒开始
]

result = run_action_sequence(
    servo,
    actions=actions,
    poll_hz=50,
    ids=[8, 9],                 # 只控制 1 和 2（你也可以换成 DXL_IDS）
    reset_cmd_to_present=True,  # 序列开始前贴到 present
)

print("Sequence done. total_s =", result["total_s"])
print("Log rows =", len(result["rows"]))

# 保存日志（可选）
csv_path = save_log_csv(result, "xl330_action_log——1.csv")
print("Saved:", csv_path)


Online IDs: [8, 9]
soft_reset_cmd_to_present done.


RuntimeError: BulkRead addParam failed: cur ID 8

In [None]:
servo.close()
print("closed")
