In [1]:
import gym
import numpy as np
import control as ct
import tensorflow as tf
import tensorflow_probability as tfp


For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
If you depend on functionality not listed there, please file an issue.



In [None]:
class KalmanFilter(object):
    """
    Kalman filter / forecaster for a linear state-space model, expressed as
    TensorFlow ops and unrolled over the leading scan axis with ``tf.scan``.

    Notation used throughout the class:

    l - latent state
    l_a_priori - A priori state estimate
    l_a_posteriori - A posteriori state estimate

    P_a_priori - A priori error covariance
    P_a_posteriori - A posteriori error covariance

    F - state-transition model
    g - process-noise factor; the process-noise covariance is formed as g g^T
    a, b - observation model and bias
    sigma - observation-noise factor; the observation-noise covariance is
            formed as sigma sigma^T
    z - observation

    y_pre - measurement pre-fit residual
    S - Pre-fit residual covariance
    K - Kalman gain
    y_post - measurement post-fit residual
    """

    def __init__(self, dim_l, dim_z, batch_size, **kwargs):
        """
        Store the model dimensions and every per-sequence tensor the filter
        and the predictor read later.

        dim_l - size of the latent state; also the size of the identity ``_I``
        dim_z - size of the observation (``dim_y`` is set to the same value)
        batch_size - accepted but not used anywhere in this class
        kwargs - optional tensors, each defaulting to None when absent:
            filtering:  l_0, P_0, y_0, pred_0, z, F, g, sigma, a, b
            prediction: l_0_pred, z_0_pred, F_pred, g_pred, a_pred, b_pred,
                        sigma_pred
            z_0 is stored as well but is not read by any method of this class.
            Keys other than the ones listed are silently ignored.
        """
        self.dim_l = dim_l
        self.dim_z = dim_z
        self.dim_y = dim_z

        # lambda initializer for identity matrices
        self.eye_init = lambda shape, dtype = np.float32: np.eye(*shape, dtype = dtype)

        # (dim_l, dim_l) float32 identity, used in the covariance update.
        self._I = tf.constant(self.eye_init((dim_l, dim_l)), name= 'I')


        # NOTE(review): the two string literals below are no-op expression
        # statements left by the author. Despite their wording, a missing kwarg
        # does not fail here: kwargs.pop(name, None) stores None, and the error
        # only surfaces later when the attribute is used in a TF op.
        '''This section requires these kwargs to exist, cannot handle missing args'''


        '''This section also cannot handle missing kwargs'''
        self.l_0 = kwargs.pop('l_0', None)
        self.P_0 = kwargs.pop('P_0', None)
        self.F = kwargs.pop('F', None)
        self.g = kwargs.pop('g', None)
        self.a = kwargs.pop('a', None)
        self.b = kwargs.pop('b', None)
        self.sigma = kwargs.pop('sigma', None)
        self.y_0 = kwargs.pop('y_0', None)
        self.z_0 = kwargs.pop('z_0', None)
        self.pred_0 = kwargs.pop('pred_0', None)
        self.z = kwargs.pop('z', None)
        self.g_pred = kwargs.pop('g_pred', None)
        self.sigma_pred = kwargs.pop('sigma_pred', None)
        self.l_0_pred = kwargs.pop('l_0_pred', None)
        self.z_0_pred = kwargs.pop('z_0_pred', None)
        self.F_pred = kwargs.pop('F_pred', None)
        self.a_pred = kwargs.pop('a_pred', None)
        self.b_pred = kwargs.pop('b_pred', None)


    def forward_filter_fn(self, params, inputs):
        """
        One predict + update step of the filter; this is the ``tf.scan`` body.

        params - carry from the previous step, a 4-tuple
                 (l_a_posteriori, P_a_posteriori, y_pre, pred). Only the first
                 two are read; y_pre and pred are overwritten before any use.
        inputs - current slice of the scanned tensors: (z, F, g, sigma, a, b)

        Returns the new carry (l_a_posteriori, P_a_posteriori, y_post, pred),
        where pred = a^T l_a_posteriori + b and y_post = z - pred.
        """
        # NOTE(review): the shape table below is the author's own note, kept
        # verbatim as a no-op string statement; nothing in the code checks it.
        '''Shapes:
            z = (bs, dim_z)
            l_a_posteriori = (bs, dim_l, dim_z)
            P_a_posteriori = (bs, dim_l, dim_l)
            F = (bs, dim_l, dim_l)
            Q = (bs, dim_l, dim_l)
            R = (bs, dim_z, dim_z)
            a = (bs, dim_l, dim_z)
            b = (bs, dim_z)
        '''

        z, F, g, sigma, a, b = inputs
        l_a_posteriori, P_a_posteriori, y_pre, pred = params



        # Predict: propagate mean and covariance; process noise enters as g g^T.
        l_a_priori = tf.matmul(F,l_a_posteriori)
        P_a_priori = tf.matmul(tf.matmul(F,P_a_posteriori), F, transpose_b = True) + tf.matmul(g,g, transpose_b=True)


        # Pre-fit residual z - (a^T l_a_priori + b); the trailing axis is
        # squeezed for the subtraction and then restored for the matmul with K.
        y_pre = tf.expand_dims(z - tf.squeeze(tf.add(tf.matmul(a, l_a_priori, transpose_a=True), b),-1),-1)

        # Pre-fit residual covariance: sigma sigma^T + a^T P_a_priori a.
        S = tf.matmul(sigma, sigma, transpose_b=True) + \
            tf.matmul(tf.matmul(a, P_a_priori, transpose_a=True), a)
        # NOTE(review): tf.reciprocal is element-wise, so this equals the matrix
        # inverse of S only when S is 1x1 per batch element (scalar observation),
        # which is what the author's TODO below refers to.
        S_inv = tf.reciprocal(S)
        '''TODO: Compute inverse using cholesky decomposition? Only works if a is matrix
                so z must be multivariate
        '''

        # Update: gain, posterior mean, then the Joseph-form covariance
        # (I - K a^T) P (I - K a^T)^T + K (sigma sigma^T) K^T.
        K = tf.matmul(tf.matmul(P_a_priori, a), S_inv)
        l_a_posteriori = l_a_priori + tf.matmul(K,y_pre)
        I_Ka = self._I-tf.matmul(K,a, transpose_b=True)
        P_a_posteriori = tf.matmul(tf.matmul(I_Ka, P_a_priori), I_Ka, transpose_b=True) + \
                         tf.matmul(tf.matmul(K,tf.matmul(sigma, sigma, transpose_b = True)),
                                   K, transpose_b=True)
        # Post-fit residual and the filtered observation estimate share the same
        # a^T l_a_posteriori + b term (it is built twice in the graph).
        y_post = z - tf.squeeze(tf.add(tf.matmul(a, l_a_posteriori, transpose_a=True), b), -1)
        pred = tf.squeeze(tf.add(tf.matmul(a, l_a_posteriori, transpose_a=True),b), -1)
        return l_a_posteriori, P_a_posteriori, y_post, pred

    def forward_filter(self):
        """
        Run ``forward_filter_fn`` over the whole sequence with ``tf.scan``.

        The scanned inputs are self.z, self.F, self.g, self.sigma, self.a and
        self.b, each passed through ``trans`` (which swaps the first two axes)
        so that the scan iterates over what was originally axis 1.
        The carry is initialized with (self.l_0, self.P_0, self.y_0, self.pred_0).

        Returns the ``tf.scan`` result: a 4-tuple holding, stacked along the
        scan axis, the posterior means, posterior covariances, post-fit
        residuals and filtered observation estimates.
        """

        forward_states = tf.scan(self.forward_filter_fn,
                                 elems = (trans(self.z),trans(self.F),
                                          trans(self.g),trans(self.sigma),
                                          trans(self.a),trans(self.b)),
                                initializer=(self.l_0, self.P_0, self.y_0, self.pred_0))

        return forward_states

    def Kfilter(self):
        """
        Filter the stored sequence and return
        (l_filtered, P_filtered, residuals, filtered_prediction), each passed
        through ``trans`` again to undo the axis swap done for the scan.
        """
        l_filtered, P_filtered, residuals, filtered_prediction = self.forward_filter()
        return trans(l_filtered), trans(P_filtered), trans(residuals), trans(filtered_prediction)

    def forward_predict_fn(self, params, inputs):
        """
        One deterministic forecasting step; this is the ``tf.scan`` body.

        params - carry (l_prev, z_prev); z_prev is not read
        inputs - (F, g, a, b, sigma); g and sigma are only referenced by the
                 commented-out sampling variants and are otherwise unused

        Returns (l_next, z_next) with l_next = F l_prev and
        z_next = a^T l_prev + b.
        """

        F, g, a, b, sigma = inputs
        l_prev, z_prev = params

#         l_next = tfd.MultivariateNormalDiag(loc = tf.matmul(F, l_prev), scale_diag = g).sample()
        l_next = tf.matmul(F, l_prev)
#         z_next = tfd.Normal(loc = tf.matmul(a, l_prev, transpose_a=True)+b, scale = sigma).sample()
        # NOTE(review): z_next is computed from l_prev, not l_next, so the
        # emitted observation lags the emitted state by one step. Whether that
        # is intended depends on what the caller passes as l_0_pred - confirm.
        z_next = tf.matmul(a, l_prev, transpose_a=True) + b
        return l_next, z_next

    def forward_predict(self):
        """
        Run ``forward_predict_fn`` over the prediction horizon with ``tf.scan``.

        The scanned inputs are self.F_pred, self.g_pred, self.a_pred,
        self.b_pred and self.sigma_pred (each passed through ``trans``); the
        carry is initialized with (self.l_0_pred, self.z_0_pred).

        Returns the ``tf.scan`` result: (latent states, predicted observations)
        stacked along the scan axis.
        """

        forward_predictions = tf.scan(self.forward_predict_fn,
                                      elems = (trans(self.F_pred), trans(self.g_pred),
                                               trans(self.a_pred), trans(self.b_pred),
                                               trans(self.sigma_pred)),
                                      initializer = (self.l_0_pred, self.z_0_pred))

        return forward_predictions

    def Kpredict(self):
        """
        Forecast with the stored ``*_pred`` tensors and return
        (l_predicted, z_predicted), each passed through ``trans`` again to undo
        the axis swap done for the scan.
        """

        l_predicted, z_predicted = self.forward_predict()
        return trans(l_predicted), trans(z_predicted)
    
def trans(tensor):
    """Swap the first two axes of ``tensor`` and keep all trailing axes in order.

    Used to move between the layout the caller stores and the layout
    ``tf.scan`` iterates over (it scans along axis 0).

    The permutation is built from the tensor's actual rank, so rank-3 and
    rank-4 inputs behave exactly as before ([1, 0, 2] and [1, 0, 2, 3]) and
    any other rank >= 2 is handled as well instead of being treated as rank 4.

    Args:
        tensor: object with a static ``shape`` of known length (for example a
            ``tf.Tensor``) and at least two axes.

    Returns:
        The result of ``tf.transpose`` with axes 0 and 1 exchanged.

    Raises:
        ValueError: if ``tensor`` has fewer than two axes.
    """
    rank = len(tensor.shape)
    if rank < 2:
        raise ValueError(
            "trans() needs a tensor with at least 2 axes, got rank {}".format(rank))
    # [1, 0] swaps the leading pair; the remaining axes keep their positions.
    perm = [1, 0] + list(range(2, rank))
    return tf.transpose(tensor, perm)

In [2]:
# LQR balance demo on the custom cart-pole environment.
#
# Every original global name is kept (other cells may read them). The body is
# wrapped in try/finally so the environment is closed even if render() or
# step() raises part-way through an episode.

# Not read anywhere in this cell; kept for any other cell that uses them.
noise_magnitude = np.array([0.01,0.1,0.01,0.1])
av_time = []

N_EPISODES = 5   # number of roll-outs
MAX_STEPS = 500  # step cap per roll-out

env = gym.make('Custom_CartPole-v0', thetaacc_error=2, initial_state=1)
try:
    # Physical parameters read from the environment.
    g = env.gravity
    M = env.masscart
    m = env.masspole
    l = env.length

    # LQR weights: diag(10, 1, 1, 1) on the state, scalar 1 on the input.
    Q = np.eye(4)*[10,1,1,1]
    R = 1

    # System of equations: linear model x_dot = A x + B u.
    # NOTE(review): presumably the cart-pole linearised about the upright
    # position with state [x, x_dot, theta, theta_dot] - confirm against the
    # observation layout of Custom_CartPole-v0.
    A = np.array([[0,1,0,0],[0,0,-m*g/M,0],[0,0,0,1],[0,0,(M+m)*g/(l*M),0]])
    B = np.array([[0,1/M,0,-1/(l*M)]]).T

    # LQR: K is the state-feedback gain, S the Riccati solution, E the
    # closed-loop eigenvalues.
    K,S,E = ct.lqr(A,B,Q,R)

    # Pole placement alternative:
    #K = ct.place(A,B,np.array([-1.1,-1.2,-1.3,-1.4]))

    #env.x_threshold = 5.0
    #env.theta_threshold_radians = 10.0

    for i_episode in range(N_EPISODES):
        observation = env.reset()
        for t in range(MAX_STEPS):
            env.render()
            #print(observation)
            #action = env.action_space.sample()
            # State feedback u = -K x; K has a single row, so u[0] is the
            # control for the one input.
            u = -np.dot(K,observation)
            observation, reward, done, info = env.step(u[0])
            if done:
                print("Episode finished at time step {}".format(t+1))
                break
        # Printed after every roll-out, whether it terminated early or not.
        print("Episode complete")
finally:
    env.close()


Episode complete
Episode complete
Episode complete
Episode complete
Episode finished at time step 26
Episode complete
