"""
Description:
This code implements a UAV (Unmanned Aerial Vehicle) navigation system using a combination of Deep Q-Learning (DQL) and D* pathfinding.

Key Features:
- **Deep Q-Network (DQN):** A neural network trained to optimize UAV movement decisions.
- **Experience Replay Buffer:** Stores past experiences to improve learning efficiency.
- **D* Pathfinding Algorithm:** Used as an alternative to A* to dynamically plan paths in changing environments.
- **UAV Environment:**
  - Drones navigate a 100x100 grid with walls and dynamic obstacles.
  - The environment is randomly generated with obstacles and walls.
  - Each drone receives rewards based on its progress towards the goal.
  - The training stops when all drones reach the predefined goal position.
- **GPU Optimization:** Ensures efficient computation by leveraging CUDA if available.

"""

In [None]:
"""
Description:
This code implements a UAV (Unmanned Aerial Vehicle) navigation system using a combination of Deep Q-Learning (DQL) and D* pathfinding.

Key Features:
- **Deep Q-Network (DQN):** A neural network trained to optimize UAV movement decisions.
- **Experience Replay Buffer:** Stores past experiences to improve learning efficiency.
- **D* Pathfinding Algorithm:** Used as an alternative to A* to dynamically plan paths in changing environments.
- **UAV Environment:**
  - Drones navigate a 100x100 grid with walls and dynamic obstacles.
  - The environment is randomly generated with obstacles and walls.
  - Each drone receives rewards based on its progress towards the goal.
  - The training stops when all drones reach the predefined goal position.
- **GPU Optimization:** Ensures efficient computation by leveraging CUDA if available.

D* Pathfinding Algorithm:
- **D* (Dynamic A*) is an incremental pathfinding algorithm** that improves upon A* by allowing real-time updates to the environment.
- Unlike A*, which computes a single static path, **D* continuously updates the path when new obstacles are detected**.
- It works by **back-propagating cost changes** when the environment changes, making it ideal for dynamic environments like UAV navigation.
- The algorithm is particularly useful when **obstacles move or new obstacles appear**, as it does not require complete recomputation of the path.
- **Key Steps:**
  1. Compute an initial path from the goal to the UAV.
  2. As the UAV moves, update the path based on new obstacle information.
  3. If a newly detected obstacle blocks the path, the algorithm updates only the affected parts instead of recomputing the entire path.
  4. The UAV follows the dynamically adjusted path to the goal.
- **Benefit:** More efficient path planning in dynamic environments, reducing unnecessary recomputation and allowing real-time adaptation.
"""

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
import heapq
import time
import cv2
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from torch.utils.data import DataLoader, TensorDataset

# Set device: Use GPU if available, otherwise fallback to CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Deep Q-Network (DQN)
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128).to(device)
        self.fc2 = nn.Linear(128, 128).to(device)
        self.fc3 = nn.Linear(128, output_dim).to(device)

    def forward(self, x):
        x = x.to(device)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Experience Replay Buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = []
        self.capacity = capacity

    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) >= self.capacity:
            self.buffer.pop(0)
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            torch.tensor(states, dtype=torch.float32).to(device),
            torch.tensor(actions, dtype=torch.long).view(batch_size, -1).to(device),
            torch.tensor(rewards, dtype=torch.float32).view(batch_size, 1).to(device),
            torch.tensor(next_states, dtype=torch.float32).to(device),
            torch.tensor(dones, dtype=torch.float32).view(batch_size, 1).to(device)
        )

    def __len__(self):
        return len(self.buffer)

# UAV Environment with DQL and D*
class UAVEnv:
    def __init__(self, grid_size=(100, 100), num_drones=3, goal_position=(99, 99)):
        self.grid_size = grid_size
        self.num_drones = num_drones
        self.goal_position = goal_position
        self.action_space = ["up", "down", "left", "right", "stay", "follow_dstar"]
        self.reset()

    def reset(self):
        self.drones = [(0, i) for i in range(self.num_drones)]
        self.obstacles = {(random.randint(1, 98), random.randint(1, 98)) for _ in range(500)}
        self.walls = {(i, random.randint(10, 90)) for i in range(100) if random.random() < 0.2}  # Add walls
        return self.get_state()

    def get_state(self):
        return torch.tensor(np.array(self.drones, dtype=np.float32).flatten()).to(device)

    def step(self, actions):
        new_positions = []
        for i, (x, y) in enumerate(self.drones):
            action = actions[i]
            if action == "up":
                next_pos = (x, y + 1)
            elif action == "down":
                next_pos = (x, y - 1)
            elif action == "left":
                next_pos = (x - 1, y)
            elif action == "right":
                next_pos = (x + 1, y)
            else:
                next_pos = (x, y)
            if next_pos in self.obstacles or next_pos in self.walls or not (0 <= next_pos[0] < self.grid_size[0] and 0 <= next_pos[1] < self.grid_size[1]):
                next_pos = (x, y)
            new_positions.append(next_pos)
        self.drones = new_positions
        reward = -1 if not all(drone == self.goal_position for drone in self.drones) else 100
        done = all(drone == self.goal_position for drone in self.drones)
        return self.get_state(), reward, done

    def print_labyrinth(self):
        grid = [["." for _ in range(self.grid_size[1])] for _ in range(self.grid_size[0])]
        for obs in self.obstacles:
            grid[obs[0]][obs[1]] = "#"
        for wall in self.walls:
            grid[wall[0]][wall[1]] = "*"
        for i, (x, y) in enumerate(self.drones):
            grid[x][y] = str(i)
        grid[self.goal_position[0]][self.goal_position[1]] = "G"
        print("\n".join(" ".join(row) for row in grid))

# Instantiate environment
env = UAVEnv()

# Training loop
num_epochs = 1000
for epoch in range(num_epochs):
    state = env.reset()
    done = False
    while not done:
        actions = [random.choice(env.action_space) for _ in range(env.num_drones)]
        state, reward, done = env.step(actions)
    if epoch % 5 == 0:
        print(f"Epoch {epoch + 1}, Drones: {env.drones}")
        env.print_labyrinth()
    if done:
        print(f"Drones reached the goal in epoch {epoch + 1}.")
        break


Using device: cuda
