In [3]:
!pip install dynamixel-sdk

Collecting dynamixel-sdk
  Downloading dynamixel_sdk-3.7.31-py3-none-any.whl (23 kB)
Collecting pyserial (from dynamixel-sdk)
  Downloading pyserial-3.5-py2.py3-none-any.whl (90 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.6/90.6 kB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyserial, dynamixel-sdk
Successfully installed dynamixel-sdk-3.7.31 pyserial-3.5


In [12]:
import numpy as np

from leap_hand_utils.dynamixel_client import *
import leap_hand_utils.leap_hand_utils as lhu
import time

In [9]:
class LeapNode:
    def __init__(self):
        ####Some parameters
        # self.ema_amount = float(rospy.get_param('/leaphand_node/ema', '1.0')) #take only current
        self.kP = 600
        self.kI = 0
        self.kD = 200
        self.curr_lim = 350
        self.prev_pos = self.pos = self.curr_pos = lhu.allegro_to_LEAPhand(np.zeros(16))
           
        #You can put the correct port here or have the node auto-search for a hand at the first 3 ports.
        self.motors = motors = [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
        try:
            self.dxl_client = DynamixelClient(motors, '/dev/ttyUSB0', 4000000)
            self.dxl_client.connect()
        except Exception:
            try:
                self.dxl_client = DynamixelClient(motors, '/dev/ttyUSB1', 4000000)
                self.dxl_client.connect()
            except Exception:
                self.dxl_client = DynamixelClient(motors, 'COM13', 4000000)
                self.dxl_client.connect()
        #Enables position-current control mode and the default parameters, it commands a position and then caps the current so the motors don't overload
        self.dxl_client.sync_write(motors, np.ones(len(motors))*5, 11, 1)
        self.dxl_client.set_torque_enabled(motors, True)
        self.dxl_client.sync_write(motors, np.ones(len(motors)) * self.kP, 84, 2) # Pgain stiffness     
        self.dxl_client.sync_write([0,4,8], np.ones(3) * (self.kP * 0.75), 84, 2) # Pgain stiffness for side to side should be a bit less
        self.dxl_client.sync_write(motors, np.ones(len(motors)) * self.kI, 82, 2) # Igain
        self.dxl_client.sync_write(motors, np.ones(len(motors)) * self.kD, 80, 2) # Dgain damping
        self.dxl_client.sync_write([0,4,8], np.ones(3) * (self.kD * 0.75), 80, 2) # Dgain damping for side to side should be a bit less
        #Max at current (in unit 1ma) so don't overheat and grip too hard #500 normal or #350 for lite
        self.dxl_client.sync_write(motors, np.ones(len(motors)) * self.curr_lim, 102, 2)
        self.dxl_client.write_desired_pos(self.motors, self.curr_pos)

    #Receive LEAP pose and directly control the robot
    def set_leap(self, pose):
        self.prev_pos = self.curr_pos
        self.curr_pos = np.array(pose)
        self.dxl_client.write_desired_pos(self.motors, self.curr_pos)
    #allegro compatibility
    def set_allegro(self, pose):
        pose = lhu.allegro_to_LEAPhand(pose, zeros=False)
        self.prev_pos = self.curr_pos
        self.curr_pos = np.array(pose)
        self.dxl_client.write_desired_pos(self.motors, self.curr_pos)
    #Sim compatibility, first read the sim value in range [-1,1] and then convert to leap
    def set_ones(self, pose):
        pose = lhu.sim_ones_to_LEAPhand(np.array(pose))
        self.prev_pos = self.curr_pos
        self.curr_pos = np.array(pose)
        self.dxl_client.write_desired_pos(self.motors, self.curr_pos)
    #read position
    def read_pos(self):
        return self.dxl_client.read_pos()
    #read velocity
    def read_vel(self):
        return self.dxl_client.read_vel()
    #read current
    def read_cur(self):
        return self.dxl_client.read_cur()


In [11]:
leap_hand = LeapNode()

In [15]:
from dynamixel_sdk import PortHandler, PacketHandler, GroupSyncRead

In [18]:
PROTOCOL_VERSION = 2.0
BAUDRATE = 4000000  # or any other baud rate supported by your motors
DEVICENAME = '/dev/ttyUSB1'  # Check the correct port name for your system
MOTOR_IDS = [1, 2, 3, 4] 

In [26]:
# Control table addresses
ADDR_PRESENT_POSITION = 132
LEN_PRESENT_POSITION = 4

# Initialize PortHandler and PacketHandler instances
port_handler = PortHandler(DEVICENAME)
packet_handler = PacketHandler(PROTOCOL_VERSION)

# Open port
if not port_handler.openPort():
    print("Failed to open the port.")
    raise SystemExit

# Set port baudrate
if not port_handler.setBaudRate(BAUDRATE):
    print("Failed to set the baudrate.")
    raise SystemExit

# Initialize GroupSyncRead for reading present positions
group_sync_read = GroupSyncRead(port_handler, packet_handler, ADDR_PRESENT_POSITION, LEN_PRESENT_POSITION)

# Add parameter storage for the present position value of each motor
for motor_id in MOTOR_IDS:
    group_sync_read.addParam(motor_id)



In [27]:
try:
    while True:
        # Perform synchronized read of the current positions
        dxl_comm_result = group_sync_read.txRxPacket()
        print(dxl_comm_result)
        if dxl_comm_result != COMM_SUCCESS:
            print(packet_handler.getTxRxResult(dxl_comm_result))

        positions = []
        for motor_id in MOTOR_IDS:
            # Check if data is available for each motor
            dxl_getdata_result = group_sync_read.isAvailable(motor_id, ADDR_PRESENT_POSITION, LEN_PRESENT_POSITION)
            if dxl_getdata_result != True:
                print(f"Failed to get position data for motor {motor_id}")
                continue
            
            # Get present position data
            dxl_present_position = group_sync_read.getData(motor_id, ADDR_PRESENT_POSITION, LEN_PRESENT_POSITION)
            positions.append(dxl_present_position)
        
        print(f"Current Positions: {positions}")
        time.sleep(1)  # Adjust the sleep time as needed

except KeyboardInterrupt:
    print("Process interrupted by user.")
finally:
    # Close port
    port_handler.closePort()

0


NameError: name 'COMM_SUCCESS' is not defined