In [None]:
!pip3 install gymnasium numpy gym

In [1]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
from sklearn.cluster import KMeans
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random
import time
from IPython.display import clear_output

In [4]:
# Seed every RNG the notebook draws from: torch (network init), NumPy (environment,
# epsilon test) and `random` (random actions, replay sampling).
SEED = 42
try:
    # Restore a previously saved torch RNG state when it is available.
    # weights_only=True limits unpickling to plain tensors/containers.
    rng_state = torch.load('rng_state.pth', weights_only=True)
    torch.set_rng_state(rng_state)
except FileNotFoundError:
    # No saved state on this machine: fall back to a fixed seed so the run is still reproducible.
    torch.manual_seed(SEED)
    rng_state = torch.get_rng_state()
np.random.seed(SEED)
random.seed(SEED)

  rng_state = torch.load('rng_state.pth')


In [6]:
# Define the Q-Network using a simple feedforward neural network
class QNetwork(nn.Module):
    """Fully connected Q-network: flattened state in, one Q-value per action out.

    Layout: state_size -> 128 -> 128 -> action_size, with leaky-ReLU after the
    first two linear layers and no activation on the output.
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        hidden_units = 128
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_size)

    def forward(self, x):
        """Return Q-values of shape (batch, action_size) for a batch of states."""
        # Collapse everything after the batch dimension into one feature vector.
        flat = x.view(x.size(0), -1)
        hidden = F.leaky_relu(self.fc1(flat))
        hidden = F.leaky_relu(self.fc2(hidden))
        return self.fc3(hidden)

In [7]:
# Define the DQN agent
class DQNAgent:
    """Epsilon-greedy DQN agent with a uniform replay buffer and a single Q-network.

    The same network is used for action selection and for bootstrapping the
    TD target (there is no separate target network).
    """

    def __init__(self, state_size, action_size, device=None):
        """Build the Q-network, optimizer and replay buffer.

        Args:
            state_size: Number of input features after flattening one state.
            action_size: Number of discrete actions.
            device: Optional torch device (or device string) for the network.
                Defaults to CUDA when available, otherwise CPU, so the class no
                longer depends on a notebook-level ``device`` variable.
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=10000)
        self.gamma = 0.99  # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.0005
        self.batch_size = 256
        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = QNetwork(state_size, action_size).to(device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.criterion = nn.MSELoss()

    def _device(self):
        """Return the device the network currently lives on.

        Read from the parameters on every call so a later ``agent.model.to(...)``
        is followed automatically.
        """
        return next(self.model.parameters()).device

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer (oldest entries are evicted)."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Pick an action: random with probability ``epsilon``, otherwise greedy.

        Args:
            state: Array-like observation; it is flattened before the forward pass.

        Returns:
            The chosen action index as a Python int.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        state_batch = torch.as_tensor(
            np.asarray(state, dtype=np.float32).reshape(1, -1), device=self._device()
        )
        # Action selection never needs gradients.
        with torch.no_grad():
            act_values = self.model(state_batch)
        return torch.argmax(act_values[0]).item()

    def replay(self):
        """Run one gradient step on a random minibatch and decay epsilon.

        Returns:
            The scalar loss as a float, or None when the buffer holds fewer
            than ``batch_size`` transitions (no update and no epsilon decay).
        """
        if len(self.memory) < self.batch_size:
            return None
        minibatch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        device = self._device()
        # Stack into one ndarray first; building a tensor from a tuple of arrays is very slow.
        states = torch.as_tensor(np.asarray(states, dtype=np.float32), device=device)
        next_states = torch.as_tensor(np.asarray(next_states, dtype=np.float32), device=device)
        rewards = torch.as_tensor(np.asarray(rewards, dtype=np.float32), device=device)
        dones = torch.as_tensor(np.asarray(dones, dtype=np.float32), device=device)
        actions = torch.as_tensor(np.asarray(actions, dtype=np.int64), device=device)

        q_values = self.model(states)

        # The TD target is a constant for this update: compute it without gradient
        # tracking so the loss only pulls Q(s, a) towards the target, never the
        # target towards Q(s, a).
        with torch.no_grad():
            max_next_q = self.model(next_states).max(dim=1).values
            td_targets = rewards + self.gamma * max_next_q * (1.0 - dones)
            # Start from the current predictions so that only the taken action
            # contributes a non-zero error (keeps the original loss scale).
            q_target = q_values.detach().clone()
            rows = torch.arange(self.batch_size, device=device)
            q_target[rows, actions] = td_targets

        self.optimizer.zero_grad()
        loss = self.criterion(q_values, q_target)
        loss.backward()
        self.optimizer.step()

        # Decay epsilon after each replay
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        return loss.item()

In [33]:
class UAVClusterEnv(gym.Env):
    """2-D toy environment: UAVs reposition to stay close to moving users.

    Observation: array of shape (num_users + num_uavs, 2) with user positions
    first, then UAV positions.

    Action: one integer in ``[0, 4 * num_uavs)``. ``action // 4`` selects the UAV
    and ``action % 4`` its direction (0: up, 1: down, 2: left, 3: right), so one
    UAV moves by one unit per step.

    Note: ``reset`` returns only the observation and ``step`` returns a
    ``(obs, reward, done, info)`` 4-tuple; the training and test loops in this
    notebook unpack exactly that.
    """

    # Number of movement directions per UAV and the (dx, dy) each one applies.
    _NUM_DIRECTIONS = 4
    _MOVES = {0: (0.0, 1.0), 1: (0.0, -1.0), 2: (-1.0, 0.0), 3: (1.0, 0.0)}

    def __init__(self, num_users=10, num_uavs=3, area_size=100, max_steps=200):
        """Create the spaces and draw an initial layout.

        Args:
            num_users: Number of moving users.
            num_uavs: Number of UAVs (also the number of K-means clusters).
            area_size: Side length of the square map; positions lie in [0, area_size].
            max_steps: Number of steps after which ``done`` becomes True.
        """
        super().__init__()
        self.num_users = num_users
        self.num_uavs = num_uavs
        self.area_size = area_size
        self.max_steps = max_steps  # Maximum number of steps in an episode

        # State space: positions of users and UAVs
        self.observation_space = spaces.Box(
            low=0, high=area_size, shape=(self.num_users + self.num_uavs, 2), dtype=np.float32
        )

        # Action space: pick one UAV and one of 4 directions (up, down, left, right)
        self.action_space = spaces.Discrete(self._NUM_DIRECTIONS * self.num_uavs)

        # Initialize environment state
        self.reset()

    def reset(self):
        """Draw new user positions/velocities and place UAVs at K-means centroids.

        Returns:
            The initial observation.
        """
        self.users = np.random.rand(self.num_users, 2) * self.area_size

        # Use K-means to find clusters among the users
        kmeans = KMeans(n_clusters=self.num_uavs, n_init=10)
        kmeans.fit(self.users)
        # Copy: step() mutates UAV positions in place and must not write into the estimator.
        self.uavs = kmeans.cluster_centers_.copy()

        self.user_velocities = np.random.uniform(-1, 1, (self.num_users, 2))
        self.current_step = 0
        return self._get_state()

    def step(self, action):
        """Move the selected UAV, advance all users, and score the new layout.

        Args:
            action: Integer in ``[0, 4 * num_uavs)``; see the class docstring.

        Returns:
            Tuple ``(observation, reward, done, info)`` with an empty ``info`` dict.

        Raises:
            ValueError: If ``action`` is outside the action space.
        """
        action = int(action)
        if not 0 <= action < self._NUM_DIRECTIONS * self.num_uavs:
            raise ValueError(
                f"action must be in [0, {self._NUM_DIRECTIONS * self.num_uavs}), got {action}"
            )

        self.current_step += 1

        # Decode (uav, direction). The previous base-4 digit decoding needed
        # 4**num_uavs actions, so with 4*num_uavs actions the last UAV could only move up.
        uav_index, direction = divmod(action, self._NUM_DIRECTIONS)
        self._move_uav(uav_index, direction)
        self._move_users()

        # Calculate the reward
        reward = self._calculate_reward()

        # Check if the episode is done
        done = self._is_done()

        return self._get_state(), reward, done, {}

    def _move_uav(self, uav_index, direction):
        """Shift one UAV by one unit in ``direction`` and clip it to the map."""
        dx, dy = self._MOVES[direction]
        self.uavs[uav_index][0] = np.clip(self.uavs[uav_index][0] + dx, 0, self.area_size)
        self.uavs[uav_index][1] = np.clip(self.uavs[uav_index][1] + dy, 0, self.area_size)

    def _move_users(self):
        """Advance every user by its velocity, bouncing off the map borders."""
        for i in range(self.num_users):
            self.users[i] += self.user_velocities[i]

            # Boundary check to prevent users from moving outside the map
            if self.users[i][0] < 0:
                self.users[i][0] = 0
                self.user_velocities[i][0] *= -1  # Reverse direction
                self.user_velocities[i][1] = np.random.uniform(-1, 1)  # Randomize the y direction
            elif self.users[i][0] > self.area_size:
                self.users[i][0] = self.area_size
                self.user_velocities[i][0] *= -1  # Reverse direction
                self.user_velocities[i][1] = np.random.uniform(-1, 1)  # Randomize the y direction

            if self.users[i][1] < 0:
                self.users[i][1] = 0
                self.user_velocities[i][1] *= -1  # Reverse direction
                self.user_velocities[i][0] = np.random.uniform(-1, 1)  # Randomize the x direction
            elif self.users[i][1] > self.area_size:
                self.users[i][1] = self.area_size
                self.user_velocities[i][1] *= -1  # Reverse direction
                self.user_velocities[i][0] = np.random.uniform(-1, 1)  # Randomize the x direction

            # Ensure the velocity isn't too small to cause the user to stop moving
            if np.linalg.norm(self.user_velocities[i]) < 0.1:
                self.user_velocities[i] += np.random.uniform(-0.5, 0.5, 2)  # Add a small random vector

    def _is_done(self):
        """Return True once ``max_steps`` steps have been taken."""
        return self.current_step >= self.max_steps

    def _get_state(self):
        """Return user positions followed by UAV positions, in the Box's float32 dtype."""
        return np.concatenate([self.users, self.uavs]).astype(np.float32)

    def _calculate_reward(self):
        """Score the layout: coverage minus distance and boundary penalties.

        For each UAV the reward adds ``sum(exp(-distance))`` over all users,
        subtracts ``0.001 * sum(distance)``, and subtracts a boundary penalty.
        """
        total_reward = 0
        for uav in self.uavs:
            distances = np.linalg.norm(self.users - uav, axis=1)
            coverage = np.sum(np.exp(-distances))  # Exponential decay with distance
            total_reward += coverage

            # Penalize UAV for being far from users
            total_reward -= np.sum(distances) * 0.001  # Adjust weight as needed

            # Penalize UAV for being close to the boundaries. Per axis the term is 0 at
            # the map centre and 1 on an edge, so the penalty ranges from 0 to 0.2.
            # (The previous formula summed x/A and (A - x)/A, which is always 0.2.)
            edge_proximity = np.abs(2.0 * uav / self.area_size - 1.0)
            total_reward -= 0.1 * np.sum(edge_proximity)
        return total_reward

    def render(self, mode='human'):
        """No-op placeholder; visualization is done by the test loop instead."""
        pass

In [28]:
def train_dqn_agent(env, agent, n_episodes=500, max_steps_per_episode=100, stop_loss_threshold=0.01, patience=10):
    """Train ``agent`` on ``env``, save its weights, and plot the per-episode loss.

    Args:
        env: Environment whose ``reset()`` returns an observation and whose
            ``step(action)`` returns ``(obs, reward, done, info)``. Its
            ``num_users``, ``num_uavs`` and ``area_size`` name the weights file.
        agent: Object providing ``act``, ``remember``, ``replay``, ``epsilon``
            and ``model.state_dict()``.
        n_episodes: Maximum number of episodes.
        max_steps_per_episode: Step cap per episode (an episode also ends on ``done``).
        stop_loss_threshold: Mean episode loss below which an episode counts
            towards early stopping.
        patience: Number of consecutive low-loss episodes that stops training.

    Returns:
        Tuple ``(rewards, losses)``: total reward and mean loss per episode.
        Episodes in which no replay update happened record a loss of 0.
    """
    save_path = f"dqn_weights_users{env.num_users}_uavs{env.num_uavs}_area{env.area_size}.pth"
    losses = []
    rewards = []
    stop_loss_counter = 0

    for episode in range(n_episodes):
        state = env.reset()
        total_reward = 0
        episode_losses = []
        start_time = time.time()

        for step in range(max_steps_per_episode):
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward

            # replay() returns None until the buffer holds a full minibatch.
            loss = agent.replay()
            if loss is not None:
                episode_losses.append(loss)

            if done:
                break

        trained_this_episode = bool(episode_losses)
        avg_loss = np.mean(episode_losses) if trained_this_episode else 0
        epsilon = agent.epsilon

        rewards.append(total_reward)
        losses.append(avg_loss)

        end_time = time.time()
        episode_duration = end_time - start_time
        avg_reward = np.mean(rewards[-100:])

        print(f"Episode {episode + 1}/{n_episodes} | Epsilon: {epsilon:.2f} | "
              f"Avg Reward: {avg_reward:.2f} | Avg Loss: {avg_loss:.4f} | "
              f"Time: {episode_duration:.2f}s")

        # Check for stop loss condition. Episodes without any gradient update carry a
        # placeholder loss of 0 and must not count as "converged", otherwise training
        # could stop before the replay buffer ever fills.
        if trained_this_episode and avg_loss < stop_loss_threshold:
            stop_loss_counter += 1
            print(f"stop loss counter: {stop_loss_counter}")
            if stop_loss_counter >= patience:
                print(f"Training stopped early at episode {episode + 1} due to consistently low loss.")
                break
        else:
            stop_loss_counter = 0  # Reset counter if loss increases

    # Save the model weights after training
    torch.save(agent.model.state_dict(), save_path)
    print(f"Model weights saved to {save_path}")

    # Plotting the losses
    plt.figure(figsize=(10, 5))
    plt.plot(losses, label="Loss")
    plt.xlabel("Episode")
    plt.ylabel("Loss")
    plt.title("Loss per Episode")
    plt.legend()
    plt.show()

    return rewards, losses

In [34]:
# Create the environment and DQN agent
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = UAVClusterEnv(num_users=10, num_uavs=3, area_size=15)
# np.prod returns a NumPy scalar; hand plain ints to the network constructor.
state_size = int(np.prod(env.observation_space.shape))
action_size = int(env.action_space.n)

# Leave as None to train from scratch. To resume from saved weights, set this to the
# file written by train_dqn_agent, which is named
# "dqn_weights_users{num_users}_uavs{num_uavs}_area{area_size}.pth".
PRETRAINED_WEIGHTS_PATH = None

agent = DQNAgent(state_size, action_size)
if PRETRAINED_WEIGHTS_PATH is not None:
    # map_location lets weights saved on a GPU machine load on a CPU-only one.
    agent.model.load_state_dict(torch.load(PRETRAINED_WEIGHTS_PATH, map_location=device))
agent.model.to(device)

QNetwork(
  (fc1): Linear(in_features=26, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=128, bias=True)
  (fc3): Linear(in_features=128, out_features=12, bias=True)
)

In [None]:
# Run training; returns the per-episode total rewards and mean losses.
rewards, losses = train_dqn_agent(
    env,
    agent,
    n_episodes=100,
    max_steps_per_episode=100,
    stop_loss_threshold=0.001,
)

In [38]:
def test_dqn_agent(env, agent, n_episodes=5, max_steps_per_episode=100):
    """
    Test the DQN agent in the environment for a given number of episodes with visualization.
    
    Args:
        env: The environment to test in.
        agent: The DQN agent.
        n_episodes: Number of episodes to run.
        max_steps_per_episode: Maximum number of steps per episode.
    
    Returns:
        total_rewards: List of total rewards obtained in each episode.
        metrics: Dictionary containing various evaluation metrics.
    """
    total_rewards = []
    avg_user_distances = []
    coverage_overlaps = []
    coverage_uniformities = []
    normalized_rewards = []

    for episode in range(n_episodes):
        state = env.reset()
        total_reward = 0

        for step in range(max_steps_per_episode):
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            total_reward += reward

            # Clear the previous plot
            clear_output(wait=True)

            # Plot the environment
            plt.figure(figsize=(8, 8))
            plt.xlim(0, env.area_size)
            plt.ylim(0, env.area_size)
            plt.scatter(env.users[:, 0], env.users[:, 1], color='blue', label='Users')
            plt.scatter(env.uavs[:, 0], env.uavs[:, 1], color='red', label='UAVs')

            # Calculate and plot distances/rewards
            total_coverage = np.zeros(env.users.shape[0])
            for i, uav in enumerate(env.uavs):
                distances = np.linalg.norm(env.users - uav, axis=1)
                coverage = np.exp(-distances)  # Exponential decay with distance
                total_coverage += coverage

                for j, user in enumerate(env.users):
                    plt.plot([uav[0], user[0]], [uav[1], user[1]], 'k-', lw=0.5)
                    plt.text((uav[0] + user[0]) / 2, (uav[1] + user[1]) / 2, f'{coverage[j]:.2f}', fontsize=8, color='green')

            # Plot coverage as a color intensity for each user
            plt.scatter(env.users[:, 0], env.users[:, 1], c=total_coverage, cmap='coolwarm', s=200, alpha=0.6, edgecolor='black')
            plt.colorbar(label='Coverage Intensity')

            plt.title(f'Episode {episode + 1}, Step {step + 1}')
            plt.legend()
            plt.show()

            state = next_state

            if done:
                break

        # Calculate metrics after the episode
        avg_distance = np.mean([np.min(np.linalg.norm(env.users - uav, axis=1)) for uav in env.uavs])
        coverage_overlap = sum(
            [max(0, env.area_size / 3 - np.linalg.norm(env.uavs[i] - env.uavs[j]))
             for i in range(len(env.uavs)) for j in range(i + 1, len(env.uavs))]
        )
        user_counts = np.zeros(len(env.uavs))
        for user in env.users:
            nearest_uav_idx = np.argmin(np.linalg.norm(env.uavs - user, axis=1))
            user_counts[nearest_uav_idx] += 1
        coverage_uniformity = np.var(user_counts)
        normalized_reward = total_reward / (len(env.users) * len(env.uavs))

        # Store the metrics
        avg_user_distances.append(avg_distance)
        coverage_overlaps.append(coverage_overlap)
        coverage_uniformities.append(coverage_uniformity)
        normalized_rewards.append(normalized_reward)

        total_rewards.append(total_reward)
        print(f"Test Episode {episode + 1}/{n_episodes} | Total Reward: {total_reward:.2f} | Avg Distance: {avg_distance:.2f} | "
              f"Overlap: {coverage_overlap:.2f} | Uniformity: {coverage_uniformity:.2f} | Normalized Reward: {normalized_reward:.4f}")

    avg_reward = np.mean(total_rewards)
    avg_distance = np.mean(avg_user_distances)
    avg_overlap = np.mean(coverage_overlaps)
    avg_uniformity = np.mean(coverage_uniformities)
    avg_normalized_reward = np.mean(normalized_rewards)

    print(f"\nAverage Reward over {n_episodes} Test Episodes: {avg_reward:.2f}")
    print(f"Average User Distance: {avg_distance:.2f}")
    print(f"Average Coverage Overlap: {avg_overlap:.2f}")
    print(f"Average Coverage Uniformity: {avg_uniformity:.2f}")
    print(f"Average Normalized Reward: {avg_normalized_reward:.4f}")

    metrics = {
        "avg_reward": avg_reward,
        "avg_distance": avg_distance,
        "avg_overlap": avg_overlap,
        "avg_uniformity": avg_uniformity,
        "avg_normalized_reward": avg_normalized_reward
    }
    
    return total_rewards, metrics

In [None]:
# test_dqn_agent returns (per-episode rewards, metrics dict); unpack both so that
# `test_rewards` really is the reward list.
test_rewards, test_metrics = test_dqn_agent(env, agent, n_episodes=5, max_steps_per_episode=100)