In [92]:
import socket
import struct
import time

# ==== Cấu hình IP Robot và cổng Modbus (cố định) ====
ROBOT_IP = "192.168.1.165"
ROBOT_PORT = 502
SOCK_TIMEOUT = 5.0


# ==== Hàm chuyển số nguyên 16-bit sang big-endian ====
def to_be_u16(value):
    return struct.pack(">H", value)


# ==== Hàm tạo khung truyền Lite6 ====
def create_lite6_packet(tid, register, params=b""):
    """
    Khung Lite6 gồm:
    - TID (2 byte BE)
    - Proto = 0x0002 (2 byte BE)
    - Length = 1 + len(params) (2 byte BE)
    - Register (1 byte)
    - Params (n byte)
    """
    proto = 0x0002
    length = 1 + len(params)
    packet = (
        to_be_u16(tid) +
        to_be_u16(proto) +
        to_be_u16(length) +
        bytes([register]) +
        params
    )
    return packet


# ==== Hàm gửi lệnh và nhận phản hồi ====
def send_command(packet):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(SOCK_TIMEOUT)
        sock.connect((ROBOT_IP, ROBOT_PORT))
        sock.sendall(packet)

        # Nhận header 6 byte
        header = sock.recv(6)
        tid, proto, length = struct.unpack(">HHH", header)

        # Nhận phần body còn lại
        body = sock.recv(length)
        return header + body


# ==== Hàm giải mã phản hồi từ robot ====
def parse_response(data):
    if len(data) < 8:
        return "Phản hồi không hợp lệ"

    register = data[6]
    status = data[7]

    # Giải thích trạng thái
    status_text = []
    if (status >> 6) & 1:
        status_text.append("❌ Lỗi (Bit6)")
    if (status >> 5) & 1:
        status_text.append("⚠️ Cảnh báo (Bit5)")
    if (status >> 4) & 1:
        status_text.append("⛔ Không thể chuyển động (Bit4)")
    if not status_text:
        status_text = ["✅ OK"]

    return f"Reg: 0x{register:02X}, Status: 0x{status:02X} ({', '.join(status_text)})"


# ==== Lệnh chuẩn (Clear Error / Enable Robot) ====
def cmd_clear_error(tid=1):
    return create_lite6_packet(tid, register=0x10)

def cmd_clear_warning(tid=2):
    return create_lite6_packet(tid, register=0x11)

def cmd_enable_all_joints(tid=3):
    params = b"\x08\x01"  # 0x08 = all joints, 0x01 = enable
    return create_lite6_packet(tid, register=0x0B, params=params)


# ==== Lệnh điều khiển Gripper (CH0 / CH1, OPEN / CLOSE) ====
def cmd_gripper(channel=0, open_on=True, tid=10):
    """
    Gửi lệnh điều khiển Gripper theo kênh:
    - channel: 0 hoặc 1
    - open_on: True = Open, False = Close
    """
    HOST_ID = 0x09
    ADDR = 0x0A15

    # Tra giá trị float theo manual
    if channel == 0:
        value = 257.0 if open_on else 256.0
    elif channel == 1:
        value = 514.0 if open_on else 512.0
    else:
        raise ValueError("Chỉ chấp nhận channel 0 hoặc 1")

    # Gói params: [HostID][Addr u16 BE][Value f32 LE]
    params = bytes([HOST_ID]) + to_be_u16(ADDR) + struct.pack("<f", value)
    return create_lite6_packet(tid, register=0x7F, params=params)

def close_gripper(channel=1, tid=13):
    """
    Mở gripper trên channel chỉ định (0 hoặc 1)
    """
    print(f"--- Mở Gripper Channel {channel} ---")
    packet = cmd_gripper(channel=channel, open_on=True, tid=tid)
    response = send_command(packet)
    result = parse_response(response)
    return result

def stop_gripper(channel=0, tid=11):
    """
    Đóng gripper trên channel chỉ định (0 hoặc 1)
    """
    print(f"--- Đóng Gripper Channel {channel} ---")
    packet = cmd_gripper(channel=channel, open_on=False, tid=tid)
    response = send_command(packet)
    result = parse_response(response)
    return result
def open_gripper(channel=0, tid=12):
    """
    Mở gripper trên channel chỉ định (0 hoặc 1)
    """
    print(f"--- Mở Gripper Channel {channel} ---")
    packet = cmd_gripper(channel=channel, open_on=True, tid=tid)
    response = send_command(packet)
    result = parse_response(response)
    return result
# ==== Demo chạy lệnh ====
def demo():
    print("=== Bắt đầu điều khiển Lite6 ===")

    for name, cmd_func in [
        ("Xóa lỗi", cmd_clear_error),
        ("Xóa cảnh báo", cmd_clear_warning),
        ("Bật động cơ (enable all joints)", cmd_enable_all_joints),
    ]:
        print(f"\n--- {name} ---")
        packet = cmd_func()
        resp = send_command(packet)
        print(parse_response(resp))
    close_gripper()
    time.sleep(5)
    open_gripper()
    
    
# ==== Chạy demo nếu file chính ====
if __name__ == "__main__":
    demo()


=== Bắt đầu điều khiển Lite6 ===

--- Xóa lỗi ---
Reg: 0x10, Status: 0x00 (✅ OK)

--- Xóa cảnh báo ---
Reg: 0x11, Status: 0x00 (✅ OK)

--- Bật động cơ (enable all joints) ---
Reg: 0x0B, Status: 0x00 (✅ OK)
--- Mở Gripper Channel 1 ---
--- Mở Gripper Channel 0 ---


In [93]:
import anhooa

print(dir(anhooa))  # Kiểm tra xem có open_gripper không




In [None]:
import socket
import struct
import time
import math

# ==== Cấu hình IP Robot và cổng Modbus (cố định) ====
ROBOT_IP = "192.168.1.165"
ROBOT_PORT = 502
SOCK_TIMEOUT = 5.0


# ==== Tiện ích chung ====
def to_be_u16(value: int) -> bytes:
    """Đóng gói số nguyên 16-bit big-endian."""
    return struct.pack(">H", value)


def recv_all(sock: socket.socket, n: int) -> bytes:
    """Nhận chính xác n byte từ socket (block đến khi đủ hoặc timeout)."""
    data = bytearray()
    while len(data) < n:
        chunk = sock.recv(n - len(data))
        if not chunk:
            break
        data.extend(chunk)
    return bytes(data)


# ==== Hàm tạo khung truyền Lite6 ====
def create_lite6_packet(tid: int, register: int, params: bytes = b"") -> bytes:
    """
    Khung Lite6 gồm:
    - TID (2 byte BE)
    - Proto = 0x0002 (2 byte BE)
    - Length = 1 + len(params) (2 byte BE)
    - Register (1 byte)
    - Params (n byte, thường là fp32 LE)
    """
    proto = 0x0002
    length = 1 + len(params)
    packet = (
        to_be_u16(tid) +
        to_be_u16(proto) +
        to_be_u16(length) +
        bytes([register]) +
        params
    )
    return packet


# ==== Hàm gửi lệnh và nhận phản hồi ====
def send_command(packet: bytes) -> bytes:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(SOCK_TIMEOUT)
        sock.connect((ROBOT_IP, ROBOT_PORT))
        sock.sendall(packet)

        # Nhận header 6 byte
        header = recv_all(sock, 6)
        if len(header) != 6:
            raise TimeoutError("Không nhận đủ 6 byte header từ robot.")

        tid, proto, length = struct.unpack(">HHH", header)

        # Nhận phần body còn lại (length byte)
        body = recv_all(sock, length)
        if len(body) != length:
            raise TimeoutError(f"Không nhận đủ body ({len(body)}/{length} byte).")

        return header + body


# ==== Hàm giải mã phản hồi từ robot ====
def parse_response(data: bytes) -> str:
    if len(data) < 8:
        return "Phản hồi không hợp lệ"

    # header: 6 byte, tiếp theo: register (1), status (1), ...
    register = data[6]
    status = data[7]

    # Giải thích trạng thái
    status_text = []
    if (status >> 6) & 1:
        status_text.append("❌ Lỗi (Bit6)")
    if (status >> 5) & 1:
        status_text.append("⚠️ Cảnh báo (Bit5)")
    if (status >> 4) & 1:
        status_text.append("⛔ Không thể chuyển động (Bit4)")
    if not status_text:
        status_text = ["✅ OK"]

    return f"Reg: 0x{register:02X}, Status: 0x{status:02X} ({', '.join(status_text)})"


# ==== Lệnh chuẩn (Clear Error / Warning / Enable Robot) ====
def cmd_clear_error(tid: int = 1) -> bytes:
    return create_lite6_packet(tid, register=0x10)

def cmd_clear_warning(tid: int = 2) -> bytes:
    return create_lite6_packet(tid, register=0x11)

def cmd_enable_all_joints(tid: int = 3) -> bytes:
    # 0x08 = tất cả khớp, 0x01 = enable
    params = b"\x08\x01"
    return create_lite6_packet(tid, register=0x0B, params=params)


# ==== Lệnh điều khiển Gripper (CH0 / CH1, OPEN / CLOSE) ====
def cmd_gripper(channel: int = 0, open_on: bool = True, tid: int = 10) -> bytes:
    """
    Gửi lệnh điều khiển Gripper theo kênh:
    - channel: 0 hoặc 1
    - open_on: True = Open, False = Close
    Gói params: [HostID:1][Addr u16 BE][Value f32 LE]
    """
    HOST_ID = 0x09
    ADDR = 0x0A15

    # Tra giá trị float theo manual
    if channel == 0:
        value = 257.0 if open_on else 256.0
    elif channel == 1:
        value = 514.0 if open_on else 512.0
    else:
        raise ValueError("Chỉ chấp nhận channel 0 hoặc 1")

    params = bytes([HOST_ID]) + to_be_u16(ADDR) + struct.pack("<f", value)
    return create_lite6_packet(tid, register=0x7F, params=params)


# ==== Lệnh P2P (Joint motion) - Register 0x17 ====
def cmd_p2p(
    joints_rad,              # iterable 6 phần tử [J1..J6] đơn vị rad
    speed_rad_s: float = 0.0,
    acc_rad_s2: float = 0.0,
    motion_time_s: float = 0.0,
    tid: int = 20
) -> bytes:
    """
    Điều khiển P2P theo tài liệu (Register 0x17):
    - joints_rad: [J1..J6] (rad)
    - speed_rad_s: tốc độ khớp (rad/s)
    - acc_rad_s2: gia tốc khớp (rad/s^2)
    - motion_time_s: thời gian chuyển động (s), 0 = để robot tự tính theo speed/acc
    Tham số đóng gói dạng fp32 little-endian.
    """
    if len(joints_rad) != 6:
        raise ValueError("joints_rad phải có đúng 6 phần tử (J1..J6, đơn vị rad).")

    params = b"".join(struct.pack("<f", float(v)) for v in joints_rad)
    params += struct.pack("<f", float(speed_rad_s))
    params += struct.pack("<f", float(acc_rad_s2))
    params += struct.pack("<f", float(motion_time_s))

    return create_lite6_packet(tid, register=0x17, params=params)


# ==== Tiện ích: nhập độ thay vì rad (tuỳ chọn) ====
def deg2rad(deg: float) -> float:
    return deg * math.pi / 180.0

def cmd_p2p_deg(
    joints_deg,              # iterable 6 phần tử [J1..J6] đơn vị độ
    speed_deg_s: float = 0.0,
    acc_deg_s2: float = 0.0,
    motion_time_s: float = 0.0,
    tid: int = 21
) -> bytes:
    """
    Tương tự cmd_p2p nhưng nhập theo độ.
    """
    joints_rad = [deg2rad(d) for d in joints_deg]
    speed_rad_s = deg2rad(speed_deg_s)
    acc_rad_s2 = deg2rad(acc_deg_s2)
    return cmd_p2p(joints_rad, speed_rad_s, acc_rad_s2, motion_time_s, tid)


# ==== Demo chạy lệnh ====
def demo():
    print("=== Bắt đầu điều khiển Lite6 ===")

    # 1) Clear lỗi / cảnh báo / enable
    for name, cmd_func in [
        ("Xóa lỗi", cmd_clear_error),
        ("Xóa cảnh báo", cmd_clear_warning),
        ("Bật động cơ (enable all joints)", cmd_enable_all_joints),
    ]:
        print(f"\n--- {name} ---")
        packet = cmd_func()
        resp = send_command(packet)
        print(parse_response(resp))
        time.sleep(0.2)

    # 2) Gripper demo
    print("\n--- Gripper Channel 0 Mở ---")
    packet = cmd_gripper(channel=0, open_on=True, tid=11)
    print(parse_response(send_command(packet)))
    time.sleep(0.8)

    print("--- Gripper Channel 0 Đóng ---")
    packet = cmd_gripper(channel=0, open_on=False, tid=12)
    print(parse_response(send_command(packet)))
    time.sleep(0.8)

    print("\n--- Gripper Channel 0 Mở ---")
    packet = cmd_gripper(channel==1, open_on=True, tid=13)
    print(parse_response(send_command(packet)))
    time.sleep(5)

    print("--- Gripper Channel 0 Đóng ---")
    packet = cmd_gripper(channel=1, open_on=False, tid=14)
    print(parse_response(send_command(packet)))
    time.sleep(0.8)

    # 3) P2P demo (theo độ)
    #   Ví dụ: tới [60°, 0, 0, 0, 0, 0] với speed=20°/s, acc=500°/s², motion_time=0 (robot tự tính)
    # print("\n--- P2P: Joint move tới [60°, 0, 0, 0, 0, 0] ---")
    # packet = cmd_p2p_deg([60, 0, 0, 0, 0, 0], speed_deg_s=20, acc_deg_s2=500, motion_time_s=0, tid=30)
    # print(parse_response(send_command(packet)))

    # Đợi robot hoàn tất tương đối trước khi chạy lệnh khác (tùy bài toán có thể cần đọc trạng thái)
    time.sleep(2.0)


# ==== Chạy demo nếu file chính ====
if __name__ == "__main__":
    demo()


=== Bắt đầu điều khiển Lite6 ===

--- Xóa lỗi ---
Reg: 0x10, Status: 0x00 (✅ OK)

--- Xóa cảnh báo ---
Reg: 0x11, Status: 0x00 (✅ OK)

--- Bật động cơ (enable all joints) ---
Reg: 0x0B, Status: 0x00 (✅ OK)

--- Gripper Channel 0 Mở ---
Reg: 0x7F, Status: 0x00 (✅ OK)
--- Gripper Channel 0 Đóng ---
Reg: 0x7F, Status: 0x00 (✅ OK)

--- Gripper Channel 0 Mở ---


NameError: name 'channel' is not defined