In [None]:
%load_ext autoreload
%autoreload 1
%aimport communicate
from communicate import CableRobot, MotorState, ControllerState
import numpy as np
import matplotlib.pyplot as plt
import time
import scipy
import scipy.optimize

## Data Collection

In [None]:
def collect_data(duration=5):
    """Collects log data from the cable robot.  Returns tuple of Nx4 arrays for lengths and vels"""
    with CableRobot(print_raw=True, write_timeout=None, initial_msg='d10,1', port='/dev/tty.usbmodem103568503') as robot:
        tstart = time.time()
        while True:
            robot.update()
            time.sleep(0.01)
            if (time.time() - tstart > duration):
                break
        robot.send('d10,100')
    motor_ls = np.array([[m.length for m in datum['motors']] for datum in robot.all_data])
    motor_ldots = np.array([[m.lengthdot for m in datum['motors']] for datum in robot.all_data])
    xy = np.array([[dat['controller'].cur_x, dat['controller'].cur_y] for dat in robot.all_data])
    return motor_ls, motor_ldots, xy

def collect_points(N):
    motor_ls = []
    motor_ldots = []
    xy = []
    for i in range(N):
        input(f'Press enter to record point {i}')
        a, b, c = collect_data(0.5)
        motor_ls.append(a[-1])
        motor_ldots.append(b[-1])
        xy.append(c[-1])
        print(f'point {i}: ', motor_ls[-1], motor_ldots[-1], xy[-1])
    return np.array(motor_ls), np.array(motor_ldots), np.array(xy)
    # with CableRobot(print_raw=True, write_timeout=None, initial_msg='d10,1', port='/dev/tty.usbmodem103568503') as robot:
    #     for i in range(N):
    #         input(f'Press enter to record point {i}')
    #         robot.update()
    #         print(robot.all_data[-1]['controller'].cur_x)
    #         for _ in range(10):
    #             time.sleep(0.1)
    #             robot.update()
    #             print(robot.all_data[-1]['controller'].cur_x)
    #         motor_ls.append([m.length for m in robot.all_data[-1]['motors']])
    #         motor_ldots.append([m.lengthdot for m in robot.all_data[-1]['motors']])
    #         xy.append([robot.all_data[-1]['controller'].cur_x, robot.all_data[-1]['controller'].cur_y])
    #         print(motor_ls, motor_ldots, xy)
    #     robot.send('d10,100')
    # return np.array(motor_ls), np.array(motor_ldots), np.array(xy)

In [None]:
# Test connection
ls, ldots, xys = collect_data(duration=0.1)
print(ls.shape, ldots.shape, xys.shape)

# # Test "collect_points"
# ls, ldots, xys = collect_points(3)
# print(ls)
# print(ldots)
# print(xys)

In [None]:
if True:
    # Collect some data
    time.sleep(2)
    ls, ldots, xy = collect_data(duration=180)
    print(f'Collected {len(ls)} samples')
    print(ls[-1])
    # Save/Load data (in case of ipynb failure)
    np.savez('/tmp/data.npz', ls=ls, ldots=ldots, est=xy)
    np.savez(f'/tmp/data_{time.time()}.npz', ls=ls, ldots=ldots, est=xy)
else:
    with np.load('/tmp/data.npz') as f:
        ls = f['ls']
        ldots = f['ldots']
        xy = f['est']

In [None]:
# Collect 4 corners
USE_CORNERS = False
if USE_CORNERS:
    if False:
        # Collect some data
        ls4, ldots4, xy4 = collect_points(4)
        print(ls4)
        print(ldots4)
        print(xy4)
        # Save/Load data (in case of ipynb failure)
        np.savez('/tmp/4pts_data.npz', ls4=ls4, ldots4=ldots4, est4=xy4)
        np.savez(f'/tmp/4pts_data_{time.time()}.npz', ls4=ls4, ldots4=ldots4, est4=xy4)
    else:
        with np.load('/tmp/4pts_data.npz') as f:
            ls4 = f['ls4']
            ldots4 = f['ldots4']
            xy4 = f['est4']

    print(ls4)
    print(ldots4)
    print(xy4)

PULLEY_O = .037
X1, X2 = 1.887 - PULLEY_O, 4.022 - PULLEY_O
print(X2 - X1, X2 + X1)
Y1 = 5.417 - 4.614
Y2 = Y1 + 1.822
BRUSH_R = (3.1e-2 + 0.8e-2) / 2 # 3.1cm brush diameter, 0.8cm fuzz factor
baseTbrush = np.array([21.0e-2, 38.5e-2])
baseTcdprrest = -np.array([0.5334, 0.381]) / 2
XY4 = np.array([[X2 - BRUSH_R, Y1 + BRUSH_R],
                [X2 - BRUSH_R, Y2 - BRUSH_R],
                [X1 + BRUSH_R, Y2 - BRUSH_R],
                [X1 + BRUSH_R, Y1 + BRUSH_R]]) - baseTbrush
print('Corner Locs: (using bottom-left coordinate system)')
print(XY4)

print('Expected Corner Locs: (using carriage-center coordinate system)')
print(XY4 - baseTcdprrest)
print('60mm buffer limits: (using carriage-center coordinate system)')
XY60 = np.array([[X2 - 0.06, Y1 + 0.06],
                 [X2 - 0.06, Y2 - 0.06],
                 [X1 + 0.06, Y2 - 0.06],
                 [X1 + 0.06, Y1 + 0.06]]) - baseTbrush - baseTcdprrest
print(f'xLl{XY60[2][0]};xLr{XY60[0][0]};xLd{XY60[0][1]};xLu{XY60[1][1]}')

In [None]:
# Plot data as a sanity check
fig, axes = plt.subplots(1, 3, sharex=False, figsize=(12, 4))
axes[0].plot(ls)
axes[1].plot(ldots);
axes[2].plot(*xy.T)
if USE_CORNERS:
    axes[2].plot(*xy4.T - baseTcdprest[:, None], 'r*')
    axes[2].plot(*XY4.T, 'k*')
axes[2].axis('equal')
axes[0].set_title('Cable Lengths (m)')
axes[1].set_title('Cable Velocities (m/s)')

## Calibration

In [None]:
# Constants
W_tot = 5.833
# H_tot = 3.563
H_tot = 5.358 - 0.06
W_carriage = 0.5461  # 21.5" = 0.5461m
H_carriage =  0.38735 # 15.25" = 0.38735m, note one side is 15" the other is 15.5"
W = W_tot - W_carriage
H = H_tot - H_carriage
INIT_XS = lambda ls: np.ones(ls.shape[0] * 2) * 1.5
# INIT_LPARAMS = lambda ls: np.array([0,0,0,0,1,1,1,1,*(-ls.mean(axis=0) + 1.5)])
INIT_LPARAMS = lambda ls: np.array([0,0,0,0,0.5,0.5,0.5,0.5,*(-ls.mean(axis=0) + 1.5)])
INIT_MOUNTPOINTS = np.array([[W, 0], [W, H], [0, H], [0, 0]]).T.flatten()

# Helper functions
def l_corr(ls, params):
    params = params.reshape(-1, 4)
    # return params[0] * np.square(ls) + (1 + 0.05 * np.tanh(params[1])) * ls + params[2]
    return params[0] * np.square(ls) + ls * params[1] + params[2]
    return np.sqrt(np.square(ls*params[1] + params[2]) - params[0])
def ik(x, mountPoints):
    mountPoints = mountPoints.reshape(1, 2, 4)
    mountPoints[0, :, 3] = [0, 0]
    # mountPoints[0, :, 0] = [W, 0]
    mountPoints[0, 1, 0] = 0
    mountPoints[0, 0, 2] = 0
    mountPoints = INIT_MOUNTPOINTS.reshape(1,2,4)
    return np.sqrt(np.sum(np.square(x.reshape(-1, 2, 1) - mountPoints), axis=1))
def err(ls, params):
    N = ls.shape[0]
    mountPoints = params[:8]
    lparams = params[8:20]
    xs = params[20:20 + 2 * N].reshape(-1, 2)
    return (l_corr(ls, lparams) - ik(xs, mountPoints)).flatten()

# Actual Calibrate Function Call
def calibrate(ls, init_params = None):
    if init_params is None:
        init_params = np.concatenate((INIT_MOUNTPOINTS, INIT_LPARAMS(ls), INIT_XS(ls)))
    return scipy.optimize.least_squares(lambda params: err(ls, params),
                                        init_params,
                                        verbose=2,
                                        method='lm')

# Do the calibration
# for every in (1000, 500, 100):
for every in (1000, 500, 200):
    INIT_XS = lambda ls: xy[::every, :].flatten()
    result = calibrate(ls[::every])
    print(result.success, result.message)
    INIT_MOUNTPOINTS = result.x[:8]  # hack to set initialization
    INIT_LPARAMS = lambda ls: result.x[8:20]
# result2 = calibrate(ls[::100])


In [None]:
if USE_CORNERS:
    # Calibrate with corners
    def err_corners(params):
        mountPoints = params[:8]
        lparams = params[8:20]
        return (l_corr(ls4, lparams) - ik(XY4, mountPoints)).flatten()
    rms = lambda x: np.sqrt(np.mean(np.square(x)))
    print(rms(err(ls[::every], result.x)), rms(err_corners(result.x)))


    # Actual Calibrate Corners Function Call
    def calibrate_corners(ls, init_params, WEIGHT = 0.4):
        mult = WEIGHT * ls[::every].shape[0] / 4
        return scipy.optimize.least_squares(lambda params: np.hstack((err(ls, params), err_corners(params) * mult)),
                                            init_params,
                                            verbose=2,
                                            method='lm')
    result2 = calibrate_corners(ls[::every], result.x, WEIGHT=5)
    assert result2.success, 'corner calibration failed with ' + result2.message
    print(result2.x)

    # FK sanity check on 4 corners
    def fk_corners(params):
        def fk_obj(xs):
            mountPoints = params[:8]
            lparams = params[8:20]
            return (l_corr(ls4, lparams) - ik(xs, mountPoints)).flatten()
        result = scipy.optimize.least_squares(fk_obj, XY4.flatten(), verbose=2, method='lm')
        assert result.success, 'fk failed with ' + result.message
        return result.x.reshape(-1, 2)
    predicted_x = fk_corners(result2.x)
    plt.plot(*predicted_x.T, 'r*')
    plt.plot(*XY4.T, 'k*')
    plt.axis('equal')

    with np.printoptions(precision=5, suppress=True):
        print('before corner opt:')
        print(rms(err(ls[::every], result.x)), rms(err_corners(result.x)))
        print(result.x[8:20].reshape(-1, 4))
        print('after corner opt:')
        print(rms(err(ls[::every], result2.x)), rms(err_corners(result2.x)))
        print(result2.x[8:20].reshape(-1, 4))

In [None]:
# Extract parameters and plot
if USE_CORNERS:
    mountPoints = result2.x[:8].reshape(2, 4)
    lparams = result2.x[8:20].reshape(-1, 4)
    xs = result2.x[20:].reshape(-1, 2)
else:
    mountPoints = result.x[:8].reshape(2, 4)
    lparams = result.x[8:20].reshape(-1, 4)
    xs = result.x[20:].reshape(-1, 2)
print('mount points:')
print(mountPoints)
print('lparams:')
print(lparams)

import matplotlib as mpl
mpl.rcParams['axes.titlesize'] = 22
mpl.rcParams['axes.labelsize'] = 18

plt.figure(figsize=(16,4))
plt.subplot(131)
plt.plot(ls, ':')
plt.plot(l_corr(ls, lparams), '-')
plt.title('Cable Lengths')
plt.legend([f'cable {i} uncalibrated' for i in range(4)] + [f'cable {i} calibrated' for i in range(4)])
plt.xlabel('Data Sample \# (roughly 2ms / sample)')
plt.ylabel('Cable Length (m)')
plt.subplot(132)
plt.plot(xs[:, 0], xs[:, 1])
plt.title('Estimated Trajectory')
plt.xlabel('x (m)')
plt.ylabel('y (m)')
plt.axis('equal');

# Plot the correction functions
LONGEST_M = np.sqrt(mountPoints2[0][0]**2 + mountPoints2[1][2]**2)
def find_longest_uncorrected():
    f_0 = lambda ls: np.sum(np.square(l_corr(ls, lparams) - 0))
    f_l = lambda ls: np.sum(np.square(l_corr(ls, lparams) - LONGEST_M))
    result0 = scipy.optimize.minimize(f_0, [0, 0, 0, 0])
    resultl = scipy.optimize.minimize(f_l, [0, 0, 0, 0])
    assert result0.success and resultl.success
    return result0.x, resultl.x
lmin, lmax = find_longest_uncorrected()
uncorrected_lengths = np.linspace(lmin, lmax, 100)
plt.subplot(133)
plt.plot(uncorrected_lengths, l_corr(uncorrected_lengths, lparams), '-')
plt.xlabel('Uncorrected Length (m?)')
plt.ylabel('Corrected Length (m)')
plt.title('Cable Length Correction Functions')
plt.legend([f'cable {i}' for i in range(4)] )

In [None]:
# Output in a format that can be sent directly to the cable robot
# CARRIAGE_WIDTH, CARRIAGE_HEIGHT = 0.22377, 0.22377
mountPoints2 = mountPoints * 1
# mountPoints2[0, 0:2] += CARRIAGE_WIDTH
# mountPoints2[1, 1:3] += CARRIAGE_HEIGHT
print(mountPoints2) # sanity check
# lparams2 = np.vstack((lparams[0], np.ones(4), lparams[1]))
lparams2 = lparams
with np.printoptions(precision=5, suppress=True):
    print(lparams2) # sanity check
print('-'*40)
print('c54', *mountPoints2.T.flatten().tolist(), sep=',')
print('c44', *lparams2.T.flatten().tolist(), sep=',')
print('-'*40)
print('c54', *mountPoints2.T.flatten().tolist(), sep=',', end=';')
print('c44', *lparams2.T.flatten().tolist(), sep=',')

In [None]:
with CableRobot(print_raw=True, write_timeout=None, initial_msg='d10,100', port='/dev/tty.usbmodem103568503') as robot:
    robot.update()
with CableRobot(print_raw=True, write_timeout=None, initial_msg='d10,100', port='/dev/tty.usbmodem103568503', silent=False) as robot:
    robot.send('g6')
    s1 = 'c54,' + ','.join(map(str, mountPoints2.T.flatten()))
    s2 = 'c44,' + ','.join(map(str, lparams2.T.flatten()))
    robot.send(s1)
    robot.send(s2)
    for _ in range(50):
        robot.update()
        time.sleep(0.005)

In [None]:
# Compute an xy homography correction using the 4 corners

def compute_H(act, est):
    # act is 2x4, est is 2x4
    A = np.zeros((8, 9))
    for i in range(4):
        A[2*i, 0:3] = act[:, i]
        A[2*i, 6:9] = -est[0, i] * act[:, i]
        A[2*i+1, 3:6] = act[:, i]
        A[2*i+1, 6:9] = -est[1, i] * act[:, i]
    _, _, V = np.linalg.svd(A)
    H = V[-1, :].reshape(3, 3)
    return H

def make_homogeneous(x):
    return np.vstack((x, np.ones(x.shape[1])))

H = compute_H(make_homogeneous(XY4.T), make_homogeneous(predicted_x.T))
print(H)

# Sanity check
def apply_H(good):
    pred = H @ make_homogeneous(good.T)
    pred /= pred[-1, :]
    return pred[:-1, :].T
print(apply_H(XY4))
print(predicted_x)


print('60mm buffer limits:')
XY60_cdpr_coords = apply_H(XY60)
print(XY60_cdpr_coords)
xmin, xmax = min(XY60_cdpr_coords[:, 0]), max(XY60_cdpr_coords[:, 0])
ymin, ymax = min(XY60_cdpr_coords[:, 1]), max(XY60_cdpr_coords[:, 1])
print(f'xLl{xmin};xLr{xmax};xLd{ymin};xLu{ymax}')
print('H:')
print(H.tolist())
print('Expected Mural Translation:')
print(np.min(XY60, axis=0))
print('Expected Mural Dimensions:')
print(np.max(XY60, axis=0) - np.min(XY60, axis=0))
# print((np.max(XY60, axis=0) - np.min(XY60, axis=0)) + 0.12)

In [None]:
XY60

In [None]:
plt.plot(*XY60.T, 'r*')
plt.plot(*XY60_cdpr_coords.T, 'k*')

In [None]:
X, Y = 1355.851/1e3, 12.000/1e3
X, Y = (X * 0.94379391 + 1.9667), (Y * 0.93413831 + 0.6685)
print(X, Y)
apply_H(np.array([[X, Y]]).reshape(1, 2))