<a href="https://colab.research.google.com/github/eisbetterthanpi/ilqr_pytorch/blob/main/iLQR_np.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

####ilqr

In [None]:
# @title controller
import numpy as np

class iLQR:
    def __init__(self, dynamics, cost):
        self.cost = cost
        self.dynamics = dynamics

    def fit(self, x0, us_init, maxiters = 50, early_stop = True):
        return run_ilqr(self.dynamics.f, self.dynamics.f_prime, self.cost.L, self.cost.Lf, 
            self.cost.L_prime, self.cost.Lf_prime, x0, us_init, maxiters, early_stop)

    def rollout(self, x0, us):
        return rollout(self.dynamics.f, self.cost.L, self.cost.Lf, x0, us)

class MPC:
    def __init__(self, controller, control_horizon = 1):
        super(MPC, self).__init__()
        self.ch = control_horizon
        self.controller = controller
        self.us_init = None

    def set_initial(self, us_init):
        '''Set initial guess of actions'''
        if us_init.shape[0] <= self.ch:
            raise Exception('prediction horizon must be greater than control horizon')
        self.us_init = us_init

    def control(self, x0, maxiters = 50, early_stop = True):
        '''Returns optimal actions. Supposed to be called Sequentially with observed state'''
        if self.us_init is None:
            raise Exception('initial guess has not been set')
        xs, us, cost_trace = self.controller.fit(x0, self.us_init, maxiters, early_stop)
        # self.us_init[:-self.ch] = self.us_init[self.ch:]
        # return us[:self.ch]
        return xs, us, cost_trace

def run_ilqr(f, f_prime, L, Lf, L_prime, Lf_prime, x0, u_init, max_iters, early_stop,
             alphas=0, regu_init = 20, max_regu = 10000, min_regu = 0.001):
    alphas = 0.5**np.arange(8)
    us = u_init
    regu = regu_init
    # First forward rollout
    xs, J_old = rollout(f, L, Lf, x0, us)
    # cost trace
    cost_trace = [J_old]
    # Run main loop
    for it in range(max_iters):
        ks, Ks, exp_cost_redu = backward_pass(f_prime, L_prime, Lf_prime, xs, us, regu)
        # Early termination if improvement is small
        if it > 3 and early_stop and np.abs(exp_cost_redu) < 1e-5: break
        # Backtracking line search
        for alpha in alphas:
            xs_new, us_new, J_new = forward_pass(f, L, Lf, xs, us, ks, Ks, alpha)
            if J_old - J_new > 0:
                # Accept new trajectories and lower regularization
                J_old = J_new
                xs = xs_new
                us = us_new
                regu *= 0.7
                break
        else:
            # Reject new trajectories and increase regularization
            regu *= 2.0
        cost_trace.append(J_old)
        regu = min(max(regu, min_regu), max_regu)
    return xs, us, cost_trace

def rollout(f, L, Lf, x0, us):
    xs = np.empty((us.shape[0] + 1, x0.shape[0]))
    xs[0] = x0
    cost = 0
    for n in range(us.shape[0]):
        xs[n+1] = f(xs[n], us[n])
        cost += L(xs[n], us[n])
    cost += Lf(xs[-1])
    return xs, cost

def forward_pass(f, L, Lf, xs, us, ks, Ks, alpha):
    xs_new = np.empty(xs.shape)
    cost_new = 0.0
    xs_new[0] = xs[0]
    us_new = us + alpha*ks
    for n in range(us.shape[0]):
        # us_new[n] += Ks[n].dot(xs_new[n] - xs[n])
        us_new[n] += Ks[n]@(xs_new[n] - xs[n])
        xs_new[n + 1] = f(xs_new[n], us_new[n])
        cost_new += L(xs_new[n], us_new[n])
    cost_new += Lf(xs_new[-1])
    return xs_new, us_new, cost_new

def backward_pass(f_prime, L_prime, Lf_prime, xs, us, regu):
    ks = np.empty(us.shape)
    Ks = np.empty((us.shape[0], us.shape[1], xs.shape[1]))
    delta_V = 0
    V_x, V_xx = Lf_prime(xs[-1])
    regu_I = regu*np.eye(V_xx.shape[0])
    for n in range(us.shape[0] - 1, -1, -1):
        f_x, f_u = f_prime(xs[n], us[n])
        l_x, l_u, l_xx, l_ux, l_uu  = L_prime(xs[n], us[n])
        # Q_terms
        Q_x  = l_x  + f_x.T@V_x
        Q_u  = l_u  + f_u.T@V_x
        Q_xx = l_xx + f_x.T@V_xx@f_x
        Q_ux = l_ux + f_u.T@V_xx@f_x
        Q_uu = l_uu + f_u.T@V_xx@f_u
        # gains
        f_u_dot_regu = f_u.T@regu_I
        Q_ux_regu = Q_ux + f_u_dot_regu@f_x
        Q_uu_regu = Q_uu + f_u_dot_regu@f_u
        Q_uu_inv = np.linalg.inv(Q_uu_regu)

        k = -Q_uu_inv@Q_u
        K = -Q_uu_inv@Q_ux_regu
        ks[n], Ks[n] = k, K
        # V_terms
        V_x  = Q_x + K.T@Q_u + Q_ux.T@k + K.T@Q_uu@k
        V_xx = Q_xx + 2*K.T@Q_ux + K.T@Q_uu@K
        #expected cost reduction
        delta_V += Q_u.T@k + 0.5*k.T@Q_uu@k
    return ks, Ks, delta_V


In [None]:
# @title utils
import numpy as np

def Bounded(high, low, eps = 1e-4):
    def cons(vars):
        diff = (high - low)/2
        cs=np.concatenate((high - vars,vars - low))/diff
        cost = -np.log(cs + eps).sum()
        return 0.1*cost
    return cons

# def SoftConstrain(cs, alpha = 0.01, beta = 10):
#     cost = (alpha*np.exp(-beta*cs)).sum()
#     return cost
# def Smooth_abs(x, alpha = 0.25):
#     return np.sqrt(x**2 + alpha**2) - alpha


In [None]:
# @title containers

def FiniteJac(fun, args, i, eps=1e-5): # eps=1e-8
    '''Finite difference approximation'''
    fun0 = fun(*args)
    n = args[i].size
    jac = np.empty((fun0.size,n))
    for k in range(n):
        args[i][k] += eps
        jac[:,k] = (fun(*args) - fun0)/eps
        args[i][k] -= eps
    return jac

class Dynamics:
    def __init__(self, f):
        self.f = f
        self.f_prime = lambda x, u: (FiniteJac(f, (x, u), 0), FiniteJac(f, (x, u), 1))

class Cost:
    def __init__(self, Q, R, QT, x_goal, add_on = 0):
        '''Container for Cost.
              L:  Running cost
              Lf: Terminal cost'''
        #Running cost and it's partial derivatives
        # '''Construct Quadratic cost'''

        def L(x, u): # Quadratic cost
            er = x - x_goal
            return er.T@Q@er + u.T@R@u + add_on(u)
        self.L = L

        def L_prime(x,u):
            l_x, l_u = FiniteJac(L, (x, u), 0).squeeze(0), FiniteJac(L, (x, u), 1).squeeze(0)
            l_xx = FiniteJac(lambda *args : FiniteJac(L, args, 0).squeeze(), (x, u), 0)
            l_ux = FiniteJac(lambda *args : FiniteJac(L, args, 1).squeeze(), (x, u), 0)
            l_uu = FiniteJac(lambda *args : FiniteJac(L, args, 1).squeeze(), (x, u), 1)
            return l_x, l_u, l_xx, l_ux, l_uu
        self.L_prime = L_prime

        #Terminal cost and it's partial derivatives
        def Lf(x): # Quadratic cost
            er = x - x_goal
            return er.T@QT@er
        self.Lf = Lf
        
        #Partial derivatives of terminal cost
        self.Lf_prime = lambda x: (FiniteJac(Lf,(x,),0).squeeze(), FiniteJac(lambda *args : FiniteJac(Lf, args, 0).squeeze(), (x,), 0))


In [None]:
# @title pendulum
import torch
import numpy as np

#state and action dimensions
n_x = 3
n_u = 1
#time step
dt = 0.025
m, g, l = 1, 10, 1

def f(x, u):
    if type(x)==np.ndarray: x, u = torch.tensor(x, dtype=float), torch.tensor(u, dtype=float)
    sin, cos, omega = x
    theta = torch.arctan2(sin, cos)
    alpha = (u[0] - m*g*l*torch.sin(theta + torch.pi))/(m*l**2)
    theta_n = theta + omega*dt
    # return torch.cat((torch.sin(theta_n).unsqueeze(0), torch.cos(theta_n).unsqueeze(0), omega.unsqueeze(0) + alpha*dt),0)
    return torch.cat((torch.sin(theta_n).unsqueeze(0), torch.cos(theta_n).unsqueeze(0), omega.unsqueeze(0) + alpha*dt),0).numpy()
# def f(x, u):
#     sin, cos, omega = x
#     theta = np.arctan2(sin, cos)
#     alpha = (u[0] - m*g*l*np.sin(theta + np.pi))/(m*l**2)
#     theta_n = theta + omega*dt
#     return np.array([np.sin(theta_n), np.cos(theta_n), omega + alpha*dt])
Pendulum = Dynamics(f)


#Construct cost to swing up Pendulum
#theta = 0 --> sin(theta) = 0, cos(theta) = 1
x_goal = np.array([0, 1, 0])
Q  = np.diag([0, 1, 0.1])
R  = np.diag([0.1])
QT = np.diag([0, 100, 100])
#Add constraints on torque input (2Nm to -2Nm)
cons = Bounded(high = 2, low = -2)
# def cons(u): return Bounded(u, high = np.array(2.), low = np.array(-2.))
SwingUpCost = Cost(Q, R, QT, x_goal, cons)

#initialise the controller
controller = iLQR(Pendulum, SwingUpCost)

#initial state
#theta = pi --> sin(theta) = 0, cos(theta) = -1
x0 = np.array([0., -1., 0.])
#initial guess
us_init = np.random.randn(200, n_u)*0.01
#get optimal states and actions

# xs, us, cost_trace = controller.fit(x0, us_init)


mpc = MPC(controller)
mpc.set_initial(us_init)
xs, us, cost_trace = mpc.control(x0)


#Plot theta and action trajectory
import matplotlib.pyplot as plt
theta = np.arctan2(xs[:, 0], xs[:, 1])
theta = np.where(theta < 0, 2*np.pi+theta, theta)
plt.plot(theta)
plt.plot(us)
plt.show()

# 45sec down
# ? simplify 37sec 40sec
# .compile 40Sec
# pure np 17sec
# np, pytorch f 21-22sec
# jit in mpc control 21 sec
# jit f 23

# run rollout fwd bwd ,cons jac f


In [None]:

%%timeit
# f:
# 174 µs ± 32.1 µs 7 runs
# 13.4 µs ± 2.94 µs jit empty
# 12.5 µs ± 414 ns njit empty 13.1 µs ± 1.72 µs
# 13.7 µs ± 1.75 µs njit zeros

# FiniteJac(f, (x, u), 0)
# f:
# 15 µs ± 1.93 µs np finite
# 127 µs ± 11.8 µs

# fn:
# 59.5 µs ± 1.42 µs np
# 234 µs ± 7.2 µs jit
# jacobian(fn,(x,u))


controller.fit(x0, us_init)
# 20.6 s ± 288 ms pure np
# 9.33 s ± 220 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) base
# 9.42 s ± 392 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) bwd
# 9.61 s ± 368 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) bwd fwd




  cost = -np.log(cs + eps).sum()


20.6 s ± 288 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
