In [2]:
import numpy as np
from scipy.linalg import *
from drl.env import TwoLinkArm

In [3]:
epsilon = 1e-5
Ts = 10
env = TwoLinkArm(g=0.)
N = int(Ts/env.dt)

# Cost function scaling factors
wp = 1000. # Position
wv = 1000. # Velocity
wu = 1. # input

lamb = 1.0
lamb_factor = 10.
lamb_max = 1000.
eps_converged = 0.001
max_iter = 100

## Define cost functions

In [4]:
def calc_cost(u):
    l = wu * np.sum(u**2)
    lx = np.zeros(env.state_dim)
    lu = wu * 2 * u
    lxx = np.zeros((env.state_dim, env.state_dim))
    luu = wu * 2 * np.eye(env.action_dim)
    lux = np.zeros((env.action_dim, env.state_dim))
    
    return l, lx, lu, lxx, luu, lux

In [5]:
def calc_final_cost(x, goal):
    x1, y1, x2, y2 = env.to_cartesian(x)
    x_cart = np.array([x2, y2])
    
    error = x_cart - goal
    l = wp * np.sum(error**2) + wv * np.sum(x[env.action_dim:]**2)
    
    lx1 = calc_dldx(x, goal)
    lx2 = 2 * wv * x[env.action_dim:]
    lx = np.concatenate((lx1, lx2))
    
    lxx = np.zeros((env.state_dim, env.state_dim))
    for i in range(2):
        x_tmp = np.copy(x)
        x_tmp[i] += epsilon
        lxx_plus = calc_dldx(x_tmp, goal)
        x_tmp = np.copy(x)
        x_tmp[i] -= epsilon
        lxx_min = calc_dldx(x_tmp, goal)
        lxx[i,:2] = (lxx_plus - lxx_min) / (2.*epsilon)
    lxx[2:, 2:] = 2 * wv * np.eye(2)
    
    return l, lx, lxx

In [6]:
def calc_dldx(x, goal):
    m1, m2, l1, l2, g = env.params
    x1, y1, x2, y2 = env.to_cartesian(x)
    x_cart = np.array([x2, y2])
    error = x_cart - goal
    dx_cartdx = np.array([[np.cos(x[0])*l1 + np.cos(x[0]+x[1])*l2, np.cos(x[0]+x[1])*l2], [-np.sin(x[0])*l1 - np.sin(x[0]+x[1])*l2, -np.sin(x[0]+x[1])*l2]])
    lx1 = 2 * wp * np.dot(error.T, dx_cartdx)
    
    return lx1

## Simulate
Performs a roll-out of the system executing policy U starting in x0

In [7]:
def simulate(x0, U):
    N = U.shape[0]
    X = np.zeros((N, env.state_dim))
    X[0,:] = x0
    cost = 0.
    
    for i in range(1, N):        
        X[i,:], _ = env.simulate_system(X[i-1,:], U[i-1,:])
        l, _, _, _, _, _ = calc_cost(U[i-1,:])
        cost += l*env.dt
    
    l, _, _ = calc_final_cost(X[-1,:], env.get_goal(True))
    cost += l*env.dt
    
    return X, cost

##  Finite difference derivatives
Calculates the system dynamics using finite difference

$\dot{x} = f(x,u) = Ax + Bu$

Where we can calculate $A$ and $B$ using centered finite difference formula

$A = \frac{\partial f(x, u)}{\partial x} = \frac{\partial f(x, u)}{\partial x} = \frac{f(x + h, u) - f(x - h, u)}{2h} \\
B = \frac{\partial f(x, u)}{\partial u} \frac{\partial f(x, u)}{\partial u} = \frac{f(x, u + h) - f(x, u - h)}{2h}$

In [8]:
def calc_derivatives(x, u):
    A = np.zeros((env.state_dim, env.state_dim))
    for i in range(env.state_dim):
        x_tmp = x.copy()
        x_tmp[i] += epsilon
        f_1 = env.dynamics_func(x_tmp, u)
        x_tmp = x.copy()
        x_tmp[i] -= epsilon
        f_2 = env.dynamics_func(x_tmp, u)
        fxdx = (f_1 - f_2) / (2*epsilon)
        A[:, i] = fxdx
        
    B = np.zeros((env.state_dim, env.action_dim))
    for i in range(env.action_dim):
        u_tmp = u.copy()
        u_tmp[i] += epsilon
        f_1 = env.simulate_system(x, u_tmp)
        u_tmp = u.copy()
        u_tmp[i] -= epsilon
        f_2 = env.simulate_system(x, u_tmp)
        fxdu = (f_1 - f_2) / (2*epsilon)
        B[:, i] = fxdu
        
    return A, B

## Forward pass

iLQG requires discrete dynamics, convert using:

$x_{k+1} = x_k + \dot{x}_k\Delta t$

$\dot{x}_k = Ax_k + Bx_k$

Results in:

$x_{k+1} = (I + A\Delta t)x_k + (B\Delta t)u_k$

In [9]:
def forward_pass(X, U):
    N = X.shape[0]
    # Calculate costs and derivatives along trajectory
    fx = np.zeros((N, env.state_dim, env.state_dim))
    fu = np.zeros((N, env.state_dim, env.action_dim))
    l = np.zeros(N)
    lx = np.zeros((N, env.state_dim))
    lu = np.zeros((N, env.action_dim))
    lxx = np.zeros((N, env.state_dim, env.state_dim))
    luu = np.zeros((N, env.action_dim, env.action_dim))
    lux = np.zeros((N, env.action_dim, env.state_dim))
    
    for i in range(0, N-1):        
        A, B = calc_derivatives(X[i,:], U[i,:]) 
        
        fx[i,:,:] = np.eye(env.state_dim) + A*env.dt
        fu[i,:,:] = B*env.dt
        
        l[i], lx[i,:], lu[i,:], lxx[i,:,:], luu[i,:,:], lux[i,:,:] = calc_cost(U[i,:])
        l[i] *= env.dt
        lx[i,:] *= env.dt
        lu[i,:] *= env.dt
        lxx[i,:,:] *= env.dt
        luu[i,:,:] *= env.dt
        lux[i,:,:] *= env.dt
        
    l[-1], lx[-1,:], lxx[-1,:,:] = calc_final_cost(X[-1,:], env.get_goal(True))
    l[-1] *= env.dt
    lx[-1,:] *= env.dt
    lxx[-1,:,:] *= env.dt  
        
    return X, U, fx, fu, l, lx, lu, lxx, luu, lux

## Backward pass

In [10]:
def backward_pass(res_forward):
    X, U, fx, fu, l, lx, lu, lxx, luu, lux = res_forward
    N = X.shape[0]
    
    k = np.zeros((N, env.action_dim))
    K = np.zeros((N, env.action_dim, env.state_dim))
    
    V = l[-1].copy()
    Vx = lx[-1,:].copy()
    Vxx = lxx[-1,:,:].copy()
    
    for i in range(N-2, -1, -1):
        Qx = lx[i,:] + np.dot(fx[i,:].T, Vx)
        Qu = lu[i,:] + np.dot(fu[i,:].T, Vx)
        Qxx = lxx[i,:,:] + np.dot(fx[i,:].T, np.dot(Vxx, fx[i,:]))
        Quu = luu[i,:,:] + np.dot(fu[i,:].T, np.dot(Vxx, fu[i,:]))
        Qux = lux[i,:,:] + np.dot(fu[i,:].T, np.dot(Vxx, fx[i,:]))
        
        U, S, V = np.linalg.svd(Quu)
        S[S < 0] = 0.0
        S += lamb
        Quu_inv = np.dot(U, np.dot(np.diag(1.0/S), V.T))
        
        k[i,:] = -np.dot(Quu_inv, Qu)
        K[i,:,:] = -np.dot(Quu_inv, Qux)
        
        Vx = Qx - np.dot(K[i,:,:].T, np.dot(Quu, k[i,:]))
        Vxx = Qxx - np.dot(K[i,:,:].T, np.dot(Quu, K[i,:,:]))
        
    return k, K

## Update control signals and calculate cost

In [11]:
def update_control_signal(X, U, k, K):
    N = X.shape[0]
    X_hat = np.zeros(X.shape)
    X_hat[0,:] = X[0,:]
    U_hat = np.zeros(U.shape)
    
    for i in range(N-1):
        U_hat[i,:] = U[i,:] + k[i,:] + np.dot(K[i,:,:], X_hat[i,:] - X[i,:])
        X_hat[i+1,:], _ = env.simulate_system(X_hat[i,:], U_hat[i,:])
        
    return X_hat, U_hat

## iLQG algorithm

In [12]:
def iLQG(x, U):
    lamb = 1.
    perform_rollout = True
    
    for i in range(max_iter):
        
        # Perform a roll-out
        if perform_rollout:
            X, cost = simulate(x, U)
            perform_rollout = False

        res_forward = forward_pass(X, U)
        k, K = backward_pass(res_forward)
        X_hat, U_hat = update_control_signal(X, U, k, K)

        X_hat, cost_hat = simulate(x, U_hat)

        if cost_hat < cost:
            lamb /= lamb_factor

            X = np.copy(X_hat)
            U = np.copy(U_hat)
            cost_old = np.copy(cost)
            cost = np.copy(cost_hat)
    
            perform_rollout = True
        
            if np.abs((cost_hat - cost)/cost) < eps_converged:
                print("Converged at iteration %d - Cost: %.4f - Lambda: %.1f" % (i, cost, lamb))
                break
        else:
            lamb *= lamb_factor
            
            if lamb > lamb_max:
                print("Lambda > max lambda at iteration %d - Cost %.1f - Lambda: %.1f" % (i, cost, lamb))
                break
            
    return X, U, cost

In [13]:
x = env.reset()
env.render()

# Generate random sequence of actions
U = np.random.randn(N-1, env.action_dim)*env.action_high

for i in range(N):
    X, U, cost = iLQG(x, U)
    
    x, _, _, _ = env.step(U[0,:])
    
    U = U[1:,:]
    
    env.render()

AttributeError: 'TwoLinkArm' object has no attribute 'simulate_system'

In [None]:
env.render(close=True)