In [1]:
import gym
import tensorflow as tf
from tqdm.notebook import tqdm
from autograd import grad, jacobian
import autograd.numpy as np

In [2]:
class refILQR:
    def __init__(self, next_state, running_cost, final_cost,
                 umax, state_dim, pred_time=3):
        self.pred_time = pred_time
        self.umax = umax
        self.v = [0.0 for _ in range(pred_time + 1)]
        self.v_x = [np.zeros(state_dim) for _ in range(pred_time + 1)]
        self.v_xx = [np.zeros((state_dim, state_dim)) for _ in range(pred_time + 1)]
        self.f = next_state
        self.lf = final_cost
        self.lf_x = grad(self.lf)
        self.lf_xx = jacobian(self.lf_x)
        self.l_x = grad(running_cost, 0)
        self.l_u = grad(running_cost, 1)
        self.l_xx = jacobian(self.l_x, 0)
        self.l_uu = jacobian(self.l_u, 1)
        self.l_ux = jacobian(self.l_u, 0)
        self.f_x = jacobian(self.f, 0)
        self.f_u = jacobian(self.f, 1)
        self.f_xx = jacobian(self.f_x, 0)
        self.f_uu = jacobian(self.f_u, 1)
        self.f_ux = jacobian(self.f_u, 0)

    def backward(self, x_seq, u_seq):
        self.v[-1] = self.lf(x_seq[-1])
        self.v_x[-1] = self.lf_x(x_seq[-1])
        self.v_xx[-1] = self.lf_xx(x_seq[-1])
        k_seq = []
        kk_seq = []
        for t in range(self.pred_time - 1, -1, -1):
            f_x_t = self.f_x(x_seq[t], u_seq[t])
            f_u_t = self.f_u(x_seq[t], u_seq[t])
            q_x = self.l_x(x_seq[t], u_seq[t]) + np.matmul(f_x_t.T, self.v_x[t + 1])
            q_u = self.l_u(x_seq[t], u_seq[t]) + np.matmul(f_u_t.T, self.v_x[t + 1])
            q_xx = self.l_xx(x_seq[t], u_seq[t]) + \
              np.matmul(np.matmul(f_x_t.T, self.v_xx[t + 1]), f_x_t) + \
              np.dot(self.v_x[t + 1], np.squeeze(self.f_xx(x_seq[t], u_seq[t])))
            tmp = np.matmul(f_u_t.T, self.v_xx[t + 1])
            q_uu = self.l_uu(x_seq[t], u_seq[t]) + np.matmul(tmp, f_u_t) + \
              np.dot(self.v_x[t + 1], np.squeeze(self.f_uu(x_seq[t], u_seq[t])))
            q_ux = self.l_ux(x_seq[t], u_seq[t]) + np.matmul(tmp, f_x_t) + \
              np.dot(self.v_x[t + 1], np.squeeze(self.f_ux(x_seq[t], u_seq[t])))
            inv_q_uu = np.linalg.inv(q_uu)
            k = -np.matmul(inv_q_uu, q_u)
            kk = -np.matmul(inv_q_uu, q_ux)
            dv = 0.5 * np.matmul(q_u, k)
            self.v[t] += dv
            self.v_x[t] = q_x - np.matmul(np.matmul(q_u, inv_q_uu), q_ux)
            self.v_xx[t] = q_xx + np.matmul(q_ux.T, kk)
            k_seq.append(k)
            kk_seq.append(kk)
        k_seq.reverse()
        kk_seq.reverse()
        return k_seq, kk_seq

    def forward(self, x_seq, u_seq, k_seq, kk_seq):
        x_seq_hat = np.array(x_seq)
        u_seq_hat = np.array(u_seq)
        for t in range(len(u_seq) - 1):
            control = k_seq[t] + np.matmul(kk_seq[t], (x_seq_hat[t] - x_seq[t]))
            u_seq_hat[t] = np.clip(u_seq[t] + control, -self.umax, self.umax)
            x_seq_hat[t + 1] = self.f(x_seq_hat[t], u_seq_hat[t])
        return x_seq_hat, u_seq_hat

# ILQR

In [3]:
class ILQR:
    def __init__(self, final_cost, running_cost, model, u_range, horizon, per_iter):
        '''
            final_cost:     f(x)    ->  cost
            running_cost:   f(x, u) ->  cost
            model:          f(x, u) ->  new state
        '''
        self.model = model
        self.final_cost = final_cost
        self.running_cost = running_cost
        
        self.u_range = u_range
        self.horizon = horizon
        self.per_iter = per_iter
        
    
    def cal_deriV(self, x):
        x = tf.convert_to_tensor(x)
        
        with tf.GradientTape() as t1:
            t1.watch(x)
            with tf.GradientTape() as t2:
                t2.watch(x)
                l = self.final_cost(x)
                
            lf_x = t2.gradient(l, x)
        lf_xx = t1.jacobian(lf_x, x)
        
#         lf_x, lf_xx = (tf.squeeze(t) for t in [lf_x, lf_xx])
        
        return lf_x, lf_xx
    
    
    def cal_deriL(self, x, u):
        x = tf.convert_to_tensor(x)
        u = tf.convert_to_tensor(u)
        
        with tf.GradientTape(persistent=True) as t1:
            t1.watch([x, u])
            with tf.GradientTape(persistent=True) as t2:
                t2.watch([x, u])
                l = self.running_cost(x, u)
                
            l_x = t2.gradient(l, x)
            l_u = t2.gradient(l, u)
        l_xx = t1.jacobian(l_x, x)
        l_uu = t1.jacobian(l_u, u)
        l_xu = t1.jacobian(l_u, x)
        
#         l_x, l_u, l_xx, l_uu, l_xu = (tf.squeeze(t) for t in [l_x, l_u, l_xx, l_uu, l_xu])
        
        return l_x, l_u, l_xx, l_uu, l_xu
    
    
    def cal_deriF(self, x, u):
        x = tf.convert_to_tensor(x)
        u = tf.convert_to_tensor(u)
        
        with tf.GradientTape(persistent=True) as t1:
            t1.watch([x, u])
            with tf.GradientTape(persistent=True) as t2:
                t2.watch([x, u])
                l = self.model(x, u)
                
            l_x = t2.jacobian(l, x)
            l_u = t2.jacobian(l, u)
        l_xx = t1.jacobian(l_x, x)
        l_uu = t1.jacobian(l_u, u)
        l_xu = t1.jacobian(l_u, x)
        
#         l_x, l_u, l_xx, l_uu, l_xu = (tf.squeeze(t) for t in [l_x, l_u, l_xx, l_uu, l_xu])
        
        return l_x, l_u, l_xx, l_uu, l_xu
                
    
    def cal_K(self, x_seq, u_seq):
        '''
            Calculate all the necessary derivatives, and compute the Ks
        '''
        state_dim = x_seq[0].shape[-1]
        v_seq = [None] * self.horizon
        v_x_seq = [None] * self.horizon
        v_xx_seq = [None] * self.horizon
        
        v_seq[-1] = self.final_cost(x_seq[-1])
        v_x_seq[-1], v_xx_seq[-1] = self.cal_deriV(x_seq[-1])
        
        k_seq = [None] * self.horizon
        kk_seq = [None] * self.horizon
        
        for i in range(self.horizon - 2, -1, -1):
            x, u = x_seq[i], u_seq[i]
            v_x, v_xx = v_x_seq[i+1], v_xx_seq[i+1]
            
            l_x, l_u, l_xx, l_uu, l_xu = self.cal_deriL(x, u)
            f_x, f_u, f_xx, f_uu, f_xu = self.cal_deriF(x, u)
            
            q_x = l_x + tf.linalg.matvec(tf.transpose(f_x), v_x)
            q_u = l_u + tf.linalg.matvec(tf.transpose(f_u), v_x)
            q_xx = l_xx + tf.transpose(f_x) @ v_xx @ f_x
            q_uu = l_uu + tf.transpose(f_u) @ v_xx @ f_u
            q_xu = l_xu + tf.transpose(f_u) @ v_xx @ f_x
            
            quu_inv = tf.linalg.inv(q_uu)
            k = - tf.linalg.matvec(quu_inv, q_u)
            kk = - quu_inv @ q_xu
            
            m = quu_inv @ q_xu
            a = tf.expand_dims(q_u, axis=0)
            
            dv = tf.tensordot(q_u, tf.linalg.matvec(quu_inv, q_u), axes=1) / 2.0
            v_x = q_x - tf.linalg.matvec(tf.transpose(q_xu), tf.linalg.matvec(q_uu, q_u))
            v_xx = q_xx - tf.transpose(q_xu) @ quu_inv @ q_xu
            
            v_seq[i] = v_seq[i+1] + dv
            v_x_seq[i] = v_x
            v_xx_seq[i] = v_xx
            k_seq[i] = k
            kk_seq[i] = kk
            
        return k_seq, kk_seq
            
    def forward(self, x_seq, u_seq, k_seq, kk_seq):
        # limping forward, calculate the action
        new_u_seq = [None] * self.horizon
        new_x_seq = [None] * self.horizon
        
        new_x_seq[0] = x_seq[0]
        
        cost = self.final_cost(x_seq[-1])
        for i in range(self.horizon-1):
            new_u = u_seq[i] + k_seq[i] + tf.linalg.matvec(kk_seq[i], (new_x_seq[i] - x_seq[i]))
            new_u = np.clip(new_u, self.u_range[0], self.u_range[1])
            new_u_seq[i] = new_u
            new_x_seq[i+1] = self.model(new_x_seq[i], new_u)
            
            cost += self.running_cost(x_seq[0], new_u)
        
        print("Cost:", cost)
        return new_x_seq, new_u_seq
            
    @tf.function 
    def predict(self, x_seq, u_seq):
        # make tensor
        x_seq = tf.convert_to_tensor(x_seq)
        u_seq = tf.convert_to_tensor(u_seq)
        
        for i in tqdm(range(self.per_iter)):
            k_seq, kk_seq = self.cal_K(x_seq, u_seq)
            x_seq, u_seq = self.forward(x_seq, u_seq, k_seq, kk_seq)
        
        return u_seq[0]

In [11]:
'''
            final_cost:     f(x)    ->  cost
            running_cost:   f(x, u) ->  cost
            model:          f(x, u) ->  new state
    x: 8
    u: 4
    
    def __init__(self, final_cost, running_cost, model, u_range, horizon, per_iter):
'''
np.random.seed(1231)

n_x = 4
n_u = 2

u_range = np.abs(np.random.rand(1, n_u)) * 2
u_range = np.concatenate([-u_range, u_range])
horizon = 30
per_iter = 1

FC = tf.random.uniform([n_x], dtype=tf.float64)
RC = tf.random.uniform([n_x, n_u], dtype=tf.float64)
MD = tf.random.uniform([n_x, n_x, n_u], dtype=tf.float64)

def final_cost(x):
    m = tf.tensordot(FC, x, axes=1) ** 2
    return  m

def running_cost(x, u):
    m = tf.tensordot(tf.tensordot(tf.transpose(x), RC, axes=1), u, axes=1) ** 2
    return m
    
def model(x, u):
    return tf.linalg.matvec(tf.linalg.matvec(MD, u), x) ** 2

In [12]:
ilqr = ILQR(final_cost, running_cost, model, u_range, horizon, per_iter)

In [13]:
def ref_final_cost(x):
    m = np.array(FC.numpy())
    s = (m @ x) ** 2
    return s

def ref_running_cost(x, u):
    m = np.array(RC.numpy())
    s = (x.T @ m @ u) **2
    return s
    
def ref_model(x, u):
    m = np.array(MD.numpy())
    s = (x.T @ m @ u) ** 2
    return s

ref = refILQR(ref_model, ref_running_cost, ref_final_cost, u_range[1], n_x, horizon)

In [14]:
np.random.seed(1231)

x_seq = np.random.rand(horizon, n_x)
u_seq = np.random.rand(horizon, n_u)

In [15]:
print(x_seq.shape)
print(u_seq.shape)

k_seq, kk_seq = ref.backward(x_seq, u_seq)
ref_x_seq, ref_u_seq = ref.forward(x_seq, u_seq, k_seq, kk_seq)

(30, 4)
(30, 2)


ValueError: shapes (4,) and (4,2,2) not aligned: 4 (dim 0) != 2 (dim 1)

In [9]:
ilqr.predict(x_seq, u_seq)

HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))




KeyboardInterrupt: 

In [None]:
ref_u_seq