Author: suhan.n.shetty@gmail.com | Date: 9th Sept 2019

This is an implementation of tensor methods applied to bimanual coordination
This proposes tensor methods as an alternative to the techniques used in the following paper:
"Learning bimanual end-effector poses from demonstrations using
task-parameterized dynamical systems"
http://publications.idiap.ch/downloads/papers/2015/Silverio_IROS_2015.pdf

In [1]:
import scipy.io
import numpy as np
from mat4py import loadmat
import pdb
from tensor_tools import array3

In [2]:
# Demonstration data
data_path = './data/Bimanual_demos/demo_allSweep_Seg.mat'
data = loadmat(data_path)
demos = data["s"]["d"]

rob1_idx = [2,3,4,8,9,10,11] # robot-1 state index
rob2_idx = [5,6,7,12,13,14,15]
p = 7 # Number of states
q = 2 # Number of subjects/robots
r = 258 # Number of time steps
#Import the data into tensor structure
N = 5
tensor_data = [np.empty([r,p,q])]*5 #
for k in range(5): # iterate over the 5 demonstrations
    tmp  = np.empty([r,p,q])
    for t in range(258):
        tmp[t,:,0]=np.array([demos[k][t][idx] for idx in rob1_idx])
        tmp[t,:,1]=np.array([demos[k][t][idx] for idx in rob2_idx]) 
        tmp_ = tmp[:,[0,1,2],:]
    
    tensor_data[k] = tmp_ #scale the data by 100sualization b
    
        

In [3]:
# # Generate artificial data
# N = 10
# #tensor_data = [np.random.randint(100)*np.random.randn(r,p,q) for j in range( N)]
# tensor_data = [np.empty([r,p,q])]*N
# for k in range(N): # iterate over the N demonstrations
#     tmp  = np.empty([r,p,q])
#     xyz = np.array([1*np.random.randn() for j in range(p)])
#     sig = 140
#     sc = np.random.randint(10)
#     center = np.random.randint(200,300)
#     for t in range(258):
#         tmp[t,:,0]= xyz  #np.array([demos[k][t][idx] for idx in rob1_idx])
#         tmp[t,:,1]=xyz+1*np.exp(((t-center)**2)/(sig)**2) #np.random.randn() #np.array([demos[k][t][idx] for idx in rob2_idx]) 
#         tmp_ = tmp[:,[0,1,2],:]
    
#     tensor_data[k] = tmp_

In [4]:
# Find the array-normal distribution
t_tools = array3()
coef = 0.9
var = 1
pow_ = 10
I = t_tools.shape(tensor_data[0])
M, Cov, A = t_tools.anormal(tensor_data, coef, var, pow_, constraint=True, normalised=False, 
                for_mix_model=False,gamma_k=None)
M_mix, Cov_mix, p_mix = t_tools.anormal_mix(tensor_data,2,coef, var,pow_, constraint=True) 

MLE has converged in  7  steps
EM step  1 
 err_cov  :  78.68636497513816 err_mean:  0.09211852414326306 err_pi:  0.0006332250410336915
EM step  1 
 err_cov  :  0.0030362546191145365 err_mean:  0.5090087085641191 err_pi:  0.1993151967668898
EM step  1 
 err_cov  :  6.068854419750771e-07 err_mean:  3.55759213044887e-05 err_pi:  5.1578192076462415e-05
EM converged in steps:  2


In [5]:
# Plotting tools
from matplotlib import interactive
interactive(True)
# for interractive plots in jupyter-notebook
%matplotlib widget
from mpl_toolkits import mplot3d
import matplotlib.pyplot as plt

In [6]:
fig = plt.figure()
ax1 = fig.add_subplot(1, 2, 1, projection='3d')

# Pick a random demo from the data
k = int(np.random.randint(5,size=1))
X = tensor_data[k]

# Robot-2 trajectory from the data given robot-1 trajectory
xline = X[:,0,1]
yline = X[:,1,1]
zline = X[:,2,1] 
ax1.plot3D(xline, yline, zline, 'green')
ax1.title.set_text("Robot-2 Data Trajectory ")
ax2 = fig.add_subplot(1, 2, 2, projection='3d')
ax2.title.set_text("Robot-2 Estimated Trajectory")
# Robot 2 trajectory from the conditioning the array-normal
Ia = [0]
Xa = X[:,:,0]
Xa = Xa[:,:,np.newaxis]
(M_, Cov_,_,_) = t_tools.anormal_condition(M,Cov,[0],Xa, slice_=1)
#Xc = t_tools.anormal_sampling(M_mix_o,Cov_mix_o)
Xc = M_[:,:,:]#t_tools.anormal_mix_sampling(M_mix_o,Cov_mix_o, p_mix_o)
xline = Xc[:,0,0]
yline = Xc[:,1,0]
zline = Xc[:,2,0]
ax2.plot3D(xline, yline, zline, 'red')
plt.show()


FigureCanvasNbAgg()

In [7]:
# # Find the array-normal distribution
# t_tools = array3()
# coef = 0.9
# var = 0.001
# pow_ = 1
# #(M, Cov, A) = t_tools.anormal_hoff(tensor_data,coef, var,pow_, constraint=False) 
# # M, Cov, A = t_tools.anormal(tensor_data, coef=0.9, var=1, pow_ = 1, constraint=True, normalised=False, 
# #                 for_mix_model=False,gamma_k=None)
# #pdb.set_trace()
# M_mix, Cov_mix, pi_mix = t_tools.anormal_mix(tensor_data,1,coef,var,pow_,constraint=True) 
# #print(coef**pow_)
# #print(t_tools.kron(Cov))


In [8]:
# Test sampling

#Robot 1 trajectory from the data
fig = plt.figure()
ax1 = fig.add_subplot(3, 1, 1)
ax2 = fig.add_subplot(3, 1, 2)
ax3 = fig.add_subplot(3, 1, 3)

t = np.arange(258)

#plot robot-1 demo traj
for j in range(N):
    X = tensor_data[j]
    xt = X[:,0,0]
    yt = X[:,1,0]
    zt = X[:,2,0]    
    ax1.plot(t,xt,'green')
    ax2.plot(t,yt,'blue')
    ax3.plot(t,zt,'red')


for j in range(5):
    Xc = t_tools.anormal_sampling(M,Cov)
    #Xc = t_tools.anormal_mix_sampling(M_mix,Cov_mix,p_mix)
    xt = Xc[:,0,0]
    yt = Xc[:,1,0]
    zt = Xc[:,2,0]    
    ax1.plot(t,xt,'grey')
    ax2.plot(t,yt,'grey')
    ax3.plot(t,zt,'grey')

# plot mean of the robo-1 traj
#M = sum([p_mix[j]*M_mix[j] for j in range(K)])

ax1.plot(t,M[:,0,0],'black',linewidth=1)
ax2.plot(t,M[:,1,0],'black')
ax3.plot(t,M[:,2,0],'black')
   
ax3.set_ylabel("z")
ax2.set_ylabel("y")
ax1.set_ylabel("x")
ax1.set_xlabel("t")
ax1.title.set_text("Trajectory Sampling. Colored traj: Demo, Grey Traj: Sample")
plt.show()


FigureCanvasNbAgg()

In [9]:
fig = plt.figure()
ax1 = fig.add_subplot(1, 2, 1, projection='3d')

# Pick a random demo from the data
k = int(np.random.randint(5,size=1))
X = tensor_data[k]

# Robot-2 trajectory from the data given robot-1 trajectory
xline = X[:,0,1]
yline = X[:,1,1]
zline = X[:,2,1] 
ax1.plot3D(xline, yline, zline, 'green')
ax1.title.set_text("Robot-2 Data Trajectory")
ax2 = fig.add_subplot(1, 2, 2, projection='3d')
ax2.title.set_text("Robot-2 Estimated Trajectory")
K = len(p_mix)
# Robot 2 trajectory from the conditioning the array-normal
Ia = [0]
Xa = X[:,:,0]
Xa = Xa[:,:,np.newaxis]
#(M_, Cov_) = t_tools.anormal_condition(M,Cov,[0],Xa, slice_=1)
#Xc = t_tools.anormal_sampling(M_mix_o,Cov_mix_o)
(p_mix_o,M_mix_o,Cov_mix_o) = t_tools.anormal_mix_condition(p_mix,M_mix,Cov_mix,Ia,Xa,slice_=1)
M_o = sum([p_mix_o[j]*M_mix_o[j] for j in range(K)])
Xc = M_o[:,:,:]#t_tools.anormal_mix_sampling(M_mix_o,Cov_mix_o, p_mix_o)
xline = Xc[:,0,0]
yline = Xc[:,1,0]
zline = Xc[:,2,0]
ax2.plot3D(xline, yline, zline, 'red')
plt.show()



FigureCanvasNbAgg()

In [14]:
# Test conditioning: Robot-2 Trajectories given robot-1 trajectory

fig = plt.figure()
ax1 = fig.add_subplot(3, 1, 1)
ax2 = fig.add_subplot(3, 1, 2)
ax3 = fig.add_subplot(3, 1, 3)


# Robot trajectory from the data (perturbed)
X = tensor_data[1]

# Robot-2 for the given robot-1 taken from data
t = np.arange(258)  

#plot all robot-2 trajectories
for j in range(N):
    X = tensor_data[j]
    xt = X[:,0,1]
    yt = X[:,1,1]
    zt= X[:,2,1]
    ax1.plot(t,xt,'green')
    ax2.plot(t,yt,'blue')
    ax3.plot(t,zt,'red')

K = len(p_mix)
M = sum([p_mix[k]*M_mix[k] for k in range(K)])
# plot mean of the robo-1 traj
ax1.plot(t,M[:,0,1],'black')
ax2.plot(t,M[:,1,1],'black')
ax3.plot(t,M[:,2,1],'black')
# Sample trajectories for robot-2
for j in range(2):
    Xc = t_tools.anormal_mix_sampling(M_mix,Cov_mix, p_mix)
    #Xc = t_tools.anormal_sampling(M,Cov)
    xt = Xc[:,0,1]
    yt = Xc[:,1,1]
    zt = Xc[:,2,1]    
#     ax1.plot(t,xt,'grey')
#     ax2.plot(t,yt,'grey')
#     ax3.plot(t,zt,'grey')
#plot mean of the robo-1 traj
ax1.plot(t,M[:,0,1],'black')
ax2.plot(t,M[:,1,1],'black')
ax3.plot(t,M[:,2,1],'black')
ax1.title.set_text("Trajectory Sampling. Colored traj: Demo, Grey Traj: Sample")
ax3.set_ylabel("z")
ax2.set_ylabel("y")
ax1.set_ylabel("x")
ax1.set_xlabel("t")


FigureCanvasNbAgg()

Text(0.5, 0, 't')

In [11]:
def get_xyz(X,k):
    x = X[:,0,k-1]
    y = X[:,1,k-1]
    z = X[:,2,k-1]
    return (x,y,z)

In [12]:
# Test conditioning: Robot-2 Trajectories given robot-1 trajectory

fig = plt.figure()
ax1 = fig.add_subplot(3, 1, 1)
ax2 = fig.add_subplot(3, 1, 2)
ax3 = fig.add_subplot(3, 1, 3)

color_d = ["red", "green"]
color_t = ["green", "green", "blue", "blue","cyan", "red"]
t = np.arange(258)
# Robot trajectory from the data (perturbed)
for k  in range(1):  
    X = tensor_data[3]
    (xt10,yt10,zt10) = get_xyz(X,1)
    (xt20,yt20,zt20) = get_xyz(X,2)
    # Plot undisturbed traj
    ax1.plot(t,xt10,color_t[k])
    ax2.plot(t,yt10,color_t[k])
    ax3.plot(t,zt10,color_t[k])
    ax1.plot(t,xt20,color_t[k+1])
    ax2.plot(t,yt20,color_t[k+1])
    ax3.plot(t,zt20,color_t[k+1])
    
    # Disturb the traj
    Y = 1.0*X
    Y[:,:,0] = Y[:,:,0] + 0.5*Y[:,:,0]
    (xt11,yt11,zt11) = get_xyz(Y,1)
    ax1.plot(t,xt11,color_t[k+2])
    ax2.plot(t,yt11,color_t[k+2])
    ax3.plot(t,zt11,color_t[k+2])
   
    
    #Robot 2 trajectory from the conditioning
    Ia = [0]
    Xa = Y[:,:,0] # Robot1 data
    Xa = Xa[:,:,np.newaxis]
    (M_o, Cov_o,_,_) = t_tools.anormal_condition(M,Cov,Ia,Xa,slice_=1) # Distribution of robo2
    
    
    # Sample trajectories of robot2
    for j in range(1):
        Xc = t_tools.anormal_sampling(M_o,Cov_o)
        Xc = M_  
        print(Xc.shape)
        xt_ = Xc[:,0,0]
        yt_ = Xc[:,1,0]
        zt_ = Xc[:,2,0]
        ax1.plot(t,xt_,color_t[k+3])
        ax2.plot(t,yt_,color_t[k+3])
        ax3.plot(t,zt_,color_t[k+3])
        
ax1.title.set_text("Trajectory Sampling. Colored traj: Demo, Grey Traj: Sample")
ax3.set_ylabel("z")
ax2.set_ylabel("y")
ax1.set_ylabel("x")
ax1.set_xlabel("t")

FigureCanvasNbAgg()

(258, 3, 1)


Text(0.5, 0, 't')

In [13]:
# Test conditioning: 

fig = plt.figure()
ax1 = fig.add_subplot(3, 1, 1)#x, robot-1 and 2
ax2 = fig.add_subplot(3, 1, 2)#y robot-1 and 2
ax3 = fig.add_subplot(3, 1, 3)#z


# Robot 1 trajectory from the data
X = tensor_data[4]

# Robot-1 and 2 trajectory 
t = np.arange(258) 
T0 = 50
t0 = np.arange(T0)
dt = 10+np.arange(40)

# Add noise/disturbance to the trajectory of robo-1 for a short duration dt
Xa_noisy = X[t0,:,:]
Xdt =  X[dt,:,0] # chunk where noise/disturbance occurs
Xa_noisy[dt,:,0] = Xa_noisy[dt,:,0]- 0.5*Xa_noisy[dt,:,0]
#Xa_noisy[dt,:,1] = Xa_noisy[dt,:,1]+ 0.0*Xa_noisy[dt,:,1]
#Predict the future trajectory

#Without Noise:
Ia = np.arange(50)
Xa = X[Ia,:,:] #Initial traj of the robos
(M_, Cov_,_,_) = t_tools.anormal_condition(M,Cov,list(Ia),Xa,slice_=3) #distrn of future traj given past traj
Xb = t_tools.anormal_sampling(M_,Cov_) #sample future traj
Xab = X.copy()
a = 50 + np.arange(208)
Xab[a,:,:] = Xb

#With Noise:
(M_noisy, Cov_noisy,_,_) = t_tools.anormal_condition(M,Cov,list(Ia),Xa_noisy,slice_=3)
Xb_noisy = M_noisy #t_tools.anormal_sampling(M_noisy,Cov_noisy)
Xab_noisy = X.copy()
Xab_noisy[t0,:,:] = Xa_noisy
Xab_noisy[a,:,:] = Xb_noisy


# Collect and Plot the estimation 
(x1,y1,z1) = get_xyz(Xab,1)
(x2,y2,z2) = get_xyz(Xab,2)
(x1_n,y1_n,z1_n) = get_xyz(Xab_noisy,1)
(x2_n,y2_n,z2_n) = get_xyz(Xab_noisy,2)

ax1.plot(t,x1,'blue') #robot-1 x without noise
ax2.plot(t,y1,'blue')#robot-1 y
ax3.plot(t,z1,'blue')# robot-1 z

ax1.plot(t,x2,'green') # robot-2 x without noise
ax2.plot(t,y2,'green')
ax3.plot(t,z2,'green')

ax1.plot(t,x1_n,'red') # robot-1 with noise
ax2.plot(t,y1_n,'red')
ax3.plot(t,z1_n,'red')


ax1.plot(t,x2_n,'orange')
ax2.plot(t,y2_n,'orange')
ax3.plot(t,z2_n,'orange')

plt.show()

print("The output covariance:\n")
print(M_-M_noisy, "\n \n")



FigureCanvasNbAgg()

The output covariance:

[[[-0.04213275  0.        ]
  [-0.00316266  0.        ]
  [-0.05387528  0.        ]]

 [[-0.01469078  0.        ]
  [-0.00110275  0.        ]
  [-0.01878515  0.        ]]

 [[-0.00512236  0.        ]
  [-0.00038451  0.        ]
  [-0.00654998  0.        ]]

 ...

 [[ 0.          0.        ]
  [ 0.          0.        ]
  [ 0.          0.        ]]

 [[ 0.          0.        ]
  [ 0.          0.        ]
  [ 0.          0.        ]]

 [[ 0.          0.        ]
  [ 0.          0.        ]
  [ 0.          0.        ]]] 
 

