In [None]:
import csaf.config as cconf
import csaf.system as csys

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format='retina'

# create a csaf configuration out of toml
my_conf = cconf.SystemConfig.from_toml("/csaf-system/f16_acas_shield.toml")

In [None]:
from IPython.display import Image

import pathlib

plot_fname = f"pub-sub-plot.png"

# plot configuration pub/sub diagram as a file -- proj specicies a dot executbale and -Gdpi is a valid dot
# argument to change the image resolution
my_conf.plot_config(fname=pathlib.Path(plot_fname).resolve(), prog=["dot", "-Gdpi=400"])

# display written file to notebook
Image(plot_fname, height=600)

In [None]:
def air_collision_condition(ctraces):
        """ground collision premature termination condition
        """
        # get the aircraft states
        sa, sb = ctraces['plant']['states'], ctraces['plant_intruder']['states']
        if sa and sb:
            # look at distance between last state
            return (np.linalg.norm(np.array(sa[-1][9:11]) - np.array(sb[-1][9:11]))) < 10

In [None]:
# create pub/sub components out of the configuration
my_system = csys.System.from_config(my_conf)

# set the scenario states
my_system.set_state('plant', [500.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 6000.0, 9.0])
my_system.set_state('plant_intruder', [500.0, 0.0, 0.0, 0.0, 0.0, np.pi, 0.0, 0.0, 0.0, 2000.0, 0.0, 6000.0, 9.0])

In [None]:
# create an environment from the system, allowing us to act as the controller
my_env = csys.SystemEnv("predictor", my_system, terminating_conditions_all=air_collision_condition)

In [None]:
from collections import deque


class PredictorBuffer:
    # number of steps to take before re-running predictor
    n_steps = 10
    
    def __init__(self, env: csys.SystemEnv):
        self.pstates = []
        self.init_out = [0.,0.,0.,0.7]
        self.env = env
        self._finished = False
        
    def step(self, pred_out = False):
        """step through the simulation for n steps and collect a buffer for prediction"""
        for _ in range(self.n_steps):
            try:
                # send the outputs, can collect the inputs
                comp_input = self.env.step({"predictor-outputs": [pred_out]}) 

                # get the states and track them over time
                self.pstates.append((comp_input['plant-states'], comp_input['plant_intruder-states']))

            # stop iteration occurs when the termination conditions are satisfied
            except StopIteration as e:
                self._finished = True
                break

            # other errors can occur -- maybe solver error
            except Exception as e:
                self._finished = True
                raise e
                break
      
    @property
    def buffer(self) -> np.array:
        """get the buffer as a numpy array"""
        return np.array(self.pstates)
                
    @property
    def is_finished(self):
        """if simulation terminated"""
        return self._finished
    

In [None]:
# create a buffer for the predictor
pb = PredictorBuffer(my_env)

# fill it
pb.step(False)
pb.step(False)
pb.step(False)

# get buffer
pstates = pb.buffer

In [None]:
plt.scatter(*pstates[:, 1, 9:11].T)

In [None]:
plt.plot(pstates[:, 1, 0])

In [None]:
plt.scatter(*pstates[:, 0, 9:11].T)