# **Design for Single Differenced Kalman Network for GNSS Triangulation**

### **Introduction**

Here, we design a Recurrent Neural Network (RNN) aided kalman filter for GPS/GNSS triangulation. The main motivation behind this is that this is able to learn the non-linear filter for a given reciever and satellite constellation and the reciever dependent noise characteristics. 

The main paper that we are following:

- [KalmanNet: Neural Network Aided Kalman Filtering for Partially Known Dynamics](https://ieeexplore.ieee.org/abstract/document/9733186?casa_token=E7Dlt41RuYMAAAAA:K_hc0gJoRDXLG3BNt6tS3QW1Tzf54jufij9adE7HcjGst0VeyXWhWe5Mc1MiyXH7tY_gcH0GNg)


This architecture has shown performance improvements over the traditional Kalman Filter (KF), Extended Kalman Filter (EKF), and Unscented Kalman Filter (UKF) in the case of non-linear dynamics and unknown noise characteristics as well as unknown dynamics. The main idea is to use a RNN to learn the dynamics and the noise characteristics of the system. Rather than tracking second order statistics of the system, we let the RNN learn the second order statistics of the system.  

## **Problem Formulation**

The GPS/GNSS problem is a non-linear problem. The measurement model in the non-linear observation equations. The dynamics, for example, the position of the moving object, is also generally non-linear. Hence, the traditional KF is not applicable. The EKF and UKF are applicable, but they require the knowledge of the noise characteristics of the system. This is not always available or easy to obtain. However, the RNN can learn the noise characteristics of the given system using the labeled data.

The state space model for the GPS/GNSS problem is genrally given as:
$$
x = \begin{bmatrix} x \\ y \\ z \\ \dot{x} \\ \dot{y} \\ \dot{z} \\ cdt \\ c\dot{dt}\end{bmatrix}
$$
where $x,y,z$ are the position of the reciever in the ECEF frame, $\dot{x},\dot{y},\dot{z}$ are the velocity of the reciever in the ECEF frame, $cdt$ is the clock bias of the reciever, and $c\dot{dt}$ is the clock drift of the reciever with respect to the GPS/GNSS time.

The pseudorange equation is given as:
$$
\rho = \sqrt{(x-x_s)^2 + (y-y_s)^2 + (z-z_s)^2} + cdt + \epsilon
$$
where $x_s,y_s,z_s$ are the position of the satellite in the ECEF frame, and $\epsilon$ is the noise in the measurement.

These were the assumption made in the [UKF Aided Kalman Filter](unscented_kalman_filter_gps.ipynb) notebook.

## **Architecture Design Problem** 

There are few design problems that we need to solve before we can implement the KalmanNet.

1. How does the training data look like?  
    Just like any neural network, training data will contaion an input X and a labelled output Y. Since the base architecture of the Kalman Net is RNN, the input will be time series of the observables,
    $$
    P_i = \begin{bmatrix} \rho_{1,i} \\ \rho_{2,i} \\ \vdots \\ \rho_{N_s,i} \end{bmatrix}
    $$
    where $i$ is the time index, and $N_s$ is the number of satellites in view. The labelled training data Y will be the time series of position and velocity of the reciever in the ECEF frame.
    $$
    Y_i = \begin{bmatrix} x_i \\ y_i \\ z_i \\ \dot{x}_i \\ \dot{y}_i \\ \dot{z}_i \end{bmatrix}
    $$

    **Note: Prior to the training, user can correct the data for modeled errors such as ionospheric delay, tropospheric delay, and satellite clock bias. This further improves the performance of the KalmanNet because it will be trained on more precise data.**


2. What about the reciever clock bias and drift?  
    The reciever clock bias and drift are not observables since they are very hard to measure with high accuracy and are generally noisy. So, they are not included in the training data. So, we will have to simultaneously eliminate them and gain some advantage doing that. This is done by using the single differenced measurement model. The single differenced measurement mode. Hence out state space model becomes:
    $$
    x = \begin{bmatrix} x \\ y \\ z \\ \dot{x} \\ \dot{y} \\ \dot{z} \end{bmatrix}
    $$
    Hence, the clock bias is not calculated. This can be calculated using least squares or UKF. Eliminating the clock bias, we have more gain more precision in the location estimation and velocity estimation which are the main goals of the GPS/GNSS problem.


##

## **Single Differenced Equations**

To eliminate the reciever clock bias, we use the single differenced measurement model. This measurement model is given as:
$$
\rho_{ij} = \rho_{i} - \rho_{j} + \epsilon_{ij}
$$
where $\rho_{ij}$ is the single differenced pseudorange between the $i^{th}$ and $j^{th}$ reference satellite, $\rho_{i}$ is the pseudorange of the $i^{th}$ reference satellite, $\rho_{j}$ is the pseudorange of the $j^{th}$ reference satellite, and $\epsilon_{ij}$ is the remaining noise in the single differenced pseudorange.

Since the reciever clock bias is same for all the satellites at a given time, the pseudorange equation for the $i^{th}$ satellite is given as:
$$
\rho_{i} = \sqrt{(x-x_i)^2 + (y-y_i)^2 + (z-z_i)^2} + cdt + \epsilon_i
$$
where $x_i,y_i,z_i$ are the position of the $i^{th}$ satellite in the ECEF frame, and $\epsilon_i$ is the noise in the measurement. Now a reference satellite is chosen, generally having the highest elevation angle as the reference satellite. The single differenced pseudorange equation for the $i^{th}$ and $j^{th}$ satellite is given as:
$$
\rho_{ij} = \sqrt{(x-x_i)^2 + (y-y_i)^2 + (z-z_i)^2} - \sqrt{(x-x_j)^2 + (y-y_j)^2 + (z-z_j)^2} + cdt - cdt + \epsilon_{i} - \epsilon_{j}
$$
The clock bias is eliminated. The single differenced pseudorange equation for the $i^{th}$ and $j^{th}$ satellite is given as:
$$
\rho_{ij} = \sqrt{(x-x_i)^2 + (y-y_i)^2 + (z-z_i)^2} - \sqrt{(x-x_j)^2 + (y-y_j)^2 + (z-z_j)^2} + \epsilon_{ij}
$$
where $\epsilon_{ij} = \epsilon_{i} - \epsilon_{j}$ is the remaining noise in the single differenced pseudorange.

We will feed the KalmanNet with the single differenced pseudorange measurements and the KalmanNet will output the position and velocity of the reciever in the ECEF frame. This requires that at least 4 satellites are in view of the reciever.



## **State Transistion and Measurement Model**

### **State Transition Model**

The constant velocity model is used for the state transistion model as in the [UKF Aided Kalman Filter](unscented_kalman_filter_gps.ipynb) notebook. The state transistion model matrix is given as:
$$
A = \begin{bmatrix} 1 & \Delta t \\ 0 & 1 \end{bmatrix}
$$
Now the state transition function can be calculated as:
$$
F = \begin{bmatrix} A & 0 & 0 & 0 \\ 0 & A & 0 & 0 \\ 0 & 0 & A & 0 \end{bmatrix}
$$
where $A$  is constant velocity model matrix, and $\Delta$ is the time step for the measurement update which depends on the sampling rate of the GPS/GNSS reciever. 

For the Kalman Net, the process noise is modelled by the RNN. Hence, the model mismatch can be mitigated, and noise characteristics can be directly learned from the data.

In [1]:
# Implementation of the State Transition Matrix for Kalman Net
import numpy as np
import numba as nb

nb.njit(
    nb.float64[:,:](nb.float64[:], nb.float64),
    fastmath=True,
    cache=True,
    parallel=True
)
def fx(x : np.ndarray, dt : float) -> np.ndarray:
    """State transition function for the Single Differential Kalman Net.
    Args:
        x (ndarray): Current state vector. Shape (6) 1D array.
        dt (float): Time step. Units: seconds.
    
    Returns:
        ndarray: Time translated state vector.
    """
    # Unit evolution matrix
    A = np.eye(2, dtype=np.float64)
    A[0,1] = dt

    # State transition function 6 * 4 constant velocity model
    F = np.kron(np.eye(3, dtype=np.float64), A)

    return F @ x


### **Measurement Model**

The measurement function as discussed above is the SD-Measurement model. Let $j$ denote the reference satellite, and $s$ denote the current state, $ik$ denote the observed $k$ satellites . The measurement function $h(x_s, y_s, z_s,)$ is given as:
$$
h(x_s, y_s, z_s, \vec{X_{ik}}, \vec{X_{rf}}) = \sqrt{(x_s-x_{ik})^2 + (y_s-y_{ik})^2 + (z_s-z_{ik})^2} - \sqrt{(x_s-x_{j})^2 + (y_s-y_{j})^2 + (z_s-z_{j})^2}
$$
where $x_s,y_s,z_s$ are current state of the reciever in the ECEF frame, $\vec{X_{ik}}$ is the vector of the position of the $k$ satellites in the ECEF frame, and $\vec{X_{rf}}$ is the vector of the position of the reference satellite in the ECEF frame.

Hence, the follwing argumente are required for the measurement function:
- The current state of the reciever in the ECEF frame (As is the convention in the Kalman Filter)
- The position of the reference satellite in the ECEF frame(To eliminate the clock bias)
- The position of the satellites in the ECEF frame (To calculate the differenced pseudorange which are the observables)


In [2]:
# Implementation of the measurement function for the Kalman Net
nb.njit(
    nb.float64[:](nb.float64[:], nb.float64[:,:], nb.float64[:]),
    fastmath=True,
    cache=True,
    parallel=True
)
def hx(x : np.ndarray, sv_matrix : np.ndarray , reference_sv : np.ndarray) -> np.ndarray:
    """Measurement function for the Single Differential Kalman Net.
    Args:
        x (ndarray): Current state vector. Shape (6,) 1D array.
        sv_matrix (ndarray): Satellite state vector matrix. Shape (n, 3). n = number of satellites.
        reference_sv (ndarray): Reference satellite state vector. Shape (3) 1D array.
    
    Returns:
        ndarray: Differenced pseduorange measurement vector. Shape (n,) 1D array.
    """
    # Assert that the input is of the correct shape
    assert x.shape == (6,), f"Shape of state is {x.shape}"
    assert sv_matrix.shape[1] == 3 , f"Shape of sv_matrix is {sv_matrix.shape}"
    assert reference_sv.shape == (3,) , f"Shape of reference_sv is {reference_sv.shape}"

    # Single differential pseudorange measurement function
    return np.linalg.norm(x[:3] - sv_matrix, axis=1) - np.linalg.norm(x[:3] - reference_sv)
    


## **KalmanNet Architecture**

Following the paper, the KalmanNet architecture is similar to the EKF architecture. First an abstract class is defined for the KalmanNet. This class contatins abstract methods that need to be implemented by the child class. These methods will be choosen based on the specific architecture of the KalmanNet.

In [1]:
from abc import ABC, abstractmethod
from assertpy import assert_that
from collections import deque
import numpy as np

class AbstractKalmanNet(ABC):
    """The Kalman Net abstract base class.

    Args:
        ABC (_type_): Abstract base class.
    """
    def __init__(self, 
                dim_state : int, 
                dim_measurement : int, 
                dt : float,
                flavor : list[str] | None = None, 
                **kwargs):
        """Initializes the Kalman Net.

        Args:
            dim_state (int): Dimension of the state vector.
            dim_measurement (int): Dimension of the measurement vector.
            dt (float): Time steps of measurements. Units: seconds.
            flavor (list[str], optional): List of the combination flavors to be used for calculating the kalman gain. Defaults to ["F1", "F2", "F4"].
            **kwargs: Keyword arguments for the stack initialization.
        
        Optional Keyword Args:
            posterior_stack_size (int): Size of the posterior stack, should be greater that >=2. Default: 10.
            prior_stack_size (int): Size of the prior stack. Default: 10.
            measurement_history_size (int): Size of the measurement history stack. Default: 2.
            x0 (ndarray): Initial state vector. Shape (dim_state,) 1D array. Default: np.zeros(dim_state).
            y0 (ndarray): Initial measurement vector. Shape (dim_measurement,) 1D array. Default: np.zeros(dim_measurement).

        Raises:
            AssertionError: If the dimensions are not greater than 0.
            AssertionError: If the time step is not greater than 0.
            AssertionError: If the flavor is not a list.

        Returns:
            None
        
        """
        # Initialize the dimensions
        self.x_dim = dim_state if assert_that(dim_state).is_type_of(int).is_greater_than(0) else None
        self.y_dim = dim_measurement if assert_that(dim_measurement).is_type_of(int).is_greater_than(0) else None

        # Initialize the time step
        self.dt = dt if assert_that(dt).is_type_of(float).is_greater_than(0) else None

        # Initialize the posterior stack 
        self.x_posterior_stack = deque(maxlen=kwargs.get("posterior_stack_size", 10))
        self.P_posterior_stack = deque(maxlen=kwargs.get("posterior_stack_size", 10))

        # Initialize the prior stack
        self.x_prior_stack = deque(maxlen=kwargs.get("prior_stack_size", 10))
        self.P_prior_queue = deque(maxlen=kwargs.get("prior_stack_size", 10))

        # Predicted measurement stack
        self.y_stack = deque(maxlen=kwargs.get("measurement_history_size", 2))
        self.y_hat_stack = deque(maxlen=kwargs.get("measurement_history_size", 2))

        # Check if the flavor is a list
        if flavor is None:
            flavor = ["F1", "F2", "F4"]
        self.flavor = flavor if assert_that(flavor).is_type_of(list).contains_only("F1", "F2", "F3", "F4") else ["F1", "F2", "F4"]
        
        # Initialize the stacks appropriately
        self._stack_init(**kwargs)


    def _stack_init(self, **kwargs) -> None:
        """This function initializes the stacks inplace according to the paper.
        
        Args:
            **kwargs: Keyword arguments for the stack initialization.

        Returns:
            None
        """
        # Initialize the initial value for the posterior stack
        self.x_posterior_stack.append(kwargs.get("x0", np.zeros(self.x_dim)))
        # Needs to be initialize twice since two posterior values are needed for the F3 combination at t=1
        self.x_posterior_stack.append(kwargs.get("x0", np.zeros(self.x_dim)))
        
        #Note : P_posterior needn't be initialized since it is calculated from RNN and RNN weights are initialized randomly!
        
        # Initialize the measuremet stack with the initial measurement
        # This is needed for the F1 combination at t=1 since it uses previous measurement (y_{t-1})
        self.y_meas_prev = kwargs.get("y0", np.zeros(self.y_dim)) 

        # No need to initialize the y_hat stack since it is calculated from 
        # x_prior for t=1 and y_1 is provided by the user

        # For the combination F4, the prior needs to be initialized since
        # at t=1, the combination four used x_{t-1}_posterior and x_{t-1}_prior
        self.x_prior = kwargs.get("x0", np.zeros(self.x_dim))

        return


    @property
    def x_posterior(self) -> np.ndarray:
        """Returns the current posterior state vector.

        Returns:
            ndarray: Current posterior state vector. Shape (dim_state,) 1D array.
        """
        return self.x_posterior_stack[-1]
    
    @x_posterior.setter
    def x_posterior(self, x : np.ndarray) -> None:
        """Sets the current posterior state vector.

        Args:
            x (ndarray): Current posterior state vector. Shape (dim_state,) 1D array.
        """
        self.x_posterior_stack.append(x)

    
    @property
    def P_posterior(self) -> np.ndarray:
        """Returns the current posterior covariance matrix.

        Returns:
            ndarray: Current posterior covariance matrix. Shape (dim_state, dim_state) 2D array.
        """
        return self.P_posterior_stack[-1]
    
    @P_posterior.setter
    def P_posterior(self, P : np.ndarray) -> None:
        """Sets the current posterior covariance matrix.

        Args:
            P (ndarray): Current posterior covariance matrix. Shape (dim_state, dim_state) 2D array.
        """
        self.P_posterior_stack.append(P)
    
    @property
    def x_prior(self) -> np.ndarray:
        """Returns the current prior state vector.

        Returns:
            ndarray: Current prior state vector. Shape (dim_state,) 1D array.
        """
        return self.x_prior_stack[-1]
    
    @x_prior.setter
    def x_prior(self, x : np.ndarray) -> None:
        """Sets the current prior state vector.

        Args:
            x (ndarray): Current prior state vector. Shape (dim_state,) 1D array.
        """
        self.x_prior_stack.append(x)

    
    @property
    def P_prior(self) -> np.ndarray:
        """Returns the current prior covariance matrix.

        Returns:
            ndarray: Current prior covariance matrix. Shape (dim_state, dim_state) 2D array.
        """
        return self.P_prior_queue[-1]
    
    @P_prior.setter
    def P_prior(self, P : np.ndarray) -> None:
        """Sets the current prior covariance matrix.

        Args:
            P (ndarray): Current prior covariance matrix. Shape (dim_state, dim_state) 2D array.
        """
        self.P_prior_queue.append(P)

    
    @property
    def y_hat(self) -> np.ndarray:
        """Returns the current predicted measurement vector.

        Returns:
            ndarray: Current predicted measurement vector. Shape (dim_measurement,) 1D array.
        """
        return self.y_hat_stack[-1]
    
    @y_hat.setter
    def y_hat(self, y : np.ndarray) -> None:
        """Sets the current predicted measurement vector.

        Args:
            y (ndarray): Current predicted measurement vector. Shape (dim_measurement,) 1D array.
        """
        self.y_hat_stack.append(y)

    @property
    def y_meas_prev(self) -> np.ndarray:
        """Returns the previous measurement vector.

        Returns:
            ndarray: Current measurement vector. Shape (dim_measurement,) 1D array.
        """
        return self.y_stack[-1]
    
    @y_meas_prev.setter
    def y_meas_prev(self, y : np.ndarray) -> None:
        """Sets the current measurement vecto to the previous measurement vector.

        Args:
            y (ndarray): Current measurement vector. Shape (dim_measurement,) 1D array.
        """
        self.y_stack.append(y)

    @abstractmethod
    def fx(self, x : np.ndarray, *args , **kwargs) -> np.ndarray:
        """State transition function. This function is implemented by the child class.

        Args:
            x (ndarray): Current state vector. Shape (dim_state,) 1D array.

        Returns:
            ndarray: Time translated state vector. Shape (dim_state,) 1D array.
        """
       
        pass

    @abstractmethod
    def hx(self, x : np.ndarray, *args , **kwargs) -> np.ndarray:
        """Measurement function. This function is implemented by the child class.

        Args:
            x (ndarray): Current state vector. Shape (dim_state,) 1D array.

        Returns:
            ndarray: Measurement vector. Shape (dim_measurement,) 1D array.
        """
        pass

    @staticmethod
    def F1(y_t : np.ndarray, y_prev_mes : np.ndarray) -> np.ndarray:
        """Calculates the F1 combination (observation difference) for feeding to RNN layer.
        
        This combination encapsulates the physics state evolution process since this uses only the 
        measurements of the current and previous time step. 


        Args:
            y_t (ndarray): Current measurement vector. Shape (dim_measurement,) 1D array.
            y_prev_mes (ndarray): Previous measurement vector. Shape (dim_measurement,) 1D array.

        Returns:
            ndarray: F1 combination. (dim_measurement)
        """
        return y_t - y_prev_mes

    @staticmethod
    def F2(self, y_t : np.ndarray, y_hat : np.ndarray) -> np.ndarray: 
        """This calculates the F2 combination (innovation difference) for feeding to RNN layer.

        This combination encapsulates the uncertainty of state estimation process since this uses the
        measurement and predicted measurement of the current time step.

        Args:
            y_t (ndarray): Current measurement vector. Shape (dim_measurement,) 1D array.
            y_hat (ndarray): Predicted measurement vector. Shape (dim_measurement,) 1D array.

        Returns:
            ndarray: F2 combination. (dim_measurement)
        """ 
        return y_t - y_hat
    

    @staticmethod
    def F3(self, x_t11 : np.ndarray, x_t00) -> np.ndarray:
        """This calculates the F3 combination (forward evolution difference) for feeding to RNN layer.

        This combination encapsulates the state evolution process since this uses the state vectors of two consecutive posterior
        state vectors. This is taken for t-1 for time step t.

        Args:
            x_t11 (ndarray): Current previous vector. Shape (dim_state,) 1D array.
            x_t00 (ndarray): Previous state vector. Shape (dim_state,) 1D array.

        Returns:
            ndarray: F3 combination. (dim_state)
        """
        return x_t11 - x_t00
    
    @staticmethod
    def F4(self, x_t11 : np.ndarray, x_t00: np.ndarray) -> np.ndarray:
        """This calculates the F4 combination (forward update difference) for feeding to RNN layer.

        This combination encapsulates the uncertainty of state evolution process since this uses the state vectors of prior state estimates
        . This is taken for t-1 for time step t.

        Args:
            x_t11 (ndarray): Current posterior vector. Shape (dim_state,) 1D array.
            x_t00 (ndarray): Current prior vector. Shape (dim_state,) 1D array.

        Returns:
            ndarray: F4 combination. (dim_state)
        """
        return x_t11 - x_t00
    
    
    def predict(self, **kwargs) -> None:
        """Predicts the x_t|t-1 using the state transition function.

        The state transition function is implemented by the child class.The values are stored in the x_prior attribute. The KalmanNet does not track second-order statistics. 
        Please refer to the paper for more details.

        Paper: 
            KalmanNet: Neural Network Aided Kalman Filtering for Partially Known Dynamics
            <https://ieeexplore.ieee.org/abstract/document/9733186?casa_token=E7Dlt41RuYMAAAAA:K_hc0gJoRDXLG3BNt6tS3QW1Tzf54jufij9adE7HcjGst0VeyXWhWe5Mc1MiyXH7tY_gcH0GNg>

        Args:
            **kwargs: Keyword arguments for the state transition function.

        Returns:
            None
    """
        # Get the current posterior
        x_t00 = self.x_posterior
        # Pass the current posterior to the state transition function
        x_t10 = self.fx(x_t00, **kwargs)

        # Set the prior
        self.x_prior = x_t10

        return
    
    
    def update(self, y : np.ndarray, **kwargs) -> None:
        """This is the update loop of the Kalman Net.

        The update loop of the Kalman Net calculates the x_t|t and P_t|t (from Neural Net) from the x_t|t-1 (stored in the x_prior attribute).
        The updated values are stored in the most recent x_posterior and P_posterior attributes.

        Args:
            y (np.ndarray): Measurement vector. Shape (dim_measurement,) 1D array.
            **kwargs: Keyword arguments for the measurement function.

        Returns:
            None
        """
        # Get the current prior
        x_t10 =  self.x_prior

        # Pass the current prior to the measurement function to get y_hat
        y_hat = self.hx(x_t10, **kwargs)
        # Set the predicted measurement to the y_hat attribute
        self.y_hat = y_hat

        # Calculate the flavor combinations
        combinations = {
            "F1" : self.F1(y, self.y_meas_prev),
            "F2" : self.F2(y, y_hat), # This is the innovation difference
            "F3" : self.F3(self.x_posterior, self.x_posterior_stack[-2]), # Use t-1 and t-2 posterior
            "F4" : self.F4(self.x_posterior, self.x_prior_stack[-2]) # Use t-1 posterior and t-1 prior
        }

        # Calculate the kalman gain
        kalman_gain = self.forward(combinations)

        # Update the posterior
        self.x_posterior = x_t10 + kalman_gain @ combinations["F2"]

        return
    
    @abstractmethod
    def forward(self, combination : dict) -> np.ndarray:
        """The forward pass of the Kalman Net.

        This function takes in the combination dictionary and calculates the forward pass of the Kalman Net.
        Must be implemented by the child class using Neural Net.

        Args:
            combination (dict): Dictionary of the combination flavors and their values.

        Returns:
            ndarray: The kalman gain matrix. Shape (dim_state, dim_measurement) 2D array.
        """
        pass

