Special Types

In [None]:
from ..base import Property
from .array import CovarianceMatrix
from .base import Type
from .state import (State, GaussianState, ParticleState, WeightedGaussianState,
                    GaussianMixtureState)

class GaussianStatePrediction(Prediction, GaussianState):
    """ GaussianStatePrediction type

    This is a simple Gaussian state prediction object, which, as the name
    suggests, is described by a Gaussian distribution.
    """


class GaussianMeasurementPrediction(MeasurementPrediction, GaussianState):
    """ GaussianMeasurementPrediction type

    This is a simple Gaussian measurement prediction object, which, as the name
    suggests, is described by a Gaussian distribution.
    """

    cross_covar = Property(CovarianceMatrix,
                           doc="The state-measurement cross covariance matrix",
                           default=None)

    def __init__(self, state_vector, covar, timestamp=None,
                 cross_covar=None, *args, **kwargs):
        if(cross_covar is not None
           and cross_covar.shape[1] != state_vector.shape[0]):
            raise ValueError("cross_covar should have the same number of \
                             columns as the number of rows in state_vector")
        super().__init__(state_vector, covar, timestamp,
                         cross_covar, *args, **kwargs)

from ..base import Property
from .base import Type
from .hypothesis import Hypothesis
from .state import (State, GaussianState, ParticleState,
                    WeightedGaussianState, GaussianMixtureState)

class WeightedGaussianStateUpdate(Update, WeightedGaussianState):
    """ WeightedGaussianStateUpdate type

    This is an Gaussian state update object, which is augmented with a
    weight  property.
    """


IMM Prediction

In [None]:
import numpy as np
from functools import lru_cache

from .base import Predictor
from ..base import Property, Base
from ..functions import gauss2sigma, unscented_transform, imm_merge
from ..types.prediction import (GaussianStatePrediction,
                                WeightedGaussianStatePrediction,
                                GaussianMixtureStatePrediction)
from ..types.state import (State, GaussianMixtureState, GaussianState,
                           WeightedGaussianState)


class IMMPredictor(Base):
    predictors = Property([Predictor],
                          doc="A bank of predictors each parameterised with "
                              "a different model")
    model_transition_matrix = \
        Property(np.ndarray,
                 doc="The square transition probability "
                     "matrix of size equal to the number of "
                     "predictors")

    @lru_cache()
    def predict(self, prior, control_input=None, timestamp=None, **kwargs):
        """
        IMM prediction step
        Parameters
        ----------
        prior: :class:`~GaussianMixtureState`
            The prior state
        control_input : :class:`~.State`, optional
            The control input. It will only have an effect if
            :attr:`control_model` is not `None` (the default is `None`)
        timestamp: :class:`datetime.datetime`, optional
            A timestamp signifying when the prediction is performed \
            (the default is `None`)

        Returns
        -------
        :class:`~GaussianMixtureState`
        """
        nm = self.model_transition_matrix.shape[0]

        # Extract means, covars and weights
        means = prior.means
        covars = prior.covars
        weights = prior.weights

        # Step 1) Calculation of mixing probabilities
        c_j = self.model_transition_matrix.T @ weights
        mu_ij = (self.model_transition_matrix * (weights @ (1 / c_j).T)).T

        # Step 2) Mixing (Mixture Reduction)
        means_k, covars_k = imm_merge(means, covars, mu_ij)

        # Step 3) Mode-matched prediction
        predictions = []
        for i in range(nm):
            prior_i = GaussianState(means_k[:, [i]],
                                    np.squeeze(covars_k[[i], :, :]),
                                    timestamp=prior.timestamp)
            prediction = self.predictors[i].predict(prior_i,
                                                    timestamp=timestamp)
            predictions.append(
                WeightedGaussianStatePrediction(
                      prediction.mean,
                      prediction.covar,
                      weight=weights[i, 0],
                      timestamp=prediction.timestamp))

        return GaussianMixtureStatePrediction(predictions)

IMM Updater

In [None]:
import numpy as np
from scipy.stats import multivariate_normal as mvn
from functools import lru_cache

from .base import Updater
from ..base import Property, Base
from ..types.hypothesis import SingleHypothesis
from ..types.prediction import (WeightedGaussianMeasurementPrediction,
                                GaussianMixtureMeasurementPrediction)
from ..types.update import (WeightedGaussianStateUpdate,
                            GaussianMixtureStateUpdate)
from ..types.state import WeightedGaussianState

class IMMUpdater(Base):

    updaters = Property([Updater],
                        doc="A bank of predictors each parameterised with "
                            "a different model")
    model_transition_matrix = \
        Property(np.ndarray,
                 doc="The square transition probability "
                     "matrix of size equal to the number of "
                     "updaters")
    @lru_cache()
    def predict_measurement(self, predicted_state, measurement_model=None,
                            **kwargs):
        """IMM measurement prediction step

        Parameters
        ----------
        predicted_state : :class:`~.GaussianMixtureStatePrediction`
            A predicted state object

        Returns
        -------
        : :class:`~.GaussianMixtureMeasurementPrediction`
            The measurement prediction
        """
        nm = self.model_transition_matrix.shape[0]

        # Extract means, covars and weights
        means, covars, weights = (predicted_state.means,
                                  predicted_state.covars,
                                  predicted_state.weights)

        meas_predictions = []
        for i in range(nm):
            pred = WeightedGaussianState(means[:, [i]],
                                         np.squeeze(covars[[i], :, :]),
                                         timestamp=predicted_state.timestamp)
            meas_prediction = self.updaters[i].predict_measurement(pred)
            meas_predictions.append(
                WeightedGaussianMeasurementPrediction(
                    meas_prediction.mean,
                    meas_prediction.covar,
                    cross_covar=meas_prediction.cross_covar,
                    timestamp=meas_prediction.timestamp,
                    weight = weights[i,0]))
        return GaussianMixtureMeasurementPrediction(meas_predictions)

    def update(self, hypothesis, **kwargs):
        """ IMM measurement update step

        Parameters
        ----------
        hypothesis : :class:`~.Hypothesis`
            Hypothesis with predicted state and associated detection used for
            updating.

        Returns
        -------
        : :class:`~.GaussianMixtureStateUpdate`
            The state posterior
        """
        nm = self.model_transition_matrix.shape[0]

        # Extract weights
        weights = hypothesis.prediction.weights

        # Step 3) Mode-matched filtering (ctn'd)
        Lj = []
        posteriors = []
        for i in range(nm):
            pred = hypothesis.prediction.components[i]
            meas_prediction = self.updaters[i].predict_measurement(pred)
            hyp = SingleHypothesis(pred, hypothesis.measurement,
                                   meas_prediction)
            posterior = self.updaters[i].update(hyp)
            posteriors.append(posterior)
            Lj.append(mvn.pdf(posterior.hypothesis.measurement.state_vector.T,
                              posterior.hypothesis.measurement_prediction.mean.ravel(),
                              meas_prediction.covar))

        # Step 4) Mode Probability update
        c_j = self.model_transition_matrix.T @ weights  # (11.6.6-8)
        weights = Lj * c_j.ravel()  # (11.6.6-15)
        weights = weights / np.sum(weights)  # Normalise
        posteriors_w = [WeightedGaussianStateUpdate(
                            posteriors[i].mean,
                            posteriors[i].covar,
                            posteriors[i].hypothesis,
                            weight=weights[i],
                            timestamp=posteriors[i].timestamp)
                        for i in range(nm)]

        return GaussianMixtureStateUpdate(posteriors_w, hypothesis)