In [None]:
import math
import numpy as np
from typing import Any, Callable

class KalmanFilter_MultipleObservation:
    def __init__(self, 
        a:np.ndarray=None, 
        b:np.ndarray=None, 
        c:np.ndarray=None, 
        q:np.ndarray=None, 
        r:np.ndarray=None, 
        init_x:np.ndarray=None, 
        init_p:np.ndarray=None) -> None:
        """線形定常カルマンフィルタ

        複数観測値

        状態変数をn次元ベクトルとする

        モデルのノイズベクトルをp次元ベクトルとする(ややこしい)

        観測値ベクトルをq次元ベクトルとする

        注意として、l次元ベクトルはlx1行列とすること(内部で変換してもいいが、引数のややこしさ回避のためこの様にした)

            > np.array([10, 20]) #間違い

            > np.array([[10],  
                               [20]]) #これ
        
        parameters
        ----------
            a : np.ndarray
                推移行列 
                size : nxn
            b : np.ndarray
                名前がわからない
                size : nxp
            c : np.ndarray 
                観測行列。単一観測値のときと向きが違うことに注意
                size : qxn
            q : np.ndarray
                モデルの共分散
                size : pxp
            r : float
                観測値の共分散
                size : qxq
            init_x : np.ndarray
                状態変数の初期値
                size : nx1
            init_p : np.ndarray
                共分散行列の
                size : nxn"""
        self.a : np.ndarray = a
        self.b : np.ndarray = b
        self.c : np.ndarray = c
        self.x : np.ndarray = init_x
        self.p : np.ndarray = init_p
        self.q : np.ndarray = q
        self.r : np.ndarray = r
    @property
    def observation(self) -> float:
        """Cx"""
        return self.c@self.x
    def update(self, y) -> None:
        """アップデート"""
        #予測
        x_ = self.a@self.x
        p_ = self.a@self.p@self.a.T+self.b@self.q@self.b.T
        #修正（フィルタリング）
        self.g = p_@self.c.T@np.linalg.solve(self.c@p_@self.c.T+self.r, np.eye(self.r.shape[0]))
        self.x = x_+self.g@(y-self.c@x_)
        self.p = (np.eye(len(self.x))-self.g@self.c)@p_


In [1]:
import numpy as np
import matplotlib.pyplot as plt

def sim():
    true = 100
    tp = 100

    loc = 0
    lp = 500
    n = 1000
    particle = np.random.normal(loc, lp, n)
    weight = np.full([n], 1000.)

    u = 0.98
    m = int(np.log(1/lp)/np.log(u))
    for i in range(m):
        h = np.random.normal(true, tp, 1)
        weight_temp = max(abs(h-particle))-abs(h-particle)
        weight_temp = weight_temp/sum(weight_temp)
        weight += weight_temp*0.3
        weight = weight/sum(weight)
        loc = sum(particle*weight)
        lp = lp*u
        boder1 = np.random.random(n)*weight*n/sum(weight) < 0.2
        boder2 = abs(weight-loc)>lp*3
        boder = boder1 | boder2
        particle[boder] = np.random.normal(loc, lp, len(particle[boder]))

    # print(f"TRUE : {true}")
    # print(f"実際の分散 : {tp**2}")
    # print(f"推測 : {sum(weight*particle)}")
    # print(f"誤差 : {true-sum(weight*particle)}")
    # print(f"2乗誤差 : {(true-sum(weight*particle))**2}")

    return (true-sum(weight*particle))**2

    # plt.show()

i = 0
for _ in range(100):
    i += sim()
print(i/100)

96.84849647120674


In [2]:
# def diff(f, x, dx=1e-4):
#     return (f(x+dx)-f(x-dx))/2/dx
import numpy as np

def jacobian(f, x, dx=1e-4, *args, **kwargs):
    n = len(x)
    dx_offset = np.eye(n)*dx
    f_ = lambda x: f(x, *args, **kwargs)
    g = lambda x: np.asarray(list(map(f_, x)))
    result = (g(x+dx_offset)-g(x-dx_offset))/2/dx
    return result.T

def f(x):
    return np.array([x[1-1]**x[2-1]])

x = np.asarray([2, 3])

jacobian(f, x)

# f(x)



array([[12.00000001,  5.54517745]])

In [189]:
from collections import abc
from typing import Any, Self


class FlaggedAttributeDescriptor(object):
    def __set_name__(self, obj, name: str) -> None:
        self._name = name
    def __init__(self, flag_name: str) -> None:
        self._flag_name = flag_name
    def __set__(self, instance, value: Any) -> None:
        instance.__dict__[self._flag_name] = True
        instance.__dict__[self._name] = value
    def __get__(self, instance, *args) -> Any:
        return instance.__dict__[self._name]

class ArrayReshaperDescriptor(object):
    def __set_name__(self, obj, name: str) -> None:
        self._name = name
    def __init__(self, input_size: tuple, output_size: tuple, 
                 storage_attribute_name: str) -> None:
        self._input_size = input_size
        self._output_size = output_size
        self._storage_attribute_name = storage_attribute_name
    def __set__(self, instance, array_like_object:np.ndarray) -> None:
        if isinstance(array_like_object, np.ndarray):
            pass
        elif isinstance(array_like_object, abc.Sequence):
            pass
        else:
            raise ValueError("値が配列オブジェクトではありません")
        try:
            reshaped_array_like_object \
                = np.reshape(array_like_object, self._input_size)
        except:
            raise ValueError("array sizeが一致していません")
        instance.__dict__[self._storage_attribute_name] \
            = reshaped_array_like_object
    def __get__(self, instance, *args) -> ...:
        array_like_object = instance.__dict__[self._storage_attribute_name]
        if isinstance(array_like_object, np.ndarray):
            pass
        elif isinstance(array_like_object, abc.Sequence):
            pass
        else:
            raise ValueError("値が配列オブジェクトではありません")
        try:
            reshaped_array_like_object \
                = np.reshape(array_like_object, self._output_size)
        except:
            raise ValueError("array sizeが一致していません")
        return reshaped_array_like_object

class TestClass:
    attr1 = FlaggedAttributeDescriptor("is_update")
    attr2 = ArrayReshaperDescriptor(input_size=(-1, 1), output_size=(-1), 
                                    storage_attribute_name="attr1")
    is_update:bool = False

a = TestClass()
print(a.is_update)
a.attr1 = 10
print(a.is_update)
a.attr2 = [1,2,3,4,5]
print(a.attr1)

False
True
[[1]
 [2]
 [3]
 [4]
 [5]]
