## Uavs Collision Avoidance

In [1]:
import os
import subprocess
import numpy as np

# Custom modules
from vehicle_logic import VehicleLogic,VehicleMode
from helpers.change_coordinates import heading_to_yaw,find_spawns,global2local,local2global
from helpers.visualization.gazebo_world import update_world
from helpers.visualization.plots import plot_3d_interactive
from plan.actions.navegation import get_local_position,make_go_to
from plan import Plan,State

Kill all related process

In [2]:
for process in ["QGroundControl", "sim_vehicle.py", "arducopter", "gazebo", "mavproxy"]:
    os.system(f"pkill -9 -f {process}")

Paths

In [3]:
QGC_path='~/QGroundControl.AppImage'
gazebo_word_path="gazebo_worlds/runway.world"
ardupilot_vehicle_path='~/ardupilot/Tools/autotest/sim_vehicle.py'

## Choose Simulator

In [4]:
simulator ='gazebo' # 'QGroundControl' #

## Create Plans

In [5]:
offsets = [(-10, 0, 0, 0),(0, -5, 0, 0)]#
n_uavs = len(offsets)


homes=np.array([offset[:3] for offset in offsets])

#global_paths=[np.array([[-10, 0, 5],[10, 0, 5]])]

global_paths=[np.array([[-10, 0, 5],[10, 0, 5]]),
              np.array([[0, -5, 5]])]

local_paths=[global2local(path, home,pairwise=True) for path,home in zip(global_paths,homes)]


radar_radius=10
avoidance_radius =5
spoofing_pos=np.array([[0,0,5]])
markers = {f'waypoints': {'pos':global_paths[0],'color':'blue'},
       f'real pos': {'pos':global_paths[1],'color':'orange','radius':0.05},
       f'fake_pos': {'pos':spoofing_pos,'color':'red','radius':0.1},
       f'avoid zone': {'pos':spoofing_pos,'color':'red','radius':avoidance_radius,'alpha': 0.84},
       f'radar zone': {'pos':global_paths[1],'color':'orange','radius':radar_radius,'alpha': 0.9}}



# markers = {f'waypoints': {'pos':global_paths[0],'color':'blue'},
#        f'real pos': {'pos':global_paths[1],'color':'orange','radius':0.1},
#        f'avoid zone': {'pos':global_paths[1],'color':'orange','radius':avoidance_radius,'alpha': 0.84},
#        f'radar zone': {'pos':global_paths[1],'color':'orange','radius':radar_radius,'alpha': 0.9}}



spoofing_pos=spoofing_pos[0]

plans=[Plan.basic(wps=local_paths[0],alt=5,wp_margin=0.5,navegation_speed=1),
       Plan.hover(wps=local_paths[1],alt=5,wp_margin=0.5,navegation_speed=5)]
print(plans)

plot_3d_interactive(markers,title='Simulation Markers',expand=[0.2,0.2,0.6],ground=-0.05)


if simulator == 'QGroundControl':
    home_position=(-35.3633245,149.1652241,0,0)
    spawns=find_spawns(home_position, offsets)

Plan 'basic' created — no connection yet 🧩
Action 'Pre-Arm Check' created — no connection yet 🧩
Step 'Check disarmed' created — no connection yet 🧩
Step 'Check EKF' created — no connection yet 🧩
Step 'Check GPS' created — no connection yet 🧩
Step 'Check system' created — no connection yet 🧩
Action 'Set Navigation Speed' created — no connection yet 🧩
Step 'Set speed to 1.00 m/s' created — no connection yet 🧩
Action 'Set Mode: GUIDED' created — no connection yet 🧩
Step 'Switch to GUIDED' created — no connection yet 🧩
Action 'Arm' created — no connection yet 🧩
Step 'arm' created — no connection yet 🧩
Action 'takeoff' created — no connection yet 🧩
Step 'takeoff' created — no connection yet 🧩
Action 'fly' created — no connection yet 🧩
Step 'go to  -> (0, 0, 5)' created — no connection yet 🧩
Step 'go to  -> (20, 0, 5)' created — no connection yet 🧩
Action 'Land' created — no connection yet 🧩
Step 'land' created — no connection yet 🧩
Plan 'hover' created — no connection yet 🧩
Action 'Pre-Arm 

In [6]:
plans

[🕓 <Plan 'basic'>
   🕓 <Action 'Pre-Arm Check'>
     🕓 <Step 'Check disarmed'>
     🕓 <Step 'Check EKF'>
     🕓 <Step 'Check GPS'>
     🕓 <Step 'Check system'>
   🕓 <Action 'Set Navigation Speed'>
     🕓 <Step 'Set speed to 1.00 m/s'>
   🕓 <Action 'Set Mode: GUIDED'>
     🕓 <Step 'Switch to GUIDED'>
   🕓 <Action 'Arm'>
     🕓 <Step 'arm'>
   🕓 <Action 'takeoff'>
     🕓 <Step 'takeoff'>
   🕓 <Action 'fly'>
     🕓 <Step 'go to  -> (0, 0, 5)'>
     🕓 <Step 'go to  -> (20, 0, 5)'>
   🕓 <Action 'Land'>
     🕓 <Step 'land'>,
 🕓 <Plan 'hover'>
   🕓 <Action 'Pre-Arm Check'>
     🕓 <Step 'Check disarmed'>
     🕓 <Step 'Check EKF'>
     🕓 <Step 'Check GPS'>
     🕓 <Step 'Check system'>
   🕓 <Action 'Set Mode: GUIDED'>
     🕓 <Step 'Switch to GUIDED'>
   🕓 <Action 'Arm'>
     🕓 <Step 'arm'>
   🕓 <Action 'takeoff'>
     🕓 <Step 'takeoff'>
   🕓 <Action 'fly'>
     🕓 <Step 'go to  -> (0, 0, 5)'>]

## Waypoint seletion algorithm

In [7]:
if simulator == 'QGroundControl':
    sim_path = os.path.expanduser(QGC_path)
    sim_cmd =[sim_path]
elif simulator == 'gazebo':
    # Convert to Gazebo format (name, x, y, z, roll, pitch, yaw)
    drones = [(east, north, up, 0, 0, heading_to_yaw(heading)) for i, (east, north, up, heading) in enumerate(offsets)]
    world_path = os.path.expanduser(gazebo_word_path)
    updated_world_path = update_world(drones,markers,world_path)
    sim_cmd = ["gazebo", "--verbose", updated_world_path] 

simulator_process = subprocess.Popen(
                    sim_cmd,
                    stdout=subprocess.DEVNULL,  # Suppress standard output
                    stderr=subprocess.DEVNULL,  # Suppress error output
                    shell=False  # Ensure safety when passing arguments
                    )

Launch Vehicle

In [8]:
vehicle_path = os.path.expanduser(ardupilot_vehicle_path)

In [9]:
for i in range(n_uavs):
    vehicle_cmd = f"python3 {vehicle_path} -v ArduCopter -I{i} --sysid {i+1} --no-rebuild"
    if simulator == 'QGroundControl':
        spawn=','.join(map(str, spawns[i]))
        vehicle_cmd += f" --custom-location={spawn}"
    elif simulator == 'gazebo':
        vehicle_cmd += " -f gazebo-iris"
    subprocess.Popen(["gnome-terminal", "--", "bash", "-c", f"{vehicle_cmd}; exec bash"])

Create UavLogic

In [10]:
uavs=[]
for i,(plan,home) in enumerate(zip(plans,homes)):
    uavs.append(VehicleLogic(sys_id=i+1,
                    home=home,
                    plan= plan))

vehicle 1 created
vehicle 2 created


In [11]:
def get_nearest_neighbor(target_id,positions_dict):
    target_position = np.array(positions_dict[target_id])  # Get the position of the target ID
    
    min_distance = float('inf')
    nearest_id = None

    for other_id, position in positions_dict.items():
        if other_id == target_id:
            continue  # Skip itself
        
        distance = np.linalg.norm(target_position - np.array(position))  # Compute Euclidean distance
        
        if distance < min_distance:
            min_distance = distance
            nearest_id = other_id  # Update nearest ID
    
    return nearest_id,min_distance

In [12]:
print(uavs[0].plan)

🕓 <Plan 'basic'>
  🕓 <Action 'Pre-Arm Check'>
    🕓 <Step 'Check disarmed'>
    🕓 <Step 'Check EKF'>
    🕓 <Step 'Check GPS'>
    🕓 <Step 'Check system'>
  🕓 <Action 'Set Navigation Speed'>
    🕓 <Step 'Set speed to 1.00 m/s'>
  🕓 <Action 'Set Mode: GUIDED'>
    🕓 <Step 'Switch to GUIDED'>
  🕓 <Action 'Arm'>
    🕓 <Step 'arm'>
  🕓 <Action 'takeoff'>
    🕓 <Step 'takeoff'>
  🕓 <Action 'fly'>
    🕓 <Step 'go to  -> (0, 0, 5)'>
    🕓 <Step 'go to  -> (20, 0, 5)'>
  🕓 <Action 'Land'>
    🕓 <Step 'land'>


In [13]:
def get_avoidance_wp(pos: np.ndarray,
                    obj_pos:np.ndarray,
                    goal_pos:np.ndarray,
                    distance:float = 1,
                    direction: str = 'left'):
    """
    Sends a velocity command in body frame, orthogonal to the direction of wp.
    `direction` can be 'left' or 'right' (relative to wp direction).
    """

    # Normalize wp direction (ignore Z)
    obj_dir = (obj_pos -pos)[:2]
    goal_dir = (goal_pos -pos)[:2]
    if np.dot(obj_dir,goal_dir)<0:
        return goal_pos
    obj_dir = obj_dir / np.linalg.norm(obj_dir)*distance
    # Get orthogonal direction
    if direction == 'left':
        ortho = np.array([-obj_dir[1], obj_dir[0],0])
    elif direction == 'right':
        ortho = np.array([obj_dir[1], -obj_dir[0],0])
    else:
        raise ValueError("Direction must be 'left' or 'right'")

    # Scale to desired speed
    return pos+ortho
   

In [None]:
mission_incomplete = set(range(n_uavs))
global_poss = {}
while mission_incomplete:
    ## Get global positions
    update_global_positions(mission_incomplete,global_poss)
    global_poss[1] = spoofing_pos
    to_remove = set()
    for i in mission_incomplete:
        uav = uavs[i]
        if i in global_poss.keys():
            j,min_dist=get_nearest_neighbor(target_id=i,positions_dict=global_poss)
            if min_dist<avoidance_radius and uav.get_mode()==VehicleMode.MISSION:  
                uav.set_mode(VehicleMode.AVOIDANCE)
                wp=uav.plan.current.current.wp
                next_wp = get_avoidance_wp(pos=global2local(global_poss[i],uav.home,pairwise=True),
                                           obj_pos=global2local(global_poss[j],uav.home,pairwise=True),
                                           goal_pos=wp)
                next_step=make_go_to(wp=next_wp,wp_margin=0.5,verbose=2,cause_text='(avoidance)')
                uav.plan.current.add_now(next_step)
            if uav.get_mode()==VehicleMode.AVOIDANCE and uav.current_step().state==State.DONE:
                uav.set_mode(VehicleMode.MISSION)
        uav.act_plan()
        if uav.plan.state == State.DONE:
            to_remove.add(i)
            
    mission_incomplete -= to_remove  # batch-remove outside the loop

In [None]:
def update_global_positions(mission_incomplete,global_poss):
    for i in mission_incomplete:
        uav=uavs[i]
        current_action=uav.current_action().name 
        if current_action in ['takeoff','fly']:
            local_pos=get_local_position(uav.conn)
            if local_pos is not False:
                global_poss[i] = local2global(local_pos,uav.home,pairwise=True)


                # if i == 1:
                #     global_poss[i] = spoofing_pos

def avoid_object(uav,curr_pos):
    uav.set_mode(VehicleMode.AVOIDANCE)
    wp=uav.plan.current.current.wp
    next_wp = get_avoidance_wp(pos=global2local(global_poss[i],uav.home,pairwise=True),
                                obj_pos=global2local(global_poss[j],uav.home,pairwise=True),
                                goal_pos=wp)
    next_step=make_go_to(wp=next_wp,wp_margin=0.5,verbose=2,cause_text='(avoidance)')
    uav.plan.current.add_now(next_step)

## Execute Plan

In [15]:

mission_incomplete = set(range(n_uavs))
global_poss = {}
while mission_incomplete:
    ## Get global positions
    update_global_positions(mission_incomplete,global_poss)
    global_poss[1] = spoofing_pos
    to_remove = set()
    for i in mission_incomplete:
        uav = uavs[i]
        if i in global_poss.keys():
            j,min_dist=get_nearest_neighbor(target_id=i,positions_dict=global_poss)
            if min_dist<avoidance_radius and uav.get_mode()==VehicleMode.MISSION:  
                uav.set_mode(VehicleMode.AVOIDANCE)
                wp=uav.plan.current.current.wp
                next_wp = get_avoidance_wp(pos=global2local(global_poss[i],uav.home,pairwise=True),
                                           obj_pos=global2local(global_poss[j],uav.home,pairwise=True),
                                           goal_pos=wp)
                next_step=make_go_to(wp=next_wp,wp_margin=0.5,verbose=2,cause_text='(avoidance)')
                uav.plan.current.add_now(next_step)
            if uav.get_mode()==VehicleMode.AVOIDANCE and uav.current_step().state==State.DONE:
                uav.set_mode(VehicleMode.MISSION)
        uav.act_plan()
        if uav.plan.state == State.DONE:
            to_remove.add(i)
            
    mission_incomplete -= to_remove  # batch-remove outside the loop

Vehicle 1: Plan 'basic' is now connected ✅🔗
Vehicle 1: ▶️ Starting Plan: basic
Vehicle 2: Plan 'hover' is now connected ✅🔗
Vehicle 2: ▶️ Starting Plan: hover
Vehicle 1: Action 'Pre-Arm Check' is now connected ✅🔗
Vehicle 1: ▶️ Starting Action: Pre-Arm Check
Vehicle 2: Action 'Pre-Arm Check' is now connected ✅🔗
Vehicle 2: ▶️ Starting Action: Pre-Arm Check
Vehicle 1: Step 'Check disarmed' is now connected ✅🔗
Vehicle 1: ▶️ Starting Step: Check disarmed
Vehicle 2: Step 'Check disarmed' is now connected ✅🔗
Vehicle 2: ▶️ Starting Step: Check disarmed
Vehicle 1: ✅ Step: Check disarmed is done
Vehicle 1: Step 'Check EKF' is now connected ✅🔗
Vehicle 1: ▶️ Starting Step: Check EKF
Vehicle 2: ✅ Step: Check disarmed is done
Vehicle 2: Step 'Check EKF' is now connected ✅🔗
Vehicle 2: ▶️ Starting Step: Check EKF
Vehicle 2: ✅ Step: Check EKF is done
Vehicle 2: Step 'Check GPS' is now connected ✅🔗
Vehicle 2: ▶️ Starting Step: Check GPS
Vehicle 1: ✅ Step: Check EKF is done
Vehicle 1: Step 'Check GPS' is 

In [16]:
print(uavs[0].plan)

✅ <Plan 'basic'>
  ✅ <Action 'Pre-Arm Check'>
    ✅ <Step 'Check disarmed'>
    ✅ <Step 'Check EKF'>
    ✅ <Step 'Check GPS'>
    ✅ <Step 'Check system'>
  ✅ <Action 'Set Navigation Speed'>
    ✅ <Step 'Set speed to 1.00 m/s'>
  ✅ <Action 'Set Mode: GUIDED'>
    ✅ <Step 'Switch to GUIDED'>
  ✅ <Action 'Arm'>
    ✅ <Step 'arm'>
  ✅ <Action 'takeoff'>
    ✅ <Step 'takeoff'>
  ✅ <Action 'fly'>
    ✅ <Step 'go to  -> (0, 0, 5)'>
    ✅ <Step 'go to (avoidance) -> (5.845461321452364, 1.0170413200258062, 4.972641468048096)'>
    ✅ <Step 'go to (avoidance) -> (6.41807164175344, 1.6174667970437286, 4.959656715393066)'>
    ✅ <Step 'go to (avoidance) -> (6.26294106693981, 1.8606095368851978, 4.9590911865234375)'>
    ✅ <Step 'go to (avoidance) -> (6.26294106693981, 1.8606095368851978, 4.9590911865234375)'>
    ✅ <Step 'go to (avoidance) -> (6.26294106693981, 1.8606095368851978, 4.9590911865234375)'>
    ✅ <Step 'go to (avoidance) -> (6.26294106693981, 1.8606095368851978, 4.9590911865234375)'>
  

In [17]:
print(uavs[1].plan)

✅ <Plan 'hover'>
  ✅ <Action 'Pre-Arm Check'>
    ✅ <Step 'Check disarmed'>
    ✅ <Step 'Check EKF'>
    ✅ <Step 'Check GPS'>
    ✅ <Step 'Check system'>
  ✅ <Action 'Set Mode: GUIDED'>
    ✅ <Step 'Switch to GUIDED'>
  ✅ <Action 'Arm'>
    ✅ <Step 'arm'>
  ✅ <Action 'takeoff'>
    ✅ <Step 'takeoff'>
  ✅ <Action 'fly'>
    ✅ <Step 'go to  -> (0, 0, 5)'>


## Kill all related process

In [18]:
# for process in ["QGroundControl", "sim_vehicle.py", "arducopter", "gazebo", "mavproxy"]:
#     os.system(f"pkill -9 -f {process}")