In [1]:
import pathlib
import numpy as np
from collections import OrderedDict
import time

# Common Classes and Functions

In [2]:
from scipy.spatial.transform import Rotation as R


def rot(axis, deg):
    """Compute 3D rotation matrix given euler rotation."""
    return R.from_euler(axis, np.deg2rad(deg)).as_matrix()

In [3]:
class DynamixelMotor(object):
    """DynamixelMotor abstraction class.
    Args:
        root_part (str): name of the part where the motor is attached to (eg 'right_arm.hand')
        name (str): name of the motor (eg. 'shoulder_pitch')
        luos_motor (:py:class:`pyluos.modules.DxlMotor`): pyluos motor
        config (dict): extra motor config (must include 'offset' and 'orientation' fields)
    Wrap the pyluos motor object to simplify and make the API homogeneous.
    """

    def __init__(self, root_part, name, luos_motor, config):
        """Create a DynamixelMotor given its pyluos equivalent."""
        self._root_part = root_part
        self._name = name

        self._motor = luos_motor

        self._offset = config['offset']
        self._direct = config['orientation'] == 'direct'

        self._timer = None
        self._use_static_fix = False

    def __repr__(self):
        """Motor representation."""
        mode = 'compliant' if self.compliant else 'stiff'
        return f'<DxlMotor "{self.name}" pos="{self.present_position}" mode="{mode}">'

    @property
    def name(self):
        """Fullname of the motor (eg. right_arm.hand.gripper)."""
        return f'{self._root_part.name}.{self._name}'

    # Position
    @property
    def present_position(self):
        """Present position (in degrees) of the motor."""
        return self._as_local_pos(self._motor.rot_position)

    @property
    def goal_position(self):
        """Get current goal position (in degrees) of the motor."""
        return self._as_local_pos(self._motor.target_rot_position)

    @goal_position.setter
    def goal_position(self, value):
        if not self.compliant:
            self._motor.target_rot_position = self._to_motor_pos(value)

            if self._use_static_fix:
                self._schedule_static_error_fix(delay=1)

    @property
    def offset(self):
        """Get motor real zero (in degrees)."""
        return self._offset

    def is_direct(self):
        """Check whether the motor is direct or not."""
        return self._direct

    def _as_local_pos(self, pos):
        return (pos if self.is_direct() else -pos) - self.offset

    def _to_motor_pos(self, pos):
        return (pos + self.offset) * (1 if self.is_direct() else -1)

    # Speed
    @property
    def moving_speed(self):
        """Get the maximum speed (in degree per second) of the motor."""
        return self._motor.target_rot_speed

    @moving_speed.setter
    def moving_speed(self, value):
        self._motor.target_rot_speed = value

    # Compliancy
    @property
    def compliant(self):
        """Check whether or not the motor is compliant."""
        return self._motor.compliant

    @compliant.setter
    def compliant(self, value):
        self._motor.compliant = value

    @property
    def torque_limit(self):
        """Check the maximum torque allowed (in %) of the motor."""
        return self._motor.power_ratio_limit

    @torque_limit.setter
    def torque_limit(self, value):
        self._motor.power_ratio_limit = value

    # Temperature
    @property
    def temperature(self):
        """Check the current motor temp. (in °C)."""
        return self._motor.temperature

    def goto(self,
             goal_position, duration,
             starting_point='present_position',
             wait=False, interpolation_mode='linear'):
        """Set trajectory goal for the motor.
        Args:
            goal_position (float): target position (in degrees)
            duration (float): duration of the movement (in seconds)
            starting_point (str): register used to determine the starting point (eg. 'goal_position' can also be used in some specific case)
            wait (bool): whether or not to wait for the end of the motion
            interpolation_mode (str): interpolation technique used for computing the trajectory ('linear', 'minjerk')
        Returns:
            reachy.trajectory.TrajectoryPlayer: trajectory player that can be used to monitor the trajectory, stop it, etc
        """
        if interpolation_mode not in interpolation_modes.keys():
            available = tuple(interpolation_modes.keys())
            raise ValueError(f'interpolation_mode should be one of {available}')

        traj_player = interpolation_modes[interpolation_mode](getattr(self, starting_point), goal_position, duration)
        traj_player.start(self)

        if wait:
            traj_player.wait()

        return traj_player

    def use_static_error_fix(self, activate):
        """Trigger the static error fix.
        Args:
            activate (bool): whether to activate/deactivate the static error issue fix
        If activated, the static error fix will check the reach position a fixed delay after the send of a new goal position.
        The static error may result in the motor's load increasing, and yet not managing to move.
        To prevent this behavior we automatically adjust the target goal position to reduce this error.
        """
        self._use_static_fix = activate

    # Patch dynamixel controller issue when the motor forces
    # while not managing to reach the goal position
    def _schedule_static_error_fix(self, delay):
        if self._timer is not None:
            self._timer.cancel()
        self._timer = Timer(delay, self._fix_static_error)
        self._timer.start()

    def _fix_static_error(self, threshold=2):
        error = (self.present_position - self.goal_position)

        if abs(error) > threshold:
            pos = self.goal_position + error / 2
            logger.info('Fix static error controller', extra={
                'goal_position': self.goal_position,
                'present_position': self.present_position,
                'fixed_goal_position': pos,
            })

            self._motor.target_rot_position = self._to_motor_pos(pos)
            self._timer = None

I needed to modify find_dxl() and attach_dxl_motors() get the correct Luos gate name.

In [4]:
def find_dxl(io, part_name, dxl_name, dxl_config):
    """Get a specific dynamixel motor from the IO.
    Only goal position is used atm.
    """
    pos = dxl_config['offset'] * (-1 if dxl_config['orientation'] == 'indirect' else 1)
    m = WsMotor(name=f'{part_name}.{dxl_name}', initial_position=pos)
    
    io.motors.append(m)
    io.ws.motors[m.name] = m
    return m

In [5]:
def attach_dxl_motors(io, part_name, dxl_motors):
    """Attach given dynamixel motors to a part.
    Args:
        dxl_motors (dict): motors config, the config must at least include an id for each motor (see attach_kinematic_chain for extra parameters)
    """
    motors_dict = {}
    
    
    
    class Root(object):
        def __init__(self, name):
            self.name = name
            
    root = Root(part_name)
    
    for motor_name, config in dxl_motors.items():
        dxl = find_dxl(io, part_name, motor_name, config)
        m = DynamixelMotor(root, motor_name, dxl, config)
        motors_dict[motor_name] = m

    return motors_dict

# Define WebSocket components

These are just supporting classes for WebSocket IO. Luos has similar IO classes which share the same interface. https://github.com/CircuitLaunch/reachy/blob/master/software/reachy/io/luos.py
You should be able to replace the WebSocket IO classes with the Luos ones with few modifications.

In [6]:
from threading import Thread, Event
import asyncio
import websockets
import json

In [7]:
class WsServer(object):
    """WebSocket server, sync value from the modules with their equivalent from the client."""

    def __init__(self, host='0.0.0.0', port=6171):
        """Prepare the ws server."""
        self.host, self.port = host, port
        self.running = Event()

        self.parts = []
        self.motors = {}

    async def sync(self, websocket, path):
        """Sync loop that exchange modules state with the client."""
        self.running.set()

        while self.running.is_set():
            if not websocket.open:
                break

            msg = json.dumps({
                'motors': [
                    {'name': m.name, 'goal_position': m.target_rot_position}
                    for m in sum([p.motors for p in self.parts], [])
                ],
                'disks': [
                    {'name': m.name, 'goal_position': m.target_rot_position}
                    for m in sum([p.disks for p in self.parts], [])
                ],
            })
            await websocket.send(msg.encode('UTF-8'))

            resp = await websocket.recv()
            state = json.loads(resp)

            if hasattr(self, 'cam'):
                eye = f'{self.cam.active_side}_eye'
                if eye in state:
                    jpeg_data = b64decode(state[eye])
                    self.cam.frame = np.array(Image.open(BytesIO(jpeg_data)))

            for m in state['motors']:
                if m['name'] in self.motors:
                    self.motors[m['name']].rot_position = m['present_position']

            if hasattr(self, 'left_force_sensor') and 'left_force_sensor' in state:
                self.left_force_sensor.load = state['left_force_sensor']
            if hasattr(self, 'right_force_sensor') and 'right_force_sensor' in state:
                self.right_force_sensor.load = state['right_force_sensor']

    def close(self):
        """Stop the sync loop."""
        self.running.clear()
        self.t.join()

    def register(self, io):
        """Register a new io (and its module) to be synced."""
        self.parts.append(io)

    def run_forever(self):
        """Run the sync loop forever."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        serve = websockets.serve(self.sync, self.host, self.port)

        loop.run_until_complete(serve)
        loop.run_forever()

    def run_in_background(self):
        """Run the sync loop forever in background."""
        self.t = Thread(target=self.run_forever)
        self.t.daemon = True
        self.t.start()

WsServer.sync() syncs the state back and forth between the internal state (in Python) and Luos. The "while self.running.is_set():" loop first converts the internal state to json then does:
```
await websocket.send(msg.encode('UTF-8'))
```
to send it to Luos. It waits for a response from Luos with:
```
resp = await websocket.recv()
```
The resp is the current state of Luos. Luos' state gets updated into the internal state.

In [8]:
class WsFakeOrbitaDisk(object):
    """Orbital disk placeholder."""

    def __init__(self, name, initial_position):
        """Create fake Orbita disk."""
        self.name = name
        self.compliant = False
        self._target_rot_position = initial_position

    def __repr__(self) -> str:
        """Public Orbita disk string representation."""
        return f'<Orbita "{self.name}" pos="{self.rot_position}>'

    def setup(self):
        """Initialize the disk."""
        pass

    @property
    def rot_position(self):
        """Get the current disk angle position (in deg.)."""
        return self.target_rot_position

    @property
    def temperature(self):
        """Get the current temperature in C."""
        return 37.2

    @property
    def target_rot_position(self):
        """Get the current target angle position (in deg.)."""
        return self._target_rot_position

    @target_rot_position.setter
    def target_rot_position(self, new_pos):
        """Set a new target angle position (in deg.)."""
        self._target_rot_position = new_pos

In [9]:
class WsMotor(object):
    """Motor Placeholder.
    Only the goal position (ie. target_rot_position) is currently used.
    """

    def __init__(self, name, initial_position):
        """Init the fake motor."""
        self.name = name

        self.compliant = False
        self.target_rot_position = initial_position
        self.rot_position = initial_position
        self.temperature = 20

In [10]:
class FakeFan(object):
    """Fake fan module for API consistensy."""

    def on(self):
        """Do nothing."""
        pass

    def off(self):
        """Do nothing."""
        pass

In [11]:
class WsIO(object):
    """WebSocket IO implementation."""

    ws = None

    def __init__(self, part_name):
        """Init an io attached to the given part."""
        self.part_name = part_name
        self.motors = []
        self.disks = []

    @classmethod
    def shared_server(cls, part_name):
        """Create a new io with its ws server."""
        io = cls(part_name)

        if cls.ws is None:
            cls.ws = WsServer()
            cls.ws.run_in_background()
        cls.ws.register(io)

        return io

    def find_module(self, module_name):
        """Get a specific module from the IO.
        For the moment no module are really implemented. Only placeholders for code compatibility are provided.
        """
        if module_name == 'force_gripper':
            force_sensor = WsFakeForceSensor()

            if self.part_name == 'left_arm.hand':
                self.ws.left_force_sensor = force_sensor
            elif self.part_name == 'right_arm.hand':
                self.ws.right_force_sensor = force_sensor

            return force_sensor

        raise NotImplementedError

    def find_dxl(self, dxl_name, dxl_config):
        """Get a specific dynamixel motor from the IO.
        Only goal position is used atm.
        """
        pos = dxl_config['offset'] * (-1 if dxl_config['orientation'] == 'indirect' else 1)
        m = WsMotor(name=f'{self.part_name}.{dxl_name}', initial_position=pos)
        self.motors.append(m)
        self.ws.motors[m.name] = m
        return m

    def find_fan(self, fan_name):
        """Get a specific fan from its name."""
        return FakeFan()

    def find_orbita_disks(self):
        """Get a specific orbita module from the IO.
        Not currently supported.
        """
        bottomOrb = WsFakeOrbitaDisk(name=f'{self.part_name}.disk_bottom', initial_position=-60)
        middleOrb = WsFakeOrbitaDisk(name=f'{self.part_name}.disk_middle', initial_position=-60)
        topOrb = WsFakeOrbitaDisk(name=f'{self.part_name}.disk_top', initial_position=-60)

        disks = [bottomOrb, middleOrb, topOrb]
        self.disks += disks

        return disks

    def find_dual_camera(self, default_camera):
        """Retrieve a dual camera."""
        cam = WsDualCamera(default_camera)
        self.ws.cam = cam
        return cam

    def close(self):
        """Close the WS."""
        self.ws.close()

In [12]:
class WsDualCamera(object):
    """Remote Camera."""

    def __init__(self, default_camera):
        """Set remote camera up."""
        self.set_active(default_camera)
        self.frame = np.zeros((300, 480, 3), dtype=np.uint8)

    @property
    def active_side(self):
        """Get the active camera side."""
        return self._camera_side

    def set_active(self, camera_side):
        """Set one of the camera active (left or right)."""
        self._camera_side = camera_side

    def read(self):
        """Get latest received frame."""
        return True, self.frame

    def close(self):
        """Close the camera."""
        pass

# Initialize WebSocket connection

In [13]:
io = WsIO.shared_server('head')

*Connect Reachy simulator now* 
http://playground.pollen-robotics.com/#quickstart

The only change that you should need to make for Luos is something like this:
```
gate_name = self.name.split('.')[0]
gate_name = f'r_{gate_name}'
self.io = luos.SharedLuosIO.with_gate(gate_name, io)
```

This is similar to what's in ReachyPart.__init__(). https://github.com/CircuitLaunch/reachy/blob/master/software/reachy/parts/part.py

Communication to PyLuos has been abstracted out here:
https://github.com/CircuitLaunch/reachy/blob/master/software/reachy/io/luos.py

# Orbita

## Set up the disks for the orbita

hardware_zero has the initial settings for the orbita motors. 

In [14]:
hardware_zero = np.load('/opt/anaconda/envs/reachy-docker/lib/python3.6/site-packages/reachy/orbita_head_hardware_zero.npy')

Find the motors.

In [15]:
disks_motor = io.find_orbita_disks()
disk_bottom, disk_middle, disk_top = disks_motor

Turn off compliant otherwise your motor might not move.

In [16]:
for d in disks_motor:
    d.compliant = False

In [17]:
def setup_orbita_disks(disks, hardware_zero):
    """Configure each of the three disks.
    .. note:: automatically called at instantiation.
    """
    for disk in disks:
        disk.setup()

    def _find_zero(disk, z):
        A = 360 / (52 / 24)
        p = disk.rot_position

        zeros = [z, -(A - z), A + z]
        distances = [abs(p - z) for z in zeros]
        best = np.argmin(distances)

        return zeros[best]

    time.sleep(0.25)

    for d, z in zip(disks, hardware_zero):
        d.offset = _find_zero(d, z) + 60

Set up the Orbita disks with the initial hardware_zero settings.

In [18]:
setup_orbita_disks(disks_motor, hardware_zero)

## Move the motors

Set orbita disk positions in degrees (-180 to 180)

In [19]:
disk_bottom.target_rot_position = -180
disk_middle.target_rot_position = -180
disk_top.target_rot_position = -180

In [20]:
disk_bottom.target_rot_position = 180
disk_middle.target_rot_position = 180
disk_top.target_rot_position = 180

# Head

Config settings for the antennas. Note that the angle limits are from -150 to 150.

In [21]:
head_dxl_motors = OrderedDict([
        ('left_antenna', {
            'id': 30, 'offset': 0.0, 'orientation': 'direct',
            'angle-limits': [-150, 150],
        }),
        ('right_antenna', {
            'id': 31, 'offset': 0.0, 'orientation': 'direct',
            'angle-limits': [-150, 150],
        }),
    ])

Init the antennas with the config settings.

In [22]:
antennas = attach_dxl_motors(io, "head", head_dxl_motors)

Turn off compliant otherwise your motor might not move.

In [23]:
for m in arm_motors:
    arm_motors[m].compliant = False

NameError: name 'arm_motors' is not defined

## Move the antennas

The antennas uses degree values for goal_position.

In [24]:
antennas['left_antenna'].goal_position = 150

In [25]:
antennas['left_antenna'].goal_position = 0

In [26]:
antennas['left_antenna'].goal_position = -150

In [27]:
antennas['right_antenna'].goal_position = 150

In [28]:
antennas['right_antenna'].goal_position = 0

In [29]:
antennas['right_antenna'].goal_position = -150

# Right Arm

In [30]:
right_arm_dxl_motors = OrderedDict([
        ('shoulder_pitch', {
            'id': 10, 'offset': 90.0, 'orientation': 'indirect',
            'angle-limits': [-180, 60],
            'link-translation': [0, -0.19, 0], 'link-rotation': [0, 1, 0],
        }),
        ('shoulder_roll', {
            'id': 11, 'offset': 90.0, 'orientation': 'indirect',
            'angle-limits': [-100, 90],
            'link-translation': [0, 0, 0], 'link-rotation': [1, 0, 0],
        }),
        ('arm_yaw', {
            'id': 12, 'offset': 0.0, 'orientation': 'indirect',
            'angle-limits': [-90, 90],
            'link-translation': [0, 0, 0], 'link-rotation': [0, 0, 1],
        }),
        ('elbow_pitch', {
            'id': 13, 'offset': 0.0, 'orientation': 'indirect',
            'angle-limits': [0, 125],
            'link-translation': [0, 0, -0.28], 'link-rotation': [0, 1, 0],
        }),
    ])

Init the arm motors.

In [31]:
arm_motors = attach_dxl_motors(io, "right_arm", right_arm_dxl_motors)

Turn off compliant otherwise your motor might not move.

In [32]:
for m in arm_motors:
    arm_motors[m].compliant = False

## Move the Arm

In [107]:
arm_motors['elbow_pitch'].goal_position = -0.0

In [108]:
arm_motors['elbow_pitch'].goal_position = -1.0

In [109]:
arm_motors['elbow_pitch'].goal_position = -2.0

In [110]:
arm_motors['elbow_pitch'].goal_position = -4.0

In [111]:
arm_motors['elbow_pitch'].goal_position = -8.0

In [112]:
arm_motors['elbow_pitch'].goal_position = -16.0

In [113]:
arm_motors['elbow_pitch'].goal_position = -32.0

In [114]:
arm_motors['elbow_pitch'].goal_position = -64.0

In [115]:
arm_motors['elbow_pitch'].goal_position = -32.0

In [116]:
arm_motors['elbow_pitch'].goal_position = -16.0

In [117]:
arm_motors['elbow_pitch'].goal_position = -8.0

In [118]:
arm_motors['elbow_pitch'].goal_position = -4.0

In [119]:
arm_motors['elbow_pitch'].goal_position = -2.0

In [120]:
arm_motors['elbow_pitch'].goal_position = -1.0

In [121]:
arm_motors['elbow_pitch'].goal_position = 0.0