# Experiment 2: Two-stage suborbital launcher


In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib widget

In [2]:
from dataclasses import dataclass, field
from math import radians
import time

import numpy as np
import matplotlib.pyplot as plt
from tqdm.auto import trange

from cw.context import time_it
from cw.simulation import Simulation, StatesBase, AB3Integrator, BatchLogger, Logging

from topone.sim_post_processing import sim_post_processing
from topone.environment import Environment, Stage
from topone.pid_agent import PIDAgent

from agent import Agent
from ideal_agent import IdealAgent

## Simulation configuration

In [3]:
@dataclass
class States(StatesBase):
    """State record for the two-stage launcher simulation.

    The integrated state vector has seven components, laid out identically
    in ``get_y``, ``get_y_dot`` and ``set_t_y``:

    ====== ================== ======================
    index  value (``get_y``)  rate (``get_y_dot``)
    ====== ================== ======================
    0:2    ``xii``            ``vii``
    2:4    ``vii``            ``aii``
    4      ``theta``          ``theta_dot``
    5      ``mass``           ``mass_dot``
    6      ``reward_integral````reward``
    ====== ================== ======================

    Array-valued fields use ``default_factory`` so that every instance gets
    its own zero vector. A bare ``np.zeros(2)`` default is a single array
    shared by all instances, and is rejected outright by ``@dataclass`` on
    Python 3.11+ because ``ndarray`` is unhashable.
    """
    t: float = 0
    command_engine_on: bool = False
    command_drop_stage: bool = False
    # Two-component vectors; each instance owns a fresh array.
    gii: np.ndarray = field(default_factory=lambda: np.zeros(2))
    xii: np.ndarray = field(default_factory=lambda: np.zeros(2))
    vii: np.ndarray = field(default_factory=lambda: np.zeros(2))
    aii: np.ndarray = field(default_factory=lambda: np.zeros(2))
    fii_thrust: np.ndarray = field(default_factory=lambda: np.zeros(2))
    theta: float = 0.
    theta_dot: float = 0.
    mass: float = 0.
    mass_dot: float = 0.
    h: float = 0.
    engine_on: bool = False
    stage_state: int = 0
    stage_idx: int = 0
    gamma_i: float = 0.
    gamma_e: float = 0.
    latitude: float = 0.

    # Reward bookkeeping: ``reward`` is integrated into ``reward_integral``
    # as component 6 of the state vector.
    reward: float = 0.
    reward_integral: float = 0.
    delta_v: float = 0.

    def get_y_dot(self):
        """Return the length-7 vector of time derivatives of ``get_y()``."""
        y = np.empty(7)
        y[:2] = self.vii
        y[2:4] = self.aii
        y[4] = self.theta_dot
        y[5] = self.mass_dot
        y[6] = self.reward
        return y

    def get_y(self):
        """Return the length-7 integrated state vector."""
        y = np.empty(7)
        y[:2] = self.xii
        y[2:4] = self.vii
        y[4] = self.theta
        y[5] = self.mass
        y[6] = self.reward_integral
        return y

    def set_t_y(self, t, y):
        """Store time ``t`` and unpack state vector ``y`` into the fields.

        ``y`` must use the same layout as ``get_y()``. When ``y`` is a NumPy
        array, ``xii`` and ``vii`` are assigned slices of it (views, not
        copies).
        """
        self.t = t
        self.xii = y[:2]
        self.vii = y[2:4]
        self.theta = y[4]
        self.mass = y[5]
        self.reward_integral = y[6]

In [4]:
# To use the learning agent instead, replace the IdealAgent() line with:
# agent = Agent(
#     epsilon=0.1,
#     alpha=0.9,
#     gamma=0.7,
#     path="./set_1"
# )
agent = IdealAgent()

# Integrator settings are passed through unchanged to AB3Integrator.
integrator = AB3Integrator(h=0.1, rk4=False, fd_max_order=1)

# Stage definitions, listed in the order they are handed to Environment.
first_stage = Stage(
    dry_mass=2,
    propellant_mass=0.02,
    specific_impulse=100,
    thrust=4*1.7,
)
second_stage = Stage(
    dry_mass=1,
    propellant_mass=0.02,
    specific_impulse=100,
    thrust=2*1.7,
)

environment = Environment(
    # NOTE(review): 1737.4e3 m matches the Moon's mean *radius* (and mu
    # matches the Moon's gravitational parameter); confirm whether
    # `surface_diameter` really expects a diameter.
    surface_diameter=1737.4e3,
    mu=4.9048695e12,
    stages=(first_stage, second_stage),
    initial_altitude=1000,
    initial_theta_e=radians(90),
    initial_latitude=radians(90),
    # Reward is the value of `h` minus 1000 (the same number as
    # initial_altitude above).
    reward_function=lambda s: s.h - 1000,
)

simulation = Simulation(
    states_class=States,
    integrator=integrator,
    modules=[environment, agent],
    logging=Logging(),
    initial_state_values=None,
)

# Prepare the batch logger against this simulation, then stash the states so
# later cells can call simulation.restore_states() before each run.
batch_logger = BatchLogger()
batch_logger.initialize(simulation)
simulation.stash_states()

In [5]:
def post_processing(result):
    """Apply ``sim_post_processing`` to a simulation result.

    Returns ``None``; the return value of ``sim_post_processing`` is not
    forwarded. Presumably ``result`` is modified in place, since callers
    display ``result`` afterwards -- TODO confirm.
    """
    sim_post_processing(result)

## Batch run

In [26]:
def run_batch(n_episodes, backup_period=30):
    """Run ``n_episodes`` simulations back to back and return the batch log.

    Uses the notebook-level ``simulation``, ``batch_logger`` and ``agent``.
    While the batch runs, ``simulation.logging`` is swapped for
    ``batch_logger``; the original logger is always put back afterwards.

    Args:
        n_episodes: Number of episodes to run.
        backup_period: Minimum number of seconds between intermediate
            ``agent.store()`` calls.

    Returns:
        Whatever ``batch_logger.finish_batch()`` returns. If the batch is
        stopped with a keyboard interrupt, the results gathered so far are
        returned.

    Raises:
        Any exception other than ``KeyboardInterrupt`` raised while running
        an episode. Cleanup (agent store, logger restore, batch finish) is
        still performed before it propagates.
    """
    batch_logger.reset_batch()

    # Backup original logger and swap with faster logger
    original_logger = simulation.logging
    simulation.logging = batch_logger

    # Monotonic clock: interval measurement must not be affected by
    # wall-clock adjustments.
    last_backup_time = time.monotonic()

    try:
        for _ in trange(n_episodes):
            simulation.restore_states()
            simulation.run(100)
            if time.monotonic() - last_backup_time >= backup_period:
                agent.store()
                last_backup_time = time.monotonic()
    except KeyboardInterrupt:
        # A manual interrupt is a deliberate early stop: fall through and
        # hand back the partial batch instead of discarding it.
        pass
    finally:
        agent.store()
        simulation.logging = original_logger
        batch_results = batch_logger.finish_batch()

    # Returning here (not inside ``finally``) lets real errors propagate;
    # a ``return`` in ``finally`` would silently discard them.
    return batch_results

In [25]:
# Run a batch of 100 episodes, then show the agent's value and greedy-policy
# displays followed by the batch results. The cell defining run_batch must
# have been executed before this one.
batch_results = run_batch(100)
agent.display_value()
agent.display_greedy_policy()
display(batch_results)

NameError: name 'run_batch' is not defined

## Single simulation

In [15]:
# Restore the stashed states, time a single simulation.run(100) call
# (presumably 100 is the end time -- TODO confirm), post-process the result
# and display it.
simulation.restore_states()
with time_it("simulation run"):
    result = simulation.run(100)
post_processing(result)
result

simulation run: 0.014032055005372968 [s]


In [16]:
# Print the agent's greedy policy.
agent.display_greedy_policy()

Stage 0
  UNFIRED: engine_on 
  FIRING: engine_on
  FIRED: drop_stage
Stage 1
  UNFIRED: engine_on 
  FIRING: engine_on
  FIRED: drop_stage


In [17]:
# Plot both agent command signals against t in one figure.
plt.figure()
result.command_engine_on.plot.line(x="t", label="command_engine_on")
result.command_drop_stage.plot.line(x="t", label="command_drop_stage")
plt.legend()

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

<matplotlib.legend.Legend at 0x7f58c5459c70>

In [24]:
# Overlay mass_dot (scaled by 1e3), stage_idx (offset by +1) and stage_state
# in one figure. NOTE(review): the scale and offset look like they are only
# there to make the three traces comparable on a shared axis -- confirm.
plt.figure()
(result.mass_dot * 1e3).plot.line(x="t")
(result.stage_idx + 1).plot()
(result.stage_state).plot()

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

[<matplotlib.lines.Line2D at 0x7f58c5179400>]