# Wind Impact on PID Performance

How robust is the classic cascaded PID architecture to wind? In this notebook, we aim to find that out. We use expert-tuned PID parameters to run an octorotor UAV in a 100x100m square trajectory.

In [39]:
from systems.long_multirotor import LongTrajEnv

import notebook_setup
from typing import Union, Iterable, List
from copy import deepcopy
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from tqdm.autonotebook import tqdm, trange
import optuna
from scipy.stats import pearsonr

from rl import learn_rl, transform_rl_policy, evaluate_rl, PPO, load_agent
from multirotor.simulation import Multirotor
from multirotor.helpers import DataLog
from multirotor.visualize import plot_datalog
from multirotor.controller import Controller
from multirotor.trajectories import Trajectory, GuidedTrajectory
from multirotor.controller.scurves import SCurveController
from multirotor.coords import body_to_inertial, inertial_to_body, direction_cosine_matrix, euler_to_angular_rate
from systems.multirotor import MultirotorTrajEnv, VP
from multirotor.controller import (
    AltController, AltRateController,
    PosController, AttController,
    VelController, RateController,
    Controller
)
from scripts.opt_pidcontroller import (
    get_controller, make_disturbance_fn,
    apply_params as apply_params_pid, get_study as get_study_pid
)
from scripts.opt_multirotorenv import apply_params, get_study, get_established_controller

In [40]:
def convert_wind(wind_vector, multirotor: Multirotor):
        dcm = direction_cosine_matrix(*multirotor.orientation)
        return inertial_to_body(wind_vector, dcm), 0

In [41]:
def get_env(params={}, scurve=False, **kwargs):  
    kw = dict(
        safety_radius=10,
        vp=VP,get_controller_fn=lambda m: get_established_controller(m),
        steps_u=50,
        scaling_factor=0.5,
        disturbance_fn=lambda m: params['wind_vector'],
        proximity=2,
        seed=0)
    return MultirotorTrajEnv(**kw)

In [42]:
square_np = np.array([[100,0,0], [100,100,0], [0,100,0], [0,0,0]])
square_traj = Trajectory(None, points=square_np, resolution=20)
square_wpts = square_traj.generate_trajectory(curr_pos=np.array([0,0,0]))

In [43]:
def get_tte(initial_pos: tuple, waypoints: np.ndarray, x: np.ndarray, y:np.ndarray, z:np.ndarray) -> np.ndarray:
        """
        Calculates the trajectory tracking error. 
        The distance between the current point and the vector between previous and next wp. Uses ||v1 x v2|| / ||v1||.

        Parameters
        ----------
        initial_pos : tuple  
            the initial position of the UAV.
        waypoints : np.ndarray 
            the reference positions at each point in time.
        x : np.ndarray 
            the x positions of the UAV.
        y : np.ndarray 
            the y positions of the UAV.
        z : np.ndarray
            the z positions of the UAV.

        Returns
        -------
        np.ndarray 
            the trajectory tracking error at each point in time.
        """
        ttes = []
        prev = initial_pos
        for i, waypoint in enumerate(waypoints):
            if i > 0 and not np.array_equal(waypoints[i-1], waypoints[i]):
                prev = waypoints[i-1]

            v1 = waypoint - prev
            v2 = np.array([x[i],y[i],z[i]]) - prev
            tte = np.linalg.norm(np.cross(v1, v2)) / np.linalg.norm(v1)
            ttes.append(tte)
                
        return np.array(ttes)

In [44]:
def completed_mission(waypoints: np.ndarray, x: np.ndarray, y: np.ndarray, z: np.ndarray, radius: float = 0.65):
        for waypoint in waypoints:
            reached_waypoint = False

            for position in zip(x,y,z):
                dist = np.linalg.norm(waypoint - position)

                if dist <= radius:
                    reached_waypoint = True
                    break

            if not reached_waypoint:
                return False
            
        return True

In [45]:
def run_trajectory(wind_vector: np.ndarray):
    env = LongTrajEnv(
        waypoints = square_wpts,
        base_env = get_env({'wind_vector': wind_vector})
    )
    done = False
    env.reset()
    log = DataLog(env.base_env.vehicle, env.base_env.ctrl,
                      other_vars=('reward',))
    while not done:
        state, reward, done, info = env.step([0,0,0])
        log.log(reward=reward)

    log.done_logging()
    
    return log, info

In [46]:
wind_results = pd.DataFrame(columns=['X Wind', 'Y Wind', 'Mean TTE', 'Total TTE', 'Completed Mission', 'Out of Bounds', 'Out of Time', 'Tipped', 'Reward'])

In [47]:
def run_wind_sweep(results):
    step = 2.5
    x_range = np.arange(-15,15+step,step)
    y_range = np.arange(-15,15+step,step)

    for x_speed in tqdm(x_range):
        for y_speed in tqdm(y_range, leave=False):
            log, info = run_trajectory(np.array([x_speed,y_speed,0], np.float32))
            new_result = {
                'X Wind': x_speed,
                'Y Wind': y_speed,
                'Mean TTE': np.mean(get_tte(np.array([0,0,0]), log.states[:,12:], log.x, log.y, log.z)),
                'Total TTE': np.sum(get_tte(np.array([0,0,0]), log.states[:,12:], log.x, log.y, log.z)),
                'Completed Mission': completed_mission(square_wpts, log.x, log.y, log.z, radius=2),
                'Out of Bounds': info.get('outofbounds'),
                'Out of Time': info.get('outoftime'),
                'Tipped': info.get('tipped'),
                'Reward': np.sum(log.reward)
            }
            results = pd.concat([results, pd.DataFrame([new_result])], ignore_index=True)

    return results

In [None]:
wind_results = run_wind_sweep(wind_results)

  0%|          | 0/13 [00:00<?, ?it/s]

  0%|          | 0/13 [00:00<?, ?it/s]

  0%|          | 0/13 [00:00<?, ?it/s]

In [None]:
wind_results

In [None]:
wind_results.to_csv('wind_results.csv')

In [None]:
wind_results = pd.read_csv('wind_results.csv')

In [None]:
pearsonr(np.abs(wind_results['X Wind']), wind_results['Completed Mission'])

In [None]:
pearsonr(np.abs(wind_results['X Wind']), wind_results['Mean TTE'])

In [None]:
norms = np.linalg.norm([wind_results['X Wind'], wind_results['Y Wind']], axis=0)

In [None]:
pearsonr(norms, wind_results['Completed Mission'])

In [None]:
plt.title("Wind Magnitude on Completion")
sns.boxplot(y=norms, x=wind_results['Completed Mission'], showfliers=True)
plt.ylabel("Norm of Wind Vector (m/s)")