Copyright (c) 2008 Idiap Research Institute, http://www.idiap.ch/
    
Written by Suhan Shetty <suhan.shetty@idiap.ch>,


In [1]:
%load_ext autoreload
%autoreload 2
from panda_kinematics import PandaKinematics

import torch
# Make float64 the default floating-point dtype for tensors created without an
# explicit dtype from here on.
torch.set_default_dtype(torch.float64)

import numpy as np
# Print arrays/tensors with 2 decimals and without scientific notation.
np.set_printoptions(2, suppress=True)
torch.set_printoptions(2, sci_mode=False)

import sys
# Add '../' (relative to the working directory) to the import search path;
# presumably the project modules imported below live there -- TODO confirm.
sys.path.append('../')
from panda_cost_utils import SDF_Cost, PandaCost
from ttgo import TTGO
import tt_utils
from utils import test_ttgo
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("device: ", device)

  from .autonotebook import tqdm as notebook_tqdm


device:  cuda


In [2]:
# Hyper-params
d0_theta = 50       # number of grid points per joint angle
dh_x = 0.05         # grid spacing used to discretise each task-space axis
margin = 0          # written into the last key-point weight entry in the cost cell
# Parameters forwarded to PandaCost (goal, obstacle and orientation terms).
b_goal = 0.05; b_obst = 0.01; b_orient = 0.2
# `kr` used to be assigned twice in this cell (5, then 2); only the final value
# ever took effect, so the dead first assignment has been removed.
# NOTE(review): nswp, rmax and kr are not referenced anywhere else in the
# visible notebook -- the cross-approximation calls hard-code their own values.
nswp = 20; rmax = 500; kr = 2
d_type = "uniform"  # 'uniform' selects linspace grids; any other value takes the alternative branch

### Define the cost/pdf function

In [3]:
# Load the environment description. The .npy file stores a pickled Python
# object inside a 0-d array; `[()]` unwraps it so it can be indexed by key.
# NOTE: allow_pickle=True can execute arbitrary code -- only load trusted files.
data_sdf = np.load('./data/sdf.npy', allow_pickle=True)[()]
sdf_matr = data_sdf['sdf_matr']  
bounds = torch.tensor(data_sdf['bounds']).to(device) # Bound of the environment
sdf_tensor = torch.from_numpy(sdf_matr).to(device)
# Signed-distance cost built from the SDF tensor over `bounds`.
sdf_cost = SDF_Cost(sdf_tensor=sdf_tensor, domain=bounds, device=device)
# Bounding regions stored alongside the SDF; only `shelf_bound` is used later
# in the visible notebook (to define the task-space domain).
env_bound = data_sdf['env_bound']  
shelf_bound = data_sdf['shelf_bound'] 
box_bound = data_sdf['box_bound'] 

# key-points on the body of the robot for collision check
data_keys = np.load('./data/sphere_setting.npy', allow_pickle=True)[()]# key_points
status_array = data_keys['status_array']
body_radius = data_keys['body_radius']
relative_pos = data_keys['relative_pos']
# The `> 0` comparison makes this a boolean mask (shape noted by the author as 8xMx1).
key_points_weight = torch.from_numpy(status_array).float().to(device) >0 # 8xMx1
# Overwrite the last entry along dim 0 with `margin`; because the tensor is
# boolean, margin = 0 presumably disables those key points -- TODO confirm intent.
key_points_weight[-1] = 1*margin
# NOTE(review): `.float()` yields float32 while the notebook default dtype is
# float64 -- confirm the mixed precision is intended.
key_points_margin = torch.from_numpy(body_radius).float().to(device)#
key_points_pos = torch.from_numpy(relative_pos).float().to(device)
# Order expected by PandaKinematics here: [positions, weights, margins].
key_points = [key_points_pos, key_points_weight, key_points_margin]

# define the robot
panda = PandaKinematics(device=device, key_points_data=key_points)

############################################################

# Define the cost function

# Specify the desired orientation
# (0.7071 ~= 1/sqrt(2); entries are given to four decimals only)
Rd_0 = torch.tensor([[ 0.7071,0.7071,0.], [0.,0.,1],[0.7071, -0.7071, 0.]]).to(device) # desired orientation
# Unit z vector handed to PandaCost together with Rd_0.
v_d = torch.tensor([0.,0.,1.]).to(device)
# Rd = torch.tensor([[ 0,0.,0.], [0.,0.,1],[0., 0., 0.]])

# Cost object combining the robot model, the SDF cost, the orientation targets
# and the b_* hyper-parameters from the hyper-parameter cell.
pandaCost = PandaCost(robot=panda, sdf_cost=sdf_cost,
                    Rd_0=Rd_0, v_d=v_d,b_obst=b_obst, 
                    b_goal=b_goal,b_orient=b_orient,device=device)  


def cost(x):
    """Inverse-kinematics cost: column 0 of ``pandaCost.cost_ik(x)``, one value per row of x."""
    ik_terms = pandaCost.cost_ik(x)
    return ik_terms[:, 0]

def cost_all(x):
    """Return the full, unsliced output of ``pandaCost.cost_ik(x)`` (`cost` keeps only column 0)."""
    ik_terms = pandaCost.cost_ik(x)
    return ik_terms

def pdf(x):
    """Unnormalised density ``exp(-cost(x)**2)``; x is moved to `device` before evaluation."""
    ik_cost = cost(x.to(device))
    return torch.exp(-(ik_cost ** 2))


In [4]:

############################################################################
# Define the domain for discretization

n_joints = 7
d_theta_all = [d0_theta] * n_joints
d_theta = [int(n_pts) for n_pts in d_theta_all]  # grid points per joint

# type of discretization of intervals of decision variables
if d_type == 'uniform':
    domain_decision = [
        torch.linspace(panda.theta_min[i], panda.theta_max[i], d_theta[i]).to(device)
        for i in range(n_joints)
    ]
else:  # logarithmic scaling
    # NOTE(review): `exp_space` is neither defined nor imported in the visible
    # notebook, so this branch would raise NameError unless it is provided
    # elsewhere -- confirm before using a non-uniform d_type.
    domain_decision = [
        exp_space(panda.theta_min[i], panda.theta_max[i], d_theta[i]).to(device)
        for i in range(n_joints)
    ]

# task space of the manipulator (the shelf)
# torch.from_numpy shares memory with the NumPy array, so the in-place edits
# below used to modify `shelf_bound` itself; re-running the cell then shrank the
# y-range by a further 0.1 each time. Cloning keeps `shelf_bound` untouched and
# makes the cell idempotent.
env_bounds = torch.from_numpy(shelf_bound).clone()
x_min = env_bounds[:, 0]
x_max = env_bounds[:, 1]
x_max[0] = 0.75; x_min[0] = 0.45
x_max[1] = x_max[1] - 0.1
x_min[1] = x_min[1] + 0.1
x_max[-1] = 0.75; x_min[-1] = 0.

# The number of grid points per axis is (range / dh_x) truncated by int().
domain_task = [
    torch.linspace(x_min[i], x_max[i], int((x_max[i] - x_min[i]) / dh_x)).to(device)
    for i in range(3)
]
d_task = [len(axis_grid) for axis_grid in domain_task]

# Full domain: the 3 task-space axes first, then the joint-angle axes.
domain = domain_task + domain_decision
print("Discretization: ",[len(x) for x in domain])

Discretization:  [5, 15, 15, 50, 50, 50, 50, 50, 50, 50]


In [5]:
# True: build one TT model per cost term and multiply them (more efficient);
# False: cross-approximate the combined pdf directly.
use_fusion = True # more efficient if True
# for target reaching

def pdf_goal(x):
    """Unnormalised density for target reaching: ``exp(-(cost_goal(x)/1)**2)``."""
    goal_scale = 1
    scaled_cost = pandaCost.cost_goal(x) / goal_scale
    return torch.exp(-(scaled_cost ** 2))

def pdf_orient_q(q):
    """Unnormalised density of the orientation term: ``exp(-(cost_orient(q)/2)**2)``."""
    orient_scale = 2
    scaled_cost = pandaCost.cost_orient(q) / orient_scale
    return torch.exp(-(scaled_cost ** 2))

def pdf_obst_q(q):
    """Unnormalised density of the obstacle term: ``exp(-(cost_obst(q)/1)**2)``."""
    obst_scale = 1
    scaled_cost = pandaCost.cost_obst(q) / obst_scale
    return torch.exp(-(scaled_cost ** 2))

# Build the TT model of pdf(x_task, q).
# NOTE(review): the nswp / rmax / kr values from the hyper-parameter cell are
# not used here; every call below hard-codes its own settings -- confirm intent.
if use_fusion:
    # Approximate each factor separately, then multiply the TT models.
    print("Find tt_model of pdf_goal:")
    tt_goal = tt_utils.cross_approximate(fcn=pdf_goal,  domain=domain, 
                            rmax=100, nswp=20, eps=1e-3, verbose=True, 
                            kickrank=5, device=device)
    print(tt_goal.ranks_tt)

    # The orientation and obstacle terms are functions of the joint angles
    # only, so they are approximated over the decision domain.
    print("Find tt_model of pdf_orient:")
    tt_orient_q = tt_utils.cross_approximate(fcn=pdf_orient_q,  domain=domain_decision, 
                            rmax=100, nswp=10, eps=1e-3, verbose=True, 
                            kickrank=5, device=device)
    print(tt_orient_q.ranks_tt)

    print("Find tt_model of pdf_obst:")
    tt_obst_q = tt_utils.cross_approximate(fcn=pdf_obst_q,  domain=domain_decision, 
                            rmax=100, nswp=10, eps=1e-3, verbose=True, 
                            kickrank=5, device=device)
    print(tt_obst_q.ranks_tt)

    # make sure the dimensions of tt_obst matches with that of tt_model desired
    # i.e. pdf_obst(x_task,q) = pdf_obst_q(q)
    tt_obst = tt_utils.extend_model(tt_model=tt_obst_q,site=0,n_cores=3,d=d_task).to(device)
    tt_orient = tt_utils.extend_model(tt_model=tt_orient_q,site=0,n_cores=3,d=d_task).to(device)
    print("Take product to compute pdf(x_task,x_decision) ")
    # The products are taken on the CPU.
    # NOTE(review): the return value of round_tt is discarded, so this relies on
    # round_tt truncating the model in place -- confirm against tt_model's API.
    tt_model = tt_orient.to('cpu')*tt_obst.to('cpu')
    tt_model.round_tt(1e-3)
    tt_model = tt_model.to('cpu')*tt_goal.to('cpu')
    tt_model.round_tt(1e-3)

else: # compute the pdf directly using the cost function (less efficient) 
    # Reuse the notebook-level `pdf` (exp(-cost(x)**2)) from the cost-function
    # cell instead of redefining an equivalent function here, which shadowed it.
    tt_model = tt_utils.cross_approximate(fcn=pdf,  domain=[x.to(device) for x in domain], 
                            rmax=200, nswp=20, eps=1e-3, verbose=True, 
                            kickrank=5, device=device)

print("Rank of TT-model: ", tt_model.ranks_tt)

Find tt_model of pdf_goal:
cross device is cuda
Cross-approximation over a 10D domain containing 8.78906e+14 grid points:
iter: 0  | tt-error: 1.026e+00, test-error:6.214e-01 | time:   0.3533 | largest rank:   1
iter: 1  | tt-error: 9.504e-01, test-error:2.051e-01 | time:   0.4996 | largest rank:   6
iter: 2  | tt-error: 1.960e-01, test-error:7.161e-02 | time:   0.6656 | largest rank:  11
iter: 3  | tt-error: 6.617e-02, test-error:2.760e-02 | time:   0.8586 | largest rank:  16
iter: 4  | tt-error: 2.537e-02, test-error:1.434e-02 | time:   1.1003 | largest rank:  21
iter: 5  | tt-error: 1.186e-02, test-error:8.944e-03 | time:   1.3743 | largest rank:  26
iter: 6  | tt-error: 8.796e-03, test-error:3.374e-03 | time:   1.7018 | largest rank:  31
iter: 7  | tt-error: 3.577e-03, test-error:2.026e-03 | time:   2.1034 | largest rank:  36
iter: 8  | tt-error: 2.094e-03, test-error:9.797e-04 | time:   2.5815 | largest rank:  41
iter: 9  | tt-error: 1.049e-03, test-error:7.697e-04 | time:   3.141

In [7]:
# Refine the discretization and interpolate the model
scale_factor = 10
# All modes (task axes and joint axes) are refined; the commented alternative
# would select only the joint-angle modes.
site_list = torch.arange(len(domain))  # len(domain_task)+torch.arange(len(domain_decision))
_refine_args = dict(site_list=site_list, scale_factor=scale_factor, device=device)
domain_new = tt_utils.refine_domain(domain=domain, **_refine_args)
tt_model_new = tt_utils.refine_model(tt_model=tt_model.to(device), **_refine_args)
# NOTE(review): domain_new / tt_model_new are not used by any later cell in the
# visible notebook (TTGO is built from the unrefined tt_model and domain).

In [8]:
#######################################################################################
# Solver built from the (unrefined) TT model, its domain and the scalar IK cost.
ttgo = TTGO(
    tt_model=tt_model.to(device),
    domain=domain,
    cost=cost,
    device=device,
)

In [9]:
############################################################ 
print("############################", "Test the model", "############################", sep="\n")

# generate test set: ns task points, each coordinate drawn uniformly between
# the first and last grid value of the corresponding task-space axis
ns = 100
test_task = torch.zeros(ns, len(domain_task)).to(device)
for axis, grid in enumerate(domain_task):
    unif = torch.distributions.uniform.Uniform(low=grid[0], high=grid[-1])
    draws = [unif.sample() for _ in range(ns)]
    test_task[:, axis] = torch.tensor(draws).to(device)

# Persist the model together with everything needed to rebuild the setup.
# NOTE(review): torch.save pickles the panda / pandaCost / sdf_cost objects
# whole, so loading this file presumably needs their classes importable.
file_name = 'panda_ik.pickle'
checkpoint = {
    'tt_model': ttgo.tt_model,
    'panda': panda,
    'pandaCost': pandaCost,
    'sdf_cost': sdf_cost,
    'w': (pandaCost.w_goal, pandaCost.w_obst, pandaCost.w_orient),
    'b': (b_goal, b_obst, b_orient),
    'margin': margin,
    'key_points_weight': key_points_weight,
    'key_points_margin': key_points_margin,
    'domains': domain,
    'Rd_0': Rd_0,
    'v_d': v_d,
    'test_task': test_task,
}
torch.save(checkpoint, file_name)


############################
Test the model
############################


In [10]:
# generate test set: 1000 uniformly drawn candidate task points
test_task = torch.zeros(1000, len(domain_task)).to(device)
for axis, grid in enumerate(domain_task):
    unif = torch.distributions.uniform.Uniform(low=grid[0], high=grid[-1])
    draws = [unif.sample() for _ in range(1000)]
    test_task[:, axis] = torch.tensor(draws).to(device)

# Keep at most ns candidates whose SDF value exceeds 0.09
# (presumably a clearance from obstacles -- units not confirmed here).
ns = 100
has_clearance = (sdf_cost.sdf(test_task) - 0.09) > 0
test_task = test_task[has_clearance][:ns]

# For each task point: sample from the TT model conditioned on the task,
# pick the best sample, refine it, and store the result on the CPU.
n_tasks = test_task.shape[0]
joint_angles = torch.zeros((n_tasks, 7))
x_tasks = torch.zeros((n_tasks, 3))
for row in range(n_tasks):
    x_task = test_task[row].view(1, -1)
    # NOTE(review): a later cell unpacks two values from ttgo.sample_tt, while
    # a single value is taken here -- confirm which matches the API.
    samples = ttgo.sample_tt(n_samples=100, alpha=0.75, x_task=x_task)
    best_sample = ttgo.choose_best_sample(samples)[0]
    sol, _ = ttgo.optimize(best_sample)
    # First 3 columns are the task coordinates, the rest the joint angles.
    x_tasks[row] = sol[:, :3].cpu()
    joint_angles[row] = sol[:, 3:].cpu()
    


In [None]:
# torch.save({'task':x_tasks,'theta':joint_angles},"ik_data.pickle",)

In [None]:
# (intentionally empty: this cell held a lone ".", which is a SyntaxError and would abort Restart & Run All)

## Visualization

In [None]:
# Set up the pybullet scene (obstacles + Panda arm) and a pinocchio model of
# the same URDF for visualising the IK solutions.
import pybullet_data
# NOTE(review): init_pybullet, get_joint_limits, set_q and pin are used below
# (and create_primitives / time in later cells) without an explicit import;
# presumably they all come from this star import -- confirm and import explicitly.
from panda_visualization_utils import *
import pybullet as p
from functools import partial
# import the environment (SDF and for graphics visualization in pybullet)
import sys
# NOTE(review): DATA_PATH, body_sphere_path and urdf_path are not used in the
# visible notebook; robot_urdf below repeats the urdf_path string.
DATA_PATH = './data'
body_sphere_path = './data/sphere_setting.npy'
sdf_path = './data/sdf.npy'
urdf_path = './data/urdf/frankaemika_new/panda_arm.urdf'

# Connect to pybullet in GUI mode.
p.connect(p.GUI)
# Same SDF file as loaded in the cost cell; this rebinds `sdf_matr`.
# NOTE: allow_pickle=True can execute arbitrary code -- only load trusted files.
data = np.load(sdf_path, allow_pickle=True)[()]
sdf_matr = data['sdf_matr']  #SDF tensor
obstacles = data['obstacles'] #obstacles parameters
# One identical RGBA colour per obstacle.
colors = [[0.8, 0.5, 0.5, 1]]*len(obstacles)
obj_id, init_id, target_id, border_id, obstacle_ids = init_pybullet (np.zeros(3), np.zeros(3), obstacles, colors=colors)
p.setPhysicsEngineParameter(enableFileCaching=0)
p.setAdditionalSearchPath(pybullet_data.getDataPath())
p.configureDebugVisualizer(p.COV_ENABLE_GUI,0)
robot_urdf ='./data/urdf/frankaemika_new/panda_arm.urdf'
robot_id = p.loadURDF(fileName=robot_urdf)
dof = p.getNumJoints(robot_id)
# The first 7 pybullet joint indices are treated as the arm joints.
pb_joint_indices = np.arange(7)
joint_limits = get_joint_limits(robot_id,pb_joint_indices)
# Midpoint between the two joint-limit rows.
mean_pose = 0.5*(joint_limits[0]+joint_limits[1])
# Convenience setter: set_q_std(q) calls set_q(robot_id, pb_joint_indices, q).
set_q_std = partial(set_q,robot_id, pb_joint_indices)
rmodel = pin.buildModelFromUrdf(robot_urdf)
rdata = rmodel.createData()
pin_frame_names = [f.name for f in rmodel.frames]
ee_frame_id = rmodel.getFrameId('panda_hand_joint')
# Rotate the robot base by 52 degrees about z and place it at z = 0.05.
# NOTE: `alpha` is reassigned (to a sampling parameter) in the next cell.
alpha = np.deg2rad(52)
quat = p.getQuaternionFromAxisAngle((0,0,1),alpha)
p.resetBasePositionAndOrientation(robot_id, (0,0,0.05), quat)

pybullet build time: Sep 20 2021 20:34:14


In [None]:
import time  # imported here explicitly; the notebook otherwise has no visible import of `time`

# Pick a random task point from the filtered test set.
if test_task.shape[0] == 0:
    raise ValueError("test_task is empty: no task point available to sample from")
# np.random.randint excludes `high`, so pass the full length; the former
# `shape[0]-1` could never select the last point and failed for a single point.
s = np.random.randint(0, test_task.shape[0])
# Clone so that overriding a coordinate below does not edit test_task in place
# (indexing a tensor returns a view of the same storage).
sample_xe = test_task[s].clone() # torch.tensor([0.58, 0.68, 0.78])
print(sample_xe)
# Example targets:
# torch.tensor([0.4435, 0.4347, 0.2139]) # bottom center
# torch.tensor([0.4991, 0.6301, 0.3695]) # bottom left
# torch.tensor([0.6352, 0.4708, 0.7535]) # top left
# torch.tensor([0.6789, 0.1950, 0.6976]) # top shelf center
# torch.tensor([0.6635, 0.6250, 0.2031]) # bottom, in, inaccessible
# torch.tensor([0.4228, 0.4441, 0.8760]) # top 
# sample_xe = torch.tensor([0.8, -0.1, 0.3]) #torch.tensor([0.8, -0.3, 0.35])#torch.tensor([ 0.6404, 0.2350,  0.549])#torch.tensor([ 0.7130, -0.3007, -0.1276]) #
sample_xe[0] = 0.5  # fix the x coordinate of the target
n_solutions= 2
n_samples_tt = 200*n_solutions
n_samples_rand= 10*n_samples_tt  # NOTE(review): not used in the visible notebook

alpha=0.5; norm=1 ;  # NOTE(review): `norm` is not used in the visible notebook

t1 = time.time()
# NOTE(review): two values are unpacked from ttgo.sample_tt here, whereas the
# test-set cell takes a single return value -- confirm which matches the API.
samples_tt, samples_idx = ttgo.sample_tt(n_samples=n_samples_tt, x_task=sample_xe.reshape(1,-1),alpha=alpha)
state_k_tt = ttgo.choose_top_k_sample(samples_tt,n_solutions)[0]

#Optimize
# Work on a copy so the raw samples stay available for comparison.
state_k_tt_opt = state_k_tt.clone()
for i, state in enumerate(state_k_tt):
    state_k_tt_opt[i,:],_= ttgo.optimize(state,bound=True)
t2 = time.time()  # t2 - t1 covers sampling, selection and optimisation

c_tt =  cost(state_k_tt_opt)

print("Cost-mean-tt:",torch.mean(c_tt,dim=0))


In [None]:
import time  # imported here explicitly; the notebook otherwise has no visible import of `time`

# _,_,test_sphere = create_primitives(p.GEOM_SPHERE, radius = 0.02)
# p.resetBasePositionAndOrientation(test_sphere, (0,0,1.), (0,0,0,1))

# Tensor.numpy() only works on CPU tensors that do not require grad, and these
# tensors live on `device` (CUDA when available), so detach and move them first.
x_target = sample_xe[:3].detach().cpu().numpy()
joint_angles_k = state_k_tt[:,3:].detach().cpu().numpy() 
joint_angles_k_opt = state_k_tt_opt[:,3:].detach().cpu().numpy() 

# Mark the target position with a small sphere.
_ , _,sphere_id = create_primitives(p.GEOM_SPHERE, radius = 0.02)
pos = x_target[:]

p.resetBasePositionAndOrientation(sphere_id, pos, (0,0,0,1))

# Alternate between each raw sample and its optimised version, going through
# the k solutions twice.
k = joint_angles_k.shape[0]
dt = 1       # seconds to show the raw sample
dT = 2*dt    # seconds to show the optimised solution
for i in range(2*k):
    set_q_std(joint_angles_k[i%k])
    time.sleep(dt)
    set_q_std(joint_angles_k_opt[i%k])
    time.sleep(dT)
    