In [5]:
from econometron.utils.state_space.update_ss import make_state_space_updater
from econometron.filters import kalman_objective
import numpy as np

In [9]:
def ucm_solver(params: dict) -> tuple:
    """Return the two fixed system matrices of the model as ``(F, P)``.

    ``params`` is accepted but never read: both matrices are constants.

    Returns:
        A tuple ``(F, P)`` where ``F`` is the 1x2 array ``[[1, 0]]`` and
        ``P`` is the 2x2 array ``[[1, 1], [0, 1]]``.

    NOTE(review): in the output shown later in this notebook, ``P`` matches
    the entry stored under ``'A'`` and ``F`` the one under ``'D'`` —
    presumably transition and observation matrices; confirm against the
    updater's documentation.
    """
    f_matrix = np.array([[1.0, 0.0]])
    p_matrix = np.array([[1.0, 1.0], [0.0, 1.0]])
    return f_matrix, p_matrix

def ucm_build_R(params: dict) -> np.ndarray:
    """Wrap the ``'sigma_irregular'`` parameter in a 1x1 array.

    Falls back to ``1.0`` when the key is absent from ``params``.

    NOTE(review): the value is returned as-is (not squared); the output
    shown later in this notebook has ``R = 0.25`` for
    ``sigma_irregular = 0.5``, so squaring presumably happens downstream —
    confirm.
    """
    return np.array([[params.get('sigma_irregular', 1.0)]])

def ucm_build_C(params: dict) -> np.ndarray:
    """Build a 2x2 diagonal array from the level and slope sigmas.

    ``'sigma_level'`` goes in the top-left entry and ``'sigma_slope'`` in
    the bottom-right; either one defaults to ``1.0`` when missing from
    ``params``. Off-diagonal entries are zero.

    NOTE(review): values are placed unsquared; the output shown later in
    this notebook has ``Q = diag(1.44, 0.09)`` for sigmas ``1.2`` and
    ``0.3``, so squaring presumably happens downstream — confirm.
    """
    level_sd = params.get('sigma_level', 1.0)
    slope_sd = params.get('sigma_slope', 1.0)

    level_row = [level_sd, 0.0]
    slope_row = [0.0, slope_sd]
    return np.array([level_row, slope_row])

def ucm_derived_fn(params: dict):
    """Clamp the three sigma parameters to a minimum of ``1e-6``, in place.

    Only keys already present in ``params`` are touched; missing keys are
    not added and all other entries are left alone. The dictionary is
    modified directly and nothing is returned.
    """
    floor = 1e-6
    for name in ('sigma_irregular', 'sigma_level', 'sigma_slope'):
        if name not in params:
            continue
        params[name] = max(params[name], floor)

# Baseline parameter set: every sigma starts at 1.0.
ucm_base_params = dict.fromkeys(
    ('sigma_irregular', 'sigma_level', 'sigma_slope'),
    1.0,
)

# Wire the callbacks defined in this cell into an updater callable.
# NOTE(review): how the factory combines these pieces is not visible here —
# see the econometron documentation for make_state_space_updater.
ucm_state_space_updater = make_state_space_updater(
    base_params=ucm_base_params,
    solver=ucm_solver,
    build_R=ucm_build_R,
    build_C=ucm_build_C,
    derived_fn=ucm_derived_fn,
)

In [10]:
# Override all three sigmas and build the corresponding state-space dict.
params_update = dict(
    sigma_irregular=0.5,
    sigma_level=1.2,
    sigma_slope=0.3,
)
UCM = ucm_state_space_updater(params_update)

In [11]:
# Display the object returned by the updater call in the previous cell.
UCM

{'A': array([[1., 1.],
        [0., 1.]]),
 'D': array([[1., 0.]]),
 'Q': array([[1.44, 0.  ],
        [0.  , 0.09]]),
 'R': array([[0.25]])}