This notebook generates measurement data with noise for orbit determination purposes, using a Keplerian propagator:

* Position/Velocity
* Range as seen from a list of ground stations
* Range rate (Doppler) as seen from a list of ground stations
* Az/el as seen from a list of ground stations

The list of (example, not real) ground stations is loaded from the CSV file `ground-stations.csv`

This notebook is intended for small-satellite operators who wish to perform orbit determination shortly after launch. Therefore, the orbit parameters are defined by the state vector at spacecraft separation.

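The frame in which this state vector is expressed is usually obtained by freezing the Earth-fixed frame at launcher lift-off, i.e. the frame no longer rotates with the Earth after that instant.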

In [None]:
import datetime

import numpy as np

# Lift-off epoch; the reference frame of the initial state is frozen at this instant.
launch_date = datetime.datetime(year=2020, month=1, day=1)

# Spacecraft separation, in seconds after lift-off.
separation_time = 4242.0

# State vector at separation, defined in the frame frozen at lift-off time.
pos_init = [-3092.1e3, 1568.9e3, -5975.1e3]
vel_init = [-4767.1, 4630.9, 3671.0]

# Number of orbits to generate.
n_orbits = 1.0

Measurement parameters. For each type of measurement, two files are generated, one with the perfect measurements, one with noise added.
The noise for each type of measurement is modeled by a Gaussian distribution with standard deviation $\sigma$.

In [None]:
# --- Position/velocity (PV) measurements ---
# Position noise in meters
sigma_position = 100.0e3
# Velocity noise in meters per second
sigma_velocity = 100.0
# Step time for output PV data
step_pv = 500.0

# --- Ground station measurements ---
# Range noise in meters
sigma_range = 1.0e3
# Range rate noise in meters per second
sigma_range_rate = 10.0e3
# Azimuth and elevation noise in radians (one degree each)
sigma_az = sigma_el = float(np.deg2rad(1.0))
# When a ground station is in view, take measurements every 30 seconds
step_ground_station_measurements = 30.0

Loading the ground stations

In [None]:
import pandas as pd
# Read the ground station table; the first CSV column becomes the DataFrame index
# (a later cell uses that index as the station name).
ground_station_df = pd.read_csv('ground-stations.csv', index_col=0)
# Show the loaded table so the station list can be checked visually.
display(ground_station_df)

Firing up a JVM for Orekit

In [None]:
import orekit
# Start the Java virtual machine used by Orekit; the `org.orekit` / `org.hipparchus`
# imports in the following cells come after this call.
orekit.initVM()

Downloading and importing the Orekit data ZIP

In [None]:
from orekit.pyhelpers import download_orekit_data_curdir, setup_orekit_curdir

# Best-effort download of the Orekit data ZIP into the current directory.
# A failure is reported but not fatal, so that a ZIP already present from an
# earlier run can still be picked up by `setup_orekit_curdir()` below.
try:
    download_orekit_data_curdir()
except Exception as download_error:
    # `except Exception` (instead of a bare `except:`) lets KeyboardInterrupt and
    # SystemExit propagate, so the kernel can still be interrupted mid-download,
    # and the reason for the failure is shown instead of being discarded.
    print(f'Download failed: {download_error!r}')
setup_orekit_curdir()

Setting up models (frames, timescales)

In [None]:
from org.orekit.frames import FramesFactory
from org.orekit.models.earth import ReferenceEllipsoid
from org.orekit.time import TimeScalesFactory
from org.orekit.utils import IERSConventions
from orekit.pyhelpers import datetime_to_absolutedate

# Time scale
utc = TimeScalesFactory.getUTC()

# Reference frames
gcrf = FramesFactory.getGCRF()
itrf = FramesFactory.getITRF(
    IERSConventions.IERS_2010,
    False  # Taking tidal effects into account when interpolating EOP parameters
)

# ITRF frozen with respect to GCRF at the launch date: the frame in which the
# initial state vector is expressed.
lift_off_frame = itrf.getFrozenFrame(
    gcrf,
    datetime_to_absolutedate(launch_date),
    "Frame frozen at liftoff"
)

# Earth shape, attached to the ITRF
wgs84_ellipsoid = ReferenceEllipsoid.getWgs84(itrf)

Setting up initial orbit and propagator

In [None]:
from org.hipparchus.geometry.euclidean.threed import Vector3D
from org.orekit.orbits import CartesianOrbit
from org.orekit.orbits import KeplerianOrbit
from org.orekit.propagation.analytical import KeplerianPropagator
from org.orekit.utils import Constants as orekit_constants
from org.orekit.utils import PVCoordinates

# Epoch of the initial state: launch date shifted by the separation time.
separation_date = datetime_to_absolutedate(launch_date).shiftedBy(separation_time)

# Initial position/velocity, wrapped as a Cartesian orbit in the lift-off frame.
pv_init = PVCoordinates(
    Vector3D(pos_init),
    Vector3D(vel_init)
)
orbit = CartesianOrbit(
    pv_init,
    lift_off_frame,
    separation_date,
    orekit_constants.EIGEN5C_EARTH_MU
)

# Total time span to generate: `n_orbits` Keplerian periods.
orbit_period = orbit.getKeplerianPeriod()
duration = float(n_orbits * orbit_period)
display(duration)

propagator = KeplerianPropagator(orbit)

Orekit can generate all the measurement types used for orbit determination (OD). The cell below sets up the measurement builders:

Creating the Orekit ground station objects from DataFrame data.

Creating a measurement generator for each ground station and for each type of measurement

In [None]:
from org.orekit.bodies import GeodeticPoint
from org.orekit.frames import TopocentricFrame
from org.orekit.propagation.events import ElevationDetector
from org.orekit.estimation.measurements import GroundStation
from org.orekit.estimation.measurements.generation import PVBuilder, RangeBuilder, RangeRateBuilder, AngularAzElBuilder, EventBasedScheduler, SignSemantic, Generator, ContinuousScheduler
from org.orekit.time import FixedStepSelector
from org.hipparchus.random import JDKRandomGenerator, GaussianRandomGenerator, CorrelatedRandomVectorGenerator
from org.hipparchus.linear import DiagonalMatrix
from org.orekit.estimation.measurements import ObservableSatellite
from org.orekit.propagation.events.handlers import ContinueOnEvent

# Seed of the underlying random generator, so that "Restart & Run All" reproduces
# the same noisy data set. Set to None to draw a new noise realization on each run.
noise_seed = 42
if noise_seed is None:
    jdk_random_generator = JDKRandomGenerator()
else:
    jdk_random_generator = JDKRandomGenerator(noise_seed)
gaussian_random_generator = GaussianRandomGenerator(jdk_random_generator)


def _diagonal_noise_source(sigmas):
    """Return a noise vector generator with uncorrelated components.

    Args:
        sigmas: standard deviations, one per measurement component.

    Returns:
        A `CorrelatedRandomVectorGenerator` whose covariance matrix is diagonal
        with `sigma**2` entries, fed by the shared `gaussian_random_generator`.
    """
    return CorrelatedRandomVectorGenerator(
        DiagonalMatrix([sigma**2 for sigma in sigmas]),
        1e-6,  # "small" parameter
        gaussian_random_generator
    )


def _visibility_scheduler(builder, topocentric_frame):
    """Return a scheduler running `builder` at a fixed step while the station sees the satellite.

    Args:
        builder: measurement builder to schedule.
        topocentric_frame: frame of the ground station, used by the elevation detector.

    Returns:
        An `EventBasedScheduler` that takes one measurement every
        `step_ground_station_measurements` while the elevation detector's
        function is positive.
    """
    return EventBasedScheduler(
        builder,
        FixedStepSelector(step_ground_station_measurements, utc),
        propagator,
        ElevationDetector(topocentric_frame).withHandler(ContinueOnEvent()),
        SignSemantic.FEASIBLE_MEASUREMENT_WHEN_POSITIVE)


# Noise generators, one per measurement type
noise_source_range = _diagonal_noise_source([sigma_range])
noise_source_range_rate = _diagonal_noise_source([sigma_range_rate])
noise_source_az_el = _diagonal_noise_source([sigma_az, sigma_el])
noise_source_pv = _diagonal_noise_source([sigma_position] * 3 + [sigma_velocity] * 3)

# Builder for PV measurements
observable_satellite = ObservableSatellite(0)
pv_builder = PVBuilder(noise_source_pv,
                       sigma_position,
                       sigma_velocity,
                       1.0,  # Base weight
                       observable_satellite)

# PV measurements are taken continuously, every `step_pv`
step_selector_pv = FixedStepSelector(step_pv, utc)
pv_scheduler = ContinuousScheduler(pv_builder, step_selector_pv)

meas_generator = Generator()
meas_generator.addPropagator(propagator)
meas_generator.addScheduler(pv_scheduler)

for gs_name, gs_data in ground_station_df.iterrows():
    geodetic_point = GeodeticPoint(float(np.deg2rad(gs_data['latitude_deg'])),
                                   float(np.deg2rad(gs_data['longitude_deg'])),
                                   float(gs_data['altitude']))
    topocentric_frame = TopocentricFrame(wgs84_ellipsoid, geodetic_point, gs_name)

    # A single GroundStation object per station: the one stored in the DataFrame
    # is the same one that every builder of this station uses.
    ground_station = GroundStation(topocentric_frame)
    ground_station_df.loc[gs_name, 'GroundStation'] = ground_station

    # Same order as before: range, range rate, az/el
    station_builders = [
        RangeBuilder(noise_source_range,
                     ground_station,
                     True,  # two-way
                     sigma_range,
                     1.0,  # Base weight
                     observable_satellite),
        RangeRateBuilder(noise_source_range_rate,
                         ground_station,
                         True,  # two-way
                         sigma_range_rate,
                         1.0,  # Base weight
                         observable_satellite),
        AngularAzElBuilder(noise_source_az_el,
                           ground_station,
                           [sigma_az, sigma_el],
                           [1.0, 1.0],  # Base weight
                           observable_satellite),
    ]
    for station_builder in station_builders:
        meas_generator.addScheduler(_visibility_scheduler(station_builder, topocentric_frame))

In [None]:
from orekit.pyhelpers import absolutedate_to_datetime

# Generation window: starts at spacecraft separation and lasts `duration`.
date_start = date_current = separation_date
date_end = date_start.shiftedBy(duration)

# Run every registered scheduler over the window.
generated = meas_generator.generate(date_start, date_end)

In [None]:
from org.orekit.estimation.measurements import ObservedMeasurement, PV, Range, RangeRate, AngularAzEl

# Column layout of each output table, keyed by measurement kind.
measurement_columns = {
    'pv': ['x', 'y', 'z', 'vx', 'vy', 'vz'],
    'range': ['ground_station', 'range'],
    'range_rate': ['ground_station', 'range_rate'],
    'az_el': ['ground_station', 'az_deg', 'el_deg'],
}
# Rows and their timestamps are accumulated in plain lists and turned into
# DataFrames once at the end. Writing row by row with `df.loc[timestamp] = ...`
# would silently overwrite a row whenever two ground stations produce a
# measurement at the same epoch.
measurement_rows = {kind: [] for kind in measurement_columns}
measurement_times = {kind: [] for kind in measurement_columns}


def _measurement_frame(kind):
    """Build the DataFrame of one measurement kind, indexed by (possibly repeated) timestamps."""
    return pd.DataFrame(measurement_rows[kind],
                        columns=measurement_columns[kind],
                        index=pd.DatetimeIndex(measurement_times[kind]))


for meas in generated.toArray():
    observed_meas = ObservedMeasurement.cast_(meas)
    py_datetime = absolutedate_to_datetime(observed_meas.date)

    if PV.instance_(observed_meas):
        observed_pv = PV.cast_(observed_meas)
        kind = 'pv'
        row = [float(value) for value in observed_pv.getObservedValue()]
    elif Range.instance_(observed_meas):
        observed_range = Range.cast_(observed_meas)
        ground_station = observed_range.getStation()
        kind = 'range'
        # getObservedValue() returns an array; keep its single scalar component
        row = [ground_station.getBaseFrame().name,
               float(observed_range.getObservedValue()[0])]
    elif RangeRate.instance_(observed_meas):
        observed_range_rate = RangeRate.cast_(observed_meas)
        ground_station = observed_range_rate.getStation()
        kind = 'range_rate'
        row = [ground_station.getBaseFrame().name,
               float(observed_range_rate.getObservedValue()[0])]
    elif AngularAzEl.instance_(observed_meas):
        observed_az_el = AngularAzEl.cast_(observed_meas)
        ground_station = observed_az_el.getStation()
        az_rad, el_rad = (float(value) for value in observed_az_el.getObservedValue())
        kind = 'az_el'
        # Keep the angles numeric: concatenating them with the station name in a
        # numpy array would convert them to strings.
        row = [ground_station.getBaseFrame().name,
               float(np.rad2deg(az_rad)),
               float(np.rad2deg(el_rad))]
    else:
        # Measurement type not handled by this notebook
        continue

    measurement_rows[kind].append(row)
    measurement_times[kind].append(py_datetime)

pv_with_noise_df = _measurement_frame('pv')
range_with_noise_df = _measurement_frame('range')
range_rate_with_noise_df = _measurement_frame('range_rate')
az_el_with_noise_df = _measurement_frame('az_el')

Propagating the orbit again to plot the ground track

In [None]:
from org.orekit.propagation.sampling import PythonOrekitFixedStepHandler
from orekit.pyhelpers import absolutedate_to_datetime
from org.orekit.utils import IERSConventions
from org.orekit.frames import FramesFactory
from org.orekit.models.earth import ReferenceEllipsoid
import numpy as np
import pandas as pd

class mystephandler(PythonOrekitFixedStepHandler):
    """Fixed-step handler recording the geodetic latitude/longitude of each propagated state.

    The recorded points are available through `getLatLon()` as a DataFrame
    indexed by date, with columns `lat_deg` and `lon_deg`.
    """
    itrf = FramesFactory.getITRF(IERSConventions.IERS_2010, False)  # Taking tidal effects into account when interpolating EOP parameters
    wgs84_ellipsoid = ReferenceEllipsoid.getWgs84(itrf)

    def __init__(self):
        super(mystephandler, self).__init__()
        # One table per handler instance: a class-level DataFrame would be shared
        # by all instances and keep rows from previous propagations.
        self.lat_lon_df = pd.DataFrame(columns=['lat_deg', 'lon_deg'])

    def init(self, s0, t, step):
        """Nothing to prepare before the propagation starts."""
        pass

    def handleStep(self, currentState, isLast):
        """Convert the current state to a geodetic point and store its latitude/longitude in degrees."""
        # Use the ellipsoid and frame owned by the class rather than same-named
        # notebook globals, so the handler does not depend on other cells' state.
        geodetic_point = self.wgs84_ellipsoid.transform(currentState.getPVCoordinates(self.itrf).getPosition(),
                                                        self.itrf,
                                                        currentState.getDate())
        self.lat_lon_df.loc[absolutedate_to_datetime(currentState.getDate())] = [np.rad2deg(geodetic_point.getLatitude()),
                                                                                 np.rad2deg(geodetic_point.getLongitude())]

    def getLatLon(self):
        """Return the DataFrame of recorded latitude/longitude points."""
        return self.lat_lon_df
    
# Fresh handler that will collect the ground-track points.
handler = mystephandler()
# New propagator for the ground-track run. Note that this rebinds the global
# `propagator` that was registered with the measurement generator in an earlier cell.
propagator = KeplerianPropagator(orbit)
# Register the handler with a fixed step of 60.0 (presumably seconds; confirm against the Orekit API).
propagator.setMasterMode(60.0, handler)
# Propagate up to the end of the measurement generation window.
finalState = propagator.propagate(date_end)
lat_lon_df = handler.getLatLon()
display(lat_lon_df)

In [None]:
import plotly.graph_objects as go

# Two marker layers on a world map: the propagated ground track and the ground stations.
fig = go.Figure(data=[
    go.Scattergeo(
        lon=lat_lon_df['lon_deg'],
        lat=lat_lon_df['lat_deg'],
        mode='markers',
        name='Orbit'
    ),
    go.Scattergeo(
        lon=ground_station_df['longitude_deg'],
        lat=ground_station_df['latitude_deg'],
        text=ground_station_df.index,  # station name shown on hover
        mode='markers',
        name='Ground stations'
    )
])

# Give the figure a title so it stands alone when the notebook is skimmed
# (this replaces commented-out layout options left over from an unrelated example).
fig.update_layout(title='Satellite ground track and ground stations')
fig.show()

Finally, saving measurement data to CSV file

In [None]:
# Save the noisy measurement tables built above, one CSV file per measurement type.
# The previous version referenced `position_df` and `position_without_noise_df`,
# which are not defined anywhere in this notebook, so this cell raised a NameError.
# NOTE(review): the PV file name says "gcrf", but the orbit is defined in
# `lift_off_frame`; confirm which frame downstream consumers expect.
pv_with_noise_df.to_csv('pos_vel_data_gcrf_with_noise.csv')
range_with_noise_df.to_csv('range_data_with_noise.csv')
range_rate_with_noise_df.to_csv('range_rate_data_with_noise.csv')
az_el_with_noise_df.to_csv('az_el_data_with_noise.csv')

# TODO: this notebook only generates measurements with noise; generate the
# noise-free measurements as well before writing 'pos_vel_data_gcrf_without_noise.csv'.