In [1]:
import sys
import os

notebook_path = os.getcwd() 
project_root = os.path.dirname(notebook_path)

if project_root not in sys.path:
    sys.path.insert(0, project_root)

In [2]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from copy import deepcopy

In [3]:
from models.filter_trajectory_ensemble import trajectory_StateBayesianKalmanNet
from systems.DynamicSystem import DynamicSystem
from training.utils import calculate_anees_vectorized, generate_data
import training.trainer as trainer

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Používané zařízení: {device}")

Používané zařízení: cuda


In [5]:
import torch
from math import pi

# =================================================================================
# KROK 1: DEFINICE PARAMETRŮ PRO NELINEÁRNÍ SYSTÉM ("Synthetic")
# =================================================================================

state_dim_nl =1
obs_dim_nl = 1

h_true_nonlinear = lambda x: 0.5 * x
f_true_nonlinear = lambda x: 0.9 * x - 0.05 * x**3 

Q_true = torch.tensor([[0.1]])
R_true = torch.tensor([[0.1]])

Ex0_true = torch.tensor([[1.0]])
P0_true = torch.tensor([[0.5]])

sys_true = DynamicSystem(state_dim=state_dim_nl,obs_dim=obs_dim_nl,f=f_true_nonlinear,h= h_true_nonlinear,Q= Q_true,R= R_true,Ex0= Ex0_true,P0= P0_true,device=device)

#  Nepřesná dynamika (lineární aproximace nelineární funkce f)
f_model_nonlinear = lambda x: 0.9 * x 
h_model_nonlinear = h_true_nonlinear
# Nepřesná znalost šumu (podcenění Q)
Q_model = torch.tensor([[0.01]])
R_model = torch.tensor([[0.2]])
# Nepřesný počáteční odhad (pro EKF)
Ex0_model = torch.tensor([[0.5]])
P0_model = torch.tensor([[0.5]])

# Sestavení nepřesného modelu pro filtry
# Funkce h, R jsou pro jednoduchost stejné, ale f, Q, Ex0, P0 jsou jiné
sys_model = DynamicSystem(state_dim=state_dim_nl,obs_dim=obs_dim_nl,f=f_model_nonlinear, h=h_model_nonlinear, Q=Q_model, R=R_model,Ex0= Ex0_model,P0= P0_model,device=device)
# sys_model = Systems.NonlinearSystem(f_true_nonlinear, h_true_nonlinear, Q_true, R_true, Ex0_model, P0_model)


# =================================================================================
# KROK 2: INICIALIZACE OBJEKTŮ SYSTÉMŮ
# =================================================================================
# Ujisti se, že proměnná `device` je definována
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print("\nInicializuji 2D 'Synthetic' nelineární systém...")

# Reálný systém, který bude generovat data
# sys_true = DynamicSystem(
#     state_dim=state_dim_nl, obs_dim=obs_dim_nl,
#     Ex0=Ex0_true_nl, P0=P0_true_nl,
#     Q=Q_true_nl, R=R_true_nl,
#     f=f_true_nonlinear, h=h_true_nonlinear, # Předáváme funkce, ne matice
#     device=device
# )

# # Model, který bude používat tvůj KalmanNet (s nepřesnými parametry)
# sys_model = DynamicSystem(
#     state_dim=state_dim_nl, obs_dim=obs_dim_nl,
#     Ex0=Ex0_model_nl, P0=P0_model_nl,
#     Q=Q_model_nl, R=R_model_nl,
#     f=f_model_nonlinear, h=h_model_nonlinear, # Předáváme funkce, ne matice
#     device=device
# )

print("... Nelineární systém inicializován.")


Inicializuji 2D 'Synthetic' nelineární systém...
... Nelineární systém inicializován.


In [6]:
TRAIN_SEQ_LEN = 10      # Krátké sekvence pro stabilní trénink (TBPTT)
VALID_SEQ_LEN = 20      # Stejná délka pro konzistentní validaci
TEST_SEQ_LEN = 100      # Dlouhé sekvence pro testování generalizace

NUM_TRAIN_TRAJ = 500   # Hodně trénovacích příkladů
NUM_VALID_TRAJ = 200    # Dostatek pro spolehlivou validaci
NUM_TEST_TRAJ = 100     # Pro robustní vyhodnocení

BATCH_SIZE = 8         # Dobrý kompromis

x_train, y_train = generate_data(sys_true, num_trajectories=NUM_TRAIN_TRAJ, seq_len=TRAIN_SEQ_LEN)
x_val, y_val = generate_data(sys_true, num_trajectories=NUM_VALID_TRAJ, seq_len=VALID_SEQ_LEN)
x_test, y_test = generate_data(sys_true, num_trajectories=1, seq_len=TEST_SEQ_LEN)

train_dataset = TensorDataset(x_train, y_train)
val_dataset = TensorDataset(x_val, y_val)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
import os
import random
import csv
from datetime import datetime
import pandas as pd
from copy import deepcopy

model_config = {
    "hidden_size_multiplier": 11,
    "output_layer_multiplier": 4,
    "num_gru_layers": 1,
    "init_min_dropout": 0.5,
    "init_max_dropout": 0.8
}

train_config = {
    "total_train_iter": 1200,
    "learning_rate": 1e-4,
    "clip_grad": 1.0,
    "J_samples": 20,
    "validation_period": 30,
    "logging_period": 20,
    "warmup_iterations":100 # Trénuj prvních 400 iterací jen na MSE
}

# =================================================================================
# KROK 3: SPUŠTĚNÍ JEDNOHO TRÉNINKOVÉHO BĚHU
# =================================================================================

print("="*80)
print("Spouštím jeden plnohodnotný tréninkový běh...")
print(f"Parametry modelu: {model_config}")
print(f"Parametry tréninku: {train_config}")
print("="*80)

# Nastavení seedu pro reprodukovatelnost tohoto běhu
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# Vytvoření modelu
state_bkn_knet = trajectory_StateBayesianKalmanNet(
    sys_model,
    device=device,
    **model_config
).to(device)

# Spuštění tréninku
# Používáme `run_training_session`, která vrací slovník s výsledky
results = trainer.training_session_trajectory_with_gaussian_nll_training_fcn(model=state_bkn_knet,
    train_loader=train_loader,
    val_loader=val_loader,
    device=device,
    **train_config
)

# `run_training_session` automaticky načte nejlepší model zpět,
# takže `state_bkn_knet` nyní obsahuje váhy nejlepšího modelu.
trained_model = results['final_model']

print("\n" + "="*80)
print("TRÉNINK DOKONČEN - FINÁLNÍ VÝSLEDKY Z NEJLEPŠÍHO MODELU")
print("="*80)
print(f"Nejlepší model byl nalezen v iteraci: {results['best_iter']}")
# --- Změněné klíče, aby odpovídaly return statementu ---
print(f"Nejlepší dosažený validační ANEES: {results['best_val_anees']:.4f}")
print("--- Metriky odpovídající tomuto nejlepšímu modelu ---")
print(f"  MSE na validační sadě:       {results['best_val_mse']:.4f}")
print(f"  NLL na validační sadě:       {results['best_val_nll']:.4f}")
print("="*80)

# Nyní můžeš s `trained_model` pokračovat, například ho vyhodnotit na testovací sadě.

Spouštím jeden plnohodnotný tréninkový běh...
Parametry modelu: {'hidden_size_multiplier': 11, 'output_layer_multiplier': 4, 'num_gru_layers': 1, 'init_min_dropout': 0.5, 'init_max_dropout': 0.8}
Parametry tréninku: {'total_train_iter': 1200, 'learning_rate': 0.0001, 'clip_grad': 1.0, 'J_samples': 20, 'validation_period': 20, 'logging_period': 20, 'warmup_iterations': 100}
--- Iteration [20/1200] ---
    - Total Loss: 0.1990
    - NLL: 0.0000
    - Reg: 0.0025
    - p1=0.730, p2=0.754

--- Validace v iteraci 20 ---
  Průměrný MSE: 0.2156, Průměrný ANEES: 54.7709
  >>> Nové nejlepší VALIDAČNÍ ANEES! Ukládám model. <<<
--------------------------------------------------
--- Iteration [40/1200] ---
    - Total Loss: 0.1742
    - NLL: 0.0000
    - Reg: 0.0025
    - p1=0.730, p2=0.755

--- Validace v iteraci 40 ---
  Průměrný MSE: 0.1979, Průměrný ANEES: 46.5593
  >>> Nové nejlepší VALIDAČNÍ ANEES! Ukládám model. <<<
--------------------------------------------------
--- Iteration [60/1200] 

KeyboardInterrupt: 

In [None]:
from models.StateKalmanNet import StateKalmanNet
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
import os
import random
import csv
from datetime import datetime
import pandas as pd
from copy import deepcopy
# Nastavení seedu pro reprodukovatelnost tohoto běhu
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)
state_knet = StateKalmanNet(sys_model, device=device, hidden_size_multiplier=12).to(device)
trainer.train_state_KalmanNet(
    model=state_knet, 
    train_loader=train_loader, 
    val_loader=val_loader, 
    device=device, 
    epochs=150, 
    lr=1e-4,
    early_stopping_patience=50
)



Epoch [5/150], Train Loss: 0.261543, Val Loss: 0.285456
Epoch [10/150], Train Loss: 0.147055, Val Loss: 0.139024
Epoch [15/150], Train Loss: 0.134362, Val Loss: 0.131104
Epoch [20/150], Train Loss: 0.130633, Val Loss: 0.129416
Epoch [25/150], Train Loss: 0.128198, Val Loss: 0.127930
Epoch [30/150], Train Loss: 0.127038, Val Loss: 0.126931
Epoch [35/150], Train Loss: 0.126806, Val Loss: 0.126347
Epoch [40/150], Train Loss: 0.125952, Val Loss: 0.126025
Epoch [45/150], Train Loss: 0.124824, Val Loss: 0.125797
Epoch [50/150], Train Loss: 0.124854, Val Loss: 0.125580
Epoch [55/150], Train Loss: 0.124412, Val Loss: 0.125336
Epoch [60/150], Train Loss: 0.124030, Val Loss: 0.125185
Epoch [65/150], Train Loss: 0.123918, Val Loss: 0.125011
Epoch [70/150], Train Loss: 0.124429, Val Loss: 0.124851
Epoch [75/150], Train Loss: 0.123782, Val Loss: 0.124670
Epoch [80/150], Train Loss: 0.123568, Val Loss: 0.124542
Epoch [85/150], Train Loss: 0.123779, Val Loss: 0.124426
Epoch [90/150], Train Loss: 0.12

StateKalmanNet(
  (dnn): DNN_KalmanNet(
    (input_layer): Linear(in_features=2, out_features=24, bias=True)
    (gru): GRU(24, 24)
    (output_layer): Linear(in_features=24, out_features=1, bias=True)
  )
)

In [None]:
import torch
import torch.nn.functional as F
import numpy as np
from torch.utils.data import TensorDataset, DataLoader

# ==============================================================================
# 0. PŘEDPOKLADY
# ==============================================================================
# Předpokládá se, že v proměnných níže máte již natrénované modely:
# trained_model_bkn      <-- Váš BayesianKalmanNet (z dvoufázového tréninku)
# trained_model_classic  <-- Váš StateKalmanNet (z tréninku čistě na MSE)
#
# Přejmenujme je pro srozumitelnost v tomto skriptu:
trained_model_bkn = trained_model # Upravte podle názvu vaší proměnné
trained_model_classic = state_knet       # Upravte podle názvu vaší proměnné


# ==============================================================================
# 1. KONFIGURACE TESTU
# ==============================================================================
TEST_SEQ_LEN = 300
NUM_TEST_TRAJ = 40
J_SAMPLES_TEST = 25

# ==============================================================================
# 2. PŘÍPRAVA DAT
# ==============================================================================
print(f"\nGeneruji {NUM_TEST_TRAJ} testovacích trajektorií o délce {TEST_SEQ_LEN}...")
x_test, y_test = generate_data(sys_true, num_trajectories=NUM_TEST_TRAJ, seq_len=TEST_SEQ_LEN)
test_dataset = TensorDataset(x_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
print("Generování dat dokončeno.")

# ==============================================================================
# 3. INICIALIZACE FILTRŮ PRO POROVNÁNÍ
# ==============================================================================
ekf_mismatched = ExtendedKalmanFilter(sys_model)
ekf_ideal = ExtendedKalmanFilter(sys_true)
print("Extended Kalman Filtry inicializovány.")

# ==============================================================================
# 4. VYHODNOCOVACÍ SMYČKA
# ==============================================================================
all_x_true_cpu, all_x_hat_bkn_cpu, all_P_hat_bkn_cpu = [], [], []
all_x_hat_classic_knet_cpu = [] # Klasický KNet neprodukuje P
all_x_hat_ekf_mismatched_cpu, all_P_hat_ekf_mismatched_cpu = [], []
all_x_hat_ekf_ideal_cpu, all_P_hat_ekf_ideal_cpu = [], []

print(f"\nVyhodnocuji modely na {NUM_TEST_TRAJ} testovacích trajektoriích...")

# Důležité: Přepneme oba modely do evaluačního režimu
trained_model_bkn.eval() 
trained_model_classic.eval()

with torch.no_grad():
    for i, (x_true_seq_batch, y_test_seq_batch) in enumerate(test_loader):
        y_test_seq_gpu = y_test_seq_batch.squeeze(0).to(device)
        x_true_seq_gpu = x_true_seq_batch.squeeze(0).to(device)
        initial_state = x_true_seq_gpu[0, :].unsqueeze(0)
        
        # --- A. Vyhodnocení Bayesian KalmanNet (Trajectory-wise) ---
        ensemble_trajectories = []
        for j in range(J_SAMPLES_TEST):
            trained_model_bkn.reset(batch_size=1, initial_state=initial_state)
            current_x_hats = []
            for t in range(1, TEST_SEQ_LEN):
                y_t = y_test_seq_gpu[t, :].unsqueeze(0)
                x_filtered_t, _ = trained_model_bkn.step(y_t)
                current_x_hats.append(x_filtered_t)
            ensemble_trajectories.append(torch.cat(current_x_hats, dim=0))

        ensemble = torch.stack(ensemble_trajectories, dim=0)
        predictions_bkn = ensemble.mean(dim=0)
        diff = ensemble - predictions_bkn.unsqueeze(0)
        outer_products = diff.unsqueeze(-1) @ diff.unsqueeze(-2)
        covariances_bkn = outer_products.mean(dim=0)
        
        full_x_hat_bkn = torch.cat([initial_state, predictions_bkn], dim=0)
        P0_val = trained_model_bkn.system_model.P0.to(device)
        full_P_hat_bkn = torch.cat([P0_val.unsqueeze(0), covariances_bkn], dim=0)

        # --- B. Vyhodnocení klasického StateKalmanNet ---
        trained_model_classic.reset(batch_size=1, initial_state=initial_state)
        classic_knet_preds = []
        for t in range(1, TEST_SEQ_LEN):
            y_t = y_test_seq_gpu[t, :].unsqueeze(0)
            x_filtered_t = trained_model_classic.step(y_t)
            classic_knet_preds.append(x_filtered_t)
        
        predictions_classic_knet = torch.cat(classic_knet_preds, dim=0)
        full_x_hat_classic_knet = torch.cat([initial_state, predictions_classic_knet], dim=0)

        # --- C. Vyhodnocení EKF s nepřesným modelem ---
        Ex0_mismatched = sys_model.Ex0.to(device)
        P0_mismatched = sys_model.P0.to(device)
        ekf_mismatched_results = ekf_mismatched.process_sequence(y_test_seq_gpu, Ex0=Ex0_mismatched, P0=P0_mismatched)
        full_x_hat_ekf_mismatched = torch.cat([Ex0_mismatched.reshape(1, -1), ekf_mismatched_results['x_filtered']], dim=0)
        full_P_hat_ekf_mismatched = torch.cat([P0_mismatched.unsqueeze(0), ekf_mismatched_results['P_filtered']], dim=0)

        # --- D. Vyhodnocení EKF s perfektním modelem (Benchmark) ---
        Ex0_ideal = sys_true.Ex0.to(device)
        P0_ideal = sys_true.P0.to(device)
        ekf_ideal_results = ekf_ideal.process_sequence(y_test_seq_gpu, Ex0=Ex0_ideal, P0=P0_ideal)
        full_x_hat_ekf_ideal = torch.cat([Ex0_ideal.reshape(1, -1), ekf_ideal_results['x_filtered']], dim=0)
        full_P_hat_ekf_ideal = torch.cat([P0_ideal.unsqueeze(0), ekf_ideal_results['P_filtered']], dim=0)

        # --- E. Uložení všech výsledků na CPU ---
        min_len = min(full_x_hat_bkn.shape[0], full_x_hat_classic_knet.shape[0], full_x_hat_ekf_mismatched.shape[0], x_true_seq_gpu.shape[0])
        
        all_x_true_cpu.append(x_true_seq_gpu[:min_len].cpu())
        all_x_hat_bkn_cpu.append(full_x_hat_bkn[:min_len].cpu())
        all_P_hat_bkn_cpu.append(full_P_hat_bkn[:min_len].cpu())
        all_x_hat_classic_knet_cpu.append(full_x_hat_classic_knet[:min_len].cpu())
        all_x_hat_ekf_mismatched_cpu.append(full_x_hat_ekf_mismatched[:min_len].cpu())
        all_P_hat_ekf_mismatched_cpu.append(full_P_hat_ekf_mismatched[:min_len].cpu())
        all_x_hat_ekf_ideal_cpu.append(full_x_hat_ekf_ideal[:min_len].cpu())
        all_P_hat_ekf_ideal_cpu.append(full_P_hat_ekf_ideal[:min_len].cpu())

        print(f"Dokončena trajektorie {i + 1}/{NUM_TEST_TRAJ}...")

# ==============================================================================
# 5. FINÁLNÍ VÝPOČET A VÝPIS METRIK
# ==============================================================================
mse_bkn_list, anees_bkn_list = [], []
mse_classic_knet_list = [] # ANEES zde není
mse_ekf_mismatched_list, anees_ekf_mismatched_list = [], []
mse_ekf_ideal_list, anees_ekf_ideal_list = [], []

print("\nPočítám finální metriky pro jednotlivé trajektorie...")

with torch.no_grad():
    for i in range(NUM_TEST_TRAJ):
        x_true = all_x_true_cpu[i]
        
        # Metriky pro Bayesian KalmanNet
        x_hat_bkn = all_x_hat_bkn_cpu[i]
        P_hat_bkn = all_P_hat_bkn_cpu[i]
        mse_bkn_run = F.mse_loss(x_hat_bkn[1:], x_true[1:]).item()
        anees_bkn_run = calculate_anees_vectorized(x_true.unsqueeze(0), x_hat_bkn.unsqueeze(0), P_hat_bkn.unsqueeze(0))
        mse_bkn_list.append(mse_bkn_run)
        anees_bkn_list.append(anees_bkn_run)

        # Metriky pro klasický KalmanNet
        x_hat_classic_knet = all_x_hat_classic_knet_cpu[i]
        mse_classic_knet_run = F.mse_loss(x_hat_classic_knet[1:], x_true[1:]).item()
        mse_classic_knet_list.append(mse_classic_knet_run)
        
        # Metriky pro nepřesný EKF
        x_hat_ekf_mismatched = all_x_hat_ekf_mismatched_cpu[i]
        P_hat_ekf_mismatched = all_P_hat_ekf_mismatched_cpu[i]
        mse_ekf_mismatched_run = F.mse_loss(x_hat_ekf_mismatched[1:], x_true[1:]).item()
        anees_ekf_mismatched_run = calculate_anees_vectorized(x_true.unsqueeze(0), x_hat_ekf_mismatched.unsqueeze(0), P_hat_ekf_mismatched.unsqueeze(0))
        mse_ekf_mismatched_list.append(mse_ekf_mismatched_run)
        anees_ekf_mismatched_list.append(anees_ekf_mismatched_run)

        # Metriky pro ideální EKF
        x_hat_ekf_ideal = all_x_hat_ekf_ideal_cpu[i]
        P_hat_ekf_ideal = all_P_hat_ekf_ideal_cpu[i]
        mse_ekf_ideal_run = F.mse_loss(x_hat_ekf_ideal[1:], x_true[1:]).item()
        anees_ekf_ideal_run = calculate_anees_vectorized(x_true.unsqueeze(0), x_hat_ekf_ideal.unsqueeze(0), P_hat_ekf_ideal.unsqueeze(0))
        mse_ekf_ideal_list.append(mse_ekf_ideal_run)
        anees_ekf_ideal_list.append(anees_ekf_ideal_run)

        print(f"Traj {i+1:2d} | BKN MSE: {mse_bkn_run:.4f} | KNet MSE: {mse_classic_knet_run:.4f} | EKF-Mis MSE: {mse_ekf_mismatched_run:.4f} | EKF-Ideal MSE: {mse_ekf_ideal_run:.4f}")
        print(f"       | BKN ANEES: {anees_bkn_run:.4f} | KNet ANEES:    N/A    | EKF-Mis ANEES: {anees_ekf_mismatched_run:8.4f} | EKF-Ideal ANEES: {anees_ekf_ideal_run:8.4f}")
        print("-" * 110)

# Průměrování výsledků
avg_mse_bkn = np.mean(mse_bkn_list)
avg_anees_bkn = np.mean(anees_bkn_list)
avg_mse_classic_knet = np.mean(mse_classic_knet_list)
avg_mse_ekf_mismatched = np.mean(mse_ekf_mismatched_list)
avg_anees_ekf_mismatched = np.mean(anees_ekf_mismatched_list)
avg_mse_ekf_ideal = np.mean(mse_ekf_ideal_list)
avg_anees_ekf_ideal = np.mean(anees_ekf_ideal_list)
state_dim_for_nees = all_x_true_cpu[0].shape[1]

print("\n" + "="*80)
print(f"FINÁLNÍ VÝSLEDKY (průměr přes {NUM_TEST_TRAJ} běhů)")
print("="*80)
print("\n--- Průměrná MSE (přesnost) ---")
print(f"Bayesian KNet (BKN):              {avg_mse_bkn:.4f}")
print(f"Klasický KNet (pouze MSE):        {avg_mse_classic_knet:.4f} <-- Nejlepší data-driven přesnost")
print(f"EKF (Nepřesný model):             {avg_mse_ekf_mismatched:.4f}")
print(f"EKF (Ideální model - Benchmark):    {avg_mse_ekf_ideal:.4f} <-- Teoretický limit přesnosti")
print("\n--- Průměrný ANEES (kredibilita/kalibrace) ---")
print(f"Očekávaná hodnota:   {state_dim_for_nees:.4f}")
print("--------------------------------------------------")
print(f"Bayesian KNet (BKN):              {avg_anees_bkn:.4f}")
print(f"Klasický KNet (pouze MSE):        N/A (model neprodukuje kovarianci)")
print(f"EKF (Nepřesný model):             {avg_anees_ekf_mismatched:.4f}")
print(f"EKF (Ideální model - Benchmark):    {avg_anees_ekf_ideal:.4f}")
print("="*80)


Generuji 40 testovacích trajektorií o délce 300...
Generování dat dokončeno.
Extended Kalman Filtry inicializovány.

Vyhodnocuji modely na 40 testovacích trajektoriích...
Dokončena trajektorie 1/40...
Dokončena trajektorie 2/40...
Dokončena trajektorie 3/40...
Dokončena trajektorie 4/40...
Dokončena trajektorie 5/40...
Dokončena trajektorie 6/40...
Dokončena trajektorie 7/40...
Dokončena trajektorie 8/40...
Dokončena trajektorie 9/40...
Dokončena trajektorie 10/40...
Dokončena trajektorie 11/40...
Dokončena trajektorie 12/40...
Dokončena trajektorie 13/40...
Dokončena trajektorie 14/40...
Dokončena trajektorie 15/40...
Dokončena trajektorie 16/40...
Dokončena trajektorie 17/40...
Dokončena trajektorie 18/40...
Dokončena trajektorie 19/40...
Dokončena trajektorie 20/40...
Dokončena trajektorie 21/40...
Dokončena trajektorie 22/40...
Dokončena trajektorie 23/40...
Dokončena trajektorie 24/40...
Dokončena trajektorie 25/40...
Dokončena trajektorie 26/40...
Dokončena trajektorie 27/40...


In [None]:
import torch
from torch.func import jacrev

class ExtendedKalmanFilter:
    def __init__(self, system_model):

        self.device = system_model.Q.device

        self.f = system_model.f
        self.h = system_model.h

        # Pokud je systém lineární, Jakobiány jsou konstantní matice F a H
        self.is_linear_f = getattr(system_model, 'is_linear_f', False)
        self.is_linear_h = getattr(system_model, 'is_linear_h', False)
        if self.is_linear_f:
            self.F = system_model.F
        if self.is_linear_h:
            self.H = system_model.H


        self.Q = system_model.Q.clone().detach().to(self.device)
        self.R = system_model.R.clone().detach().to(self.device)

        self.state_dim = self.Q.shape[0]
        self.obs_dim = self.R.shape[0]

        self.x_filtered_prev = None
        self.P_filtered_prev = None

        self.reset(system_model.Ex0, system_model.P0)

    def reset(self, Ex0=None, P0=None):
        """
        Inicializuje nebo resetuje stav filtru.
        """
        if Ex0 is not None:
            # Ujistíme se, že má správný tvar [dim, 1]
            self.x_filtered_prev = Ex0.clone().detach().reshape(self.state_dim, 1)
        if P0 is not None:
            self.P_filtered_prev = P0.clone().detach()

    def predict_step(self, x_filtered, P_filtered):
        if self.is_linear_f:
            F_t = self.F
        else:
            x_flat = x_filtered.flatten()
            F_t = jacrev(self.f)(x_flat).reshape(self.state_dim, self.state_dim)
        
        # OPRAVA: Transponujeme vstup pro f() a výstup zpět.
        # [State_Dim, 1] -> .T -> [1, State_Dim] -> f() -> [1, State_Dim] -> .T -> [State_Dim, 1]
        x_predict = self.f(x_filtered.T).T

        P_predict = F_t @ P_filtered @ F_t.T + self.Q
        return x_predict, P_predict

    def update_step(self, x_predict, y_t, P_predict):
        y_t = y_t.reshape(self.obs_dim, 1)

        if self.is_linear_h:
            H_t = self.H
        else:
            x_flat = x_predict.flatten()
            H_t = jacrev(self.h)(x_flat).reshape(self.obs_dim, self.state_dim)

        # OPRAVA: Transponujeme vstup pro h() a výstup zpět.
        y_predict = self.h(x_predict.T).T
        innovation = y_t - y_predict
        
        S = H_t @ P_predict @ H_t.T + self.R
        K = P_predict @ H_t.T @ torch.linalg.inv(S)
        x_filtered = x_predict + K @ innovation

        I = torch.eye(self.state_dim, device=self.device)
        P_filtered = (I - K @ H_t) @ P_predict @ (I - K @ H_t).T + K @ self.R @ K.T

        return x_filtered, P_filtered, K, innovation

    def step(self, y_t):
        """
        Provede jeden kompletní krok filtrace (predict + update) pro online použití.
        """
        # 1. Predikce z uloženého interního stavu
        x_predict, P_predict = self.predict_step(self.x_filtered_prev, self.P_filtered_prev)

        # 2. Update s novým měřením
        x_filtered, P_filtered = self.update_step(x_predict, y_t, P_predict)

        # 3. Aktualizace interního stavu pro další volání
        self.x_filtered_prev = x_filtered
        self.P_filtered_prev = P_filtered

        return x_filtered, P_filtered

    def process_sequence(self, y_seq, Ex0=None, P0=None):
            """
            Zpracuje celou sekvenci měření `y_seq` (offline) a vrátí detailní historii.
            """
            # Pokud nejsou zadány, použije defaultní hodnoty z `__init__`
            x_est = Ex0.clone().detach().reshape(self.state_dim, 1) if Ex0 is not None else self.x_filtered_prev.clone()
            P_est = P0.clone().detach() if P0 is not None else self.P_filtered_prev.clone()

            seq_len = y_seq.shape[0]

            x_filtered_history = torch.zeros(seq_len, self.state_dim, device=self.device)
            P_filtered_history = torch.zeros(seq_len, self.state_dim, self.state_dim, device=self.device)
            x_predict_history = torch.zeros(seq_len, self.state_dim, device=self.device)
            P_predict_history = torch.zeros(seq_len, self.state_dim, self.state_dim, device=self.device)
            kalman_gain_history = torch.zeros(seq_len, self.state_dim, self.obs_dim, device=self.device)
            innovation_history = torch.zeros(seq_len, self.obs_dim, device=self.device)

            for t in range(seq_len):
                # 1. Predict
                x_predict, P_predict = self.predict_step(x_est, P_est)

                # 2. Update
                x_est, P_est, K, innovation = self.update_step(x_predict, y_seq[t], P_predict)

                x_filtered_history[t] = x_est.squeeze()
                P_filtered_history[t] = P_est
                x_predict_history[t] = x_predict.squeeze()
                P_predict_history[t] = P_predict
                kalman_gain_history[t] = K
                innovation_history[t] = innovation.squeeze()

            return {
                'x_filtered': x_filtered_history,
                'P_filtered': P_filtered_history,
                'x_predict': x_predict_history,
                'P_predict': P_predict_history,
                'Kalman_gain': kalman_gain_history,
                'innovation': innovation_history
            }