In [2]:
import numpy as np
import matplotlib.pyplot as plt
np.set_printoptions(suppress=True, precision=2)
from IPython.display import display, Markdown, Pretty, HTML
import scipy.optimize as optimize

def BOLD(x):
    display(Markdown("**" + str(x) + "**"))

# Optimize Simple Models

We assume linear forms for all our functions:

$$
\begin{align}
o_t &= As_t \\
\frac{\partial o_t}{\partial t} &= Bo_t + Cu_t && \text{continuous} \\
o_{t+1} &= o_t + \Delta t(Bo_t + Cu_t) && \text{discrete} \\
\hat{c}(s_t) &= (Ag - As_t)^TD(Ag - As_t)\\
\end{align}
$$

In [19]:
class LinearStateSpaceModelWithQuadraticCost:
    
    def __init__(self, N, M, L):
        """
        N: dimensionality of the full state
        M: dimensionality in the reduced state
        L: dimensionality in the actions
        """
        self.N = N
        self.M = M
        self.L = L
        self.A = np.ndarray((M, N))
        self.B = np.ndarray((1, M))
        self.C = np.ndarray((M, L))
        self.D = np.ndarray((M, M))
        
    def size(self):
        return self.A.size + self.B.size + self.C.size + self.D.size
        
    def from_matrices(self, A, B, C, D):
        assert A.size == self.A.size
        assert B.size == self.B.size
        assert C.size == self.C.size
        assert D.size == self.D.size
        
        self.A = A
        self.B = B
        self.C = C
        self.D = D
        
    def from_params(self, params):
        assert len(params) == self.size(), "Expected {:d} params, fot {:d}".format(self.size(), len(params))
        params = np.array(params)
        n_A = self.A.size
        self.A = params[0:n_A].reshape(self.A.shape)
        n_B = n_A + self.B.size
        self.B = params[n_A:n_B].reshape(self.B.shape)
        n_C = n_B + self.C.size
        self.C = params[n_B:n_C].reshape(self.C.shape)
        n_D = n_C + self.D.size
        self.D = params[n_C:n_D].reshape(self.D.shape)
    
    def to_params(self):
        return np.concatenate((self.A.flatten(),
                               self.B.flatten(),
                               self.C.flatten(),
                               self.D.flatten()))
    
    def reduce(self, s):
        return np.dot(self.A, s)

    def predict_from_s(self, s, u, dt):
        o = np.dot(self.A, s)
        o_ = o + (np.dot(self.B, o) + np.dot(self.C, u)) * dt
        return o_

    def predict_from_o(self, o, u, dt):
        o_ = o + (np.dot(self.B, o) + np.dot(self.C, u)) * dt
        return o_

    def cost_of_s(self, s, g):
        o = np.dot(self.A, s)
        o_g = np.dot(self.A, g)
        return np.dot((o_g - o).T, np.dot(self.D, o_g - o))
    
    def cost_of_o(self, o, g):
        o_g = np.dot(self.A, g)
        return np.dot((o_g - o).T, np.dot(self.D, o_g - o))
    
    def predict_cost(self, o, u, dt, g):
        return self.cost_of_o(self.predict_from_o(o, u, dt), g)
            
    def __repr__(self):
        print("Model reduction Matrix:", self.A)
        print("Dynamics Matrices:", self.B, ',', self.C)
        print("Cost Matrix:", self.D)

# Train

In [7]:
def train(data, model, goal, dt, objective_func, initial_params=None, **kwargs):
    """
    mutates the model that was passed in
    """
        
    def __objective(params):
        model.from_params(params)
        return objective_func(model=model, g=goal, data=data, dt=dt, **kwargs)
    
    if initial_params is None:
        initial_params = np.random.randn(model.size())
        
    result = optimize.minimize(__objective, initial_params, method='Powell')
    
    if not result.success:
        print("Status: {:d}, Message: {:s}".format(result.status, result.message))
        return None
    print('Finished in {:d} iterations'.format(result.nit))

# Objectives

Mean Squared Error of our cost error
$$ \text{C}_\theta = \frac{1}{K}\sum_1^K \big[\hat{c}(f_\theta(s), f_\theta(g)) - c(s_t)\big]^2 $$

In [8]:
def current_cost(model, g, data):
    err = np.zeros(len(data))
    for i, (s, u, s_, c, c_) in enumerate(data):
        err[i] = model.cost_of_s(s, g) - c
       
    return (err**2).mean()

Mean Squared Error of our state prediction error
$$ \text{SP}_\theta = \frac{1}{K}\sum_1^K \big[T_\theta(f_\theta(s_t), u_t) - f_\theta(s_{t+1})\big]^2 $$

In [9]:
def state_prediction(model, g, data, dt):
    err = np.zeros(len(data))
    for i, (s, u, s_, c, c_) in enumerate(data):
        err[i] = np.linalg.norm(model.predict_from_s(s, u, dt) - model.reduce(s_))
        
    return (err**2).mean()

Mean Squared Error of our cost prediction error
$$ \text{CP}_\theta = \frac{1}{K}\sum_1^K \big[\hat{c}_\theta(T_\theta(f_\theta(s_t), u_t)) - c(s_{t+1})\big]^2 $$

In [10]:
def cost_prediction(model, g, data, dt):
    err = np.zeros(len(data))
    for i, (s, u, s_, c, c_) in enumerate(data):
        o = model.reduce(s)
        err[i] = np.linalg.norm(model.predict_cost(o, u, dt, g) - c_)
        
    return (err**2).mean()

$$ \text{CSP}_\theta = \alpha \text{C}_\theta + (1 - \alpha ) \text{SP}_\theta + \epsilon ||\theta|| $$ 

In [11]:
def state_prediction_objective(model, g, data, dt, alpha=0.5, regularization=1e-4):
    """ return: MSE over all training examples """
    obj = alpha * state_prediction(model, g, data, dt)
    obj += (1 - alpha) * current_cost(model, g, data)
    obj += regularization * np.linalg.norm(model.to_params())
    return obj

$$ \text{CCP}_\theta = \alpha \text{C}_\theta + (1 - \alpha ) \text{CP}_\theta + \epsilon ||\theta|| $$ 

In [12]:
def one_step_cost_prediction_objective(model, g, data, dt, alpha=0.5, regularization=1e-4):
    """ return: MSE over all training examples """
    obj = alpha * cost_prediction(model, g, data, dt)
    obj += (1 - alpha) * current_cost(model, g, data)
    obj += regularization * np.linalg.norm(model.to_params())
    return obj

# Functions for handling Gazebo data

In [13]:
# Load some test data
def load_gazebo_data(log_file ,g):
    log_data = np.loadtxt(log_file)
    new_data = []
    for i in range(log_data.shape[0] - 1):
        s = np.expand_dims(log_data[i][0:4], axis=1)
        s_ = np.expand_dims(log_data[i+1][0:4], axis=1)
        u = np.expand_dims(log_data[i][4:6], axis=1)
         # NOTE: we are now using quadratic cost here!
        c = (log_data[i][0] - g[0])**2 + (log_data[i][1] - g[1])**2
        c_ = (log_data[i+1][0] - g[0])**2 + (log_data[i+1][1] - g[1])**2
        new_datum = [s, u, s_, c, c_]
        new_data.append(new_datum)
    return new_data

In [14]:
def plot_gz_data(new_data):
    plt.figure()
    plt.title(r"Full State ($s$)")
    plt.plot([s[0][0,0] for s in new_data], label='x1')
    plt.plot([s[0][1,0] for s in new_data], label='y1')
    plt.plot([s[0][2,0] for s in new_data], label='x2')
    plt.plot([s[0][3,0] for s in new_data], label='y2')
    plt.ylabel("meters")
    plt.xlabel("time (steps)")
    plt.legend();

    plt.figure()
    plt.title(r"Control Input ($u$)")
    plt.plot([s[1][0,0] for s in new_data], label='vx')
    plt.plot([s[1][1,0] for s in new_data], label='vy')
    plt.ylabel("m/s")
    plt.xlabel("time (steps)")
    plt.legend();

    plt.figure()
    plt.title(r"Cost ($c$)")
    plt.plot([s[4] for s in new_data])
    plt.xlabel("time (steps)");
    
    plt.show()

# Training Wrapppers

In [20]:
def train_and_eval(model, data, g, dt, objective, initial_params=None, alpha=0.5, regularization=1e-5, print_model=True):
    train(data, model, g, dt, objective, initial_params)
    return eval_model(model, data, g, dt, alpha, regularization, print_model)
    
def eval_model(model, data, g, dt, alpha=0.5, regularization=1e-5, print_model=True):
    pred_state_loss = state_prediction(model, g, data, dt)
    cost_loss = current_cost(model, g, data)
    pred_cost_loss = cost_prediction(model, g, data, dt)
    pred_state_curr_cost_loss = state_prediction_objective(model, g, data, dt, alpha=alpha, regularization=regularization)
    pred_cost_curr_cost_loss = one_step_cost_prediction_objective(model, g, data, dt, alpha=alpha, regularization=regularization)
    reg_loss = regularization * np.linalg.norm(model.to_params())
    print("Loss Components:")
    print("\tcurrent cost:", cost_loss)
    print("\tpredict next latent state:", pred_state_loss)
    print("\tpredict next cost:", pred_cost_loss)
    print("\tregularization:", reg_loss)
    print("Complete Losses:")
    print("\tpredict next latent state and current cost:", pred_state_curr_cost_loss)
    print("\tpredict next cost and current cost:", pred_cost_curr_cost_loss)
    if print_model:
        print(model)
    return cost_loss, pred_state_loss, pred_cost_loss, reg_loss

# More Plotting & Introspection

In [16]:
def plot_x_rollout(model, data, dt, s0, g):
    actions = [d[1] for d in data]
    o = model.reduce(s0)
    predicted_total_cost = 0.0
    o_s = [o]
    for u in actions:
        c_hat = model.cost_of_o(o, g)
        o = model.predict_from_o(o, u, dt)
        o_s.append(o)
        predicted_total_cost += c_hat
    
    states = [d[0] for d in data]
    plt.figure()
    plt.plot([s[0,0] for s in states], label='true x1')
    plt.plot(np.squeeze(o_s), label='latent space o', linewidth=3, linestyle='--')
    plt.xlabel("time steps")
    plt.ylabel("o")
    plt.legend();
    plt.show()
    
    return predicted_total_cost

def plot_xy_rollout(model, data, dt, s0, g):
    actions = [d[1] for d in data]
    o = model.reduce(s0)
    predicted_total_cost = 0.0
    o_s = [o]
    for u in actions:
        c_hat = model.cost_of_o(o, g)
        o = model.predict_from_o(o, u, dt)
        o_s.append(o)
        predicted_total_cost += c_hat
    
    states = [d[0] for d in data]
    plt.figure()
    plt.plot([s[0,0] for s in states], label='true x1')
    plt.plot([s[1,0] for s in states], label='true y1')
    plt.plot(np.squeeze(o_s), label='latent space o', linewidth=3, linestyle='--')
    plt.xlabel("time steps")
    plt.ylabel("o")
    plt.legend();
    plt.show()
    
    return predicted_total_cost

In [17]:
def plot_cost(model, data, dt, g):
    states = [d[0] for d in data]
    actions = [d[1] for d in data]
    costs = [d[3] for d in data]
    s0 = states[0]
    o = model.reduce(s0)
    plt.figure()
    estimated_costs = []
    for u in actions:
        c_hat = model.cost_of_o(o, g)
        o = model.predict_from_o(o, u, dt)
        estimated_costs.append(np.squeeze(c_hat))
        
    plt.title("Estimated vs True Cost")
    plt.plot([np.squeeze(c) for c in costs], label='true cost')
    plt.plot(estimated_costs, label='estimated cost', linestyle='--')
    plt.xlabel("time steps")
    plt.ylabel("o")
    plt.legend();
    plt.show()
    
def plot_one_step(model, data, dt, s0, g):
    actions = [d[1] for d in data]
    states = [d[0] for d in data]
    first_n = 10
    plt.figure()
    for s, u in zip(states, actions)[:first_n]:
        o = model.reduce(s)
        c_hat = model.cost_of_o(o, g)
        o_ = model.predict_from_o(o, u, dt)
        predicted_total_cost += c_hat
        plt.plot([o[0,0], o_[0,0]], [o[1,0], o_[1,0]], label='latent space o', marker='r')
        
    plt.plot([s[0,0] for s in states], label='true x1')
    plt.xlabel("time steps")
    plt.ylabel("o")
    plt.legend();
    plt.show()
    
    return predicted_total_cost