Skip to content

Commit

Permalink
Add code from PR obspy#1718 to remove evalresp dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
amkearns-usgs authored and flixha committed Nov 15, 2022
1 parent 502b97f commit 67c2e34
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 112 deletions.
33 changes: 13 additions & 20 deletions obspy/core/inventory/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ def __init__(self, stage_sequence_number, stage_gain,
stage_gain_frequency, input_units, output_units,
pz_transfer_function_type,
normalization_frequency, zeros, poles,
normalization_factor=None, resource_id=None,
resource_id2=None, name=None, input_units_description=None,
normalization_factor=None, resource_id=None, resource_id2=None,
name=None, input_units_description=None,
output_units_description=None, description=None,
decimation_input_sample_rate=None, decimation_factor=None,
decimation_offset=None, decimation_delay=None,
Expand Down Expand Up @@ -391,17 +391,18 @@ def calc_normalization_factor(self):
return None

A0 = 1.0 + (1j * 0.0)
if self.pz_transfer_function_type == "LAPLACE (HERTZ)":
# TODO: ensure that this coercion to float is valid
if self.transfer_function_type == "LAPLACE (HERTZ)":
s = 1j * float(self.normalization_frequency)
elif self.pz_transfer_function_type == "LAPLACE (RADIANS/SECOND)":
elif self.transfer_function_type == "LAPLACE (RADIANS/SECOND)":
s = 1j * 2 * pi * float(self.normalization_frequency)
else:
print("Don't know how to calculate normalization factor "
"for z-transform poles and zeros!")
return False
for p in self._poles:
for p in self.poles:
A0 *= (s - p)
for z in self._zeros:
for z in self.zeros:
A0 /= (s - z)

return abs(A0)
Expand Down Expand Up @@ -608,7 +609,7 @@ def get_response(self, frequencies, fast=True):
elif self.cf_transfer_function_type == "ANALOG (HERTZ)":
# XXX: Untested so far!
resp = scipy.signal.freqs(
b=self.numerator, a=[1.0], worN=resp_frequencies)[1]
b=self.numerator, a=[1.0], worN=frequencies)[1]
gain_freq_amp = np.abs(scipy.signal.freqs(
b=self.numerator, a=[1.0],
worN=[self.stage_gain_frequency])[1])
Expand All @@ -628,19 +629,9 @@ def get_response(self, frequencies, fast=True):
# frequency. I'm not sure this is entirely correct, as the digitizer
# will likely just apply the FIR filter and send the data along. But
# evalresp does this and thus so do we.
if self.cf_transfer_function_type != 'DIGITAL':
amp *= self.stage_gain / gain_freq_amp
amp *= self.stage_gain / gain_freq_amp

# If "fast", then interpolate the amplitude and phase onto the
# originally requested frequencies.
if len(frequencies) > 10000 and fast:
amp = scipy.interpolate.InterpolatedUnivariateSpline(
resp_frequencies, amp, k=2)(frequencies)
phase = scipy.interpolate.InterpolatedUnivariateSpline(
resp_frequencies, phase, k=2)(frequencies)
final_resp = np.zeros_like(frequencies) + 0j
else:
final_resp = np.empty_like(resp)
final_resp = np.empty_like(resp)
final_resp.real = amp * np.cos(phase)
final_resp.imag = amp * np.sin(phase)

Expand Down Expand Up @@ -1137,8 +1128,10 @@ def get_response(self, frequencies, output="velocity", start_stage=None,
else:
raise ValueError("Unknown output '%s'." % output)

apply_sens = False
if start_stage is None and end_stage is None:
apply_sens = True
# Convert to 0-based indexing.
# (End stage stays the same because it's the exclusive bound)
if start_stage is None:
start_stage = 0
else:
Expand Down
99 changes: 12 additions & 87 deletions obspy/core/tests/test_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ def test_get_response(self):

def test_get_response_regression(self):
units = ["DISP", "VEL", "ACC"]
filenames = ["AU.MEEK", "IRIS_single_channel_with_response",
"XM.05", "IU_ANMO_00_BHZ"]
filenames = ["IRIS_single_channel_with_response", "XM.05", "AU.MEEK"]

for filename in filenames:
xml_filename = os.path.join(self.data_dir,
Expand All @@ -123,94 +122,20 @@ def test_get_response_regression(self):

for unit in units:
# Full response.
evrs_resp = resp.get_evalresp_response_for_frequencies(
xml_resp = resp.get_evalresp_response_for_frequencies(
frequencies=freqs, output=unit)
new_resp = resp.get_response(
frequencies=freqs, output=unit)

np.testing.assert_allclose(np.abs(evrs_resp),
np.testing.assert_allclose(np.abs(xml_resp),
np.abs(new_resp), rtol=1E-5)
# Phase starts to differ slightly before Nyquist and quite a
# bit after. Evalresp appears to have some Gibb's artifacts
# and scipy's solution does look better.
np.testing.assert_allclose(
np.unwrap(np.angle(evrs_resp))[:800],
np.unwrap(np.angle(xml_resp))[:800],
np.unwrap(np.angle(new_resp))[:800],
rtol=1E-2, atol=2E-2)
print("passed:", xml_filename)

def test_get_response_per_stage(self):
filenames = ["AU.MEEK", "IU_ANMO_00_BHZ",
"IRIS_single_channel_with_response", "XM.05"]
units = ["DISP", "VEL", "ACC"]

for filename in filenames:
xml_filename = os.path.join(self.data_dir,
filename + os.path.extsep + "xml")
inv = read_inventory(xml_filename)
resp = inv[0][0][0].response
freqs = np.logspace(-2, 2, 1000)
print(xml_filename)
for unit in units:
for x in range(1, len(resp.response_stages)+1):
print("Type of stage ", str(x) +
":", type(resp.response_stages[x-1]))
xml_resp = resp.get_evalresp_response_for_frequencies(
frequencies=freqs, start_stage=x,
end_stage=x, output=unit)
new_resp = resp.get_response(
frequencies=freqs, start_stage=x,
end_stage=x, output=unit)

np.testing.assert_allclose(np.abs(xml_resp),
np.abs(new_resp), rtol=1E-5)
# Phase starts to differ slightly before
# Nyquist and quite a bit after. Evalresp
# appears to have some Gibb's artifacts and scipy's
# solution does look better.
np.testing.assert_allclose(
np.unwrap(np.angle(xml_resp))[:800],
np.unwrap(np.angle(new_resp))[:800],
rtol=1E-2, atol=2E-2)

print("Succeeded with case for stage no.", x,
"with units", unit)

def test_get_response_per_stage(self):
filenames = ["IRIS_single_channel_with_response", "XM.05", "IU_ANMO_00_BHZ", "AU.MEEK"]
units = ["DISP", "VEL", "ACC"]

for filename in filenames:
xml_filename = os.path.join(self.data_dir,
filename + os.path.extsep + "xml")
inv = read_inventory(xml_filename)
resp = inv[0][0][0].response
freqs = np.logspace(-2, 2, 1000)
print(xml_filename)
for unit in units:
for x in range(1, len(resp.response_stages)+1):
print("Type of stage ", str(x) +
":", type(resp.response_stages[x-1]))
xml_resp = resp.get_evalresp_response_for_frequencies(
frequencies=freqs, start_stage=x,
end_stage=x, output=unit)
new_resp = resp.get_response(
frequencies=freqs, start_stage=x,
end_stage=x, output=unit)

np.testing.assert_allclose(np.abs(xml_resp),
np.abs(new_resp), rtol=1E-5)
# Phase starts to differ slightly before
# Nyquist and quite a bit after. Evalresp
# appears to have some Gibb's artifacts and scipy's
# solution does look better.
np.testing.assert_allclose(
np.unwrap(np.angle(xml_resp))[:800],
np.unwrap(np.angle(new_resp))[:800],
rtol=1E-2, atol=2E-2)

print("Succeeded with case for stage no.", x,
"with units", unit)

def test_get_response_disp_vel_acc(self):
units = ["DISP", "VEL", "ACC"]
Expand All @@ -225,39 +150,39 @@ def test_get_response_disp_vel_acc(self):

for unit in units:
# Full response.
evrs_resp = resp.get_evalresp_response_for_frequencies(
xml_resp = resp.get_evalresp_response_for_frequencies(
frequencies=freqs, output=unit)
new_resp = resp.get_response(
frequencies=freqs, output=unit)

np.testing.assert_allclose(np.abs(evrs_resp),
np.testing.assert_allclose(np.abs(xml_resp),
np.abs(new_resp), rtol=1E-5)
# Phase starts to differ slightly before Nyquist and quite a bit
# after. Evalresp appears to have some Gibb's artifacts and
# scipy's solution does look better.
np.testing.assert_allclose(
np.unwrap(np.angle(evrs_resp))[:800],
np.unwrap(np.angle(xml_resp))[:800],
np.unwrap(np.angle(new_resp))[:800],
rtol=1E-2, atol=2E-2)

# import matplotlib.pyplot as plt
# plt.subplot(411)
# plt.semilogx(freqs, np.abs(evrs_resp), label="evalresp")
# plt.semilogx(freqs, np.abs(xml_resp), label="evalresp")
# plt.semilogx(freqs, np.abs(new_resp), label="scipy")
# plt.legend(loc=3)
# plt.subplot(412)
# plt.semilogx(freqs, np.abs(new_resp) - np.abs(evrs_resp))
# plt.semilogx(freqs, np.abs(new_resp) - np.abs(xml_resp))
# plt.subplot(413)
#
# new_phase = np.angle(new_resp)
# evrs_phase = np.angle(evrs_resp)
# xml_phase = np.angle(xml_resp)
#
# plt.semilogx(freqs, evrs_phase, label="evalresp")
# plt.semilogx(freqs, xml_phase, label="evalresp")
# plt.semilogx(freqs, new_phase, label="scipy")
# plt.legend(loc=3)
# plt.xlim(1E-2, 20.0)
# plt.subplot(414)
# plt.semilogx(freqs, evrs_phase - new_phase)
# plt.semilogx(freqs,xml_phase - new_phase)
# plt.show()

def test_evalresp_with_output_from_seed(self):
Expand Down
6 changes: 1 addition & 5 deletions obspy/signal/invsim.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,13 +371,9 @@ def paz_to_freq_resp(poles, zeros, scale_fac, t_samp=None, nfft=None,
:type t_samp: float
:param t_samp: Sampling interval in seconds
:type nfft: int
:param nfft: Number of FFT points of signal which needs correction.
If not specified, the length of the frequencies parameter will be used.
If specified, the value t_samp is required.
If the frequencies parameter is specified, both this and t_samp are ignored.
:param nfft: Number of FFT points of signal which needs correction
:type frequencies: list of float
:param frequencies: Discrete frequencies to get resp values for.
If nfft and t_samp are not specified, this value is required.
:type freq: bool
:param freq: If true, returns tuple of resp result with freq array input (i.e., x-values)
:rtype: :class:`numpy.ndarray` complex128
Expand Down

0 comments on commit 67c2e34

Please sign in to comment.