Skip to content

Commit

Permalink
Merge pull request #315 from PTB-M4D/fix_DFT_covariance_identity
Browse files Browse the repository at this point in the history
Fix covariance of `GUM_iDFT` output
  • Loading branch information
BjoernLudwigPTB committed Jul 26, 2023
2 parents 4efb97a + aabe88e commit d52d68d
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 52 deletions.
114 changes: 71 additions & 43 deletions setup.py
Expand Up @@ -2,79 +2,104 @@
import codecs
import os
from os import path
from typing import List, Tuple

from setuptools import Command, find_packages, setup


def get_readme():
"""Get README.md's content"""
this_directory = path.abspath(path.dirname(__file__))
with open(path.join(this_directory, "README.md"), encoding="utf-8") as f:
return f.read()
with open(path.join(this_directory, "README.md"), encoding="utf-8") as file:
return file.read()


def read(rel_path):
def get_version(rel_path: str) -> str:
"""Extract __version__ variable's value from a file's content"""
for line in read_file_content(rel_path).splitlines():
if line.startswith("__version__"):
delim = '"' if '"' in line else "'"
return line.split(delim)[1]
raise RuntimeError("Unable to find version string.")


def read_file_content(rel_path: str) -> str:
"""Extract a file's content and provide a variable to access it"""
here = path.abspath(path.dirname(__file__))
with codecs.open(path.join(here, rel_path), "r") as fp:
return fp.read()
with codecs.open(path.join(here, rel_path), "r") as file:
return file.read()


class Tweet(Command):
"""Handle the tweeting on executing setup.py tweet"""

_filename: str

filename: str
_consumer_key: str
_consumer_secret: str
_access_token: str
_access_token_secret: str
_twitter_api_auth_handle = None

description = "Send new tweets to the Twitter API to announce releases"
description: str = "Send new tweets to the Twitter API to announce releases"

user_options = [("filename=", "f", "filename containing the tweet")]
user_options: List[Tuple[str, str, str]] = [
("filename=", "f", "filename containing the tweet")
]

def initialize_options(self):
self.filename = "tweet.txt"
"""Set filename to default value"""
self._filename = "tweet.txt"

def finalize_options(self):
if self.filename is None:
"""React to invalid circumstance that no filename is set"""
if self._filename is None:
raise RuntimeError("Parameter --filename is missing")

def run(self):
import tweepy
"""Actually organize and conduct the tweeting"""
from tweepy import Client

def _tweet():
_get_twitter_api_handle().update_status(read_tweet_from_file())
def set_twitter_api_secrets_from_environment():
self._consumer_key = os.getenv("CONSUMER_KEY")
self._consumer_secret = os.getenv("CONSUMER_SECRET")
self._access_token = os.getenv("ACCESS_TOKEN")
self._access_token_secret = os.getenv("ACCESS_TOKEN_SECRET")

def _get_twitter_api_handle():
return tweepy.API(_get_twitter_api_auth_handle())
def set_twitter_api_auth_handle():
self._twitter_api_auth_handle = raise_error_or_retrieve_handle()

def _get_twitter_api_auth_handle():
def raise_error_or_retrieve_handle() -> Client:
try:
auth = tweepy.OAuthHandler(
os.getenv("consumer_key"), os.getenv("consumer_secret")
)
auth.set_access_token(
os.getenv("access_token"), os.getenv("access_token_secret")
)
return auth
except TypeError as e:
if "Consumer key must be" in str(e):
return initialize_twitter_api_auth_handle()
except TypeError as type_error_message:
if "must be string or bytes" in str(type_error_message):
raise ValueError(
"ValueError: Environment variables 'consumer_key', "
"'consumer_secret', 'access_token' and 'access_token_secret' "
"ValueError: Environment variables 'CONSUMER_KEY', "
"'CONSUMER_SECRET', 'ACCESS_TOKEN' and 'ACCESS_TOKEN_SECRET' "
"have to be set."
)
raise TypeError from type_error_message

def read_tweet_from_file() -> str:
with open(self.filename, "r") as f:
content: str = f.read()
return content
def initialize_twitter_api_auth_handle() -> Client:
return Client(
consumer_key=self._consumer_key,
consumer_secret=self._consumer_secret,
access_token=self._access_token,
access_token_secret=self._access_token_secret,
)

_tweet()
def tweet():
self._twitter_api_auth_handle.create_tweet(text=read_tweet_from_file())

def read_tweet_from_file() -> str:
with open(self._filename, "r", encoding="utf-8") as file:
content: str = file.read()
return content

def get_version(rel_path):
for line in read(rel_path).splitlines():
if line.startswith("__version__"):
delim = '"' if '"' in line else "'"
return line.split(delim)[1]
else:
raise RuntimeError("Unable to find version string.")
set_twitter_api_secrets_from_environment()
set_twitter_api_auth_handle()
tweet()


current_release_version = get_version("src/PyDynamic/__init__.py")
Expand All @@ -86,10 +111,13 @@ def get_version(rel_path):
long_description=get_readme(),
long_description_content_type="text/markdown",
url="https://ptb-m4d.github.io/PyDynamic/",
download_url="https://github.com/PTB-M4D/PyDynamic/releases/download/v{0}/"
"PyDynamic-{0}.tar.gz".format(current_release_version),
author="Sascha Eichstädt, Maximilian Gruber, Björn Ludwig, Thomas Bruns, "
"Martin Weber",
download_url=(
f"https://github.com/PTB-M4D/PyDynamic/releases/download/"
f"v{current_release_version}/PyDynamic-{current_release_version}.tar.gz"
),
author=(
"Sascha Eichstädt, Maximilian Gruber, Björn Ludwig, Thomas Bruns, Martin Weber"
),
author_email="sascha.eichstaedt@ptb.de",
keywords="measurement uncertainty, dynamic measurements, metrology, GUM",
packages=find_packages(where="src"),
Expand Down
4 changes: 3 additions & 1 deletion src/PyDynamic/uncertainty/propagate_DFT.py
Expand Up @@ -418,7 +418,9 @@ def GUM_iDFT(
II = UF[N_in:, N_in:]
# propagate uncertainties
Ux = np.dot(Cc, np.dot(RR, Cc.T))
Ux += 2 * np.dot(Cc, np.dot(RI, Cs.T))
term2 = np.dot(Cc, np.dot(RI, Cs.T))
Ux += term2
Ux += term2.T
Ux += np.dot(Cs, np.dot(II, Cs.T))
else:
RR = UF[:N_in]
Expand Down
101 changes: 93 additions & 8 deletions test/test_DFT.py
@@ -1,15 +1,18 @@
""" Perform tests on methods to handle DFT and inverse DFT."""

from typing import Callable, Dict, Optional, Tuple, Union

import numpy as np
import pytest
from hypothesis import assume, given, strategies as hst
import scipy.linalg as scl
from hypothesis import assume, given, settings, strategies as hst
from hypothesis.strategies import composite
from numpy.testing import assert_allclose, assert_almost_equal
from typing import Callable, Dict, Optional, Tuple, Union

from PyDynamic.misc.testsignals import multi_sine
from PyDynamic.misc.tools import real_imag_2_complex as ri2c
from PyDynamic.misc.tools import (
complex_2_real_imag as c2ri,
real_imag_2_complex as ri2c,
)

# noinspection PyProtectedMember
from PyDynamic.uncertainty.propagate_DFT import (
Expand All @@ -20,6 +23,7 @@
GUM_iDFT,
Time2AmpPhase,
)
from PyDynamic.uncertainty.propagate_MonteCarlo import UMC_generic
from .conftest import (
check_no_nans_and_infs,
hypothesis_float_matrix,
Expand Down Expand Up @@ -163,7 +167,6 @@ def random_vector_and_matrix_with_matching_number_of_columns(

@composite
def iDFT_input_output_lengths(draw: Callable, equal_lengths: bool = False):

input_length = draw(hst.integers(min_value=5, max_value=15))

if equal_lengths:
Expand Down Expand Up @@ -208,7 +211,8 @@ def test_DFT_iDFT_identity(params):
assert N == Nx

# create time signal and corresponding covariance matrix
x, x_cov = np.arange(N) + 2, np.eye(N)
x = np.arange(N) + 2
x_cov = scl.toeplitz(np.arange(N)[::-1])

# get spectrum
X, X_cov = GUM_DFT(x, x_cov)
Expand All @@ -217,8 +221,89 @@ def test_DFT_iDFT_identity(params):
x_reconstructed, x_reconstructed_cov = GUM_iDFT(X, X_cov, Nx=Nx)

# check signal and covariance in case of reconstruction to identity
assert_allclose(x, x_reconstructed, atol=1e-14)
assert_allclose(x_cov, x_reconstructed_cov, atol=1e-14)
assert_allclose(x, x_reconstructed, atol=1e-13)
assert_allclose(x_cov, x_reconstructed_cov, atol=1e-13)


def evaluate_dft_mc(x):
return c2ri(np.fft.rfft(x))


@settings(deadline=2000)
@pytest.mark.slow
@given(iDFT_input_output_lengths(equal_lengths=True))
def test_DFT_MC(params):
N = params["input_length"]

# create time signal and corresponding covariance matrix
x = np.arange(N) + 2
x_cov = scl.toeplitz(0.01 * np.arange(N)[::-1])

# get spectrum
X, X_cov = GUM_DFT(x, x_cov)

# get spectrum (Monte Carlo)
draw_samples = lambda size: np.random.multivariate_normal(
mean=x, cov=x_cov, size=size
)
X_MC, X_MC_cov, _, _ = UMC_generic(
draw_samples,
evaluate_dft_mc,
runs=20000,
blocksize=2000,
runs_init=2,
return_samples=False,
return_histograms=False,
compute_full_covariance=True,
n_cpu=1,
)

# compare analytical and numerical result
assert_allclose(X, X_MC, rtol=1e-1)
assert_allclose(X_cov, X_MC_cov, atol=8e-1)


def evaluate_idft_mc(X, Nx):
return np.fft.irfft(ri2c(X), Nx)


@pytest.mark.slow
@given(iDFT_input_output_lengths(equal_lengths=True))
def test_iDFT_MC(params):
N = params["input_length"]
Nx = params["output_length"]
NX = evaluate_dft_mc(np.zeros(N)).size # corresponding spectrum size
assert N == Nx

# create spectrum and corresponding covariance matrix
X = np.arange(NX) + 2
X_cov = scl.toeplitz(0.01 * np.arange(NX)[::-1])

# get spectrum
x, x_cov = GUM_iDFT(X, X_cov, Nx=Nx)

# get spectrum (Monte Carlo)
draw_samples = lambda size: np.random.multivariate_normal(
mean=X, cov=X_cov, size=size
)
import functools

evaluate = functools.partial(evaluate_idft_mc, Nx=Nx)
x_MC, x_MC_cov, _, _ = UMC_generic(
draw_samples,
evaluate,
runs=1000,
blocksize=1000,
runs_init=2,
return_samples=False,
return_histograms=False,
compute_full_covariance=True,
n_cpu=1,
)

# compare analytical and numerical result
assert_allclose(x, x_MC, atol=2e-1)
assert_allclose(x_cov, x_MC_cov, atol=1e-1)


@given(iDFT_input_output_lengths())
Expand Down

0 comments on commit d52d68d

Please sign in to comment.