In [None]:
%load_ext autoreload
%autoreload 1
%aimport src.sln_stroke_fit
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.lines as mlines
import matplotlib.animation
from IPython.display import HTML
import gtsam
from gtsam.symbol_shorthand import P, X
from art_skills import SlnStrokeExpression
from src.sln_stroke_fit import SlnStrokeFit, OptimizationLoggingParams
import tqdm

import temp_crt

# Notebook-wide matplotlib defaults: figure size, resolution and layout engine.
plt.rcParams.update({
    'figure.figsize': [5, 3],
    'figure.dpi': 100,
    'figure.constrained_layout.use': True,
})


# py.offline.init_notebook_mode(connected=True)

In [None]:
# Utils
def calc_v(t, x, y):
    """Finite-difference velocity of a sampled 2D trajectory.

    Args:
        t: 1D sequence of sample times (length N).
        x: 1D sequence of x positions sampled at ``t`` (length N).
        y: 1D sequence of y positions sampled at ``t`` (length N).

    Returns:
        Array of shape (N, 2); row k is ``[dx/dt, dy/dt]`` at sample k, computed
        as ``np.gradient`` of the positions divided by ``np.gradient`` of ``t``.
    """
    positions = np.array([x, y]).T  # one (x, y) row per sample
    dt_column = np.gradient(t).reshape(-1, 1)  # column vector so it broadcasts over x and y
    return np.gradient(positions, axis=0) / dt_column


def calc_speed(v):
    """Euclidean norm of each row of ``v``.

    Args:
        v: 2D array with one velocity vector per row (e.g. the output of calc_v).

    Returns:
        1D array with one speed per row of ``v``.
    """
    return np.sqrt(np.sum(np.square(v), axis=1))
def extract_params(fitter, values, num_strokes=3):
    """Collect the per-stroke parameter estimates stored in ``values``.

    Args:
        fitter: object exposing ``query_parameters(values, k)``.
        values: estimate container, passed through to the fitter unchanged.
        num_strokes: number of strokes to query (indices ``0 .. num_strokes-1``).
            Defaults to 3, the count that used to be hard-coded here.

    Returns:
        List with one entry per stroke index, each being whatever
        ``fitter.query_parameters`` returns for that index.
    """
    return [fitter.query_parameters(values, k) for k in range(num_strokes)]
def extract_xy(fitter, values, t):
    """Query the fitter's estimate at every time in ``t`` and stack the results.

    Args:
        fitter: object exposing ``query_estimate_at(values, time)``.
        values: estimate container, passed through to the fitter unchanged.
        t: iterable of query times.

    Returns:
        ``np.array`` built from the per-time results, in the order of ``t``.
    """
    samples = []
    for t_query in t:
        samples.append(fitter.query_estimate_at(values, t_query))
    return np.array(samples)

In [None]:
# parse the letter 'D' (mocap data)
traj_D = temp_crt.parse_trajectory("D")
strokes = traj_D[0]  # row 0 is used as time, rows 1 and 2 as x and y (see below)
#print(strokes)
t = strokes[0]
# Positions relative to the first sample, so the trajectory starts at (0, 0).
x = strokes[1] - strokes[1, 0]
y = strokes[2] - strokes[2, 0]
velocity = calc_speed(calc_v(t, x, y))  # scalar speed per sample, despite the name

# Split the recording into three strokes at hand-picked sample indices.
stroke1 = strokes[:, 0:31]  # Y = -0.34639319
stroke2 = strokes[:, 31:57]  # X = 0.74592703
stroke3 = strokes[:, 57:]  # to end

# Dump each raw (un-centered) stroke as CSV, one sample per row.
for csv_name, stroke in (("stroke1.csv", stroke1),
                         ("stroke2.csv", stroke2),
                         ("stroke3.csv", stroke3)):
    np.savetxt(csv_name, stroke.T, delimiter=",")

In [None]:
# Work on an explicit copy so the raw mocap array `strokes` is left untouched.
strokes_centered = strokes.copy()
# Alternative (disabled): center on the mean position instead of the first sample.
# strokes_centered[1:, :] = strokes_centered[1:, :] - np.mean(strokes_centered[1:, :],
#                                                             axis=1).reshape(2, 1)
# Shift every coordinate row (all rows after the time row) so the first sample
# sits at the origin. The `:1` slice keeps a column shape that broadcasts over
# all samples without hard-coding the number of coordinate rows.
strokes_centered[1:, :] = strokes_centered[1:, :] - strokes_centered[1:, :1]

# Same hand-picked stroke boundaries (sample indices 31 and 57) as for the raw data.
stroke1 = strokes_centered[:, 0:31]
stroke2 = strokes_centered[:, 31:57]
stroke3 = strokes_centered[:, 57:]
# The fitter is fed one array per stroke with samples along the first axis.
stroke_datas = [stroke1.T, stroke2.T, stroke3.T]

In [None]:
# Fit stroke with logging the estimate for each iteration
dt = strokes[0, 1] - strokes[0, 0]  # sample period from the first two timestamps
N_ITER = 50

# Initialization switches (previously bare `if False:` / `if True:` blocks)
USE_HAND_TUNED_INIT = False  # False -> generic initialization (horizontal line)
INIT_POSITIONS_FROM_PARAMS = True  # False -> X0, X1, ... are not pre-initialized here

# Fitter object
fitter = SlnStrokeFit(dt,
                      integration_noise_model=gtsam.noiseModel.Isotropic.Sigma(2, 0.01),
                      data_prior_noise_model=gtsam.noiseModel.Unit.Create(2))
# Optimization Parameters
params = fitter.create_params(verbosityLM='SILENT',
                              relativeErrorTol=0,
                              absoluteErrorTol=1e-10,
                              maxIterations=N_ITER)
# Initial values
initial_values = gtsam.Values()
if USE_HAND_TUNED_INIT:  # Use "good" initialization
    initial_values.insert(P(0), np.array([-0.4, 2.1, 1.23, 1.46, 0.8, -0.5]))
    initial_values.insert(P(1), np.array([0.1, 0.7, -0.25, -1.77, 0.4, -1.15]))
    initial_values.insert(P(2), np.array([0.3, 0.7, -1.77, -2.5, 0.4, -1.15]))
else:  # Use generic initialization (horizontal line)
    for i in range(3):
        initial_values.insert(P(i), np.array([0.0, 1., 0., 0., 0.5, -0.5]))
if INIT_POSITIONS_FROM_PARAMS:
    # Initialize X0, X1, ... points using the initialization parameters (else initialize to 0).
    # This call used to be issued twice in a row with identical arguments; the
    # duplicate was a copy-paste leftover and has been dropped.
    initial_values = fitter.create_initial_values_from_params(
        stroke1[1:, 0], initial_values, fitter.stroke_indices(stroke_datas))
# Solve
# NOTE(review): `tqdm.tqdm_notebook` is a deprecated alias in recent tqdm releases
# (`tqdm.notebook.tqdm` is the documented replacement) -- confirm against the
# installed version before switching.
(sol, history), stroke_indices = fitter.fit_stroke(stroke_datas,
                                                   initial_values=initial_values,
                                                   params=params,
                                                   logging_params=OptimizationLoggingParams(
                                                       print_progress=True,
                                                       log_optimization_values=True,
                                                       progress_bar_class=tqdm.tqdm_notebook))
# One (stroke parameters, estimated xy at the mocap timestamps `t`) pair per logged iteration.
optim_history = [(extract_params(fitter, est), extract_xy(fitter, est, t)) for est in history]
print("DONE")

In [None]:
# animate
# NOTE: this cell used to rebuild `t` with np.arange(0, strokes[0, -1] + dt, dt).
# Nothing here used that vector, but it overwrote the global `t` that a later
# cell plots against `velocity` (one value per mocap sample), so the two could
# end up with different lengths. `t` is therefore left untouched.

fig, ax = plt.subplots(figsize=(5, 4))
ax.plot(strokes_centered[1, :], strokes_centered[2, :], 'k.', label='Mocap data')
# Empty artists that `animate` fills in for every frame.
l1, = ax.plot([], [], 'm.', label='predicted positions')
ls_param = [
    ax.plot([], [], fmt, label=label)[0]
    for fmt, label in (('r-', 'stroke 1 params'),
                       ('g-', 'stroke 2 params'),
                       ('b-', 'stroke 3 params'))
]
txt_iter = ax.text(0.61, 0.53, 'iter 0')
ax.axis('equal')
ax.set_xlim(-0.1, 0.8)
ax.set_ylim(-0.1, 0.6)
ax.set_title('Optimizing the letter D')
ax.set_xlabel('x (m)')
ax.set_ylabel('y (m)')
ax.legend(loc='lower right')

def animate(i):
    """Redraw the animation artists for logged optimization iteration ``i``.

    Reads ``optim_history[i]`` and updates the module-level artists ``l1``
    (estimated positions), ``ls_param`` (one line per stroke, computed from the
    stroke parameters) and ``txt_iter`` (iteration label).
    """
    iter_params, iter_xy = optim_history[i]
    l1.set_data(iter_xy[:, 0], iter_xy[:, 1])
    # Trajectory implied by the parameters alone, started at the first estimated point.
    param_traj = fitter.compute_trajectory_from_parameters(iter_xy[0], iter_params,
                                                           stroke_indices)
    for line, (kstart, kend) in zip(ls_param, stroke_indices.values()):
        segment = param_traj[kstart:kend]
        line.set_data(segment[:, 0], segment[:, 1])
    txt_iter.set_text('iter {:}'.format(i))

# One frame per logged optimization iteration.
n_frames = len(optim_history)
ani = matplotlib.animation.FuncAnimation(fig, animate, frames=n_frames)
# ani.save("./src/data/optimization_animation_bad_init.mp4", writer=matplotlib.animation.FFMpegWriter(fps=60))
# Render inline as an interactive JavaScript player (must stay the last expression).
HTML(ani.to_jshtml())

In [None]:
# Initialize 3 strokes, one stroke at a time
dt = strokes[0, 1] - strokes[0, 0]  # sample period from the first two timestamps
fitter = SlnStrokeFit(dt,
                      integration_noise_model=gtsam.noiseModel.Isotropic.Sigma(2, 0.01),
                      data_prior_noise_model=gtsam.noiseModel.Unit.Create(2))

# Hand-tuned per-stroke parameters. They used to be assigned to `stroke_inits`
# and overwritten on the very next statement (a dead store); they now live under
# their own name so they stay available for experiments.
stroke_inits_hand_tuned = {
    0: np.array([-0.4, 2.1, 1.23, 1.46, 0.8, -0.5]),
    1: np.array([0.1, 0.7, -0.25, -1.77, 0.4, -1.15]),
    2: np.array([0.3, 0.7, -1.77, -2.5, 0.4, -1.15])
}
# Initialization actually used: the same generic parameters for every stroke.
# To try the hand-tuned values instead: stroke_inits = stroke_inits_hand_tuned
stroke_inits = {i: np.array([0.0, 1., 0., 0., 0.5, -0.5]) for i in range(3)}

def create_init_values(strokes):
    """Build the initial ``gtsam.Values`` for fitting the given strokes.

    Inserts ``stroke_inits[i]`` under key ``P(i)`` for each stroke in ``strokes``
    and then lets the module-level ``fitter`` derive the remaining initial values
    from those parameters.

    Args:
        strokes: sequence of per-stroke data arrays (the subset being fitted).

    Returns:
        Whatever ``fitter.create_initial_values_from_params`` returns.

    Note:
        The start point is read from the global ``stroke1`` (``stroke1[1:, 0]``),
        not from the ``strokes`` argument.
    """
    values = gtsam.Values()
    for idx, _ in enumerate(strokes):
        values.insert(P(idx), stroke_inits[idx])
    indices = fitter.stroke_indices(strokes)
    return fitter.create_initial_values_from_params(stroke1[1:, 0], values, indices)

# Solver settings for the incremental fits.
params = fitter.create_params(maxIterations=200,  # set number of iterations!
                              absoluteErrorTol=1e-10,
                              relativeErrorTol=0,
                              verbosityLM='SILENT')

# Fit the first stroke only, then the first two, then all three.
# sols[i] / stroke_indices[i] hold the result of the fit that uses i + 1 strokes.
# NOTE(review): this rebinds the global `stroke_indices`, which `animate` also
# reads -- re-running the animation after this cell would see the list instead.
sols, stroke_indices = [None] * 3, [None] * 3
for i, n_strokes in enumerate((1, 2, 3)):
    data = stroke_datas[:n_strokes]
    log_cfg = OptimizationLoggingParams(
        progress_bar_description='Fitting Letter D with {:} stroke'.format(n_strokes),
        progress_bar_class=tqdm.tqdm_notebook)
    (sols[i], _), stroke_indices[i] = fitter.fit_stroke(
        data,
        initial_values=create_init_values(data),
        params=params,
        logging_params=log_cfg)



def sol2txyvs(sol: gtsam.Values, kstart:int, kend:int):
    """Sample a fitted solution over the index range ``[kstart, kend)``.

    Args:
        sol: solution values to query through the module-level ``fitter``.
        kstart: first sample index (inclusive).
        kend: last sample index (exclusive).

    Returns:
        Tuple ``(t, x, y, v, speed)``: sample times ``k * fitter.dt``, the x and y
        columns of the queried estimates, the velocity from ``calc_v`` and the
        speed from ``calc_speed``.
    """
    times = np.arange(kstart, kend) * fitter.dt
    positions = np.array([fitter.query_estimate_at(sol, tk) for tk in times])
    xs = positions[:, 0]
    ys = positions[:, 1]
    vel = calc_v(times, xs, ys)
    return times, xs, ys, vel, calc_speed(vel)

def _fit_stroke_txyvs(fit_idx, stroke_idx):
    """Sample stroke ``stroke_idx`` of the fit stored at ``sols[fit_idx]``."""
    return sol2txyvs(sols[fit_idx], *stroke_indices[fit_idx][stroke_idx])


# Naming: gt<number of strokes in the fit>_<quantity><stroke number>.
# 1-stroke fit
gt1_t1, gt1_x1, gt1_y1, gt1_vel1, gt1_v1 = _fit_stroke_txyvs(0, 0)
# 2-stroke fit
gt2_t1, gt2_x1, gt2_y1, gt2_vel1, gt2_v1 = _fit_stroke_txyvs(1, 0)
gt2_t2, gt2_x2, gt2_y2, gt2_v2l1, gt2_v2 = _fit_stroke_txyvs(1, 1)
# 3-stroke fit
gt3_t1, gt3_x1, gt3_y1, gt3_vel1, gt3_v1 = _fit_stroke_txyvs(2, 0)
gt3_t2, gt3_x2, gt3_y2, gt3_v2l1, gt3_v2 = _fit_stroke_txyvs(2, 1)
gt3_t3, gt3_x3, gt3_y3, gt3_v3l1, gt3_v3 = _fit_stroke_txyvs(2, 2)

In [None]:
# # GTSAM test result (from csv)
# # Import csv file (gtsam result)
# def read_from_csv(fname):
#     columns = ["time", "x", "y"]
#     gt = pd.read_csv(fname, usecols=columns)
#     t, x, y = gt.time.tolist(), gt.x.tolist(), gt.y.tolist()
#     v = calc_v(t, x, y)
#     speed = calc_speed(v)
#     return t, x, y, v, speed
# gt1_t1, gt1_x1, gt1_y1, gt1_vel1, gt1_v1 = read_from_csv(
#     "../build/cpp/SLM/tests/gtsam1_stroke1.csv")
# gt2_t1, gt2_x1, gt2_y1, gt2_vel1, gt2_v1 = read_from_csv(
#     "../build/cpp/SLM/tests/gtsam2_stroke1.csv")
# gt2_t2, gt2_x2, gt2_y2, gt2_v2l1, gt2_v2 = read_from_csv(
#     "../build/cpp/SLM/tests/gtsam2_stroke2.csv")
# gt3_t1, gt3_x1, gt3_y1, gt3_vel1, gt3_v1 = read_from_csv(
#     "../build/cpp/SLM/tests/gtsam3_stroke1.csv")
# gt3_t2, gt3_x2, gt3_y2, gt3_v2l1, gt3_v2 = read_from_csv(
#     "../build/cpp/SLM/tests/gtsam3_stroke2.csv")
# gt3_t3, gt3_x3, gt3_y3, gt3_v3l1, gt3_v3 = read_from_csv(
#     "../build/cpp/SLM/tests/gtsam3_stroke3.csv")

In [None]:
# Plotting
# Proxy artists for the legend (the scatter calls below carry no labels).
blk_ln = mlines.Line2D([], [], color='black', marker='_',
                       markersize=15, label='measured')
green_pt = mlines.Line2D([], [], color='green', marker='_',
                         markersize=15, label='measured')
red_pt = mlines.Line2D([], [], color='red', marker='_',
                       markersize=15, label='predicted')
yellow_ln = mlines.Line2D([], [], color='yellow', marker='_',
                          markersize=15, label='error')

# One panel per fit (1, 2 and 3 strokes): measured points in green, fitted in red.
# NOTE(review): labels say cm while the animation figure labels the same data in
# metres -- confirm the unit of the mocap data.
fig, axs = plt.subplots(1,3, figsize=(12,3), sharey=True)
axs[0].set_title('1-Stroke Trajectory, "D"')
axs[0].axis('equal')
axs[0].grid(visible=1)
axs[0].scatter(x, y, c='g', s=1)
axs[0].scatter(gt1_x1, gt1_y1, c='r')
axs[0].set_xlabel('$x (cm)$',labelpad=0)
axs[0].set_ylabel('$y (cm)$',labelpad=0)

axs[1].set_title('2-Stroke Trajectory, "D"')
axs[1].axis('equal')
axs[1].grid(visible=1)
axs[1].scatter(x, y, c='g')
axs[1].scatter(gt2_x1, gt2_y1, c='r')
axs[1].scatter(gt2_x2, gt2_y2, c='r')
# Was plt.xlabel(...), which labels the *current* axes (the last panel created),
# so this middle panel never received its x label.
axs[1].set_xlabel('$x (cm)$',labelpad=0)

axs[2].set_title('3-Stroke Trajectory, "D"')
axs[2].axis('equal')
axs[2].grid(visible=1)
axs[2].scatter(x, y, c='g')
axs[2].scatter(gt3_x1, gt3_y1, c='r')
axs[2].scatter(gt3_x2, gt3_y2, c='r')
axs[2].scatter(gt3_x3, gt3_y3, c='r')
axs[2].set_xlabel('$x (cm)$',labelpad=0)

# Legend anchored just outside the right edge of the last panel.
axs[2].legend(handles=[green_pt, red_pt],
              bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)

In [None]:
# Speed profiles: measured speed (green) against the speed of each fitted stroke
# (red), one panel per fit (1, 2 and 3 strokes).
# The axis labels used to read '$x (cm)$' / '$y (cm)$', copied from the
# trajectory figure; these panels plot speed against time.
# TODO: add units once the units of the mocap timestamps/positions are confirmed.
fig, axs = plt.subplots(1,3, figsize=(12,3), sharey=True)
axs[0].set_title('1-Stroke Velocity, "D"')
axs[0].grid(visible=1)
axs[0].scatter(t, velocity, c='g', s=1)
axs[0].scatter(gt1_t1, gt1_v1, c='r')
axs[0].set_xlabel('time',labelpad=0)
axs[0].set_ylabel('speed',labelpad=0)

axs[1].set_title('2-Stroke Velocity, "D"')
axs[1].grid(visible=1)
axs[1].scatter(t, velocity, c='g', s=1)
axs[1].scatter(gt2_t1, gt2_v1, c='r', s=1)
axs[1].scatter(gt2_t2, gt2_v2, c='r', s=1)
axs[1].set_xlabel('time',labelpad=0)

axs[2].set_title('3-Stroke Velocity, "D"')
axs[2].grid(visible=1)
axs[2].scatter(t, velocity, c='g')
axs[2].scatter(gt3_t1, gt3_v1, c='r')
axs[2].scatter(gt3_t2, gt3_v2, c='r')
axs[2].scatter(gt3_t3, gt3_v3, c='r')
axs[2].set_xlabel('time',labelpad=0)

# Legend anchored just outside the right edge of the last panel.
axs[2].legend(handles=[green_pt, red_pt],
              bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)


In [None]:
# parse the letter 'A' (mocap data)
# The letter is defined once so the parsed data and the plot title cannot drift
# apart (the comment used to say 'D' and the title "C" while "A" was parsed).
LETTER = "A"
# NOTE(review): the variable is still called traj_C; the name is kept in case
# later cells reference it.
traj_C = temp_crt.parse_trajectory(LETTER)
# NOTE: this rebinds the globals strokes/t/x/y used by the letter-'D' cells above.
strokes = traj_C[0]
#print(strokes)
t = strokes[0]
x = strokes[1]
y = strokes[2]
# stroke1 = strokes[:, 0:31]  # Y = -0.34639319
#print(stroke1)
fig, axs = plt.subplots(1,3, figsize=(12,3), sharey=True)
axs[0].set_title('1-Stroke Trajectory, "{:}"'.format(LETTER))
axs[0].grid(visible=1)
axs[0].scatter(x, y, c='g')