<a href="https://colab.research.google.com/github/balakrishnanvinchu/deep-reinforcement-learning/blob/main/DQN_and_DDQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### `---------------Mandatory Information to fill------------`

### Group ID:
### Group Members Name with Student ID:
1. Student 1
2. Student 2
3. Student 3
4. Student 4

`-------------------Write your remarks (if any) that you want to be considered at the time of evaluation---------------`

Remarks: ##Add here

## Autonomous Drone Battery Management for Urban Surveillance using DQN and DDQN - 7 Marks

### Import Statements

In [None]:
import numpy as np
import tensorflow as tf
from collections import deque
import random
import matplotlib.pyplot as plt
import time

In [None]:
# Environment Parameters
GRID_SIZE = (10, 10)  # (x extent, y extent) of the grid handed to DroneSurveillanceEnv
BATTERY_CAPACITY = 100.0  # Upper bound for recharging; also normalises the battery observation
INITIAL_BATTERY = 100.0  # Battery level restored by env.reset()

# Action Costs (battery units per step, before the disturbance multiplier)
BASE_MOVE_COST = 0.5  # Charged for the four movement actions
BASE_HOVER_COST = 0.2  # Charged for the hover action
RECHARGE_RATE = 5.0  # Battery added per recharge action taken on a charging station

# Reward Structure
BATTERY_CRASH_PENALTY = -100.0  # Added when the battery hits 0 away from a station (ends the episode)
TIME_PENALTY = -0.1  # Added to the reward of every step
RECHARGE_BONUS = 1.0  # Added for each recharge action taken on a charging station

# POI Parameters
POI_SPAWN_CHANCE = 0.05  # Per-step probability of spawning one POI
MAX_ACTIVE_POIS = 3  # No spawning while this many POIs are active
POI_LIFESPAN_RANGE = (10, 30)  # Inclusive integer range, in steps (random.randint)
POI_VALUE_RANGE = (10, 50)  # Uniform float range; the upper bound also normalises the observation

# Atmospheric Disturbance Parameters
DISTURBANCE_CHANGE_PROB = 0.1  # Per-step probability that the disturbance level changes
DISTURBANCE_MAGNITUDE_CHANGE = 0.05  # Largest absolute change in a single update
DISTURBANCE_FACTOR = 0.5  # Battery cost is scaled by (1 + disturbance * DISTURBANCE_FACTOR)

# Agent Configuration
STATE_SIZE = 7  # Length of the vector built by DroneSurveillanceEnv._get_obs
ACTION_SIZE = 6  # North, South, East, West, Hover, Recharge

# Learning Parameters
LEARNING_RATE = 0.001  # Adam learning rate
DISCOUNT_FACTOR = 0.95  # Gamma applied to the bootstrapped next-state value
REPLAY_BUFFER_SIZE = 5000  # maxlen of the agent's replay deque
MIN_REPLAY_SIZE = 1000  # DQNAgent.learn() does nothing until this many transitions are stored
BATCH_SIZE = 34  # NOTE(review): unusual minibatch size; 32 may have been intended - confirm

# Exploration Strategy
EXPLORATION_MAX = 1.0  # Starting epsilon
EXPLORATION_MIN = 0.01  # Epsilon is no longer decayed once it is at or below this value
EXPLORATION_DECAY = 0.995  # Multiplicative decay applied once per training step in learn()
# Used twice: every N training steps inside DQNAgent.learn() and every N episodes in train_agent()
TARGET_UPDATE_FREQUENCY = 10

# Training Setup
EPISODES = 200  # Episodes per agent
MAX_TIMESTEPS_PER_EPISODE = 100  # Step cap per training episode

In [None]:
# --- 2. Replay Buffer Class ---

class ReplayBuffer:
    """Bounded FIFO store of experiences with uniform random sampling.

    Once ``buffer_size`` items are held, appending another one silently drops
    the oldest.
    """

    def __init__(self, buffer_size):
        # deque(maxlen=...) performs the oldest-first eviction for us.
        self.buffer = deque(maxlen=buffer_size)

    def __len__(self):
        """Return how many experiences are currently stored."""
        return len(self.buffer)

    def add(self, experience):
        """Store one experience, evicting the oldest if the buffer is full."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Return up to ``batch_size`` distinct experiences chosen at random.

        When fewer than ``batch_size`` experiences are stored, all of them are
        returned (in random order).
        """
        stored = len(self.buffer)
        draw_count = batch_size if batch_size < stored else stored
        return random.sample(self.buffer, draw_count)

### --- 3. Custom Environment: DroneSurveillanceEnv --- - 2 Marks

In [None]:
class DroneSurveillanceEnv:
    """Grid world for a battery-constrained surveillance drone.

    The drone moves on a ``grid_size`` grid, drains battery on every move or
    hover, can recharge on the four corner charging stations and earns the
    value of any point of interest (POI) whose cell it occupies.

    Actions: 0 north (y+1), 1 south (y-1), 2 east (x+1), 3 west (x-1),
    4 hover, 5 recharge.

    The dynamics are configured by the module-level constants (TIME_PENALTY,
    BASE_MOVE_COST, BASE_HOVER_COST, RECHARGE_RATE, RECHARGE_BONUS,
    BATTERY_CRASH_PENALTY, POI_*, MAX_ACTIVE_POIS, DISTURBANCE_*).
    """

    def __init__(self, grid_size, battery_capacity, initial_battery):
        """Initialize the drone surveillance environment.

        Args:
            grid_size: ``(x extent, y extent)`` of the grid.
            battery_capacity: Maximum battery level (cap when recharging).
            initial_battery: Battery level at the start of every episode.
        """
        self.grid_size = grid_size
        self.battery_capacity = battery_capacity
        self.initial_battery = initial_battery

        # Drone State
        self.drone_pos = [0, 0]  # Start at origin
        self.battery_level = initial_battery

        # Environmental Dynamics
        self.disturbance = 0.0  # Atmospheric disturbance level (0-1)
        self.pois = []  # Active POIs: [x, y, value, remaining_lifespan]
        self.time_step = 0

        # Charging Stations (fixed at grid corners)
        self.charging_stations = [
            [0, 0],
            [0, grid_size[1] - 1],
            [grid_size[0] - 1, 0],
            [grid_size[0] - 1, grid_size[1] - 1],
        ]

        # Action mapping: action id -> [dx, dy]
        self.action_map = {
            0: [0, 1],   # North
            1: [0, -1],  # South
            2: [1, 0],   # East
            3: [-1, 0],  # West
            4: [0, 0],   # Hover
            5: [0, 0],   # Recharge
        }

    def reset(self):
        """Reset the environment to its initial state and return the observation."""
        self.drone_pos = [0, 0]
        self.battery_level = self.initial_battery
        self.disturbance = 0.0
        self.pois = []
        self.time_step = 0
        return self._get_obs()

    def _at_charging_station(self):
        """Return True when the drone currently sits on a charging station."""
        return any(self.drone_pos == station for station in self.charging_stations)

    def _distance_to(self, poi):
        """Return the Euclidean distance from the drone to ``poi``."""
        return np.sqrt((poi[0] - self.drone_pos[0]) ** 2
                       + (poi[1] - self.drone_pos[1]) ** 2)

    def _get_obs(self):
        """Convert the environment state into the 7-element observation vector.

        Layout: normalized x, normalized y, battery fraction, disturbance,
        normalized distance to the nearest POI (1.0 when none), its normalized
        value (0.0 when none) and its normalized remaining lifespan (0.0 when
        none).
        """
        # max(..., 1) keeps a 1-wide grid from dividing by zero.
        x_span = max(self.grid_size[0] - 1, 1)
        y_span = max(self.grid_size[1] - 1, 1)
        max_distance = np.sqrt(self.grid_size[0] ** 2 + self.grid_size[1] ** 2)

        if self.pois:
            # min() keeps the first POI on ties, like a strict "<" scan would.
            nearest_poi = min(self.pois, key=self._distance_to)
            poi_distance = self._distance_to(nearest_poi) / max_distance
            poi_value = nearest_poi[2] / POI_VALUE_RANGE[1]
            poi_lifespan = nearest_poi[3] / POI_LIFESPAN_RANGE[1]
        else:
            poi_distance, poi_value, poi_lifespan = 1.0, 0.0, 0.0

        obs = [
            self.drone_pos[0] / x_span,
            self.drone_pos[1] / y_span,
            self.battery_level / self.battery_capacity,
            self.disturbance,
            poi_distance,
            poi_value,
            poi_lifespan,
        ]
        return np.array(obs)

    def _spawn_poi(self):
        """Spawn at most one POI on a free cell, with probability POI_SPAWN_CHANCE.

        A cell is free when it holds neither the drone, a charging station nor
        another POI. Nothing is spawned when no free cell exists.
        """
        if len(self.pois) >= MAX_ACTIVE_POIS:
            return
        if random.random() >= POI_SPAWN_CHANCE:
            return

        blocked = {(self.drone_pos[0], self.drone_pos[1])}
        blocked.update((station[0], station[1]) for station in self.charging_stations)
        blocked.update((poi[0], poi[1]) for poi in self.pois)

        # Enumerating the free cells (instead of retrying random cells until
        # one fits) cannot loop forever on a fully occupied grid.
        free_cells = [
            (x, y)
            for x in range(self.grid_size[0])
            for y in range(self.grid_size[1])
            if (x, y) not in blocked
        ]
        if not free_cells:
            return

        x, y = random.choice(free_cells)
        value = random.uniform(POI_VALUE_RANGE[0], POI_VALUE_RANGE[1])
        lifespan = random.randint(POI_LIFESPAN_RANGE[0], POI_LIFESPAN_RANGE[1])
        self.pois.append([x, y, value, lifespan])

    def _update_pois(self, drone_at_poi_pos):
        """Collect the POI under the drone and age the remaining ones.

        Args:
            drone_at_poi_pos: Whether the drone currently occupies a POI cell.

        Returns:
            Total value of the POIs collected this step (0.0 if none).
        """
        reward = 0.0
        new_pois = []

        for poi in self.pois:
            if drone_at_poi_pos and poi[0] == self.drone_pos[0] and poi[1] == self.drone_pos[1]:
                reward += poi[2]  # Collected: pay out its value and drop it
            else:
                # Uncollected POIs lose one step of life and expire at zero
                poi[3] -= 1
                if poi[3] > 0:
                    new_pois.append(poi)

        self.pois = new_pois
        return reward

    def _update_atmospheric_disturbance(self):
        """Random-walk the disturbance level with probability DISTURBANCE_CHANGE_PROB."""
        if random.random() < DISTURBANCE_CHANGE_PROB:
            change = random.uniform(-DISTURBANCE_MAGNITUDE_CHANGE, DISTURBANCE_MAGNITUDE_CHANGE)
            # float() keeps the attribute a plain Python float rather than np.float64
            self.disturbance = float(np.clip(self.disturbance + change, 0.0, 1.0))

    def step(self, action):
        """Execute one time step.

        Args:
            action: Integer action id in 0..5 (see the class docstring).

        Returns:
            ``(observation, reward, done, info)``. ``done`` is True only when
            the battery reaches zero away from a charging station; ``info`` is
            an empty dict.

        Raises:
            ValueError: If ``action`` is not a known action id.
        """
        action = int(action)
        if action not in self.action_map:
            raise ValueError(
                f"Invalid action {action!r}; expected one of {sorted(self.action_map)}")

        done = False
        reward = TIME_PENALTY  # Default time penalty

        self._update_atmospheric_disturbance()

        if action == 5:  # Recharge
            # NOTE(review): a recharge attempt away from a station costs no
            # battery, which makes it a free hover - confirm this is intended.
            if self._at_charging_station():
                self.battery_level = min(self.battery_capacity,
                                         self.battery_level + RECHARGE_RATE)
                reward += RECHARGE_BONUS
        else:
            # Disturbance makes both hovering and moving more expensive
            cost_multiplier = 1 + self.disturbance * DISTURBANCE_FACTOR
            base_cost = BASE_HOVER_COST if action == 4 else BASE_MOVE_COST
            self.battery_level -= base_cost * cost_multiplier

            if action != 4:  # Movement actions
                dx, dy = self.action_map[action]
                # Clamp to the grid; plain ints keep drone_pos free of NumPy scalars
                new_x = min(max(self.drone_pos[0] + dx, 0), self.grid_size[0] - 1)
                new_y = min(max(self.drone_pos[1] + dy, 0), self.grid_size[1] - 1)
                self.drone_pos = [int(new_x), int(new_y)]

        # An empty battery is only fatal away from a charging station
        if self.battery_level <= 0:
            if not self._at_charging_station():
                reward += BATTERY_CRASH_PENALTY
                done = True
            self.battery_level = 0.0

        # Update POIs and collect rewards
        drone_at_poi = any(self.drone_pos == poi[:2] for poi in self.pois)
        reward += self._update_pois(drone_at_poi)

        self._spawn_poi()
        self.time_step += 1

        return self._get_obs(), reward, done, {}

    def render(self):
        """Print a text view of the grid: '.' empty, 'C' station, 'P<value>' POI, 'D' drone."""
        # dtype=object is required: a NumPy str array defaults to one character
        # per cell and would truncate labels such as 'P25' or 'CD'.
        grid = np.full(self.grid_size, '.', dtype=object)

        for x, y in self.charging_stations:
            grid[x, y] = 'C'

        for poi in self.pois:
            x, y, val, life = poi
            grid[x, y] = f'P{int(val)}'

        x, y = self.drone_pos
        grid[x, y] = 'D' if grid[x, y] == '.' else grid[x, y] + 'D'

        print(f"Time: {self.time_step} | Battery: {self.battery_level:.1f}% | Disturbance: {self.disturbance:.2f}")
        # Pad cells to a common width so columns stay aligned with long labels
        width = max((len(cell) for cell in grid.flat), default=1)
        for row in grid.T:
            print(' '.join(cell.ljust(width) for cell in row).rstrip())
        print()

In [None]:
class DQNAgent:
    """Epsilon-greedy DQN / Double DQN agent backed by two Keras MLPs.

    ``q_network`` is trained on minibatches drawn from ``memory``;
    ``target_network`` is a periodically synchronised copy used to compute the
    bootstrapped targets. With ``use_ddqn=True`` the next action is selected
    by ``q_network`` and evaluated by ``target_network`` (Double DQN).
    """

    def __init__(self, state_size, action_size, learning_rate, use_ddqn=False):
        """Initialize the DQN agent.

        Args:
            state_size: Length of the observation vector.
            action_size: Number of discrete actions.
            learning_rate: Adam learning rate for the Q-network.
            use_ddqn: Use the Double DQN target instead of the standard one.
        """
        self.state_size = state_size
        self.action_size = action_size
        self.use_ddqn = use_ddqn
        self.learning_rate = learning_rate

        # Main Q-Network and Target Network
        self.q_network = self._build_model()
        self.target_network = self._build_model()
        self.update_target_network()  # Start both networks from the same weights

        # Experience replay buffer of (state, action, reward, next_state, done)
        self.memory = deque(maxlen=REPLAY_BUFFER_SIZE)

        # Exploration parameters
        self.epsilon = EXPLORATION_MAX
        self.epsilon_min = EXPLORATION_MIN
        self.epsilon_decay = EXPLORATION_DECAY

        # Training parameters
        self.batch_size = BATCH_SIZE
        self.discount_factor = DISCOUNT_FACTOR
        self.target_update_frequency = TARGET_UPDATE_FREQUENCY
        self.train_step_counter = 0

    def _build_model(self):
        """Build the MLP that maps a state to one Q-value per action."""
        # An explicit Input layer replaces Dense(input_dim=...), which newer
        # Keras versions warn about.
        model = tf.keras.Sequential([
            tf.keras.Input(shape=(self.state_size,)),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(self.action_size, activation='linear')
        ])
        model.compile(
            loss='mse',
            optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate))
        return model

    def update_target_network(self):
        """Copy the Q-network weights into the target network."""
        self.target_network.set_weights(self.q_network.get_weights())

    def choose_action(self, state):
        """Select an action with the epsilon-greedy policy.

        Args:
            state: Observation vector of length ``state_size``.

        Returns:
            The chosen action id as a Python int.
        """
        # Strict "<" so that epsilon == 0 is always greedy.
        if np.random.rand() < self.epsilon:
            return random.randrange(self.action_size)  # Random action

        # Calling the model directly avoids the per-call overhead of predict()
        # for a single-row input.
        state = np.reshape(state, [1, self.state_size]).astype(np.float32)
        q_values = np.asarray(self.q_network(state, training=False))
        return int(np.argmax(q_values[0]))  # Best action

    def learn(self):
        """Run one training step on a random minibatch from ``memory``.

        Does nothing until the memory holds at least MIN_REPLAY_SIZE
        transitions (and at least one full minibatch). Each training step also
        decays epsilon and, every ``target_update_frequency`` steps,
        synchronises the target network.
        """
        if len(self.memory) < max(MIN_REPLAY_SIZE, self.batch_size):
            return

        # Sample random batch from memory
        minibatch = random.sample(self.memory, self.batch_size)
        batch_index = np.arange(len(minibatch))

        # Prepare batch data
        states = np.array([experience[0] for experience in minibatch], dtype=np.float32)
        actions = np.array([experience[1] for experience in minibatch], dtype=np.int64)
        rewards = np.array([experience[2] for experience in minibatch], dtype=np.float32)
        next_states = np.array([experience[3] for experience in minibatch], dtype=np.float32)
        # Float mask: 1.0 for terminal transitions, which must not bootstrap
        dones = np.array([experience[4] for experience in minibatch], dtype=np.float32)

        # Calculate target Q-values
        target_q_values = self.target_network.predict(next_states, verbose=0)
        if self.use_ddqn:
            # Double DQN: the online network picks the action, the target
            # network supplies its value.
            q_values_next = self.q_network.predict(next_states, verbose=0)
            best_actions = np.argmax(q_values_next, axis=1)
            next_values = target_q_values[batch_index, best_actions]
        else:
            # Standard DQN: the target network both picks and evaluates
            next_values = np.amax(target_q_values, axis=1)
        targets = rewards + (1.0 - dones) * self.discount_factor * next_values

        # Only the taken action's Q-value moves towards its target; the other
        # outputs keep their current predictions and add no loss.
        q_values = self.q_network.predict(states, verbose=0)
        q_values[batch_index, actions] = targets
        # An explicit batch_size gives one gradient step per minibatch; the
        # Keras default of 32 would split a larger minibatch into several.
        self.q_network.fit(states, q_values, batch_size=len(minibatch),
                           epochs=1, verbose=0)

        # Decay exploration rate
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        # Update target network periodically
        self.train_step_counter += 1
        if self.train_step_counter % self.target_update_frequency == 0:
            self.update_target_network()

### --- 4. DQNAgent Class --- 1 Mark

### --- 5. Main Training Loop --- 1 Mark

In [None]:
def train_agent(env, agent, num_episodes, max_timesteps_per_episode, render=False):
    """Train the agent in the environment and track performance metrics.

    Args:
        env: Environment exposing ``reset()``, ``step(action)`` and ``render()``.
        agent: Agent exposing ``choose_action``, ``learn``, ``memory``,
            ``epsilon``, ``q_network``, ``target_update_frequency`` and
            ``update_target_network``.
        num_episodes: Number of episodes to run.
        max_timesteps_per_episode: Step cap for each episode.
        render: When True, render every step of every 100th episode.

    Returns:
        Dict with per-episode ``episode_rewards``, ``episode_lengths`` (steps
        actually executed) and ``exploration_rates``, the ``initial_weights``
        and ``final_weights`` of the Q-network, and the totals
        ``poi_collections`` and ``crash_counts``.
    """
    # Initialize tracking metrics
    episode_rewards = []
    episode_lengths = []
    exploration_rates = []
    crash_counts = 0
    poi_collection_counts = 0

    # Any step reward above "time penalty + recharge bonus" can only come from
    # collecting a POI.
    poi_reward_threshold = TIME_PENALTY + RECHARGE_BONUS

    # Store initial weights for comparison (optional)
    initial_weights = agent.q_network.get_weights()

    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        steps_taken = 0
        show_episode = render and episode % 100 == 0  # Render periodically

        for _ in range(max_timesteps_per_episode):
            if show_episode:
                env.render()

            # Select and execute action
            action = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)

            # Store experience in replay buffer, then learn from it
            agent.memory.append((state, action, reward, next_state, done))
            agent.learn()

            total_reward += reward
            state = next_state
            steps_taken += 1

            # The environment only reports done=True after a battery crash.
            # Comparing the reward with BATTERY_CRASH_PENALTY would never
            # match, because the step reward also contains the time penalty.
            if done:
                crash_counts += 1
                break

            if reward > poi_reward_threshold:  # POI collected
                poi_collection_counts += 1

        # Episode-level target sync (the agent may also sync inside learn())
        if episode % agent.target_update_frequency == 0:
            agent.update_target_network()

        # Store episode metrics
        episode_rewards.append(total_reward)
        episode_lengths.append(steps_taken)
        exploration_rates.append(agent.epsilon)

        # Print progress
        if (episode + 1) % 10 == 0:
            avg_reward = np.mean(episode_rewards[-10:])
            print(f"Episode {episode+1}/{num_episodes} | "
                  f"Avg Reward: {avg_reward:.1f} | "
                  f"Epsilon: {agent.epsilon:.3f} | "
                  f"POIs Collected: {poi_collection_counts} | "
                  f"Crashes: {crash_counts}")

    # Training complete
    final_avg_reward = np.mean(episode_rewards[-100:]) if episode_rewards else float('nan')
    print("\nTraining completed!")
    print(f"Final Avg Reward (last 100 eps): {final_avg_reward:.1f}")
    print(f"Total POIs Collected: {poi_collection_counts}")
    print(f"Total Crashes: {crash_counts}")

    # Return training metrics
    return {
        'episode_rewards': episode_rewards,
        'episode_lengths': episode_lengths,
        'exploration_rates': exploration_rates,
        'initial_weights': initial_weights,
        'final_weights': agent.q_network.get_weights(),
        'poi_collections': poi_collection_counts,
        'crash_counts': crash_counts
    }

def plot_trajectory(trajectory, grid_size, charging_stations):
    """Plot the drone's trajectory together with the charging stations.

    Args:
        trajectory: Sequence of dicts, each with a ``'position'`` entry
            holding an ``(x, y)`` pair.
        grid_size: ``(x extent, y extent)`` of the grid.
        charging_stations: Sequence of ``(x, y)`` station coordinates.
    """
    plt.figure(figsize=(8, 8))
    xs = [step['position'][0] for step in trajectory]
    ys = [step['position'][1] for step in trajectory]
    plt.plot(xs, ys, marker='o', linestyle='-', label='Drone Trajectory')

    # Label only the first station so the legend shows a single entry.
    # Selecting it by index works whether stations are lists or tuples.
    for index, (cx, cy) in enumerate(charging_stations):
        plt.plot(cx, cy, 's', color='red', markersize=10,
                 label='Charging Station' if index == 0 else "")

    # Half-cell margin so markers on the border (the corner stations) are not clipped
    plt.xlim(-0.5, grid_size[0] - 0.5)
    plt.ylim(-0.5, grid_size[1] - 0.5)
    plt.gca().invert_yaxis()  # Invert y-axis to match grid rendering
    plt.title('Drone Trajectory during Evaluation')
    plt.xlabel('X Position')
    plt.ylabel('Y Position')
    plt.grid(True)
    plt.legend()
    plt.show()

### --- Main Execution Block ---

In [None]:
# Pick the compute device: the first GPU TensorFlow can see, otherwise the CPU
physical_devices = tf.config.list_physical_devices('GPU')
if not physical_devices:
    print("Using CPU")
else:
    # Let GPU memory grow on demand and expose only that one device
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
    tf.config.set_visible_devices(physical_devices[0], 'GPU')
    print("Using GPU for acceleration")

# One environment instance, reused by the later training and evaluation cells
env = DroneSurveillanceEnv(
    grid_size=GRID_SIZE,
    battery_capacity=BATTERY_CAPACITY,
    initial_battery=INITIAL_BATTERY,
)

# --- Train DQN Agent ---
print("\n" + "=" * 50, "Training DQN Agent", "=" * 50, sep="\n")
dqn_agent = DQNAgent(
    state_size=STATE_SIZE,
    action_size=ACTION_SIZE,
    learning_rate=LEARNING_RATE,
    use_ddqn=False,
)
dqn_metrics = train_agent(
    env,
    dqn_agent,
    num_episodes=EPISODES,
    max_timesteps_per_episode=MAX_TIMESTEPS_PER_EPISODE,
)


Using CPU

Training DQN Agent


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Episode 10/200 | Avg Reward: 1.4 | Epsilon: 0.995 | POIs Collected: 4 | Crashes: 0


In [None]:
# --- Train Double DQN Agent ---
# Same environment and hyperparameters as the DQN run; only use_ddqn differs.
print("\n" + "=" * 50, "Training Double DQN Agent", "=" * 50, sep="\n")
ddqn_agent = DQNAgent(
    state_size=STATE_SIZE,
    action_size=ACTION_SIZE,
    learning_rate=LEARNING_RATE,
    use_ddqn=True,
)
ddqn_metrics = train_agent(
    env,
    ddqn_agent,
    num_episodes=EPISODES,
    max_timesteps_per_episode=MAX_TIMESTEPS_PER_EPISODE,
)

In [None]:
if __name__ == "__main__":
    # Training runs in the preceding cells, which define env, dqn_agent,
    # dqn_metrics, ddqn_agent and ddqn_metrics. This cell only plots, compares
    # and evaluates the results.

    # --- Plotting Results ---
    plt.figure(figsize=(12, 8))

    # Plot rewards
    plt.subplot(2, 2, 1)
    plt.plot(dqn_metrics['episode_rewards'], label='DQN')
    plt.plot(ddqn_metrics['episode_rewards'], label='DDQN')
    plt.title('Episode Rewards')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.legend()

    # Plot exploration rates
    plt.subplot(2, 2, 2)
    plt.plot(dqn_metrics['exploration_rates'], label='DQN')
    plt.plot(ddqn_metrics['exploration_rates'], label='DDQN')
    plt.title('Exploration Rate (Epsilon)')
    plt.xlabel('Episode')
    plt.ylabel('Epsilon')
    plt.legend()

    # Plot moving average rewards
    plt.subplot(2, 2, 3)
    # Never let the window exceed the run length: with mode='valid' a window
    # longer than the data no longer produces a moving average.
    shortest_run = min(len(dqn_metrics['episode_rewards']),
                       len(ddqn_metrics['episode_rewards']))
    window_size = max(1, min(50, shortest_run))
    plt.plot(np.convolve(dqn_metrics['episode_rewards'], np.ones(window_size)/window_size, mode='valid'),
             label='DQN (MA{})'.format(window_size))
    plt.plot(np.convolve(ddqn_metrics['episode_rewards'], np.ones(window_size)/window_size, mode='valid'),
             label='DDQN (MA{})'.format(window_size))
    plt.title('Moving Average Rewards')
    plt.xlabel('Episode')
    plt.ylabel('Average Reward')
    plt.legend()

    plt.tight_layout()
    plt.savefig('training_results.png')
    plt.show()

    # --- Policy Analysis ---
    print("\n" + "="*50)
    print("Policy Analysis")
    print("="*50)

    # Compare final performance
    final_dqn_reward = np.mean(dqn_metrics['episode_rewards'][-100:])
    final_ddqn_reward = np.mean(ddqn_metrics['episode_rewards'][-100:])
    print("\nFinal Performance (last 100 episodes):")
    print(f"DQN Average Reward: {final_dqn_reward:.1f}")
    print(f"DDQN Average Reward: {final_ddqn_reward:.1f}")
    if final_dqn_reward != 0:
        # abs() keeps the sign meaningful when the DQN baseline is negative
        improvement = (final_ddqn_reward - final_dqn_reward) / abs(final_dqn_reward) * 100
        print(f"Improvement: {improvement:.1f}%")
    else:
        print("Improvement: n/a (DQN average reward is zero)")

    # Compare crash rates (crashes per episode, in percent)
    dqn_crash_rate = dqn_metrics['crash_counts'] / EPISODES * 100
    ddqn_crash_rate = ddqn_metrics['crash_counts'] / EPISODES * 100
    print("\nCrash Rates:")
    print(f"DQN: {dqn_crash_rate:.1f}%")
    print(f"DDQN: {ddqn_crash_rate:.1f}%")

    # Compare POI collection
    print("\nPOIs Collected:")
    print(f"DQN: {dqn_metrics['poi_collections']}")
    print(f"DDQN: {ddqn_metrics['poi_collections']}")

    # --- Evaluation of learned policy ---
    print("\n" + "="*50)
    print("Running Evaluation Episode with DDQN Policy")
    print("="*50)

    state = env.reset()
    done = False
    total_reward = 0
    trajectory = []

    # Act greedily (epsilon=0) for the whole episode and restore the training
    # epsilon afterwards, even if a step raises.
    original_epsilon = ddqn_agent.epsilon
    ddqn_agent.epsilon = 0
    try:
        # The environment only ends an episode on a battery crash, so a policy
        # that never crashes would loop forever without this step cap.
        for _ in range(MAX_TIMESTEPS_PER_EPISODE):
            action = ddqn_agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            total_reward += reward
            trajectory.append({
                'position': list(env.drone_pos),
                'battery': env.battery_level,
                'action': action,
                'reward': reward,
                'disturbance': env.disturbance
            })
            state = next_state
            env.render()
            time.sleep(0.1)  # Slow down for visualization
            if done:
                break
    finally:
        ddqn_agent.epsilon = original_epsilon

    print("\nEvaluation Episode Results:")
    if not done:
        print(f"Stopped after {MAX_TIMESTEPS_PER_EPISODE} steps without a crash")
    print(f"Total Reward: {total_reward}")
    print(f"Final Battery: {env.battery_level:.1f}%")
    print(f"POIs Collected: {sum(1 for step in trajectory if step['reward'] > TIME_PENALTY + RECHARGE_BONUS)}")

    # Plot evaluation trajectory
    plot_trajectory(trajectory, env.grid_size, env.charging_stations)

### Hyperparameter Tuning & Discussion: (1 Mark)