In [1]:
import matplotlib.pyplot as plt
import seaborn as sns
import polars as pl
import numpy as np
from numpy.typing import NDArray
from typing import Sequence, Any

In [None]:
df = pl.read_csv("data.csv")
with_date = df.with_columns([pl.col("time").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=True).alias("time")])
with_date.describe()

In [None]:
# calculate the average interval for time
t = with_date.select("time").with_columns([pl.col("time").diff().alias("diff")])
t.describe()

In [None]:
xs = df["x"].to_numpy()
ys = df["y"].to_numpy()


def plot_lines(xs: Sequence[Any], ys: Sequence[Any]):
    assert len(xs) == len(ys)
    c = len(xs)
    colors = plt.cm.viridis(np.linspace(0, 1, c))
    ax = plt.gca()
    ax.set_xlim([0, 100])
    ax.set_ylim([0, 100])
    # set the origin to the upper left
    ax.invert_yaxis()
    # plot line with color gradient
    for i in range(c):
        # plt.plot(xs[i - 1:i + 1], ys[i - 1:i + 1], c=colors[i - 1], linewidth=0.5)
        plt.plot(xs[i - 1:i + 1], ys[i - 1:i + 1], c=colors[i - 1], marker="o", markersize=5, linewidth=0.5)
    plt.show()


plot_lines(xs, ys)

In [None]:
sns.histplot(xs, kde=True)
sns.histplot(ys, kde=True)
plt.xlim(0, 100)

In [None]:
class ExtremeFilter:
    WINDOW_SIZE = 10
    # shape should be (n, 2) where n <= WINDOW_SIZE
    _window: NDArray = np.empty((0, 2))
    # shape should be (m, 2) where m >= 0
    _pts_inbound: NDArray = np.empty((0, 2))

    def is_extreme(self, x: float | int, xs: NDArray, threshold: float = 10.0):
        # use IQR
        if len(xs) < self.WINDOW_SIZE:
            return False
        q1 = np.percentile(xs, 25)
        q3 = np.percentile(xs, 75)
        iqr = q3 - q1
        if iqr == 0 and abs(x - q1) < threshold:
            return False
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        return x < lower_bound or x > upper_bound

    def append_point(self, x: float | int, y: float | int):
        pt = np.array([[x, y]])
        self._window = np.vstack([self._window, pt])
        if self._window.shape[0] > self.WINDOW_SIZE:
            self._window = self._window[-self.WINDOW_SIZE:]
        xs = self._window[:, 0]
        ys = self._window[:, 1]
        if not any([self.is_extreme(x, xs), self.is_extreme(y, ys)]):
            self._pts_inbound = np.vstack([self._pts_inbound, pt])

    @property
    def window(self):
        return self._window

    @property
    def inbound(self):
        return self._pts_inbound



In [None]:
xs_data = df["x"].to_numpy()
ys_data = df["y"].to_numpy()

f = ExtremeFilter()
for x, y in zip(xs_data, ys_data):
    f.append_point(x, y)
f.inbound

In [None]:
plot_lines(f.inbound[:, 0], f.inbound[:, 1])

In [None]:
pts = np.array([xs, ys]).T
pts

In [None]:
# assume time is constant (unit 1) 
# this is our velocity vector
vel = np.diff(pts, axis=0)
vel

In [None]:
vel_mag = np.linalg.norm(vel, axis=1)
vel_mag_norm = vel_mag / np.max(vel_mag)

In [None]:
sns.histplot(vel_mag, kde=True)

In [None]:
# sns.histplot(vel_mag[vel_mag <= 10], kde=True)
sns.histplot(vel_mag[vel_mag > 10], kde=True)


In [None]:
def plot_vel(x, y, u, v, norm=None, scale=1):
    q = plt.quiver(x, y, u, v, norm, scale_units='xy', angles='xy', scale=scale, cmap="viridis")
    if norm is not None:
        plt.colorbar(q, label="Velocity magnitude")
    ax = plt.gca()
    ax.set_xlim([0, 100])
    ax.set_ylim([0, 100])
    ax.invert_yaxis()


plot_vel(pts[:-1, 0], pts[:-1, 1], vel[:, 0], vel[:, 1], vel_mag_norm, scale=6)

In [None]:
pts_v = np.hstack([pts[:-1], vel, vel_mag_norm.reshape(-1, 1)])
BIG_THRES = 0.6
big = pts_v[np.where(vel_mag_norm >= BIG_THRES)]
display(np.shape(big))
plot_vel(big[:, 0], big[:, 1], big[:, 2], big[:, 3], big[:, 4], scale=4)

In [None]:
indexes = np.where(vel_mag_norm >= BIG_THRES)
indexes

In [None]:
small = pts_v[np.where(vel_mag_norm < BIG_THRES)]
display(np.shape(small))
plot_vel(small[:, 0], small[:, 1], small[:, 2], small[:, 3], small[:, 4], scale=0.75)

In [None]:
# from pykalman import KalmanFilter
# pykalman is broken 
# https://github.com/pykalman/pykalman/issues/106

from filterpy.kalman import KalmanFilter

# https://github.com/rlabbe/filterpy/issues/223

kf = KalmanFilter(dim_x=2, dim_z=2)
kf.x = np.array([xs[0], ys[0]])
kf.F = np.array([[1, 0],
                 [0, 1]])
kf.H = np.array([[1, 0],
                 [0, 1]])
kf.P *= 10
kf.R = 10
kf.Q = 10

In [None]:
# predict
preds = np.empty((0, 2))
inn = np.empty((0, 2))
inn_norm = np.empty(0)
for x, y in pts:
    measurement = np.array([x, y])
    kf.predict()
    preds = np.vstack([preds, kf.x])
    # innovation = measurement - H * predicted_state
    # innovation is not always reliable
    innovation = measurement - kf.H @ kf.x
    inn_mag = np.linalg.norm(innovation)
    inn_norm = np.append(inn_norm, inn_mag)
    inn = np.vstack([inn, innovation])
    INN_THRES = 35
    SCALE_FACTOR = 100
    kf.update(measurement)

kf_xs = preds[:, 0]
kf_ys = preds[:, 1]

display(np.shape(preds))
plot_lines(kf_xs, kf_ys)
# preds

In [None]:
inn_df = pl.DataFrame({"inn_x": inn[:, 0], "inn_y": inn[:, 1], "inn_mag": inn_norm})
inn_df

## Hysteresis
Implement hysteresis into the decision-making process. This means using two thresholds instead of one: a higher threshold to trigger a change, and a lower threshold to return to the previous state. This can prevent the system from toggling back and forth near the boundary.


## Particle Filter

In [None]:
# x, y, x_vel, y_vel
PARTICLE_COUNT = 3000
SPEED = 2
particles = np.empty((PARTICLE_COUNT, 4), dtype=float)
particles[:, 0] = np.random.uniform(0, 100, PARTICLE_COUNT)
particles[:, 1] = np.random.uniform(0, 100, PARTICLE_COUNT)
particles[:, 2] = np.random.uniform(-SPEED, SPEED, PARTICLE_COUNT)
particles[:, 3] = np.random.uniform(-SPEED, SPEED, PARTICLE_COUNT)

In [None]:
plt.scatter(particles[:, 0], particles[:, 1], s=1)

In [None]:
def predict(particles: NDArray, dt: float = 1.0, speed_std: float = 1.0):
    # add noise
    particles[:, 0] += (particles[:, 2] + np.random.normal(0, speed_std, PARTICLE_COUNT)) * dt
    particles[:, 1] += (particles[:, 3] + np.random.normal(0, speed_std, PARTICLE_COUNT)) * dt
    return particles

We do the same with our particles. Each particle has a position and a weight which estimates how well it matches the
measurement. Normalizing the weights, so they sum to one turns them into a probability distribution.

In [None]:
import scipy


def update(particles: NDArray, weights: NDArray, x: float, y: float) -> NDArray:
    MEASUREMENT_NOISE = 10
    distance = np.linalg.norm(particles[:, :2] - np.array([x, y]), axis=1)
    weights *= scipy.stats.norm(0, MEASUREMENT_NOISE).pdf(distance)
    weights += 1.e-300  # avoid round-off to zero
    weights /= sum(weights)  # normalize
    return weights

We don't resample at every epoch. For example, if you received no new measurements you have not received any information from which the resample can benefit. We can determine when to resample by using something called the *effective N*, which approximately measures the number of particles which meaningfully contribute to the probability distribution. The equation for this is

$$\hat{N}_\text{eff} = \frac{1}{\sum w^2}$$

and we can implement this in Python with

In [None]:
from random import random


def simple_resample(particles, weights):
    N = len(particles)
    cumulative_sum = np.cumsum(weights)
    cumulative_sum[-1] = 1.  # avoid round-off error
    indexes = np.searchsorted(cumulative_sum, random(N))

    # resample according to indexes
    particles[:] = particles[indexes]
    weights.fill(1.0 / N)


def neff(weights):
    return 1. / np.sum(np.square(weights))

If $\hat{N}_\text{eff}$ falls below some threshold it is time to resample. A useful starting point is $N/2$, but this varies by problem. It is also possible for $\hat{N}_\text{eff} = N$, which means the particle set has collapsed to one point (each has equal weight). It may not be theoretically pure, but if that happens I create a new distribution of particles in the hopes of generating particles with more diversity. If this happens to you often, you may need to increase the number of particles, or otherwise adjust your filter. We will talk more of this later.

In [None]:
def resample_from_index(particles, weights, indexes):
    particles[:] = particles[indexes]
    weights.resize(len(particles))
    weights.fill(1.0 / len(weights))


# https://filterpy.readthedocs.io/en/latest/monte_carlo/resampling.html
from filterpy.monte_carlo import systematic_resample

In [None]:
weights = np.ones(PARTICLE_COUNT) / PARTICLE_COUNT
alpha = 0.2
if PARTICLE_COUNT > 5000:
    alpha *= np.sqrt(5000) / np.sqrt(PARTICLE_COUNT)

In [None]:
def estimate(particles, weights):
    """returns mean and variance of the weighted particles"""
    pos = particles[:, 0:2]
    mean = np.average(pos, weights=weights, axis=0)
    var = np.average((pos - mean) ** 2, weights=weights, axis=0)
    return mean, var

In [None]:
c = 152
from time import time
start = time()
particles = predict(particles)
weights = update(particles, weights, xs[c], ys[c])
plt.scatter(particles[:, 0], particles[:, 1], s=1, alpha=alpha)
plt.scatter(xs[c], ys[c], color='r')
axis = plt.gca()
axis.set_xlim([0, 100])
axis.set_ylim([0, 100])
axis.invert_yaxis()
# resample if too few effective particles
N = PARTICLE_COUNT
if neff(weights) < N / 2:
    indexes = systematic_resample(weights)
    resample_from_index(particles, weights, indexes)
    assert np.allclose(weights, 1 / N)
mean, var = estimate(particles, weights)
display(time() - start)
plt.scatter(mean[0], mean[1], color='g', alpha=0.5)

In [None]:
import matplotlib.pyplot as plt
from ipywidgets import interact, IntSlider
import numpy as np

In [None]:
plt.hist(weights, bins=30)

In [None]:
from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \
                                               ConstantVelocity
from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState
from stonesoup.types.detection import TrueDetection
from stonesoup.types.detection import Clutter
from stonesoup.models.measurement.linear import LinearGaussian

transition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.05),
                                                          ConstantVelocity(0.05)])
measurement_model = LinearGaussian(ndim_state=4, mapping=(0, 2), noise_covar=np.diag([0.75, 0.75]))


In [None]:
from stonesoup.predictor.kalman import KalmanPredictor
from stonesoup.updater.kalman import KalmanUpdater

predictor = KalmanPredictor(transition_model)

updater = KalmanUpdater(measurement_model)

In [None]:
from stonesoup.hypothesiser.distance import DistanceHypothesiser
from stonesoup.measures import Mahalanobis
from stonesoup.dataassociator.neighbour import GlobalNearestNeighbour

hypothesiser = DistanceHypothesiser(predictor, updater, measure=Mahalanobis(), missed_distance=3)

data_associator = GlobalNearestNeighbour(hypothesiser)

In [None]:
from stonesoup.deleter.error import CovarianceBasedDeleter
from stonesoup.types.state import GaussianState
from stonesoup.initiator.simple import MultiMeasurementInitiator

deleter = CovarianceBasedDeleter(covar_trace_thresh=4)

initiator = MultiMeasurementInitiator(
    prior_state=GaussianState([[0], [0], [0], [0]], np.diag([0, 1, 0, 1])),
    measurement_model=measurement_model,
    deleter=deleter,
    data_associator=data_associator,
    updater=updater,
    min_points=2,
    )

In [None]:
from datetime import datetime, timedelta

In [None]:
from stonesoup.types.detection import Detection
detections = np.array([xs, ys])
# for n, v in enumerate(detections.T):
#     measurement = Detection(v)
#     print(v)


tracks, all_tracks = set(), set()
start_time = datetime.now().replace(microsecond=0)
all_measurements = [set([Detection(v, timestamp=start_time + timedelta(seconds=n))]) for n, v in enumerate(detections.T)]
time_steps = []
for n, measurements in enumerate(all_measurements):
    time_steps.append(start_time + timedelta(seconds=n))
    # Calculate all hypothesis pairs and associate the elements in the best subset to the tracks.
    hypotheses = data_associator.associate(tracks,
                                           measurements,
                                           start_time + timedelta(seconds=n))
    associated_measurements = set()
    for track in tracks:
        hypothesis = hypotheses[track]
        if hypothesis.measurement:
            post = updater.update(hypothesis)
            track.append(post)
            associated_measurements.add(hypothesis.measurement)
        else:  # When data associator says no detections are good enough, we'll keep the prediction
            track.append(hypothesis.prediction)

    # Carry out deletion and initiation
    tracks -= deleter.delete_tracks(tracks)
    tracks |= initiator.initiate(measurements - associated_measurements,
                                 start_time + timedelta(seconds=n))
    all_tracks |= tracks

In [None]:
# all_measurements
len(all_measurements)

In [None]:
from stonesoup.plotter import AnimatedPlotterly
plotter = AnimatedPlotterly(time_steps, tail_length=0.3)
plotter.plot_tracks(all_tracks, [0, 2], uncertainty=True)
# plotter.fig.update_layout(xaxis=dict(range=[0, 100]), yaxis=dict(range=[0, 100]))
plotter.fig

In [None]:
import plotly.graph_objects as go

fig = go.Figure()

times = list(range(len(xs)))
# Get a list of colors from the 'viridis' colormap in Matplotlib
viridis_cmap = plt.cm.get_cmap('viridis', len(times))
viridis_colors = [viridis_cmap(i) for i in range(len(times))]

# Convert RGBA colors to RGB format as a hex string for Plotly
viridis_hex = ['rgba(' + ','.join([f'{int(c*255)}' for c in color]) + ')' for color in viridis_colors]

for i, t in enumerate(times):
    fig.add_trace(go.Scatter(
        x=xs[:i+1], 
        y=ys[:i+1], 
        marker=dict(color=viridis_hex[:i+1]),
        mode='markers', 
        visible=False))

fig.data[0].visible = True

fig.update_layout(xaxis=dict(range=[0, 100]), yaxis=dict(range=[0, 100]))
steps = []
for i, t in enumerate(times):
    step = dict(
        method="update",
        args=[{"visible": [False] * len(fig.data)},
              {"title": f"Time: {t}"}],  # layout attribute
    )
    step["args"][0]["visible"][i] = True  # Toggle i'th trace to "visible"
    steps.append(step)

slider = [dict(
  active=0,
    currentvalue={"prefix": "Time: "},
    pad={"t": 50}, # top padding
    steps=steps
)]

fig.update_layout(sliders=slider)
fig.show()