In [1]:
import torch
from torch.utils.tensorboard import SummaryWriter

2024-08-30 07:58:05.632509: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


# This is part of realization dynamic system

In [2]:
def generate_parameters(x_N: int, z_N: int, T: int):
    # parameters of the system
    F = torch.rand(x_N, x_N) / 10
    B = torch.rand(x_N, x_N) / 10
    H = torch.rand(z_N, x_N) / 10
    U = torch.rand(x_N, T)

    X_t = torch.rand(x_N, T)
    X_clear = X_t.clone()

    Z_t = torch.rand(z_N, T)
    Z_clear = Z_t.clone()

    return F, B, H, U, X_t, X_clear, Z_t, Z_clear

In [3]:
def system(F: torch.Tensor, B: torch.Tensor, H: torch.Tensor, U: torch.Tensor, X_t: torch.Tensor, X_clear: torch.Tensor, Z_t: torch.Tensor, Z_clear: torch.Tensor, x_N: int, z_N: int, T: int):
    # result with and without noise
    for t in range(1, T):
        X_clear[:, t] = torch.matmul(F, X_t[:, t-1]) + torch.matmul(B, U[:, t-1])
        X_t[:, t] =  X_clear[:, t] + torch.randn(x_N) / 10
        Z_clear[:, t] = torch.matmul(H, X_t[:, t])
        Z_t[:, t] = Z_clear[:, t] + torch.randn(z_N) / 10
    return X_clear, Z_clear, X_t, Z_t

In [4]:
writer = SummaryWriter('./logs/SourceDatas')

x_N, z_N, T = 3, 2, 100

F, B, H, U, X_t, X_clear, Z_t, Z_clear = generate_parameters(x_N, z_N, T)

X_clear, Z_clear, X_t, Z_t = system(F, B, H, U, X_t, X_clear, Z_t, Z_clear, x_N, z_N, T)

# drawing clear datas for X
for i in range(X_clear.shape[0]):
    for t in range(X_clear.shape[1]):
        writer.add_scalar('Clear_X_Trajectory {}'.format(i), X_clear[i, t], t)

# drawing noisy datas for X
for i in range(X_t.shape[0]):
    for t in range(X_t.shape[1]):
        writer.add_scalar('Noisy_X_Trajectory {}'.format(i), X_t[i, t], t)

# drawing clear datas for Z
for i in range(Z_clear.shape[0]):
    for t in range(Z_clear.shape[1]):
        writer.add_scalar('Clear_Z_Trajectory {}'.format(i), Z_clear[i, t], t)

# drawing noisy datas for Z
for i in range(Z_t.shape[0]):
    for t in range(Z_t.shape[1]):
        writer.add_scalar('Noisy_Z_Trajectory {}'.format(i), Z_t[i, t], t)

# This is part of realization Kalman filter

In [5]:
class KalmanFilter:
    def __init__(self, F: torch.Tensor, B: torch.Tensor, H: torch.Tensor, P: torch.Tensor, x_N: int, z_N: int, Q: torch.Tensor, R: torch.Tensor):
        # dimensions        
        self.x_N = x_N
        self.z_N = z_N

        # parameters of Prediction stage:
        self.x = None
        self.F = F
        self.B = B
        self.P = P
        self.Q = torch.eye(self.x_N) * Q

        # parameters of Update stage:
        self.H = H
        self.R = torch.eye(self.z_N) * R

    def predict(self, x_k_1_k_1: torch.Tensor, u_k_1: torch.Tensor):
        self.x = torch.matmul(self.F, x_k_1_k_1) + torch.matmul(self.B, u_k_1)
        self.P = torch.matmul(torch.matmul(self.F, self.P), self.F.t()) + self.Q

    def update(self, z_k: torch.Tensor):
        S = torch.matmul(torch.matmul(self.H, self.P), self.H.t()) + self.R
        K = torch.matmul(torch.matmul(self.P, self.H.t()), torch.inverse(S))
        self.x = self.x + torch.matmul(K, (z_k - torch.matmul(self.H, self.x)))
        self.P = self.P - torch.matmul(torch.matmul(K, self.H), self.P)
        z = torch.matmul(self.H, self.x)
        return self.x, z

# This is part of testing Kalman Filter

In [6]:
# Tensors of results
x_filtered = torch.zeros(x_N, T)
z_filtered = torch.zeros(z_N, T)

x_filtered[:, 0] = X_t[:, 0]
z_filtered[:, 0] = Z_t[:, 0]

# Defining parameters P, Q, R
P = torch.eye(x_N) * 0.1
Q = torch.eye(x_N) * 0.1
R = torch.eye(z_N) * 0.1

kalman = KalmanFilter(F, B, H, P, x_N, z_N, Q, R)

for t in range(1, T):
    kalman.predict(X_t[:, t-1], U[:, t-1])
    x_filtered[:, t], z_filtered[:, t] = kalman.update(Z_t[:, t])

In [7]:
# drawing filtered datas for X and Z
writer = SummaryWriter('./logs/KalmanFilter')

# X_clear, X_t, x_filtered
# Z_clear, Z_t, z_filtered
for i in range(Z_clear.shape[0]):
    for t in range(Z_clear.shape[1]):
        writer.add_scalars('Z_Trajectory {}'.format(i), {'Clear': Z_clear[i, t], 'Noisy': Z_t[i, t], 'Filtered': z_filtered[i, t]}, t)

for i in range(X_clear.shape[0]):
    for t in range(X_clear.shape[1]):
        writer.add_scalars('X_Trajectory {}'.format(i), {'Clear': X_clear[i, t], 'Noisy': X_t[i, t], 'Filtered': x_filtered[i, t]}, t)