TODO list:

- wire all 3 servos reliably
- test communication with all servos at once
- create mounts on servo arms (small metal piece)

In [1]:
import serial
import numpy as np
from collections import defaultdict
import time
import threading

In [129]:
class ServosSerial:
    def __init__(self, num_servos, port='/dev/ttyACM0', baudrate=115200, offset=0.0):
        self.offset = offset
        self.min_angle = -np.pi/4.
        self.max_angle = np.pi/4.
        
        self.num_servos = num_servos
        self.serial = serial.Serial(port, baudrate=baudrate)
        self.curr_angles = [0]*self.num_servos
        self.set_angles(self.curr_angles)
            
    def is_valid_servo_num(self, servo_num):
        is_valid = 0 <= servo_num < self.num_servos
        if not is_valid:
            print('WARNING: servo_num {0} not in [{1}, {2})'.format(servo_num, 0, self.num_servos))
        return is_valid
    
    def is_valid_angles(self, angles):
        is_valid = len(angles) == self.num_servos
        for angle in angles:
            is_valid = is_valid and (self.min_angle <= angle <= self.max_angle)
            if not is_valid:
                print('WARNING: angle {0} not in [{1},{2}]'.format(angle, self.min_angle, self.max_angle))
        return is_valid
    
    def get_angles(self):
        return list(self.curr_angles)
        
    def set_angles(self, des_angles, speed=None, fs=100.):
        """
        :param des_angle: desired angles (radians). None means don't move
        :param speed: if None, go as fast as possible. Else rad/s
        :param fs: if speed is not None, command discretization
        """
        des_angles = list(des_angles)
        for i, des_angle in enumerate(des_angles):
            if des_angle is None:
                des_angles[i] = self.curr_angles[i]
        
        if speed is None:
            self.write(des_angles)
        else:            
            angle_trajs = list()
            # create trajectories
            for curr_angle, des_angle in zip(self.curr_angles, des_angles):
                sign = 1 if des_angle > curr_angle else -1
                angle_traj = [curr_angle] + list(np.r_[curr_angle:des_angle:sign*speed/fs])
                angle_trajs.append(angle_traj)
            
            # make them all the same length
            max_angle_traj_len = max(len(traj) for traj in angle_trajs)
            for i, traj in enumerate(angle_trajs):
                pad_len = max_angle_traj_len - len(traj)
                if pad_len > 0:
                    angle_trajs[i] = np.pad(np.array(traj), (1, pad_len-1), 'edge')
                
            angle_trajs = np.array(angle_trajs).T
            for angles in angle_trajs:
                self.write(angles)
            
    def write(self, angles, fs=100.):
        if not self.is_valid_angles(angles):
            return
        
        write_str = ''
        for i, angle in enumerate(angles):
            if angle is None:
                angle = self.curr_angles[i]
            write_str = '{0}{1}\n'.format(i, self.offset+angle)
            self.serial.write(write_str)
            time.sleep(1/(3*fs))
            self.curr_angles[i] = angle
        
servos = ServosSerial(3)

In [130]:
curr_angles = servos.get_angles()
des_angles = [0., 0., 0.]
speed = 0.5
expected_time = abs(des_angles[0] - curr_angles[0])/speed

start = time.time()
servos.set_angles(des_angles, speed=speed)
end = time.time()

print('Expected time: {0}'.format(expected_time))
print('Elapsed: {0}'.format(end - start))

Expected time: 0.0
Elapsed: 0.0114212036133


In [161]:
class ServosSDRs:
    def __init__(self, servos, sdrs):
        self.servos = servos
        self.sdrs = sdrs
        
        self.speed = np.pi/4.
        self.run_flag = False
        self.is_stopped = False
        
    def start(self, speed=None):
        if self.run_flag:
            return
        
        if speed is not None:
            self.speed = speed
        
        self.run_flag = True
        self.thread = threading.Thread(target=self.run, args=())
        self.thread.daemon = True                            # Daemonize thread
        self.thread.start()                 
    
    def stop(self):
        if not self.run_flag:
            return
        
        self.run_flag = False
        
        while not self.is_stopped:
            time.sleep(0.2)
        self.is_stopped = False
    
    def run(self, fs=200.):
        # TODO: add SDR reading
        while self.run_flag:
            self.servos.set_angles([self.servos.min_angle]*self.servos.num_servos, speed=self.speed, fs=fs)
            if not self.run_flag:
                break
            self.servos.set_angles([self.servos.max_angle]*self.servos.num_servos, speed=self.speed, fs=fs)
            
        self.is_stopped = True
            
servos_sdrs = ServosSDRs(servos, [None]*servos.num_servos) # TODO: SDR

In [162]:
servos_sdrs.start()

In [163]:
servos_sdrs.stop()