*This notebook implements a **Deep Reinforcement Learning Environment** where two Unmanned Aerial Vehicles (UAVs), acting as agents, learn to navigate effectively to reach their respective goals. The environment is designed with various constraints and features crucial for realistic UAV navigation, including altitude, velocity, speed, obstacles, and no-fly zones.*

The main components of the environment include:

**UAV Dynamics:** Each UAV's state includes information such as position (x, y, z), velocity (dx, dy, dz), and other relevant parameters.

**Goals:** Each UAV has a specific goal position to reach within the environment.

**Obstacles:** Various obstacles are present in the environment, such as rectangular zones, circular zones, and elliptical zones, which UAVs must avoid colliding with.

**No-Fly Zones:** Designated areas where UAVs are prohibited from entering, which adds complexity to the navigation task.

The reinforcement learning algorithm employed is **Double Deep Q-Network (DDQN)**, which enables the agents to learn effective policies for navigation through trial and error. The agents interact with the environment by choosing actions (e.g., changing altitude or velocity) based on their current states. Rewards are provided based on reaching the goals, avoiding obstacles, and adhering to the constraints.

Key features of the notebook implementation include:

**Training Loop:** Iteratively trains the DDQN agents over multiple episodes, updating the neural network parameters to improve performance.

**Experience Replay:** Utilizes a replay buffer to store and sample experiences, enabling more efficient learning by breaking temporal correlations.

**Target Network Update:** Periodically updates the target network to stabilize training and improve convergence.

**Epsilon-Greedy Exploration:** Balances exploration and exploitation during action selection using an epsilon-greedy policy.

**Early Stopping:** Implements early stopping based on a specified threshold to prevent overfitting and expedite training.

Overall, this notebook provides a comprehensive framework for training UAV agents to navigate complex environments effectively while considering real-world constraints and features.

In [1]:
import gym  # OpenAI Gym library
import random  # Python random module
from gym import spaces  # Gym spaces module for defining action and observation spaces
import numpy as np  # NumPy for numerical computations
import matplotlib.pyplot as plt  # Matplotlib for visualization
import matplotlib.patches as patches  # Matplotlib patches for drawing shapes
import logging  # Python logging module for logging messages
import torch  # PyTorch deep learning framework
import torch.nn as nn  # PyTorch neural network module
import torch.optim as optim  # PyTorch optimization module
from collections import deque  # Python deque (double-ended queue) for replay buffer
import random  # Python random module for random number generation

In [2]:
class UAVEnv(gym.Env):
    """Custom Environment that follows gym interface for a UAV with 3D space considerations."""
    metadata = {'render.modes': ['human']} # Define render modes for the environment

    def __init__(self, num_uavs=2, goal_positions=None, max_steps=100, debug=False):
        super(UAVEnv, self).__init__()
        """
        Initialize the UAV environment.

        Parameters:
        - num_uavs (int): Number of UAVs in the environment.
        - goal_positions (list of np.ndarray): List of goal positions for each UAV.
        - max_steps (int): Maximum number of steps per episode.
        - debug (bool): Whether to enable debug mode for logging.
        """
        # Define environment bounds and parameters
        self.env_bounds = (15, 15, 30)  # Bounds of the environment including z-axis
        self.num_uavs = num_uavs
        self.max_steps = max_steps
        self.goal_positions = goal_positions if goal_positions is not None else [
            np.array([14, 14, 10]), np.array([0, 14, 10]) ## Default goal positions for two UAVs
        ]
        self.current_step = 0
        # Initialize UAV states, paths, and other variables
        self.uav_states = [np.array([0, 0, 10, 0, 0, 0]) for _ in range(num_uavs)]
        self.previous_distances = [np.linalg.norm(state[:3] - goal) for state, goal in zip(self.uav_states, self.goal_positions)]
        self.paths = [[] for _ in range(num_uavs)]
        self.debug = debug
        # Define no-fly zones and obstacles in the environment
        self.no_fly_zones = [
            {'type': 'rectangle', 'position': (7.5, 2.5, 15), 'width': 2, 'height': 2, 'depth': 15}
        ]
        self.obstacles = [
            {'type': 'rectangle', 'position': (2, 2, 15), 'width': 1, 'height': 1, 'depth': 15},
            {'type': 'circle', 'position': (7.5, 7.5, 20), 'radius': 1, 'depth': 20},
            {'type': 'ellipse', 'position': (2.5, 7.5, 25), 'width': 2, 'height': 1, 'depth': 25}
        ]
        # Define action and observation spaces
        self.action_space = spaces.MultiDiscrete([6] * num_uavs)
        low = np.array([-10, -10, 0, -1, -1, -1] * num_uavs, dtype=np.float32)
        high = np.array([10, 10, 30, 1, 1, 1] * num_uavs, dtype=np.float32)
        self.observation_space = spaces.Box(low=low, high=high, dtype=np.float32)

    def step(self, actions):
        """
        Execute one step within the environment.

        Parameters:
        - actions (list of int or np.ndarray): Actions to be taken by each UAV.

        Returns:
        - observations (np.ndarray): Concatenated observations of each UAV after the step.
        - rewards (np.ndarray): Rewards obtained by each UAV after the step.
        - dones (np.ndarray): Whether each UAV has reached a terminal state after the step.
        - infos (dict): Additional information for debugging purposes.
        """
        # Execute one time step within the environment
        self.current_step += 1
        rewards = [0] * self.num_uavs
        dones = [False] * self.num_uavs
        infos = [{}] * self.num_uavs
        try:
            for i in range(self.num_uavs):
                # Take action for each UAV
                action = actions[i][0] if isinstance(actions[i], np.ndarray) and actions[i].size > 0 else None
                if action is not None:
                    self._take_action(i, action)
                    self.paths[i].append(self.uav_states[i][:3].copy())
                    dones[i] = self._is_done(i)
                    rewards[i] = self._get_reward(i)
                    # Penalize if UAV enters no-fly zone
                    for zone in self.no_fly_zones:
                        if self._in_no_fly_zone(self.uav_states[i][:3], zone):
                            rewards[i] -= 100
                            dones[i] = True
                            if self.debug:
                                logging.debug(f"UAV {i} entered no-fly zone at {self.uav_states[i][:3]}")
            # Return observations, rewards, done flags, and additional info
            return np.concatenate([uav_state.flatten() for uav_state in self.uav_states]), np.array(rewards), np.array(dones), infos
        except Exception as e:
            logging.error("Error during step execution:", exc_info=True)
            raise

    def reset(self):
        # Reset the state of the environment to an initial state
        """
        Reset the environment to its initial state.

        Returns:
        - observations (np.ndarray): Concatenated initial observations of each UAV.
        """
        self.current_step = 0
        initial_positions = [np.array([0, 0, 10]), np.array([14, 0, 10])]
        self.uav_states = [np.concatenate([pos, np.zeros(3)]) for pos in initial_positions]
        self.paths = [[] for _ in range(self.num_uavs)]
        self.previous_distances = [np.linalg.norm(state[:3] - goal) for state, goal in zip(self.uav_states, self.goal_positions)]
        return np.concatenate([state.flatten() for state in self.uav_states])

    def render(self, mode='human'):
        """
        Render the environment for visualization.

        Parameters:
        - mode (str): Rendering mode ('human' for interactive plots).
        """
        # Render the environment
        if mode == 'human':
            fig, ax = plt.subplots(figsize=(10, 10))
            ax.set_xlim(self.observation_space.low[0], self.observation_space.high[0])
            ax.set_ylim(self.observation_space.low[1], self.observation_space.high[1])
            ax.set_xlabel('X Position')
            ax.set_ylabel('Y Position')
            ax.set_title('Multi-UAV Navigation Environment')
            ax.grid(True)
            colors = ['blue', 'green']
            for idx, state in enumerate(self.uav_states):
                x, y, z, dx, dy, dz = state
                goal = self.goal_positions[idx]
                ax.scatter(goal[0], goal[1], c='red', s=100, label=f'Goal {idx}', alpha=0.5)
                ax.scatter(x, y, c=colors[idx], s=100, label=f'UAV {idx}', alpha=0.5)
                if self.paths[idx]:
                    path_array = np.array(self.paths[idx])
                    ax.plot(path_array[:, 0], path_array[:, 1], label=f'Path {idx}', color=colors[idx], alpha=0.6)
            for obstacle in self.obstacles:
                self._plot_obstacle(ax, obstacle, 'orange')
            for zone in self.no_fly_zones:
                self._plot_obstacle(ax, zone, 'purple')
            ax.legend(loc='upper right')
            plt.show()
    # Helper method to plot obstacles in the environment
    def _plot_obstacle(self, ax, obstacle, color):
        """
        Plot obstacle on the environment plot.

        Parameters:
        - ax: Matplotlib Axes object to plot on.
        - obstacle (dict): Obstacle parameters including type, position, dimensions, etc.
        - color (str): Color for plotting the obstacle.
        """
        if obstacle['type'] == 'rectangle':
            rect = patches.Rectangle((obstacle['position'][0] - obstacle['width']/2, obstacle['position'][1] - obstacle['height']/2),
                                     obstacle['width'], obstacle['height'], linewidth=1, edgecolor=color, facecolor=color, alpha=0.5)
            ax.add_patch(rect)
        elif obstacle['type'] == 'circle':
            circle = patches.Circle((obstacle['position'][0], obstacle['position'][1]),
                                    obstacle['radius'], linewidth=1, edgecolor=color, facecolor=color, alpha=0.5)
            ax.add_patch(circle)
        elif obstacle['type'] == 'ellipse':
            ellipse = patches.Ellipse((obstacle['position'][0], obstacle['position'][1]),
                                      obstacle['width'], obstacle['height'], linewidth=1, edgecolor=color, facecolor=color, alpha=0.5)
            ax.add_patch(ellipse)

    def choose_action(state, epsilon):
        """
        Choose actions for each UAV based on the current state and epsilon-greedy policy.

        Parameters:
        - state (np.ndarray): Current state of the environment.
        - epsilon (float): Epsilon value for epsilon-greedy policy.

        Returns:
        - actions (list of list): List of actions for each UAV.
        """
        actions = []
        for _ in range(env.num_uavs):  # Assuming env.num_uavs gives the number of UAVs
            if random.random() > epsilon:
                with torch.no_grad():
                    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
                    q_values = main_network(state_tensor)
                    action = torch.argmax(q_values, dim=-1).item()  # Ensure this takes the correct dimension
            else:
                action = random.randint(0, 5)  # Ensuring actions are within bounds
            actions.append([action])  # Encapsulate action in a list
        return actions  # Return a list of action lists

                
                
    def _detect_collision(self, new_position, uav_index):
        """
        Detect collision with obstacles or other UAVs.

        Parameters:
        - new_position (np.ndarray): New position of the UAV.
        - uav_index (int): Index of the UAV.

        Returns:
        - collision (bool): Whether collision is detected.
        """
        # Check collision with obstacles
        for obstacle in self.obstacles:
            if self._collision_with_obstacle(new_position, obstacle):
                return True
        # Check collision with other UAVs
        for idx, state in enumerate(self.uav_states):
            if idx != uav_index and np.linalg.norm(new_position[:3] - state[:3]) < 1.0:  # Ensure the 3D distance check
                return True
        return False

    # Define method to take action for each UAV
    def _take_action(self, uav_index, action):
        """
        Take actions for the specified UAV.

        Parameters:
        - uav_index (int): Index of the UAV.
        - action (int): Action to be taken by the UAV.
        """
        x, y, z, dx, dy, dz = self.uav_states[uav_index]
        speed_change = 0.1

        # Modify velocity based on action
        if action == 0:
            dz += speed_change
        elif action == 1:
            dz -= speed_change
        elif action == 2:
            dx -= speed_change
        elif action == 3:
            dx += speed_change
        elif action == 4:
            dy += speed_change
        elif action == 5:
            dy -= speed_change

        # Apply damping to velocities
        dx *= 0.95
        dy *= 0.95
        dz *= 0.95

        # Update positions and ensure they stay within bounds
        x, y, z = np.clip([x + dx, y + dy, z + dz], [0, 0, 0], self.env_bounds)

        # Update state
        self.uav_states[uav_index] = np.array([x, y, z, dx, dy, dz])


    def _get_proximity_penalty(self, current_position, uav_index):
        """
        Calculate proximity penalty for the specified UAV.

        Parameters:
        - current_position (np.ndarray): Current position of the UAV.
        - uav_index (int): Index of the UAV.

        Returns:
        - penalty (float): Proximity penalty for the UAV.
        """
        penalty = 0
        safe_distance = 1.0
        for obstacle in self.obstacles:
            if obstacle['type'] == 'rectangle':
                rect_min_x = obstacle['position'][0] - obstacle['width'] / 2
                rect_max_x = obstacle['position'][0] + obstacle['width'] / 2
                rect_min_y = obstacle['position'][1] - obstacle['height'] / 2
                rect_max_y = obstacle['position'][1] + obstacle['height'] / 2
                # Enhanced proximity check for rectangles
                if (rect_min_x <= current_position[0] <= rect_max_x and
                    rect_min_y <= current_position[1] <= rect_max_y):
                    penalty += safe_distance
            elif obstacle['type'] == 'circle':
                distance = np.linalg.norm(np.array(obstacle['position'][:2]) - current_position[:2])
                if distance < obstacle['radius'] + safe_distance:
                    penalty += safe_distance
            elif obstacle['type'] == 'ellipse':
                # Improved ellipse proximity check
                distance = np.linalg.norm(np.array(obstacle['position'][:2]) - current_position[:2])
                if distance < max(obstacle['width'], obstacle['height']) / 2 + safe_distance:
                    penalty += safe_distance

        # Check proximity to other UAVs
        '''for idx, other_state in enumerate(self.uav_states):
            if idx != uav_index:
                distance = np.linalg.norm(other_state[:3] - current_position[:3])
                if distance < safe_distance:
                    penalty += safe_distance * 10  # Increased penalty for proximity to other UAVs
        return penalty'''
        for idx, other_uav_position in enumerate(self.uav_states):
            if idx != uav_index:
                distance = np.linalg.norm(other_uav_position[:3] - current_position)
                if distance < safe_distance:
                    penalty += (safe_distance - distance) * 10  # Increased penalty for closer proximity

        return penalty
        
    # Define method to calculate reward for each UAV
    def _get_reward(self, uav_index):
        """
        Calculate reward for the specified UAV.

        Parameters:
        - uav_index (int): Index of the UAV.

        Returns:
        - reward (float): Reward for the UAV.
        """
        current_position = self.uav_states[uav_index][:3]
        goal_distance = np.linalg.norm(current_position - self.goal_positions[uav_index])
        reward = self.previous_distances[uav_index] - goal_distance
        self.previous_distances[uav_index] = goal_distance
        speed_penalty = -np.linalg.norm(self.uav_states[uav_index][3:]) * 0.05
        if goal_distance < 1:
            reward += 100  # Reward for reaching the goal
        return reward + speed_penalty
        
    # Define method to check if episode is done for each UAV
    def _is_done(self, uav_index):
        """
        Check if the specified UAV has reached a terminal state.

        Parameters:
        - uav_index (int): Index of the UAV.

        Returns:
        - done (bool): Whether the UAV has reached a terminal state.
        """
        current_position = self.uav_states[uav_index][:3]
        goal_distance = np.linalg.norm(current_position - self.goal_positions[uav_index])
        if goal_distance < 1:
           print(f"Goal reached by UAV {uav_index}!")
           return True
        if self.current_step >= self.max_steps:
           print("Max steps exceeded.")
           return True
        return False    

    
    # Define method to check if position is within a no-fly zone  
    def _in_no_fly_zone(self, position, zone):
        """
        Check if a position is within a specified no-fly zone.

        Parameters:
        - position (np.ndarray): Position to be checked.
        - zone (dict): No-fly zone parameters including type, position, dimensions, etc.

        Returns:
        - in_zone (bool): Whether the position is within the no-fly zone.
        """
        x, y, z = position
        if zone['type'] == 'rectangle':
            return self.is_in_rectangle(zone, x, y, z)
        elif zone['type'] == 'circle':
            return self.is_in_circle(zone, x, y, z)
        elif zone['type'] == 'ellipse':
            return self.is_in_ellipse(zone, x, y, z)
        return False

    # Separate methods for different obstacle types improve readability and maintenance
    def is_in_rectangle(self, zone, x, y, z):
        """Check if a position is within a rectangle."""
        zx, zy, zz = zone['position']
        return (zx - zone['width'] / 2 <= x <= zx + zone['width'] / 2 and
                zy - zone['height'] / 2 <= y <= zy + zone['height'] / 2 and
                0 <= z <= zz)  # Assuming the rectangle extends in the z-axis from 0 to zz

    def is_in_circle(self, zone, x, y, z):
        """Check if a position is within a circle."""
        zx, zy, zz = zone['position']
        distance = np.sqrt((x - zx) ** 2 + (y - zy) ** 2)
        return distance <= zone['radius'] and 0 <= z <= zz

    def is_in_ellipse(self, zone, x, y, z):
        """Check if a position is within an ellipse."""
        zx, zy, zz = zone['position']
        distance = ((x - zx) ** 2 / (zone['width'] / 2) ** 2) + ((y - zy) ** 2 / (zone['height'] / 2) ** 2)
        return distance <= 1 and 0 <= z <= zz
    
    # Define method to check collision with obstacles   
    def _collision_with_obstacle(self, position, obstacle):

        """
        Check collision with a specified obstacle.

        Parameters:
        - position (np.ndarray): Position to be checked.
        - obstacle (dict): Obstacle parameters including type, position, dimensions, etc.

        Returns:
        - collision (bool): Whether collision is detected.
        """
        x, y, z = position  # Assuming z-axis is also considered for collision
        if obstacle['type'] == 'rectangle':
            rect_min_x = obstacle['position'][0] - obstacle['width'] / 2
            rect_max_x = obstacle['position'][0] + obstacle['width'] / 2
            rect_min_y = obstacle['position'][1] - obstacle['height'] / 2
            rect_max_y = obstacle['position'][1] + obstacle['height'] / 2
            rect_min_z = obstacle['position'][2] if 'position' in obstacle and len(obstacle['position']) > 2 else 0
            rect_max_z = obstacle['position'][2] + obstacle['depth'] if 'depth' in obstacle else float('inf')
            return (rect_min_x <= x <= rect_max_x and
                    rect_min_y <= y <= rect_max_y and
                    rect_min_z <= z <= rect_max_z)
        elif obstacle['type'] == 'circle':
            circle_center_x, circle_center_y, circle_center_z = obstacle['position']
            circle_radius = obstacle['radius']
            distance = np.sqrt((x - circle_center_x) ** 2 + (y - circle_center_y) ** 2)
            return distance <= circle_radius and abs(z - circle_center_z) <= obstacle.get('depth', float('inf'))
        elif obstacle['type'] == 'ellipse':
            ellipse_center_x, ellipse_center_y, ellipse_center_z = obstacle['position']
            rel_x = (x - ellipse_center_x) / (obstacle['width'] / 2)
            rel_y = (y - ellipse_center_y) / (obstacle['height'] / 2)
            return (rel_x**2 + rel_y**2) <= 1 and abs(z - ellipse_center_z) <= obstacle.get('depth', float('inf'))
        return False

    
    """
    Set the random seed for the environment's random number generator.

    Parameters:
    - seed (int): Random seed value.
    """
    def seed(self, seed=None):
        # Set random seed for reproducibility
        np.random.seed(seed)
        random.seed(seed)
        if self.debug:
            logging.debug(f"Random seed set to: {seed}")
    

In [3]:
# Define the Q-Network architecture
class DDQN(nn.Module):
    def __init__(self, state_size, action_size, num_uavs=2):
        """
        Initialize the Double Deep Q-Network.

        Parameters:
        - state_size (int): Size of the state space.
        - action_size (int): Size of the action space.
        - num_uavs (int): Number of UAVs.
        """
        super(DDQN, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)  # First fully connected layer
        self.relu = nn.ReLU()  # ReLU activation function
        self.fc2 = nn.Linear(64, 128)  # Second fully connected layer
        # Adjust the output layer to match the number of UAVs and actions per UAV
        self.output = nn.Linear(128, action_size * num_uavs)  # Flattened output for each UAV
    
    def forward(self, x):
        """
        Forward pass through the network.

        Parameters:
        - x (torch.Tensor): Input tensor.

        Returns:
        - torch.Tensor: Output tensor.
        """
        x = self.relu(self.fc1(x))  # Apply ReLU activation to the first layer
        x = self.relu(self.fc2(x))  # Apply ReLU activation to the second layer
        # Reshape the output to [batch_size, num_uavs, action_size]
        return self.output(x).view(-1, num_uavs, action_size)

# Setup for the training parameters
state_size = 12  # State size of the environment
action_size = 6  # Action size of the environment
num_uavs = 2  # Number of UAVs
main_network = DDQN(state_size, action_size, num_uavs)  # Main Q-network
target_network = DDQN(state_size, action_size, num_uavs)  # Target Q-network
target_network.load_state_dict(main_network.state_dict())  # Load main network's weights to target network
optimizer = optim.Adam(main_network.parameters())  # Adam optimizer for training
loss_fn = nn.MSELoss()  # Mean Squared Error loss function
replay_buffer = deque(maxlen=10000)  # Replay buffer for storing experiences
epsilon = 1.0  # Initial value of epsilon for epsilon-greedy exploration
epsilon_decay = 0.995  # Decay rate of epsilon
min_epsilon = 0.01  # Minimum value of epsilon
gamma = 0.99  # Discount factor
batch_size = 32  # Batch size for training

# Function to choose an action based on epsilon-greedy policy
def choose_action(state, epsilon):
    """
    Choose an action for each UAV based on an epsilon-greedy policy.

    Parameters:
    - state (np.ndarray): Current state of the environment.
    - epsilon (float): Epsilon value for exploration-exploitation trade-off.

    Returns:
    - list of lists: List of actions for each UAV.
    """
    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)  # Prepare state tensor
    if random.random() > epsilon:
        with torch.no_grad():
            q_values = main_network(state_tensor)  # Get Q-values for all UAVs and actions
        # Select the best action for each UAV based on max Q-value
        actions = q_values.max(2)[1].squeeze().tolist()  # Max over actions dimension, for each UAV
    else:
        # Random actions for each UAV
        actions = [random.randrange(action_size) for _ in range(num_uavs)]
    return [[action] for action in actions]  # Encapsulate each action in a list for each UAV

def train_network():
    """
    Train the Q-network using a batch of experiences from the replay buffer.
    """
    global epsilon
    batch = random.sample(replay_buffer, batch_size)  # Sample a batch of experiences from the replay buffer
    states, actions, rewards, next_states, dones = zip(*batch)  # Unzip the batch into separate components

    # Convert all components of the batch into tensors with correct shapes
    states = torch.tensor(states, dtype=torch.float32).view(-1, 12)  # Shape: [batch_size, state_size]
    actions = torch.tensor(actions, dtype=torch.long)  # Shape: [batch_size, num_uavs, 1]
    rewards = torch.tensor(rewards, dtype=torch.float32)  # Shape: [batch_size, num_uavs]
    next_states = torch.tensor(next_states, dtype=torch.float32).view(-1, 12)  # Shape: [batch_size, state_size]
    dones = torch.tensor(dones, dtype=torch.float32)  # Shape: [batch_size, num_uavs]
    # Clamp actions to the valid range if needed
    actions = actions.clamp(0, 5)  # Assuming 6 actions (0 to 5)

    # Process current and next states through the network
    current_q_values = main_network(states)  # Shape: [batch_size, num_uavs, num_actions]
    next_actions = main_network(next_states).max(2)[1].unsqueeze(-1)  # Shape: [batch_size, num_uavs, 1]
    next_q_values = target_network(next_states).gather(2, next_actions).squeeze(-1)  # Shape: [batch_size, num_uavs]

    # Ensure rewards and dones are correctly shaped to match the dimensions of next_q_values
    if rewards.ndim == 1:
        rewards = rewards.unsqueeze(-1)  # Adding an axis for clarity if needed
    if dones.ndim == 1:
        dones = dones.unsqueeze(-1)  # Adding an axis for clarity if needed

    # Bellman equation calculation
    expected_q_values = rewards + gamma * next_q_values * (1 - dones)  # Shape: [batch_size, num_uavs]

    # Gather current Q-values according to the actions taken
    current_q_values = current_q_values.gather(2, actions).squeeze(-1)  # Shape: [batch_size, num_uavs]

    # Compute the loss
    loss = loss_fn(current_q_values, expected_q_values.detach())
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(main_network.parameters(), max_norm=1.0)  # Clip gradients to prevent exploding gradients
    optimizer.step()

    epsilon = max(min_epsilon, epsilon * epsilon_decay)  # Decay epsilon

    # Debugging outputs to verify shapes after modification
    print(f"Actions shape: {actions.shape}")
    print(f"Rewards shape: {rewards.shape}")
    print(f"Dones shape: {dones.shape}")
    print(f"Current Q-values shape: {current_q_values.shape}")
    print(f"Next Q-values shape: {next_q_values.shape}")
    print(f"Expected Q-values shape: {expected_q_values.shape}")

`env = UAVEnv(): Initializes the UAV environment.

`state = env.reset(): Resets the environment and retrieves the initial state.

`print("Initial state shape:", state.shape): Prints the shape of the initial state.

`action = choose_action(state, epsilon): Chooses an action based on the current state using an epsilon-greedy policy.

`print("Action taken:", action): Prints the action chosen for each UAV.

`next_state, reward, done, _ = env.step(action): Takes a step in the environment by executing the chosen action.

`print("Next state shape:", next_state.shape): Prints the shape of the next state after taking the step.

In [4]:
env = UAVEnv()  # Assuming this is your environment initialization
state = env.reset()
print("Initial state shape:", state.shape)
action = choose_action(state, epsilon)
print("Action taken:", action)
next_state, reward, done, _ = env.step(action)
print("Next state shape:", next_state.shape)

Initial state shape: (12,)
Action taken: [[4], [3]]
Next state shape: (12,)


`train_ddqn is a function for training the Double Deep Q-Network (DDQN) agent.

`It takes num_episodes as the number of episodes to train and early_stopping_threshold as the threshold for early stopping.
                                                                                                          
`The function iterates over each episode and within each episode, it interacts with the environment, collects experiences, and trains the neural network.

`It prints the progress of each episode, including the total reward and current epsilon value.

`The best model is saved if it achieves a higher total reward compared to previous episodes.

`Early stopping is triggered if there is no improvement in the total reward for a specified number of episodes.

`The target network is updated periodically every 20 episodes.

`Finally, an example usage of the function is provided with 10 episodes and an early stopping threshold of 10 episodes.

In [None]:
def train_ddqn(num_episodes, early_stopping_threshold=None):
    global epsilon  # Ensure to use the global epsilon value for decay
    best_reward = float('-inf')
    no_improve = 0  # Counter to track the number of episodes with no improvement

    for episode in range(num_episodes):
        state = env.reset()  # Reset the environment for each episode
        total_reward = 0
        done = False
        step = 0

        while not np.any(done):
            action = choose_action(state, epsilon)  # Choose action using epsilon-greedy policy
            next_state, rewards, done, _ = env.step(action)  # Take a step in the environment
            total_reward += sum(rewards)
            replay_buffer.append((state, action, rewards, next_state, done))  # Store the transition in replay buffer
            state = next_state

            if len(replay_buffer) > batch_size:
                train_network()  # Train the Q-network if replay buffer is sufficiently filled

            step += 1
            if step % 100 == 0:
                print(f"Episode {episode}, Step {step}, Total Reward: {total_reward}, Current Epsilon: {epsilon}")

            # Update epsilon using decay schedule
            epsilon = max(min_epsilon, epsilon * epsilon_decay)

        print(f"Episode {episode} finished with total reward: {total_reward}")

        # Check if the current episode achieved the best reward so far
        if total_reward > best_reward:
            best_reward = total_reward
            no_improve = 0
            torch.save(main_network.state_dict(), 'best_model.pth')  # Save the best model
        else:
            no_improve += 1

        # Early stopping if no improvement is observed for a specified number of episodes
        if early_stopping_threshold and no_improve >= early_stopping_threshold:
            print("Early stopping triggered after no improvement in the last {} episodes.".format(early_stopping_threshold))
            break

        # Update the target network periodically
        if episode % 20 == 0:
            print(f"Updating target network at Episode {episode}")
            target_network.load_state_dict(main_network.state_dict())

# Usage
train_ddqn(10, early_stopping_threshold=10)

  states = torch.tensor(states, dtype=torch.float32).view(-1, 12)  # Shape: [batch_size, state_size]


Actions shape: torch.Size([32, 2, 1])
Rewards shape: torch.Size([32, 2])
Dones shape: torch.Size([32, 2])
Current Q-values shape: torch.Size([32, 2])
Next Q-values shape: torch.Size([32, 2])
Expected Q-values shape: torch.Size([32, 2])
Actions shape: torch.Size([32, 2, 1])
Rewards shape: torch.Size([32, 2])
Dones shape: torch.Size([32, 2])
Current Q-values shape: torch.Size([32, 2])
Next Q-values shape: torch.Size([32, 2])
Expected Q-values shape: torch.Size([32, 2])
Actions shape: torch.Size([32, 2, 1])
Rewards shape: torch.Size([32, 2])
Dones shape: torch.Size([32, 2])
Current Q-values shape: torch.Size([32, 2])
Next Q-values shape: torch.Size([32, 2])
Expected Q-values shape: torch.Size([32, 2])
Actions shape: torch.Size([32, 2, 1])
Rewards shape: torch.Size([32, 2])
Dones shape: torch.Size([32, 2])
Current Q-values shape: torch.Size([32, 2])
Next Q-values shape: torch.Size([32, 2])
Expected Q-values shape: torch.Size([32, 2])
Actions shape: torch.Size([32, 2, 1])
Rewards shape: tor

**Conclusion**
*Despite great progress in training the model to navigate complicated situations with reinforcement learning, memory limits caused out-of-memory errors. A combination of time and memory constraints prohibited the model from being fully trained to achieve peak performance.*

*While the framework and algorithms presented in this notebook show promise for effective UAV agent training, further optimisation and resource allocation are required to overcome the memory constraints observed. To solve these limits and allow for longer lengthy training sessions, strategies like as batch processing, memory-efficient data structures, and cloud-based computing resources could be investigated.*

*Despite the challenges, the notebook is an excellent starting place for future study and improvement in UAV navigation using reinforcement learning. Addressing memory limits and continuing the training process can increase the model's performance, allowing UAV agents to navigate complicated surroundings more effectively.*
