# Assistive Gym - New environment for multi robots

## Create the environment

Let's create a new environment where the goal is for two robots each move its end effector to a person's mouth.

This environment will support all existing robots in Assistive Gym and also support a static human motion.

In [13]:
import numpy as np
import pybullet as p
from gym import spaces

from assistive_gym.envs.env import AssistiveEnv

class ReachingMultiRobotEnv(AssistiveEnv):
    def __init__(self, robot_1, human, robot_2):
        # We will call this the 'feeding' task so we don't have to redefine robot positions/orientations in the agent/sawyer.py and agent/panda.py files.
        obs_robot_1_len =(17 + len(robot_1.controllable_joint_indices) - (len(robot_1.wheel_joint_indices) if robot_1.mobile else 0))
        obs_robot_2_len =(17 + len(robot_2.controllable_joint_indices) - (len(robot_2.wheel_joint_indices) if robot_2.mobile else 0))
        super(ReachingMultiRobotEnv, self).__init__(robot=robot_1, human=human, task='feeding',
                                          obs_robot_len=(obs_robot_1_len + obs_robot_2_len), 
                                          obs_human_len=(18 + len(human.controllable_joint_indices)))

        #update with robot actions space 
        self.robot_1 = robot_1
        self.robot_2 = robot_2
        
        self.obs_robot_1_len = obs_robot_1_len
        self.obs_robot_2_len = obs_robot_2_len
        
        #update action space
        self.action_robot_1_len = len(robot_1.controllable_joint_indices) if robot_1 is not None else 0
        self.action_robot_2_len = len(robot_2.controllable_joint_indices) if robot_2 is not None else 0
        self.action_robot_len = self.action_robot_1_len + self.action_robot_2_len
        self.action_space = spaces.Box(low=np.array([-1.0]*(self.action_robot_len+self.action_human_len), dtype=np.float32), 
                                        high=np.array([1.0]*(self.action_robot_len+self.action_human_len), dtype=np.float32), 
                                        dtype=np.float32)
        self.action_space_robot = spaces.Box(low=np.array([-1.0]*self.action_robot_len, dtype=np.float32), 
                                        high=np.array([1.0]*self.action_robot_len, dtype=np.float32), 
                                        dtype=np.float32)

    def get_obs(self):
        return {'robot_1':self._get_obs(agent='robot_1'),
                'robot_2':self._get_obs(agent='robot_2'),
                'robot':self._get_obs(agent='robot'),
                'human':self._get_human_obs()}

    def step(self, action):
        if self.human.controllable:
            #map the agent order : robot_1 human robot_2
            robot_1_action = action['robot'][0:self.action_robot_1_len]
            robot_2_action = action['robot'][self.action_robot_1_len:self.action_robot_1_len+self.action_robot_2_len]
            action = np.concatenate([robot_1_action, action['human'],robot_2_action])
        # Execute the action. Step the simulator forward
        self.take_step(action)

        obs = self._get_obs()

        # Get human preferences
        end_effector_1_velocity = np.linalg.norm(self.robot_1.get_velocity(self.robot_1.right_end_effector))
        end_effector_2_velocity = np.linalg.norm(self.robot_2.get_velocity(self.robot_2.right_end_effector))
        end_effector_velocity = (end_effector_1_velocity + end_effector_2_velocity)/2

        total_force_on_human = np.sum(self.robot_1.get_contact_points(self.human)[-1])
        total_force_on_human += np.sum(self.robot_2.get_contact_points(self.human)[-1])
        preferences_score = self.human_preferences(end_effector_velocity=end_effector_velocity, total_force_on_human=total_force_on_human)

        # Define our reward function
        end_effector_1_pos, end_effector_1_orient = self.robot_1.get_pos_orient(self.robot_1.right_end_effector)
        reward_1_distance_mouth_target = -np.linalg.norm(self.target_pos - end_effector_1_pos)

        end_effector_2_pos, end_effector_2_orient = self.robot_2.get_pos_orient(self.robot_2.right_end_effector)
        reward_2_distance_mouth_target = -np.linalg.norm(self.target_pos - end_effector_2_pos)

        reward_distance_mouth_target = (reward_1_distance_mouth_target + reward_2_distance_mouth_target)/2
        #end_effector_pos, end_effector_orient = self.robot.get_pos_orient(self.robot.right_end_effector)
        #reward_distance_mouth_target = -np.linalg.norm(self.target_pos - end_effector_pos) # Penalize robot for distance between the end effector and human mouth.
        reward_action = -np.linalg.norm(action) # Penalize actions
        reward = self.config('distance_weight')*reward_distance_mouth_target + self.config('action_weight')*reward_action + preferences_score

        info = {'total_force_on_human': total_force_on_human}
        done = self.iteration >= 200

        if not self.human.controllable:
            return obs, reward, done, info
        else:
            # Co-optimization with both human and robot controllable
            return obs, {'robot': reward, 'human': reward}, {'robot': done, 'human': done, '__all__': done}, {'robot': info, 'human': info}

    def init_env_variables(self, reset=False):
        if len(self.action_space.low) <= 1 or reset:
            obs_len = len(self._get_obs())
            self.observation_space.__init__(low=-np.ones(obs_len, dtype=np.float32)*1000000000, high=np.ones(obs_len, dtype=np.float32)*1000000000, dtype=np.float32)
            self.update_action_space()
            # Define action/obs lengths
            self.action_robot_len = len(self.robot_1.controllable_joint_indices)+len(self.robot_2.controllable_joint_indices)
            self.action_human_len = len(self.human.controllable_joint_indices) if self.human.controllable else 0
            self.obs_robot_len = len(self._get_obs('robot'))
            self.obs_human_len = len(self._get_obs('human'))
            self.action_space_robot = spaces.Box(low=np.array([-1.0]*self.action_robot_len, dtype=np.float32), high=np.array([1.0]*self.action_robot_len, dtype=np.float32), dtype=np.float32)
            self.action_space_human = spaces.Box(low=np.array([-1.0]*self.action_human_len, dtype=np.float32), high=np.array([1.0]*self.action_human_len, dtype=np.float32), dtype=np.float32)
            self.observation_space_robot = spaces.Box(low=np.array([-1000000000.0]*self.obs_robot_len, dtype=np.float32), high=np.array([1000000000.0]*self.obs_robot_len, dtype=np.float32), dtype=np.float32)
            self.observation_space_human = spaces.Box(low=np.array([-1000000000.0]*self.obs_human_len, dtype=np.float32), high=np.array([1000000000.0]*self.obs_human_len, dtype=np.float32), dtype=np.float32)

    def _get_robot_obs(self,index=1):
        if index == 1:
            self_robot = self.robot_1
            peer_robot = self.robot_2
        else:
            self_robot = self.robot_2
            peer_robot = self.robot_1
        
        self_end_effector_pos, self_end_effector_orient = self_robot.get_pos_orient(self_robot.right_end_effector)
        self_end_effector_pos_real, self_end_effector_orient_real = self_robot.convert_to_realworld(self_end_effector_pos, self_end_effector_orient)
        self_robot_joint_angles = self_robot.get_joint_angles(self_robot.controllable_joint_indices)
        # Fix joint angles to be in [-pi, pi]
        self_robot_joint_angles = (np.array(self_robot_joint_angles) + np.pi) % (2*np.pi) - np.pi
        if self_robot.mobile:
            # Don't include joint angles for the wheels
            self_robot_joint_angles = self_robot_joint_angles[len(self_robot.wheel_joint_indices):]
        head_pos, head_orient = self.human.get_pos_orient(self.human.head)
        head_pos_real, head_orient_real = self_robot.convert_to_realworld(head_pos, head_orient)
        target_pos_real, _ = self_robot.convert_to_realworld(self.target_pos)

        peer_end_effector_pos,pee_end_effector_orient = peer_robot.get_pos_orient(peer_robot.right_end_effector)
        peer_end_effector_pos_real, peer_end_effector_orient_real = self_robot.convert_to_realworld(peer_end_effector_pos, pee_end_effector_orient)
        # Define the robot observation
        robot_obs = np.concatenate([self_end_effector_pos_real, self_end_effector_orient_real, 
                                    self_end_effector_pos_real - target_pos_real, self_robot_joint_angles, 
                                    head_pos_real, head_orient_real]).ravel()
        return robot_obs

    def _get_human_obs(self):
        human_obs = None
        if self.human.controllable:
            human_joint_angles = self.human.get_joint_angles(self.human.controllable_joint_indices)
            head_pos, head_orient = self.human.get_pos_orient(self.human.head)
            target_pos_human, _ = self.human.convert_to_realworld(self.target_pos)
            head_pos_human, head_orient_human = self.human.convert_to_realworld(head_pos, head_orient)
            # Convert robot_1 pos/orient from global coordinates to human-centric coordinate frame
            end_effector_pos_1, end_effector_orient_1 = self.robot_1.get_pos_orient(self.robot_1.right_end_effector)
            end_effector_pos_1_human, end_effector_orient_1_human = self.human.convert_to_realworld(end_effector_pos_1, end_effector_orient_1)
            # Convert robot_2 pos/orient from global coordinates to human-centric coordinate frame
            end_effector_pos_2, end_effector_orient_2 = self.robot_2.get_pos_orient(self.robot_2.right_end_effector)
            end_effector_pos_2_human, end_effector_orient_2_human = self.human.convert_to_realworld(end_effector_pos_2, end_effector_orient_2)
            
            total_force_on_human = np.sum(self.robot_1.get_contact_points(self.human)[-1])
            total_force_on_human += np.sum(self.robot_2.get_contact_points(self.human)[-1])
            # Define the human observation
            human_obs = np.concatenate([end_effector_pos_1_human, end_effector_orient_1_human, end_effector_pos_1_human - target_pos_human,
                                        end_effector_pos_2_human, end_effector_orient_2_human, end_effector_pos_2_human - target_pos_human,
                                        human_joint_angles, head_pos_human, head_orient_human, [total_force_on_human]]).ravel()
        return human_obs

    def _get_obs(self, agent=None):
        robot_1_obs = self._get_robot_obs(index=1)
        robot_2_obs = self._get_robot_obs(index=2)
        robot_obs = np.concatenate((robot_1_obs,robot_2_obs))
        #robot_obs = self._get_all_robot_obs()
        if(agent == 'robot_1'):
            return robot_1_obs
        if(agent =='robot_2'):
            return robot_2_obs
        if agent == 'robot':
            return robot_obs
        if self.human.controllable:
            human_obs = self._get_human_obs()
            if agent == 'human':
                return human_obs
            # Co-optimization with both human and robot controllable
            return {'robot': robot_obs, 'human': human_obs}
        return robot_obs

    def reset(self):
        super(ReachingMultiRobotEnv, self).reset()
        self.build_assistive_env('wheelchair')
        if self.robot.wheelchair_mounted:
            wheelchair_pos, wheelchair_orient = self.furniture.get_base_pos_orient()
            self.robot.set_base_pos_orient(wheelchair_pos + np.array(self.robot.toc_base_pos_offset[self.task]), [0, 0, -np.pi/2.0])
        self.robot_1 = self.robot
                
        # Initialize the starting human pose
        joints_positions = [(self.human.j_right_elbow, -90), (self.human.j_left_elbow, -90), (self.human.j_right_hip_x, -90),
                            (self.human.j_right_knee, 80), (self.human.j_left_hip_x, -90), (self.human.j_left_knee, 80)]
        joints_positions += [(self.human.j_head_x, self.np_random.uniform(-30, 30)), (self.human.j_head_y, self.np_random.uniform(-30, 30)),
                             (self.human.j_head_z, self.np_random.uniform(-30, 30))]
        self.human.setup_joints(joints_positions, use_static_joints=True, reactive_force=None)

        self.generate_targets()

        # Change camera pos/angle
        p.resetDebugVisualizerCamera(cameraDistance=1.10, cameraYaw=40, cameraPitch=-45, cameraTargetPosition=[-0.2, 0, 0.75], physicsClientId=self.id)

        # Perform base pose optimization
        target_ee_pos = np.array([-0.15, -0.65, 1.15]) + self.np_random.uniform(-0.05, 0.05, size=3)
        target_ee_orient = self.get_quaternion(self.robot.toc_ee_orient_rpy[self.task])
        self.init_robot_pose(target_ee_pos, target_ee_orient, [(target_ee_pos, target_ee_orient), (self.target_pos, None)], 
                             [(self.target_pos, target_ee_orient)], arm='right', tools=[], collision_objects=[self.human, self.furniture])

        if not self.robot.mobile:
            self.robot.set_gravity(0, 0, 0)
        self.human.set_gravity(0, 0, 0)
        #reset second robot
        self.robot_2 = self.create_robot(self.robot_1.__class__)
        self.robot_2.set_gravity(0, 0, 0)
        pos1, orient1 = self.robot_1.get_base_pos_orient()
        self.robot_2.set_base_pos_orient(pos1 + np.array([2.0, 0, 0.0]), [0, 0, np.pi])
        if not self.robot_2.mobile:
            self.robot_2.set_gravity(0, 0, 0)
        
        #set default robot back to robot 1 
        self.robot = self.robot_1        
        # Enable rendering
        p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 1, physicsClientId=self.id)

        self.init_env_variables()
        return self._get_obs()

    def generate_targets(self):
        # Set target on mouth
        self.mouth_pos = [0, -0.11, 0.03] if self.human.gender == 'male' else [0, -0.1, 0.03]
        head_pos, head_orient = self.human.get_pos_orient(self.human.head)
        target_pos, target_orient = p.multiplyTransforms(head_pos, head_orient, self.mouth_pos, [0, 0, 0, 1], physicsClientId=self.id)
        self.target = self.create_sphere(radius=0.01, mass=0.0, pos=target_pos, collision=False, rgba=[0, 1, 0, 1])
        self.update_targets()

    def update_targets(self):
        # Update target position on mouth
        head_pos, head_orient = self.human.get_pos_orient(self.human.head)
        target_pos, target_orient = p.multiplyTransforms(head_pos, head_orient, self.mouth_pos, [0, 0, 0, 1], physicsClientId=self.id)
        self.target_pos = np.array(target_pos)
        self.target.set_base_pos_orient(self.target_pos, [0, 0, 0, 1])

## Create a version of this environment for each robot


In [14]:
from assistive_gym.envs.agents import pr2, baxter, sawyer, jaco, stretch, panda, human
from assistive_gym.envs.agents.sawyer import Sawyer
from assistive_gym.envs.agents.jaco import Jaco
from assistive_gym.envs.agents.pr2 import PR2
from assistive_gym.envs.agents.baxter import Baxter
from assistive_gym.envs.agents.panda import Panda
from assistive_gym.envs.agents.stretch import Stretch
from assistive_gym.envs.agents.human import Human
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.tune.registry import register_env
import assistive_gym.envs

robot_arm = 'right'
human_controllable_joint_indices = human.head_joints
class ReachingMultiSawyerEnv(ReachingMultiRobotEnv):
    def __init__(self):
        super(ReachingMultiSawyerEnv, self).__init__(robot_1=Sawyer(robot_arm), 
                                                    human=Human(human_controllable_joint_indices, controllable=False),
                                                    robot_2=Sawyer(robot_arm))
# Connect the environment to Assistive Gym
assistive_gym.envs.ReachingMultiSawyerEnv = ReachingMultiSawyerEnv
register_env('assistive_gym:ReachingMultiSawyer-v1', lambda config: ReachingMultiSawyerEnv())

class ReachingMultiJacoEnv(ReachingMultiRobotEnv):
    def __init__(self):
        super(ReachingMultiJacoEnv, self).__init__(robot_1=Jaco(robot_arm),
                                                 human=Human(human_controllable_joint_indices, controllable=False),
                                                 robot_2=Jaco(robot_arm))
assistive_gym.envs.ReachingMultiJacoEnv = ReachingMultiJacoEnv
register_env('assistive_gym:ReachingMultiJaco-v1', lambda config: ReachingMultiJacoEnv())

class ReachingMultiPR2Env(ReachingMultiRobotEnv):
    def __init__(self):
        super(ReachingMultiPR2Env, self).__init__(robot_1=PR2(robot_arm), 
                                                human=Human(human_controllable_joint_indices, controllable=False),
                                                robot_2=PR2(robot_arm))
assistive_gym.envs.ReachingMultiPR2Env = ReachingMultiPR2Env
register_env('assistive_gym:ReachingMultiPR2-v1', lambda config: ReachingMultiPR2Env())

class ReachingMultiBaxterEnv(ReachingMultiRobotEnv):
    def __init__(self):
        super(ReachingMultiBaxterEnv, self).__init__(robot_1=Baxter(robot_arm), 
                                                    human=Human(human_controllable_joint_indices, controllable=False),
                                                    robot_2=Baxter(robot_arm))
assistive_gym.envs.ReachingMultiBaxterEnv = ReachingMultiBaxterEnv
register_env('assistive_gym:ReachingMultiBaxter-v1', lambda config: ReachingMultiBaxterEnv())

class ReachingMultiPandaEnv(ReachingMultiRobotEnv):
    def __init__(self):
        super(ReachingMultiPandaEnv, self).__init__(robot_1=Panda(robot_arm), 
                                                    human=Human(human_controllable_joint_indices, controllable=False),
                                                    robot_2=Panda(robot_arm))
assistive_gym.envs.ReachingMultiPandaEnv = ReachingMultiPandaEnv
register_env('assistive_gym:ReachingMultiPanda-v1', lambda config: ReachingMultiPandaEnv())

class ReachingMultiStretchEnv(ReachingMultiRobotEnv):
    def __init__(self):
        super(ReachingMultiStretchEnv, self).__init__(robot_1=Stretch(robot_arm), 
                                                    human=Human(human_controllable_joint_indices, controllable=False),
                                                    robot_2=Stretch(robot_arm))
assistive_gym.envs.ReachingMultiStretchrEnv = ReachingMultiStretchEnv
register_env('assistive_gym:ReachingMultiStretch-v1', lambda config: ReachingMultiStretchEnv())


class ReachingMultiSawyerHumanEnv(ReachingMultiRobotEnv, MultiAgentEnv):
    def __init__(self):
        super(ReachingMultiSawyerHumanEnv, self).__init__(robot_1=Sawyer(robot_arm), 
                                                        human=Human(human_controllable_joint_indices, controllable=True),
                                                        robot_2=Sawyer(robot_arm))
assistive_gym.envs.ReachingMultiSawyerHumanEnv = ReachingMultiSawyerHumanEnv
# Register the collaborative robot-human environment with RLlib
register_env('assistive_gym:ReachingMultiSawyerHuman-v1', lambda config: ReachingMultiSawyerHumanEnv())

class ReachingMultiJacoHumanEnv(ReachingMultiRobotEnv, MultiAgentEnv):
    def __init__(self):
        super(ReachingMultiJacoHumanEnv, self).__init__(robot_1=Jaco(robot_arm), 
                                                        human=Human(human_controllable_joint_indices, controllable=True),
                                                        robot_2=Jaco(robot_arm))
assistive_gym.envs.ReachingMultiJacoHumanEnv = ReachingMultiJacoHumanEnv
register_env('assistive_gym:ReachingMultiJacoHuman-v1', lambda config: ReachingMultiJacoHumanEnv())

class ReachingMultiPR2HumanEnv(ReachingMultiRobotEnv, MultiAgentEnv):
    def __init__(self):
        super(ReachingMultiPR2HumanEnv, self).__init__(robot_1=PR2(robot_arm), 
                                                    human=Human(human_controllable_joint_indices, controllable=True),
                                                    robot_2=PR2(robot_arm))
assistive_gym.envs.ReachingMultiPR2HumanEnv = ReachingMultiPR2HumanEnv
# Register the collaborative robot-human environment with RLlib
register_env('assistive_gym:ReachingMultiPR2Human-v1', lambda config: ReachingMultiPR2HumanEnv())

class ReachingMultiBaxterHumanEnv(ReachingMultiRobotEnv, MultiAgentEnv):
    def __init__(self):
        super(ReachingMultiBaxterHumanEnv, self).__init__(robot_1=Baxter(robot_arm), 
                                                        human=Human(human_controllable_joint_indices, controllable=True),
                                                        robot_2=Baxter(robot_arm))
assistive_gym.envs.ReachingMultiBaxterHumanEnv = ReachingMultiBaxterHumanEnv
# Register the collaborative robot-human environment with RLlib
register_env('assistive_gym:ReachingMultiBaxterHuman-v1', lambda config: ReachingMultiBaxterHumanEnv())

class ReachingMultiPandaHumanEnv(ReachingMultiRobotEnv, MultiAgentEnv):
    def __init__(self):
        super(ReachingMultiPandaHumanEnv, self).__init__(robot_1=Panda(robot_arm), 
                                                        human=Human(human_controllable_joint_indices, controllable=True),
                                                        robot_2=Panda(robot_arm))
assistive_gym.envs.ReachingMultiPandaHumanEnv = ReachingMultiPandaHumanEnv
# Register the collaborative robot-human environment with RLlib
register_env('assistive_gym:ReachingMultiPandaHuman-v1', lambda config: ReachingMultiPandaHumanEnv())


class ReachingMultiStretchHumanEnv(ReachingMultiRobotEnv, MultiAgentEnv):
    def __init__(self):
        super(ReachingMultiStretchHumanEnv, self).__init__(robot_1=Stretch(robot_arm), 
                                                            human=Human(human_controllable_joint_indices, controllable=True),
                                                            robot_2=Stretch(robot_arm))
assistive_gym.envs.ReachingMultiStretchHumanEnv = ReachingMultiStretchHumanEnv
# Register the collaborative robot-human environment with RLlib
register_env('assistive_gym:ReachingMultiStretchHuman-v1', lambda config: ReachingMultiStretchHumanEnv())

## Connect the environments to OpenAI Gym
Only the non-collaborative environments where the human holds a static pose.

In [None]:
from gym.envs.registration import register

for robot in ['Sawyer', 'Jaco','PR2','Baxter','Panda','Stretch']:
    register(
        id='ReachingMulti%s-v1' % robot,
        entry_point='assistive_gym.envs:ReachingMulti%sEnv' % robot,
        max_episode_steps=200,
    )
    register(
        id='ReachingMulti%sHuman-v1' % robot,
        entry_point='assistive_gym.envs:ReachingMulti%sHumanEnv' % robot,
        max_episode_steps=200,
    )


### Training

In [None]:
from assistive_gym.learn import train, render_policy, evaluate_policy
env_name = 'ReachingMultiSawyer-v1'
algo = 'ppo'
policy_path = train(env_name, algo, timesteps_total=500)

### Rendering

In [None]:
import gym

env = gym.make('assistive_gym:'+env_name)
env.setup_camera(camera_eye=[0.5, -0.75, 1.5], camera_target=[-0.2, 0, 0.75], fov=60, camera_width=1920//4, camera_height=1080//4)
filename = render_policy(env, env_name, algo, policy_path, colab=True, seed=0)

In [None]:
from IPython.display import Image
Image(filename)