In [1]:
import numpy as np
import scipy as sp

from scipy.spatial.transform import Rotation
from quaternion import from_rotation_matrix, quaternion

from rlbench.environment import Environment
from rlbench.action_modes import ArmActionMode, ActionMode
from rlbench.observation_config import ObservationConfig
from rlbench.tasks import *

from pyrep.const import ConfigurationPathAlgorithms as Algos
from scipy.spatial.transform import Rotation as R


# Problem Statement

In [2]:
def skew(x):
    return np.array([[0, -x[2], x[1]],
                    [x[2], 0, -x[0]],
                    [-x[1], x[0], 0]])


def sample_normal_pose(pos_scale, rot_scale):
    '''
    Samples a 6D pose from a zero-mean isotropic normal distribution
    '''
    pos = np.random.normal(scale=pos_scale)
        
    eps = skew(np.random.normal(scale=rot_scale))
    R = sp.linalg.expm(eps)
    quat_wxyz = from_rotation_matrix(R)

    return pos, quat_wxyz


class RandomAgent:

    def act(self, obs):
        delta_pos = [(np.random.rand() * 2 - 1) * 0.005, 0, 0]
        delta_quat = [0, 0, 0, 1] # xyzw
        gripper_pos = [np.random.rand() > 0.5]
        return delta_pos + delta_quat + gripper_pos


class NoisyObjectPoseSensor:

    def __init__(self, env):
        self._env = env

        self._pos_scale = [0.005, 0.005, 0.005] 
        self._rot_scale = [0.01] * 3

    def get_poses(self):
        objs = self._env._scene._active_task.get_base().get_objects_in_tree(exclude_base=True, first_generation_only=False)
        obj_poses = {}

        for obj in objs:
            name = obj.get_name()
            pose = obj.get_pose()

            pos, quat_wxyz = sample_normal_pose(self._pos_scale, self._rot_scale)
            gt_quat_wxyz = quaternion(pose[6], pose[3], pose[4], pose[5])
            perturbed_quat_wxyz = quat_wxyz * gt_quat_wxyz

            pose[:3] += pos
            pose[3:] = [perturbed_quat_wxyz.x, perturbed_quat_wxyz.y, perturbed_quat_wxyz.z, perturbed_quat_wxyz.w]

            obj_poses[name] = pose

        return obj_poses


## Helper Functions

In [3]:
def contains(r1, r2):
    #whether r2 is within r1
    return r1[0] < r2[0] < r2[1] < r1[1] and r1[2] < r2[2] < r2[3] < r1[3] and r1[4] < r2[4] < r2[5] < r1[5]

def get_edge_points(obj_bbox,obj_mat):
    bbox_faceedge=np.array([[obj_bbox[0],0,0],
    [obj_bbox[1],0,0],
    [0,obj_bbox[2],0],
    [0,obj_bbox[3],0],
    [0,0,obj_bbox[4]],
    [0,0,obj_bbox[5]]]).T #3x6
    bbox_faceedge=np.vstack((bbox_faceedge,np.ones((1,6)))) #4x6
    box=(obj_mat@bbox_faceedge).T #6X3 face edge coords in world frame
    rect=min(box[:,0]),max(box[:,0]),min(box[:,1]),max(box[:,1]),min(box[:,2]),max(box[:,2]) #max/min along x,y,z world axes
    return rect

In [4]:
class StateMachine(object):
    def __init__(self):
        self.env=None
        self.home=None
        self.task=None
        self.sensor=None
        self.objs_dict=None
        
    def initialize(self):
        DATASET = ''
        obs_config = ObservationConfig()
        obs_config.set_all(True)
        obs_config.left_shoulder_camera.rgb = True
        obs_config.right_shoulder_camera.rgb = True
        action_mode = ActionMode(ArmActionMode.DELTA_EE_POSE)
        self.env = Environment(
            action_mode, DATASET, obs_config, False)
        self.sensor = NoisyObjectPoseSensor(self.env)
        self.env.launch()
        self.task = self.env.get_task(EmptyContainer)
        self.task.reset()
        self.home = self.get_objects()['large_container'].get_pose()
        self.home[2]+=0.3
        # demos = task.get_demos(3, live_demos=live_demos)
        
        self.objs = self.get_objects()
        self.movable_objects = ['Shape', 'Shape1', 'Shape3']
        self.target_bins = ['small_container0']
        self.start_bins = ['large_container']
        self.max_retry = 5
        
        
    def get_objects(self):
        objs = self.env._scene._active_task.get_base().get_objects_in_tree(exclude_base=True, first_generation_only=True)
        objs_dict = {}
        for obj in objs:
            name = obj.get_name()
            pose = obj.get_pose()
            objs_dict[name] = obj
        return objs_dict

    # Move above object
    def move_to(self, pose, pad=0.05, ignore_collisions=True):
        target_pose = np.copy(pose)
        obs = self.env._scene.get_observation()
        init_pose=obs.gripper_pose
        obs = self.env._scene.get_observation()
        init_pose=obs.gripper_pose
        target_pose[2]+=pad
        path=self.env._robot.arm.get_path(np.array(target_pose[0:3]),quaternion=np.array([0,1,0,0]), trials=1000,ignore_collisions=True, algorithm=Algos.RRTConnect)
        # TODO catch errors and deal with situations when path not found
        return path
        
    def grasp(self,obj):
        cond=False
        while not cond:
            cond=self.env._robot.gripper.actuate(0,0.1)
            self.env._scene.step()
        cond = self.env._robot.gripper.grasp(obj)
        return cond
    
    def release(self, obj):
        cond=False
        while not cond:
            cond=self.env._robot.gripper.actuate(1,0.1)
            self.env._scene.step()
        self.env._robot.gripper.release()
    
    def execute(self, path):
        done=False
        path.set_to_start()
        while not done:
            done = path.step()
            a = path.visualize()
            self.env._scene.step()
        return done
    
    def reset(self):
        self.task.reset()
    
    
    def is_within(self,obj1,obj2):
        #whether obj2 is within obj1
        obj1_bbox=obj1.get_bounding_box();obj1_mat=np.array(obj1.get_matrix()).reshape(3,4);
        obj2_bbox=obj2.get_bounding_box();obj2_mat=np.array(obj2.get_matrix()).reshape(3,4);
        obj1_rect= get_edge_points(obj1_bbox,obj1_mat)
        obj2_rect= get_edge_points(obj2_bbox,obj2_mat)
        return contains(obj1_rect,obj2_rect)
    
    def picking_bin_empty(self):
        '''
         Returns whether the picking bin is empty
        '''
        self.objs_dict=machine.get_objects()
        bin_obj=self.objs_dict['large_container']
        for obj_name in self.objs_dict.keys():
            if (not ('container' in obj_name)):
                if self.is_within(bin_obj,self.objs_dict[obj_name]):
                    return False
        return True
    def placing_bin_full(self):
        '''
         Returns whether the placing bin is full
        '''
        self.objs_dict=machine.get_objects()
        bin_obj=self.objs_dict['small_container0']
        for obj_name in self.objs_dict.keys():
            if (not ('container' in obj_name)):
                if not (self.is_within(bin_obj,self.objs_dict[obj_name])):
                    return False
        return True
    
    def get_all_shapes(self):
        objects = []

        for name in self.objs:
            if "Shape" in name:
                objects.append(name)

        print(objects)
        return objects
    
    def move_objects_to_target(self, target_bins, start_bins):
        
        target_bin = target_bins[0]
        start_bin = start_bins[0]
        
        start_bin_pose = self.objs[start_bin].get_pose()
        start_bin_pose[2]+=0.3

        target_bin_pose = self.objs[target_bin].get_pose()
        target_bin_pose[2]+=0.3

        
#         move_objs = []
#         for obj in machine.movable_objects:
#             if self.is_within(target_bin, obj):
#                 move_objs.append(obj)
#         print(move_objs)

        move_objs = machine.movable_objects
    
        for i, shape in enumerate(move_objs):
            
            cond = False
            retry_count = 0
            while not cond and retry_count < self.max_retry:
                
                # go back to home position
                path=machine.move_to(start_bin_pose,0,True)
                machine.execute(path)
                print(start_bin_pose)

                # move above object
                objs_poses = machine.sensor.get_poses()
                # pose=objs[shape].get_pose()
                pose=objs_poses[shape]
                path=machine.move_to(pose,False)
                machine.execute(path)

                # grasp the object
                cond = machine.grasp(self.objs[shape])
                
                if not cond:
                    retry_count += 1
                    print("retry count: ", retry_count)

            # move to home position
            path=machine.move_to(start_bin_pose,0,True)
            print("GRIPPER FORCES",self.env._robot.gripper.get_joint_forces())
            machine.execute(path)

            # move above target container
            objs_poses=machine.sensor.get_poses()
            pose = objs_poses[target_bin]
            pose[0] += (i*0.04 - 0.04)
            path=machine.move_to(pose,0.05,True)
            machine.execute(path)

            # release the object
            machine.release(self.objs[shape])
        
        path=machine.move_to(machine.home,0,True)
        machine.execute(path)

In [5]:
machine = StateMachine()
machine.initialize()

In [None]:
# FWD
machine.move_objects_to_target(machine.target_bins, machine.start_bins)

In [6]:
machine.get_objects()

{'large_container': <pyrep.objects.shape.Shape at 0x7fa35dff7fd0>,
 'small_container0': <pyrep.objects.shape.Shape at 0x7fa35dfb2780>,
 'small_container1': <pyrep.objects.shape.Shape at 0x7fa35dfb2048>,
 'Shape': <pyrep.objects.shape.Shape at 0x7fa35dfb2b38>,
 'Shape1': <pyrep.objects.shape.Shape at 0x7fa35dfb2940>,
 'Shape3': <pyrep.objects.shape.Shape at 0x7fa35dfb2908>}

In [None]:
# make the start pose
bin_pose = machine.objs['small_container0'].get_pose()
bin_pose[2] += 0.2
bb_container = machine.objs['small_container0'].get_bounding_box()
bin_pose[0] += bb_container[1]
print(bin_pose)

In [None]:
path=machine.move_to(bin_pose,0,True)
machine.execute(path)

In [None]:
bin_pose[2] += 0.06


In [None]:
path=machine.move_to(bin_pose,0,True)
machine.execute(path)

In [None]:
machine.grasp(machine.objs['small_container0'])

In [None]:
r = R.from_quat(bin_pose[3:])
r2 = R.from_euler('z', 90, degrees=True)
r3 = r2 * r
r3.as_euler('xyz', degrees=True)

In [None]:
r3.as_euler('xyz', degrees=True)

In [None]:
r.as_euler('xyz', degrees=True)

In [None]:
bin_pose = machine.objs['large_container'].get_pose()
bin_pose[2] += 0.4

In [None]:
path=machine.move_to(bin_pose,0,True)
machine.execute(path)

In [None]:
path=machine.env._robot.arm.get_path(np.array(bin_pose[0:3]),euler=[0, np.pi, 0], trials=1000,ignore_collisions=True, algorithm=Algos.RRTConnect)    

In [None]:
machine.execute(path)

In [None]:
# Reset
machine.move_objects_to_target(machine.start_bins, machine.target_bins)