In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
from trajectory.segments import * 
from trajectory.planner import * 
from trajectory.plot import * 
from trajectory.trapmath import * 
import matplotlib.pyplot as plt
from IPython.display import display
import trajectory as tj
import pandas as pd
import numpy as np
from math import sqrt
from random import randint, random

pd.set_option('display.max_columns', None)


In [2]:
# Simple test of hex_area and hex_v_c
from trajectory.trapmath import trap_v_c, hex_v_c
import random

a_max = 50_000
v_max = 5_000


for i in range(1000):

    v_0 = randint(0,v_max)
    v_1 = randint(0,v_max)

    t = random.uniform(.2, .8)

    h, l = list(sorted([v_0,v_1]))
    
    v_c = randint( max(0,l-100) ,v_max)

    x = hex_area(t, v_0, v_c, v_1, a_max)

    v_c_2 = hex_v_c(x, t, v_0, v_1, v_max, a_max)

    assert(round(v_c) == round(v_c_2))



In [3]:
# Exhaustively test all combinations of velocities and velocity limits
# for init_parameters and update_boundary_velocities

from itertools import product 

a_max = 50_000
v_max = 5_000

step = 1_000
velocities = list(range(0, v_max+step, step))

vp = list(product(velocities, velocities, velocities, velocities))
keys = 'v_0 v_1 v_0_max v_1_max'.split()

v_args_a =  [dict(zip(keys, v)) for v in vp]
v_args_b =  v_args_a[-1:] + v_args_a[:-1]
v_args_c =  v_args_a[1:]  + v_args_a[:1]

records= []

def ip(args_a, args_b, args_c):
    """Make three adjacent joints"""
    a = init_parameters(randint(0,2000),**args_a, v_max=v_max, a_max = a_max) 
    b = init_parameters(randint(0,2000),**args_b, v_max=v_max, a_max = a_max)
    c = init_parameters(randint(0,2000),**args_c, v_max=v_max, a_max = a_max) 
    return a,b,c

for i,(args_a, args_b, args_c) in enumerate(zip(v_args_a, v_args_b, v_args_c)):
    
    g  =[]
    
    
    a,b,c = ip(args_a, args_b, args_c)
    update_boundary_velocities(a,b,c)
    g.append( (a,b,c) )
    
    a,b,c = ip(args_a, args_b, args_c)
    update_boundary_velocities(None,a,b)
    g.append((None,a,b) )
    
    a,b,c = ip(args_a, args_b, args_c)
    update_boundary_velocities(a,b, None)
    g.append( (a,b, None) )
    
    records.append(g)
    

for i,s in enumerate(records):
    for j, (a, b, c) in enumerate(s):
    
        assert a is None or a.v_1 == b.v_0, (i,j, a.v_1, b.v_0)
        assert c is None or b.v_1 == c.v_0, (i,j, b.v_1, c.v_0)

        def _f(p):
            assert p is None or p.v_0 <= p.v_0_max, (i,j, p.v_0, p.v_0_max)
            assert p is None or p.v_1 <= p.v_1_max, (i,j, p.v_1, p.v_1_max)
            assert p is None or p.v_0 >= p.v_0_min, (i,j, p.v_0, p.v_0_min)
            assert p is None or p.v_1 >= p.v_1_min, (i,j, p.v_1, p.v_1_min)

        _f(a)
        _f(b)
        _f(c)
            

NameError: name 'init_parameters' is not defined

In [None]:
# Check that initial_parameters produces reasonable values for 
# many conditions.
from random import randint, random
from trajectory.trapmath import classify, min_time_parameters, hex_v_c, hex_area
import pandas as pd
from math import sqrt

segments = []
v_max = 5_000
a_max = 50_000

for _ in range(10_000):
    
    err = None
    x_recalc = None
    v_c = None 
    t = None 
    
    x_i = max(randint(-500,1000), 0)
    x_i = x_i if x_i > 100 else 0  # Minimum value of 100, or 0
    
    v_0 = min(max(randint(-3000, v_max+2000), 0),v_max)
    v_1 = min(max(randint(-3000, v_max+2000), 0),v_max)

    cls1 = classify(x_i, v_0, v_1, v_max, a_max)
    
    t = None
    cls2 = None
    x_recalc = None
    p = None
 
    p = min_time_parameters(x_i, v_0, v_1,  v_max, a_max) 

    assert p.v_1 is not None
    assert p.v_0 is not None
    assert p.t is not None, (x_i, v_0, v_1,  v_max, a_max) 
    
    v_c = hex_v_c(x_i, p.t, p.v_0, p.v_1, v_max, a_max)    
    x_recalc = hex_area(p.t, p.v_0, v_c, p.v_1, a_max)
    cls2 = classify(p.x, p.v_0, p.v_1, v_max, a_max).name

    segments.append({
        'x_i': x_i,
        'x_recalc': x_recalc,
        't': t,
        'v_0_i': v_0,
        'v_0_p': p.v_0 if p else None,
        'v_c_p': p.v_c if p else None,
        'v_c': v_c,
        'v_1_i': v_1,
        'v_1': p.v_1 if p else None,
        'v_max': v_max,
        'a_max': a_max,
        'class1': cls1.name,
        'class2': cls2,
        'err': err
    })
    
segments = pd.DataFrame(segments)

for c in ('x_i','x_recalc','v_0_i','v_0_p','v_c','v_c_p','v_1','v_1_i','v_max'):
    pass
    try:
        segments[c] = segments[c].fillna(-1).round(0).astype('Int64')
    except Exception as e:
        print(c, e)
 

# RMS error from x_i to x_p
segments['err_x'] = (segments.x_i-segments.x_recalc).abs()
print(f"RMS error for x_calc-x_recalc: {sqrt((segments[segments.err.isnull()].err_x**2).mean())}")

x_errors = segments[(segments.err_x > 10) & (segments.err.isnull())]
ex_errors = segments[(segments.err_x > 10) & (~segments.err.isnull())] # x_errors and exceptions

print(f"{len(x_errors)} x errors, {len(ex_errors)} exception errors")


display(segments)

assert len(segments[(segments.v_c_p == -1) & (segments.x_i != 0)]) == 0 # v_c_p can == -1 b/c of fill(-1) above
assert len(x_errors) == 0
assert len(ex_errors) == 0


In [None]:
from operator import attrgetter
from copy import copy
df = pd.read_csv('limits.csv', header=[0,1])

# keys = 'v_0 v_1 v_0_max v_1_max'.split()
common = dict(v_max=v_max, a_max = a_max)

ag = attrgetter(*'v_0 v_1 updated_v_0 updated_v_1'.split())

def mtp(r, axis):
    a= r.loc[axis]
    return min_time_parameters(a.x*500, v_0=a['0']*1000, v_1=a['1']*1000, **common)

def update(r,axis,p):
    
    r[(axis,'uv0')] = int(p.updated_v_0)
    r[(axis,'uv1')] = int(p.updated_v_1)
    r[(axis,'v0')] = int(p.v_0)
    r[(axis,'v1')] = int(p.v_1)
    r[(axis,'mv0')] = int(p.v_0_max)
    r[(axis,'mv1')] = int(p.v_1_max)

 
rows = []
for idx, r in df.iterrows():

    a,b,c = mtp(r, 'a'),mtp(r, 'b'),mtp(r, 'c')
    
    update_boundary_velocities(a,b,c)
    #assert     all([j.updated_v_0 for j in (a,b,c)])
    #assert not all([j.updated_v_1 for j in (a,b,c)])

    update(r, 'a',a); update(r, 'b',b); update(r, 'c',c)
    rows.append(r)

    
pd.DataFrame(rows)

In [None]:
# Test the error of the min_time_parameters() function with a lot of random records

from trajectory.planner import accel_tx,  min_time_parameters, ParameterAdjustmentError
from tqdm import tqdm 

from random import randint, random


adjustments = 0
rows = []
for i in tqdm(range(10000)):

    v_max = 5_000 #randint(300,10_000)
    a_max = 50_000 # randint(v_max*15,v_max*25)
    
    x = randint(10,v_max)
    v_0 = randint(0,v_max)
    v_1 = randint(0,v_max)
    
    args = (x, v_0, v_max, v_1, v_max, a_max)
    d = { k:v for k,v in zip('x_in v_0_in v_c_in v_1_in v_max a_max'.split(),args)}
        
    r = min_time_parameters(x, v_0, v_1, v_max, a_max)

    d.update(r.asdict())
    d['exc'] = ''
        
    rows.append(d)
        
df = pd.DataFrame(rows)
df['x'] = df.x.round().astype(int)
# df['x_r'] = (df.x_a + df.x_c + df.x_d).round().astype(int) # This appears to be always true
df['eq'] = df.x_in == df.x
df['err'] =  ((df.x_in - df.x).abs()/df.x_in) 

assert df.err.mean()< 0.01 , df.err.mean()
assert len(df[df.err > 1]) == 0

v_c = [ hex_v_c(r.x, r.t, r.v_0, r.v_1, r.v_max, r.a_max)for idx, r in df.iterrows()]
df['v_c_pa'] = v_c
df['v_c_err'] =  (df.v_c-df.v_c_pa).abs()   
df['ha'] =  [ hex_area(r.t, r.v_0, r.v_c_pa, r.v_1, r.a_max)for idx, r in df.iterrows()]
df['ha_e'] = (df.x-df.ha).abs()

df

In [None]:
joint = Joint(5_000, 50_000)
N = 6
joints = [joint]*N


segments = []

def last_v_1(segments):
    
    if len(segments):
        return [j.v_1 for j in segments[-1] ]
    else:
        return [0]*len(joints)

for i in range(10):
    
    s = [ min_time_parameters(randint(0,j.v_max), v_1, 0, j.v_max, j.a_max) for j,v_1 in zip(joints,last_v_1(segments) ) ]
    
    # Calculate segment minimum time. 
    
    mt1 = max([e.t_min for e in s])
    
    # Re-calc the min-time parameters, for a fresh start
    mtp = [min_time_parameters(r.x, r.v_0, r.v_1, r.v_max, r.a_max) for r in s]
    mt2 = max(e.t_min for e in mtp) # Get our new, fresh min time, which should be the same as in mt1
    assert( round(mt1, 5) == round(mt2, 5) )
    
    # Calcluate the final v_c parameter now that we have the segment time
    v_c_6 = [ hex_v_c(r.x, mt2, r.v_0, r.v_1, r.v_max, r.a_max) for  r in  s ]
    
    # Make sure the areas still work
    x_6 =  [ hex_area(mt2, r.v_0, v_c, r.v_1, r.a_max) for v_c,r in  zip(v_c_6, s) ]
    
    assert all([ abs(x-r.x)<1 for x, r in zip(x_6, s) ]), [ (round(x),round(r.x)) for x, r in zip(x_6, s) ]
    
    segments.append(s)
    
    
    prior = segments[-2] if len(segments)>1 else None
    next_ = None
   
    clear_segment_flags(prior);clear_segment_flags(s);clear_segment_flags(next_)

    for i in range(3):
        r = update_segment_window(prior, s, next_)
        print(r)
    print('---')
    


In [None]:
## Test joint windows

j = Joint(5_000, 50_000)
joints = [j]*3

sl = SegmentList(joints)   
s = sl.rmove([1000,1000,1000])
s = sl.rmove([1000,0,500])
s = sl.rmove([1000,1000,1000])

p,c,n = sl.get_window(-1)

assert all([e is not None for e in p]) 
assert all([e is not None for e in c]) 
assert all([e is     None for e in n]) # Next is empty on last

p,c,n = sl.get_window(-2)

assert all([e is not None for e in p]) 
assert all([e is not None for e in c]) 
assert all([e is not None for e in n]) 

p,c,n = sl.get_window(-3)

assert all([e is     None for e in p]) 
assert all([e is not None for e in c]) 
assert all([e is not None for e in n]) 

p,c,n


In [7]:
from operator import attrgetter
j = Joint(5_000, 50_000)
joints = [j]*3

sl = SegmentList(joints)   
s = sl.rmove([1000,1000,1000])
s = sl.rmove([0,   500, 1000])
s = sl.rmove([1000,1000,1000])


def _f(index):

    p,c,n = sl.get_window(index)

    update_segment(p)
    update_segment(c)
    update_segment(n)

    min_idx = -1
    for e in zip(p,c,n):
        idx, u = update_boundary_velocities(*e)
        print(' ',u)
        min_idx = min(min_idx, idx)

        
    index += idx
    
    return index
ag_ha = attrgetter(*'t v_0 v_c v_1 a_max'.split())
ag = attrgetter(*'x t v_0 v_c v_1 v_0_max v_1_max'.split())
index = _f(-1)
print(ag(sl[1][2].p), hex_area(*ag_ha(sl[1][2].p)))
index = _f(-1)
print(ag(sl[1][2].p), hex_area(*ag_ha(sl[1][2].p)))
sl.plot(); plt.show()

  [(False, False), (False, False), (None, None)]
Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "/Users/eric/opt/anaconda3/envs/robot/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3398, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/var/folders/kp/jn04x74j43j9v0t6cl0574h00000gn/T/ipykernel_68960/3999647735.py", line 31, in <cell line: 31>
    index = _f(-1)
  File "/var/folders/kp/jn04x74j43j9v0t6cl0574h00000gn/T/ipykernel_68960/3999647735.py", line 21, in _f
    idx, u = update_boundary_velocities(*e)
  File "/Users/eric/Documents/proj/trajectory/trajectory/trapmath.py", line 561, in update_boundary_velocities
    return mark_change(l, m)
  File "/Users/eric/Documents/proj/trajectory/trajectory/trapmath.py", line 456, in mark_change
    assert_unchanged_area(l, memo)
  File "/Users/eric/Documents/proj/trajectory/trajectory/trapmath.py", line 449, in assert_unchanged_area
    assert_equal_area(a,b)
  File "/Users/eric/Documents/proj/trajectory/trajectory/trapmath.py", line 440, in assert_e

In [None]:
index = -1
for i in range(5):
    index = _f(index)
    print (index)
    sl.plot(); plt.show()
    print(sl)
    if index == 0:
        break
    
    
display(sl.dataframe)

In [None]:
# Test random segments, without segment-to-segment interactions


joints = [, Joint(5_000, 50_000), Joint(5_000, 50_000)]
sl = SegmentList(joints)   

inputs = []
for i in range(N):
    moves = [randint(-2000,2000) for _ in sl.joints] 

    limits = []
    for j in sl.joints:
        v_0_max = randint(0,j.v_max)
        v_0 = randint(0,j.v_max)
        v_1_max = randint(0,j.v_max)
        limits.append( (v_0_max, v_0 , v_1_max) )
        
    inputs.append( (moves, limits) )
      

In [None]:
from trajectory.exceptions import ValidationError, ConvergenceError      
frames = []

def test_sl(moves, limits):
   
    sl = SegmentList([Joint(5_000, 50_000), Joint(5_000, 50_000), Joint(5_000, 50_000)])   
    s = sl.rmove(moves)  
    
    for js, (v_0_max, v_0, v_1_max) in zip(s,limits):
        js.v_0_max = v_0_max
        js.v_0 = v_0
        js.v_1_max = v_1_max
        
    s.update()
    
    sl.validate()
    
    return s


for i, (moves, limits) in enumerate(inputs):

    try:
        s = test_sl(moves, limits)
        
        v_0_in = [e[1] for e in limits]
        v_0_out = [js.v_0 for js in s]
        
        frames.append(s.params)
    except Exception as e:
        print('E ', type(e), i, e)

df = pd.concat(frames)
df[df.err_x > 1].sort_values('err_x', ascending=False)

In [None]:
from trajectory.planner import max_v0_for_x

def run_segment(moves, limits, update=True):
    from trajectory.exceptions import ValidationError, ConvergenceError
    sl = SegmentList([Joint(5_000, 50_000), Joint(5_000, 50_000)])   
    
    s = sl.rmove(moves, update = update)  

    for js, (v_0_max, v_0, v_1_max) in zip(s,limits):
        js.v_0_max = v_0_max
        js.v_0 = v_0
        js.v_1_max = v_1_max   
    
    return sl


sl = run_segment([250,3000], [(5000, 5000, 0), (5000, 5000, 5000)], False)
sl.update()

plot_segment_list(sl.dataframe)
sl.dataframe

In [None]:
sl = run_segment([500,3000], [(0, 0, 0), (5000, 5000, 0)])
sl.update()
plot_segment_list(sl.dataframe)
sl.dataframe


# Classification

In [None]:
from collections import defaultdict
from random import shuffle
from trajectory.planner import classify, kind_icon_map

a_max= 50_000
v_max = 5_000


# Make new classification tests
dd = defaultdict(set)
for _ in range(10_000):
    x = max(randint(-500,1000), 0)
    v_0 = min(max(randint(-3000, v_max+2000), 0), v_max)
    v_1 = min(max(randint(-3000, v_max+2000), 0), v_max)
    args = (x, v_0, v_1, v_max, a_max)
    r = classify(*args)
    
    dd[r].add(args)

# Make the class_test list
l = []
for k, s in dd.items():
    s = list(s)
    shuffle(s)
    for v in s[:10]:
        l.append((v,k.name))
  
# Create a map from each classification to a set
# of values that has the (min, max) values for each parameter that produce that
# classification
min_max = {}
rows  = []
arg_names = 'x v_0 v_1 v_max, a_max'.split()
for k, s in dd.items():
    z = list(zip(*list(s)))
    z_min = [min(e) for e in z]
    z_max = [max(e) for e in z]
    
    min_max[k] =  list(zip(z_min, z_max))[:3]
    
    d = { 'name': k.name }
    for arg, mn, mx in list(zip(arg_names, z_min, z_max))[:3]:
        d[arg+'_min'] = mn
        d[arg+'_max'] = mx
    
    rows.append(d)
    
min_max_df = pd.DataFrame(rows)
min_max_df


In [None]:
#  (x, v_0, v_1, v_max, a_max) 
class_test = [
 ((991, 3270, 2029, 5000, 50000), 'PENTAGON'),
 ((491, 4113, 5000, 5000, 50000), 'PENTAGON'),
 ((646, 3407, 4052, 5000, 50000), 'PENTAGON'),
 ((985, 1309, 4067, 5000, 50000), 'PENTAGON'),
 ((944, 4496, 1129, 5000, 50000), 'PENTAGON'),
 ((606, 2337, 2051, 5000, 50000), 'PENTAGON'),
 ((514, 1783, 2992, 5000, 50000), 'PENTAGON'),
 ((560, 3763, 556, 5000, 50000), 'PENTAGON'),
    
 ((4000, 0, 0, 5000, 50000), 'TRAPZEZOID'),
 ((546, 0, 0, 5000, 50000), 'TRAPZEZOID'),
    
 ((173, 3548, 3826, 5000, 50000), 'TROUGH'),
 ((188, 4335, 425, 5000, 50000), 'TROUGH'),
 ((30, 1732, 1732, 5000, 50000), 'TROUGH'),
 ((25, 1581, 1581, 5000, 50000), 'TROUGH'),
 ((7, 405, 836, 5000, 50000), 'TROUGH'),
 ((13, 765, 1140, 5000, 50000), 'TROUGH'),
 ((1, 316, 316, 5000, 50000), 'TROUGH'),
 ((10, 1000, 1000, 5000, 50000), 'TROUGH'),
    
 ((289, 0, 0, 5000, 50000), 'TRIANGLE'),
 ((369, 0, 0, 5000, 50000), 'TRIANGLE'),
 ((71, 0, 0, 5000, 50000), 'TRIANGLE'),
 ((252, 0, 0, 5000, 50000), 'TRIANGLE'),
 ((305, 0, 0, 5000, 50000), 'TRIANGLE'),
 ((133, 0, 0, 5000, 50000), 'TRIANGLE'),
 ((390, 0, 0, 5000, 50000), 'TRIANGLE'),
 ((434, 0, 0, 5000, 50000), 'TRIANGLE'),
 ((231, 0, 0, 5000, 50000), 'TRIANGLE'),
 ((281, 0, 0, 5000, 50000), 'TRIANGLE'),
    
 ((629, 0, 3151, 5000, 50000), 'ACEL'),
 ((918, 0, 4129, 5000, 50000), 'ACEL'),
 ((299, 0, 1317, 5000, 50000), 'ACEL'),
 ((576, 0, 570, 5000, 50000), 'ACEL'),
 ((784, 0, 2494, 5000, 50000), 'ACEL'),
 ((393, 0, 188, 5000, 50000), 'ACEL'),
 ((618, 0, 4152, 5000, 50000), 'ACEL'),
 ((862, 0, 1289, 5000, 50000), 'ACEL'),
 ((71, 0, 1392, 5000, 50000), 'ACEL'),
 ((123, 0, 3507, 5000, 50000), 'ACEL'),
    
 ((545, 441, 0, 5000, 50000), 'CLIFF'),
 ((987, 3106, 0, 5000, 50000), 'CLIFF'),
 ((836, 2502, 0, 5000, 50000), 'CLIFF'),
 ((418, 558, 0, 5000, 50000), 'CLIFF'),
 ((559, 904, 0, 5000, 50000), 'CLIFF'),
 ((202, 1662, 0, 5000, 50000), 'CLIFF'),
 ((270, 5000, 0, 5000, 50000), 'CLIFF'),
 ((257, 1856, 0, 5000, 50000), 'CLIFF'),
 ((381, 3522, 0, 5000, 50000), 'CLIFF'),
 ((869, 2426, 0, 5000, 50000), 'CLIFF'),
    
 ((115, 5000, 0, 5000, 50000), 'DECEL'),   
 ((25, 1785, 0, 5000, 50000), 'DECEL'),
 ((53, 5000, 0, 5000, 50000), 'DECEL'),
 ((35, 2898, 0, 5000, 50000), 'DECEL'),
 ((66, 4563, 0, 5000, 50000), 'DECEL'),
 ((100, 4458, 0, 5000, 50000), 'DECEL'),
 ((87, 5000, 0, 5000, 50000), 'DECEL'),
 ((42, 4511, 0, 5000, 50000), 'DECEL'),
    
 ((0, 0, 3438, 5000, 50000), 'ZERO'),
 ((0, 4255, 4443, 5000, 50000), 'ZERO'),
 ((0, 2960, 2762, 5000, 50000), 'ZERO'),
 ((0, 259, 116, 5000, 50000), 'ZERO'),
 ((0, 326, 1424, 5000, 50000), 'ZERO'),
 ((0, 3923, 0, 5000, 50000), 'ZERO'),
 ((0, 0, 3739, 5000, 50000), 'ZERO'),
 ((0, 4587, 1189, 5000, 50000), 'ZERO'),
 ((0, 2137, 5000, 5000, 50000), 'ZERO'),
 ((0, 2911, 0, 5000, 50000), 'ZERO')]

# These record produce a high x error when 
# recalculating the area
class_test_he = [
 ((193, 1018, 4588, 5000, 50000), 'TROUGH'), 
 ((233, 1029, 2812, 5000, 50000), 'TROUGH'), 
 ((218, 3779, 0, 5000, 50000), 'DECEL'),
 ((109, 2709, 0, 5000, 50000), 'DECEL')
]

In [None]:
output = []
for args, name in class_test:
    assert classify(*args).name == name,  (classify(*args).name, name)
    output.append( (args,  classify(*args).name) )


In [None]:
from trajectory.trapmath import *
    
rows = []
v_max = 5_000
a_max = 50_000
av = (v_max, a_max)

for _ in range(1_000):
    x = max(randint(-500,1000), 0)
    v_0 = min(max(randint(-3000, av[0]+2000), 0), av[0])
    v_1 = min(max(randint(-3000, av[0]+2000), 0), av[0])
    
    args = (x, v_0, v_1, v_max, a_max)
    
    err = None
    try:
        p = min_time_parameters(x, v_0, v_1, v_max, a_max)
    except AssertionError as e:
        p = Params(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, InputParams(x, v_0, v_1, a_max, 0))
        err = f"{type(e)} {e}"
        print(args)
        raise
    except TrapMathError as e:
        p = Params(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, InputParams(x, v_0, v_1, a_max, 0))
        err = f"{type(e)} {e}"
        
    d = p.asdict()
    del d['ip']
    d['err'] = err
    rows.append(d)
    
mtp = pd.DataFrame(rows)
mtp.head()