In [1]:
import numpy as np
import matplotlib.pyplot as plt
np.set_printoptions(suppress=True, precision=2)
import scipy.optimize as optimize

# Optimize Simple Models

We assume linear forms for all our functions:

$$
\begin{align}
o_t &= As_t \\
o_{t+1} &= o_t + Bo_t + Cu_t \\
\hat{c}(s_t) &= (Ag - As_t)^TD(Ag - As_t)\\
\end{align}
$$

In [13]:
class LinearStateSpaceModelWithQuadraticCost:
    
    def __init__(self, N, M, L):
        """
        N: dimensionality of the full state
        M: dimensionality in the reduced state
        L: dimensionality in the actions
        """
        self.N = N
        self.M = M
        self.L = L
        self.A = np.ndarray((M, N))
        self.B = np.ndarray((1, M))
        self.C = np.ndarray((M, L))
        self.D = np.ndarray((M, M))
        
    def size(self):
        return self.A.size + self.B.size + self.C.size + self.D.size
        
    def from_matrices(self, A, B, C, D):
        assert A.size == self.A.size
        assert B.size == self.B.size
        assert C.size == self.C.size
        assert D.size == self.D.size
        
        self.A = A
        self.B = B
        self.C = C
        self.D = D
        
    def from_params(self, params):
        assert len(params) == self.size(), "Expected {:d} params, fot {:d}".format(self.size(), len(params))
        params = np.array(params)
        n_A = self.A.size
        self.A = params[0:n_A].reshape(self.A.shape)
        n_B = n_A + self.B.size
        self.B = params[n_A:n_B].reshape(self.B.shape)
        n_C = n_B + self.C.size
        self.C = params[n_B:n_C].reshape(self.C.shape)
        n_D = n_C + self.D.size
        self.D = params[n_C:n_D].reshape(self.D.shape)
    
    def to_params(self):
        return np.concatenate((self.A.flatten(),
                               self.B.flatten(),
                               self.C.flatten(),
                               self.D.flatten()))
    
    def reduce(self, s):
        return self.A@s

    def predict_from_s(self, s, u):
        o = self.A@s
        o_ = o + (self.B@o + self.C@u)
        return o_

    def predict_from_o(self, o, u):
        o_ = o + (self.B@o + self.C@u)
        return o_

    def cost_of_s(self, s, g):
        o = self.A@s
        o_g = self.A@g
        return (o_g - o).T@self.D@(o_g - o)
    
    def cost_of_o(self, o, g):
        o_g = self.A@g
        return (o_g - o).T@self.D@(o_g - o)
    
    def print(self):
        print("Model reduction Matrix:", self.A)
        print("Dynamics Matrices:", self.B, ',', self.C)
        print("Cost Matrix:", self.D)

In [4]:
def train(data, model, goal, objective_func, **kwargs):
    """
    mutates the model that was passed in
    """
        
    def __objective(params):
        model.from_params(params)
        return objective_func(model=model, g=goal, data=data, **kwargs)
    
    initial_params = np.random.randn(model.size())
    result = optimize.minimize(__objective, initial_params, method='Powell')
    
    if not result.success:
        print("Status: {:d}, Message: {:s}".format(result.status, result.message))
        return None
    print('Finished in {:d} iterations'.format(result.nit))

In [6]:
def latent_prediction_objective(model, g, data, alpha=0.5, regularization=1e-3):
    """ return: MSE over all training examples """
    err = np.zeros(len(data))
    for i, (s, u, s_, c, c_) in enumerate(data):
        latent_prediction_accuracy = np.linalg.norm(model.predict_from_s(s, u) - model.reduce(s_))
        current_cost_accuracy = abs(model.cost_of_s(s, g) - c)
        err[i] = (1 - alpha) * current_cost_accuracy \
                + alpha * latent_prediction_accuracy \
                + regularization * np.linalg.norm(model.to_params())
        
    obj = (err**2).mean()
    return obj

In [5]:
def one_step_cost_prediction_objective(model, g, data, alpha=0.5, regularization=1e-3):
    """ return: MSE over all training examples """
    err = np.zeros(len(data))
    for i, (s, u, s_, c, c_) in enumerate(data):
        predicted_cost_accuracy = np.linalg.norm(model.cost_of_o(model.predict_from_s(s, u), g) - model.cost_of_s(s_, g))
        current_cost_accuracy = abs(model.cost_of_s(s, g) - c)
        err[i] = (1 - alpha) * current_cost_accuracy \
                + alpha * predicted_cost_accuracy \
                + regularization * np.linalg.norm(model.to_params())
        
    obj = (err**2).mean()
    return obj

# Functions for handling Gazebo data

In [6]:
# Load some test data
def load_gazebo_data(log_file ,g):
    log_data = np.loadtxt(log_file)
    new_data = []
    for i in range(log_data.shape[0] - 1):
        s = np.expand_dims(log_data[i][0:4], axis=1)
        s_ = np.expand_dims(log_data[i+1][0:4], axis=1)
        u = np.expand_dims(log_data[i][4:6], axis=1)
         # NOTE: we are now using quadratic cost here!
        c = (log_data[i][0] - g[0])**2 + (log_data[i][1] - g[1])**2
        c_ = (log_data[i+1][0] - g[0])**2 + (log_data[i+1][1] - g[1])**2
        new_datum = [s, u, s_, c, c_]
        new_data.append(new_datum)
    return new_data

In [7]:
def plot_gz_data(new_data):
    plt.figure()
    plt.title(r"Full State ($s$)")
    plt.plot([s[0][0,0] for s in new_data], label='x1')
    plt.plot([s[0][1,0] for s in new_data], label='y1')
    plt.plot([s[0][2,0] for s in new_data], label='x2')
    plt.plot([s[0][3,0] for s in new_data], label='y2')
    plt.ylabel("meters")
    plt.xlabel("time (s)")
    plt.legend();

    plt.figure()
    plt.title(r"Control Input ($u$)")
    plt.plot([s[1][0,0] for s in new_data], label='vx')
    plt.plot([s[1][1,0] for s in new_data], label='vy')
    plt.ylabel("m/s")
    plt.xlabel("time (s)")
    plt.legend();

    plt.figure()
    plt.title(r"Cost ($c$)")
    plt.plot([s[4] for s in new_data])
    plt.xlabel("time (s)");
    
    plt.show()

# Animation Functions

In [8]:
import imageio

def animate_prediction(params, s_0, u):
    o = reduce_from_params(params, s_0)
    
    print(o.shape)
    
    images = []
    for u_t in u:
        o = predict_next_o(params, s)
        
        images.append(img)
        
    temp_filename = '/tmp/temp.gif'
    imageio.mimsave(temp_filename, images)
    from IPython.display import Image
    Image(filename=temp_filename) 