In [1]:
from turret import Turret
from cybergear_motor_controller import CyberGearMotorController
import logging
import can

In [2]:
bus = can.Bus(channel="can0", interface="socketcan")
pitch_motor = CyberGearMotorController(bus=bus, motor_can_id=100)
yaw_motor = CyberGearMotorController(bus=bus, motor_can_id=101)
turret = Turret(yaw_motor, pitch_motor, dict())

In [28]:
turret.zero_all()


In [4]:
# turret.disable()

In [44]:
import time


class VisionTurretController:
    def __init__(self, turret, center_coordinate, yaw_config, pitch_config, queue):
        self.turret = turret
        self.center_coordinate = center_coordinate
        self.yaw_config = yaw_config.copy()
        self.pitch_config = pitch_config.copy()

        self.queue = queue

        # Private variables
        self._last_sample_time = None
        
        # Initialize some variables
        self.yaw_config["intergal"] = 0
        self.pitch_config["intergal"] = 0
        self.yaw_config["last_error"] = 0
        self.pitch_config["last_error"] = 0

    def update_coordinate(self, new_coordinate, *put_args, **put_kwargs):
        self.queue.put(new_coordinate, *put_args, **put_kwargs)

    def control(self):
        new_coordinate = self.queue.get(block=True)

        # Feedback
        sample_time = time.time()
        delta_x = self.center_coordinate[0] - new_coordinate[0]
        delta_y = self.center_coordinate[1] - new_coordinate[1]

        # Get current position
        current_yaw = self.turret.yaw_motor.get_position()
        current_pitch = self.turret.pitch_motor.get_position()

        # Control
        if self._last_sample_time is None:
            self._last_sample_time = time.time()
        dt = sample_time - self._last_sample_time

        feed_forward_yaw = self.apply_feed_forward_control(current_yaw, self.yaw_config)
        feed_forward_pitch = self.apply_feed_forward_control(current_pitch, self.yaw_config)

        delta_yaw = self.apply_feed_back_kp_control(delta_x, self.yaw_config) + \
                    self.apply_feed_back_ki_control(delta_x, dt, self.yaw_config) + \
                    self.apply_feed_back_kd_control(delta_x, dt, self.yaw_config)

        delta_pitch = self.apply_feed_back_kp_control(delta_y, self.yaw_config) + \
                      self.apply_feed_back_ki_control(delta_y, dt, self.pitch_config) + \
                      self.apply_feed_back_kd_control(delta_x, dt, self.pitch_config)

        new_yaw = current_yaw + delta_yaw + feedforward_yaw
        new_pitch = current_pitch + delta_pitch + feed_forward_pitch
        
        self.turret.yaw_motor.set_position(new_yaw)
        self.turret.pitch_motor.set_position(new_pitch)
    

    def apply_feed_forward_control(self, set_point, config):
        kf = config.get("kf")
        control = kf * set_point
        return control

    def apply_feed_back_kp_control(self, error, config):
        kp = config.get("kp")
        control = kp * error
        return control

    def apply_feed_back_ki_control(self, error, dt, config):
        ki = config.get("ki")
        config["intergal"] += error
        
        control = ki * config["intergal"]
        return control

    def apply_feed_back_kd_control(self, error, dt, config):
        kd = config.get("kd")
        derivative = (error - config["last_error"]) / dt

        control = kd * derivative
        return control


        
        
        

In [50]:
yaw_config = dict(
    kf=0,
    kp=1,
    ki=0,
    kd=0,
)

pitch_config = dict(
    kf=0,
    kp=1,
    ki=0,
    kd=0,
)

center_coordinate = (320, 240)

In [51]:
from multiprocessing import Queue

queue = Queue()



In [52]:
vision_controller = VisionTurretController(turret, center_coordinate, yaw_config, pitch_config, queue)


In [53]:
vision_controller.update_coordinate((320, 240))
vision_controller.control()

0.0 0.0


In [49]:
# Reset position

yaw_motor.set_position(yaw_motor.get_travel() / 2)
pitch_motor.set_position(pitch_motor.get_travel() / 2)