In [None]:
import numpy as np
import scipy.optimize
import matplotlib.pyplot as plt

from pydrake.all import (Variable, SymbolicVectorSystem, DiagramBuilder,
                         LogOutput, Simulator, ConstantVectorSource,
                         MathematicalProgram, Solve, 
                         OsqpSolver, MosekSolver, SnoptSolver, IpoptSolver,
                         PiecewisePolynomial, eq, le, ge)

from matplotlib import rcParams
rcParams['figure.figsize'] = (8, 5)

In [None]:
n_veh = 8
L = 80

L_veh = 5
a = 1
b = 1.5
s0 = 2
v0 = 30
T = 1
delta = 4

In [None]:
sf = L / n_veh - L_veh
accel_fn = lambda v: a * (1 - (v / v0) ** delta - ((s0 + v * T) / sf) ** 2)
sol = scipy.optimize.root(accel_fn, 0)
vf = sol.x.item()
sstarf = s0 + vf * T

In [None]:
# get guesses for solutions
v_guess = [np.mean(v_init)]
for _ in range(N):
    v_guess.append(dt * accel_fn(v_guess[-1]))
s_guess = [sf] * (N + 1)

prog = MathematicalProgram()
v = prog.NewContinuousVariables(N + 1, n_veh, 'v') # speed
s = prog.NewContinuousVariables(N + 1, n_veh, 's') # spacing
flat = lambda x: x.reshape(-1)

# Guess
prog.SetInitialGuess(s, np.stack([s_guess] * n_veh, axis=1))
prog.SetInitialGuess(v, np.stack([v_guess] * n_veh, axis=1))

# initial conditions constraint
prog.AddLinearConstraint(eq(v[0], v_init))
prog.AddLinearConstraint(eq(s[0], s_init))

# velocity constraint
prog.AddLinearConstraint(ge(flat(v[1:]), 0))
prog.AddLinearConstraint(le(flat(v[1:]), vf + 1)) # extra constraint to help solver

# spacing constraint
prog.AddLinearConstraint(ge(flat(s[1:]), s0))
prog.AddLinearConstraint(le(flat(s[1:]), 10)) # extra constraint to help solver
prog.AddLinearConstraint(eq(flat(s[1:].sum(axis=1)), L - L_veh * n_veh))

# spacing update constraint
s_n = s[:-1, 1:]       # s_i[n]
s_np1 = s[1:, 1:]      # s_i[n + 1]
v_n = v[:-1, 1:]       # v_i[n]
v_np1 = v[1:, 1:]      # v_i[n + 1]
v_n_im1 = v[:-1, :-1]  # v_{i - 1}[n]
v_np1_im1 = v[1:, :-1] # v_{i - 1}[n + 1]
prog.AddLinearConstraint(eq(
    flat(s_np1 - s_n), flat(0.5 * dt * (v_n_im1 + v_np1_im1 - v_n - v_np1))))
# handle position wrap for vehicle 1
prog.AddLinearConstraint(eq(s[1:, 0] - s[:-1, 0], 0.5 * dt * (v[:-1, -1] + v[1:, -1] - v[:-1, 0] - v[1:, 0])))

# vehicle 0's action constraint
prog.AddLinearConstraint(ge(v[1:, 0], v[:-1, 0] - u_max * dt))
prog.AddLinearConstraint(le(v[1:, 0], v[:-1, 0] + u_max * dt))

# idm constraint
prog.AddConstraint(eq(
    (v_np1 - v_n - dt * a * (1 - (v_n / v0) ** delta)) * s_n ** 2,
    -dt * a * (s0 + v_n * T + v_n * (v_n - v_n_im1) / (2 * np.sqrt(a * b))) ** 2))

prog.AddCost(((v - vf) ** 2).mean() + ((s - sf) ** 2).mean())

solver = SnoptSolver()
result = solver.Solve(prog)

assert result.is_success()

In [None]:
result.GetSolution()

In [None]:
result.GetSolution(s)
# result.get_optimal_cost()

In [None]:
result.GetSolution(v)

# Old

In [None]:
import pydrake
import inspect
attrs = inspect.getmembers(pydrake.all, lambda a: not inspect.isroutine(a))
attrs

In [None]:
N = 10
dt = 1
u_max = 1
x_init = [np.clip(np.random.normal(i * L / n_veh, 1), 0, np.inf) for i in reversed(range(n_veh))]

prog = MathematicalProgram()
v = prog.NewContinuousVariables(N + 1, n_veh, 'v') # speed
x = prog.NewContinuousVariables(N + 1, n_veh, 'x') # position
flat = lambda x: x.reshape(-1)

# initial conditions constraint
prog.AddLinearConstraint(eq(v[0], 0))
prog.AddLinearConstraint(eq(x[0], x_init))

# velocity constraint
prog.AddLinearConstraint(ge(flat(v[1:]), 0))
prog.AddLinearConstraint(le(flat(v[1:]), 3)) # extra

# position update constraint
prog.AddLinearConstraint(eq(
    flat(x[1:]),
    flat(x[:-1] + 0.5 * (v[:-1] + v[1:]) * dt)))

# vehicle position constraint
prog.AddLinearConstraint(le(flat(x[:, 1:]), flat(x[:, :-1] - (L_veh + s0))))
prog.AddLinearConstraint(le(x[:, 0], x[:, -1] - (L_veh + s0) + L)) # handle position wrap for vehicle 1

# vehicle 1's action constraint
prog.AddLinearConstraint(ge(v[1:, 0], v[:-1, 0] - u_max * dt))
prog.AddLinearConstraint(le(v[1:, 0], v[:-1, 0] + u_max * dt))

# idm constraint
v_i_n = v[:-1, 1:]
v_i_np1 = v[1:, 1:]
v_im1_n = v[:-1, :-1]
x_i_n = x[:-1, 1:]
x_im1_n = x[:-1, :-1]
# prog.AddLinearConstraint(eq(flat(v_i_np1 - v_i_n), flat(dt * a * (1 + v_i_n / v0 + \
#     (sstarf / sf) ** 2 + \
#     (2 * sstarf) / (sf ** 2) * (T + vf / (2 * np.sqrt(a * b))) * (v_i_n - vf) + \
#     -(2 * sstarf) / (sf ** 2) * (vf / (2 * np.sqrt(a * b))) * (v_im1_n - vf) + \
#     2 * (sstarf ** 2) / (sf ** 3) * (x_i_n - x_im1_n)
# ))))
prog.AddConstraint(eq(
    (v_i_np1 - v_i_n - dt * (1 - (v_i_n / v0) ** delta)) * (x_im1_n - x_i_n - L_veh) ** 2,
    -dt * a * (s0 + v_i_n * T + v_i_n * (v_i_n - v_im1_n) / (2 * np.sqrt(a * b))) ** 2))

prog.AddCost((v * v).mean())

print('Solving...')    
solve = OsqpSolver()
# solver = SnoptSolver()
result = solver.Solve(prog)

assert result.is_success()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from u import *
from time import sleep

In [None]:
%matplotlib notebook
def setup_plot(vehicles):
    circle = plt.Circle((0, 0), radius, color='black', linewidth=10, fill=False)
    fig, ax = plt.subplots(figsize=(10, 10))
    ax.add_artist(circle)
    
    def update_plt(veh):
        angle = veh.pos / radius
        x = radius * np.cos(angle)
        y = radius * np.sin(angle)
        slope = np.array([y, -x])
        offset = 0.5 * L_veh * slope / np.linalg.norm(slope)
        center = np.array([x, y])
        veh.plt_line.set_data(*zip(center + offset, center - offset))
        veh.plt_text.set_position((0.9 * x - 0.03 * radius, 0.95 * y - 0.03 * radius))
        
    import types
    for veh in vehicles:
        veh.update_plt = types.MethodType(update_plt, veh)
        veh.plt_line, = ax.plot([0, 1], [0, 1], color='gray', linewidth=4)
        veh.plt_text = ax.text(0, 0, f'{veh.i}', color='red', fontsize=20)
        veh.update_plt()
    plt.xlim((-radius - 5, radius + 5))
    plt.ylim((-radius - 5, radius + 5))

    text = plt.text(0, 0, '')
    def draw(t):
        texts = [
            ('time', t),
            ('speed', np.mean([veh.speed for veh in vehicles])),
            ('minspeed', np.min([veh.speed for veh in vehicles])),
            ('maxspeed', np.max([veh.speed for veh in vehicles])),
            ('accel', np.mean([veh.accel for veh in vehicles]))
        ]
        text.set_text('\n'.join('%s = %.5g' % kv for kv in texts))
        fig.canvas.draw()
        fig.canvas.flush_events()
    draw(0)
    return draw

n_veh = 22
L_veh = 5
circ = 250
radius = circ / 2 / np.pi
N = 3000
step_size = 1

IDM = dict(
    accel=2.6,
    decel=4.5,
    sigma=0.5,
    tau=1.0,  # past 1 at sim_step=0.1 you no longer see waves
    minGap=2.5,
    maxSpeed=30,
    speedFactor=1.0,
    speedDev=0.1,
    impatience=0.5,
    carFollowModel='IDM',
)
delta = 4
v_0 = 30
s_0 = 2
T_headway = 1
a = 1
b = 1.5

vehicles = [Namespace(
    i=i,
    speed=np.clip(np.random.normal(0, 4), 0, np.inf),
    accel=0,
    pos=np.random.normal(circ * i / n_veh)) for i in range(n_veh)]
redraw = setup_plot(vehicles)

def update_speed(veh, vehicles):
    next_veh = vehicles[(veh.i + 1) % n_veh]
    
    headway = next_veh.pos - veh.pos - L_veh + circ * (next_veh.i == 0)
    v_diff = veh.speed - next_veh.speed
    
    s_star = s_0 + veh.speed * T_headway + veh.speed * v_diff / (2 * np.sqrt(a * b))
    veh.accel = a * (1 - (veh.speed / v_0) ** delta - (s_star / headway) ** 2)
    
    veh.speed = max(veh.speed + step_size * veh.accel, 0)

for n in range(N):
    for veh in vehicles:
        update_speed(veh, vehicles)
    
    for veh in vehicles:
        veh.pos = veh.pos + step_size * veh.speed
        veh.update_plt()
    if (n + 1) % int(1 / step_size) == 0:
        redraw(n + 1)