In [1]:
import numpy as np
import time
import pybullet as p
import argparse
from gym_pybullet_drones.utils.utils import sync
from gym_pybullet_drones.envs.BaseAviary import DroneModel, Physics
from gym_pybullet_drones.envs.CtrlAviary import CtrlAviary
from gym_pybullet_drones.control.DSLPIDControl import DSLPIDControl
from gym_pybullet_drones.utils.Logger import Logger


pybullet build time: Apr 18 2025 03:31:46


In [2]:
class DroneSwarmSimulation:
    def __init__(self, 
                 num_drones=15,
                 simulation_freq_hz=240,
                 control_freq_hz=48,
                 duration_sec=60,
                 gui=True,
                 record=False):
        
        # Initialize simulation parameters
        self.NUM_DRONES = num_drones
        self.SIMULATION_FREQ_HZ = simulation_freq_hz
        self.CONTROL_FREQ_HZ = control_freq_hz
        self.DURATION_SEC = duration_sec
        self.GUI = gui
        self.RECORD = record
        
        # Initial drone positions in a grid pattern
        initial_xyzs = np.zeros((self.NUM_DRONES, 3))
        rows = int(np.sqrt(self.NUM_DRONES))
        cols = int(np.ceil(self.NUM_DRONES / rows))
        
        for i in range(self.NUM_DRONES):
            row, col = i // cols, i % cols
            initial_xyzs[i] = [(col - cols/2 + 0.5) * 0.5, 
                              (row - rows/2 + 0.5) * 0.5, 
                              0.5]
        
        # Initialize the environment
        self.env = CtrlAviary(
            drone_model=DroneModel.CF2X,
            num_drones=self.NUM_DRONES,
            initial_xyzs=initial_xyzs,
            initial_rpys=np.zeros((self.NUM_DRONES, 3)),
            physics=Physics.PYB,
            gui=self.GUI,
            record=self.RECORD,
            obstacles=False,
            user_debug_gui=True
        )
        
        # Initialize controllers for each drone
        self.ctrl = [DSLPIDControl(drone_model=DroneModel.CF2X) for _ in range(self.NUM_DRONES)]
        
        # Initialize target positions and orientations
        self.INIT_XYZS = np.array([self.env.INIT_XYZS[i, :] for i in range(self.NUM_DRONES)])
        self.TARGET_XYZ = self.INIT_XYZS.copy()
        self.INIT_RPYS = np.array([self.env.INIT_RPYS[i, :] for i in range(self.NUM_DRONES)])
        self.TARGET_RPYS = np.zeros((self.NUM_DRONES, 3))
        
        # Initialize action array
        self.action = np.zeros((self.NUM_DRONES, 4))
        
        # Initialize logger
        self.logger = Logger(logging_freq_hz=self.CONTROL_FREQ_HZ,
                             num_drones=self.NUM_DRONES,
                             duration_sec=self.DURATION_SEC)
        
        # Formation state variables
        self.current_formation = "takeoff"
        self.formation_change_time = 10  # Change formation every X seconds
        self.last_formation_change = 0
        self.available_formations = ["circle", "grid", "pyramid", "random", "line"]


In [3]:
def set_circle_formation(self, height=1.5, radius=3.0):
    """Set target positions for a circle formation"""
    self.TARGET_XYZ = np.array([
            [np.cos(i/self.NUM_DRONES*2*np.pi)*radius, 
             np.sin(i/self.NUM_DRONES*2*np.pi)*radius, 
             height] 
            for i in range(self.NUM_DRONES)
        ])
        # Set drones to face center
    for i in range(self.NUM_DRONES):
            # Calculate angle to face center
            angle = np.arctan2(-self.TARGET_XYZ[i, 1], -self.TARGET_XYZ[i, 0])
            self.TARGET_RPYS[i] = [0, 0, angle]


In [4]:
def set_grid_formation(self, height=1.5, spacing=0.8):
        """Set target positions for a grid formation"""
        rows = int(np.sqrt(self.NUM_DRONES))
        cols = int(np.ceil(self.NUM_DRONES / rows))
        
        self.TARGET_XYZ = np.zeros((self.NUM_DRONES, 3))
        for i in range(self.NUM_DRONES):
            row, col = i // cols, i % cols
            self.TARGET_XYZ[i] = [(col - cols/2 + 0.5) * spacing, 
                                 (row - rows/2 + 0.5) * spacing, 
                                 height]


In [5]:
def set_pyramid_formation(self, base_height=0.5, spacing=0.8):
        """Set target positions for a pyramid formation"""
        layer = 0
        drones_in_layer = 1
        drones_placed = 0
        
        self.TARGET_XYZ = np.zeros((self.NUM_DRONES, 3))
        
        while drones_placed < self.NUM_DRONES:
            layer_size = min(drones_in_layer**2, self.NUM_DRONES - drones_placed)
            layer_rows = int(np.sqrt(layer_size))
            layer_cols = int(np.ceil(layer_size / layer_rows))
            
            for i in range(layer_size):
                if drones_placed >= self.NUM_DRONES:
                    break
                    
                row, col = i // layer_cols, i % layer_cols
                self.TARGET_XYZ[drones_placed] = [
                    (col - layer_cols/2 + 0.5) * spacing,
                    (row - layer_rows/2 + 0.5) * spacing,
                    base_height + layer * spacing
                ]
                drones_placed += 1
                
            layer += 1
            drones_in_layer += 1

In [6]:
def set_random_formation(self, min_height=1.0, max_height=2.0, area_size=4.0):
        """Set random target positions with collision avoidance"""
        self.TARGET_XYZ = np.array([
            [np.random.uniform(-area_size/2, area_size/2), 
             np.random.uniform(-area_size/2, area_size/2),
             np.random.uniform(min_height, max_height)]
            for _ in range(self.NUM_DRONES)
        ])
        
        # Simple collision avoidance for target positions
        min_distance = 0.5
        for _ in range(5):  # Multiple iterations to resolve conflicts
            for i in range(self.NUM_DRONES):
                for j in range(i+1, self.NUM_DRONES):
                    distance = np.linalg.norm(self.TARGET_XYZ[i, :2] - self.TARGET_XYZ[j, :2])
                    if distance < min_distance:
                        # Move drones away from each other
                        direction = self.TARGET_XYZ[i, :2] - self.TARGET_XYZ[j, :2]
                        if np.linalg.norm(direction) > 0:
                            direction = direction / np.linalg.norm(direction)
                        else:
                            direction = np.random.rand(2) * 2 - 1  # Random direction if overlapping
                            direction = direction / np.linalg.norm(direction)
                        
                        push_distance = (min_distance - distance) / 2
                        self.TARGET_XYZ[i, :2] += direction * push_distance
                        self.TARGET_XYZ[j, :2] -= direction * push_distance
                        
                        # Keep within bounds
                        self.TARGET_XYZ[i, :2] = np.clip(self.TARGET_XYZ[i, :2], -area_size/2, area_size/2)
                        self.TARGET_XYZ[j, :2] = np.clip(self.TARGET_XYZ[j, :2], -area_size/2, area_size/2)

In [7]:
def set_line_formation(self, height=1.5, spacing=0.5):
        """Set target positions for a line formation"""
        self.TARGET_XYZ = np.zeros((self.NUM_DRONES, 3))
        for i in range(self.NUM_DRONES):
            offset = (i - (self.NUM_DRONES-1)/2) * spacing
            self.TARGET_XYZ[i] = [offset, 0, height]

In [8]:
def update_formation(self, current_time):
        """Update formation based on time"""
        if current_time - self.last_formation_change > self.formation_change_time:
            # Choose next formation
            formations = list(self.available_formations)
            if self.current_formation in formations:
                formations.remove(self.current_formation)
            self.current_formation = np.random.choice(formations)
            
            # Set the new formation
            if self.current_formation == "circle":
                self.set_circle_formation()
            elif self.current_formation == "grid":
                self.set_grid_formation()
            elif self.current_formation == "pyramid":
                self.set_pyramid_formation()
            elif self.current_formation == "random":
                self.set_random_formation()
            elif self.current_formation == "line":
                self.set_line_formation()
            
            # Update formation change time
            self.last_formation_change = current_time
            print(f"Changed to {self.current_formation} formation at time {current_time:.1f}s")

In [9]:
def control_drones(self, i):
        """Calculate control inputs for each drone"""
        # Calculate current simulation time
        current_time = i / self.SIMULATION_FREQ_HZ
        
        # Start with takeoff for the first 10 seconds
        if current_time < 10:
            if self.current_formation != "takeoff":
                print("Starting takeoff phase...")
                self.current_formation = "takeoff"
                
            # Staggered takeoff
            for j in range(self.NUM_DRONES):
                takeoff_delay = j * 0.5  # 0.5 second delay between each drone
                if current_time > takeoff_delay:
                    takeoff_height = min(1.5, (current_time - takeoff_delay) * 0.3)
                    takeoff_pos = self.INIT_XYZS[j].copy()
                    takeoff_pos[2] = takeoff_height
                    
                    self.action[j, :] = self.ctrl[j].computeControlFromState(
                        control_timestep=self.SIMULATION_FREQ_HZ/self.CONTROL_FREQ_HZ,
                        state=self.env._getDroneStateVector(j),
                        target_pos=takeoff_pos,
                        target_rpy=self.TARGET_RPYS[j]
                    )
        else:
            # Normal operation after takeoff
            # Update formation if needed
            self.update_formation(current_time)
            
            # Control each drone
            for j in range(self.NUM_DRONES):
                state = self.env._getDroneStateVector(j)
                
                # Add some subtle hovering motion
                hover_offset = np.sin(current_time + j * 0.5) * 0.05
                target_pos = self.TARGET_XYZ[j].copy()
                target_pos[2] += hover_offset
                
                self.action[j, :] = self.ctrl[j].computeControlFromState(
                    control_timestep=self.SIMULATION_FREQ_HZ/self.CONTROL_FREQ_HZ,
                    state=state,
                    target_pos=target_pos,
                    target_rpy=self.TARGET_RPYS[j]
                )

In [10]:
def run_simulation(self):
        """Run the complete simulation"""
        print(f"Starting simulation with {self.NUM_DRONES} drones...")
        START = time.time()
        
        # Main simulation loop
        for i in range(0, int(self.DURATION_SEC*self.SIMULATION_FREQ_HZ)):
            # Step the simulation
            if i % (self.SIMULATION_FREQ_HZ//self.CONTROL_FREQ_HZ) == 0:
                self.control_drones(i)
            
            self.env.step(self.action)
            
            # Log data for drone 0 (as an example)
            if i % (self.SIMULATION_FREQ_HZ//self.CONTROL_FREQ_HZ) == 0:
                self.logger.log(
                    drone=0,
                    timestamp=i/self.SIMULATION_FREQ_HZ,
                    state=self.env._getDroneStateVector(0),
                    control=np.zeros(12)  # Simplified control logging
                )
            
            # Sync the simulation with real time if GUI is enabled
            if self.GUI:
                sync(i, START, self.SIMULATION_FREQ_HZ)
                
        
        # Close the environment
        self.env.close()
        
        # Plot results
        self.logger.plot()

In [11]:
def __init__(self, 
             num_drones=15,
             simulation_freq_hz=240,
             control_freq_hz=48,
             duration_sec=60,
             gui=True,
             record=False):
    
    # Disconnect any existing PyBullet connections
    try:
        import pybullet as p
        if p.isConnected():
            p.disconnect()
    except:
        pass
    
    # Initialize simulation parameters
    self.NUM_DRONES = num_drones
    self.SIMULATION_FREQ_HZ = simulation_freq_hz
    self.CONTROL_FREQ_HZ = control_freq_hz
    self.DURATION_SEC = duration_sec
    self.GUI = gui
    self.RECORD = record
    self.env = CtrlAviary(
    drone_model=DroneModel.CF2X,
    num_drones=self.NUM_DRONES,
    initial_xyzs=initial_xyzs,
    initial_rpys=np.zeros((self.NUM_DRONES, 3)),
    physics=Physics.PYB,
    gui=self.GUI,  # Make sure this is correctly passing your gui parameter
    record=self.RECORD,
    obstacles=False,
    user_debug_gui=True
    )

In [12]:
def __init__(self, 
             num_drones=15,
             simulation_freq_hz=240,
             control_freq_hz=48,
             duration_sec=60,
             gui=True,
             record=False):
    
    # Disconnect any existing PyBullet connections
    try:
        if p.isConnected():
            p.disconnect()
    except:
        pass
    
    # Initialize simulation parameters
    self.NUM_DRONES = num_drones
    self.SIMULATION_FREQ_HZ = simulation_freq_hz
    self.CONTROL_FREQ_HZ = control_freq_hz
    self.DURATION_SEC = duration_sec
    self.GUI = gui
    self.RECORD = record
    
    # Explicitly connect to PyBullet before creating the environment
    if self.GUI:
        self.client = p.connect(p.GUI)
        print(f"Connected to PyBullet with GUI mode, client ID: {self.client}")
    else:
        self.client = p.connect(p.DIRECT)
        print(f"Connected to PyBullet with DIRECT mode, client ID: {self.client}")

In [None]:
import pybullet as p
import pybullet_data
import time
import numpy as np

# Disconnect any existing connections
try:
    if p.isConnected():
        p.disconnect()
except:
    pass

# Try to connect with GUI
client = p.connect(p.GUI)
print(f"Connected with client ID: {client}")

# Set the path to PyBullet's data
p.setAdditionalSearchPath(pybullet_data.getDataPath())
print(f"Search path set to: {pybullet_data.getDataPath()}")

# Load floor plane
plane_id = p.loadURDF("plane.urdf")

# Set gravity
p.setGravity(0, 0, -9.8)

# Configure camera view
p.resetDebugVisualizerCamera(
    cameraDistance=10,
    cameraYaw=45,
    cameraPitch=-30,
    cameraTargetPosition=[0, 0, 2]
)

# Number of drones
num_drones = 10

# Create multiple cube drones
drone_ids = []
for i in range(num_drones):
    # Calculate position in a circle
    angle = 2 * np.pi * i / num_drones
    x = 3 * np.cos(angle)
    y = 3 * np.sin(angle)
    z = 1.5  # Start at same height
    
    # Create a colored box as a simple drone representation
    visual_shape_id = p.createVisualShape(
        shapeType=p.GEOM_BOX,
        halfExtents=[0.2, 0.2, 0.05],
        rgbaColor=[i/num_drones, 0.5, 1-i/num_drones, 1]  # Different color for each drone
    )
    
    collision_shape_id = p.createCollisionShape(
        shapeType=p.GEOM_BOX,
        halfExtents=[0.2, 0.2, 0.05]
    )
    
    drone_id = p.createMultiBody(
        baseMass=0.5,
        baseCollisionShapeIndex=collision_shape_id,
        baseVisualShapeIndex=visual_shape_id,
        basePosition=[x, y, z]
    )
    
    drone_ids.append(drone_id)

# Various formation patterns
def circle_formation(t, i, num_drones):
    radius = 3 + np.sin(t/2) * 1.5  # Dynamic radius
    angle = 2 * np.pi * i / num_drones + t / 5  # Rotating circle
    x = radius * np.cos(angle)
    y = radius * np.sin(angle)
    z = 1.5 + np.sin(t/3 + i * 0.5) * 0.5  # Vary height
    return [x, y, z]

def figure_eight_formation(t, i, num_drones):
    # Phase difference between drones
    phase = i * 2 * np.pi / num_drones
    # Figure-8 parametric equation
    x = 4 * np.sin(t/2 + phase)
    y = 4 * np.sin(t/2 + phase) * np.cos(t/2 + phase)
    z = 1.5 + np.sin(t + phase) * 0.5
    return [x, y, z]

def spiral_formation(t, i, num_drones):
    # Spiral with increasing radius
    angle = 4 * t + i * 2 * np.pi / num_drones
    radius = 1 + (i / num_drones) * 3
    x = radius * np.cos(angle)
    y = radius * np.sin(angle)
    z = 1.5 + (i / num_drones) * 2
    return [x, y, z]

def wave_formation(t, i, num_drones):
    # Line with sine wave
    x = (i - num_drones/2) * 0.8
    y = np.sin(t + i * 0.5) * 2
    z = 1.5 + np.cos(t/2 + i * 0.3) * 0.7
    return [x, y, z]

# List of formations
formations = [
    circle_formation,
    figure_eight_formation,
    spiral_formation,
    wave_formation
]

# Run simulation
print("Running simulation with dynamic drone movements...")
current_formation = 0
formation_change_time = 500  # Steps before changing formation
formation_transition = 100   # Steps for smooth transition

# PD controller parameters for smoother movement
kp = 10.0  # Position gain
kd = 5.0   # Velocity gain

# Target positions and velocities
target_positions = [np.zeros(3) for _ in range(num_drones)]
current_velocities = [np.zeros(3) for _ in range(num_drones)]

for step in range(4000):
    # Change formation periodically
    if step % formation_change_time == 0:
        prev_formation = current_formation
        current_formation = (current_formation + 1) % len(formations)
        print(f"Changing to formation: {formations[current_formation].__name__}")
    
    # Calculate blend factor for smooth transition
    in_transition = step % formation_change_time < formation_transition
    blend = min(1.0, (step % formation_change_time) / formation_transition) if in_transition else 1.0
    
    # Update each drone
    for i, drone_id in enumerate(drone_ids):
        # Get current position and velocity
        drone_pos, drone_orn = p.getBasePositionAndOrientation(drone_id)
        drone_vel, drone_ang_vel = p.getBaseVelocity(drone_id)
        
        # Calculate target position based on current and possibly previous formation
        current_target = formations[current_formation](step/100, i, num_drones)
        
        # If in transition, blend with previous formation
        if in_transition and step > formation_transition:
            prev_target = formations[prev_formation](step/100, i, num_drones)
            target_pos = np.array(prev_target) * (1-blend) + np.array(current_target) * blend
        else:
            target_pos = np.array(current_target)
        
        # Store target position
        target_positions[i] = target_pos
        
        # Calculate PD control
        position_error = target_pos - np.array(drone_pos)
        velocity_error = -np.array(drone_vel)  # Target velocity is zero
        
        # PD control law
        force = kp * position_error + kd * velocity_error
        
        # Add gravity compensation
        force[2] += 9.8 * 0.5
        
        # Apply forces
        p.applyExternalForce(
            drone_id,
            -1,  # Link ID (-1 means base)
            force,
            [0, 0, 0],  # Position (doesn't matter for -1)
            p.WORLD_FRAME
        )
        
        # Add some rotation to make it look more like a drone
        p.applyExternalTorque(
            drone_id,
            -1,
            [np.sin(step/20 + i) * 0.1, 
             np.cos(step/20 + i) * 0.1, 
             np.sin(step/30) * 0.05],
            p.WORLD_FRAME
        )
    
    # Step the simulation
    p.stepSimulation()
    
    # Slow down simulation for visibility
    time.sleep(1/240)

print("Simulation complete.")
p.disconnect()

Version = 4.1 Metal - 89.3
Vendor = Apple
Renderer = Apple M1
Connected with client ID: 0
Search path set to: /opt/anaconda3/envs/tf_310/lib/python3.10/site-packages/pybullet_data
b3Printf: Selected demo: Physics Server
startThreads creating 1 threads.
starting thread 0
started thread 0 
MotionThreadFunc thread started
Running simulation with dynamic drone movements...
Changing to formation: figure_eight_formation
Changing to formation: spiral_formation
Changing to formation: wave_formation
Changing to formation: circle_formation
Changing to formation: figure_eight_formation
Changing to formation: spiral_formation
Changing to formation: wave_formation
Changing to formation: circle_formation
