Getting and reading CCSDS OEM ephemerides for the ISS using Orekit.

In [None]:
import orekit
if orekit.getVMEnv() is None:
    orekit.initVM()

import os
from orekit.pyhelpers import download_orekit_data_curdir, setup_orekit_curdir
if not os.path.exists('orekit-data.zip'):
    download_orekit_data_curdir()
setup_orekit_curdir()

In [None]:
from datetime import datetime, timedelta
date = datetime(2021, 4, 3, 17, 4, 29)
duration_s = 3600.0  # seconds, minimum duration starting from date that we want to have in the OEM file

In [None]:
directory_before_2021 = 'ISS.OEM_J2K_EPH'
directory_from_2021 = 'ISS_OEM'

Searching for the best OEM file for the given date. This part only works from October 2020. If ephemeris files are already available on the local computer, then this block is not needed anymore, just use `oem_file = OEMParser().parse(filename)`.

In [None]:
from org.orekit.frames import FramesFactory, ITRFVersion
from org.orekit.utils import IERSConventions
tod = FramesFactory.getTOD(IERSConventions.IERS_2010, False) # Taking tidal effects into account when interpolating EOP parameters
eme2000 = FramesFactory.getEME2000()
gcrf = FramesFactory.getGCRF()
itrf = FramesFactory.getITRF(IERSConventions.IERS_2010, False)

from org.orekit.models.earth import ReferenceEllipsoid
wgs84Ellipsoid = ReferenceEllipsoid.getWgs84(itrf)

from org.orekit.time import TimeScalesFactory
utc = TimeScalesFactory.getUTC()

In [None]:
import urllib.request
from orekit.pyhelpers import absolutedate_to_datetime, datetime_to_absolutedate
from org.orekit.files.ccsds.ndm import ParserBuilder
from org.orekit.data import DataSource 

date_to_try = date
error = True

#while error:
 #   try:
filename = f'ISS.OEM_J2K_EPH_{date_to_try:%Y-%m-%d}.txt'

if date_to_try.year < 2021:
    directory = directory_before_2021
else:
    directory = directory_from_2021

# Downloading CCSDS OEM file        
target_url = f'https://nasa-public-data.s3.amazonaws.com/iss-coords/{date_to_try:%Y-%m-%d}/{directory}/ISS.OEM_J2K_EPH.txt'
urllib.request.urlretrieve(target_url, filename)
print(f'Successfully downloaded CCSDS OEM for date {date_to_try:%Y-%m-%d}')

# Replacing wrong keyword USABLE_START_TIME to USEABLE_START_TIME in OEM files prior to August 2021...
with open(filename) as f:
    newText=f.read().replace('USABLE', 'USEABLE')
with open(filename, "w") as f:
    f.write(newText)

# Parsing the OEM file using Orekit
oem_file = ParserBuilder().buildOemParser().parse(DataSource(filename))
# Reading the first ephemeris block in the CCSDS OEM file. 
# We assume that the ISS OEMs always contain only one block, which seems to be always the case.
ephem_first_block = oem_file.getEphemeridesBlocks().get(0)

# Checking if the OEM file contains data for the desired time range
if absolutedate_to_datetime(ephem_first_block.getStartTime()) > date:
    print('The current OEM file starts after the desired date, searching another file at an earlier date')
    date_to_try = date_to_try + timedelta(days=-1)
elif absolutedate_to_datetime(ephem_first_block.getStopTime()) < (date + timedelta(seconds=duration_s)):
    print('Something is wrong, the OEM file (normally 15 days long) stops before the desired time range')      
    break
else:
    print('The current OEM file contains data at the date we want')
    error = False
        
   # except:
    #    print(f'Date {date_to_try:%Y-%m-%d} has no available data, retrying an earlier date')
     #   date_to_try = date_to_try + timedelta(days=-1)

In [None]:
from org.orekit.files.general import EphemerisFile
from orekit.pyhelpers import datetime_to_absolutedate
# Must cast to the EphemerisFile.EphemerisSegment interface because of the limitations of the Orekit Python wrapper
bounded_propagator = EphemerisFile.EphemerisSegment.cast_(ephem_first_block).getPropagator()
date_min = bounded_propagator.getMinDate()
date_max = bounded_propagator.getMaxDate()
date_end = date + timedelta(seconds=86400)
print(f'File contains data from {date_min} to {date_max}')
print(f'We will process data from {datetime_to_absolutedate(date)} to {datetime_to_absolutedate(date_end)}')

In [None]:
bounded_propagator.getFrame()

In [None]:
import pandas as pd
ground_stations_file = 'ground-stations.csv'
ground_station_df = pd.read_csv(ground_stations_file, index_col=0)

display(ground_station_df)

In [None]:
import numpy as np

sigma_position = 1.0  # Position noise in meters
sigma_velocity = 1e-3 # Velocity noise in meters per second
step_pv = 60.0  # Step time in seconds for output PV data

sigma_range = 1.0  # Range noise in meters
sigma_range_rate = 1e-3  # Range rate noise in meters per second
sigma_az = float(np.deg2rad(0.01))  # Azimuth noise in radians
sigma_el = float(np.deg2rad(0.01))  # Elevation noise in radians
sigma_ra = float(np.deg2rad(0.01))  # Right ascension noise in radians
sigma_dec = float(np.deg2rad(0.01))  # Declination noise in radians
step_ground_station_measurements = 10.0  # When a ground station is in view, take measurements every 10 seconds

In [None]:
from org.orekit.bodies import GeodeticPoint
from org.orekit.frames import TopocentricFrame
from org.orekit.propagation.events import ElevationDetector
from org.orekit.estimation.measurements import GroundStation, Range, AngularAzEl, ObservableSatellite
from org.orekit.estimation.measurements.generation import PVBuilder, RangeBuilder, RangeRateBuilder, AngularAzElBuilder, AngularRaDecBuilder, EventBasedScheduler, SignSemantic, Generator, ContinuousScheduler
from org.orekit.time import FixedStepSelector
from org.orekit.estimation.measurements import ObservableSatellite
from org.orekit.propagation.events.handlers import ContinueOnEvent

observableSatellite = ObservableSatellite(0) # Propagator index = 0

meas_generator = Generator()
meas_generator.addPropagator(bounded_propagator)

# Add PV builder
meas_generator.addScheduler(
    ContinuousScheduler(
        PVBuilder(None,  # no noise
                  sigma_position,
                  sigma_velocity,
                  1.0,  # Base weight
                  observableSatellite), 
        FixedStepSelector(step_pv, utc))
)

for gs_name, gs_data in ground_station_df.iterrows():
    geodetic_point = GeodeticPoint(float(np.deg2rad(gs_data['latitude_deg'])),
                                   float(np.deg2rad(gs_data['longitude_deg'])),
                                   float(gs_data['altitude']))
    topocentric_frame = TopocentricFrame(wgs84Ellipsoid, geodetic_point, gs_name)
    ground_station_df.loc[gs_name, 'GroundStation'] = GroundStation(topocentric_frame)

    # Range builder
    meas_generator.addScheduler(
        EventBasedScheduler(
            RangeBuilder(None, 
                         GroundStation(topocentric_frame), 
                         False,  # one-way, this is important for telescope observations
                         sigma_range, 
                         1.0,  # Base weight
                         observableSatellite), 
            FixedStepSelector(step_ground_station_measurements, utc), 
            bounded_propagator, 
            ElevationDetector(topocentric_frame).withHandler(ContinueOnEvent()), 
            SignSemantic.FEASIBLE_MEASUREMENT_WHEN_POSITIVE)
    )

    # Range rate builder
    meas_generator.addScheduler(
        EventBasedScheduler(
            RangeRateBuilder(None,  # no noise
                             GroundStation(topocentric_frame), 
                             True,  # two-way
                             sigma_range_rate, 
                             1.0,  # Base weight
                             observableSatellite), 
            FixedStepSelector(step_ground_station_measurements, utc), 
            bounded_propagator, 
            ElevationDetector(topocentric_frame).withHandler(ContinueOnEvent()), 
            SignSemantic.FEASIBLE_MEASUREMENT_WHEN_POSITIVE)
    )

    # Az/el builder
    meas_generator.addScheduler(
        EventBasedScheduler(
            AngularAzElBuilder(None,  # no noise
                               GroundStation(topocentric_frame),
                               [sigma_az, sigma_el], 
                               [1.0, 1.0],  # Base weight
                               observableSatellite), 
            FixedStepSelector(step_ground_station_measurements, utc), 
            bounded_propagator, 
            ElevationDetector(topocentric_frame).withHandler(ContinueOnEvent()), 
            SignSemantic.FEASIBLE_MEASUREMENT_WHEN_POSITIVE)
    )

    # RA/DEC builder
    meas_generator.addScheduler(
        EventBasedScheduler(
            AngularRaDecBuilder(None,  # no noise
                               GroundStation(topocentric_frame),
                               eme2000,  # RA/DEC measurements are defined in Earth-centered inertial frame
                               [sigma_ra, sigma_dec], 
                               [1.0, 1.0],  # Base weight
                               observableSatellite), 
            FixedStepSelector(step_ground_station_measurements, utc), 
            bounded_propagator, 
            ElevationDetector(topocentric_frame).withHandler(ContinueOnEvent()), 
            SignSemantic.FEASIBLE_MEASUREMENT_WHEN_POSITIVE)
    )

Propagating, retrieving the generated measurements.

Warning: the cell below cannot be run a second time without running the cell above again before. Otherwise the result structure will be empty.

In [None]:
from orekit.pyhelpers import absolutedate_to_datetime
from org.orekit.estimation.measurements import ObservedMeasurement, PV, Range, RangeRate, AngularAzEl, AngularRaDec
from org.orekit.utils import PVCoordinates
from org.hipparchus.geometry.euclidean.threed import Vector3D

generated = meas_generator.generate(datetime_to_absolutedate(date), datetime_to_absolutedate(date_end))
pv_itrf_df = pd.DataFrame(columns=['x', 'y', 'z', 'vx', 'vy', 'vz'])
pv_eme2000_df = pd.DataFrame(columns=['x', 'y', 'z', 'vx', 'vy', 'vz'])
range_df = pd.DataFrame(columns=['ground_station', 'range'])
range_rate_df = pd.DataFrame(columns=['ground_station', 'range_rate'])
az_el_df = pd.DataFrame(columns=['ground_station', 'az_deg', 'el_deg'])
ra_dec_df = pd.DataFrame(columns=['ground_station', 'ra_deg', 'dec_deg'])

for meas in generated.toArray():
    observed_meas = ObservedMeasurement.cast_(meas)
    py_datetime = absolutedate_to_datetime(observed_meas.date)

    if PV.instance_(observed_meas):
        '''
        PV objects are given in propagator frame (ECI)
        We transform them to ITRF and to EME2000 frame (depending on the user's needs)
        '''
        observed_pv = PV.cast_(observed_meas)
        pv_eci_jarray = observed_meas.getObservedValue()
        pv_eci = PVCoordinates(Vector3D(pv_eci_jarray[0:3]), Vector3D(pv_eci_jarray[3:6]))
        
        eci_to_itrf = eme2000.getTransformTo(itrf, observed_meas.date)
        pv_itrf = eci_to_itrf.transformPVCoordinates(pv_eci)
        pv_itrf_df.loc[py_datetime] = np.concatenate(([np.array(pv_itrf.getPosition().toArray()),
                                                       np.array(pv_itrf.getVelocity().toArray())]))
        
        pv_eme2000_df.loc[py_datetime] = np.concatenate(([np.array(pv_eci.getPosition().toArray()),
                                                       np.array(pv_eci.getVelocity().toArray())]))
        
        

    elif Range.instance_(observed_meas):
        observed_range = Range.cast_(observed_meas)
        range_df.loc[py_datetime] = np.concatenate(([observed_range.getStation().getBaseFrame().name], 
                                                    np.array(observed_range.getObservedValue())))

    elif RangeRate.instance_(observed_meas):
        observed_range_rate = RangeRate.cast_(observed_meas)
        range_rate_df.loc[py_datetime] = np.concatenate(([observed_range_rate.getStation().getBaseFrame().name], 
                                                         np.array(observed_range_rate.getObservedValue())))

    elif AngularAzEl.instance_(observed_meas):
        observed_az_el = AngularAzEl.cast_(observed_meas)
        az_el_df.loc[py_datetime] = np.concatenate(([observed_az_el.getStation().getBaseFrame().name], 
                                                    np.rad2deg(observed_az_el.getObservedValue())))

    elif AngularRaDec.instance_(observed_meas):
        observed_ra_dec = AngularRaDec.cast_(observed_meas)
        ra_dec_df.loc[py_datetime] = np.concatenate(([observed_ra_dec.getStation().getBaseFrame().name], 
                                                     np.rad2deg(observed_ra_dec.getObservedValue())))

Position and velocity in ITRF frame

In [None]:
pv_itrf_df

Position and velocity in EME2000 frame

In [None]:
pv_eme2000_df

Azimuth and elevation, here selecting only the Tunka ground station.

In [None]:
az_el_df[az_el_df['ground_station'] == 'Tunka']

Right ascension and declination when the spacecraft is in view of any station. Although the RA/DEC coordinates are in theory independent on the station's position, here the time-of-flight to a ground telescope for instance plays a role.

In [None]:
ra_dec_df