In [None]:
from eose.instruments import CircularGeometry, RectangularGeometry, BasicSensor

from pydantic import ValidationError

# Example usage of the CircularGeometry and RectangularGeometry classes, which
# are used to define the field of view of an instrument.

# Circular field of view, described by a single diameter value.
circ_geom = CircularGeometry(diameter=90.0)
print(circ_geom)

# Rectangular field of view, described by separate height and width angles.
rect_geom = RectangularGeometry(angle_height=90.0, angle_width=45)
print(rect_geom)

In [None]:
# Example simple usage of the eose-api BasicSensor class

# 1) Sensor with a circular field of view; no orientation is passed.
sensor = BasicSensor(
    mass=100.5,
    volume=0.75,
    power=150.0,
    field_of_view=CircularGeometry(diameter=30.0),
    data_rate=10.5,
    bits_per_pixel=16,
)

print(sensor)

# 2) Sensor with an explicit orientation quaternion and a rectangular field of view.
#    The keyword is `angle_width`; the previous spelling `ngle_width` never set the width.
sensor = BasicSensor(
    mass=100.5,
    volume=0.75,
    power=150.0,
    orientation=[0.5, 0.5, 0.5, 0.5],
    field_of_view=RectangularGeometry(angle_width=30.0, angle_height=10.0),
    data_rate=50.5,
    bits_per_pixel=8,
)

print(sensor)

# 3) Sensor with an explicit orientation quaternion and no field of view passed.
sensor = BasicSensor(
    mass=100.5,
    volume=0.75,
    power=150.0,
    orientation=[0.5, 0.5, 0.5, 0.5],
    data_rate=50.5,
    bits_per_pixel=8,
)

print(sensor)

# JSON serialization of the last sensor defined above.
display(sensor.model_dump_json())

In [3]:
%reset -f

## Example usage with TAT-C and InstruPy

In [4]:
from joblib import Parallel, delayed

import json
from datetime import datetime, timedelta, timezone
from shapely.geometry import box, mapping
from scipy.stats import hmean
import pandas as pd

from astropy.time import Time as AstroPy_Time

from scipy.spatial.transform import Rotation as Scipy_Rotation

from pydantic import AwareDatetime

from tatc.schemas import (
    Instrument as TATC_Instrument,
    Satellite as TATC_Satellite,
    TwoLineElements,
    Point,
)
from tatc.analysis import collect_orbit_track, OrbitCoordinate, OrbitOutput
from tatc.analysis import (
    collect_multi_observations,
    aggregate_observations,
    reduce_observations,
)

from eose.propagation import (
    PropagationSample,
    PropagationRecord,
    PropagationRequest,
    PropagationResponse,
)
from eose.orbits import GeneralPerturbationsOrbitState, Propagator
from eose.satellites import Satellite
from eose.utils import CartesianReferenceFrame
from eose.targets import TargetPoint
from eose.geometry import Position

from eose.access import (
    AccessSample,
    AccessRecord,
    AccessRequest,
    AccessResponse,
)
from eose.grids import UniformAngularGrid


from instrupy.basic_sensor_model import BasicSensorModel as InstruPy_BasicSensorModel

from eose.instruments import CircularGeometry, BasicSensor
from eose.datametrics import (
    DataMetricsRequest,
    BasicSensorDataMetricsInstantaneous,
    DataMetricsSample,
    DataMetricsRecord,
    DataMetricsResponse,
)

# Disable row truncation so that displayed DataFrames show every row.
# NOTE(review): this makes large frames (e.g. the access table) render in full.
pd.set_option("display.max_rows", None)

### Define mission parameters

In [5]:
# define the orbit and the instrument
# Orbit Mean-Elements Message (OMM) for the ISS, stored as a JSON array with a single entry.
iss_omm_str = '[{"OBJECT_NAME":"ISS (ZARYA)","OBJECT_ID":"1998-067A","EPOCH":"2024-06-07T09:53:34.728000","MEAN_MOTION":15.50975122,"ECCENTRICITY":0.0005669,"INCLINATION":51.6419,"RA_OF_ASC_NODE":3.7199,"ARG_OF_PERICENTER":284.672,"MEAN_ANOMALY":139.0837,"EPHEMERIS_TYPE":0,"CLASSIFICATION_TYPE":"U","NORAD_CAT_ID":25544,"ELEMENT_SET_NO":999,"REV_AT_EPOCH":45703,"BSTAR":0.00033759,"MEAN_MOTION_DOT":0.00019541,"MEAN_MOTION_DDOT":0}]'
iss_omm = json.loads(iss_omm_str)[0]

# The payload; its id ("Atom") is what the access request below selects via `payload_ids`.
basic_sensor = BasicSensor(
    id="Atom",
    mass=100.5,
    volume=0.75,
    power=150.0,
    field_of_view=CircularGeometry(diameter=60.0),
    data_rate=10.5,
    bits_per_pixel=16,
)

# A single satellite carrying the sensor, with its orbit built from the OMM record.
satellites = [
    Satellite(
        id="ISS",
        orbit=GeneralPerturbationsOrbitState.from_omm(iss_omm),
        payloads=[basic_sensor],
    )
]

# Target points generated from a uniform angular grid restricted to the box
# (-180, -50, 180, 50) — presumably longitude/latitude bounds in degrees; TODO confirm.
targets = UniformAngularGrid(
    delta_latitude=20, delta_longitude=20, region=mapping(box(-180, -50, 180, 50))
).as_targets()

# NOTE(review): the mission starts on 2024-01-01 while the OMM epoch above is
# 2024-06-07, i.e. propagation runs several months before the element-set epoch —
# confirm this is intended.
mission_start = datetime(2024, 1, 1, tzinfo=timezone.utc)
mission_duration = timedelta(hours=2)
propagate_time_step = timedelta(minutes=0.5)  # 30 seconds

### Run propagation with TAT-C to get satellite states

In [None]:
def propagate_tatc(request: PropagationRequest) -> PropagationResponse:
    """
    Propagate every satellite in the request with TAT-C and return the resulting states.

    :param request: Propagation request holding the satellites, start time, duration, time step, reference frame and propagator.
    :paramtype request: PropagationRequest
    :return: A response that repeats the request fields and adds one propagation record (position/velocity samples) per satellite.
    :rtype: PropagationResponse
    :raises RuntimeError: If the requested propagator is not SGP4.
    """
    if request.propagator != Propagator.SGP4:
        raise RuntimeError("TAT-C only supports SGP4 propagator.")

    # The sample times and the output frame are the same for every satellite,
    # so they are built once instead of once per satellite.
    sample_times = pd.date_range(
        request.start,
        request.start + request.duration,
        freq=request.time_step,
    )
    coordinates = (
        OrbitCoordinate.ECI
        if request.frame == CartesianReferenceFrame.ICRF
        else OrbitCoordinate.ECEF
    )

    # One orbit track per satellite, in the same order as request.satellites.
    orbit_tracks = Parallel(n_jobs=-1)(
        delayed(collect_orbit_track)(
            TATC_Satellite(
                name=satellite.id,
                orbit=TwoLineElements(tle=satellite.orbit.to_tle()),
            ),
            TATC_Instrument(name="Instrument"),  # dummy entry
            sample_times,
            coordinates=coordinates,
            orbit_output=OrbitOutput.POSITION_VELOCITY,
        )
        for satellite in request.satellites
    )

    return PropagationResponse(
        # `exclude` is given as a set (the documented form); a bare string is
        # not a documented include/exclude type.
        **request.model_dump(exclude={"satellite_records"}),
        satellite_records=[
            PropagationRecord(
                satellite_id=satellite.id,
                # Convert each row of the track into a PropagationSample.
                samples=orbit_track.apply(
                    lambda r: PropagationSample(
                        time=r.time,
                        frame=request.frame,
                        position=r.geometry.coords[0],
                        velocity=r.velocity.coords[0],
                    ),
                    axis=1,
                ),
            )
            for satellite, orbit_track in zip(request.satellites, orbit_tracks)
        ],
    )


# Build the propagation request from the mission parameters defined above.
request = PropagationRequest(
    satellites=satellites,
    start=mission_start,
    duration=mission_duration,
    time_step=propagate_time_step,
    frame=CartesianReferenceFrame.ICRF,
    propagator=Propagator.SGP4,
)

display(request.model_dump_json())

# `propagation_response` is reused later when assembling the data-metrics request.
propagation_response = propagate_tatc(request)

# Uncomment to inspect the full response as JSON:
# display(propagation_response.model_dump_json())

propagation_data = propagation_response.as_dataframe()

# Uncomment to inspect the propagated states as a table:
# display(propagation_data)

### Run access calculations with TAT-C to get access periods at target ground points

The sensor is assumed to have a circular field-of-view (FOV) geometry. Coverage analysis is not required.

In [None]:
def access_tatc(request: AccessRequest) -> AccessResponse:
    """
    Compute, with TAT-C, the periods during which the requested payloads can observe each target.

    Every selected payload is assumed to have a circular field of view: its
    ``field_of_view.diameter`` is passed to TAT-C as the field of regard.

    :param request: Access request holding the satellites, targets, start time, duration, propagator and the ids of the payloads to consider.
    :paramtype request: AccessRequest
    :return: A response that repeats the request fields and adds one access record per target (without samples when the target is never observed).
    :rtype: AccessResponse
    :raises RuntimeError: If the requested propagator is not SGP4.
    """
    if request.propagator != Propagator.SGP4:
        raise RuntimeError("TAT-C only supports SGP4 propagator.")

    # One TAT-C satellite per (satellite, selected payload) pair.
    satellites = [
        TATC_Satellite(
            name=satellite.id,
            orbit=TwoLineElements(tle=satellite.orbit.to_tle()),
            instruments=[
                TATC_Instrument(
                    name=str(payload.id), field_of_regard=payload.field_of_view.diameter
                )  # assumed that the instrument has a circular FOV
            ],
        )
        for satellite in request.satellites
        for payload in satellite.payloads
        if payload.id in request.payload_ids
    ]

    # The analysis window is the same for every target.
    stop = request.start + request.duration

    # One observation table per target, in the same order as request.targets.
    observations = Parallel(n_jobs=-1)(
        delayed(collect_multi_observations)(
            Point(
                id=i,
                longitude=target.position[0],
                latitude=target.position[1],
                altitude=(target.position[2] if len(target.position) > 2 else 0),
            ),
            satellites,
            request.start,
            stop,
        )
        for i, target in enumerate(request.targets)
    )

    return AccessResponse(
        # `exclude` is given as a set (the documented form); a bare string is
        # not a documented include/exclude type.
        **request.model_dump(exclude={"target_records"}),
        target_records=[
            (
                # No observations: emit a record without samples.
                AccessRecord(target_id=target.id)
                if target_observations.empty
                else AccessRecord(
                    target_id=target.id,
                    samples=target_observations.apply(
                        lambda s: AccessSample(
                            start=s.start,
                            duration=s.end - s.start,
                            satellite_id=s.satellite,
                            instrument_id=s.instrument,
                        ),
                        axis=1,
                    ),
                )
            )
            for target, target_observations in zip(request.targets, observations)
        ],
    )


# Build the access request; only the payload with id "Atom" is considered.
request = AccessRequest(
    satellites=satellites,
    targets=targets,
    start=mission_start,
    duration=mission_duration,
    propagator=Propagator.SGP4,
    payload_ids=["Atom"],
)

display(request.model_dump_json())

# `access_response` is reused later when assembling the data-metrics request.
access_response = access_tatc(request)

# Uncomment to inspect the full response as JSON:
# display(access_response.model_dump_json())

access_data = access_response.as_dataframe()

display(access_data)

### Run Data Metrics calculation with InstruPy

In [8]:
def get_instantaneous_data_metrics_object(instru_type, time_instant, data_metrics):
    """
    Wrap one set of computed metrics in the data-metrics model matching the instrument type.

    :param instru_type: Name of the instrument type; only "BasicSensor" is accepted.
    :paramtype instru_type: str
    :param time_instant: Time to which the metrics apply.
    :paramtype time_instant: AwareDatetime
    :param data_metrics: Mapping with the keys "incidence angle [deg]", "look angle [deg]", "observation range [km]" and "solar zenith [deg]".
    :paramtype data_metrics: dict
    :return: The metrics packaged as a `BasicSensorDataMetricsInstantaneous` instance.
    :rtype: BasicSensorDataMetricsInstantaneous
    :raises ValueError: If `instru_type` is anything other than "BasicSensor".
    """
    # Reject unsupported instrument types before touching the metrics.
    if instru_type != "BasicSensor":
        raise ValueError(
            f"{instru_type} instrument type is not supported. Only 'BasicSensor' instrument type is supported."
        )

    # Translate the unit-suffixed metric keys into the model's field names.
    metric_fields = {
        "incidence_angle": data_metrics["incidence angle [deg]"],
        "look_angle": data_metrics["look angle [deg]"],
        "observation_range": data_metrics["observation range [km]"],
        "solar_zenith": data_metrics["solar zenith [deg]"],
    }
    return BasicSensorDataMetricsInstantaneous(time=time_instant, **metric_fields)


def data_metrics_instrupy(request):
    """
    Calculate data metrics with InstruPy for every access period in the request.

    For each target record and each access sample within it, the satellite states that
    fall inside the overlap of the access period and the request window are passed to the
    InstruPy sensor model, and the resulting metrics are collected per time instant.

    :param request: A `DataMetricsRequest` object containing target records, targets, satellites, satellite records, and other metadata.
    :paramtype request: DataMetricsRequest
    :return: A `DataMetricsResponse` object containing the calculated data metrics for the requested targets and instruments.
    :rtype: DataMetricsResponse
    :raises ValueError: If the instrument type or its field-of-view geometry is not supported.
    :raises RuntimeError: If a propagation record, instrument or target referenced by an access sample cannot be found.
    """

    def get_propagation_record(propagation_records, satellite_id):
        """
        Find the PropagationRecord corresponding to the satellite-id.

        :param propagation_records: List of satellite propagation records.
        :paramtype propagation_records: list[PropagationRecord]
        :param satellite_id: The unique identifier of the satellite.
        :paramtype satellite_id: str
        :return: The matching PropagationRecord object.
        :rtype: PropagationRecord
        :raises RuntimeError: If the propagation record for the requested satellite-id is not found.
        """
        for record in propagation_records:
            if record.satellite_id == satellite_id:
                return record
        raise RuntimeError(
            "Propagation record for requested satellite-id was not found."
        )

    def get_instrument_model(satellites, satellite_id, instrument_id):
        """
        Find the instrument with the given id on the given satellite and build its InstruPy model.

        :param satellites: List of satellite objects.
        :paramtype satellites: list[Satellite]
        :param satellite_id: The unique identifier of the satellite.
        :paramtype satellite_id: str
        :param instrument_id: The identifier of the instrument. The identifier needs to be unique within the list of instrument ids for a given satellite object.
        :paramtype instrument_id: str
        :return: The instrument type and corresponding sensor model.
        :rtype: tuple[str, InstruPy_BasicSensorModel]
        :raises ValueError: If the instrument type or its field-of-view geometry is unsupported.
        :raises RuntimeError: If the instrument model is not found within the specified satellite.
        """
        for sat in satellites:
            if sat.id != satellite_id:
                continue
            for instru in sat.payloads:
                if instru.id != instrument_id:
                    continue

                instru_type = instru.type
                if instru_type != "BasicSensor":
                    raise ValueError(
                        f"{instru_type} instrument type is not supported. Only 'BasicSensor' instrument type is supported."
                    )

                # Translate the field-of-view geometry into InstruPy's dictionary format.
                fov = instru.field_of_view
                if fov.type == "CircularGeometry":
                    instrupy_fov_geom = {
                        "shape": "CIRCULAR",
                        "diameter": fov.diameter,
                    }
                elif fov.type == "RectangularGeometry":
                    instrupy_fov_geom = {
                        "shape": "RECTANGULAR",
                        "angleHeight": fov.angle_height,
                        "angleWidth": fov.angle_width,
                    }
                else:
                    raise ValueError(
                        f"Only Circular and Rectangular geometries are supported and not {fov.type}"
                    )

                # Convert orientation in Quaternion to Euler rotations
                r = Scipy_Rotation.from_quat(list(instru.orientation))
                (x, y, z) = r.as_euler(
                    "XYZ", degrees=True
                )  # Conventions 'XYZ' are for intrinsic rotations (used by OrbitPy), while 'xyz' are for extrinsic rotations.

                # The specification previously listed "orientation" twice; in a dict
                # literal the later entry wins, so only that effective entry is kept.
                instrupy_sensor = InstruPy_BasicSensorModel.from_dict(
                    {
                        "@type": "Basic Sensor",
                        "fieldOfViewGeometry": instrupy_fov_geom,
                        "orientation": {
                            "referenceFrame": "NADIR_POINTING",
                            "convention": "XYZ",
                            "xRotation": x,
                            "yRotation": y,
                            "zRotation": z,
                        },
                        "@id": instru.id,
                    }
                )
                return instru_type, instrupy_sensor
        raise RuntimeError(
            "Instrument model for the requested instrument-id (within the specified satellite_id) was not found."
        )

    def propagation_samples_within_time_range(
        propagation_samples, start_time, stop_time
    ):
        """
        Return the propagation samples whose time lies within the specified range (both ends inclusive).

        :param propagation_samples: List of propagation samples containing satellite states.
        :paramtype propagation_samples: list[PropagationSample]
        :param start_time: The start time of the range.
        :paramtype start_time: AwareDatetime
        :param stop_time: The stop time of the range.
        :paramtype stop_time: AwareDatetime
        :return: A list of the propagation samples within the time range.
        :rtype: list[PropagationSample]
        """
        return [
            sample
            for sample in propagation_samples
            if start_time <= sample.time <= stop_time
        ]

    def get_target_position(target_id, all_targets):
        """
        Get the position of the target by its ID.

        :param target_id: The unique identifier of the target.
        :paramtype target_id: str
        :param all_targets: List of all targets.
        :paramtype all_targets: list[TargetPoint]
        :return: The position of the target (longitude at index 0, latitude at index 1).
        :rtype: Position
        :raises RuntimeError: If the target is not found.
        """
        for target in all_targets:
            if target.id == target_id:
                return target.position
        raise RuntimeError(f"Target with id {target_id} was not found.")

    # Main processing
    dm_req_start = request.start
    dm_req_stop = request.start + request.duration

    # Aggregation of data-metrics across multiple targets and multiple overpasses for each target.
    dm_record = []
    for access_record in request.target_records:
        target_position = get_target_position(access_record.target_id, request.targets)

        # Aggregation of data-metrics over multiple overpasses for the target
        dm_sample = []
        for access_sample in access_record.samples:
            _sat_id = access_sample.satellite_id
            _instru_id = access_sample.instrument_id

            # Clip the access period to the window covered by the request.
            _start = max(dm_req_start, access_sample.start)
            _stop = min(dm_req_stop, access_sample.start + access_sample.duration)

            propagation_record = get_propagation_record(
                request.satellite_records, _sat_id
            )
            samples_in_window = propagation_samples_within_time_range(
                propagation_record.samples, _start, _stop
            )

            instru_type, instrupy_sensor = get_instrument_model(
                request.satellites, _sat_id, _instru_id
            )

            # Aggregation of data-metrics over a single overpass (=coverage sample) for the target.
            # Stays empty when no propagation sample falls inside the clipped access period.
            dm_inst = []
            for _pr in samples_in_window:
                # InstruPy takes the epoch as a Julian date in the UT1 time scale.
                time_utc = AstroPy_Time(_pr.time.astimezone(timezone.utc), scale="utc")
                jd_ut1 = time_utc.ut1.jd

                # The state components are scaled by 1e-3 to match the [km] and [km/s] keys
                # (presumably the samples are in metres and metres/second — TODO confirm).
                # NOTE(review): the reference frame of the samples is not checked here —
                # confirm it matches what InstruPy expects.
                SpacecraftOrbitState = {
                    "time [JDUT1]": jd_ut1,
                    "x [km]": _pr.position[0] * 1e-3,
                    "y [km]": _pr.position[1] * 1e-3,
                    "z [km]": _pr.position[2] * 1e-3,
                    "vx [km/s]": _pr.velocity[0] * 1e-3,
                    "vy [km/s]": _pr.velocity[1] * 1e-3,
                    "vz [km/s]": _pr.velocity[2] * 1e-3,
                }
                TargetCoords = {
                    "lat [deg]": target_position[1],
                    "lon [deg]": target_position[0],
                }
                data_metrics = instrupy_sensor.calc_data_metrics(
                    SpacecraftOrbitState, TargetCoords
                )

                dm_inst.append(
                    get_instantaneous_data_metrics_object(
                        instru_type, _pr.time, data_metrics
                    )
                )

            # `exclude` is given as a set (the documented form) here and below.
            dm_sample.append(
                DataMetricsSample(
                    **access_sample.model_dump(exclude={"instantaneous_metrics"}),
                    instantaneous_metrics=dm_inst,
                )
            )

        dm_record.append(
            DataMetricsRecord(
                **access_record.model_dump(exclude={"samples"}), samples=dm_sample
            )
        )

    return DataMetricsResponse(
        **request.model_dump(exclude={"target_records"}), target_records=dm_record
    )

In [None]:
# Assemble the data-metrics request from the access and propagation responses.
# Fields that are set explicitly here, or that both responses would supply, are
# excluded so that no keyword is passed twice. `exclude` is given as sets, the
# documented form for pydantic's `model_dump`.
request = DataMetricsRequest(
    start=mission_start,
    duration=mission_duration,
    **access_response.model_dump(exclude={"start", "duration", "propagator"}),
    **propagation_response.model_dump(
        exclude={"start", "duration", "satellites", "time_step"}
    ),
)

print("Data metrics request")
display(request.model_dump_json())

data_metrics_response = data_metrics_instrupy(request)

print("Data metrics response")
display(data_metrics_response.model_dump_json())

In [None]:
# Inspect the data-metrics record of a single target (index 4 in the response).
# NOTE(review): the index is hard-coded and depends on the target grid defined above.
display(data_metrics_response.target_records[4].model_dump_json())

In [None]:
# Inspect the data-metrics record of another target (index 33 in the response).
# NOTE(review): the index is hard-coded and depends on the target grid defined above.
display(data_metrics_response.target_records[33].model_dump_json())