### Simple Deep Q-Network Leanring

In [27]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random

# Define the Deep Q-Network (DQN) architecture
class DQN(nn.Module):
    def __init__(self, input_size, output_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, output_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x.view(x.size(0), -1)  # Reshape output to have shape [batch_size, num_actions]


# Define the Deep Q-Learning agent
class DQNAgent:
    def __init__(self, input_size, output_size, learning_rate=0.001, gamma=0.99, epsilon=1.0, epsilon_decay=0.999, epsilon_min=0.01):
        # self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.device = torch.device("cpu")
        self.q_network = DQN(input_size, output_size).to(self.device)
        self.target_network = DQN(input_size, output_size).to(self.device)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.target_network.eval()
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)
        self.loss_function = nn.MSELoss()
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.output_size = output_size

    def select_action(self, state):
        if np.random.rand() < self.epsilon:
            return random.randrange(self.output_size)  # Choose a random action
        else:
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state).to(self.device)
                q_values = self.q_network(state_tensor)
                return torch.argmax(q_values).item()  # Choose action with highest Q-value

    def train(self, state, action, next_state, reward, done):
        state_tensor = torch.FloatTensor(state).to(self.device)
        next_state_tensor = torch.FloatTensor(next_state).to(self.device)
        action_tensor = torch.LongTensor([action]).to(self.device)
        reward_tensor = torch.FloatTensor([reward]).to(self.device)
        done_tensor = torch.FloatTensor([done]).to(self.device)

        # Calculate Q-values for current and next states
        q_values = self.q_network(state_tensor)
        next_q_values = self.target_network(next_state_tensor)
        print(f"Q-values shape: {q_values.shape}")
        print(f"Next Q-values shape: {next_q_values.shape}")

        q_value = q_values.gather(1, action_tensor.unsqueeze(1)).squeeze(1)
        print(f"Q-value shape: {q_value.shape}")

        # Calculate target Q-value using the Bellman equation
        target_q = reward_tensor + (1 - done_tensor) * self.gamma * next_q_values.max(1)[0]
        print(f"Target Q-value shape: {target_q.shape}")

        # Calculate loss and update Q-network
        loss = self.loss_function(q_value, target_q.detach())
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay epsilon
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

# Define the environment
class GridWorld:
    def __init__(self, grid_size=5, num_obstacles=5):
        self.grid_size = grid_size
        self.num_obstacles = num_obstacles
        self.reset()

    def reset(self):
        self.agent_position = torch.tensor([0, 0], dtype=torch.float32)
        self.goal_position = torch.tensor([self.grid_size - 1, self.grid_size - 1], dtype=torch.float32)
        self.obstacle_positions = {(torch.randint(0, self.grid_size, (1,)).item(), torch.randint(0, self.grid_size, (1,)).item()) for _ in range(self.num_obstacles)}
        self.done = False
        
    def step(self, action):
        if self.done:
            raise ValueError("Episode has ended, please call reset() to restart.")

        if action == 0:  # Move right
            next_position = (self.agent_position[0], min(self.grid_size - 1, self.agent_position[1] + 1))
        elif action == 1:  # Move left
            next_position = (self.agent_position[0], max(0, self.agent_position[1] - 1))
        elif action == 2:  # Move down
            next_position = (min(self.grid_size - 1, self.agent_position[0] + 1), self.agent_position[1])
        elif action == 3:  # Move up
            next_position = (max(0, self.agent_position[0] - 1), self.agent_position[1])
        else:
            raise ValueError("Invalid action.")

        next_position = tuple(next_position)  # Convert to tuple
        next_state = self.render()  # Get the entire grid state
        if next_position in self.obstacle_positions:
            reward = torch.tensor(-1, dtype=torch.float32)
        elif torch.all(torch.tensor(next_position) == self.goal_position):
            reward = torch.tensor(1, dtype=torch.float32)
            self.done = True
        else:
            reward = torch.tensor(0, dtype=torch.float32)

        self.agent_position = torch.tensor(next_position)

        return next_state, reward, self.done

    def render(self):
        grid = torch.zeros((self.grid_size, self.grid_size))
        agent_row, agent_col = self.agent_position.long().tolist()  # Convert to integers
        grid[agent_row, agent_col] = 0.5  # Agent position
        goal_row, goal_col = self.goal_position.long().tolist()  # Convert to integers
        grid[goal_row, goal_col] = 0.8  # Goal position
        for obstacle_pos in self.obstacle_positions:
            obstacle_row, obstacle_col = obstacle_pos
            grid[obstacle_row, obstacle_col] = 0.2  # Obstacle position
        return grid


### Running above environment


In [28]:
import matplotlib.pyplot as plt

# Training loop
env = GridWorld(grid_size=5, num_obstacles=5)
input_size = env.grid_size * env.grid_size  # Flatten the grid
output_size = 4  # Number of possible actions (right, left, down, up)
agent = DQNAgent(input_size, output_size)
num_epochs = 10

for epoch in range(num_epochs):
    env.reset()
    done = False
    step_count = 0
    state = env.render().flatten().tolist()
    
    while not done:
        action = agent.select_action(state)
        next_state, reward, done = env.step(action)
        next_state_flat = np.ravel(next_state).tolist()  # Flatten the next state
        next_state_tensor = torch.FloatTensor(next_state_flat).unsqueeze(0)  # Convert to tensor and add batch dimension
        print(f"Next state tensor shape: {next_state_tensor.shape}")  # Print the shape of next_state_tensor
        print(f"Next state flattened: {next_state_flat}")  # Print the flattened next state
        print(f"Environment grid size: {env.grid_size}")  # Print the grid size of the environment
        agent.train(state, action, next_state_flat, reward, done)  # Pass the flattened next state to the agent
        state = next_state_flat
        step_count += 1

    print(f"Epoch {epoch + 1}: Total Steps = {step_count}")


Next state tensor shape: torch.Size([1, 25])
Next state flattened: [0.5, 0.0, 0.0, 0.20000000298023224, 0.0, 0.0, 0.20000000298023224, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.20000000298023224, 0.0, 0.0, 0.0, 0.20000000298023224, 0.0, 0.0, 0.20000000298023224, 0.0, 0.0, 0.800000011920929]
Environment grid size: 5
Q-values shape: torch.Size([4, 1])
Next Q-values shape: torch.Size([4, 1])
Q-value shape: torch.Size([1])
Target Q-value shape: torch.Size([4])


  return F.mse_loss(input, target, reduction=self.reduction)


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [21]:
if torch.cuda.is_available():
    print(f"CUDA is available. Found {torch.cuda.device_count()} GPU(s):")
    for i in range(torch.cuda.device_count()):
        print(f"  - GPU {i}: {torch.cuda.get_device_name(i)}")
else:
    print("CUDA is not available.")

CUDA is available. Found 1 GPU(s):
  - GPU 0: NVIDIA GeForce GTX 1660 Ti
