## Logic Rule Simulation

This notebook implements a uav logic instead of a plan and decisions are done online following that logic

In [17]:
import os
import subprocess
from pymavlink import mavutil
import numpy as np
from collections import OrderedDict

# Custom modules
from vehicle_logic import VehicleLogic
from helpers.change_coordinates import heading_to_yaw,find_spawns,global2local
from helpers.visualization.gazebo_world import update_world
from helpers.math import manhattan_distance
from helpers.navegation_logic import find_path,find_best_waypoint
from helpers.visualization.plots import plot_3d_interactive

## Kill all related process

In [18]:
for process in ["QGroundControl", "sim_vehicle.py", "arducopter", "gazebo", "mavproxy"]:
    os.system(f"pkill -9 -f {process}")

## Paths

In [19]:
QGC_path='~/QGroundControl.AppImage'
gazebo_word_path="gazebo_worlds/attack_city.world"
ardupilot_vehicle_path='~/ardupilot/Tools/autotest/sim_vehicle.py'

# Choose Simulator

In [20]:
simulator ='gazebo' # 'QGroundControl' #

## Gonfiguration

In [21]:
wps = [(40, -10, 5),
            (15, -10, 5),
            (0, -10, 5),
            (0, 4, 5),
            (40, 20, 5),
            (15, 20, 5),
            (0, 20, 5)]

use_nav_wps=[True,False]
wp_plans=[
    [(40, -10,5),(0, 4,5)],
    [(40,  20, 5),(0, 4,5)]
    ]

homes=[(*plan[0][:2],0) for plan in wp_plans]
targets=[plan[-1] for plan in wp_plans]


markers = {'waypoints': {'pos':wps,'color':'orange'},
           'homes': {'pos':homes,'color':'blue'},
           'targets':{'pos':targets,'color':'green'},
           }


n_uavs = len(use_nav_wps)
headings=n_uavs*[0]

offsets=[(*home,heading) for home,heading in zip(homes,headings)]




# Interactive widget to rotate the plot

plot_3d_interactive(markers,title='Simulation Markers')


# Get Local positions

In [22]:
homes_np=np.array(homes)
targets_np=np.array([plan[-1] for plan in wp_plans])
wps_np = np.array(wps)  # Extract keys as a list of tuples


local_wps=global2local(wps_np, homes_np,pairwise=False)
local_targets=global2local(targets_np, homes_np,pairwise=True)
local_wps = np.concatenate([local_wps,local_targets[:,np.newaxis,:]],axis=1)
local_wp_plans=[global2local(np.array(plan), home,pairwise=True) for plan,home in zip(wp_plans,homes_np)]

In [23]:
if simulator == 'QGroundControl':
    # lat,long,alt,heading
    home_position=(-35.3633245,149.1652241,0,90)
    spawns=find_spawns(home_position, offsets)

## Waypoint seletion algorithm

In [24]:
def in_same_orthant(current: np.ndarray, target: np.ndarray, waypoints: np.ndarray, dims=[0,1], eps=1) -> np.ndarray:
    """
    Vectorized function to check if waypoints are in the same quadrant as the target w.r.t. the current position.

    Parameters:
    - current: np.ndarray of shape (3,) (current position)
    - target: np.ndarray of shape (3,) (target position)
    - waypoints: np.ndarray of shape (N,3) (list of waypoints)
    - dims: list of int (dimensions to consider, default [0,1] for 2D checks)
    - eps: float (tolerance for quadrant checks)

    Returns:
    - np.ndarray of shape (N,) (boolean array indicating which waypoints are in the same quadrant)
    """
    # Extract only relevant dimensions
    current = current[dims]
    target = target[dims]
    waypoints = waypoints[:, dims]

    # Check if waypoints are in the same orthant
    target_rel =  current - target
    waypoints_rel=current - waypoints
    return np.all(((target_rel >= -eps) & (waypoints_rel >= -eps))| ((target_rel <= eps) & (waypoints_rel <= eps)), axis=1)


def find_path(start, target, waypoints,eps=1):
    path = [start]
    current = start
    while not np.array_equal(current, target):
        same_quadrant=in_same_orthant(current, target,waypoints,eps=eps)
        waypoints = waypoints[same_quadrant]
        next_waypoint,waypoints= find_best_waypoint(current, target, waypoints,eps=eps)
        if next_waypoint is None:
            break  # No valid path found
        path.append(next_waypoint)
        current = next_waypoint
    return np.stack(path,axis=0)

In [25]:
for i in range(n_uavs):
        print(f'uav-{i} Best path: {find_path(start=np.array([0,0,0]),target = local_targets[i], waypoints=local_wps[i])}') 


uav-0 Best path: [[  0   0   0]
 [  0   0   5]
 [-25   0   5]
 [-40   0   5]
 [-40  14   5]]
uav-1 Best path: [[  0   0   0]
 [  0   0   5]
 [-25   0   5]
 [-40   0   5]
 [-40 -16   5]]


## Launch Simulator

In [26]:
if simulator == 'QGroundControl':
    sim_path = os.path.expanduser(QGC_path)
    sim_cmd =[sim_path]
elif simulator == 'gazebo':
    # Convert to Gazebo format (name, x, y, z, roll, pitch, yaw)
    drones = [(east, north, up, 0, 0, heading_to_yaw(heading)) for i, (east, north, up, heading) in enumerate(offsets)]
    world_path = os.path.expanduser(gazebo_word_path)
    updated_world_path = update_world(drones,markers,world_path)
    sim_cmd = ["gazebo", "--verbose", updated_world_path] 

simulator_process = subprocess.Popen(
                    sim_cmd,
                    stdout=subprocess.DEVNULL,  # Suppress standard output
                    stderr=subprocess.DEVNULL,  # Suppress error output
                    shell=False  # Ensure safety when passing arguments
                    )

## Create Plan

In [27]:
default_plan=['check_prearm','check_pos_est','mode_stabilize','mode_guided','arm','takeoff','fly','land']
uavs_arg = []
for i in range(n_uavs):
    uav_arg={'waypoints':local_wp_plans[i],
             'plan': default_plan,
             'wp_margin': 0.5}
    uavs_arg.append(uav_arg)

# Launch Vehicle

In [28]:
vehicle_path = os.path.expanduser(ardupilot_vehicle_path)

In [29]:
for i in range(n_uavs):
    vehicle_cmd = f"python3 {vehicle_path} -v ArduCopter -I{i} --sysid {i+1} --no-rebuild"
    if simulator == 'QGroundControl':
        spawn=','.join(map(str, spawns[i]))
        vehicle_cmd += f" --custom-location={spawn}"
    elif simulator == 'gazebo':
        vehicle_cmd += " -f gazebo-iris"
    subprocess.Popen(["gnome-terminal", "--", "bash", "-c", f"{vehicle_cmd}; exec bash"])

## Create UavLogic

In [30]:
uavs=[]
for i,uav_arg in enumerate(uavs_arg):
    connection = mavutil.mavlink_connection(f'udp:127.0.0.1:{14551+10*i}')
    connection.wait_heartbeat()
    uav=VehicleLogic(connection,**uav_arg)
    uavs.append(uav)

vehicle 1 created
vehicle 2 created


## Execute Plan

In [31]:
mission_incomplete = n_uavs*[True]
target_reached =n_uavs*[False] 
nav_wps=list(local_wps.copy())
eps=1
wp_index=n_uavs*[0]
while any(mission_incomplete):
    for i,uav in enumerate(uavs):
        mission_incomplete[i] = uav.act_plan()
        if uav.is_reached and (not target_reached[i]):
            wp_index[i]+=1
            target=local_targets[i]
            current = np.array(uav.get_local_position())
            waypoints=nav_wps[i]
            # GPS spoofing
            # if manhattan_distance(current,(-25, 0))<1:
            #     current=(-25, -20)
            same_quadrant=in_same_orthant(current, target,waypoints,eps=eps)
            waypoints= waypoints[same_quadrant]
            next_wp,waypoints= find_best_waypoint(current, target, waypoints,eps=eps)
            
            if not np.array_equal(next_wp, target):
                path=np.stack([current,next_wp,target],axis=0)
            else:
                path=np.stack([current,target],axis=0)
                target_reached[i]=True
            print(f'--- uav {i} rached waypoint {wp_index[i]} --- ')
            print(f'current path:{path}')
            uav.update_waypoints(path)
            nav_wps[i]=waypoints
        

vehicle 1: action land is done
vehicle 2: action land is done
vehicle 1: action check_prearm is done
vehicle 2: action check_prearm is done
vehicle 1: action check_pos_est is done
vehicle 1: action mode_stabilize is done
vehicle 2: action check_pos_est is done
vehicle 1: action mode_guided is done
vehicle 2: action mode_stabilize is done
vehicle 1: action arm is done
(0.00394210871309042, 0.003672418650239706, -0.035856857895851135)
[0 0 5]
vehicle 2: action mode_guided is done
(0.004064746201038361, 0.003788160625845194, -0.03589440882205963)
[0 0 5]
vehicle 2: action arm is done
(0.004218020476400852, 0.003999349195510149, -0.035734280943870544)
[0 0 5]
(0.003184004221111536, 0.002963668666779995, -0.03592262044548988)
[0 0 5]
(0.003521515056490898, 0.0033642456401139498, -0.035781390964984894)
[0 0 5]
(0.0020152716897428036, 0.001890967134386301, -0.03594515472650528)
[0 0 5]
(0.002274109283462167, 0.0022087236866354942, -0.03581174463033676)
[0 0 5]
(0.0008757589384913445, 0.000871

## Kill all related process

In [32]:
# for process in ["QGroundControl", "sim_vehicle.py", "arducopter", "gazebo", "mavproxy"]:
#     os.system(f"pkill -9 -f {process}")