In [1]:
import os
import numpy as np
import math
import random
import datetime
import gymnasium as gym
import json

import orekit
vm = orekit.initVM()

In [2]:
import gymnasium as gym
from gymnasium import spaces

In [3]:
import matplotlib.pyplot as plt
import numpy as np
from numpy.random import RandomState
from org.hipparchus.geometry.euclidean.threed import Vector3D
from org.orekit.attitudes import FrameAlignedProvider
from org.orekit.bodies import CelestialBodyFactory
from org.orekit.bodies import OneAxisEllipsoid
from org.orekit.forces.gravity import HolmesFeatherstoneAttractionModel
from org.orekit.forces.gravity import NewtonianAttraction
from org.orekit.forces.gravity import ThirdBodyAttraction
from org.orekit.forces.gravity.potential import GravityFieldFactory
from org.orekit.forces.maneuvers import ConstantThrustManeuver
from org.orekit.forces.radiation import IsotropicRadiationSingleCoefficient, IsotropicRadiationClassicalConvention
from org.orekit.forces.radiation import SolarRadiationPressure
from org.orekit.frames import FramesFactory
from org.orekit.orbits import KeplerianOrbit, CartesianOrbit
from org.orekit.orbits import Orbit
from org.orekit.orbits import OrbitType
from org.orekit.orbits import PositionAngleType
from org.orekit.propagation import SpacecraftState
from org.orekit.propagation.conversion import DormandPrince853IntegratorBuilder
from org.orekit.propagation.numerical import NumericalPropagator
from org.orekit.time import AbsoluteDate
from org.orekit.time import TimeScalesFactory
from org.orekit.utils import Constants
from org.orekit.utils import IERSConventions
from org.orekit.utils import PVCoordinates
from org.orekit.ssa.metrics import ProbabilityOfCollision
from org.hipparchus.linear import RealMatrix
from org.orekit.propagation import StateCovariance
from org.orekit.frames import FramesFactory
from org.hipparchus.linear import MatrixUtils
from org.orekit.ssa.collision.shorttermencounter.probability.twod import Patera2005

from orekit.pyhelpers import datetime_to_absolutedate

In [4]:
from orekit.pyhelpers import download_orekit_data_curdir, setup_orekit_curdir
download_orekit_data_curdir()
setup_orekit_curdir()

Downloading file from: https://gitlab.orekit.org/orekit/orekit-data/-/archive/master/orekit-data-master.zip


In [5]:
DATA_FOLDER = r"../data"
iss_sat_data_file_path = os.path.join(DATA_FOLDER, "iss_sat_data.json")

In [6]:
class SatelliteData:
    """ Class that describes the properties and attributes of the specific data of a satellite """

    # class constants
    ACCEPTED_ANGLE_TYPES = {
        "RADIAN_TYPE": "RADIAN",
        "DEGREE_TYPE": "DEGREE"
    }

    def __init__(self, sma: float, ecc: float, inc: float, argp: float, raan: float, tran: float = 0.0,
                 angle_type: str = ACCEPTED_ANGLE_TYPES["DEGREE_TYPE"], mass: float = 1.0, 
                 area: float = 1.0, reflection_idx: float = 2.0, thruster_max_force: float = 1.0, 
                 thruster_isp: float = 10.0):
        """ Constructor method for the satellite data class.

        :param sma: Semi-major axis value of the satellite's orbit [meters]
        :type sma: float
        :param ecc: Eccentricity angle value of the satellite's orbit
        :type ecc: float
        :param inc: Inclination angle value of the satellite's orbit
        :type inc: float
        :param argp: Argument of the perigee angle value of the satellite's orbit
        :type argp: float
        :param raan: Right ascension of the ascending node angle value of the satellite's orbit
        :type raan: float
        :param tran: True anomaly angle value of the satellite's orbit
        :type tran: float
        :param angle_type: The angle type, indicating whether the angles describing the orbit are in radians or degrees
        :type angle_type: str
        :param mass: The mass of the spacecraft
        :type mass: float
        :param area: The surface area of the spacecraft
        :type area: float
        :param reflection_idx: The reflection index of the spacecraft
        :type reflection_idx: float
        :param thruster_max_force: The maximum force that the thruster can produce [N]
        :type thruster_max_force: float
        :param thruster_isp: The specific impulse of the thruster [s]
        :type thruster_isp: float
        """
        self._sma = sma
        self._ecc = ecc
        self._inc = inc
        self._argp = argp
        self._raan = raan
        self._tran = tran

        if angle_type not in self.ACCEPTED_ANGLE_TYPES.values():
            raise ValueError(f"The angle_type attribute is not properly set. It should be one of the following: {self.ACCEPTED_ANGLE_TYPES.values()}")
        self._angle_type = angle_type

        self._mass = mass
        self._area = area
        self._reflection_idx = reflection_idx

        self._thruster_max_force = thruster_max_force
        self._thruster_isp = thruster_isp

    def set_random_tran(self):
        if self.angle_type == self.ACCEPTED_ANGLE_TYPES["DEGREE_TYPE"]:
            self.tran = 360.0 * random.random()
        if self.angle_type == self.ACCEPTED_ANGLE_TYPES["RADIAN_TYPE"]:
            self.tran = 2.0 * np.pi * random.random()
        return self.tran

    def change_angles_to_degrees(self):
        if self.angle_type == self.ACCEPTED_ANGLE_TYPES["DEGREE_TYPE"]:
            return None
        
        self.ecc = self.rad_to_deg(self.ecc)
        self.inc = self.rad_to_deg(self.inc)
        self.argp = self.rad_to_deg(self.argp)
        self.raan = self.rad_to_deg(self.raan)
        self.tran = self.rad_to_deg(self.tran)

        self.angle_type = self.ACCEPTED_ANGLE_TYPES["DEGREE_TYPE"]

    def change_angles_to_radians(self):
        if self.angle_type == self.ACCEPTED_ANGLE_TYPES["RADIAN_TYPE"]:
            return None
        
        self.ecc = self.deg_to_rad(self.ecc)
        self.inc = self.deg_to_rad(self.inc)
        self.argp = self.deg_to_rad(self.argp)
        self.raan = self.deg_to_rad(self.raan)
        self.tran = self.deg_to_rad(self.tran)

        self.angle_type = self.ACCEPTED_ANGLE_TYPES["RADIAN_TYPE"]

    def to_dict(self):
        return {
            "sma": self.sma,
            "ecc": self.ecc,
            "inc": self.inc,
            "argp": self.argp,
            "raan": self.raan,
            "tran": self.tran,
            "mass": self.mass,
            "area": self.area,
            "reflection_idx": self.reflection_idx,
            "thruster_max_force": self.thruster_max_force,
            "thruster_isp": self.thruster_isp,
            "angle_type": self.angle_type
        }

    @property
    def sma(self):
        return self._sma
    
    @sma.setter
    def sma(self, x):
        self._sma = x

    @property
    def ecc(self):
        return self._ecc
    
    @ecc.setter
    def ecc(self, x):
        self._ecc = x

    @property
    def inc(self):
        return self._inc
    
    @inc.setter
    def inc(self, x):
        self._inc = x

    @property
    def argp(self):
        return self._argp
    
    @argp.setter
    def argp(self, x):
        self._argp = x

    @property
    def raan(self):
        return self._raan
    
    @raan.setter
    def raan(self, x):
        self._raan = x

    @property
    def tran(self):
        return self._tran
    
    @tran.setter
    def tran(self, x):
        self._tran = x

    @property
    def angle_type(self):
        return self._angle_type
    
    @angle_type.setter
    def angle_type(self, x):
        if x in self.ACCEPTED_ANGLE_TYPES.values():
            self._angle_type = x
        else:
            print(f"{x} is not an accepted angle type value: {self.ACCEPTED_ANGLE_TYPES.values()}")

    @property
    def mass(self):
        return self._mass
    
    @mass.setter
    def mass(self, x):
        self._mass = x

    @property
    def area(self):
        return self._area
    
    @area.setter
    def area(self, x):
        self._area = x

    @property
    def reflection_idx(self):
        return self._reflection_idx
    
    @reflection_idx.setter
    def reflection_idx(self, x):
        self._reflection_idx = x

    @property
    def thruster_max_force(self):
        return self._thruster_max_force
    
    @thruster_max_force.setter
    def thruster_max_force(self, x):
        self._thruster_max_force = x

    @property
    def thruster_isp(self):
        return self._thruster_isp
    
    @thruster_isp.setter
    def thruster_isp(self, x):
        self._thruster_isp = x

    @staticmethod
    def rad_to_deg(rad_value: float)-> float:
        return (180.0 * rad_value) / math.pi
    
    @staticmethod
    def deg_to_rad(deg_value: float)-> float:
        return (math.pi * deg_value) / 180.0

In [7]:
with open(iss_sat_data_file_path, 'r') as openfile:
    sat_data_dict = json.load(openfile)

In [8]:
sat_data_dict

{'sma': 6795000.0,
 'ecc': 0.00048,
 'inc': 51.6413,
 'argp': 21.0174,
 'raan': 60,
 'tran': 0.0,
 'mass': 100.0,
 'area': 1.0,
 'reflection_idx': 2.0,
 'angle_type': 'DEGREE',
 'thruster_max_force': 0.01,
 'thruster_isp': 4000.0}

In [9]:
ISSSatellite = SatelliteData(**sat_data_dict)
ISSSatellite.change_angles_to_radians()
ISSSatellite.set_random_tran()

4.154718830717467

In [10]:
ISSSatellite.sma

6795000.0

In [11]:
ISSSatellite.tran

4.154718830717467

In [12]:
ISSSatellite.angle_type

'RADIAN'

In [13]:
ISSSatellite.thruster_isp

4000.0

In [16]:
# The observation space contains:
#       1. Sequence of the primary spacecraft states from n days before TCA to n days after TCA, discretized over some time steps
#       2. Sequence of the secondary spacecraft states from 1/4 * orbital period of the primary sc before TCA to 
# 1/4 * orbital period of the primary sc after TCA
#       3. Time to TCA in days (maybe minutes, smth)
#       4. Mass of the Sc (is this necessary though?)

In [None]:
ref_time = AbsoluteDate(2022, 6, 16, 0, 0, 0.0, TimeScalesFactory.getUTC())

# get the orbital period and the time discretization around the reference time (which should be close to the TCA)
orb_period = primary_orbit_cart.getKeplerianPeriod()
ref_period = orb_period / 8.0 # an eith of the orbital period
step_duration = 5.0  # seconds

num_points = int(2 * ref_period / step_duration)
time_discretisation_positive = np.linspace(start=0, stop=ref_period, num=num_points//2)
time_discretisation_neggative = np.linspace(start=-ref_period, stop=-step_duration, num=num_points//2)
time_discretisation = np.append(time_discretisation_neggative, time_discretisation_positive)

primary_orbital_positions = []
secondary_orbital_positions = []

# get the positions of each satellite at the required times
for time_step in time_discretisation:
    current_time = ref_time.shiftedBy(float(time_step))
    primary_state = propagate_(propagator=primary_propagator, time=current_time)
    secondary_state = propagate_(propagator=secondary_propagator, time=current_time)

    primary_orbital_positions.append(np.array(primary_state.getPosition().toArray()))
    secondary_orbital_positions.append(np.array(secondary_state.getPosition().toArray()))

In [21]:
time_step = 5 * 60.0  # seconds
num_days_bound = 5.0  # days - for primary
num_time_steps = (num_days_bound * 24.0 * 60.0 * 60.0) / time_step

In [22]:
num_time_steps

1440.0

In [None]:
observation_max_values = np.array(
    []
)


observation_space = Box(low=-1. * box, high=box, dtype=np.float64)

In [14]:
thruster_max_force = 0.01
action_space = spaces.Box(low=-1.0 * thruster_max_force,
                          high=1.0 * thruster_max_force,
                          shape=(3,), dtype=np.float64)

In [15]:
action_space

Box(-0.01, 0.01, (3,), float64)

In [None]:
class CollisionAvoidance(gym.Env):
    def __init__(self, satellite: SatelliteData, ref_time: AbsoluteDate, ref_frame: FramesFactory,
                 use_perturbations: bool = False, earth_degree: int = 16, earth_order: int = 16, **kwargs):
        super(gym.Env, self).__init__(**kwargs)
        self._satellite = satellite
        self._ref_time = ref_time
        self._ref_frame = ref_frame
        self._use_perturbations = use_perturbations
        self._earth_degree = earth_degree
        self._earth_order = earth_order

        self._propagator = None
        self.hist_sc_state = None
        self.hist_action = None
        self.prev_hist_sc_state = None
        self.prev_hist_action = None
        self._current_step = None
        self._random_generator = None

        self.close()
        self.seed()
        self.reset()

    def reset(self):
        self._propagator = self._create_propagator()

        self.prev_hist_sc_state = self.hist_sc_state
        self.hist_sc_state = []

        self._current_step = 0
        self.prev_hist_action = self.hist_action
        self.hist_action = []

        state = self._propagate(self._propagator.getInitialState().getDate())
        return state

    def _create_propagator(self):
        kep = (self._ref_sv + (self._random_generator.rand(6) * 2. - 1.) * self._ref_sv_pert).tolist()
        orbit = KeplerianOrbit(kep[0], kep[1], kep[2], kep[3], kep[4], kep[5],
                               PositionAngle.MEAN, self._ref_frame, self._ref_time, Constants.WGS84_EARTH_MU)

        integrator = DormandPrince853IntegratorBuilder(1.0, 1000., 1.0)\
            .buildIntegrator(orbit, OrbitType.CARTESIAN)
        propagator = NumericalPropagator(integrator)
        propagator.setSlaveMode()
        propagator.setOrbitType(OrbitType.CARTESIAN)
        propagator.setInitialState(SpacecraftState(orbit, self._ref_mass))

        # Earth gravity field
        if not self._use_perturbations:
            point_gravity = NewtonianAttraction(Constants.WGS84_EARTH_MU)
            propagator.addForceModel(point_gravity)
        else:
            earth = OneAxisEllipsoid(Constants.WGS84_EARTH_EQUATORIAL_RADIUS,
                                     Constants.WGS84_EARTH_FLATTENING,
                                     FramesFactory.getITRF(IERSConventions.IERS_2010, True))
            harmonics_gravity_provider = GravityFieldFactory.getNormalizedProvider(self._earth_degree, self._earth_order)
            propagator.addForceModel(
                HolmesFeatherstoneAttractionModel(earth.getBodyFrame(), harmonics_gravity_provider))

        if self._use_perturbations:
            # Sun and Moon attraction
            propagator.addForceModel(ThirdBodyAttraction(CelestialBodyFactory.getSun()))
            propagator.addForceModel(ThirdBodyAttraction(CelestialBodyFactory.getMoon()))

            # Solar radiation pressure
            propagator.addForceModel(
                SolarRadiationPressure(CelestialBodyFactory.getSun(),
                                       earth.getEquatorialRadius(),
                                       IsotropicRadiationSingleCoefficient(self._spacecraft_area,
                                                                           self._spacecraft_reflection)))

        rotation = FramesFactory.getEME2000().getTransformTo(self._ref_sc_frame, self._ref_time).getRotation()
        attitude = InertialProvider(rotation)
        propagator.setAttitudeProvider(attitude)
        return propagator

    def step(self, action):
        assert all(abs(a) <= 1.0 for a in action), f"Force in each direction can't be greater than 1: {action}"

        if self._perturb_action:
            action *= np.random.normal(1.0, 0.1)

        self.hist_action.append(action)

        current_time = self.hist_sc_state[-1].getDate()
        self._current_step += 1
        new_time = self.hist_sc_state[0].getDate().shiftedBy(self._time_step * self._current_step)

        # We assume we have 3 pairs of thrusters, each of them can be used independently
        for i in range(3):
            if abs(action[i]) > 0.0:
                direction = Vector3D(list((1.0 if action[i] > 0 else -1.0) if i == j else 0.0 for j in range(3)))
                force = (self._thruster_max_force * abs(action[i])).item()
                manoeuvre = ConstantThrustManeuver(current_time, self._time_step,
                                                   force, self._thruster_isp, direction)
                self._propagator.addForceModel(manoeuvre)

        state = self._propagate(new_time)
        done = self._is_done()
        reward = self._get_reward()
        info = {'is_success': True} if done else {}
        return state, reward, done, info

    def seed(self, seed=None):
        self._random_generator = RandomState(seed)
        return [seed]

    # noinspection PyUnusedLocal
    def render(self, mode=None):
        if mode == 'plot':
            return self._plot(self.hist_sc_state, self.hist_action)
        if mode == 'prev_plot':
            return self._plot(self.prev_hist_sc_state, self.prev_hist_action)
        else:
            print(self.hist_sc_state[-1])

    def close(self):
        self._propagator = None
        self.hist_sc_state = None
        self.hist_action = None
        self._current_step = None
        self._random_generator = None

    def _propagate(self, time):
        self.hist_sc_state.append(self._propagator.propagate(time))
        pv = self.hist_sc_state[-1].getPVCoordinates()
        return np.array([self.hist_sc_state[-1].getDate().durationFrom(self.hist_sc_state[0].getDate())] +
                        list(pv.getPosition().toArray()) +
                        list(pv.getVelocity().toArray()) +
                        [self.hist_sc_state[-1].getMass()])

    def _is_done(self):
        return self._current_step >= self._max_steps

    def _get_reward(self):
        # Only give a reward at the end of the episode
        if not self._is_done():
            return 0.0
        ra0, rp0, m0 = self._get_ra_rp_m(self.hist_sc_state[0])
        ra, rp, m = self._get_ra_rp_m(self.hist_sc_state[-1])

        return 1.e-4 * (
            -1.0 * abs(ra - ra0) +
            1.0 * (rp - rp0) +
            2.0e5 * (m - m0))  # 6.25e5 would give the same weight to 25 km and to 0.040 kg

    @staticmethod
    def _get_ra_rp_m(sc_state):
        a = sc_state.getA()
        e = sc_state.getE()
        ra = a * (1.0 + e)
        rp = a * (1.0 - e)
        m = sc_state.getMass()
        return ra, rp, m