This notebook provides examples to go along with the [textbook](http://underactuated.csail.mit.edu/dp.html).  I recommend having both windows open, side-by-side!

[Click here](http://underactuated.csail.mit.edu/drake.html#notebooks) for instructions on how to run the notebook on Deepnote and/or Google Colab.

In [None]:
from time import sleep

import matplotlib.animation as animation
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import HTML, clear_output, display
from matplotlib import cm
from pydrake.all import (DiagramBuilder, DiscreteAlgebraicRiccatiEquation,
                         DynamicProgrammingOptions, FittedValueIteration,
                         LinearSystem, MultilayerPerceptron,
                         PerceptronActivationType, PeriodicBoundaryCondition,
                         RandomGenerator, Rgba, Simulator, StartMeshcat,
                         WrapToSystem)
from pydrake.examples.pendulum import PendulumPlant

from underactuated.double_integrator import DoubleIntegratorVisualizer
from underactuated.jupyter import AdvanceToAndVisualize, running_as_notebook
from underactuated.meshcat_cpp_utils import interact, plot_surface
from underactuated.pendulum import PendulumVisualizer
from underactuated.optimizers import Adam

plt.rcParams.update({"savefig.transparent": True})


In [None]:
# Start the visualizer (run this cell only once, each instance consumes a port)
meshcat = StartMeshcat()

# The Grid World

The setup here is *almost* identical as the simplest version described in the notes.  The only difference is that this agent is allowed to move diagonally in a single step; this is slightly easier to code since I can have two actions (one for left/right, and another for up/down), and write the dynamics as the trivial linear system ${\bf x}[n+1] = {\bf u}[n].$  Only the value iteration code needs to know that the states and actions are actually restricted to the integers. I also add a very large cost when the action would be diagonal, so that it is never chosen.

The obstacle (pit of despair) is provided by the method below.  Play around with it!  The rest of the code is mostly to support visualization.

In [None]:
def grid_world_example():
    time_step = 1
    # TODO(russt): Support discrete-time systems in the dynamic programming code, and use this properly.
    #plant = LinearSystem(A=np.eye(2), B=np.eye(2), C=np.eye(2), D=np.zeros((2,2)), time_period=time_step)
    # for now, just cheat because I know how to make the discrete system as a continuous that will be discretized.
    plant = LinearSystem(A=np.zeros((2,2)), B=np.eye(2), C=np.eye(2), D=np.zeros((2,2)))
    simulator = Simulator(plant)
    options = DynamicProgrammingOptions()

    xbins = range(0, 21)
    ybins = range(0, 16)
    state_grid = [set(xbins), set(ybins)]

    input_grid = [set([-1, 0, 1]), set([-1, 0, 1])]

    goal = [2, 8]

    def obstacle(x):
        return x[0]>=6 and x[0]<=8 and x[1]>=4 and x[1]<=7

    [X, Y] = np.meshgrid(xbins, ybins)

    frames=[]
    def draw(iteration, mesh, cost_to_go, policy):
        J = np.reshape(cost_to_go, X.shape)
        artists = [ax.imshow(J, cmap=cm.jet)]
        artists += [
            ax.quiver(X,
                      Y,
                      np.reshape(policy[0], X.shape),
                      np.reshape(policy[1], Y.shape),
                      scale=1.4,
                      scale_units='x')
        ]
        frames.append(artists)

    if running_as_notebook:
        options.visualization_callback = draw

    def min_time_cost(context):
        x = context.get_continuous_state_vector().CopyToVector()
        x = np.round(x)
        state_cost = 1
        if obstacle(x):
            state_cost = 10
        if np.array_equal(x, goal):
            state_cost = 0
        u = plant.get_input_port(0).Eval(context)
        action_cost = np.linalg.norm(u, 1)
        if action_cost > 1:
            action_cost = 10
        return state_cost + action_cost

    cost_function = min_time_cost
    options.convergence_tol = .1;

    (fig,ax) = plt.subplots(figsize=(10,6))
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    ax.set_title("Cost-to-Go")

    policy, cost_to_go = FittedValueIteration(simulator, cost_function,
                                              state_grid, input_grid, time_step,
                                              options)

    draw('Final', None, cost_to_go, policy.get_output_values())

    ax.invert_yaxis()
    plt.colorbar(frames[-1][0])

    print("generating animation...")
    # create animation using the animate() function
    ani = animation.ArtistAnimation(fig, frames, interval=200, blit=True, repeat=False)
    plt.close('all')

    display(HTML(ani.to_jshtml()))

grid_world_example()

Your turn.  Change the cost.  Change the obstacles.

# Value Iteration for the Double Integrator

Note that I've inserted a sleep command in the draw method to intentionally slow down the algorithm, so that you can watch the convergence in the visualizer.  If you take out the pause, it's quite fast!

In [None]:
def DoubleIntegrator():
    return LinearSystem(A=np.mat('0 1; 0 0'),
                        B=np.mat('0; 1'),
                        C=np.eye(2),
                        D=np.zeros((2,1)))
meshcat.Delete()
meshcat.SetProperty('/Background', "visible", False)
plant = DoubleIntegrator()

def double_integrator_example(cost_function,
                              convergence_tol,
                              animate=True,
                              plot=True,
                              draw_iterations=True):
    simulator = Simulator(plant)
    options = DynamicProgrammingOptions()

    qbins = np.linspace(-3., 3., 31)
    qdotbins = np.linspace(-4., 4., 51)
    state_grid = [set(qbins), set(qdotbins)]

    input_limit = 1.
    input_grid = [set(np.linspace(-input_limit, input_limit, 9))]
    timestep = 0.01

    [Q, Qdot] = np.meshgrid(qbins, qdotbins)

    def draw(iteration, mesh, cost_to_go, policy):
        # Don't draw every frame.
        if iteration % 20 != 0:
            return

        # TODO: color by z value (e.g. cm.jet)
        plot_surface(meshcat,
                     'Cost-to-go',
                     Q,
                     Qdot,
                     np.reshape(cost_to_go, Q.shape),
                     wireframe=True)
        plot_surface(meshcat,
                     'Policy',
                     Q,
                     Qdot,
                     np.reshape(policy, Q.shape),
                     rgba=Rgba(.3, .3, .5))

        # Slow down the algorithm so we can visualize the convergence.
        sleep(0.1)

    def simulate(policy):
        # Animate the resulting policy.
        builder = DiagramBuilder()
        plant = builder.AddSystem(DoubleIntegrator())

        vi_policy = builder.AddSystem(policy)
        builder.Connect(plant.get_output_port(0), vi_policy.get_input_port(0))
        builder.Connect(vi_policy.get_output_port(0), plant.get_input_port(0))

        visualizer = builder.AddSystem(DoubleIntegratorVisualizer(show=False))
        builder.Connect(plant.get_output_port(0), visualizer.get_input_port(0))

        diagram = builder.Build()
        simulator = Simulator(diagram)

        simulator.get_mutable_context().SetContinuousState([-10.0, 0.0])

        AdvanceToAndVisualize(simulator, visualizer, 10.)

    if running_as_notebook and draw_iterations:
        options.visualization_callback = draw
    options.convergence_tol = convergence_tol

    policy, cost_to_go = FittedValueIteration(simulator, cost_function,
                                              state_grid, input_grid, timestep,
                                              options)

    J = np.reshape(cost_to_go, Q.shape)

    plot_surface(meshcat, 'Cost-to-go', Q, Qdot, J, wireframe=True)

    if animate:
        print('Simulating...')
        simulate(policy)

    if plot:
        fig = plt.figure(1, figsize=(9, 4))
        ax1, ax2 = fig.subplots(1, 2)
        ax1.set_xlabel("q")
        ax1.set_ylabel("qdot")
        ax1.set_title("Cost-to-Go")
        ax2.set_xlabel("q")
        ax2.set_ylabel("qdot")
        ax2.set_title("Policy")
        ax1.imshow(J,
                   cmap=cm.jet,
                   extent=(qbins[0], qbins[-1], qdotbins[-1], qdotbins[0]))
        ax1.invert_yaxis()
        Pi = np.reshape(policy.get_output_values(), Q.shape)
        ax2.imshow(Pi,
                   cmap=cm.jet,
                   extent=(qbins[0], qbins[-1], qdotbins[-1], qdotbins[0]))
        ax2.invert_yaxis()
        display(plt.show())


In [None]:
def min_time_cost(context):
    x = context.get_continuous_state_vector().CopyToVector()
    if x.dot(x) < .05:
        return 0.
    return 1.


double_integrator_example(cost_function=min_time_cost,
                          convergence_tol=0.001,
                          animate=True);


In [None]:
def quadratic_regulator_cost(context):
    x = context.get_continuous_state_vector().CopyToVector()
    u = plant.EvalVectorInput(context, 0).CopyToVector()
    return x.dot(x) + u.dot(u)

double_integrator_example(cost_function=quadratic_regulator_cost, convergence_tol=0.1, animate=True)

# Value Iteration for the Simple Pendulum

In [None]:
def pendulum_swingup_example(min_time=True, animate=True):
    plant = PendulumPlant()
    simulator = Simulator(plant)
    options = DynamicProgrammingOptions()

    qbins = np.linspace(0., 2. * np.pi, 51)
    qdotbins = np.linspace(-10., 10., 51)
    state_grid = [set(qbins), set(qdotbins)]
    options.periodic_boundary_conditions = [
        PeriodicBoundaryCondition(0, 0., 2. * np.pi),
    ]
    input_limit = 3.
    input_grid = [set(np.linspace(-input_limit, input_limit, 9))]
    timestep = 0.01

    [Q, Qdot] = np.meshgrid(qbins, qdotbins)

    meshcat.Delete()
    meshcat.SetProperty("/Background", "visible", False)

    def draw(iteration, mesh, cost_to_go, policy):
        # Don't draw every frame.
        if iteration % 20 != 0:
            return

        plot_surface(meshcat,
                     'Cost-to-go',
                     Q,
                     Qdot,
                     np.reshape(cost_to_go, Q.shape),
                     wireframe=True)
        plot_surface(meshcat,
                     'Policy',
                     Q,
                     Qdot,
                     np.reshape(policy, Q.shape),
                     rgba=Rgba(.3, .3, .5))

        # Slow down the algorithm so we can visualize the convergence.
        sleep(0.1)

    def simulate(policy):
        # Animate the resulting policy.
        builder = DiagramBuilder()
        pendulum = builder.AddSystem(PendulumPlant())

        wrap = builder.AddSystem(WrapToSystem(2))
        wrap.set_interval(0, 0, 2*np.pi)
        builder.Connect(pendulum.get_output_port(0), wrap.get_input_port(0))
        vi_policy = builder.AddSystem(policy)
        builder.Connect(wrap.get_output_port(0), vi_policy.get_input_port(0))
        builder.Connect(vi_policy.get_output_port(0),
                        pendulum.get_input_port(0))

        visualizer = builder.AddSystem(
            PendulumVisualizer(show=False))
        builder.Connect(pendulum.get_output_port(0),
                        visualizer.get_input_port(0))

        diagram = builder.Build()
        simulator = Simulator(diagram)
        simulator.get_mutable_context().SetContinuousState([0.1, 0.0])

        AdvanceToAndVisualize(simulator, visualizer, 8.)

    if running_as_notebook:
        options.visualization_callback = draw

    def min_time_cost(context):
        x = context.get_continuous_state_vector().CopyToVector()
        x[0] = x[0] - np.pi
        if x.dot(x) < .05:
            return 0.
        return 1.

    def quadratic_regulator_cost(context):
        x = context.get_continuous_state_vector().CopyToVector()
        x[0] = x[0] - np.pi
        u = plant.EvalVectorInput(context, 0).CopyToVector()
        return 2 * x.dot(x) + u.dot(u)

    if min_time:
        cost_function = min_time_cost
        options.convergence_tol = 0.001
    else:
        cost_function = quadratic_regulator_cost
        options.convergence_tol = 0.1

    policy, cost_to_go = FittedValueIteration(simulator, cost_function,
                                              state_grid, input_grid, timestep,
                                              options)

    J = np.reshape(cost_to_go, Q.shape)

    plot_surface(meshcat, 'Cost-to-go', Q, Qdot, J, wireframe=True)

    if animate:
        print('Simulating...')
        simulate(policy)

    fig = plt.figure(figsize=(9, 4))
    ax1, ax2 = fig.subplots(1, 2)
    ax1.set_xlabel("q")
    ax1.set_ylabel("qdot")
    ax1.set_title("Cost-to-Go")
    ax2.set_xlabel("q")
    ax2.set_ylabel("qdot")
    ax2.set_title("Policy")
    ax1.imshow(J,
               cmap=cm.jet, aspect='auto',
               extent=(qbins[0], qbins[-1], qdotbins[-1], qdotbins[0]))
    ax1.invert_yaxis()
    Pi = np.reshape(policy.get_output_values(), Q.shape)
    ax2.imshow(Pi,
               cmap=cm.jet, aspect='auto',
               extent=(qbins[0], qbins[-1], qdotbins[-1], qdotbins[0]))
    ax2.invert_yaxis()
    display(plt.show())


pendulum_swingup_example(min_time=True, animate=True)

# Neural Fitted Value Iteration

In [None]:

# Define the double integrator
A = np.array([[0., 1.], [0., 0.]])
B = np.array([[0.], [1.]])
Q = 0.1*np.eye(2)
R = np.eye(1)

# vectorized
def min_time_cost(x, u):
    return 1.0 - np.isclose(x, np.zeros((2,1))).all(axis=0)

def quadratic_regulator_cost(x, u):
    return (x * (Q @ x)).sum(axis=0) + (u * (R @ u)).sum(axis=0)

def min_time_solution(x):
    # Caveat: this does not take the time discretization (zero-order hold on u) into account.
    q = x[0,:]
    qdot = x[1,:]
    # mask indicates that we are in the regime where u = +1.
    mask = ((qdot < 0) & (2 * q <=
                          (qdot**2))) | ((qdot >= 0) & (2 * q < -(qdot**2)))
    T = np.empty(q.size)
    T[mask] = 2*np.sqrt(.5*qdot[mask]**2 - q[mask]) - qdot[mask]
    T[~mask] = qdot[~mask] + 2*np.sqrt(.5*qdot[~mask]**2 + q[~mask])
    return T

def quadratic_regulator_solution(x, timestep, gamma=1):
    S = DiscreteAlgebraicRiccatiEquation(A=np.sqrt(gamma) *
                                         (np.eye(2) + timestep * A),
                                         B=timestep * B,
                                         Q=timestep * Q,
                                         R=timestep * R / gamma)
    return (x * (S @ x)).sum(axis=0)

def plot_and_compare(mlp, context, running_cost, timestep, gamma=1.0):
    x1s = np.linspace(-5,5,31)
    x2s = np.linspace(-4,4,51)
    X1s, X2s = np.meshgrid(x1s, x2s)
    N = X1s.size
    X = np.vstack((X1s.flatten(), X2s.flatten()))
    J = np.zeros((1,N))

    mlp.BatchOutput(context, X, J)

    plot_surface(meshcat,
                 "Jhat",
                 X1s,
                 X2s,
                 J.reshape(X1s.shape),
                rgba=Rgba(0,0,1),
                 wireframe=True)

    if running_cost == min_time_cost:
        Jd = min_time_solution(X)
    elif running_cost == quadratic_regulator_cost:
        Jd = quadratic_regulator_solution(X, timestep, gamma)

    plot_surface(meshcat,
                 "J_desired",
                 X1s,
                 X2s,
                 Jd.reshape(X1s.shape),
                 rgba=Rgba(1, 0, 0),
                 wireframe=True)


First, let's simply evaluate how well the network can fit the known cost-to-go functions (using supervised learning)

In [None]:
def SupervisedDemo(running_cost, timestep):
    x1s = np.linspace(-5,5,51)
    x2s = np.linspace(-4,4,51)
    X1s, X2s = np.meshgrid(x1s, x2s)
    N = X1s.size
    X = np.vstack((X1s.flatten(), X2s.flatten()))

    if running_cost == min_time_cost:
        Jd = min_time_solution(X)
    elif running_cost == quadratic_regulator_cost:
        Jd = quadratic_regulator_solution(X, timestep)

    Jd = Jd.reshape((1,N))

    mlp = MultilayerPerceptron(
        [2,100,100,1],
        [PerceptronActivationType.kReLU, 
         PerceptronActivationType.kReLU,
         PerceptronActivationType.kIdentity])
    context = mlp.CreateDefaultContext()
    generator = RandomGenerator(152)
    mlp.SetRandomContext(context, generator)

    optimizer = Adam(mlp.GetMutableParameters(context))

    dloss_dparams = np.zeros(mlp.num_parameters())
    last_loss = np.inf
    for epoch in range(1000 if running_as_notebook else 2):
        loss = mlp.BackpropagationMeanSquaredError(context, X, Jd,
                                                    dloss_dparams)
        clear_output(wait=True)
        print(f"loss = {loss}")
        if np.linalg.norm(last_loss - loss) < 0.0001:
            break
        last_loss = loss
        optimizer.step(loss, dloss_dparams)

    plot_and_compare(mlp, context, running_cost, timestep)

meshcat.Delete()
SupervisedDemo(min_time_cost, 0.1)
#SupervisedDemo(quadratic_regulator_cost, 0.1)

## Discrete time, continuous state, discrete action

This is the standard "fitted value iteration" algorithm with a multilayer perceptron (MLP) as the function approximator, and a single step of gradient descent performed on each iteration.

In [None]:
def FittedValueIteration(running_cost, timestep):
    x1s = np.linspace(-5,5,31)
    x2s = np.linspace(-4,4,31)
    us = np.linspace(-1,1,9)
    Us, X1s, X2s = np.meshgrid(us, x1s, x2s, indexing='ij')
    XwithU = np.vstack((X1s.flatten(), X2s.flatten()))
    UwithX = Us.flatten().reshape(1,-1)
    Nx = x1s.size * x2s.size
    X = XwithU[:,:Nx]
    N = X1s.size

    Xnext = XwithU + timestep * (A @ XwithU + B @ UwithX)
    G = timestep*running_cost(XwithU, UwithX)
    Jnext = np.zeros((1,N))
    Jd = np.zeros((1,Nx))

    mlp = MultilayerPerceptron(
        [2,100,100,1],
        [PerceptronActivationType.kReLU,
         PerceptronActivationType.kReLU,
         PerceptronActivationType.kIdentity])
    context = mlp.CreateDefaultContext()
    generator = RandomGenerator(123)
    mlp.SetRandomContext(context, generator)

    optimizer = Adam(mlp.GetMutableParameters(context))

    gamma = 0.9
    plot_and_compare(mlp, context, running_cost, timestep, gamma)
    dloss_dparams = np.zeros(mlp.num_parameters())
    last_loss = np.inf
    for epoch in range(500 if running_as_notebook else 2):
        mlp.BatchOutput(context, Xnext, Jnext)
        Jd[:] = np.min((G + gamma*Jnext).reshape(us.size, Nx), axis=0)
        for i in range(100 if running_as_notebook else 2):
            loss = mlp.BackpropagationMeanSquaredError(
                context, X, Jd, dloss_dparams)
            optimizer.step(loss, dloss_dparams)
        if np.linalg.norm(last_loss - loss) < 1e-8:
            break
        last_loss = loss
        clear_output(wait=True)
        print(f"epoch {epoch}: loss = {loss}")
        if epoch%10 == 0:
            plot_and_compare(mlp, context, running_cost, timestep, gamma)

    plot_and_compare(mlp, context, running_cost, timestep, gamma)

FittedValueIteration(min_time_cost, 0.1)
#FittedValueIteration(quadratic_regulator_cost, 0.1)

Even simpler, let's do linear function approximation with the quadratic form for the LQR problem...