# Particle Filter Demonstration: Circular Motion

**Richard Hodgskin-Brown**
**First Published September 17th 2022**

## Overview

This notebook aims to give a visual demonstration of the bootstrap particle filter algorithm operating on a simple toy model. It is intended as a purely pedagogical exercise to strengthen the intuition behind the algorithm. This is an augmented and generalised version of a similar **[notebook](https://colab.research.google.com/drive/1AoGZAFa_8mG1jQAniV1q8bGZsMQnErzl?usp=sharing)** by Frank Dellaert, with the accompanying text rewritten.

In this basic scenario, an object travels on a circular path, and we would like to track its location. We have several pieces of prior knowledge before any measurements take place:

- We know the object travels in a perfect circular path around the origin, with no deviations or noise
- We know the exact velocity of the object, which is the same no matter its distance from the origin
- We have a prior distribution of the location of the object before any measurements in the form of a symmetric multivariate Gaussian centred around a known mean (by deafault, this is at the origin)
- We know the exact error distribution on our measurement (i.e. we have a likelihood model)

In addition to this, we can take a measurement of the object's location at each timestep. Unfortunately, we are only able to take measurements of the x-coordinate, and there is some degree of noise on each measurement. The goal is to set up a cloud of particles, the distribution of which iteratively closes in on the ground truth of the object location.

## Setup

In [31]:
# Import required libraries (just numerical & plotting libs)
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Patch

In [32]:
# Setup plotting environment to appear in new window
%matplotlib qt
sns.set_style('darkgrid') # Favourite darkgrid plotting style

### Define Constants

Here we'll define the constants for this simulation. These can be altered to get a feel for how the algorithm works.

In [33]:
# Algorithm parameters
n_particles = 1000  # Number of particles simulated in the algorithm
num_iterations = 10  # Number of iterations of the algorithm before the script stops

# Properties of the multivariate Gaussian prior distribution of the object's location
prior_mean = (0, 0)  # Coordinates (in m), default is centred around origin
prior_covariance = 9  # Value along diagonal of covariance matrix (in m)

# Motion_model
velocity = 3  # m/s
n_euler_steps = 10  # Specifies precision of motion calculation

# Observation_model
measurement_sigma = 1.0  # standard deviation error (m) on our measurements

### Plot Aesthetics

Here we'll define some variables controlling the figure aesthetics.

In [34]:
# Define colours for plotting
light_green = (0.5, 0.9, 0.5)
soft_blue = (72/255, 169/255, 247/255)
soft_orange = (243/255, 120/255, 6/255)

#
legend_elements = [Patch(facecolor=soft_blue, edgecolor=tuple(i*0.8 for i in soft_blue)),
                   Patch(facecolor=soft_orange, edgecolor=tuple(i*0.8 for i in soft_orange))]
legend_labels = ['Prior', 'Posterior']

bbox_properties = dict(boxstyle='round', facecolor=(238/255, 238/255, 244/255), alpha=0.5, edgecolor=(212/320, 212/320, 213/320))

___

## Creating the Model

Particle filters are a tool for solving stochastic problems which can be conceptualised in the language of *state space models*. In this formulation, a state vector describes the properties of the system at any particular time. For example, it may have parameters describing the positions and velocities of moving vehicles, or the abstract space of some complex control system.

In addition to the state, we need two further models to describe our system: a _transition_ (or _dynamics_) model, and a _measurement_ (or _observation_) model. The transition model describes how the state evolves over time; usually, discrete points in time are considered, creating a time series, though continuous-time state space models exist. The transition model can be exact, if the dynamics are perfectly known, or (more commonly) describe a probability distribution of values based on the previous state if the evolution is stochastic. Under the Markov condition restriction of state space models, the evolution of the state is dependent on the previous state only. The measurement model describes the relationship between any measured variables and the state itself, including a model of the error on these measurements if present. In this demonstration, we are making a measurement at a rate of one per second.

### State

In this example, the state $\textbf{x}_t$ at a given time is fully described using the x and y coordinates ($p_x^t$ & $p_y^t$). The motion is entirely deterministic, and dependent only on these coordinates. The state could alternatively be parameterised in terms of a radius and a phase.

$$ \textbf{x}_t = \begin{bmatrix} p_x^t \\ p_y^t \\ \end{bmatrix}$$

### Transition Model

The transition model for this demonstration consists of a motion model only, in this case a perfect circular motion trajectory. The scalar velocity $v$ of the object is prior knowledge, and in this demo there is no noise on the trajectory. The state, containing the object position, is updated using the vector velocity $\textbf{v}_t$ at each timestep. This is calculated using the **[Euler method](https://en.wikipedia.org/wiki/Euler_method)**; there are of course simple analytic solutions to circular motion, but this approach generalises well to more complex motion models.

$$\textbf{x}_{t+1} = \textbf{x}_{t} + \textbf{v}_t$$
$$\textbf{v}_t = \frac{v}{\sqrt{p_x^t^2 + p_y^t^2}} \cdot \begin{bmatrix} -p_y^t \\ p_x^t \end{bmatrix}$$

### Measurement Model

We are restricting ourselves in this example to a noisy measurement $m^t$ of the x-value only. Over repeated measurements, this is enough to determine the full state of the object. We will construct a model to describe how the measurement is corrupted with noise (we will use simple Gaussian noise here), equivalently, we are defining a *likelihood* $l(p_x^t | m^t)$ of the ground truth x-value $p_x^t$ given the measurement $m^t$. (The normalisation factor can be ignored, as the particle weights which will be calculated using this likelihood will be normalised anyway. This likelihood function only needs to be *proportional* to the probability distribution of the ground truth given a measurement). The standard deviation of the measurement noise $\sigma_m$ is a parameter which can be set. By default, it is relatively high (1.0 m) to demonstrate the ability of particle filters to converge on a stable estimate of the ground truth despite noisy measurements.

$$l(p_x^t | m^t)=\exp\left(\frac{(p_x^t - m^t)^2}{2\sigma_m^2}\right)$$


In [35]:
# Calculate circular motion at the specified velocity for one second
def motion_model(samples):
    # Simulate all samples forward for one second, using specified number of Euler steps

    # Perform motion on copy rather than inplace
    predictions = np.copy(samples)

    # Calculate velocity over n intermediate steps
    for i in range(n_euler_steps):
        x = predictions[:, 0]
        y = predictions[:, 1]
        norm = np.sqrt(x ** 2 + y ** 2)
        predictions[:, 0] -= (1 / n_euler_steps) * y * velocity / norm
        predictions[:, 1] += (1 / n_euler_steps) * x * velocity / norm

    return predictions

# Define likelihood function. This returns a function - a probability distriubtion - given an input measurement
def likelihood_func(measurement):

    # Define likelihood function for this measurement
    def likelihood(location):
        return np.exp(-0.5 * (location[0]-measurement)**2 / (measurement_sigma**2) )

    # Return likelihood function
    return likelihood

# Take a noisy measurement of the ground truth
def take_measurement(ground_truth):
    return np.random.normal(ground_truth, measurement_sigma)

# Sample from the prior distribution defined for the object
def sample_prior(n_samples):
    sample_mean = np.array(prior_mean)
    sample_covariance = np.diag([prior_covariance] * 2)
    return np.random.multivariate_normal(sample_mean, sample_covariance, size=n_samples)

---

## Performing a Particle Filer

The idea behind the particle filter is very simple. We run a set of parallel simulations, known in the jargon as *particles*, according to the dynamics model we have set up. The initial states are sampled from the prior distribution we established, and their states are updated according to the dynamics model. At each timestep, we take a measurement using our measurement model. We use this measurement to *weight* particles which have higher probability according to our likelihood model. In the bootstrap particle filter approach implemented here, we resample the particles from the current set at each timestep according to their weights. This means that at each step, particles which are closer to the measurements survive, and inaccurate particles are effectively discarded.

At each step, we can use the (weighted) distribution of particles to estimate a probability distribution over the unknown state parameters. If the particle filter is set up correctly, this probability distribution should iteratively approximate the ground truth.


### Initialisation

First, we will randomly sample the ground truth from our prior - in this toy model, our prior is exactly correct. We will sample a set of particles from our prior for simulation.

In [36]:
ground_truth = sample_prior(1)[0]  # Sample ground truth from prior
trajectory_radius = np.sqrt(np.sum(ground_truth**2))  # Calculate trajectory radius for plotting
samples = sample_prior(n_particles)  # Sample particles from prior

### Recursive Simulation

We will now perform the particle filter algorithm. At each iteration, we have a collection of particles which form our prior distribution of the state. A measurement is then made, which is used to weight these particles. These weighted particles create a posterior distribution using the information from the measurement. The motion model is applied to the weighted particles, and they are resampled according to their (normalised) weights. This new set of particles then forms the prior distribution for the next iteration.

Each of these steps are plotted on one figure (for each iteration) to demonstrate this process. It can be seen that the posterior distribution after measurement slowly closes in on the ground truth.

In [37]:
for iter_i in range(num_iterations):

    # Create plot grid
    fig, axs = plt.subplots(2, 3, figsize=(13, 7))
    fig.suptitle(f'Circular Motion Particle Filter Demonstration\nIteration {iter_i+1}', fontsize=16)
    fig.text(0.0, 0.85, 'Prior and Posterior Distributions', ha='left', va='center', fontsize=16, color='g')

    # Plot prior distribution of particles
    axs[0][0].scatter(samples[:,0], samples[:,1], alpha=0.6, label='Particles')
    axs[0][0].set_title('Prior distribution of particles')

    # Take a measurement of the ground truth using our measurement model
    measurement = take_measurement(ground_truth[0])

    # Calculate importance weights as likelihood of measurement
    weights = np.apply_along_axis(likelihood_func(measurement), 1, samples)  # Weight for each sample
    weights[weights < 1e-10] = 0  # remove very small weights (eliminates plotting errors, realistically these are never sampled)

    # Bootstrap sample from weighted particles to calculate posterior distribution
    sample_indices = np.random.choice(len(samples), p=weights/np.sum(weights), size=n_particles)  # Select particles to resample
    resampled_particles = samples[sample_indices]  # Resample particles (with probability proportional to their normalised weights)

    # Plot weighted particles
    axs[0][1].scatter(samples[:,0], samples[:,1], c=weights, s=weights*1000, alpha=0.6)
    axs[0][1].axvline(measurement, -10, 10, color='red', linestyle='--', label='Measurement')
    axs[0][1].set_title('Weighted samples from posterior (using measurement)')

    # 2D Seaborn KDE plot of particles
    sns.kdeplot(x=samples[:, 0], y=samples[:, 1], ax=axs[0][2], fill=True, bw_method=measurement_sigma)
    sns.kdeplot(x=resampled_particles[:, 0], y=resampled_particles[:, 1], ax=axs[0][2], fill=True, bw_method=measurement_sigma)
    axs[0][2].set_title('2D KDE plot of prior & posterior distributions')
    axs[0][2].axvline(measurement, -10, 10, color='red', linestyle='--')

    # Add descriptive text describing second row of plots
    fig.text(0.0, 0.438, 'Prediction with motion model', ha='left', va='center', fontsize=16, color='g')
    plt.subplots_adjust(hspace=0.3)

    # Transform particles according to motion model
    predictions = motion_model(samples)  # Particles after motion
    unmodified_ground_truth = ground_truth  # Save unmodified ground truth to display on figure
    ground_truth = motion_model(ground_truth.reshape(1,2)).reshape(2)  # Transform ground truth

    # Plot weighted particles after motion model
    axs[1][0].scatter(predictions[:,0], predictions[:,1], c=weights, s=weights*1000, alpha=0.6)
    axs[1][0].set_title('Distribution of weighted particles after motion model')

    # Bootstrap sample from weighted particles after motion to calculate prior for next iteration
    # (could use previous resampled particles here)
    sample_indices = np.random.choice(len(samples),p=weights/np.sum(weights),size=n_particles)
    samples = predictions[sample_indices]

    # Plot resampled particles
    axs[1][1].scatter(samples[:,0], samples[:,1], alpha=0.6)
    axs[1][1].set_title('Resampled prior for next iteration')

    # 2D Seaborn KDE plot of resampled particles after motion
    sns.kdeplot(x=samples[:,0], y=samples[:,1], ax=axs[1][2], fill=True, bw_method=measurement_sigma)
    axs[1][2].set_title('2D KDE plot of resampled prior')

    # Set axis labels and limits (same for all plots to highlight convergence)
    # Plot ground truth as scatter point with actual trajectory
    for ax_row, ax_i in enumerate(axs):
        for ax_j in ax_i:
            ax_j.set_xlabel('x (m)')
            ax_j.set_ylabel('y (m)')
            ax_j.set_xlim(-10,10)
            ax_j.set_ylim(-10,10)

            # Plot ground truth scatter and circular trajectory
            if ax_row == 0:
                ax_j.scatter(unmodified_ground_truth[0], unmodified_ground_truth[1], marker='x',
                             color=light_green, s=100, label='Ground Truth')
            else:
                ax_j.scatter(ground_truth[0], ground_truth[1], marker='x',
                     color=light_green, s=100, label='Ground Truth')

            ax_j.add_patch(plt.Circle((0, 0), trajectory_radius, color=np.array(light_green)*0.8, fill=False,
                                      linestyle='--', linewidth=2, label='Trajectory'))


    # Create legend from labels in subplots
    handles1, labels1 = axs[0][0].get_legend_handles_labels()
    handles2, labels2 = axs[0][1].get_legend_handles_labels()
    handles1.insert(2, handles2[0])
    labels1.insert(2, labels2[0])
    handles = handles1 + legend_elements
    labels = labels1 + legend_labels
    fig.legend(handles, labels, loc='upper right')

    # Add descriptive text describing simulation
    simulation_text = '\n'.join([
        f'N particles = {n_particles}',
        f'N iterations = {num_iterations}',
        f'Prior Mean = {prior_mean} (m)',
        f'Prior Covariance = {prior_covariance} (m)',
        f'Measurement Error = {measurement_sigma} (m)',
    ])

    # Add descriptive text describing state of simulation
    state_text = '\n'.join([
        f'Object Velocity = {velocity} [$ms^{{-1}}]$',
        f'Ground Truth = {tuple(unmodified_ground_truth.round(3))}',
        f'Posterior Mean = {tuple(np.mean(resampled_particles, axis=0).round(3))}',
        f'Measurement = {np.round(measurement, 3)}',
        f'GT After Motion = {tuple(ground_truth.round(3))}',
    ])

    # Plot text in text boxes at top of figure
    fig.text(0.005, 0.99, simulation_text, fontsize=14, verticalalignment='top', bbox=bbox_properties)
    fig.text(0.766, 0.983, state_text, fontsize=14, verticalalignment='top', bbox=bbox_properties)

    # Adjust figure aesthetics
    plt.tight_layout()
    plt.subplots_adjust(top=0.8, hspace=0.4)
    figManager = plt.get_current_fig_manager()
    figManager.window.showMaximized()  # Full screen
    plt.show(block=True)  # Show plot, pause simulation until closed

___

## Plotting Final Posterior Distributions

We will now plot the final posterior distributions to evaluate the performance of the algorithm. The posterior mode is sometimes a better point estimate of the ground truth, though it is a little more involved to calculate this value from the posterior.

In [40]:
# Get posterior from final iteration
posterior = resampled_particles

# Matplotlib grid plot comparing posterior and ground truth
fig = plt.figure()
ax1 = plt.subplot(2,1,1)
ax2 = plt.subplot(2,2,3)
ax3 = plt.subplot(2,2,4)
axs = [[ax1], [ax2, ax3]]

# 2D Seaborn kde of posterior with ground truth scatter
sns.kdeplot(x=posterior[:, 0], y=posterior[:, 1], ax=axs[0][0], fill=True, color='orange',bw_method=measurement_sigma)
axs[0][0].set_title('Posterior KDE Plot with Ground Truth', fontsize=16)
axs[0][0].scatter(unmodified_ground_truth[0], unmodified_ground_truth[1], marker='x', c='green', s=100, label='Ground Truth')
axs[0][0].scatter(posterior.mean(axis=0)[0], posterior.mean(axis=0)[1], marker='x', c='red', s=100, label='Posterior Mean')
handles, labels = axs[0][0].get_legend_handles_labels()
handles = [legend_elements[1]] + handles
labels = ['Posterior'] + labels
axs[0][0].legend(handles, labels)
axs[0][0].set_xlabel('x (m)', fontsize=16)
axs[0][0].set_ylabel('y (m)', fontsize=16)

# 1D KDE plot of posterior x with ground truth
sns.kdeplot(posterior[:, 0], ax=axs[1][0], color='orange', bw_method=measurement_sigma, label='Posterior')
axs[1][0].axvline(unmodified_ground_truth[0], c='green', linestyle='--', label='Ground Truth')
axs[1][0].axvline(posterior.mean(axis=0)[0], c='red', linestyle='--', label='Posterior Mean')
axs[1][0].set_title('Posterior x', fontsize=16)
axs[1][0].set_xlabel('x (m)', fontsize=16)
axs[1][0].legend()

# 1D KDE plot of posterior y with ground truth
sns.kdeplot(posterior[:, 1], ax=axs[1][1], color='orange', bw_method=measurement_sigma, label='Posterior')
axs[1][1].axvline(unmodified_ground_truth[1], c='green', linestyle='--', label='Ground Truth')
axs[1][1].axvline(posterior.mean(axis=0)[1], c='red', linestyle='--', label='Posterior Mean')
axs[1][1].set_title('Posterior y', fontsize=16)
axs[1][1].set_xlabel('y (m)', fontsize=16)
axs[1][1].legend()

# Finalise figure
fig.suptitle(f'Final Posterior Distribution after {num_iterations} Iterations', fontsize=25)
figManager = plt.get_current_fig_manager()
figManager.window.showMaximized()  # Full screen
plt.tight_layout()
plt.show(block=True)