In [1]:
shape = 'Angle'
dataset = 'lasa_dataset/{shape}.mat'.format(shape=shape)
matlab_export_file = shape + '.m'
deg_p = 4
deg_f = 4
alpha_p = 1e-8
alpha_f = 1e-8
make_zero_at_end = False
tau = 0.

In [None]:
import numpy as np
import pandas as pd
import learn_vectorfield_from_data
from importlib import reload

# plotting
import matplotlib.pyplot as plt
%matplotlib inline

# logger
import logging
logger = logging.getLogger()
logger.setLevel(logging.ERROR)

# matlab
import matlab.engine
try:
    eng = matlab.engine.connect_matlab('shared_davisson')
except matlab.engine.EngineError:
    print('Engine already connected or not able to connect')

In [None]:
# learn the optimal contracting vector field

tracking_error, opt_vf = learn_vectorfield_from_data.learn_and_output(dataset, 
                                             matlab_export_file, 
                                             deg_p, 
                                             deg_f, 
                                             alpha_p, 
                                             alpha_f, 
                                             tau,
                                             make_zero_at_end)

In [None]:
# load dataset

demo = eng.load(dataset)
for k, v in demo.items():
    print('Loading item', k)
    eng.workspace[k] = v

# run evaluation metrics
print('Running LfD evaluation')    
matlab_result = eng.eval("lfd_evaluation(@{shape}, demos, 0)"\
                         .format(shape=shape))
print('End evaluation')

In [None]:
# visualize the vector field
def py_to_mat_array(x):
    x = list(map(list, x))
    s = str(x).replace('],', '];')
    return s

from scipy.integrate import odeint
def integrate_trajectory(f, x_0, T):
    f_t = lambda x, t: f([x])[0]
    vf_path = odeint(f_t, x_0, T, )
    return vf_path

f  = lambda x: np.array(eng.eval('{f}({x})'\
                                 .format(f=shape, 
                                x=py_to_mat_array(x))))
demo_i = np.array(demo['demos'][0]['pos'])
x_0 = demo_i[:, 0]
T = np.array(demo['demos'][0]['t']).squeeze()
vf_path = integrate_trajectory(f, x_0, T)
plt.scatter(*demo_i, s=1, c='r')
plt.plot(*vf_path.T, c='g')
limits = list(plt.xlim()) + list(plt.ylim())
learn_vectorfield_from_data.visualize_vf(f, limits)

In [None]:
# How well does the dynamical system represent the demonstrations?
keys  = 'trajectory_error velocity_error distance_to_goal'.split()
# How long does it take to hit the target
keys += 'dtwd_at_30T duration_to_goal'.split()
pd.DataFrame({k: np.array(matlab_result[k])[0] for k in keys})          

In [None]:
grid_dtwd = np.array(matlab_result['grid_dtwd'])
columns = ['demo_{i}'.format(i=i) for i in range(grid_dtwd.shape[1])]
rows = ['x0_{i}'.format(i=i) for i in range(grid_dtwd.shape[0])]
print('grid_dtwd')
pd.DataFrame(grid_dtwd, columns=columns, index=rows).round(1)