# Creating a Gymnasium Environment for the LambertPulser

## Description
The LambertPulser is a Gymnasium environment for a pulsable rocket in space to reach a specified point in a specified amount of time in space under the influence of gravity from another body. The rocket is allowed to make orbital transfers at any point in.

The problem is formulated as a finite-thrust minimum fuel

## Imports
Here, we import the relevant packages for creating the environment.

In [147]:
# Environment-related packages
from gym import Env
from gym.spaces import Discrete, Box, Dict
from gym.wrappers import FlattenObservation

# Neural network formulation
import torch
import torch.nn as nn

# Mathematics packages
import numpy as np
import scipy as sp
from scipy.integrate import solve_ivp
import random

# Support functions
import argparse, pdb
from dataclasses import dataclass

## Configuration
The configuration contains all hyperparameters and parameters required for defining the environment, the learnable agent, and storing results.

In [177]:
@dataclass
class Bounds:
    # State bounds
    r: list[float]      # Radial distance from orbital center
    theta: list[float]  # True anomaly
    v_r: list[float]    # Radial velocity
    v_t: list[float]    # Tangential velocity
    rf: list[float]     # Remaining amount of fuel, lower bound = 0
    rt: list[float]     # Remaining amount of time-to-go

    # Action bounds
    T: list[float]      # Thrust
    phi: list[float]    # Direction of thrust
    pT: int             # Pulse the thrust or not (0 or 1)

@dataclass 
class Seeds:
    # State randomization
    r: int
    theta: int
    v_r: int
    v_t: int
    rf: int
    rt: int
    
    # Action randomization
    T: int
    phi: int
    pT: int

@dataclass
class Target:
    r: float
    theta: float
    v_r: float
    v_t: float

@dataclass
class Dynamics:
    mu: float = 100
    c: float = 5
    update: float = 0.1

class LPConfig:
    def __init__(self, seed=1):
        self.env_name = "LambertPulser-v1"
        self.record = False

        # Randomization
        self.seed = Seeds(1,1,1,1,1,1,2,2,2)
        seed_str = "seed=" + str(self.seed)

        # State-action bounds
        self.bounds = Bounds(
            [0, 10**3],     # r
            [0 ,2*np.pi],   # theta
            [-10, 10],      # v_r
            [-100, 100],    # v_t
            [0, 100.0],     # rf
            [0, 20.0],      # rt
            [10**2, 10**3],        # T
            [0, 2*np.pi],   # phi
            2               # pT, on/off
        )

        # Dynamics
        self.dynamics = Dynamics(
            100,
            5
        )

        # output config
        self.output_path = "results/{}-{}/".format( self.env_name, seed_str )
        self.model_output = self.output_path + "model.weights/"
        self.log_path = self.output_path + "log.txt"
        self.scores_output = self.output_path + "scores.npy"
        self.plot_output = self.output_path + "scores.png"
        self.record_path = self.output_path
        self.record_freq = 5
        self.summary_freq = 1

        # model and training config
        self.num_batches = 100  # number of batches trained on
        self.batch_size = 2000  # number of steps used to compute each policy update
        self.max_ep_len = 200  # maximum episode length
        self.learning_rate = 3e-2
        self.gamma = 1.0  # the discount factor
        self.normalize_advantage = True

        # parameters for the policy and baseline models
        self.n_layers = 1
        self.layer_size = 64

        # hyperparameters for PPO
        self.eps_clip = 0.2
        self.update_freq = 5

        # since we start new episodes for each batch
        assert self.max_ep_len <= self.batch_size
        if self.max_ep_len < 0:
            self.max_ep_len = self.batch_size


## Environment

In [178]:
class LambertPulser(Env):
    metadata = {
        "render_modes":["human", "anim"],
        "render_fps": 24
    }

    def __init__(self, render_mode=None, seed=1):
        # Configuration
        self.config = LPConfig(seed)

        # Define the observation and action space
        self.observation_space = Dict(
            {
                "r":        Box(self.config.bounds.r[0], self.config.bounds.r[1], dtype=float, shape=(1,)), 
                "theta":    Box(self.config.bounds.theta[0], self.config.bounds.theta[1], dtype=float, shape=(1,)), 
                "v_r":      Box(self.config.bounds.v_r[0], self.config.bounds.v_r[1], dtype=float, shape=(1,)),
                "v_t":      Box(self.config.bounds.v_t[0], self.config.bounds.v_t[1], dtype=float, shape=(1,)),
                "dr":       Box(0.0, self.config.bounds.r[1]-self.config.bounds.r[0], dtype=float, shape=(1,)), 
                "dtheta":   Box(0.0, self.config.bounds.theta[1]-self.config.bounds.theta[1], dtype=float, shape=(1,)), 
                "dv_r":     Box(0.0, self.config.bounds.v_r[1]-self.config.bounds.v_r[0], dtype=float, shape=(1,)),
                "dv_t":     Box(0.0, self.config.bounds.v_t[1]-self.config.bounds.v_t[0], dtype=float, shape=(1,)),
                "rf":       Box(self.config.bounds.rf[0], self.config.bounds.rf[1], dtype=float, shape=(1,)),
                "rt":       Box(self.config.bounds.rt[0], self.config.bounds.rt[1], dtype=float, shape=(1,))
            }
        )
        self.action_space = Dict(
            {
                "T":        Box(self.config.bounds.T[0], self.config.bounds.T[1], dtype=float, shape=(1,)), 
                "phi":      Box(self.config.bounds.phi[0], self.config.bounds.phi[1], dtype=float, shape=(1,)),
                "pulse":    Discrete(self.config.bounds.pT)
            }
        )

        self.observation = self.reset()

        # Target position
        self.target = Target(0,0,0,0)

        # Rendering options
        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        """
        If human-rendering is used, `self.window` will be a reference
        to the window that we draw to. `self.clock` will be a clock that is used
        to ensure that the environment is rendered at the correct framerate in
        human-mode. They will remain `None` until human-mode is used for the
        first time.
        """
        self.window = None
        self.clock = None

    def reset(self):
        # Radius bounds for sampling
        r0_lower = self.config.bounds.r[0]
        r0_upper = self.config.bounds.r[1]
        r0_rand_bnd = [1/3, 2/3]

        # Set the initial state to a circular orbit
        r0 = r0_lower + (r0_upper - r0_lower) * np.random.uniform(r0_rand_bnd[0], r0_rand_bnd[1])
        theta0 = np.random.uniform(self.config.bounds.theta[0], self.config.bounds.theta[1])
        v_r0 = 0.0
        v_t0 = np.sqrt(self.config.dynamics.mu / r0)
        rf0 = np.random.uniform(self.config.bounds.rf[0], self.config.bounds.rf[1])

        # Target
        self.target = Target(
            np.random.uniform(r0_lower, r0_upper),
            np.random.uniform(self.config.bounds.theta[0], self.config.bounds.theta[1]),
            np.random.uniform(self.config.bounds.v_r[0],self.config.bounds.v_r[1]),
            np.random.uniform(self.config.bounds.v_t[0],self.config.bounds.v_t[1])
        )

        # Observation based on state and target
        observation = {
            "r":        r0,
            "theta":    theta0,
            "v_r":      v_r0,
            "v_t":      v_t0,
            "dr":       r0 - self.target.r,
            "dtheta":   theta0 - self.target.theta,
            "dv_r":     v_r0 - self.target.v_r,
            "dv_t":     v_t0 - self.target.v_t,
            "rf":       rf0,
            "rt":       self.config.bounds.rt[1]-self.config.bounds.rt[0]
        }

        return observation
    
    def reward(self, action=None):
        # Initialize
        reward = 0

        # Reach the desired point
        reward += 0.25 * sp.stats.norm(0, 0.10*(self.config.bounds.r[1]-self.config.bounds.r[0])).pdf(self.observation["dr"])
        reward += 0.25 * sp.stats.norm(0, 0.10*(self.config.bounds.theta[1]-self.config.bounds.theta[0])).pdf(self.observation["dtheta"])
        reward += 0.05 * sp.stats.norm(0, 0.10*(self.config.bounds.v_r[1]-self.config.bounds.v_r[0])).pdf(self.observation["dv_r"])
        reward += 0.025 * sp.stats.norm(0, 0.10*(self.config.bounds.v_t[1]-self.config.bounds.v_t[0])).pdf(self.observation["dv_t"])

        # Use as little resources as possible
        reward += (self.observation["rf"])*0.025

        # Complete the mission as quickly as possible
        reward += (self.observation["rt"])*0.01

        # Encourage the thrusts to be fired in the direction of the motion
        
        return reward
    
    def dynamics(self, action):
        def ODE(t, y):
            # Observations
            obs = self.dict2array(self.observation)
            y = obs[0,1,2,3,8]

            # Actions
            a = self.dict2array(action)

            # State dynamics
            dydt    = np.zeros(self.dict2array(self.observation).shape)
            dydt[0] = y[2]
            dydt[1] = y[3] / y[0]
            dydt[2] = y[3]**2 / y[0] - self.config.dynamics.mu / y[0]**2 + a[2]*(a[0]*np.sin(a[1])) / y[4]
            dydt[3] = y[2]*y[3] / y[0] + a[2]*(a[0]*np.cos(a[1])) / y[4]
            dydt[4] = -a[2]*a[0] / self.config.dynamics.c
            return dydt

        res = solve_ivp(ODE, (0,self.config.dynamics.update), self.dict2array(self.observation), 'RK45', rtol = 1e-6)
        

    def step(self, action):
        # Calculate the reward at the current step
        reward = self.reward(action)

        # Forward propagate the dynamics
        self.dynamics(action)
        observation = self.observation

        # Determine termination criteria
        done = False
        #if ($$$NEED TO DO THIS$$$): done = True

        # Dynamics model
        return observation, reward, done, info

    def dict2array(self, data):
        return np.fromiter(data.values(), dtype=float)
    
    def array2dict(self, dict, data):
        i = 0
        for key in dict.keys():
            dict[key] = data[i]
            i += 1
        return 

    def render(self, mode='none'):
        return

In [200]:
# Create environment
env = LambertPulser(render_mode=None)
env.reward()

2.676835334371524

In [209]:
def ODE(t, y):
    # Observations
    obs = env.dict2array(env.observation)
    y = obs[(0,1,2,3,8),]

    # Actions
    a = env.dict2array(env.action_space.sample())
    a[2] = 1

    # State dynamics
    dydt    = np.zeros(env.dict2array(env.observation).shape)
    dydt[0] = y[2]
    dydt[1] = y[3] / y[0]
    dydt[2] = y[3]**2 / y[0] - env.config.dynamics.mu / y[0]**2 + a[2]*(a[0]*np.sin(a[1])) / y[4]
    dydt[3] = y[2]*y[3] / y[0] + a[2]*(a[0]*np.cos(a[1])) / y[4]
    dydt[4] = -a[2]*a[0] / env.config.dynamics.c
    return dydt

# print(env.dict2array(env.observation))
res = solve_ivp(ODE, (0,env.config.dynamics.update), env.dict2array(env.observation), 'LSODA', rtol = 1e-6)
print(res)

  message: The solver successfully reached the end of the integration interval.
  success: True
   status: 0
        t: [ 0.000e+00  1.322e-05 ...  1.000e-01  1.000e-01]
        y: [[ 4.820e+02  4.820e+02 ...  4.820e+02  4.820e+02]
            [ 4.918e+00  4.918e+00 ...  4.918e+00  4.918e+00]
            ...
            [ 9.905e+01  9.905e+01 ...  9.905e+01  9.905e+01]
            [ 2.000e+01  2.000e+01 ...  2.000e+01  2.000e+01]]
      sol: None
 t_events: None
 y_events: None
     nfev: 132140
     njev: 0
      nlu: 0


In [214]:
idx = 4
res.y[idx,0]-res.y[idx,-1]

10.93367396913726