Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revise FIRuncFilter full covariance #190

Merged
merged 25 commits into from
Feb 16, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0d7e5ad
wip(_fir_filter): rewrite FIR uncertainty with full covariance support
mgrub Nov 19, 2020
4a89b49
wip(_fir_filter): reduce runtime in case of certain signal or filter
mgrub Nov 19, 2020
03a072b
wip(_fir_filter): add wrapper resembling FIRuncFilter
mgrub Nov 20, 2020
b937a8b
feat(FIRuncFilter_2): add FIR filter with full covariance support
mgrub Nov 20, 2020
abdb574
test(FIRuncFilter_2): compare FIRuncFilter_2 against FIRuncFilter
mgrub Nov 20, 2020
b36772f
test(_fir_filter): compare with Monte Carlo method
mgrub Nov 20, 2020
6586b0a
wip(FIRuncFilter_2): fix missing import in __init__
mgrub Nov 20, 2020
83c6a82
test(propagate_filter): adjust definition of covariance matrix
mgrub Nov 20, 2020
d3f98c7
test(FIRuncFilter_2): adjust definitions of covariance
mgrub Nov 23, 2020
9f33097
test(FIRuncFilter_2): introduce random autocorrelation
mgrub Nov 23, 2020
6e92b26
wip(FIRuncFilter_2): include legacy return option
mgrub Nov 24, 2020
4410f4a
wip(propagate_filter): test different convolution backends
mgrub Nov 24, 2020
661956e
wip(FIRuncFilter): adjust example
mgrub Nov 24, 2020
c5f5ef0
wip(_fir_filter): store less variables for better memory footprint
mgrub Nov 24, 2020
090b8f8
docs(_fir_filter): adjust docstring and ValueError
mgrub Nov 25, 2020
7186755
wip(_fir_filter_diag): add method fir_filter_diag
mgrub Nov 25, 2020
6681792
wip(FIRuncFilter_2): add comments about reasoning of implementation
mgrub Nov 27, 2020
9c9d77a
wip(_fir_filter_diag): fix broken condition
mgrub Nov 27, 2020
fef6d2e
refactor(propagate_filter): rename FIRuncFilter_2
mgrub Feb 15, 2021
6c801a4
test(FIRuncFilter): test full covariance output of MC
mgrub Feb 15, 2021
9bcfe97
Merge branch 'master' of github.com:PTB-PSt1/PyDynamic into revise_FI…
mgrub Feb 15, 2021
fa57605
test(FIRuncFilter): adjust MC threshold
mgrub Feb 15, 2021
10bf715
wip(FIRuncFilter): inform user about option 'return_full_covariance'
mgrub Feb 15, 2021
d20b4f9
wip(FIRuncFilter): different position for user note about FIR full co…
mgrub Feb 15, 2021
91b1c66
fix(test_propagate_filter): relax Monte Carlo threshold
mgrub Feb 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions PyDynamic/__init__.py
Expand Up @@ -40,6 +40,7 @@
"AmpPhase2Time",
"Time2AmpPhase",
"FIRuncFilter",
"FIRuncFilter_2",
"IIRuncFilter",
"MC",
"SMC",
Expand Down
3 changes: 2 additions & 1 deletion PyDynamic/uncertainty/__init__.py
Expand Up @@ -20,7 +20,7 @@
Time2AmpPhase,
)

from .propagate_filter import FIRuncFilter, IIRuncFilter
from .propagate_filter import FIRuncFilter, IIRuncFilter, FIRuncFilter_2

from .propagate_MonteCarlo import MC, SMC, UMC, UMC_generic

Expand All @@ -36,6 +36,7 @@
"AmpPhase2Time",
"Time2AmpPhase",
"FIRuncFilter",
"FIRuncFilter_2",
"IIRuncFilter",
"MC",
"SMC",
Expand Down
227 changes: 208 additions & 19 deletions PyDynamic/uncertainty/propagate_filter.py
Expand Up @@ -18,9 +18,179 @@
import numpy as np
from scipy.linalg import toeplitz
from scipy.signal import lfilter, lfilter_zi, dimpulse
from scipy.signal import convolve
from ..misc.tools import trimOrPad

__all__ = ["FIRuncFilter", "IIRuncFilter"]
__all__ = ["FIRuncFilter", "IIRuncFilter", "FIRuncFilter_2"]


def _fir_filter(x, theta, Ux=None, Utheta=None, initial_conditions="stationary"):
"""Uncertainty propagation for signal x with covariance Ux
and uncertain FIR filter theta with covariance Utheta.

If either Ux or Utheta are omitted (None), then corresponding terms are not
calculated to reduce computation time.

Parameters
----------
x: np.ndarray
filter input signal
theta: np.ndarray
FIR filter coefficients
Ux: np.ndarray, optional
covariance matrix of input signal
Utheta: np.ndarray, optional
covariance matrix associated with theta
if the filter is fully certain, use `Utheta = None` (default) to make use of more efficient calculations.
see also the comparison given in <examples\Digital filtering\FIRuncFilter_runtime_comparison.py>
initial_conditions: str, optional
stationary: assume signal + uncertainty are constant before t=0 (default)
zero: assume signal + uncertainty are zero before t=0


Returns
-------
y: np.ndarray
FIR filter output signal
Uy: np.ndarray
covariance matrix of filter output y


References
----------
* Elster and Link 2008 [Elster2008]_

.. seealso:: :mod:`PyDynamic.deconvolution.fit_filter`
BjoernLudwigPTB marked this conversation as resolved.
Show resolved Hide resolved

"""

Ntheta = len(theta) # FIR filter size

if initial_conditions == "constant":
x0 = x[0]

elif initial_conditions == "zero":
x0 = 0.0

else:
raise ValueError(f"'initial_conditions' = '{initial_conditions}'.")
BjoernLudwigPTB marked this conversation as resolved.
Show resolved Hide resolved

# propagate filter
y, _ = lfilter(theta, 1.0, x, zi=x0 * lfilter_zi(theta, 1.0))

# propagate uncertainty
Uy = np.zeros((len(x), len(x)))

## only calculate subterms, that are non-zero (compare eq. 34 of paper)
if Ux is not None:
## extend covariance Ntheta steps into the past
if initial_conditions == "constant":
Ux_extended = _stationary_prepend_covariance(Ux, Ntheta - 1)

elif initial_conditions == "zero":
# extend covariance Ntheta steps into the past
Ux_extended = np.pad(
Ux,
((Ntheta - 1, 0), (Ntheta - 1, 0)),
"constant",
constant_values=0,
)

# calc subterm theta^T * Ux * theta
Uy += convolve(np.outer(theta, theta), Ux_extended, mode="valid")

if Utheta is not None:
## extend signal Ntheta steps into the past
x_extended = np.r_[np.full((Ntheta - 1), x0), x]

# calc subterm x^T * Utheta * x
Uy += convolve(np.outer(x_extended, x_extended), Utheta, mode="valid")

if (Ux is not None) and (Utheta is not None):
# calc subterm Tr(Ux * Utheta)
Uy += convolve(Ux_extended, Utheta.T, mode="valid")

return y, Uy


def _stationary_prepend_covariance(U, n):
""" Prepend covariance matrix U by n steps into the past"""

c = np.r_[U[:, 0], np.zeros(n)]
r = np.r_[U[0, :], np.zeros(n)]

U_adjusted = toeplitz(c, r)
U_adjusted[n:, n:] = U

return U_adjusted


def FIRuncFilter_2(y, sigma_noise, theta, Utheta=None, shift=0, blow=None, kind="corr", legacy_return=True):
    """This method keeps the signature of `PyDynamic.uncertainty.FIRuncFilter`, but internally
    works differently and returns a full covariance matrix. Also, sigma_noise can be a full
    covariance matrix.

    Parameters
    ----------
    y: np.ndarray
        filter input signal
    sigma_noise: float or np.ndarray
        float: standard deviation of white noise in y
        1D-array: interpretation depends on `kind`
        2D-array: full covariance matrix of the input signal
    theta: np.ndarray
        FIR filter coefficients
    Utheta: np.ndarray, optional
        covariance matrix associated with theta; use `Utheta = None` (default)
        if the filter is fully certain
    shift: int, optional
        time delay of filter output signal (in samples) (defaults to 0)
    blow: np.ndarray, optional
        optional FIR low-pass filter coefficients
    kind: str, optional
        only used if sigma_noise is a 1D-array
        "diag": point-wise standard uncertainties of non-stationary white noise
        "corr": single sided autocovariance of stationary (colored/correlated)
        noise (default)
    legacy_return: bool, optional
        if True (default), return the square roots of the main diagonal of the
        output covariance (same convention as FIRuncFilter), otherwise return
        the full covariance matrix

    Returns
    -------
    x: np.ndarray
        FIR filter output signal
    ux: np.ndarray
        if legacy_return: standard uncertainties associated with x
        otherwise: full covariance matrix associated with x

    Raises
    ------
    ValueError
        if sigma_noise cannot be interpreted

    References
    ----------
    * Elster and Link 2008 [Elster2008]_

    .. seealso:: :mod:`PyDynamic.deconvolution.fit_filter`

    """

    # build the full covariance matrix Uy of the input from sigma_noise
    if isinstance(sigma_noise, float):
        Uy = np.diag(np.full(len(y), sigma_noise ** 2))

    elif isinstance(sigma_noise, np.ndarray):

        if sigma_noise.ndim == 1:
            if kind == "diag":
                Uy = np.diag(sigma_noise ** 2)
            elif kind == "corr":
                Uy = toeplitz(trimOrPad(sigma_noise, len(y)))
            else:
                raise ValueError("unknown kind of sigma_noise")

        elif sigma_noise.ndim == 2:
            Uy = sigma_noise

        else:
            # BUGFIX: arrays with more than two dimensions previously fell
            # through without defining Uy, causing a NameError further below.
            raise ValueError(
                "Unsupported value of sigma_noise. Please check the documentation."
            )

    else:
        raise ValueError(
            "Unsupported value of sigma_noise. Please check the documentation."
        )

    # filter operation(s)
    if isinstance(blow, np.ndarray):
        # apply (fully certain) lowpass-filter
        xlow, Ulow = _fir_filter(y, blow, Uy, None, initial_conditions="constant")

        # apply filter to lowpass-filtered signal
        x, Ux = _fir_filter(xlow, theta, Ulow, Utheta, initial_conditions="constant")

    else:
        # apply filter to input signal
        x, Ux = _fir_filter(y, theta, Uy, Utheta, initial_conditions="constant")

    # shift result
    if shift != 0:
        x = np.roll(x, -int(shift))
        # BUGFIX: without `axis` np.roll flattens the matrix and applies both
        # shifts cumulatively on the raveled array; rows and columns must be
        # rolled together to keep the covariance consistent with x.
        Ux = np.roll(Ux, (-int(shift), -int(shift)), axis=(0, 1))

    if legacy_return:
        return x, np.sqrt(np.abs(np.diag(Ux)))
    else:
        return x, Ux


def FIRuncFilter(y, sigma_noise, theta, Utheta=None, shift=0, blow=None, kind="corr"):
Expand All @@ -39,7 +209,7 @@ def FIRuncFilter(y, sigma_noise, theta, Utheta=None, shift=0, blow=None, kind="c
FIR filter coefficients
Utheta: np.ndarray, optional
covariance matrix associated with theta
if the filter is fully certain, use `Utheta = None` (default) to make use of more efficient calculations.
if the filter is fully certain, use `Utheta = None` (default) to make use of more efficient calculations.
see also the comparison given in <examples\Digital filtering\FIRuncFilter_runtime_comparison.py>
shift: int, optional
time delay of filter output signal (in samples) (defaults to 0)
Expand Down Expand Up @@ -89,12 +259,15 @@ def FIRuncFilter(y, sigma_noise, theta, Utheta=None, shift=0, blow=None, kind="c
f"parameters sigma_noise and kind for more information."
)


if isinstance(blow,np.ndarray): # calculate low-pass filtered signal and propagate noise
if isinstance(
blow, np.ndarray
): # calculate low-pass filtered signal and propagate noise

if isinstance(sigma2, float):
Bcorr = np.correlate(blow, blow, 'full') # len(Bcorr) == 2*Ntheta - 1
ycorr = sigma2 * Bcorr[len(blow)-1:] # only the upper half of the correlation is needed
Bcorr = np.correlate(blow, blow, "full") # len(Bcorr) == 2*Ntheta - 1
ycorr = (
sigma2 * Bcorr[len(blow) - 1 :]
) # only the upper half of the correlation is needed

# trim / pad to length Ntheta
ycorr = trimOrPad(ycorr, Ntheta)
Expand All @@ -118,7 +291,10 @@ def FIRuncFilter(y, sigma_noise, theta, Utheta=None, shift=0, blow=None, kind="c
sigma2_extended = np.append(sigma2[0] * np.ones((Ntheta - 1)), sigma2)

N = toeplitz(blow[1:][::-1], np.zeros_like(sigma2_extended)).T
M = toeplitz(trimOrPad(blow, len(sigma2_extended)), np.zeros_like(sigma2_extended))
M = toeplitz(
trimOrPad(blow, len(sigma2_extended)),
np.zeros_like(sigma2_extended),
)
SP = np.diag(sigma2[0] * np.ones_like(blow[1:]))
S = np.diag(sigma2_extended)

Expand All @@ -139,7 +315,9 @@ def FIRuncFilter(y, sigma_noise, theta, Utheta=None, shift=0, blow=None, kind="c
sigma2 = trimOrPad(sigma2, len(blow) + Ntheta - 1)
sigma2_reflect = np.pad(sigma2, (len(blow) - 1, 0), mode="reflect")

ycorr = np.correlate(sigma2_reflect, Bcorr, mode="valid") # used convolve in a earlier version, should make no difference as Bcorr is symmetric
ycorr = np.correlate(
sigma2_reflect, Bcorr, mode="valid"
) # used convolve in a earlier version, should make no difference as Bcorr is symmetric
Ulow = toeplitz(ycorr)

xlow, _ = lfilter(blow, 1.0, y, zi=y[0] * lfilter_zi(blow, 1.0))
Expand All @@ -155,7 +333,9 @@ def FIRuncFilter(y, sigma_noise, theta, Utheta=None, shift=0, blow=None, kind="c
sigma2_extended = np.append(sigma2[0] * np.ones((Ntheta - 1)), sigma2)

# Ulow is to be sliced from V, see below
V = np.diag(sigma2_extended) # this is not Ulow, same thing as in the case of a provided blow (see above)
V = np.diag(
sigma2_extended
) # this is not Ulow, same thing as in the case of a provided blow (see above)

elif kind == "corr":
Ulow = toeplitz(trimOrPad(sigma2, Ntheta))
Expand All @@ -170,10 +350,10 @@ def FIRuncFilter(y, sigma_noise, theta, Utheta=None, shift=0, blow=None, kind="c
if len(theta.shape) == 1:
theta = theta[:, np.newaxis]

# NOTE: In the code below whereever `theta` or `Utheta` get used, they need to be flipped.
# NOTE: In the code below whereever `theta` or `Utheta` get used, they need to be flipped.
# This is necessary to take the time-order of both variables into account. (Which is descending
# for `theta` and `Utheta` but ascending for `Ulow`.)
#
#
# Further details and illustrations showing the effect of not-flipping
# can be found at https://github.com/PTB-PSt1/PyDynamic/issues/183

Expand All @@ -185,32 +365,41 @@ def FIRuncFilter(y, sigma_noise, theta, Utheta=None, shift=0, blow=None, kind="c

if isinstance(Utheta, np.ndarray):
for k in range(len(sigma2)):
Ulow = V[k:k+Ntheta,k:k+Ntheta]
UncCov[k] = np.squeeze(np.flip(theta).T.dot(Ulow.dot(np.flip(theta))) + np.abs(np.trace(Ulow.dot(np.flip(Utheta))))) # static part of uncertainty
Ulow = V[k : k + Ntheta, k : k + Ntheta]
UncCov[k] = np.squeeze(
np.flip(theta).T.dot(Ulow.dot(np.flip(theta)))
+ np.abs(np.trace(Ulow.dot(np.flip(Utheta))))
) # static part of uncertainty
else:
for k in range(len(sigma2)):
Ulow = V[k:k+Ntheta,k:k+Ntheta]
UncCov[k] = np.squeeze(np.flip(theta).T.dot(Ulow.dot(np.flip(theta)))) # static part of uncertainty
Ulow = V[k : k + Ntheta, k : k + Ntheta]
UncCov[k] = np.squeeze(
np.flip(theta).T.dot(Ulow.dot(np.flip(theta)))
) # static part of uncertainty

else:
if isinstance(Utheta, np.ndarray):
UncCov = np.flip(theta).T.dot(Ulow.dot(np.flip(theta))) + np.abs(np.trace(Ulow.dot(np.flip(Utheta)))) # static part of uncertainty
UncCov = np.flip(theta).T.dot(Ulow.dot(np.flip(theta))) + np.abs(
np.trace(Ulow.dot(np.flip(Utheta)))
) # static part of uncertainty
else:
UncCov = np.flip(theta).T.dot(Ulow.dot(np.flip(theta))) # static part of uncertainty
UncCov = np.flip(theta).T.dot(
Ulow.dot(np.flip(theta))
) # static part of uncertainty

if isinstance(Utheta, np.ndarray):
unc = np.empty_like(y)

# use extended signal to match assumption of stationary signal prior to first entry
xlow_extended = np.append(np.full(Ntheta - 1, xlow[0]), xlow)

for m in range(len(xlow)):
# extract necessary part from input signal
XL = xlow_extended[m : m + Ntheta, np.newaxis]
unc[m] = XL.T.dot(np.flip(Utheta).dot(XL)) # apply formula from paper
else:
unc = np.zeros_like(y)

ux = np.sqrt(np.abs(UncCov + unc))
ux = np.roll(ux, -int(shift)) # correct for delay

Expand Down
6 changes: 3 additions & 3 deletions examples/Digital filtering/FIRuncFilter_runtime_comparison.py
Expand Up @@ -6,7 +6,7 @@
import time
import pandas as pd

from PyDynamic.uncertainty.propagate_filter import FIRuncFilter
from PyDynamic.uncertainty.propagate_filter import FIRuncFilter as FIRuncFilter

def evaluate(signal, filter, case="full/zero/none"):
# different covariance matrices
Expand All @@ -28,13 +28,13 @@ def evaluate(signal, filter, case="full/zero/none"):
return runtime, y, Uy


filter_lengths = [10, 50, 100, 200, 500, 1000, 2000]
filter_lengths = [10, 50, 100, 200, 500, 1000]
cases = ["full", "zero", "none"]
results = pd.DataFrame(index=filter_lengths, columns=cases)


# prepare signal
signal_length = 10000
signal_length = 2000
sigma_noise = 1e-2 # 1e-5
signal = sigma_noise * np.random.randn(signal_length)

Expand Down