## robot server

In [1]:
#!/usr/bin/env python
#example
import grpc
import rospy
from concurrent import futures
# import the publisher and subscriber node
from ros_bridge import RosBridge
# import the gRPC generated classes
from robo_gym_server_modules.robot_server.grpc_msgs.python import robot_server_pb2, robot_server_pb2_grpc

class Client():

    def __init__(self, server_port):
        self.channel = grpc.insecure_channel('[::]:' + repr(server_port))
        self.robot_server_stub = robot_server_pb2_grpc.RobotServerStub(self.channel)

    def set_state(self, state):
        # Old method, to be gradually replaced in all the stack
        msg = self.robot_server_stub.SetState(robot_server_pb2.State(state = state), timeout = 60)
        return msg.success

    def set_state_msg(self, state_msg):
        msg = self.robot_server_stub.SetState(state_msg, timeout = 60)
        return msg.success

    def get_state(self,):
        # Old method, to be gradually replaced in all the stack
        msg = self.robot_server_stub.GetState(robot_server_pb2.Empty(), timeout = 20)
        if msg.success == 1:
            return msg.state
        else:
            raise Exception('Error while retrieving state')

    def get_state_msg(self,):
        msg = self.robot_server_stub.GetState(robot_server_pb2.Empty(), timeout = 20)
        if msg.success == 1:
            return msg
        else:
            raise Exception('Error while retrieving state')

    def send_action(self, action):
        msg = self.robot_server_stub.SendAction(robot_server_pb2.Action(action = action), timeout = 20 )
        return msg.success

    def send_action_get_state(self, action):
        msg = self.robot_server_stub.SendActionGetState(robot_server_pb2.Action(action = action), timeout = 20 )
        if msg.success == 1:
            return msg
        else:
            raise Exception('Error while retrieving state')


## Test robotsever


### init robot server

In [None]:
rospy.loginfo('Starting Example Robot Server...')
server_port = 50051
client = Client(server_port)
channel = grpc.insecure_channel('[::]:' + repr(server_port))
robot_server_stub = robot_server_pb2_grpc.RobotServerStub(channel)

### get state

In [None]:
state = client.get_state()
print("state: ", state)

In [None]:
get_state_msg =client.get_state_msg()
# print(get_state_msg)
print("state: ",get_state_msg.state)
print("float_params: ", get_state_msg.float_params)
print("string_params: ", get_state_msg.string_params)
print("state_dict: ", get_state_msg.state_dict)
state = get_state_msg.state
print(len(state))



### set state

In [None]:
import numpy as np
link_names = [  'front_left_steering_link', 
                'front_right_steering_link', 
                'rear_left_steering_link', 
                'rear_right_steering_link'] 
link_state = dict.fromkeys(link_names, np.deg2rad(90))
link_state.update({'reset_links': False})
state_dict_msg=link_state
print(state_dict_msg)


In [None]:
# print(link_state)
joint_pos_ref = [1,2,3,4]
joint_vel_ref = [0,0,0,0]
state_msg = joint_pos_ref + joint_vel_ref
print(state_msg)
print(state_msg[:4])
print(state_msg[4:8])

In [None]:

# state_msg = robot_server_pb2.State(state_dict = client.get_state_msg().state_dict)
# link_state = {'rear_right_steering_link': 0}
link_state_msg = robot_server_pb2.State(state=state_msg, state_dict=state_dict_msg)
print(link_state_msg)
client.set_state_msg(link_state_msg)


### send action

In [None]:
import time
class Robot:
    def __init__(self):
        self.get_parameters()

    def get_parameters(self):
        self.limit_upper = rospy.get_param('/limit_upper', +120.)
        self.limit_lower = rospy.get_param('/limit_lower', -120.)

        action_cicle_rate = rospy.get_param('/action_cycle_rate', 25.)
        robot_cicle_rate = rospy.get_param('/robot_actuation_cycle_rate', action_cicle_rate)
        action_generation_time = rospy.get_param('/action_generation_time', 0.01)
        action_cicle_time = 1/action_cicle_rate
        robot_cicle_time = 1/robot_cicle_rate
        sleep_time = action_cicle_time - action_generation_time
        
        self.ratio= int(robot_cicle_rate/action_cicle_rate)
        self.ac_rate = action_cicle_rate
        self.rc_rate = robot_cicle_rate
        self.control_period = sleep_time

        # iinit finish msg
        print(f"============Config============")
        print(f"ac_rate: {action_cicle_rate}, rc_rate: {robot_cicle_rate}, ratio: {self.ratio}")
        print(f"ac time: {action_cicle_time}, rc_time: {robot_cicle_time}")
        print(f"sleep_time: {sleep_time}, action_generation_time {action_generation_time}")
robot = Robot()

In [None]:
for i in range(10):
    action = [1000, 1000, 1000, 1000]
    print(action)
    print(type(action))
    client.send_action(action)
    time.sleep(robot.control_period)

In [None]:
for i in range(10):
    action = [100, 1000, 1000, 1000]
    # print(action)
    obs = client.send_action_get_state(action)
    print(obs.state)
    time.sleep(robot.control_period)

## contrler PID

In [144]:
class Controller:
    def __init__(self, K=[1, 0, 0], sp=0, max=None):
        self.kp, self.ki, self.kd= K
        self.sp= sp
        self.last_error= 0
        self.last_time= time.time()
        self.eint= 0
        self.dt = 0 
        self.max =  max

    def get_action(self, angle, velocity, sp=None):
        current_time = time.time()
        if sp:
            self.sp = sp
        self.dt = current_time - self.last_time
        self.pv = angle
        error = self.sp - self.pv
        self.eint += error * self.dt
        self.edot =  (error - self.last_error) / self.dt
        self.cv = self.kp * error + self.ki * self.eint + self.kd * self.edot
        cv = np.clip(self.cv,-self.max,self.max)
        self.last_error= error
        self.last_time= current_time
        return cv
        

In [None]:
import copy
import time
from gym.utils import seeding


class AgribotEnvTest():
    def __init__(self, rs_address=50051):
        self.get_parameters()
        self.client = Client(rs_address)
        self.link_names = [
            'front_left_steering_link', 
            'front_right_steering_link', 
            'rear_left_steering_link', 
            'rear_right_steering_link'
            ] 
        self.joint_names = [
            'front_left_steering_joint', 
            'front_right_steering_joint', 
            'rear_left_steering_joint', 
            'rear_right_steering_joint', 
            'front_left_wheel_joint', 
            'front_right_wheel_joint', 
            'rear_left_wheel_joint', 
            'rear_right_wheel_joint'
            ]
        self.seed()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def _check_rs_state_keys(self, rs_state) -> None:
        keys = self.get_robot_server_composition()
        if not len(keys) == len(rs_state.keys()):
            print("Robot Server state keys to not match. Different lengths.")

        for key in keys:
            if key not in rs_state.keys():
                print("Robot Server state keys to not match")


    def get_state(self):
        state_msg = self.client.get_state_msg()
        state = state_msg.state
        state_dict = state_msg.state_dict
        self.joint_position = copy.deepcopy(state[:4])
        self.joint_velocity = copy.deepcopy(state[4:8])
        self.joint_position_ref = copy.deepcopy(state[12:16])
        self.joint_velocity_ref = copy.deepcopy(state[16:20])
        return state_dict
    
    def set_state(self,joint_pos_ref=[0]*4,joint_vel_ref=[0]*4, state_dict={}):
        state_msg = joint_pos_ref + joint_vel_ref
        state_dict_msg = state_dict
        link_state_msg = robot_server_pb2.State(state=state_msg, state_dict=state_dict_msg)
        self.client.set_state_msg(link_state_msg)

    def send_action(self, action=[0]*4):
        self.client.send_action(action)
    
    def reset(self):
        # print(link_state)
        state_dict_msg = {}
        joint_pos_ref = [self.np_random.uniform(low=self.limit_lower, high=self.limit_upper) for _ in range(4)]
        joint_vel_ref = [self.np_random.uniform(low=5, high=self.limit_velocity) for _ in range(4)]
        state_msg = joint_pos_ref + joint_vel_ref

        link_state_msg = robot_server_pb2.State(state=state_msg, state_dict=state_dict_msg)
        self.client.set_state_msg(link_state_msg)

        return self.get_state()

    def step(self, action=[0]*4):
        if not len(action) == 4:
            print("not 4 actions")

        rs_state = self.client.send_action_get_state(action).state_dict
        self._check_rs_state_keys(rs_state)
        return rs_state

    def get_parameters(self):
        self.limit_upper = rospy.get_param('/limit_upper', +120.)
        self.limit_lower = rospy.get_param('/limit_lower', -120.)
        self.limit_effort = rospy.get_param('/limit_effort', 1000.)
        self.limit_velocity = rospy.get_param('/limit_velocity', 10.)

        self.action_cicle_rate = rospy.get_param('/action_cycle_rate', 25.)
        self.robot_cicle_rate = rospy.get_param('/robot_actuation_cycle_rate', self.action_cicle_rate)
        self.action_generation_time = rospy.get_param('/action_generation_time', 0.01)
        self.action_cicle_time = 1/self.action_cicle_rate
        self.robot_cicle_time = 1/self.robot_cicle_rate
        self.sleep_time = self.action_cicle_time - self.action_generation_time
        
        self.ratio= int(self.robot_cicle_rate/self.action_cicle_rate)
        self.ac_rate = self.action_cicle_rate
        self.rc_rate = self.robot_cicle_rate
        self.control_period = self.sleep_time
        
    def print_param(self):
        print(f"limit_upper: {self.limit_upper}")
        print(f"limit_lower: {self.limit_lower}")
        print(f"limit_effort: {self.limit_effort}")
        print(f"limit_velocity: {self.limit_velocity}")
        # iinit finish msg
        print(f"============Config============")
        print(f"ac_rate: {self.action_cicle_rate}, rc_rate: {self.robot_cicle_rate}, ratio: {self.ratio}")
        print(f"ac time: {self.action_cicle_time}, rc_time: {self.robot_cicle_time}")
        print(f"sleep_time: {self.sleep_time}, action_generation_time {self.action_generation_time}")

    def get_robot_server_composition(self) -> list:
        rs_state_keys = [
            'front_left_steering_joint_position',
            'front_right_steering_joint_position',
            'rear_left_steering_joint_position',
            'rear_right_steering_joint_position',

            'front_left_steering_joint_velocity',
            'front_right_steering_joint_velocity',
            'rear_left_steering_joint_velocity',
            'rear_right_steering_joint_velocity',
            'front_left_wheel_joint_velocity',
            'front_right_wheel_joint_velocity',
            'rear_left_wheel_joint_velocity',
            'rear_right_wheel_joint_velocity',

            'front_left_steering_joint_position_ref',
            'front_right_steering_joint_position_ref',
            'rear_left_steering_joint_position_ref',
            'rear_right_steering_joint_position_ref',
            
            'front_left_steering_joint_velocity_ref',
            'front_right_steering_joint_velocity_ref',
            'rear_left_steering_joint_velocity_ref',
            'rear_right_steering_joint_velocity_ref'
        ]
        return rs_state_keys

agribot = AgribotEnvTest()
agribot.print_param()

joint_pos_ref = [1,2,-1,-2]
# joint_vel_ref = [0,1,0,1]
# agribot.set_state(joint_pos_ref,joint_vel_ref)
agribot.set_state()
action = [-1000, -1000, -1000, -1000]
for _ in range(5):
    agribot.send_action(action)
agribot.send_action()
obs = agribot.get_state()
print(f"joint_position:{agribot.joint_position}")
print(f"joint_velocity:{agribot.joint_velocity}")
print(f"joint_position_ref:{agribot.joint_position_ref}")
print(f"joint_velocity_ref:{agribot.joint_velocity_ref}")
print(obs)

In [None]:
print(agribot.reset())
print(f"joint_position:{agribot.joint_position}")
print(f"joint_velocity:{agribot.joint_velocity}")
print(f"joint_position_ref:{agribot.joint_position_ref}")
print(f"joint_velocity_ref:{agribot.joint_velocity_ref}")

In [None]:
import math
import time
import numpy as np

obs = agribot.reset()
K = [2000, 100, 500] #no_fmu
# K = [5, 0.1, 5]

controller = [
    Controller(K, sp=obs['front_left_steering_joint_position_ref'], max=agribot.limit_effort), 
    Controller(K, sp=obs['front_right_steering_joint_position_ref'], max=agribot.limit_effort), 
    Controller(K, sp=obs['rear_left_steering_joint_position_ref'], max=agribot.limit_effort), 
    Controller(K, sp=obs['rear_right_steering_joint_position_ref'], max=agribot.limit_effort)]

action = [0]*4
Done = True
it = 0
tol = np.deg2rad(1)
time.sleep(agribot.control_period)

while Done:
    action[0] = controller[0].get_action(angle=obs['front_left_steering_joint_position'], velocity=obs['front_left_steering_joint_velocity'], sp=obs['front_left_steering_joint_position_ref'])
    action[1] = controller[1].get_action(angle=obs['front_right_steering_joint_position'], velocity=obs['front_right_steering_joint_velocity'], sp=obs['front_right_steering_joint_position_ref'])
    action[2] = controller[2].get_action(angle=obs['rear_left_steering_joint_position'], velocity=obs['rear_left_steering_joint_velocity'], sp=obs['rear_left_steering_joint_position_ref'])
    action[3] = controller[3].get_action(angle=obs['rear_right_steering_joint_position'], velocity=obs['rear_right_steering_joint_velocity'], sp=obs['rear_right_steering_joint_position_ref'])
    obs = agribot.step(action)
    time.sleep(agribot.control_period)
    for i in range(4):
        print(f"[{i}]:: obs:{controller[i].pv:.2f} act:{controller[i].cv:.2f} dt:{controller[i].dt:.3f} error:{controller[i].last_error:.3f}| sp:{controller[i].sp:.2f}, eint: {controller[i].eint:.2e}| ")

    if abs(controller[0].last_error) < tol and abs(obs['front_left_steering_joint_velocity']) < 0.1 and \
        abs(controller[1].last_error) < tol and abs(obs['front_right_steering_joint_velocity']) < 0.1 and \
        abs(controller[2].last_error) < tol and abs(obs['rear_left_steering_joint_velocity']) < 0.1 and \
        abs(controller[3].last_error) < tol and abs(obs['rear_right_steering_joint_velocity']) < 0.1:
        agribot.reset()
    it +=1
    if it ==1000:
        agribot.reset()

In [None]:
import math
import time
import numpy as np

agribot.get_state()

# print(obs)
sp = np.deg2rad(0)
max_action = robot.limit_effort
K = [2000, 100, 500] #no_fmu
# K = [5, 0.1, 5]
controller = [Controller(K, sp, max_action), Controller(K, sp, max_action), Controller(K, sp, max_action), Controller(K, sp, max_action)]
print(controller)
# # time_check.reset()
action = [0]*4
Done = True
it = 0
tol = np.deg2rad(1)
print(tol)
time.sleep(robot.control_period)

while Done:
    action[0] = controller[0].get_action(angle=obs['front_left_steering_joint_position'], velocity=obs['front_left_steering_joint_velocity'])
    action[1] = controller[1].get_action(angle=obs['front_right_steering_joint_position'], velocity=obs['front_right_steering_joint_velocity'])
    action[2] = controller[2].get_action(angle=obs['rear_left_steering_joint_position'], velocity=obs['rear_left_steering_joint_velocity'])
    action[3] = controller[3].get_action(angle=obs['rear_right_steering_joint_position'], velocity=obs['rear_right_steering_joint_velocity'])
    obs = client.send_action_get_state(action).state_dict
    time.sleep(robot.control_period)
    for i in range(4):
        print(f"[{i}]:: obs:{controller[i].pv:.2f} act:{controller[i].cv:.2f} dt:{controller[i].dt:.3f} error:{controller[i].last_error:.3f}| sp:{controller[i].sp:.2f}, eint: {controller[i].eint:.2e}| ")

    if abs(controller[0].last_error) < tol and abs(obs['front_left_steering_joint_velocity']) < 0.1 and \
        abs(controller[1].last_error) < tol and abs(obs['front_right_steering_joint_velocity']) < 0.1 and \
        abs(controller[2].last_error) < tol and abs(obs['rear_left_steering_joint_velocity']) < 0.1 and \
        abs(controller[3].last_error) < tol and abs(obs['rear_right_steering_joint_velocity']) < 0.1:
        # client.set_state(get_random_state())
        # obs = client.get_state()
        # controller = Controller(K, obs[2], max_action)
        Done=False
    it +=1
    if it ==1000:
        Done=False

## agribot Env

In [141]:
#!/usr/bin/env python3

import numpy as np
import yaml
import os  
import copy

class Agribot():
    """Universal Robots utilities class.

    Attributes:
        max_joint_positions (np.array): Maximum joint position values (rad)`.
        min_joint_positions (np.array): Minimum joint position values (rad)`.
        max_joint_velocities (np.array): Maximum joint velocity values (rad/s)`.
        min_joint_velocities (np.array): Minimum joint velocity values (rad/s)`.
        joint_names (list): Joint names (Standard Indexing)`.

    Joint Names (ROS Indexing):
    [elbow_joint, shoulder_lift_joint, shoulder_pan_joint, wrist_1_joint, wrist_2_joint,
     wrist_3_joint]

    NOTE: Where not specified, Standard Indexing is used. 
    """

    def __init__(self):
        
        self.links = {'front_left_steering_link': [1, 1],
                      'front_right_steering_link': [1, -1],
                      'rear_left_steering_link': [-1, 1],
                      'rear_right_steering_link': [-1, -1] }

        # Joint states
        self.joint_names = [
            'front_left_steering_joint', 
            'front_right_steering_joint', 
            'rear_left_steering_joint', 
            'rear_right_steering_joint', 
            'front_left_wheel_joint', 
            'front_right_wheel_joint', 
            'rear_left_wheel_joint', 
            'rear_right_wheel_joint'
            ]

        self.steering_joint_position = [
            'front_left_steering_joint_position',
            'front_right_steering_joint_position',
            'rear_left_steering_joint_position',
            'rear_right_steering_joint_position'
            ]
        self.steering_joint_velocity = [
            'front_left_steering_joint_velocity',
            'front_right_steering_joint_velocity',
            'rear_left_steering_joint_velocity',
            'rear_right_steering_joint_velocity'
            ]

        self.steering_joint_position_ref = [
            'front_left_steering_joint_position_ref',
            'front_right_steering_joint_position_ref',
            'rear_left_steering_joint_position_ref',
            'rear_right_steering_joint_position_ref'
            ]
        self.steering_joint_velocity_ref = [         
            'front_left_steering_joint_velocity_ref',
            'front_right_steering_joint_velocity_ref',
            'rear_left_steering_joint_velocity_ref',
            'rear_right_steering_joint_velocity_ref'
            ]                  
        
        self.wheel_joint_velocity = [
            'front_left_wheel_joint_velocity',
            'front_right_wheel_joint_velocity',
            'rear_left_wheel_joint_velocity',
            'rear_right_wheel_joint_velocity'
            ]

In [292]:
import numpy as np
import gym
from typing import Tuple
from robo_gym.utils.exceptions import InvalidStateError, RobotServerError, InvalidActionError
import robo_gym_server_modules.robot_server.client as rs_client
from robo_gym_server_modules.robot_server.grpc_msgs.python import robot_server_pb2
from robo_gym.envs.simulation_wrapper import Simulation
from gym.utils import seeding

class AgribotEnv(gym.Env):

    """AgribotEnv environment.

    Args:
        rs_address (str): Robot Server address. Formatted as 'ip:port'. Defaults to None.
  
    Attributes:

        client (:obj:str): Robot Server client.
        real_robot (bool): True if the environment is controlling a real robot.

    """
    real_robot = False
    max_episode_steps = 300
    max_torque = 1000
    max_speed = np.inf
    limit_upper= +np.deg2rad(133)
    limit_lower= -np.deg2rad(133)

    def __init__(self, rs_address=None, rs_state_to_info=False, **kwargs):
        self.agribot = Agribot()
        self.max_speed = self.max_speed
        self.max_torque = self.max_torque
        self.limit_upper= self.limit_upper
        self.limit_lower= self.limit_lower
        self.elapsed_steps = 0

        self.rs_state_to_info = rs_state_to_info

        self.observation_space = self._get_observation_space()
        self.action_space = self._get_action_space()
        self.seed()
        self.rs_state = None
        self.state = None
        
        # Connect to Robot Server
        if rs_address:
            # self.client = rs_client.Client(rs_address)
            self.client = Client(rs_address)

        else:
            print("WARNING: No IP and Port passed. Simulation will not be started")
            print("WARNING: Use this only to get environment shape")

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def _set_initial_robot_server_state(self, rs_state) -> robot_server_pb2.State:
        string_params = {}
        float_params = {}
        state = {}

        state_msg = robot_server_pb2.State(state = state, float_params = float_params, 
                                            string_params = string_params, state_dict = rs_state)
        return state_msg

    def reset(self, joint_positions_ref=None) -> np.ndarray:
        
        """Environment reset.
        """

        if joint_positions_ref: 
            assert len(joint_positions_ref) == 4
        else:
            joint_positions_ref = [self.np_random.uniform(low=self.limit_lower, high=self.limit_upper) for _ in range(4)]

        self.elapsed_steps = 0

        # Initialize environment state
        state_len = self.observation_space.shape[0]
        state = np.zeros(state_len)
        rs_state = dict.fromkeys(self.get_robot_server_composition(), 0.0)

        # Set initial robot joint positions
        self._set_joint_positions_ref(joint_positions_ref)

        # Update joint positions in rs_state
        rs_state.update(self.joint_positions_ref)

        # Set initial state of the Robot Server
        state_msg = self._set_initial_robot_server_state(rs_state)
        
        if not self.client.set_state_msg(state_msg):
            raise RobotServerError("set_state")

        # Get Robot Server state
        rs_state = self.client.get_state_msg().state_dict
 
        # Check if the length and keys of the Robot Server state received is correct
        self._check_rs_state_keys(rs_state)

        # Convert the initial state from Robot Server format to environment format
        observation = self._robot_server_state_to_env_state(rs_state)

        # Check if the environment state is contained in the observation space
        if not self.observation_space.contains(observation):
            print(f"error observation:{observation}")
            # raise InvalidStateError()

        self.rs_state = rs_state

        return observation

    def env_action_to_rs_action(self, action) -> np.ndarray:
        """Convert environment action to Robot Server action"""
        rs_action = copy.deepcopy(action)

        # Scale action
        rs_action = np.multiply(rs_action, self.max_torque)
        # Convert action indexing from ur to ros
        # rs_action = self.ur._ur_joint_list_to_ros_joint_list(rs_action)

        return rs_action     

    def step(self, action) -> Tuple[np.array, float, bool, dict]:

        if type(action) == list: action = np.array(action)

        # action = action.astype(np.float32)
            
        self.elapsed_steps += 1

        # Check if the action is contained in the action space
        if not self.action_space.contains(action):
            print("error action: ", action)
            pass
            # raise InvalidActionError()
            
        rs_action = self.env_action_to_rs_action(action)
        # action = action * 5
        # print(f"rs action:\n{rs_action}")

        # Send action to Robot Server and get state
        rs_state = self.client.send_action_get_state(rs_action.tolist()).state_dict
        self._check_rs_state_keys(rs_state)

        # Convert the state from Robot Server format to environment format
        observation = self._robot_server_state_to_env_state(rs_state)
        print(observation[8:12])
        # print(observation)
        # Check if the environment state is contained in the observation space
        if not self.observation_space.contains(observation):
            print("error observation: ", observation, "action: ", action)
            # raise InvalidStateError()

        self.rs_state = rs_state

        # Assign reward
        reward = 0
        done = False
        reward, done, info = self.reward(rs_state=rs_state, action=action[0])
        if self.rs_state_to_info: info['rs_state'] = self.rs_state

        return observation, reward, done, info

    def reward(self, rs_state, action) -> Tuple[float, bool, dict]:
        done = False
        info = {}
        # print(f"rs_state\n{rs_state}")

        theta = np.array([
            rs_state['front_left_steering_joint_position'],
            rs_state['front_right_steering_joint_position'],
            rs_state['rear_left_steering_joint_position'],
            rs_state['rear_right_steering_joint_position'],
        ])

        thetadot = np.array([
            rs_state['front_left_steering_joint_velocity'],
            rs_state['front_right_steering_joint_velocity'],
            rs_state['rear_left_steering_joint_velocity'],
            rs_state['rear_right_steering_joint_velocity'],
        ])

        thetaref = np.array([
            rs_state['front_left_steering_joint_position_ref'],
            rs_state['front_right_steering_joint_position_ref'],
            rs_state['rear_left_steering_joint_position_ref'],
            rs_state['rear_right_steering_joint_position_ref'],
        ])

        # print(f"theta:\n{theta}")
        # print(f"thetadot:\n{thetadot}")
        # print(f"thetaref:\n{thetaref}")
        error =  thetaref - theta
        reward = abs(error)

        # reward = error ** 2 + 0.1 * (thetadot/self.max_speed) ** 2 + 0.001 * (action/self.max_torque ) ** 2
        
        # print(f"error:{error:.2e}| vel:{thetadot:.2e}| act:{action:.2e}")
        # print(f"error:{error ** 2:.2e}| rev:{0.1 * (thetadot/self.max_speed) ** 2:.2e}| rac:{0.001 * (action/self.max_torque ) ** 2:.2e}")
        # print(f"rewar:{-reward:.2e}")
        if self.elapsed_steps >= self.max_episode_steps:
            done = True
            info['final_status'] = 'max_steps_exceeded'
        
        return -reward.sum(), done, info


    def get_rs_state(self):
        return self.rs_state

    def render():
        pass
    
    def get_robot_server_composition(self, key=None) -> list:
        position = [
            'front_left_steering_joint_position',
            'front_right_steering_joint_position',
            'rear_left_steering_joint_position',
            'rear_right_steering_joint_position']
        velocity = [
            'front_left_steering_joint_velocity',
            'front_right_steering_joint_velocity',
            'rear_left_steering_joint_velocity',
            'rear_right_steering_joint_velocity',
            'front_left_wheel_joint_velocity',
            'front_right_wheel_joint_velocity',
            'rear_left_wheel_joint_velocity',
            'rear_right_wheel_joint_velocity']
        position_ref = [
            'front_left_steering_joint_position_ref',
            'front_right_steering_joint_position_ref',
            'rear_left_steering_joint_position_ref',
            'rear_right_steering_joint_position_ref']
        velocity_ref = [            
            'front_left_steering_joint_velocity_ref',
            'front_right_steering_joint_velocity_ref',
            'rear_left_steering_joint_velocity_ref',
            'rear_right_steering_joint_velocity_ref']

        if key == "position":
            rs_state_keys = position

        elif key == "velocity":
            rs_state_keys = velocity
            
        elif key == "position_ref":
            rs_state_keys = position_ref

        elif key == "velocity_ref":
            rs_state_keys = velocity_ref 
        
        else:
            rs_state_keys = position + velocity + position_ref + velocity_ref

        return rs_state_keys

    def _get_robot_server_state_len(self) -> int:
        """Get length of the Robot Server state.

        Describes the composition of the Robot Server state and returns
        its length.
        """
        return len(self.get_robot_server_composition())

    def _check_rs_state_keys(self, rs_state) -> None:
        keys = self.get_robot_server_composition()
        if not len(keys) == len(rs_state.keys()):
            raise InvalidStateError("Robot Server state keys to not match. Different lengths.")
        
        for key in keys:
            if key not in rs_state.keys():
                raise InvalidStateError("Robot Server state keys to not match")
        return True

    def _robot_server_state_to_env_state(self, rs_state) -> np.ndarray:
        """Transform state from Robot Server to environment format.

        Args:
            rs_state (dict): State in Robot Server format.

        Returns:
            numpy.array: State in environment format.
        """
        
        states = []
        # [0-3]
        for key in self.agribot.steering_joint_position:
            if rs_state[key] <=0:
                states.append(rs_state[key]/abs(self.limit_lower))
            else:
                states.append(rs_state[key]/abs(self.limit_upper))
        # [4-7]
        for key in self.agribot.steering_joint_velocity:
            states.append(rs_state[key])
        # [8-11]
        for key in self.agribot.steering_joint_position_ref:
            if rs_state[key] <=0:
                states.append(rs_state[key]/abs(self.limit_lower))
            else:
                states.append(rs_state[key]/abs(self.limit_upper))
        # [12-15]
        for key in self.agribot.steering_joint_velocity_ref:
            states.append(rs_state[key])
        # [15-19]
        for key in self.agribot.wheel_joint_velocity:
            states.append(rs_state[key])      

        assert len(states) == 20
        
        return np.array(states)

    def _get_observation_space(self) -> gym.spaces.Box:
        """Get environment observation space.

        Returns:
            gym.spaces: Gym observation space object.
        """
 
        # Joint position range tolerance
        pos_tolerance = np.full(4,0.1)

        # Joint positions range used to determine if there is an error in the sensor readings
        max_steering_joint_position = np.add(np.full(4, 1.0), pos_tolerance)
        min_steering_joint_position = np.subtract(np.full(4, -1.0), pos_tolerance)
        # Joint velocities range 
        max_steering_joint_velocity = np.array([np.inf] * 8)
        min_steering_joint_velocity = -np.array([np.inf] * 8)

        # references positions
        max_steering_joint_position_ref = np.add(np.full(4, 1.0), pos_tolerance)
        min_steering_joint_position_ref = np.subtract(np.full(4, -1.0), pos_tolerance)

        max_steering_joint_velocity_ref = np.array([np.inf] * 4)
        min_steering_joint_velocity_ref = -np.array([np.inf] * 4)
        
        # Definition of environment observation_space
        max_obs = np.concatenate((
            max_steering_joint_position, 
            max_steering_joint_velocity,
            max_steering_joint_position_ref,
            max_steering_joint_velocity_ref))

        min_obs = np.concatenate((
            min_steering_joint_position, 
            min_steering_joint_velocity,
            min_steering_joint_position_ref,
            min_steering_joint_velocity_ref))

        observation_space = gym.spaces.Box(
            low=min_obs,
            high=max_obs,
            dtype=np.float64
        )

        return observation_space
    
    def _get_action_space(self)-> gym.spaces.Box:
        """Get environment action space.

        Returns:
            gym.spaces: Gym action space object.
        """
        action_space = gym.spaces.Box(
            low= np.full(4, -1.0),
            high= np.full(4, 1.0), 
            dtype=np.float64
        )
        return action_space

    def _set_joint_positions(self, joint_positions) -> None:
        """Set desired robot joint positions with standard indexing."""
        # Set initial robot joint positions
        self.joint_positions = {}
        self.joint_positions['front_left_steering_joint_position'] = joint_positions[0]
        self.joint_positions['front_right_steering_joint_position'] = joint_positions[1]
        self.joint_positions['rear_left_steering_joint_position'] = joint_positions[2]
        self.joint_positions['rear_right_steering_joint_position'] = joint_positions[3]

    def _set_joint_positions_ref(self, joint_positions) -> None:
        """Set desired robot joint positions with standard indexing."""
        # Set initial robot joint positions
        self.joint_positions_ref = {}
        self.joint_positions_ref['front_left_steering_joint_position_ref'] = joint_positions[0]
        self.joint_positions_ref['front_right_steering_joint_position_ref'] = joint_positions[1]
        self.joint_positions_ref['rear_left_steering_joint_position_ref'] = joint_positions[2]
        self.joint_positions_ref['rear_right_steering_joint_position_ref'] = joint_positions[3]

agribot = AgribotEnv(rs_address=50051)

In [11]:
joint_positions = [agribot.np_random.uniform(low=agribot.limit_lower, high=agribot.limit_upper) for _ in range(4)]
print(joint_positions)
agribot._set_joint_positions(joint_positions)
state_dict=agribot.joint_positions
print(state_dict)

[0.10798399967482064, -1.8757029807555707, -3.0360432170190728, -2.537283971135914]
{'front_left_steering_joint_position': 0.10798399967482064, 'front_right_steering_joint_position': -1.8757029807555707, 'rear_left_steering_joint_position': -3.0360432170190728, 'rear_right_steering_joint_position': -2.537283971135914}


In [170]:
rs_state = agribot.client.get_state_msg().state_dict

states= []
for key in agribot.agribot.steering_joint_position:
    if rs_state[key] <=0:
        states.append(-rs_state[key]/agribot.limit_lower)
    else:
        states.append(rs_state[key]/agribot.limit_upper)
    print(f"rs_state[{key}]{rs_state[key]}|states:{states[-1]} ")

for key in agribot.agribot.steering_joint_position_ref:
    if rs_state[key] <=0:
        states.append(-rs_state[key]/agribot.limit_lower)
    else:
        states.append(rs_state[key]/agribot.limit_upper)
    print(f"rs_state[{key}]{rs_state[key]}|states:{states[-1]} ")
    
print(agribot._robot_server_state_to_env_state(rs_state))

rs_state[front_left_steering_joint_position]-2.3212075233459473|states:-0.7388633025652072 
rs_state[front_right_steering_joint_position]2.321288585662842|states:0.7388891055020717 
rs_state[rear_left_steering_joint_position]2.152250051498413|states:0.6850824689315175 
rs_state[rear_right_steering_joint_position]0.5358936786651611|states:0.17058025586252035 
rs_state[front_left_steering_joint_position_ref]-2.1873457431793213|states:-0.6962537745560088 
rs_state[front_right_steering_joint_position_ref]1.2644098997116089|states:0.4024741712668604 
rs_state[rear_left_steering_joint_position_ref]0.8669301271438599|states:0.2759524301004612 
rs_state[rear_right_steering_joint_position_ref]0.3467949628829956|states:0.11038826516439824 
[-7.38863303e-01  7.38889106e-01  6.85082469e-01  1.70580256e-01
  6.21501822e-03 -1.94049382e-03 -8.14366620e-03 -3.65191634e-04
 -6.96253775e-01  4.02474171e-01  2.75952430e-01  1.10388265e-01
  0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00


In [270]:

obs = agribot.reset([0,0,0,0])
print(obs[8:12])

[0. 0. 0. 0.]


In [130]:

observation, reward, done, info = agribot.step([1,1,1,1])
print(f"observation:\n{observation}")
print(f"reward:\n{reward}")
print(f"done:\n{done}")
print(f"info:\n{info}")

reward:
[2.47220904 3.98382628 2.02115041 1.21555561]
sum:
9.692741330713034
observation:
[0.58338167315444, 0.693751952154024, 0.6575084640152405, 0.6941578171727042, 3.4963793754577637, 3.608764886856079, 4.776437282562256, 3.9451000690460205, 0.20354690382874696, 0.5743393374942417, 0.014156308177808241, 0.3072344498533996, 0.0, 0.0, 0.0, 0.0, -0.018487926572561264, -2.9127577363396995e-05, -2.2244295905693434e-05, 0.000783797528129071]
reward:
-9.692741330713034
done:
False
info:
{}


### PID Controller

In [282]:
class Controller:
    def __init__(self, K=[1, 0, 0], sp=0, max=1):
        self.kp, self.ki, self.kd= K
        self.sp= sp
        self.last_error= 0
        self.last_time= time.time()
        self.eint= 0
        self.dt = 0 
        self.max =  max

    def get_action(self, angle, velocity, sp=None):
        current_time = time.time()
        if not sp==None:
            self.sp = sp
        self.dt = current_time - self.last_time
        self.pv = angle
        error = self.sp - self.pv
        self.eint += error * self.dt
        self.edot =  (error - self.last_error) / self.dt
        self.cv = self.kp * error + self.ki * self.eint + self.kd * self.edot
        cv = np.clip(self.cv,-self.max,self.max)
        self.last_error= error
        self.last_time= current_time
        return cv

In [293]:
import math
import time
import numpy as np

obs = agribot.reset()
K = [5, .2, 0.5] #no_fmu
# K = [5, 0.1, 5]

controller = [
    Controller(K, sp=obs[8]), 
    Controller(K, sp=obs[9]), 
    Controller(K, sp=obs[10]), 
    Controller(K, sp=obs[11])]

action = [0]*4
Done = True
it = 0
tol = np.deg2rad(2.5)/np.pi
print(tol)
time.sleep(0.1)

while Done:
    action[0] = controller[0].get_action(angle=obs[0], velocity=obs[4], sp=obs[8])
    action[1] = controller[1].get_action(angle=obs[1], velocity=obs[5], sp=obs[9])
    action[2] = controller[2].get_action(angle=obs[2], velocity=obs[6], sp=obs[10])
    action[3] = controller[3].get_action(angle=obs[3], velocity=obs[7], sp=obs[11])
    obs, reward, done, info = agribot.step(action)
    
    for i in range(4):
        print(f"{it}/[{i}]:obs:{controller[i].pv:.2f}|act:{controller[i].cv:.2f} dt:{controller[i].dt:.3f}|error:{controller[i].last_error:.3f}|sp:{controller[i].sp:.2f}|eint: {controller[i].eint:.2e}")

    if abs(controller[0].last_error) < tol and abs(obs[4]) < 0.1 and \
        abs(controller[1].last_error) < tol and abs(obs[5]) < 0.1 and \
        abs(controller[2].last_error) < tol and abs(obs[6]) < 0.1 and \
        abs(controller[3].last_error) < tol and abs(obs[7]) < 0.1:
        print("!!!!!!!!!!!!!!!")
        agribot.reset()
        it = 0

    if it >= 1000:
        obs_reset = agribot.reset()
        print(obs_reset[8:12]*1)
        it = 0
    it +=1
    time.sleep(0.005)

0.013888888888888888
[-0.34982231  0.62790638 -0.32979111  0.7017132 ]
0/[0]:obs:-0.07|act:-2.76 dt:0.101|error:-0.277|sp:-0.35|eint: -2.79e-02
0/[1]:obs:-0.09|act:7.20 dt:0.101|error:0.722|sp:0.63|eint: 7.28e-02
0/[2]:obs:-0.16|act:-1.70 dt:0.101|error:-0.170|sp:-0.33|eint: -1.72e-02
0/[3]:obs:0.35|act:3.55 dt:0.101|error:0.355|sp:0.70|eint: 3.59e-02
[-0.34982231  0.62790638 -0.32979111  0.7017132 ]
1/[0]:obs:-0.13|act:-0.85 dt:0.106|error:-0.221|sp:-0.35|eint: -5.13e-02
1/[1]:obs:-0.04|act:3.15 dt:0.106|error:0.671|sp:0.63|eint: 1.44e-01
1/[2]:obs:-0.26|act:0.12 dt:0.106|error:-0.070|sp:-0.33|eint: -2.46e-02
1/[3]:obs:0.42|act:1.12 dt:0.106|error:0.286|sp:0.70|eint: 6.63e-02
[-0.34982231  0.62790638 -0.32979111  0.7017132 ]
2/[0]:obs:-0.32|act:0.85 dt:0.097|error:-0.027|sp:-0.35|eint: -5.40e-02
2/[1]:obs:0.15|act:1.38 dt:0.097|error:0.473|sp:0.63|eint: 1.90e-01
2/[2]:obs:-0.28|act:-0.16 dt:0.097|error:-0.051|sp:-0.33|eint: -2.95e-02
2/[3]:obs:0.56|act:-0.07 dt:0.097|error:0.137|sp:0.

KeyboardInterrupt: 