For exploring discrete time dubins car risk estimation where uncertainty enters linearly in the dynamics

In [1]:
import sympy as sp
import numpy as np
from functools import reduce
from itertools import accumulate
import time
from sympy.utilities.autowrap import ufuncify
from inspect import signature
import itertools
from itertools import permutations
import operator

In [2]:
def compute_all_thetas(theta0, utheta):
    return theta0 + np.cumsum(utheta)

def compute_beta2_mono_moments(alpha, beta, monos):
    max_order_needed = max(max(monos))
    #assume iid beta moments
    beta2_moments = compute_beta2_moments(alpha,beta,max_order_needed)
    cross_moments = [reduce(lambda prev,ni: prev*beta2_moments[ni],[1] + list(tup)) for tup in monos]
    return cross_moments

def compute_beta_moments(alpha,beta,order):
    #Compute beta moments up to the given order
    #the returned list indices should match the moment orders
    #e.g. the return[i] should be the ith beta moment
    fs = map(lambda r: (alpha + r)/(alpha + beta + r), range(order))
    return [1] + list(accumulate(fs, lambda prev,n: prev*n))

def compute_beta2_moments(alpha, beta, n):
    beta_moments = compute_beta_moments(alpha, beta, n)
    return [beta_moments[i]*2**i for i in range(len(beta_moments))]

def chebyshev_bound(first_moment, second_moment):
    #bound the probability that p<=0
    if first_moment<=0:
        return None
    else:
        variance = second_moment - first_moment**2
        return variance/(variance + first_moment**2)

In [3]:
def non_repeating_permutations(seq):
    seen = set()
    for permutation in permutations(seq):
        hperm = hash(permutation)
        if hperm in seen:
            continue
        seen.add(hperm)
        yield permutation

In [4]:
class CarKinematicsNoisyAccel(object):
    def __init__(self, n_steps, dt):
        x0 = sp.symbols('x0', real = True, constant = True)
        y0 = sp.symbols('y0', real = True, constant = True)
        v0 = sp.symbols('v0', real = True, constant = True)
        uaws = sp.symbols('uaw0:' + str(n_steps), real = True) #accel command multiplied by some random variable
        
        #vs is a list of speeds at times 0, 1,..., n_steps
        vs = list(accumulate([0] + list(uaws), lambda cumsum, nextcontrol: cumsum + nextcontrol))
        vs = [v0 + dt*x for x in vs]
        
        thetas = sp.symbols('theta0:' + str(n_steps), real = True)
        
        #cos(theta_0) to cos(theta_{n_steps-1})
        costhetas = [sp.cos(t) for t in thetas]
      
        sinthetas = [sp.sin(t) for t in thetas]
        
        xs = list(accumulate([x0] + [x for x in range(n_steps)], lambda pre,i: pre + vs[i]*costhetas[i]))
        ys = list(accumulate([y0] + [x for x in range(n_steps)], lambda pre,i: pre + vs[i]*sinthetas[i]))
        
        self.xs = [sp.poly(x, uaws) for x in xs]
        self.ys = [sp.poly(y, uaws) for y in ys]
        self.n_steps = n_steps
        self.dt = dt
        self.x0 = x0
        self.y0 = y0
        self.v0 = v0
        self.uaws = uaws
        self.costhetas = costhetas
        self.sinthetas = sinthetas
        self.thetas = thetas
        
    def get_final_state_vec(self):
        return np.asarray([self.xs[-1], self.ys[-1]])
    
    
def generate_iid_beta_random_vec(n, alpha, beta):
    return RandomVector(n, [("beta",[alpha, beta]) for i in range(n)])

def generate_c_beta_random_vec(n, alpha, beta):
    return RandomVector(n, [("beta",[alpha, beta]) for i in range(n)])
    
class RandomVector(object):
    def __init__(self, n, distributions):
        #n: dimension of the random vector
        #distributions: dictionary with name as key and param list as val
        self.valid_distributions = {"beta", "cbeta"}
        if len(distributions)!=n:
            raise Exception("Der")
        elif len(list(filter(self.valid_distribution, distributions))) != n:
            raise Exception("Her")
        else:
            self.distributions = distributions
    
    def valid_distribution(self, distrib):
        if distrib[0] in self.valid_distributions:
            return True
        else:
            return False
    
    def compute_vector_moments(self, n_moments):
        #n_moments is a list of numbers of the maximum moment order to be computed
        all_moments = [] #list of list
        for i in range(len(self.distributions)):
            imoments = self.compute_moments(self.distributions[i], n_moments[i])
            all_moments.append(imoments)
        return all_moments
    
    def compute_moments(self, distr, order):
        if distr[0] == "beta":
            #Compute beta moments up to the given order
            #the returned list indices should match the moment orders
            #e.g. the return[i] should be the ith beta moment
            alpha = distr[1][0]
            beta = distr[1][1]
            fs = map(lambda r: (alpha + r)/(alpha + beta + r), range(order))
            return [1] + list(accumulate(fs, lambda prev,n: prev*n))
        
        elif distr[0] == "cbeta":
            alpha = distr[1][0]
            beta = distr[1][1]
            c = distr[1][2]
            fs = map(lambda r: (alpha + r)/(alpha + beta + r), range(order))
            beta = [1] + list(accumulate(fs, lambda prev,n: prev*n))
            cbeta = [beta[i]*c**i for i in range(len(beta))]
            return cbeta
        else:
            raise Exception("type not supported der")
        
    def set_cvals(self, cvals):
        #if cbeta distribution, this sets the values of c
        for i in range(len(cvals)):
            self.distributions[i][1][2] = cvals[i]

class StochasticVerificationFunction(object):
    def __init__(self, p, dynamics, n_steps, dt):
        #p: an anonymous function in state
        #dynamics: something something dynamics class, for now just CarKinematicsNoisyAccel
        self.p = p
        self.dynamics = dynamics(n_steps, dt)
        self.p_moments = {}
        self.coef_funcs = {}
        self.monoms = {}
        
    def compile_moment_functions(self):
        final_state = self.dynamics.get_final_state_vec()
        x_final = final_state[0]
        y_final = final_state[1]
        self.p_moments[0] = 1
        self.p_moments[1] = self.p(x_final, y_final) #function of self.dynamics.uaws
        self.p_moments[2] = self.p(x_final, y_final)**2 #function of self.dynamics.uaws
#         self.coef_funcs[1] = sp.lambdify([self.dynamics.thetas, self.dynamics.x0, self.dynamics.y0, self.dynamics.v0],self.p_moments[1].coeffs(), 'numpy')
#         self.coef_funcs[2] = sp.lambdify([self.dynamics.thetas, self.dynamics.x0, self.dynamics.y0, self.dynamics.v0],self.p_moments[2].coeffs(), 'numpy')
        
        input_vars = list(self.dynamics.thetas) + [self.dynamics.x0, self.dynamics.y0, self.dynamics.v0]
        
        self.coef_funcs[1] = [ufuncify(input_vars,coef) for coef in self.p_moments[1].coeffs()]
        
        self.coef_funcs[2] = [ufuncify(input_vars,coef) for coef in self.p_moments[2].coeffs()]
        
        self.monoms[1] = self.p_moments[1].monoms()
        self.monoms[2] = self.p_moments[2].monoms()
        
    def compile_moment_functions_multinomial(self):
        final_state = self.dynamics.get_final_state_vec()
        x_final = final_state[0]
        y_final = final_state[1]
        self.p_moments[0] = 1
        self.p_moments[1] = self.p(x_final, y_final) #function of self.dynamics.uaws
        input_vars = list(self.dynamics.thetas) + [self.dynamics.x0, self.dynamics.y0, self.dynamics.v0]
        
        self.coef_funcs[1] = [ufuncify(input_vars, coef) for coef in self.p_moments[1].coeffs()]
        
        self.monoms[1] = self.p_moments[1].monoms()
        
        
        #
        n_monoms = len(self.monoms[1])
        comb_twos = list(itertools.combinations(range(n_monoms), 1))
        comb_ones = list(itertools.combinations(range(n_monoms), 2))
        monom_twos = len(comb_twos)*[None]
        monom_ones = len(comb_ones)*[None]
        for i in range(len(comb_twos)):
            new_array = n_monoms*[0]
            new_array[comb_twos[i][0]] = 2
            monom_twos[i] = tuple(new_array)

        for i in range(len(comb_ones)):
            new_array = n_monoms*[0]
            for n in comb_ones[i]:
                new_array[n] = 1
            monom_ones[i] = tuple(new_array)
        
        #variables are the terms of p!
        p2_var_monoms = monom_twos + monom_ones
        
        #Figure out self.monoms[2]
        p2_moment_monoms = []
        for mono in p2_var_monoms:
            if 1 in mono:
                idx = [i for i,x in enumerate(mono) if x == 1]
                assert(len(idx) == 2)
                new_mono = tuple(map(operator.add,self.monoms[1][idx[0]],self.monoms[1][idx[1]]))
            elif 2 in mono:
                new_mono = tuple(map(operator.add,self.monoms[1][mono.index(2)],self.monoms[1][mono.index(2)]))
            else:
                raise Exception("There should be a 1 or 2 in here...")
            p2_moment_monoms.append(new_mono)
        self.monoms[2] = p2_moment_monoms
        self.p2_var_monoms = p2_var_monoms
        
        #We won't have self.coef_funcs[2] because that will be best processed after the fact

    
    def compute_prob_bound_multimonial(self):
        coef_data = list(self.thetas) + [self.x0, self.y0, self.v0]
        p_first_coefs = [coef_func(*coef_data) for coef_func in self.coef_funcs[1]]
        p_first_monomoments, p_second_monomoments = self.compute_rv_moments()
        
        p_second_coefs = len(self.p2_var_monoms)*[0]
        for count, mono in enumerate(self.p2_var_monoms):
            if 1 in mono:
                idx = [i for i,x in enumerate(mono) if x==1]
                coefs = 2*p_first_coefs[idx[0]]*p_first_coefs[idx[1]]
            elif 2 in mono:
                i = mono.index(2)
                coefs = p_first_coefs[i]**2
            else:
                raise Exception("There should be a 1 or 2 in here...")
            p_second_coefs[count] = coefs
            
        p_first_moment = np.dot(p_first_coefs, p_first_monomoments)
        p_second_moment = np.dot(p_second_coefs, p_second_monomoments)
        print("first moment is: " + str(p_first_moment))
        print("second moment is: " + str(p_second_moment))
        prob_bound = self.chebyshev_bound(p_first_moment, p_second_moment)
        print("prob bound is: " + str(prob_bound))
        
    def compute_p_second_coefs(self, p_first_coefs):
        pass
        
    def chebyshev_bound(self, first_moment, second_moment):
        #bound the probability that p<=0
        if first_moment<=0:
            return None
        else:
            variance = second_moment - first_moment**2
            return variance/(variance + first_moment**2)
    
    
    def set_problem_data(self, uaccel_seq, random_vector, theta_seq, x0, y0, v0):
        self.random_vector = random_vector
        self.random_vector.set_cvals(uaccel_seq)
        self.thetas = theta_seq
        self.x0 = x0
        self.y0 = y0
        self.v0 = v0
    
    def compute_rv_moments(self):
        n_vars = len(self.monoms[1][0])
        x_max_moments = []
        y_max_moments = []
        for i in range(n_vars):
            x_max_moments.append(max([x[i] for x in self.monoms[1]]))
            y_max_moments.append(max([y[i] for y in self.monoms[2]]))
        max_moments = [max(x_max_moments[i], y_max_moments[i]) for i in range(len(x_max_moments))]
        moments = self.random_vector.compute_vector_moments(max_moments)
            
        #for each mono, the ith entry corresponds to the ith rv and the moment we want....
        mono1_moments = [reduce(lambda a,b: a*b, map(lambda i:moments[i][mono[i]] , range(len(mono)))) for mono in self.monoms[1]]
        mono2_moments = [reduce(lambda a,b: a*b, map(lambda i:moments[i][mono[i]] , range(len(mono)))) for mono in self.monoms[2]]
        
        return mono1_moments, mono2_moments
    
    def compute_prob_bound(self):
        coef_data = list(self.thetas) + [self.x0, self.y0, self.v0]
        
        p_first_coefs = [coef_func(*coef_data) for coef_func in self.coef_funcs[1]]
        p_second_coefs = [coef_func(*coef_data) for coef_func in self.coef_funcs[2]]
        p_first_monomoments, p_second_monomoments = self.compute_rv_moments()
        
        p_first_moment = np.dot(p_first_coefs, p_first_monomoments)
        p_second_moment = np.dot(p_second_coefs, p_second_monomoments)
        prob_bound = self.chebyshev_bound(p_first_moment, p_second_moment)
        print("prob bound is: " + str(prob_bound))

        


In [5]:
p = lambda x,y: -2*x**2 - 4*y**2 + 3*y**4 + 2.0*x**4 - 0.1
n_t = 7
dt = 0.05
alpha = 500
beta = 550
random_vec = RandomVector(n_t, [["cbeta",[alpha, beta, 1]] for i in range(n_t)])


print("MULTINOMIAL RESULTS")
t1 = time.time()
test = StochasticVerificationFunction(p, CarKinematicsNoisyAccel, n_t, dt)
test.compile_moment_functions_multinomial()
t2 = time.time()
print("TOTAL COMPILATION TIME WAS: " + str(t2 - t1))



x0 = -2.5
y0 = 0.0
v0 = 0.0
theta_seq = np.zeros((1,n_t)).flatten()
uaccel_seq = 2*np.ones((1,n_t)).flatten()
alpha = 500
beta = 550
t1 = time.time()
test.set_problem_data(uaccel_seq, random_vec, theta_seq, x0, y0, v0)
test.compute_prob_bound_multimonial()
t2 = time.time()
print("TOTAL `ONLINE' PROCESSING TIME WAS: " + str(t2 - t1))





# print("CLASSIC RESULTS")
# t1 = time.time()
# test = StochasticVerificationFunction(p, CarKinematicsNoisyAccel, n_t, 0.05)
# test.compile_moment_functions()
# t2 = time.time()
# print("TOTAL COMPILATION TIME WAS: " + str(t2 - t1))

# t1 = time.time()
# test.set_problem_data(uaccel_seq, random_vec, theta_seq, x0, y0, v0)
# test.compute_prob_bound()
# t2 = time.time()
# print("TOTAL `ONLINE' PROCESSING TIME WAS: " + str(t2 - t1))

MULTINOMIAL RESULTS
TOTAL COMPILATION TIME WAS: 106.26482677459717
first moment is: 5.530399407851981
second moment is: 30.68067905459975
prob bound is: 0.0031081920990244025
TOTAL `ONLINE' PROCESSING TIME WAS: 0.2165834903717041


In [6]:
for n_t in range(2,10):
    p = lambda x,y: -2*x**2 - 4*y**2 + 3*y**4 + 2.0*x**4 - 0.1
    dt = 0.05
    random_vec = RandomVector(n_t, [["cbeta",[alpha, beta, 1]] for i in range(n_t)])


    print("Results for n_t = " + str(n_t))
    t1 = time.time()
    test = StochasticVerificationFunction(p, CarKinematicsNoisyAccel, n_t, dt)
    test.compile_moment_functions_multinomial()
    t2 = time.time()
    print("TOTAL COMPILATION TIME WAS: " + str(t2 - t1))



    x0 = -2.5
    y0 = 0.0
    v0 = 0.0
    theta_seq = np.zeros((1,n_t)).flatten()
    uaccel_seq = 2*np.ones((1,n_t)).flatten()
    alpha = 500
    beta = 550
    t1 = time.time()
    test.set_problem_data(uaccel_seq, random_vec, theta_seq, x0, y0, v0)
    test.compute_prob_bound_multimonial()
    t2 = time.time()
    print("TOTAL `ONLINE' PROCESSING TIME WAS: " + str(t2 - t1))

NameError: name 'alpha' is not defined

In [None]:
x0 = -2.0
y0 = 0.0
theta0 = 0.0
v0 = 0

n_t = 10
dt = 0.05

alpha = 500
beta = 550

utheta = np.zeros((n_t,1))
uaccel = 2*np.ones((n_t,1))

w_accel = sp.symbols('w0:' + str(n_t))

accel_uncertain = np.asarray(list(map(lambda i: w_accel[i]*uaccel[i], range(n_t))))

accel_uncertain = [dt*x for x in accel_uncertain]

vs_uncertain_dt = accumulate([v0] + list(accel_uncertain), lambda pre,ne: pre + ne)

thetas = compute_all_thetas(theta0, utheta)

cos_thetas = np.cos(thetas)

sin_thetas = np.sin(thetas)

p = lambda x,y: -2*x**2 - 4*y**2 + 3*y**4 + 2.0*x**4 - 0.1

p_poly = lambda x,y: sp.poly(p(x,y))

p_sq = lambda x,y: sp.poly((p(x,y))**2)

xs = list(accumulate([x0] + [x for x in range(n_t)], lambda pre,i: pre + vs_uncertain_dt[i]*cos_thetas[i]))
ys = list(accumulate([y0] + [x for x in range(n_t)], lambda pre,i: pre + vs_uncertain_dt[i]*sin_thetas[i]))

beta2_moments = compute_beta2_moments(alpha, beta, 1)

accel_expected = np.asarray(list(map(lambda i: beta2_moments[1]*uaccel[i], range(n_t))))
vs_expected_dt = v0 + dt*accel_expected

xs_expected = list(accumulate([x0] + [x for x in range(n_t)], lambda pre,i: pre + vs_expected_dt[i]*cos_thetas[i]))
ys_expected = list(accumulate([y0] + [x for x in range(n_t)], lambda pre,i: pre + vs_expected_dt[i]*sin_thetas[i]))

xs_final = xs[-1]
ys_final = ys[-1]

final_p = p_poly(xs_final, ys_final)

final_p_sq = p_sq(xs_final, ys_final)

#Assume coefs[i] corresponds to monos[i]
#Compute first moment
coefs_p = np.asarray(final_p.coeffs())
monos_p = final_p.monoms()
mono_moments_p = np.asarray(compute_beta2_mono_moments(alpha, beta, monos_p)) #these moments correspond to the monos
expected_p = np.dot(coefs_p, mono_moments_p)

#Compute the second moment
coefs_p_sq = np.asarray(final_p_sq.coeffs())
monos_p_sq = final_p_sq.monoms()
mono_moments_p_sq = np.asarray(compute_beta2_mono_moments(alpha, beta, monos_p_sq)) #these moments correspond to the monos
expected_p_sq = np.dot(coefs_p_sq, mono_moments_p_sq)
variance = expected_p_sq - expected_p**2


risk_bound = chebyshev_bound(expected_p, expected_p_sq)

print(xs_expected)
print("expected value of p")
print(expected_p)
print("variance of p")
print(variance)

if risk_bound:
    print('risk bound')
    print(risk_bound)

In [157]:
sp.var('x y')
%matplotlib
sp.plot_implicit(-2*x**2 - 4*y**2 + 3*y**4 + 2.0*x**4 - 0.1)

Using matplotlib backend: Qt5Agg


<sympy.plotting.plot.Plot at 0x7fa6e4d89b38>

In [158]:
n_validate = 100000
xs_final_func = sp.lambdify(w_accel, xs_final,'numpy')
ys_final_func = sp.lambdify(w_accel, ys_final,'numpy')
wsamps = 2*np.random.beta(alpha,beta,size = (n_validate,n_t))
xsamps = np.apply_along_axis(lambda row:xs_final_func(*row), 1, wsamps)
ysamps = np.apply_along_axis(lambda row:ys_final_func(*row), 1, wsamps)
xys = np.vstack((xsamps,ysamps))
psamps = np.apply_along_axis(lambda row:p(*row), 0, xys)
n_fail = len(list(filter(lambda x: x<=0, psamps)))
fail_percent = n_fail/n_validate
print(fail_percent)

1e-05
