Skip to content

Commit

Permalink
Add code from PR obspy#1718 to remove evalresp dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
amkearns-usgs committed Mar 16, 2020
1 parent 219b24d commit a5028e2
Show file tree
Hide file tree
Showing 3 changed files with 411 additions and 7 deletions.
252 changes: 250 additions & 2 deletions obspy/core/inventory/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
FloatWithUncertainties,
FloatWithUncertaintiesAndUnit,
ObsPyException,
ZeroSamplingRate)
ZeroSamplingRate,
Enum)

from .util import Angle, Frequency

Expand Down Expand Up @@ -215,7 +216,7 @@ def __init__(self, stage_sequence_number, stage_gain,
stage_gain_frequency, input_units, output_units,
pz_transfer_function_type,
normalization_frequency, zeros, poles,
normalization_factor=1.0, resource_id=None, resource_id2=None,
normalization_factor=None, resource_id=None, resource_id2=None,
name=None, input_units_description=None,
output_units_description=None, description=None,
decimation_input_sample_rate=None, decimation_factor=None,
Expand All @@ -225,6 +226,8 @@ def __init__(self, stage_sequence_number, stage_gain,
# handled by properties.
self.pz_transfer_function_type = pz_transfer_function_type
self.normalization_frequency = normalization_frequency
if normalization_factor is None:
normalization_factor = self.calc_normalization_factor()
self.normalization_factor = float(normalization_factor)
self.zeros = zeros
self.poles = poles
Expand Down Expand Up @@ -324,6 +327,54 @@ def pz_transfer_function_type(self, value):
else:
raise ValueError(msg)

def get_response(self, frequencies):
"""
Produce the response curve from this stage's data for a given
range of frequencies
:param frequencies: Frequency range to get resp curve over
:return: The curve describing this response stage
"""
# Has to be imported here for now to avoid circular imports.
from obspy.signal.invsim import paz_to_freq_resp
return paz_to_freq_resp(
poles=np.array(self.poles, dtype=np.complex128),
zeros=np.array(self.zeros, dtype=np.complex128),
scale_fac=self.normalization_factor,
frequencies=frequencies, freq=False) * self.stage_gain

def calc_normalization_factor(self):
"""
Calculate the normalization factor for given poles-zeros
The norm factor A0 is calculated such that
sequence_product_over_n(s - zero_n)
A0 * abs(------------------------------------------) === 1
sequence_product_over_m(s - pole_m)
for s_f=i*2pi*f if the transfer function is in radians
i*f if the transfer funtion is in Hertz
:return: the value A0
"""
# code for this method provided by @Wayne_Crawford
if not self.normalization_frequency:
return None

A0 = 1.0 + (1j * 0.0)
# TODO: ensure that this coercion to float is valid
if self.transfer_function_type == "LAPLACE (HERTZ)":
s = 1j * float(self.normalization_frequency)
elif self.transfer_function_type == "LAPLACE (RADIANS/SECOND)":
s = 1j * 2 * pi * float(self.normalization_frequency)
else:
print("Don't know how to calculate normalization factor "
"for z-transform poles and zeros!")
return False
for p in self.poles:
A0 *= (s - p)
for z in self.zeros:
A0 /= (s - z)

return abs(A0)


class CoefficientsTypeResponseStage(ResponseStage):
"""
Expand Down Expand Up @@ -462,6 +513,67 @@ def cf_transfer_function_type(self, value):
else:
raise ValueError(msg)

def get_response(self, frequencies):
"""
Produce the response curve from this coefficient response stage for a range
of frequencies
:param frequencies: Frequency range to get resp curve over
:return: The curve describing this response stage
"""
# Decimation blockette, e.g. gain only!
if not len(self.numerator):
return np.ones_like(frequencies) * self.stage_gain

# This is effectively blockette 54. According to the SEED manual
# this should never have a denominator. Let's see if this is
# actually the case.
assert not self.denominator

sr = self.decimation_input_sample_rate
frequencies = frequencies / sr * np.pi * 2.0

if self.cf_transfer_function_type == "DIGITAL":
resp = scipy.signal.freqz(
b=self.numerator, a=[1.0], worN=frequencies)[1]
gain_freq_amp = np.abs(scipy.signal.freqz(
b=self.numerator, a=[1.0],
worN=[self.stage_gain_frequency])[1])
elif self.cf_transfer_function_type == "ANALOG (RADIANS/SECOND)":
# XXX: Untested so far!
resp = scipy.signal.freqs(
b=self.numerator, a=[1.0], worN=frequencies / (np.pi * 2.0))[1]
gain_freq_amp = np.abs(scipy.signal.freqs(
b=self.numerator, a=[1.0],
worN=[self.stage_gain_frequency / (np.pi * 2.0)])[1])
elif self.cf_transfer_function_type == "ANALOG (HERTZ)":
# XXX: Untested so far!
resp = scipy.signal.freqs(
b=self.numerator, a=[1.0], worN=frequencies)[1]
gain_freq_amp = np.abs(scipy.signal.freqs(
b=self.numerator, a=[1.0],
worN=[self.stage_gain_frequency])[1])
# Cannot happen as caught in the setter.
else: # pragma: no cover
raise NotImplementedError

resp = resp.conjugate()
# evalresp is a bit funny in how it defines the phase. Here we make
# sure that the phase returned is equivalent to evalpres.
amp = np.abs(resp)
phase = np.radians(np.unwrap(np.angle(resp, deg=False))) / np.pi

# Normalize the amplitude with the given sensitivity value and
# frequency. I'm not sure this is entirely correct, as the digitizer
# will likely just apply the FIR filter and send the data along. But
# evalresp does this and thus so do we.
amp *= self.stage_gain / gain_freq_amp

final_resp = np.empty_like(resp)
final_resp.real = amp * np.cos(phase)
final_resp.imag = amp * np.sin(phase)

return final_resp


class ResponseListResponseStage(ResponseStage):
"""
Expand Down Expand Up @@ -746,6 +858,10 @@ class Response(ComparingObject):
"""
The root response object.
"""
# The various types of units.
core_unit_enum = Enum(["displacement", "velocity", "acceleration",
"volts", "counts", "tesla", "pressure"])

def __init__(self, resource_id=None, instrument_sensitivity=None,
instrument_polynomial=None, response_stages=None):
"""
Expand Down Expand Up @@ -783,6 +899,138 @@ def __init__(self, resource_id=None, instrument_sensitivity=None,
msg = "response_stages must be an iterable."
raise ValueError(msg)

def _get_scale_factor(self, unit):
"""
Get the scale factor to go back to SI units.
:type unit: str
:param unit: The name of the unit.
"""
unit = unit.upper()
if unit in ["CM/S**2", "CM/S", "CM/SEC", "CM"]:
return 1.0E2
elif unit in ["MM/S**2", "MM/S", "MM/SEC", "MM"]:
return 1.0E3
elif unit in ["NM/S**2", "NM/S", "NM/SEC", "NM"]:
return 1.0E9
else:
return 1.0

def _get_unit_type(self, unit):
"""
Get the type of unit.
:type unit: str
:param unit: The name of the unit.
"""
unit = unit.upper()
if unit in ("M", "NM", "CM", "MM"):
return self.core_unit_enum.displacement
elif unit in ("M/S", "M/SEC", "NM/S", "NM/SEC", "CM/S", "CM/SEC",
"MM/S", "MM/SEC"):
return self.core_unit_enum.velocity
elif unit in ("M/S**2", "M/(S**2)", "M/SEC**2", "M/(SEC**2)",
"NM/S**2", "NM/(S**2)", "NM/SEC**2", "NM/(SEC**2)",
"CM/S**2", "CM/(S**2)", "CM/SEC**2", "CM/(SEC**2)",
"MM/S**2", "MM/(S**2)", "MM/SEC**2", "MM/(SEC**2)"):
return self.core_unit_enum.acceleration
elif unit in ("V", "VOLT", "VOLTS",
# This is weird, but evalresp appears to do the same.
"V/M"):
return self.core_unit_enum.volts
elif unit in ("COUNT", "COUNTS"):
return self.core_unit_enum.counts
elif unit in ("T", "TESLA"):
return self.core_unit_enum.tesla
elif unit in ("PA", "MBAR"):
return self.core_unit_enum.pressure
else:
raise ValueError("Unknown unit '%s'." % unit)

def get_response(self, frequencies, output="velocity", start_stage=None,
end_stage=None):
"""
Returns the frequency response for given frequencies.
:type frequencies: list of float
:param frequencies: Discrete frequencies to calculate response for.
:type output: str
:param output: Output units. One of:
``"displacement", "d", "DISP"``
displacement, output unit is meters
``"velocity", "v", "VEL"``
velocity, output unit is meters/second
``"acceleration", "a", "ACC"``
acceleration, output unit is meters/second**2
:type start_stage: int, optional
:param start_stage: Stage sequence number of first stage that will be
used (disregarding all earlier stages).
:type end_stage: int, optional
:param end_stage: Stage sequence number of last stage that will be
used (disregarding all later stages).
:rtype: :class:`numpy.ndarray`
:returns: frequency response at requested frequencies
"""
# Map pretty unit strings to unit type enums.
if output.lower() in ("d", "disp", "displacment"):
output = self.core_unit_enum.displacement
elif output.lower() in ("v", "vel", "velocity"):
output = self.core_unit_enum.velocity
elif output.lower() in ("a", "acc", "acceleration"):
output = self.core_unit_enum.acceleration
else:
raise ValueError("Unknown output '%s'." % output)

apply_sens = False
if start_stage is None and end_stage is None:
apply_sens = True
# Convert to 0-based indexing.
if start_stage is None:
start_stage = 0
else:
start_stage -= 1

stages = self.response_stages[slice(start_stage, end_stage)]
resp = stages.pop(0).get_response(frequencies=frequencies)
for stage in stages[1:]:
resp *= stage.get_response(frequencies=frequencies)

# For the scaling - run the whole chain once again with the
# reference frequency.
if start_stage == 0 and end_stage is None:
f = np.array([self.instrument_sensitivity.frequency])
stages = self.response_stages[slice(start_stage, end_stage)]
ref = stages.pop(0).get_response(frequencies=f)
for stage in stages[1:]:
ref *= stage.get_response(frequencies=f)
resp *= self.instrument_sensitivity.value / np.abs(ref[0])

# By now the response is in the input units of the first stage.
diff_and_int_map = {
self.core_unit_enum.displacement: 0,
self.core_unit_enum.velocity: 1,
self.core_unit_enum.acceleration: 2}
unit_type = self._get_unit_type(self.response_stages[0].input_units)
if unit_type not in diff_and_int_map:
raise ValueError("Cannot convert %s to %s." % (unit_type, output))

# Figure out required integration or differentiation.
w = 2.0 * np.pi * frequencies * 1j
diff_or_int = diff_and_int_map[unit_type] - diff_and_int_map[output]
while diff_or_int:
if diff_or_int < 0:
resp /= w
diff_or_int += 1
elif diff_or_int > 0:
resp *= w
diff_or_int -= 1
else: # pragma: no cover
raise NotImplementedError

scale_factor = self._get_scale_factor(
self.response_stages[0].input_units)
if scale_factor != 1.0:
resp *= scale_factor

return resp

def _attempt_to_fix_units(self):
"""
Internal helper function that will add units to gain only stages based
Expand Down

0 comments on commit a5028e2

Please sign in to comment.