In [7]:
import numpy as np
import matplotlib.pyplot as plt
from tfilterpy.module_functions import KalmanFilter

In [9]:
data = np.array([[1, 1, 1],
                [2, 2, 2],
                [3, 3, 3],
                [4, 4, 4],
                [5, 5, 5],
                [6, 6, 6],
                [7, 7, 7],
                [8, 8, 8],
                [9, 9, 9],
                [10, 10, 10]])

In [10]:
F  = np.array([[1, 0.1, 0.01],
            [0, 1, 0.1],
            [0, 0, 1]])
H  = np.array([[1, 0, 0],
            [0, 1, 0],
            [0, 0, 1]])
Q  = np.eye(3) * 0.01
R  = np.eye(3) * 0.1
x0 = np.array([0, 0, 0])
P0 = np.eye(3) *0.1165161

In [11]:
# Initialize the Kalman filter
kf = KalmanFilter(F, H, Q, R, x0, P0)

# Run the Kalman filter on the data
state_estimates = kf.run(data)

In [12]:
state_estimates

array([[0.58343517, 0.60375998, 0.58105494],
       [1.23038307, 1.27315939, 1.18235292],
       [1.97116322, 2.03289095, 1.83424543],
       [2.8091638 , 2.88147217, 2.54193243],
       [3.73797143, 3.80957745, 3.30274215],
       [4.74630461, 4.80475675, 4.11014155],
       [5.82117601, 5.85409372, 4.95609638],
       [6.94990705, 6.94573035, 5.83259004],
       [8.12120247, 8.06957069, 6.7324676 ],
       [9.3255598 , 9.21744614, 7.64978288]])

In [18]:
import numpy as np

class MotionModel:
    def __init__(self, alpha, delta_t):
        self.alpha = alpha
        self.delta_t = delta_t

    def sample(self, particle):
        noise = np.random.normal(0, np.sqrt(self.alpha[0]*particle[0]**2 +
                                            self.alpha[1]*particle[1]**2 +
                                            self.alpha[2]*particle[2]**2 +
                                            self.alpha[3]*particle[3]**2))
        x = particle[0] + (particle[2]/particle[3])*(np.sin(particle[3]*(self.delta_t + particle[3]/2)) - np.sin(particle[3]*particle[3]/2))/self.alpha[3] + noise*np.sin(particle[3]*(self.delta_t + particle[3]/2))
        y = particle[1] + (particle[2]/particle[3])*(-np.cos(particle[3]*(self.delta_t + particle[3]/2)) + np.cos(particle[3]*particle[3]/2))/self.alpha[3] + noise*np.cos(particle[3]*(self.delta_t + particle[3]/2))
        velocity = particle[2] + noise*self.alpha[4]*self.delta_t
        yaw_rate = particle[3] + noise*self.alpha[5]*self.delta_t

        return np.array([x, y, velocity, yaw_rate])

# Create an instance of MotionModel
alpha = [0.01, 0.01, 0.01, 0.01, 0.01, 0.01]
delta_t = 0.1
motion_model = MotionModel(alpha, delta_t)

# Sample a new particle state from the motion model
particle = np.array([1, 2, 3, 4])
new_particle = motion_model.sample(particle)
print("New particle state:", new_particle)


New particle state: [-8.78541111 29.83876633  3.00037625  4.00037625]


In [24]:
import numpy as np

# Define state and observation dimensions
state_dim = 2
obs_dim = 1

# Generate random observation matrix and covariance matrix
H = np.random.rand(obs_dim, state_dim)
R = np.random.rand(obs_dim, obs_dim)

# Define an observation model
observation_model = ObservationModel(state_dim, obs_dim, H, R)

# Generate some random data
state = np.random.rand(state_dim)
observation = np.random.rand(obs_dim)

# Compute the likelihood of the observation given the state
likelihood = observation_model.likelihood(state, observation)

print(f"Likelihood of observation given state: {likelihood}")


Likelihood of observation given state: 0.44153214202960805


In [28]:
class SystematicResampling:
    def __init__(self):
        pass

    @staticmethod
    def resample(particles, weights):
        M = len(particles)
        cum_weights = np.cumsum(weights)
        cum_weights[-1] = 1.0  # make sure weights sum to 1

        # draw random numbers and add equally spaced offsets
        # to obtain resampled indices
        r = np.random.uniform(0, 1/M)
        resampled_indices = np.zeros(M, dtype=int)
        for i in range(M):
            u = r + i/M
            j = np.searchsorted(cum_weights, u)
            resampled_indices[i] = j

        # resample particles and weights
        new_particles = particles[resampled_indices]
        new_weights = np.ones(M) / M

        return new_particles, new_weights


In [46]:
import numpy as np

class ParticleFilter:
    
    def __init__(self, num_particles, state_dim, motion_model, observation_model, resampling_method):
        self.num_particles = num_particles
        self.state_dim = state_dim
        self.motion_model = motion_model
        self.observation_model = observation_model
        self.resampling_method = resampling_method
        
        self.particles = np.zeros((self.num_particles, self.state_dim))
        self.weights = np.ones(self.num_particles) / self.num_particles
    
    def predict(self):
        new_particles = []
        for particle in self.particles:
            new_particles.append(self.motion_model.sample(particle))
        self.particles = np.array(new_particles)
    
    def update(self, observation):
        weights = []
        for particle in self.particles:
            weights.append(self.observation_model.likelihood(observation, particle))
        weights = np.array(weights)
        self.weights = weights / np.sum(weights)
    
    def resample(self):
        indices = self.resampling_method.resample(self.weights)
        self.particles = self.particles[indices]
        self.weights = np.ones(self.num_particles) / self.num_particles
    
    def run(self, observations):
        state_estimates = []
        for observation in observations:
            self.predict()
            self.update(observation)
            state_estimate = self.compute_state_estimate()
            state_estimates.append(state_estimate)
            self.resample()
        return np.array(state_estimates)
    
    def compute_state_estimate(self):
        return np.average(self.particles, weights=self.weights, axis=0)


In [48]:
import numpy as np



# Define resampling method
resampling_method = SystematicResampling

# Define initial particles and weights
num_particles = 100
state_dim=2
initial_particles = np.random.normal(size=(num_particles, 3))
initial_weights = np.ones(num_particles) / num_particles

# Create particle filter object
pf = ParticleFilter(num_particles, state_dim, motion_model, observation_model, resampling_method)

# Generate measurements
true_state = np.array([0.0, 0.0, 0.0])
measurements = []
for t in range(50):
    true_state = motion_model_fn(true_state, np.array([0.5, 0.1]))
    obs = np.random.normal(loc=true_state[:2], scale=0.5)
    measurements.append(obs)

# Run particle filter
estimated_states = []
for t in range(len(measurements)):
    pf.predict()
    pf.update(measurements[t])
    estimated_state = pf.get_state_estimate()
    estimated_states.append(estimated_state)
    
# Plot estimated trajectory and measurements
import matplotlib.pyplot as plt

true_trajectory = [motion_model_fn(true_state, np.array([0.5, 0.1])) for t in range(len(measurements))]
true_trajectory = np.array(true_trajectory)

estimated_states = np.array(estimated_states)
measurements = np.array(measurements)

plt.plot(true_trajectory[:,0], true_trajectory[:,1], label='True Trajectory')
plt.plot(estimated_states[:,0], estimated_states[:,1], label='Estimated Trajectory')
plt.scatter(measurements[:,0], measurements[:,1], label='Measurements')
plt.legend()
plt.show()


IndexError: index 2 is out of bounds for axis 0 with size 2

In [34]:
np.zeros((num_particles, state_dim))

array([[0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.

In [1]:
import numpy as np
import dask.array as da
import matplotlib.pyplot as plt

# Import the DaskKalmanFilter from TFilterPy
from TFilterPy.state_estimation.linear_filters import DaskKalmanFilter

# ------------------------------------------------------------------
# Define the system model parameters for a simple constant velocity model
# ------------------------------------------------------------------
F = np.array([[1, 1],
              [0, 1]])  # State transition matrix

H = np.array([[1, 0]])  # Observation matrix

# Noise covariances
Q = np.eye(2) * 0.01  # Process noise covariance
R = np.eye(1) * 0.1   # Observation noise covariance

# Initial state vector
initial_state = np.array([0, 1])

# Optionally convert some arrays to Dask arrays for distributed computation
F_dask = da.from_array(F, chunks=F.shape)
H_dask = da.from_array(H, chunks=H.shape)
Q_dask = da.from_array(Q, chunks=Q.shape)
R_dask = da.from_array(R, chunks=R.shape)

# ------------------------------------------------------------------
# Instantiate the Kalman Filter
# ------------------------------------------------------------------
# Using NumPy arrays:
kf_numpy = DaskKalmanFilter(
    state_transition_matrix=F,
    observation_matrix=H,
    process_noise_cov=Q,
    observation_noise_cov=R,
    initial_state=initial_state
)

# Alternatively, instantiate using Dask arrays:
kf_dask = DaskKalmanFilter(
    state_transition_matrix=F_dask,
    observation_matrix=H_dask,
    process_noise_cov=Q_dask,
    observation_noise_cov=R_dask,
    initial_state=da.from_array(initial_state, chunks=initial_state.shape)
)

print("Filter instances created.")

# ------------------------------------------------------------------
# Simulate a series of measurements
# ------------------------------------------------------------------
num_steps = 50
true_states = []
measurements = []

# Starting true state
x = initial_state.copy()
np.random.seed(42)  # For reproducibility

for _ in range(num_steps):
    # Evolve the true state using the state transition model
    x = F @ x
    true_states.append(x.copy())
    
    # Simulate a measurement of the position with added noise
    measurement = H @ x + np.random.normal(0, np.sqrt(R[0, 0]))
    measurements.append(measurement.item())

print("Simulation complete.")

# ------------------------------------------------------------------
# Run the filter over the simulated measurements
# ------------------------------------------------------------------
estimated_states = []

# This loop assumes that your DaskKalmanFilter class implements the following methods:
# - predict() : to perform the prediction step
# - update(z) : to update the state with measurement z
# - state     : to retrieve the current state estimate
for z in measurements:
    kf_numpy.predict()
    kf_numpy.update(z)
    estimated_states.append(kf_numpy.state.copy())

print("Filtering complete.")

# ------------------------------------------------------------------
# Visualize the results
# ------------------------------------------------------------------
# Extract the positions (assumed to be the first element in the state vector)
true_positions = [s[0] for s in true_states]
estimated_positions = [s[0] for s in estimated_states]

plt.figure(figsize=(10, 6))
plt.plot(true_positions, label='True Position', marker='o')
plt.plot(estimated_positions, label='Estimated Position', marker='x')
plt.xlabel('Time Step')
plt.ylabel('Position')
plt.title('True vs. Estimated Position')
plt.legend()
plt.grid(True)
plt.show()


TypeError: __init__() missing 1 required positional argument: 'initial_covariance'