## [安装python-can](https://python-can.readthedocs.io/en/master/installation.html)

In [54]:
import struct
import can
import time

In [51]:
# Position range in rad; used as the scaling range when the 16-bit position
# word of a feedback frame is converted with uint_to_float.
P_MIN = -12.5
P_MAX = 12.5
# Velocity range in rad/s; used as the scaling range for 16-bit velocity words
# in both directions (float_to_uint for commands, uint_to_float for feedback).
V_MIN = -30.0
V_MAX = 30.0

# Communication types (command modes).
# send_receive_can_message places the chosen value in bits 28-24 of the
# extended CAN arbitration ID.
class CmdModes:
    GET_DEVICE_ID = 0
    MOTOR_CONTROL = 1
    # Feedback frame sent by the motor: bytes 0-1 carry the current angle and
    # bytes 2-3 the current angular velocity.
    MOTOR_FEEDBACK = 2
    MOTOR_ENABLE = 3
    MOTOR_STOP = 4
    SET_MECHANICAL_ZERO = 6
    SET_MOTOR_CAN_ID = 7
    SINGLE_PARAM_READ = 17
    # Writes a single parameter; the payload is the parameter address followed
    # by the value.
    SINGLE_PARAM_WRITE = 18
    FAULT_FEEDBACK = 21


def float_to_uint(x, x_min, x_max, bits):
    """Quantize ``x`` to an unsigned integer code that is ``bits`` wide.

    ``x`` is first clamped to ``[x_min, x_max]``. The clamped value is then
    mapped linearly so that ``x_min`` becomes 0 and ``x_max`` becomes
    ``2**bits - 1``; the fractional part is truncated by ``int()``.
    """
    full_scale = (1 << bits) - 1
    clamped = x_max if x > x_max else x_min if x < x_min else x
    return int((clamped - x_min) * full_scale / (x_max - x_min))


def uint_to_float(x, x_min, x_max, bits):
    """Convert an unsigned integer code of ``bits`` bits back to a float.

    The code ``x`` is clamped to ``[0, 2**bits - 1]`` and then mapped linearly
    so that 0 becomes ``x_min`` and the largest code becomes ``x_max``.
    """
    max_code = (1 << bits) - 1
    code = max_code if x > max_code else 0 if x < 0 else x
    return (x_max - x_min) * code / max_code + x_min


# Maps each supported format token to (size in bytes, struct format character).
_FORMAT_SPECS = {
    "f": (4, "f"),
    "u16": (2, "H"),
    "s16": (2, "h"),
    "u32": (4, "I"),
    "s32": (4, "i"),
    "u8": (1, "B"),
    "s8": (1, "b"),
}


def format_data(data=None, format="f f", type="decode"):
    """Convert between a flat sequence of byte values and typed values.

    Parameters:
    - data: for ``type="decode"``, an indexable sequence of byte values
      (ints 0-255); for ``type="encode"``, a sequence of numbers with one
      entry per format token. Defaults to an empty list. The sequence is never
      modified.
    - format: whitespace-separated tokens chosen from
      ``f``, ``u16``, ``s16``, ``u32``, ``s32``, ``u8``, ``s8``.
    - type: ``"decode"`` (bytes -> values) or ``"encode"`` (values -> bytes).

    Returns:
    - decode: a list with one decoded value per token.
    - encode: a list of byte values, zero-padded at the end to at least
      4 entries.
    - ``[]`` (after printing a message) when a token is not recognised.
    - ``None`` when ``type`` is neither mode, or when encoding and the number
      of tokens differs from ``len(data)``.

    Raises:
    - IndexError when decoding and ``data`` is shorter than the format needs.

    NOTE: values are packed/unpacked with ``struct`` in the host's native byte
    order (no byte-order prefix is used).
    """
    if data is None:
        data = []
    format_list = format.split()
    rdata = []

    if type == "decode":
        p = 0
        for f in format_list:
            spec = _FORMAT_SPECS.get(f)
            if spec is None:
                print("unkown format in format_data(): " + f)
                return []
            size, code = spec
            # Element-wise indexing keeps the IndexError on short input.
            chunk = bytearray(data[p + k] for k in range(size))
            p += size
            rdata.append(struct.unpack(code, chunk)[0])
        return rdata

    if type == "encode" and len(format_list) == len(data):
        for f, value in zip(format_list, data):
            spec = _FORMAT_SPECS.get(f)
            if spec is None:
                print("unkown format in format_data(): " + f)
                return []
            size, code = spec
            if f != "f":
                # Convert a local copy only; the caller's sequence must not be
                # rewritten as a side effect of encoding.
                value = int(value)
            rdata.extend(struct.pack(code, value)[:size])
        if len(rdata) < 4:
            rdata.extend([0x00] * (4 - len(rdata)))
        return rdata

    # Unsupported mode, or token/value count mismatch while encoding.
    return None

In [84]:
# Send one CAN frame and then read one frame from the bus.
def send_receive_can_message(
    bus, cmd_mode, data2, motor_can_id, data1, timeout=0.2, recv_timeout=1.0
):
    """
    Send a CAN message and receive a response.

    The extended arbitration ID is assembled as
    ``(cmd_mode << 24) | (data2 << 8) | motor_can_id``. The fields are not
    range-checked or masked here.

    Parameters:
    - bus: Initialized CAN bus object
    - cmd_mode: Command mode (bit28~bit24)
    - data2: Data area 2 (bit23~bit8)
    - motor_can_id: Motor target address (bit7~bit0)
    - data1: Data bytes to send
    - timeout: Timeout for sending the message (default is 0.2 seconds)
    - recv_timeout: Timeout for waiting on the reply (default is 1.0 second,
      the value that was previously hard-coded)

    Returns:
    A tuple ``(sent_message, received_message)``.
    ``(None, None)`` is returned when sending raises ``can.CanError``.
    ``received_message`` is whatever ``bus.recv`` returns next; it is not
    matched against the request, and it is ``None`` when ``bus.recv`` returns
    nothing within ``recv_timeout``.
    """
    # Calculate the arbitration ID
    arbitration_id = (cmd_mode << 24) | (data2 << 8) | motor_can_id

    # Create CAN message
    message = can.Message(
        arbitration_id=arbitration_id, data=data1, is_extended_id=True,
    )

    # Send the CAN message
    try:
        bus.send(message, timeout=timeout)
    except can.CanError:
        print("Failed to send the message.")
        return None, None

    # Output details of the sent message
    print(f"Sent message with ID {hex(message.arbitration_id)}, data: 0x{(message.data.hex())}")

    # Wait for the next frame on the bus, up to recv_timeout seconds
    received_msg = bus.recv(recv_timeout)
    return message, received_msg


# Drain the CAN receive buffer so that stale frames are not read later.
def clear_can_rx(timeout=1):
    """Read and discard frames from the module-level ``bus`` for ``timeout`` seconds.

    The loop polls ``bus.recv(0)`` for the whole window, including after the
    buffer is empty, so frames that arrive during the window are discarded
    too. Each discarded frame's arbitration ID is printed.

    Parameters:
    - timeout: Length of the drain window in seconds (default is 1)
    """
    # A monotonic clock keeps the window length correct even if the system
    # wall clock is adjusted while draining.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        recv_msg = bus.recv(0)
        if recv_msg is not None:
            print(
                f"Cleared message with ID {hex(recv_msg.arbitration_id)}"
            )

In [3]:
# Connect to the CAN bus with 1 Mbit/s bitrate through the python-can "pcan"
# interface. `interface=` is used instead of `bustype=`, which newer
# python-can releases deprecate.
bus = can.interface.Bus(interface="pcan", channel="PCAN_USBBUS1", bitrate=1000000)

uptime library not available, timestamps are relative to boot time and not to Epoch UTC


In [106]:
clear_can_rx() # Drain the CAN receive buffer so stale frames are not read later

Cleared message with ID 0x7ffe
Cleared message with ID 0x2007ffe
Cleared message with ID 0x2007ffe
Cleared message with ID 0x2007ffe


In [41]:
MAIN_CAN_ID = 254 # CAN ID of the host PC (master)
MOTOR_ID = 127 # CAN ID of the motor (slave)

## Jog

In [102]:
# Jog velocity command; float_to_uint clamps it to [V_MIN, V_MAX].
jog_vel = -4  # rad/s
# Scale the velocity to a 16-bit unsigned code.
uint_value = float_to_uint(jog_vel, V_MIN, V_MAX, 16)
# format_data pads its result to 4 bytes: keep the 2 value bytes and reverse
# their order before appending them to the payload.
jog_vel_bytes = format_data(data=[uint_value], format="u16", type="encode")[:2][::-1]

# Payload: six fixed bytes followed by the two velocity bytes.
# NOTE(review): the meaning of the fixed bytes (0x05 0x70 0x00 0x00 0x07 0x01)
# is not shown in this file — confirm against the motor protocol manual.
data1 = [0x05, 0x70, 0x00, 0x00, 0x07, 0x01] + jog_vel_bytes
sent_msg, received_msg = send_receive_can_message(
    bus=bus, cmd_mode=CmdModes.SINGLE_PARAM_WRITE, data2=MAIN_CAN_ID, motor_can_id=MOTOR_ID, data1=data1
)

Sent message with ID 0x1200fe7f, data: 0x0570000007016eee


## 停止

In [105]:
# Same payload layout as the jog command, except that byte 5 is 0x00 instead
# of 0x01 and the velocity bytes are 0x7F 0xFF. 0x7FFF is the 16-bit code that
# float_to_uint produces for 0 rad/s over [V_MIN, V_MAX].
data1 = [0x05, 0x70, 0x00, 0x00, 0x07, 0x00, 0x7F, 0xFF]
sent_msg, received_msg = send_receive_can_message(
    bus=bus, cmd_mode=CmdModes.SINGLE_PARAM_WRITE, data2=MAIN_CAN_ID, motor_can_id=MOTOR_ID, data1=data1
)

Sent message with ID 0x1200fe7f, data: 0x0570000007007fff


## 读取速度和位置
- 发送参数：单个参数写入（通信类型 18），向地址 0x7018（电流限制）写入值 27 A
- 读取返回值：电机反馈数据（通信类型 2），Byte0,1：当前角度，Byte2,3：当前角速度

In [111]:
# Write the value 27 to parameter address 0x7018, then decode position and
# velocity from the frame that comes back.
address = 0x7018
current_limit = 27

# Payload: the address packed as a little-endian 32-bit integer, followed by
# the bytes format_data produces for the float value.
encoded_data = format_data(data=[current_limit], format="f", type="encode")
data_bytes = struct.pack("<I", address) + bytes(encoded_data)

sent_msg, received_msg = send_receive_can_message(
    bus=bus,
    cmd_mode=CmdModes.SINGLE_PARAM_WRITE,
    data2=MAIN_CAN_ID,
    motor_can_id=MOTOR_ID,
    data1=data_bytes,
)

if received_msg is None:
    print("No message received within the timeout period.")
else:
    print(
        f"Received message with ID {hex(received_msg.arbitration_id)}: {received_msg.data.hex()}"
    )
    payload = received_msg.data
    # Bytes 0-1: 16-bit position word, high byte first.
    pos_raw = (payload[0] << 8) | payload[1]
    pos = uint_to_float(pos_raw, P_MIN, P_MAX, 16)
    # Bytes 2-3: 16-bit velocity word, high byte first.
    vel_raw = (payload[2] << 8) | payload[3]
    vel = uint_to_float(vel_raw, V_MIN, V_MAX, 16)
    print(f"pos:{pos:.2f} rad, vel:{vel:.2f} rad/s")

Sent message with ID 0x1200fe7f, data: 0x187000000000d841
Received message with ID 0x2007ffe: 72c27fec7fff015a
pos:-1.29 rad, vel:-0.02 rad/s
