Skip to content

Commit

Permalink
Add segment compare functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
bradsease committed Jun 2, 2021
1 parent 5e8aff3 commit 2acabc1
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 5 deletions.
66 changes: 66 additions & 0 deletions oem/compare.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,79 @@
import numpy as np
import warnings

from oem.tools import epoch_span_overlap, epoch_span_contains, time_range


REFERENCE_FRAMES = {
"inertial": ["EME2000", "GCRF", "ICRF", "MCI", "TEME", "TOD"],
"rotating": ["GRC", "ITRF2000", "ITRF-93", "ITRF-97", "TDR"]
}


class SegmentCompare(object):
"""Comparison of two EphemerisSegment.
Input segments must have identical reference frames and central bodies.
All comparisons are calculated in the input segment reference frame.
Rotating reference frames are not supported for velocity-based or
RIC comparisons.
Attributes:
is_empty (bool): Flag indicating overlap between compared segments. Set
to True if there is no overlap.
"""

def __init__(self, origin, target):
"""Create a SegmentCompare.
Args:
origin (EphemerisSegment): Segment at the origin of the
compare frame.
target (EphemerisSegment): Segment to compare against the
origin state.
"""
if (origin.metadata["REF_FRAME"] == target.metadata["REF_FRAME"]
and origin.metadata["CENTER_NAME"]
== target.metadata["CENTER_NAME"]):
self._span = epoch_span_overlap(origin.span, target.span)
self._origin = origin
self._target = target
else:
raise ValueError(
"Incompatible states: frame or central body mismatch."
)

def __contains__(self, epoch):
return (
self._span is not None and epoch_span_contains(self._span, epoch)
)

def __call__(self, epoch):
if epoch not in self:
raise ValueError(f"Epoch {epoch} not contained in SegmentCompare.")
return self._target(epoch) - self._origin(epoch)

def steps(self, step_size):
"""Sample SegmentCompare at equal time intervals.
This method returns a generator producing state compares at equal time
intervals spanning the useable duration of the parent EphemerisSegment.
Args:
step_size (float): Sample step size in seconds.
Yields:
state_compare: Sampled StateCompare.
"""
for epoch in time_range(*self._span, step_size):
yield self(epoch)

@property
def is_empty(self):
return self._span is None


class StateCompare(object):
"""Comparison of two Cartesian states.
Expand Down
15 changes: 10 additions & 5 deletions oem/components/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

from oem import CURRENT_VERSION
from oem.base import ConstraintSpecification, Constraint
from oem.tools import require, time_range
from oem.tools import require, time_range, epoch_span_contains
from oem.components.metadata import MetaDataSection
from oem.components.data import DataSection
from oem.components.covariance import CovarianceSection
from oem.interp import EphemerisInterpolator
from oem.compare import SegmentCompare


class ConstrainEphemerisSegmentCovariance(Constraint):
Expand Down Expand Up @@ -86,10 +87,7 @@ def __call__(self, epoch):
return self._interpolator(epoch)

def __contains__(self, epoch):
return (
epoch >= self.useable_start_time and
epoch <= self.useable_stop_time
)
return epoch_span_contains(self.span, epoch)

def __iter__(self):
return iter(self.states)
Expand All @@ -102,6 +100,9 @@ def __eq__(self, other):
self._covariance_data == other._covariance_data
)

def __sub__(self, other):
return SegmentCompare(self, other)

def __repr__(self):
start = str(self.useable_start_time)
stop = str(self.useable_stop_time)
Expand Down Expand Up @@ -261,3 +262,7 @@ def useable_start_time(self):
def useable_stop_time(self):
"""Return epoch of end of useable state data range"""
return self.metadata.useable_stop_time

@property
def span(self):
return (self.useable_start_time, self.useable_stop_time)
34 changes: 34 additions & 0 deletions oem/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,37 @@ def regex_block_iter(pattern, contents):
yield match.groups()
else:
raise ValueError("Failed to parse complete file contents.")


def epoch_span_contains(span, epoch):
"""Determine if a given epoch falls within a given timespan.
Args:
span (tuple of Time): Pair of Time objects in increasing order.
epoch (Time): Epoch to compare with span.
Returns:
contains (bool): True if input epoch is in the input span, inclusive of
the endpoint epochs.
"""
return epoch >= span[0] and epoch <= span[1]


def epoch_span_overlap(span1, span2):
"""Find the overlap between two epoch spans.
Args:
span1 (tuple of Time): Range of epochs in increasing order.
span2 (tuple of Time): Range of epochs in increasing order.
Returns:
overlap_range (tuple of Time or None): Overlapping epoch range or None
if there is no overlap.
"""
max_start = max(span1[0], span2[0])
min_end = min(span1[1], span2[1])
if max_start < min_end:
overlap_range = (max_start, min_end)
else:
overlap_range = None
return overlap_range
24 changes: 24 additions & 0 deletions tests/test_compare.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import pytest

from astropy.time import Time
from pathlib import Path
from oem import OrbitEphemerisMessage
from oem.compare import StateCompare
from oem.components import State


SAMPLE_DIR = Path(__file__).parent / "samples"


def test_state_self_difference():
state = State(Time.now(), "ICRF", "EARTH", [1, 0, 0], [0, 1, 0], [0, 0, 1])
compare = state - state
Expand Down Expand Up @@ -46,3 +51,22 @@ def test_state_compare_epoch_mismatch():
target = State(Time.now(), "ICRF", "EARTH", [1, 0, 0], [0, 1, 0])
with pytest.raises(ValueError):
StateCompare(origin, target)


def test_segment_self_compare():
test_file_path = SAMPLE_DIR / "real" / "GEO_20s.oem"
segment = OrbitEphemerisMessage.open(test_file_path).segments[0]
compare = segment - segment
assert not compare.is_empty
for state_compare in compare.steps(600):
assert state_compare.range == 0 and state_compare.range_rate == 0


def test_segment_compare_mismatch():
test_file_path = SAMPLE_DIR / "real" / "GEO_20s.oem"
segment1 = OrbitEphemerisMessage.open(test_file_path).segments[0]
segment2 = segment1.copy()
_ = segment1 - segment2
segment2.metadata["CENTER_NAME"] = "MARS"
with pytest.raises(ValueError):
_ = segment1 - segment2

0 comments on commit 2acabc1

Please sign in to comment.