# Unicycle Go To Goal

In [None]:
import copy
import time

from cycler import cycler
import cvxpy as cp
from cvxpylayers.torch import CvxpyLayer
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import torch

from dimp.robots import (
    RobotMPCData, UniInput, UniRobot, UniState
)

from dimp.utils import init_matplotlib, get_colors


init_matplotlib()

### Create The Data for the MPC

In [None]:
ns = 3      # Number of states (x, y, theta)
ni = 2      # Number of inputs (v, omega)

nc = 50     # Number of control intervals

s0 = cp.Parameter(ns, name="s0")

mpc_data = RobotMPCData(
    nc=nc,
    states_list=[UniState(s0)] + [UniState(cp.Variable(ns, name=f"s{k+1}")) for k in range(nc)],
    statesbar_list=[UniState(s0)] + [UniState(cp.Parameter(ns)) for _ in range(nc)],
    inputs_list=[UniInput(cp.Variable(ni, name=f"i{k}")) for k in range(nc)],
    inputsbar_list=[UniInput(cp.Parameter(ni)) for _ in range(nc)],
)

dt = 0.1
robot = UniRobot(dt=dt, mpc_data=mpc_data)

# Parameters
p_goal = np.array([10.0, 0.0])
v_max = 10.0
omega_max = 1.0

### Create The MPC Problem

$$
\begin{aligned}
&\min_{\substack{s_k, u_k \\ \quad k=1, \, \dots, \, n_c}} \quad && \frac{1}{2} \sum_{k=1}^{n_c} (p_k - p_\text{goal})^2 + w_1 u_k^2 + w_2 \left( (s_k - \bar{s}_k)^2 + (u_k - \bar{u}_k)^2 \right) \\
&\text{s.t.} && x_{k+1} = A_k x_k + B_k u_k, \\
& && \left| u_k \right| \leq u_\text{max}.
\end{aligned}
$$

In [None]:
w1 = cp.Parameter(1, name="w1", nonneg=True)
w2 = cp.Parameter(1, name="w2", nonneg=True)

def create_qp(nc: int):
    objective = cp.Minimize(
          0.5 * cp.sum_squares(cp.hstack([mpc_data.statei[i+1][0:2] - p_goal for i in range(nc)]))
        + 0.5 * w1 * cp.sum_squares(cp.hstack([mpc_data.inputi[i] for i in range(nc)]))
        + 0.5 * w2 * cp.sum_squares(cp.hstack([mpc_data.statei[i+1] - mpc_data.statebari[i+1].value for i in range(nc)]))
        + 0.5 * w2 * cp.sum_squares(cp.hstack([mpc_data.inputi[i] - mpc_data.inputbari[i].value for i in range(nc)]))
    )

    dynamics_constraints = robot.dt_dynamics_constraint()

    input_constraints = [cp.abs(mpc_data.inputi[i][0]) <= v_max for i in range(nc)] + \
        [cp.abs(mpc_data.inputi[i][1]) <= omega_max for i in range(nc)]

    constraints = dynamics_constraints + input_constraints

    problem = cp.Problem(objective, constraints)

    return problem

qp_problem = create_qp(nc=nc)
assert qp_problem.is_dpp()

In [None]:
def simulate_to(
    create_problem: callable,
    mpc_data: RobotMPCData,
    n_iters: int=1,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Simulate the robot's motion over the OCP horizon (trajectory optimization).
    The OCP is solved once and the entire state-input trajectory is obtained in one go.

    Args:
        problem (cp.Problem): The optimization problem to solve.
        mpc_data (RobotMPCData): The MPC data containing state and input variables.

    Returns:
        tuple[np.ndarray, np.ndarray]: Arrays of states and inputs over the simulation steps of size [steps, ns] and [steps, ni], respectively.
    """

    states = np.zeros((mpc_data.nc+1, ns))
    inputs = np.zeros((mpc_data.nc, ni))
    
    mpc_data.statei[0].value = np.array([0.0, 0.0, 1.0])
    for i in range(mpc_data.nc):
        mpc_data.statei[i+1].value = np.array([0.0, 0.0, 1.0])
        mpc_data.inputi[i].value = np.array([0.0, 0.0])
    mpc_data.update_bar()

    for i in range(n_iters):
        problem = create_problem(nc=nc)
        problem.solve()
        mpc_data.update_bar()
    
    states[0, :] = mpc_data.statei[0].value
    for i in range(mpc_data.nc):
        states[i+1, :] = mpc_data.statei[i+1].value
        inputs[i, :] = mpc_data.inputi[i].value

    return states, inputs

In [None]:
def simulate_mpc(
    problem: cp.Problem,
    mpc_data: RobotMPCData,
    n_steps: int = 200,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Simulate the robot's motion over a fixed number of steps (MPC).
    The OCP is solved multiple times and only the first control input is applied at each step.

    Args:
        problem (cp.Problem): The optimization problem to solve.
        mpc_data (RobotMPCData): The MPC data containing state and input variables.
        n_steps (int): The number of steps to simulate.

    Returns:
        tuple[np.ndarray, np.ndarray]: Arrays of states and inputs over the simulation steps of size [steps, ns] and [steps, ni], respectively.
    """

    states = np.zeros((n_steps, ns))
    inputs = np.zeros((n_steps, ni))

    mpc_data.statei[0].value = np.array([0.0, 0.0, 1.0])

    for i in range(n_steps):

        problem.solve()

        mpc_data.statei[0].value = mpc_data.statei[1].value

        mpc_data.update_bar()

        states[i, :] = mpc_data.statei[1].value
        inputs[i, :] = mpc_data.inputi[1].value

    return states, inputs

In [None]:
def _triangle_xy(x, y, theta, L=0.6, W=0.35):
    pts = np.array([
        [ +L/2.0,  0.0      ],  # tip
        [ -L/2.0, +W/2.0    ],  # rear-left
        [ -L/2.0, -W/2.0    ],  # rear-right
        [ +L/2.0,  0.0      ],  # close
    ])
    c, s = np.cos(theta), np.sin(theta)
    R = np.array([[c, -s],[s, c]])
    rot = pts @ R.T
    rot[:, 0] += x
    rot[:, 1] += y
    return rot[:, 0], rot[:, 1]

def plot_trajectory(states: np.ndarray, body_len=0.6, body_w=0.35) -> None:
    # x, y, heading(rad) assumed at [:,0], [:,1], [:,3]
    steps = states.shape[0]
    thetas = np.unwrap(states[:, 2].astype(float))  # keep radians; unwrap for smooth rotation

    xm, xM = states[:, 0].min() - 1, states[:, 0].max() + 1
    ym, yM = states[:, 1].min() - 1, states[:, 1].max() + 1

    # colors
    line_blue = "rgba(0, 0, 255, 0.85)"
    fill_blue = "rgba(0, 0, 255, 0.35)"

    # initial triangle
    tri_x0, tri_y0 = _triangle_xy(states[0, 0], states[0, 1], thetas[0], body_len, body_w)

    fig = go.Figure(
        data=[
            # 0) trajectory (blue)
            go.Scatter(
                x=states[:, 0], y=states[:, 1],
                mode="lines", name="Trajectory",
                line=dict(width=2, color="rgba(0, 0, 255, 0.5)", dash="dot")
            ),
            # 1) vehicle triangle (animated trace) — blue
            go.Scatter(
                x=tri_x0, y=tri_y0,
                mode="lines", fill="toself",
                line=dict(width=1, color=line_blue),
                fillcolor=fill_blue,
                opacity=1.0,
                hoverinfo="skip",
                showlegend=False,            # hide this from legend…
                name="Vehicle"
            ),
            # 2) legend-only proxy with a triangle icon
            go.Scatter(
                x=[states[0, 0]], y=[states[0, 1]],
                mode="markers",
                marker=dict(symbol="triangle-right", size=12, color=line_blue),
                name="Vehicle",
                visible="legendonly",        # …and show this only in the legend
                hoverinfo="skip"
            ),
        ]
    )

    # layout + slow animation
    fig.update_layout(
        width=600, height=450,
        xaxis=dict(range=[xm, xM], autorange=False, zeroline=False, scaleanchor="y", title="x [m]"),
        yaxis=dict(range=[ym, yM], autorange=False, zeroline=False, title="y [m]"),
        title_text="Trajectory", title_x=0.5,
        updatemenus=[dict(
            type="buttons",
            buttons=[dict(
                args=[None, {
                    "frame": {"duration": 100, "redraw": False},  # slower
                    "fromcurrent": True,
                    "transition": {"duration": 0},
                    "mode": "immediate"
                }],
                label="Play",
                method="animate",
            )]
        )]
    )

    # frames: update only the triangle trace (index 1)
    fig.update(frames=[
        go.Frame(
            data=[go.Scatter(*(), x=_triangle_xy(states[k, 0], states[k, 1], thetas[k], body_len, body_w)[0],
                                  y=_triangle_xy(states[k, 0], states[k, 1], thetas[k], body_len, body_w)[1])],
            traces=[1]
        ) for k in range(steps)
    ])

    fig.show()
    fig.write_html("omni_robot_mpc.html", include_plotlyjs="cdn", full_html=False)

In [None]:
def simulate_and_plot_qp():
    w1.value = np.array([1.0])
    w2.value = np.array([0.1])
    
    for n_iters in range(5):
        states, inputs = simulate_to(create_qp, mpc_data, n_iters=n_iters+1)
        
        plot_trajectory(states)

simulate_and_plot_qp()

### Create the `CvxpyLayer`

In [None]:
def task_loss(states, inputs):
    p_goal_th = torch.tensor(p_goal, requires_grad=True)
    
    st_cost = torch.stack([(s[0:2] - p_goal_th).pow(2).sum() for s in states]).sum()
    in_cost = torch.stack([u.pow(2).sum()               for u in inputs]).sum()
    return 0.5 * st_cost + 0.5 * 0.1 * in_cost

In [None]:
def plot_training_res(history):
    colors = get_colors()
    
    fig, ax = plt.subplots(2, 1)
    ax[0].plot([h['loss'] for h in history])
    ax[0].set_xlabel("Epoch")
    ax[0].set_ylabel("Loss")
    ax[0].set_title("Loss Evolution")

    ax11 = ax[1].twinx()
    ax[1].plot([h['w'][0] for h in history],)
    ax[1].set_title("Weights Evolution")
    ax[1].set_xlabel("Epoch")
    ax[1].set_ylabel(r"$w_0$", color=colors[0])
    ax[1].tick_params(axis='y', labelcolor=colors[0])

    ax11.plot([h['w'] for h in history], color=colors[1], linestyle='--')
    ax11.set_ylabel(r"$w_1$", color=colors[1])
    ax11.tick_params(axis='y', labelcolor=colors[1])

    fig.set_constrained_layout(True)
    plt.show()

In [None]:
cvxpylayer = CvxpyLayer(
    qp_problem,
    parameters=[w1, w2, s0],
    variables=[mpc_data.statei[i] for i in range(1, nc + 1)] + [mpc_data.inputi[i] for i in range(nc)],
)

w1_th = torch.nn.Parameter(torch.tensor([1.0]))
w2_th = torch.tensor([1.0])
s0_th = torch.tensor([0.0, 0.0, 0.1])

optim = torch.optim.Adam([w1_th], lr=5e-3)

solution = cvxpylayer(w1_th, w2_th, s0_th)

solution[0].sum().backward()

print(f"w1.grad: {w1_th.grad}")

In [None]:
n_epochs = 500
history  = []

with torch.no_grad():
    w1_th.copy_(torch.tensor([1.0]))
    w2_th.copy_(torch.tensor([1.0]))

for epoch in range(n_epochs):
    optim.zero_grad()
    
    s0_th = torch.zeros_like(s0_th)
    
    qp_problem = create_qp(nc=nc)
    cvxpylayer = CvxpyLayer(
        qp_problem,
        parameters=[w1, w2, s0],
        variables=[mpc_data.statei[i] for i in range(1, nc + 1)] + [mpc_data.inputi[i] for i in range(nc)],
    )

    sol = cvxpylayer(w1_th, w2_th, s0_th)
    states = [sol[i] for i in range(nc)]
    inputs = [sol[nc + i] for i in range(nc)]
    
    mpc_data.statebari[0].value = s0_th.detach().cpu().numpy()
    for i in range(nc):
        mpc_data.statebari[i+1].value = states[i].detach().cpu().numpy()
        mpc_data.inputbari[i].value = inputs[i].detach().cpu().numpy()

    loss = task_loss(states, inputs)
    loss.backward()

    optim.step()
    with torch.no_grad():
        w1_th.clamp_(min=1e-6)

    history.append({
        'loss': loss.item(),
        'w': copy.deepcopy(w1_th.detach().cpu().numpy()),
        'dw': copy.deepcopy(w1_th.grad.detach().cpu().numpy()),
    })
    print(f"Epoch {epoch:2d} | loss = {loss.item():.4f} | w = {w1_th.detach().cpu().numpy()}")
    
plot_training_res(history)