# Fighter Environment

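Experiments and figures for the fighter environment: trajectory cost distributions (Figure 1) and the effect of the uncertainty radius (Figure 2).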

In [1]:
from typing import List, Optional, Dict, Union
import numpy as np
from tqdm.notebook import tqdm
from itertools import chain
import plotly.graph_objects as go
import plotly.figure_factory as ff
import plotly.express as px

from io_agent.plant.fighter import FighterEnv, fighter_env_params
from io_agent.trainer import ControlLoop, Transition
from io_agent.control.mpc import MPC
from io_agent.control.rmpc import RobustMPC
from io_agent.control.io import IOController
from io_agent.utils import FeatureHandler


def run_agent(agent: MPC,
              env_length: int,
              use_foresight: bool,
              disturbance_bias: Optional[np.ndarray] = None,
              bias_aware: bool = True,
              ) -> List[Transition]:
    """Simulate ``agent`` on a freshly built ``FighterEnv``.

    Args:
        agent: Controller handed to the ``ControlLoop``.
        env_length: Passed to ``FighterEnv`` as ``max_length``.
        use_foresight: Forwarded unchanged to ``ControlLoop.simulate``.
        disturbance_bias: Optional bias forwarded to ``FighterEnv``.
        bias_aware: When False and a bias is given, the bias is subtracted
            from the copy of the plant's state disturbance that is handed
            to the control loop.

    Returns:
        Whatever ``ControlLoop.simulate`` returns (annotated as a list of
        transitions).
    """
    env = FighterEnv(
        max_length=env_length,
        env_params=fighter_env_params,
        disturbance_bias=disturbance_bias,
    )

    # Work on a copy so the plant's own disturbance array is never modified.
    loop_disturbance = env.state_disturbance.copy()
    hide_bias = disturbance_bias is not None and not bias_aware
    if hide_bias:
        loop_disturbance -= disturbance_bias

    loop = ControlLoop(
        state_disturbance=loop_disturbance,
        output_disturbance=env.output_disturbance,
        plant=env,
        controller=agent,
    )
    return loop.simulate(initial_state=None, use_foresight=use_foresight)


def run_mpc(env_length: int = 60,
            horizon: int = 20,
            use_foresight: bool = True,
            disturbance_bias: Optional[np.ndarray] = None,
            bias_aware: bool = True,
            ) -> List[Transition]:
    """Run one episode with an ``MPC`` controller of the given horizon.

    The controller is built from ``fighter_env_params``; every other
    argument is forwarded unchanged to ``run_agent``.
    """
    rollout_options = dict(
        env_length=env_length,
        use_foresight=use_foresight,
        disturbance_bias=disturbance_bias,
        bias_aware=bias_aware,
    )
    controller = MPC(env_params=fighter_env_params, horizon=horizon)
    return run_agent(agent=controller, **rollout_options)


def run_rmpc(env_length: int = 60,
             horizon: int = 20,
             use_foresight: bool = True,
             rho: float = 0.1,
             disturbance_bias: Optional[np.ndarray] = None,
             bias_aware: bool = True,
             ) -> List[Transition]:
    """Run one episode with a ``RobustMPC`` controller.

    The controller is built from ``fighter_env_params`` with the given
    ``horizon`` and ``rho`` and with both state and input constraints
    enabled; the remaining arguments are forwarded to ``run_agent``.
    """
    rollout_options = dict(
        env_length=env_length,
        use_foresight=use_foresight,
        disturbance_bias=disturbance_bias,
        bias_aware=bias_aware,
    )
    controller = RobustMPC(
        env_params=fighter_env_params,
        horizon=horizon,
        rho=rho,
        state_constraints_flag=True,
        input_constraints_flag=True,
    )
    return run_agent(agent=controller, **rollout_options)


def prepare_io(dataset: List[Transition],
               expert_agent: MPC,
               expert_horizon: int = 20,
               env_length: int = 60,
               ) -> IOController:
    """Build and train an ``IOController`` on ``dataset``.

    Args:
        dataset: Training data passed to ``IOController.train``. Its length
            is multiplied by ``env_length - n_past`` to size the controller.
        expert_agent: Expert controller handed to the ``IOController``.
        expert_horizon: The feature handler accumulates
            ``expert_horizon + 1`` steps.
        env_length: Episode length used to compute ``dataset_length``.

    Returns:
        The trained ``IOController`` with its action optimizer prepared.
        (The previous annotation, ``List[Transition]``, did not match the
        returned object.)
    """
    feature_handler = FeatureHandler(
        env_params=fighter_env_params,
        n_accumulate=expert_horizon + 1,
        n_past=1,
        add_bias=True,
        use_action_regressor=False,
        use_noise_regressor=True,
        use_state_regressor=False)
    agent = IOController(
        env_params=fighter_env_params,
        expert_agent=expert_agent,
        include_constraints=True,
        soften_state_constraints=True,
        state_constraints_flag=True,
        action_constraints_flag=True,
        # NOTE(review): assumes every item in `dataset` contributes
        # env_length - n_past samples -- confirm against the trainer.
        dataset_length=(env_length - feature_handler.n_past) * len(dataset),
        feature_handler=feature_handler)
    agent.train(dataset)
    # The action optimizer is built only after training has finished.
    agent.action_optimizer = agent.prepare_action_optimizer()
    return agent
    

def run_io_mpc(dataset: List[Transition],
               n_trials: int = 10,
               horizon: int = 20,
               env_length: int = 60,
               disturbance_bias: Optional[np.ndarray] = None,
               bias_aware: bool = True,
               ) -> List[List[Transition]]:
    """Train an IO controller with an ``MPC`` expert and roll it out.

    Args:
        dataset: Training data forwarded to ``prepare_io``.
        n_trials: Number of evaluation episodes to run.
        horizon: Horizon of the ``MPC`` expert; also forwarded to
            ``prepare_io`` as ``expert_horizon`` so the two stay in sync
            when a non-default horizon is requested.
        env_length: Episode length for training-set sizing and evaluation.
        disturbance_bias: Optional bias forwarded to ``run_agent``.
        bias_aware: Forwarded to ``run_agent``.

    Returns:
        One ``run_agent`` result per trial.
    """
    expert_agent = MPC(
        env_params=fighter_env_params,
        horizon=horizon)
    io_agent = prepare_io(
        dataset,
        expert_agent=expert_agent,
        expert_horizon=horizon,
        env_length=env_length)
    return [
        run_agent(
            agent=io_agent,
            env_length=env_length,
            disturbance_bias=disturbance_bias,
            bias_aware=bias_aware,
            use_foresight=False,   # IO agent does not look into the future
        )
        for _ in tqdm(range(n_trials))
    ]

def run_io_rmpc(dataset: List[Transition],
                rho: float,
                n_trials: int = 10,
                horizon: int = 20,
                env_length: int = 60,
                disturbance_bias: Optional[np.ndarray] = None,
                bias_aware: bool = True,
                ) -> List[List[Transition]]:
    """Train an IO controller with a ``RobustMPC`` expert and roll it out.

    Args:
        dataset: Training data forwarded to ``prepare_io``.
        rho: Passed to the ``RobustMPC`` expert.
        n_trials: Number of evaluation episodes to run.
        horizon: Horizon of the ``RobustMPC`` expert; also forwarded to
            ``prepare_io`` as ``expert_horizon`` so the two stay in sync
            when a non-default horizon is requested.
        env_length: Episode length for training-set sizing and evaluation.
        disturbance_bias: Optional bias forwarded to ``run_agent``.
        bias_aware: Forwarded to ``run_agent``.

    Returns:
        One ``run_agent`` result per trial.
    """
    expert_agent = RobustMPC(
        env_params=fighter_env_params,
        horizon=horizon,
        rho=rho,
        state_constraints_flag=True,
        input_constraints_flag=True)
    io_agent = prepare_io(
        dataset,
        expert_agent=expert_agent,
        expert_horizon=horizon,
        env_length=env_length)
    return [
        run_agent(
            agent=io_agent,
            env_length=env_length,
            disturbance_bias=disturbance_bias,
            bias_aware=bias_aware,
            use_foresight=False,   # IO agent does not look into the future
        )
        for _ in tqdm(range(n_trials))
    ]


def make_figure(cost_data: Dict[str, List[float]],
                title: str,
                color_list: List[str] = px.colors.qualitative.T10
                ) -> go.Figure:
    """Draw a distribution plot of costs with a median marker per series.

    Args:
        cost_data: Maps a legend label to the list of costs for that series.
        title: Figure title, centred above the plot.
        color_list: Colours assigned to the series in order, cycling when
            there are more series than colours.

    Returns:
        The figure produced by ``ff.create_distplot``, with one dashed
        vertical line at the median of each series.
    """
    # The figure comes straight from create_distplot; the FigureWidget that
    # used to be created (and immediately discarded) here is gone.
    labels = list(cost_data.keys())
    costs = list(cost_data.values())
    colors = [color_list[index % len(color_list)] for index in range(len(labels))]

    fig = ff.create_distplot(
        costs,
        group_labels=labels,
        colors=colors,
        bin_size=4,
        show_rug=False)
    for color, cost_list in zip(colors, costs):
        fig.add_vline(
            x=np.median(cost_list),
            line_width=3,
            line_dash="dash",
            line_color=color,
        )

    # Shared styling for both axes.
    common_axis_layout = dict(
        showline=True,
        linecolor="#a2a2a2",
        linewidth=1,
        showgrid=True,
        gridcolor="#a2a2a2",
    )
    fig.update_layout(
        template="plotly_white",
        width=700,
        height=400,
        title=dict(text=f"{title}", x=0.5),
        yaxis=dict(
            **common_axis_layout,
            title=dict(text="density"),
        ),
        xaxis=dict(
            **common_axis_layout,
            title=dict(text="cost"),
        ),
        bargap=0.1,
        font=dict(
            size=12,
            color="Black",
        ),
    )
    return fig
    

## Trajectory Cost Distributions

- Experiment in Figure 1 Left


In [58]:
n_trials = 50

# MPC rollouts without foresight ("obl").
mpc_obl_trajectories = [
    run_mpc(use_foresight=False)
    for _ in tqdm(range(n_trials))
]
# MPC rollouts with foresight ("dst").
mpc_dst_trajectories = [
    run_mpc(use_foresight=True)
    for _ in tqdm(range(n_trials))
]
# IO controller trained on the first five no-foresight rollouts.
io_mpc_dst_trajectories = run_io_mpc(
    dataset=mpc_obl_trajectories[:5],
    n_trials=n_trials,
)

  0%|          | 0/50 [00:00<?, ?it/s]

  0%|          | 0/50 [00:00<?, ?it/s]

  0%|          | 0/50 [00:00<?, ?it/s]

Plot the cost distributions.

In [91]:
# Pool the per-step costs of every trajectory, keeping only the steps from
# index int(60 * 0.6) onward (60 is the default env_length of run_mpc).
mpc_obl_costs = [
    step.cost
    for step in chain.from_iterable(run[int(60 * 0.6):] for run in mpc_obl_trajectories)
]
mpc_dst_costs = [
    step.cost
    for step in chain.from_iterable(run[int(60 * 0.6):] for run in mpc_dst_trajectories)
]
io_mpc_dst_costs = [
    step.cost
    for step in chain.from_iterable(run[int(60 * 0.6):] for run in io_mpc_dst_trajectories)
]

fig = make_figure(
    {
        "MPC (obl)": mpc_obl_costs,
        "MPC (dst)": mpc_dst_costs,
        "IO-MPC (dst)": io_mpc_dst_costs,
    },
    title=f"Figure 1 left with {len(mpc_dst_trajectories)} trials",
)
# Optional larger rendering: fig.update_layout(width=1200, height=600)
fig

- Experiment in Figure 1 Middle

In [48]:
n_trials = 50
# Column vector of shape (2, 1); only the second entry carries a bias.
disturbance_bias = np.array([[0.0], [0.005]])
rho = 0.01

# Both IO controllers are trained on the same five no-foresight MPC rollouts
# and evaluated under the biased disturbance.
io_mpc_dst_trajectories = run_io_mpc(
    dataset=mpc_obl_trajectories[:5],
    n_trials=n_trials,
    disturbance_bias=disturbance_bias,
)
io_rmpc_dst_trajectories = run_io_rmpc(
    dataset=mpc_obl_trajectories[:5],
    rho=rho,
    n_trials=n_trials,
    disturbance_bias=disturbance_bias,
)



You are solving a parameterized problem that is not DPP. Because the problem is not DPP, subsequent solves will not be faster than the first one. For more information, see the documentation on Discplined Parametrized Programming, at
	https://www.cvxpy.org/tutorial/advanced/index.html#disciplined-parametrized-programming



  0%|          | 0/50 [00:00<?, ?it/s]

  0%|          | 0/50 [00:00<?, ?it/s]

In [92]:
# Pool per-step costs from index int(60 * 0.6) onward of each trajectory.
io_mpc_dst_costs = [
    step.cost
    for step in chain.from_iterable(run[int(60 * 0.6):] for run in io_mpc_dst_trajectories)
]
io_rmpc_dst_costs = [
    step.cost
    for step in chain.from_iterable(run[int(60 * 0.6):] for run in io_rmpc_dst_trajectories)
]

fig = make_figure(
    {
        "IO-MPC": io_mpc_dst_costs,
        "IO-RMPC": io_rmpc_dst_costs,
    },
    title=f"Figure 1 middle with {len(io_rmpc_dst_trajectories)} trials",
)
# Optional larger rendering: fig.update_layout(width=1200, height=600)
fig


- Experiment in Figure 1 Right

In [97]:
n_trials = 50
# Column vector of shape (2, 1); only the second entry carries a bias.
disturbance_bias = np.array([[0.0], [0.005]])

# No foresight, bias-aware ("obl").
mpc_obl_trajectories = [
    run_mpc(use_foresight=False, disturbance_bias=disturbance_bias, bias_aware=True)
    for _ in tqdm(range(n_trials))
]
# Foresight, bias-aware ("f-dst").
mpc_fdst_trajectories = [
    run_mpc(use_foresight=True, disturbance_bias=disturbance_bias, bias_aware=True)
    for _ in tqdm(range(n_trials))
]
# Foresight, but run_agent removes the bias from the disturbance handed to
# the control loop ("p-dst").
mpc_pdst_trajectories = [
    run_mpc(use_foresight=True, disturbance_bias=disturbance_bias, bias_aware=False)
    for _ in tqdm(range(n_trials))
]


  0%|          | 0/50 [00:00<?, ?it/s]

  0%|          | 0/50 [00:00<?, ?it/s]

  0%|          | 0/50 [00:00<?, ?it/s]

In [96]:
# Pool per-step costs from index int(60 * 0.6) onward of each trajectory.
mpc_obl_costs = [
    step.cost
    for step in chain.from_iterable(run[int(60 * 0.6):] for run in mpc_obl_trajectories)
]
mpc_fdst_costs = [
    step.cost
    for step in chain.from_iterable(run[int(60 * 0.6):] for run in mpc_fdst_trajectories)
]
mpc_pdst_costs = [
    step.cost
    for step in chain.from_iterable(run[int(60 * 0.6):] for run in mpc_pdst_trajectories)
]
# Reuses the IO-RMPC rollouts from the Figure 1 middle experiment.
io_rmpc_dst_costs = [
    step.cost
    for step in chain.from_iterable(run[int(60 * 0.6):] for run in io_rmpc_dst_trajectories)
]

fig = make_figure(
    {
        "MPC (obl)": mpc_obl_costs,
        "MPC (f-dst)": mpc_fdst_costs,
        "MPC (p-dst)": mpc_pdst_costs,
        "IO-RMPC": io_rmpc_dst_costs,
    },
    title=f"Figure 1 Right with {len(io_rmpc_dst_trajectories)} trials",
)
# Optional larger rendering: fig.update_layout(width=1200, height=600)
fig

## Uncertainty Radius

- Experiment in Figure 2 Middle

In [3]:
n_trajectories = 5
n_trials = 50
# Column vector of shape (2, 1); only the second entry carries a bias.
disturbance_bias = np.array([[0.0], [0.005]])

# Training set: MPC rollouts without foresight and without bias.
mpc_train_trajectories = [
    run_mpc(use_foresight=False)
    for _ in tqdm(range(n_trajectories))
]

# One batch of IO-RMPC rollouts per uncertainty radius.
# NOTE(review): only a single radius is evaluated; the wider sweep
# np.logspace(-3, -1.6, 12) was left commented out -- presumably to keep
# the run short, confirm before regenerating the figure.
io_rmpc_trajectories = {}
for rho in [0.001]:
    io_rmpc_trajectories[rho] = run_io_rmpc(
        dataset=mpc_train_trajectories,
        rho=rho,
        n_trials=n_trials,
        disturbance_bias=disturbance_bias,
    )


  0%|          | 0/5 [00:00<?, ?it/s]

	https://www.cvxpy.org/tutorial/advanced/index.html#disciplined-parametrized-programming


In [81]:
# For every uncertainty radius, pool the per-step costs of all its rollouts,
# keeping only the steps from index int(60 * 0.6) onward.
io_rmpc_costs = {
    radius: [
        step.cost
        for step in chain.from_iterable(run[int(60 * 0.6):] for run in rollouts)
    ]
    for radius, rollouts in io_rmpc_trajectories.items()
}


In [None]:
# Display the mapping from uncertainty radius to pooled costs.
io_rmpc_costs

In [82]:
def make_rho_figure(cost_data: Dict[str, Dict[int, List[float]]],
                    title: str,
                    color_list: List[str] = px.colors.qualitative.T10
                    ) -> go.FigureWidget:
    """Plot the median cost and a 20-80 percentile band per series.

    Args:
        cost_data: Maps a legend label to a dict whose keys are used as
            x-values (the axis is titled "uncertainty radius") and whose
            values are the cost lists summarised at that x-value.
        title: Figure title, centred above the plot.
        color_list: Colours assigned to the series in order, cycling when
            there are more series than colours.

    Returns:
        A ``go.FigureWidget`` with three traces per series: the median
        line, the 80th percentile, and the 20th percentile filled up to
        the previous trace.
    """
    fig = go.FigureWidget()

    for position, (label, cost_dict) in enumerate(cost_data.items()):
        color = color_list[position % len(color_list)]

        radii = list(cost_dict.keys())
        # Each entry holds the [20th, 50th, 80th] percentiles for one radius.
        percentiles = [
            np.percentile(cost_list, [20, 50, 80])
            for cost_list in cost_dict.values()
        ]
        lower = [triple[0] for triple in percentiles]
        median = [triple[1] for triple in percentiles]
        upper = [triple[2] for triple in percentiles]

        median_trace = go.Scatter(
            x=radii,
            y=median,
            line=dict(color=color),
            mode="lines",
            name=label,
            legendgroup=label,
        )
        upper_trace = go.Scatter(
            name="Upper Bound",
            x=radii,
            y=upper,
            mode="lines",
            marker=dict(color=color),
            line=dict(width=0),
            showlegend=False,
            legendgroup=label,
        )
        # Must follow the upper bound: "tonexty" fills towards the trace
        # added immediately before this one.
        lower_trace = go.Scatter(
            name="Lower Bound",
            x=radii,
            y=lower,
            marker=dict(color=color),
            line=dict(width=0),
            mode="lines",
            opacity=0.5,
            fill="tonexty",
            showlegend=False,
            legendgroup=label,
        )
        for trace in (median_trace, upper_trace, lower_trace):
            fig.add_trace(trace)

    # Shared styling for both axes.
    axis_style = dict(
        showline=True,
        linecolor="#a2a2a2",
        linewidth=1,
        showgrid=True,
        gridcolor="#a2a2a2",
    )
    fig.update_layout(
        template="plotly_white",
        width=700,
        height=400,
        title=dict(text=f"{title}", x=0.5),
        yaxis=dict(
            **axis_style,
            title=dict(text="costs"),
        ),
        xaxis=dict(
            **axis_style,
            title=dict(text="uncertainty radius"),
            type="log",
        ),
        font=dict(
            size=12,
            color="Black",
        ),
    )
    return fig


# The MPC (f-dst) costs do not depend on the radius, so the same list is
# repeated for every radius to draw a flat reference band.
# NOTE(review): the title says "Figure 2 Left" while the section heading
# announces "Figure 2 Middle" -- confirm which one is intended.
make_rho_figure(
    {
        "IO-RMPC": io_rmpc_costs,
        "MPC (f-dst)": dict.fromkeys(io_rmpc_costs, mpc_fdst_costs),
    },
    title="Figure 2 Left",
)

FigureWidget({
    'data': [{'legendgroup': 'IO-RMPC',
              'line': {'color': '#4C78A8'},
           …

- Experiment in Figure 2 Right

Optimal IO-RMPC

In [88]:
# Pick the uncertainty radius whose pooled costs have the lowest median.
opt_rho, opt_costs = min(
    io_rmpc_costs.items(),
    key=lambda item: np.percentile(item[1], 50),
)

# Plot the costs of the selected radius. Previously `opt_costs` was computed
# but never used, and the figure showed the fixed-rho `io_rmpc_dst_costs`
# from the Figure 1 experiments instead.
# NOTE(review): confirm that the optimal-radius run is what this figure is
# meant to show.
fig = make_figure(
    {
        "MPC (f-dst)": mpc_fdst_costs,
        "IO-RMPC": opt_costs,
        "IO-MPC": io_mpc_dst_costs,
    },
    title=f"Figure 2 Right with {len(io_rmpc_trajectories[opt_rho])} trials",
)
# update_layout returns the figure, so this last expression also displays it.
fig.update_layout(width=1200, height=600)