Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
173 lines (129 sloc) 4.99 KB
import multiprocessing as mp
from time import sleep
from time import time
from vesc import CanVesc
import sys
import os
import math
import numpy as np
import threading
def main():
if len(sys.argv) != 2:
print('Provide CAN device name (can0, slcan0 etc.)')
sys.exit(0)
vesc = CanVesc(sys.argv[0])
mythread = ControlThread(name = "MotionControlThread") # ...Instantiate a thread and pass a unique ID to it
mythread.start() # ...Start the thread
lock = threading.Lock()
sleep(0.1)
while True:
with lock:
if mythread.motor0.position > 5:
mythread.motor0.velocity_goal = -1.0
if mythread.motor0.position < -5:
mythread.motor0.velocity_goal = 1.0
sleep(4)
with lock:
mythread.motor0.velocity_goal = -1.0
sleep(4)
with lock:
mythread.motor0.velocity_goal = 1.0
vesc.set_motor_rpm(1000, 0)
#print(b'\x00\x08\x00\x80\x07\x00\x00\x00\x01\x00\x08\x00\x00\x03\xe8\x00')
# create a raw socket and bind it to the given CAN interface
class ControlThread(threading.Thread):
def __init__(self, name, lock, event, vesc):
threading.Thread.__init__(self)
self.name = name
self.lock = lock
self.event = event
self.vesc = vesc
self.motor0 = Motor(vesc, 0)
#self.motor1 = Motor(vesc, 1)
def run(self):
print("{} started!".format(self.getName()))
start_time = time()
_TIMESTEP = 0.001
_RENDERESTEP = .1
tick = time()
rendertick = tick
motors = [self.motor0] #, self.motor1, self.motor2]
self.motor0.acceleration = .5
self.motor0.velocity_goal = 1.0
self.motor0.velocity = -1.0
while True:
clock = time()
if clock > tick1:
tick1 += _RENDERESTEP
self.moto0r.plot_position(cols, 10.0)
if clock > tick2:
tick2 += _TIMESTEP
self.motor.tick_velocity(_TIMESTEP)
class Motor:
def __init__(self):
self.velocity_command = 0
self.velocity_feedback = 0
self.position = 180
self.acceleration = 0
self.velocity_goal = 0
self.vesc = vesc
self.canId = canId
self.errors = [0] * _NUM_ERRORS_TO_AVERAGE
self.last_error = 0
# def set_speed(self):
# print_spaces(self.velocity)
# def plot_position(self, cols, scale_factor):
# s = cols/2 + ((cols/2) * (self.position / scale_factor))
# print_spaces(int(s))
def update_values(self, interval, overrun):
if math.fabs(self.velocity_command) > MAX_VELOCITY:
self.velocity_command = 0
error = self.velocity_command - self.velocity_feedback
self.errors.insert(0, error)
self.errors.pop()
error_diffs = [i-j for i, j in zip(self.errors[:-1], self.errors[1:])]
average_error_diffs = 0
for e in error_diffs:
average_error_diffs += e
average_error_diffs /= len(error_diffs)
print("Velocity Command %r, velocity_feedback %r, error %r" % (self.velocity_command, self.velocity_feedback, average_error_diffs))
print("error is:")
print(self.errors)
d_comp = interval/(interval+overrun)
current_command = error * P_SCALAR + D_SCALAR * average_error_diffs * d_comp
self.last_error = error
if (current_command * np.sign(self.velocity_command)) > 0:
brake = False
if current_command > MAX_CURRENT:
current_command = MAX_CURRENT
elif current_command < -MAX_CURRENT:
current_command = -MAX_CURRENT
else:
brake = True
if current_command > MAX_BRAKE_CURRENT:
current_command = MAX_BRAKE_CURRENT
elif current_command < -MAX_BRAKE_CURRENT:
current_command = -MAX_BRAKE_CURRENT
self.set_current(current_command, brake)
def set_current(self, current_command, brake=True):
self.vesc.set_motor_current(current_command, self.canId, brake)
def tick_velocity(self, tick_length):
if self.velocity != self.velocity_setpoint:
increment = self.acceleration * tick_length * np.sign(self.velocity_goal-self.velocity_command)
#print("velocity %g, velocity setpoint %f, increment %f, position %g" % (self.velocity, self.velocity_setpoint, increment, self.position))
self.velocity_command += increment
self.tick_position(tick_length)
def tick_position(self, tick_length):
self.position += self.velocity_command * tick_length
def disable(self):
self.velocity_command = 0
self.position = 0
self.acceleration = 0
self.velocity_goal = 0
self.set_current(-0.001, brake=True)
print('DISABLE')
def print_spaces(num_spaces):
print(' ' * num_spaces + '.')
if __name__ == "__main__":
# execute only if run as a script
main()