In [1]:
import numpy as np
import scipy.linalg
from copy import deepcopy
from threading import Lock

In [5]:
import numpy as np
import scipy.linalg
from copy import deepcopy
from threading import Lock

class ukfexception(Exception):
    """ raised for errors specific to the unscented filter, typically due to invalid inputs """

class unscented:
    def __init__(self, state_size, noise_covar, init_state, init_covar, alpha, k, beta, state_predictor):
        """
        initializes the unscented kalman filter for financial data analysis
        :param state_size: size of the state vector
        :param noise_covar: covariance matrix for process noise
        :param init_state: initial state vector
        :param init_covar: initial covariance matrix
        :param alpha: spread of sigma points, usually a small positive value
        :param k: scaling parameter, often 0 or 3 - state_size
        :param beta: optimal for gaussian distributions (usually 2)
        :param state_predictor: function to predict next state
        """
        self.state_dim = int(state_size)
        self.sigma_count = 1 + state_size * 2
        self.noise_covar = noise_covar
        self.state = init_state
        self.covar = init_covar
        self.beta = beta
        self.alpha = alpha
        self.k = k
        self.predict_next_state = state_predictor
        self.rho = 1.0  # initial value for the prior coefficient
        self.Sk = np.zeros((state_size, state_size))  # initialize S matrix
        self.lambda_k = 1.0  # initial value for lambda
        self.alpha_k = 1.0  # initial fading factor

        # scaling factor for sigma points
        self.sigma_factor = self.alpha**2 * (self.state_dim + self.k) - self.state_dim

        # weights for mean and covariance calculations
        self.mean_weights = np.zeros(self.sigma_count)
        self.cov_weights = np.zeros(self.sigma_count)

        # first weight calculation
        self.cov_weights[0] = self.sigma_factor / (self.state_dim + self.sigma_factor) + (1 - self.alpha**2 + self.beta)
        self.mean_weights[0] = self.sigma_factor / (self.state_dim + self.sigma_factor)

        # remaining weights are identical
        for i in range(1, self.sigma_count):
            weight = 1 / (2 * (self.state_dim + self.sigma_factor))
            self.cov_weights[i] = weight
            self.mean_weights[i] = weight

        # generate initial sigma points
        self.sigma_points = self.calc_sigmas()

        # lock for thread safety
        self.lock = Lock()

    def calc_alpha_k(self, Pzz, vk):
        """
        calculates the adaptive fading factor alpha_k
        :param Pzz: predicted innovation covariance matrix
        :param vk: prediction residual error
        :return: updated alpha_k value
        """
        self.Sk = self.rho * self.Sk + np.outer(vk, vk) / (1 + self.rho)
        self.lambda_k = np.trace(self.Sk) / np.trace(Pzz)
        self.alpha_k = max(1, self.lambda_k)
        return self.alpha_k

    def calc_sigmas(self):
        """ calculates sigma points for state estimation """
        ret = np.zeros((self.sigma_count, self.state_dim))

        # scale covariance matrix
        scaled_covar = (self.state_dim + self.sigma_factor) * self.covar
        sqrt_covar = scipy.linalg.sqrtm(scaled_covar)

        # first sigma point is the state itself
        ret[0] = self.state
        for i in range(self.state_dim):
            # subsequent points are state +/- scaled square root of covariance
            ret[i + 1] = self.state + sqrt_covar[i]
            ret[i + 1 + self.state_dim] = self.state - sqrt_covar[i]

        return ret.T

    def update(self, asset_indices, market_data, error_covar):
        """
        integrates new market data into the filter
        :param asset_indices: indices of the assets being updated
        :param market_data: corresponding new data for the assets
        :param error_covar: covariance matrix for measurement errors
        """
        self.lock.acquire()

        num_states = len(asset_indices)

        # extract relevant sigma points
        sigmas_split = np.split(self.sigma_points, self.state_dim)
        y = np.concatenate([sigmas_split[i] for i in asset_indices])

        # mean of the relevant states
        state_split = np.split(self.state, self.state_dim)
        y_mean = np.concatenate([state_split[i] for i in asset_indices])

        # calculate differences for covariance update
        y_diff = deepcopy(y)
        x_diff = deepcopy(self.sigma_points)
        for i in range(self.sigma_count):
            for j in range(num_states):
                y_diff[j][i] -= y_mean[j]
            for j in range(self.state_dim):
                x_diff[j][i] -= self.state[j]

        # covariance of the measurement
        meas_covar = np.zeros((num_states, num_states))
        for i, val in enumerate(np.array_split(y_diff, self.sigma_count, 1)):
            meas_covar += self.cov_weights[i] * val.dot(val.T)
        meas_covar += error_covar

        # covariance of state and measurement
        state_meas_covar = np.zeros((self.state_dim, num_states))
        for i, val in enumerate(zip(np.array_split(y_diff, self.sigma_count, 1), np.array_split(x_diff, self.sigma_count, 1))):
            state_meas_covar += self.cov_weights[i] * val[1].dot(val[0].T)

        # kalman gain
        kalman_gain = np.dot(state_meas_covar, np.linalg.inv(meas_covar))

        # update state and covariance
        self.state += np.dot(kalman_gain, (market_data - y_mean))
        self.covar -= np.dot(kalman_gain, np.dot(meas_covar, kalman_gain.T))
        self.sigma_points = self.calc_sigmas()

        self.lock.release()

    def predict(self, delta_time, market_inputs=[]):
        """
        advances state estimate forward in time
        :param delta_time: time interval since last prediction
        :param market_inputs: additional market data inputs, if any
        """
        self.lock.acquire()

        # predict next state for each sigma point
        sigmas_out = np.array([self.predict_next_state(x, delta_time, market_inputs) for x in self.sigma_points.T]).T

        # calculate new state mean
        state_out = np.zeros(self.state_dim)
        for i in range(self.state_dim):
            state_out[i] = sum(self.mean_weights[j] * sigmas_out[i][j] for j in range(self.sigma_count))

        # calculate new state covariance
        covar_out = np.zeros((self.state_dim, self.state_dim))
        for i in range(self.sigma_count):
            diff = np.atleast_2d(sigmas_out.T[i] - state_out)
            covar_out += self.cov_weights[i] * np.dot(diff.T, diff)

        # incorporate process noise
        covar_out += delta_time * self.noise_covar

        # update state and covariance
        self.sigma_points = sigmas_out
        self.state = state_out
        self.covar = covar_out

        # adjust covariance with fading factor
        self.covar *= self.alpha_k

        self.lock.release()

    def get_state(self, index=-1):
        """
        retrieves the current state or a specific state variable
        :param index: index of the state variable, -1 for full state
        :return: state variable or full state
        """
        return self.state[index] if index >= 0 else self.state

    def get_covar(self):
        """ returns the current state covariance matrix """
        return self.covar

    def override_state(self, value, index=-1):
        """
        overrides the filter state
        :param value: new value for the state variable or full state
        :param index: index of the state variable, -1 for full state
        """
        with self.lock:
            if index != -1:
                self.state[index] = value
            else:
                self.state = value

    def restart(self, state, covar):
        """
        restarts the filter with a new state and covariance
        :param state: new state vector
        :param covar: new covariance matrix
        """
        with self.lock:
            self.state = state
            self.covar = covar