In [None]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

import notebook_setup
from typing import Union, Iterable, List
from copy import deepcopy
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.autograd.functional import jacobian, hessian
from tqdm.autonotebook import tqdm, trange
import optuna

from rl import learn_rl, transform_rl_policy, evaluate_rl, PPO, load_agent
from xform.functions import DynamicsModel, transform_nonlinear_policy
from xform.systems import get_env_samples
from systems.plotting import plot_env_response, multiple_response_plots
from systems.base import functions
from systems.lunarlander import LunarLanderEnv, LanderEnv

# LunarLander

In [None]:
def make_xform_env(t, seed=0):
    env = LunarLanderEnv(seed=seed)
    min_power = 0.75
    max_power = 1.
    side = 1
    relative_power = np.ones(2, np.float32)
    env.relative_power[side] = min_power + (1-t) * (max_power - min_power)
    env.reset()
    return env

In [None]:
learn_kwargs = dict(steps=150_000, seed=0, learning_rate=5e-3,
                    n_steps=1024, batch_size=256, n_epochs=5,
                    gamma=0.99)
env = make_xform_env(0)
agent = learn_rl(env, tensorboard_log=env.name + '/tuning', **learn_kwargs)

In [None]:
# nominal model
env = make_xform_env(0)
agent.set_random_seed(0)
n_states, n_actions = env.observation_space.shape[0], env.action_space.shape[0]
dyn = DynamicsModel(n_states, n_actions, env.dt, states_hidden=128, n_layers=2, separable=True)
xu, x = get_env_samples(env, 10, agent)

losses = dyn.learn(xu, x, lr=0.002, steps=5000)
plt.plot(losses)

In [None]:
# faulty model
agent.set_random_seed(0)
env_ = make_xform_env(0.5)
n_states, n_actions = env_.observation_space.shape[0], env_.action_space.shape[0]
dyn_ = DynamicsModel(n_states, n_actions, env_.dt, states_hidden=128, n_layers=2, separable=True)
xu, x = get_env_samples(env_, 10, agent)

losses = dyn_.learn(xu, x, lr=0.002, steps=5000)
plt.plot(losses)

In [None]:
predict_ = transform_nonlinear_policy(dyn, dyn_, agent.predict)

In [None]:
predict_(env.observation_space.sample(), True)

In [None]:
env, env_, env__ = make_xform_env(0.), make_xform_env(0.5), make_xform_env(0.5)

In [None]:
multiple_response_plots([
    lambda: plot_env_response(env, env.reset(), control_law=agent, state_idx=(0,1,4), state_names='xyw'),
    lambda: plot_env_response(env_, env_.reset(), control_law=agent, state_idx=(0,1,4), state_names='xyw'),
    lambda: plot_env_response(env__, env__.reset(), control_law=predict_, state_idx=(0,1,4), state_names='xyw')
])

# Separable

In [None]:
from systems.cartpole import CartpoleEnv, create_cartpole

In [None]:
sys_kwargs = dict(mc=0.5, mp=0.1, l=1, g=10, df=0.01)
learn_kwargs = dict(steps=100_000, seed=0, learning_rate=2e-3,
                    n_steps=2048, batch_size=64, n_epochs=10,
                    gamma=0.99)
q = np.asarray([[1,0,0,0], [0,0.1,0,0],[0,0,1e-5,0],[0,0,0,1e-1]])
r = np.asarray([[0.00001]])
xformA = np.diagflat(np.random.RandomState(seed=0).randn(4))
xformB = np.diagflat(np.random.RandomState(seed=1).randn(4))
x0 = np.asarray([-np.pi/45, 0, 0, 0])
make_env = lambda: CartpoleEnv(**sys_kwargs, q=q, seed=0)
def make_xform_env():
    env = CartpoleEnv(**sys_kwargs, q=q, seed=0,
                      xformA=xformA, xformB=xformB)
    return env
env = make_env()
sys = create_cartpole(**sys_kwargs)

In [None]:
sys = create_cartpole(**sys_kwargs, use_torch=True)

In [None]:
dxdt = sys.dynamics(None,
            torch.from_numpy(np.zeros(4, np.float32)),
            torch.from_numpy(np.ones(1, np.float32))
            )

In [None]:
u = torch.from_numpy(np.ones(1, np.float32)) * -1
x = torch.from_numpy(np.asarray([0.1, 0.1, 0, 0], np.float32))
u.requires_grad_(True)
dxdt = sys.updfcn(None,
            x,
            u,
            None
            )
from torch.autograd.functional import jacobian
jacobian(lambda u: sys.updfcn(None, x, u, None), u)

In [None]:
env = make_env()
agent = learn_rl(env, tensorboard_log=env.name + '/tuning', **learn_kwargs)

In [None]:
env = make_env()
agent.set_random_seed(0)
n_states, n_actions = env.observation_space.shape[0], env.action_space.shape[0]
dyn = DynamicsModel(n_states, n_actions, env.dt, states_hidden=128, n_layers=2, separable=True)
xu, x = get_env_samples(env, 10, agent)

losses = dyn.learn(xu, x, lr=0.002, steps=5000)
plt.plot(losses)

In [None]:
env_ = make_xform_env()
agent.set_random_seed(0)
n_states, n_actions = env_.observation_space.shape[0], env_.action_space.shape[0]
dyn_ = DynamicsModel(n_states, n_actions, env_.dt, states_hidden=128, n_layers=2, separable=True)
xu, x = get_env_samples(env_, 10, agent)

losses = dyn_.learn(xu, x, lr=0.002, steps=5000)
plt.plot(losses)

In [None]:
predict_ = transform_nonlinear_policy(dyn, dyn_, agent.predict)

In [None]:
env, env_, env__ = make_env(), make_xform_env(), make_xform_env()

In [None]:
multiple_response_plots([
    lambda: plot_env_response(env, env.reset(), control_law=agent, state_idx=(0,1), state_names='xw'),
    lambda: plot_env_response(env_, env_.reset(), control_law=agent, state_idx=(0,1), state_names='xw'),
    lambda: plot_env_response(env__, env__.reset(), control_law=predict_, state_idx=(0,1), state_names='xw')
])

# UAV

In [None]:
from systems.base import functions
from scripts.opt_pidcontroller import (
    get_study as get_study_pid, apply_params as apply_params_pid, run_sim,
    get_controller, apply_fault, make_disturbance_fn
)
from scripts.opt_multirotorenv import apply_params, get_study
from systems.multirotor import VP, SP, MultirotorTrajEnv, Multirotor, run_sim, run_trajectory
from multirotor.trajectories import Trajectory, eight_curve
from multirotor.helpers import DataLog, control_allocation_matrix
from multirotor.visualize import plot_datalog, get_wind_quiver, make_drawing

ns = functions(use_torch=True)

# speeds -> dynamics
def f_t(speeds: torch.Tensor, vp=VP):
    """Forces/torques acting on body given allocated speeds"""
    thrust = torch.zeros(3, len(vp.propellers))
    k_th = torch.tensor([p.k_thrust for p in vp.propellers])
    thrust[2,:] = k_th * speeds**2
    
    torque = torch.zeros(3, len(vp.propellers))
    k_d = torch.tensor([p.k_drag for p in vp.propellers]).float()
    clockwise = torch.tensor(vp.clockwise).float()
    positions = torch.tensor(vp.propeller_vectors).float()
    tau_xy = torch.linalg.cross(positions.T, thrust.T).T
    tau_z = clockwise * k_d * speeds**2
    torque[:2,:] = tau_xy[:2, :]
    torque[2, :] = tau_z
    return thrust.sum(axis=1), torque.sum(axis=1)


# dynamics -> state change
def dxdt(
    forces: np.ndarray, torques: np.ndarray, x: np.ndarray,
    g: float=SP.g, mass=VP.mass,
    inertia_matrix=torch.from_numpy(VP.inertia_matrix).float(),
    inertia_matrix_inverse=torch.from_numpy(VP.inertia_matrix_inverse).float(),
    np=ns,
) -> torch.Tensor:
    """Rate of change of state given forces/torques acting on body"""
    
    # Store state variables in a readable format
    xI = x[0]       # Inertial frame positions
    yI = x[1]
    zI = x[2]
    ub = x[3]       # linear velocity along body-frame-x-axis b1
    vb = x[4]       # linear velocity along body-frame-y-axis b2
    wb = x[5]       # linear velocity along body-frame-z-axis b3
    phi = x[6]      # Roll
    theta = x[7]    # Pitch
    psi = x[8]      # Yaw
    p = x[9]        # body-frame-x-axis rotation rate
    q = x[10]       # body-frame-y-axis rotation rate
    r = x[11]       # body-frame-z-axis rotation rate
    
    cphi = np.cos(phi);   sphi = np.sin(phi)    # roll
    cthe = np.cos(theta); sthe = np.sin(theta)  # pitch
    cpsi = np.cos(psi);   spsi = np.sin(psi)    # yaw
    
    # Calculate the derivative of the state matrix using EOM
    xdot = np.zeros(12, dtype=x.dtype)

    xdot[0] = cthe*cpsi*ub + (-cphi * spsi + sphi*sthe*cpsi) * vb + \
        (sphi*spsi+cphi*sthe*cpsi) * wb  # = xIdot 
    xdot[1] = cthe*spsi * ub + (cphi*cpsi+sphi*sthe*spsi) * vb + \
        (-sphi*cpsi+cphi*sthe*spsi) * wb # = yIdot 
    xdot[2] = (-sthe * ub + sphi*cthe * vb + cphi*cthe * wb) # = zIdot

    xdot[3] = 1/mass * (forces[0])     + g * sthe          + r * vb - q * wb  # = udot
    xdot[4] = 1/mass * (forces[1])     - g * sphi * cthe   - r * ub + p * wb # = vdot
    xdot[5] = 1/mass * (forces[2])     - g * cphi * cthe   + q * ub - p * vb # = wdot

    xdot[6] = p + (q*sphi + r*cphi) * sthe / cthe  # = phidot
    xdot[7] = q * cphi - r * sphi  # = thetadot
    xdot[8] = (q * sphi + r * cphi) / cthe  # = psidot

    gyro = np.cross(x[9:12], inertia_matrix @ x[9:12])
    xdot[9:12] = inertia_matrix_inverse @ (torques - gyro)
    
    return xdot

def dxdt_u(speeds: torch.Tensor, x: torch.Tensor, vp=VP, sp=SP,
           F_P=torch.eye(12), disturb_forces: torch.Tensor=torch.zeros(3)):
    """Rate of change of state given propeller speeds."""
    # speeds = torch.clamp_max(speeds, 700)
    f, t = f_t(speeds=speeds, vp=vp)
    dx = F_P @ dxdt(
        forces=f+disturb_forces, torques=t, x=x,
        g=sp.g, mass=vp.mass,
        inertia_matrix=torch.from_numpy(vp.inertia_matrix).float(),
        inertia_matrix_inverse=torch.from_numpy(vp.inertia_matrix_inverse).float(),
        np=ns,
    )
    return dx

def dxdt_du(speeds: np.ndarray, x: np.ndarray, vp=VP, sp=SP,
            F_P=torch.eye(12), disturb_forces: torch.Tensor=torch.zeros(3),
            dxdt_u=dxdt_u
           ):
    speeds = torch.as_tensor(speeds).float()
    x = torch.as_tensor(x).float()
    speeds.requires_grad_(True)
    d_du = jacobian(
        func=lambda u: dxdt_u(speeds=u, x=x, vp=vp, sp=sp, F_P=F_P, disturb_forces=disturb_forces),
        inputs=speeds
    )
    return d_du

def fault_saturation(motor, value):
    disabled = nn.functional.one_hot(torch.tensor(motor), 8)
    enabled = 1 - disabled
    def dxdt_u_(speeds: torch.Tensor, x: torch.Tensor, **kwargs):
        speeds = speeds * enabled + (value * disabled)
        return dxdt_u(speeds=speeds, x=x, **kwargs)
    return dxdt_u_


class AdaptiveMultirotor(Multirotor):
    def __init__(self, params, simulation,
            dxdt_u=dxdt_u, dxdt_u_=dxdt_u, enable=True,
            disturbance_fn=lambda m: np.zeros(3),
            fault=False
    ):
        super().__init__(params=params, simulation=simulation)
        self.dxdt_u = dxdt_u
        self.dxdt_u_ = dxdt_u_ # faulty dxdt
        self.disturbance_fn = disturbance_fn
        self._enable, self._fault = False, False
        self.enable(enable)
        if fault:
            self.fault(*fault)
    def enable(self, enable=True):
        self._enable = enable
    def fault(self, motor, value):
        self._fault = True
        self.motor = motor
        self.value = value
    def allocate_control(self, thrust, torques):
        speeds = super().allocate_control(thrust=thrust, torques=torques)
        if self._enable:
            speeds = torch.from_numpy(speeds)
            state = torch.from_numpy(self.state)
            disturb_forces = torch.from_numpy(self.disturbance_fn(self))
            d_du = dxdt_du(
                speeds=speeds, x=state, dxdt_u=self.dxdt_u_,
                disturb_forces=disturb_forces
            )
            delta_u = torch.linalg.pinv(d_du) @  (
                self.dxdt_u(speeds,state)
                - self.dxdt_u_(speeds, state, disturb_forces=disturb_forces)
            )
            speeds = delta_u + speeds
            speeds = speeds.detach().numpy()
        return speeds
    def dxdt_speeds(self, t, x, u, disturb_forces=0, disturb_torques=0):
        if self._fault:
            # motor is stuck at that value
            # set to 0 to simulate failure
            u[self.motor] = self.value
        return super().dxdt_speeds(t, x, u, disturb_forces=disturb_forces)
    def step_speeds(self, u: np.ndarray, disturb_forces=0.):
        if self._fault:
            # motor is stuck at that value
            # set to 0 to simulate failure
            u[self.motor] = self.value
        return super().step_speeds(u, disturb_forces=disturb_forces)


def state_traj(x0, policy, n=100, vp=VP, sp=SP, F_P=torch.eye(12)):
    x = [torch.as_tensor(x0)]
    u = []
    for i in trange(n, leave=False):
        u.append(policy(x[-1]))
        dx = F_P @ dxdt_u(u[-1], x[-1], vp=vp, sp=sp)
        x.append((x[-1] + sp.dt * dx).detach())
    return torch.stack(x), torch.stack(u).detach()

def get_study_agent_params(name):
    study = get_study(name)
    best_trial = study.best_trial.number
    log_root_path = './tensorboard/MultirotorTrajEnv/optstudy/%s/'
    best_agent = load_agent((log_root_path + '%03d/run_1/agent') % (name, best_trial))
    best_params = study.best_params
    return study, best_agent, best_params

_, agent, params = get_study_agent_params('MultirotorTrajEnvPIDWind5@0ls')
params_pid = get_study_pid('MultirotorPIDls').best_params

def get_env(fault=False, wind='0@0', **multirotor_kwargs):
    multirotor_kwargs = deepcopy(multirotor_kwargs)
    multirotor_kwargs['fault'] = fault
    # multirotor_kwargs['disturbance_fn'] = make_disturbance_fn(wind)
    env = MultirotorTrajEnv(disturbance_fn=make_disturbance_fn(wind),
                            multirotor_class=AdaptiveMultirotor,
                            multirotor_kwargs=multirotor_kwargs)
    apply_params(env, **params)
    env._proximity = 2.
    env.ctrl.ctrl_p.square_root_scaling = True
    env.ctrl.ctrl_p.leashing = True
    return env

def reallocate_matrix(params, fault=False, wind='0@0'):
    params = deepcopy(params)
    if fault:
        motor, value = fault
        if value==0:
            params.propellers[motor].k_thrust = 0
            params.propellers[motor].k_drag = 0
        else:
            kt0, kd0 = params.propellers[motor].k_thrust, params.propellers[motor].k_drag
            w_nominal = np.sqrt(params.mass * SP.g / (params.nprops * kt0))
            win = np.linspace(max(w_nominal-50,0), w_nominal+50, 50).reshape(-1,1)
            f = np.ones(50) * kt0 * value**2
            t = np.ones(50) * kd0 * value**2
            # f = k * w^2
            kth = (np.linalg.pinv(win**2) @ f).item()
            kd = (np.linalg.pinv(win**2) @ t).item()
            params.propellers[motor].k_thrust, params.propellers[motor].k_drag = kth, kd
    return control_allocation_matrix(params)

def fault_scenario(adapt=False, supervisor=False, reallocate=False, wind='0@0', fault=False):
    env = get_env(dxdt_u=dxdt_u, dxdt_u_=fault_saturation(*fault) if fault else dxdt_u,
              enable=adapt, wind=wind, fault=fault)
    if not supervisor:
        apply_params_pid(env.ctrl, **params_pid)
    if reallocate:
        env.vehicle.alloc, env.vehicle.alloc_inverse = reallocate_matrix(env.vehicle.params, fault=fault)

    wp = eight_curve(a=30)
    log = run_trajectory(env, wp, agent if supervisor else None, verbose=False)
    return log

def plot_log(log: DataLog, fault=False, wind='0@0', params=VP):
    axes = plot_datalog(log, plots=('pos', 'vel', 'traj'))
    axes['traj'].quiver(*get_wind_quiver(wind, axes['traj']))
    arms, *_ = make_drawing(VP, make_2d=True, scale_arms=2)
    if fault:
        arms[fault[0]].set_color('r')
    for a in arms:
        axes['traj'].add_line(a)
    plt.subplot(2,2,4)
    plt.plot(log.t, log.speeds, lw=0.5, ls=':')
    plt.title('Allocated Speeds')

In [None]:
# gradient of forces/torques w.r.t speeds
u = torch.ones(8) * 364
u.requires_grad_(True)
f,t = f_t(u)
d_ft_du = jacobian(
    func=lambda u: f_t(speeds=u),
    inputs=u
)
d_ft_du

In [None]:
# gradient of dxdt w.r.t (forces, torques)
f = torch.zeros(3)
f[2] = VP.mass * SP.g
f.requires_grad_(True)

t = torch.zeros(3)
t[0] = 0.01
t.requires_grad_(True)

ft = torch.cat((f,t))

x = torch.zeros(12)

d_dxdt_du = jacobian(
    func=lambda ft: dxdt(ft[:3], ft[3:], x),
    inputs=ft
)
d_dxdt_du

In [None]:
# Simple example with takeoff
def pi_s(x, *args):
    return torch.ones(8)*370

F_P = torch.eye(12)
vp = deepcopy(VP)
vp.propellers[1].k_thrust = 0.
vp.propellers[1].k_drag = 0.

def pi_t(x, F_P=F_P, pi_s=pi_s, vp=vp, **kwargs):
    u_s = pi_s(x)
    d_dxdt_du = dxdt_du(speeds=u_s, x=x, vp=vp, F_P=F_P, **kwargs)
    delta_u = torch.linalg.pinv(d_dxdt_du) @ (dxdt_u(u_s,x)-dxdt_u(u_s,x,vp=vp,F_P=F_P,**kwargs))
    u_t = delta_u + u_s
    return u_t.detach()

x_s, u_s = state_traj(torch.zeros(12), pi_s, F_P=torch.eye(12))
x_p, u_p = state_traj(torch.zeros(12), pi_s, vp=vp, F_P=F_P)
x_t, u_t = state_traj(torch.zeros(12), pi_t, vp=vp, F_P=F_P)

from matplotlib.gridspec import GridSpec
gs = GridSpec(2,3)
plt.figure(figsize=(9,3))

plt.subplot(gs[:,0])
plt.plot(x_s[:,2], label='Nominal')
plt.plot(x_p[:,2], label='Pre-Adaptation')
plt.plot(x_t[:,2], label='Adapted')
plt.ylim(-3,1)
plt.ylabel('z /m')
plt.legend()

plt.subplot(gs[0,1])
plt.plot(u_s, ls=':')
plt.title('Nominal speeds')

plt.subplot(gs[1,1])
lines = []
for i, u in enumerate(u_t.T):
    l, = plt.plot(u, ls=':', label=str(i))
    lines.append(l)
plt.title('Adapted speeds')
plt.ylabel('Speeds rad/s')

plt.subplot(gs[0,2])
arms, *_ = make_drawing(VP, make_2d=True, scale_arms=2)
arms[1].set_color('r')
for i, a in enumerate(arms):
    plt.gca().add_line(a)
    plt.text(a.get_xdata()[1], a.get_ydata()[1], str(i))
plt.xlim(-1.5,1.5)
plt.ylim(-1.5,1.5)
plt.gca().set_aspect('equal')
plt.gca().set_axis_off()
plt.tight_layout()

plt.subplot(gs[1,2])
plt.legend(handles=lines, ncol=3)
plt.gca().set_axis_off()

In [None]:
# Using Multirotor
adapt = True
reallocate = False
wind = False
fault = True

m = Multirotor(VP, SP) # nominal initialization/allocation
ctrl = get_controller(m)
ctrl.ctrl_p.leashing = True
ctrl.ctrl_p.square_root_scaling = True
apply_params_pid(ctrl, **params_pid)
wp = np.asarray([
    [10,0,0],[10,10,0],[0,10,0],[0,0,0]
]) * 5
traj = Trajectory(m, wp, proximity=2)
log = DataLog(m, ctrl, other_vars=('speeds',))

disturb_forces = np.asarray([-10, 0, 0], np.float32) * wind
if fault: apply_fault(m, '1@1')
if reallocate:
    m.alloc, m.alloc_inverse = control_allocation_matrix(m.params)

i, N = 0, 2_000
for pos, _ in tqdm(traj, total=N, leave=False):
    ref = np.asarray([*pos, 0])
    dynamics = ctrl.step(ref)
    speeds = m.allocate_control(dynamics[0], dynamics[1:]) # nominal policy
    if adapt:
        speeds = pi_t(
            torch.from_numpy(m.state),
            pi_s=lambda _: speeds,
            vp=m.params,
            disturb_forces=torch.from_numpy(disturb_forces)
        ).numpy()
    speeds = np.clip(speeds, a_min=0, a_max=700)
    m.step_speeds(speeds, disturb_forces=disturb_forces)
    log.log(speeds=speeds)
    i += 1
    if i==N: break
log.done_logging()
plot_datalog(log)

In [None]:
# Using AdaptiveMultirotor
adapt = False
reallocate = True
wind = '0@0'
fault = (1,0)

m = AdaptiveMultirotor(
    VP, SP,
    dxdt_u=dxdt_u,
    dxdt_u_=fault_saturation(*fault) if fault else dxdt_u,
    enable=adapt,
    disturbance_fn=make_disturbance_fn(wind),
    fault=fault
)

if reallocate:
    m.alloc, m.alloc_inverse = reallocate_matrix(m.params, fault=fault)
ctrl = get_controller(m)
apply_params_pid(ctrl, **params_pid)
ctrl.ctrl_p.leashing = True
ctrl.ctrl_p.square_root_scaling = True
wp = eight_curve(a=30)
traj = Trajectory(m, wp, proximity=2)
log = DataLog(m, ctrl, other_vars=('speeds',))

i, N = 0, 2_000
for pos, _ in tqdm(traj, total=N, leave=False):
    ref = np.asarray([*pos, 0])
    dynamics = ctrl.step(ref)
    speeds = m.allocate_control(dynamics[0], dynamics[1:])
    speeds = np.clip(speeds, a_min=0, a_max=700)
    m.step_speeds(speeds, disturb_forces=m.disturbance_fn(m))
    log.log(speeds=speeds)
    i += 1
    if i==N: break
log.done_logging()
plot_datalog(log)

In [None]:
# Using MultirotorTrajEnv with AdaptiveMultirotor
adapt = True
supervisor = True
reallocate = False
wind = '5@0'
fault = (1,250)

env = get_env(dxdt_u=dxdt_u, dxdt_u_=fault_saturation(*fault) if fault else dxdt_u,
              enable=adapt, wind=wind, fault=fault)
if not supervisor:
    apply_params_pid(env.ctrl, **params_pid)
if reallocate:
    env.vehicle.alloc, env.vehicle.alloc_inverse = reallocate_matrix(env.vehicle.params, fault=fault)

env.ctrl.ctrl_a.k_p[2] = 1. * env.ctrl.ctrl_a.k_p[0]
env.ctrl.ctrl_a.k_d[2] = 0. * env.ctrl.ctrl_a.k_d[0]
env.ctrl.ctrl_r.k_p[2] = 1. * env.ctrl.ctrl_r.k_p[0]
env.ctrl.ctrl_r.k_d[2] = 0.5 * env.ctrl.ctrl_r.k_d[0]
env.ctrl.ctrl_r.max_acceleration[2] = 0.25*env.ctrl.ctrl_r.max_acceleration[0]
    
wp = eight_curve(a=30)
log = run_trajectory(env, wp, agent if supervisor else env.ctrl, verbose=False)
plot_log(log, fault, wind)

In [None]:
sl = slice(-30,None,1)
plt.figure(figsize=(9,4))
plt.subplot(1,2,1)
D = env.vehicle.alloc @ (log.speeds.T**2)
plt.plot(log.actions[sl,2], label='Allocated')
plt.plot(D[2, sl], label='Actual')
plt.legend()
plt.twinx()
plt.plot(log.pitch[sl] * 180 / np.pi, ls=':', c='y')
plt.subplot(1,2,2)
plt.plot(log.speeds[sl,:]);
plt.tight_layout()

In [None]:
# prediction errors
kths = []
errs = []
w_nominal = np.sqrt(VP.mass * SP.g / (VP.nprops * kt0))
ws = np.linspace(0, 450, 50)
kt0, kd0 = VP.propellers[motor].k_thrust, VP.propellers[motor].k_drag
for value in ws:
    win = np.linspace(max(value-50,0), value+50, 50).reshape(-1,1)
    f = np.ones(50) * kt0 * value**2
    t = np.ones(50) * kd0 * value**2
    # f = k * w^2
    kth = (np.linalg.pinv(win**2) @ f).item()
    kd = (np.linalg.pinv(win**2) @ t).item()
    kths.append(kth)
    err = np.abs(kth * ws**2 - f).mean()
    errs.append(err)

plt.figure(figsize=(9,3))
plt.subplot(1,2,1)
n = 5
kth = kths[n]
value = ws[n]
win = np.linspace(max(value-50,0), value+50, 50).reshape(-1,1)
plt.plot(ws[:10], kt0 * value**2 * np.ones_like(ws[:10]), label='Actual (saturated)')
plt.plot(ws[:10], kt0 * ws[:10]**2, label='Nominal prediction')
plt.plot(ws[:10], kth * ws[:10]**2, label='Approximated')
plt.legend()
plt.title('$k_{thrust,sat}$ for $\Omega=45 rad\;s^{-1}$')
plt.ylabel('$F_z$')
plt.xlabel('Motor speed $\Omega\;/rad\;s^{-1}$')
plt.subplot(1,2,2)
l = plt.plot(ws, kths, label='$k_{thrust,sat}$')
plt.ylabel('$k_{thrust}$')
plt.xlabel('Motor speed $\Omega\;/rad\;s^{-1}$')
plt.twinx()
plt.plot(ws, errs, ls=':', c='r', label='Prediction error')
plt.ylabel('Mean absolute error in $F_z$')
plt.title('$k_{thrust,sat}$ & error for $\Omega=[0,450] rad\;s^{-1}$')
plt.legend(handles=l+plt.gca().lines)
plt.tight_layout()

In [None]:
# Wind vs fault rewards 
adapt = True
supervisor = False
reallocate = False
logs = []
wp = eight_curve()

winds = list(range(5))
motors = list(range(8))
R = np.zeros((len(winds), len(motors)))

for w in tqdm(winds, desc='wind /N', leave=False):
    logs.append([])
    wind = '%d@90' % w
    for m in tqdm(motors, desc='motor', leave = False):
        fault = (m, 0) # failure
        env = get_env(dxdt_u=dxdt_u, dxdt_u_=fault_saturation(*fault) if fault else dxdt_u,
              enable=adapt, wind=wind, fault=fault)
        if not supervisor:
            apply_params_pid(env.ctrl, **params_pid)
        if reallocate:
            env.vehicle.alloc_inverse, env.vehicle.alloc = reallocate_matrix(env.vehicle.params, fault=fault)

        wp = eight_curve(a=-50)
        log = run_trajectory(env, wp, agent if supervisor else None, verbose=False)
        logs[-1].append(log)
        R[w,m] = log.reward.sum()

ax = plt.imshow(R)
plt.xlabel('Motor')
plt.ylabel('Wind Force /N')
plt.colorbar(ax, orientation='horizontal')
plt.title('Robustness to novel disturbances & motor failure')

# EKF

In [None]:
# https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python/blob/master/11-Extended-Kalman-Filters.ipynb
from filterpy.common import Q_discrete_white_noise
from filterpy.kalman import ExtendedKalmanFilter
import numpy as np

dt = env.vehicle.simulation.dt
rk = ExtendedKalmanFilter(dim_x=12, dim_z=4)
radar = RadarSim(dt, pos=0., vel=100., alt=1000.)

# make an imperfect starting guess
rk.x = env.reset()

# d(dx/dt)/dx
rk.F = 

range_std = 5. # meters
# Process noise matrix (state variables)
rk.Q = Q_discrete_white_noise(12, dt=dt, var=0.1)
# Measurement noise matrix (output)
rk.R = rk.Q
rk.P *= 50

xs, track = [], []
for i in range(int(20/dt)):
    # get measurement
    z = radar.get_range()
    track.append((radar.pos, radar.vel, radar.alt))
    
    rk.update(array([z]), HJacobian_at, hx)
    xs.append(rk.x)
    rk.predict()

xs = np.asarray(xs)
track = asarray(track)
time = np.arange(0, len(xs)*dt, dt)
ekf_internal.plot_radar(xs, track, time)