# Deep Q-learning

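Tabular Q-learning is attractive because it comes with a convergence/optimality guarantee (via the Bellman optimality equation). However, it is severely limited to small environments, because storing the full Q-table in memory becomes prohibitive as the number of states grows. It is also not well suited to continuous state spaces, but we'll get to that later. Deep Q-learning replaces the table with a neural network that approximates the Q-function.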

We'll keep using the same GridWorld environment for now.

In [4]:
import numpy as np

class GridWorld:
    """Minimal grid environment with a single goal cell.

    A position is a ``(row, col)`` tuple. The agent starts in the top-left
    cell ``(0, 0)`` and an episode is finished once the agent's position
    equals ``goal``.
    """

    def __init__(self, size=(3, 3), goal=(2, 2)):
        """Create a grid with ``size = (rows, cols)`` and the given goal cell."""
        self.size = size
        self.goal = goal
        self.state = (0, 0)  # the agent always begins in the top-left cell
        self.actions = ['up', 'down', 'left', 'right']

    def reset(self):
        """Move the agent back to ``(0, 0)`` and return that position."""
        self.state = (0, 0)
        return self.state

    def step(self, action):
        """Apply ``action`` and return ``(state, reward, done)``.

        Moves that would leave the grid are clamped to the border, so the
        agent stays where it is. An action name that is not one of the four
        known moves leaves the position unchanged but is still rewarded like
        any other step.
        """
        # One transition function per action name; each maps the current
        # (row, col) position to the clamped new position.
        moves = {
            'up': lambda pos: (max(0, pos[0] - 1), pos[1]),
            'down': lambda pos: (min(self.size[0] - 1, pos[0] + 1), pos[1]),
            'left': lambda pos: (pos[0], max(0, pos[1] - 1)),
            'right': lambda pos: (pos[0], min(self.size[1] - 1, pos[1] + 1)),
        }
        move = moves.get(action)
        if move is not None:
            self.state = move(self.state)

        # -1 per step, +10 on the step that lands on (or stays at) the goal.
        reached_goal = self.state == self.goal
        return self.state, (10 if reached_goal else -1), reached_goal

    def render(self):
        """Print the grid: ``.`` empty, ``G`` goal, ``A`` agent."""
        cells = np.full(self.size, '.')
        cells[self.goal] = 'G'
        # Written last, so the agent marker hides the goal when both coincide.
        cells[self.state] = 'A'
        lines = [' '.join(row) for row in cells]
        print('\n'.join(lines))


In [5]:
import torch
import torch.nn as nn
import torch.optim as optim

class DQNAgent:
    """Epsilon-greedy agent whose Q-function is a small feed-forward network.

    The network maps a state vector of length ``state_dim`` to one Q-value
    per action (``action_dim`` outputs). Actions are handled as integer
    indices; ``self.actions[i]`` is the environment action name for index
    ``i``.
    """

    def __init__(self, state_dim, action_dim, gamma=0.9, epsilon=0.1, learning_rate=0.01):
        """Build the Q-network and its optimizer.

        Args:
            state_dim: Number of input features describing a state.
            action_dim: Number of discrete actions (network outputs).
            gamma: Discount factor applied to the bootstrapped next-state value.
            epsilon: Probability of picking a uniformly random action in ``act``.
            learning_rate: Step size for the Adam optimizer.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.learning_rate = learning_rate

        # Q-network: two hidden layers of 24 ReLU units each.
        self.model = nn.Sequential(
            nn.Linear(state_dim, 24),
            nn.ReLU(),
            nn.Linear(24, 24),
            nn.ReLU(),
            nn.Linear(24, action_dim)
        )
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.loss_fn = nn.MSELoss()

        # Action names, indexed by the integer action returned from ``act``.
        self.actions = ['up', 'down', 'left', 'right']

    def act(self, state):
        """Return an action index chosen with an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random index is returned;
        otherwise the index with the largest predicted Q-value for ``state``.
        """
        if np.random.rand() < self.epsilon:
            return int(np.random.choice(range(self.action_dim)))

        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        # Pure inference: no autograd graph is needed to pick an action.
        with torch.no_grad():
            q_values = self.model(state_tensor)
        return torch.argmax(q_values, dim=1).item()

    def learn(self, state, action, reward, next_state, done):
        """Take one gradient step on a single observed transition.

        The TD target ``reward + gamma * max_a' Q(next_state, a')`` (just
        ``reward`` when ``done``) is treated as a constant, so gradients flow
        only through the prediction ``Q(state, action)``.

        Args:
            state: Current state as a sequence of ``state_dim`` numbers.
            action: Integer index of the action that was taken.
            reward: Scalar reward received for the transition.
            next_state: State reached after the action.
            done: Truthy if ``next_state`` ended the episode.
        """
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)

        # Single forward pass that carries the gradient.
        q_values = self.model(state_tensor)

        # Build the regression target without tracking gradients: the
        # bootstrapped value must act as a fixed label, not as something the
        # optimizer can move to shrink the loss.
        with torch.no_grad():
            if done:
                td_target = float(reward)
            else:
                td_target = reward + self.gamma * self.model(next_state_tensor).max()
            # Copy the current predictions so that every action other than
            # the one taken contributes zero error, then overwrite the taken
            # action's entry with the TD target.
            target_q_values = q_values.detach().clone()
            target_q_values[0, action] = td_target

        loss = self.loss_fn(q_values, target_q_values)

        # Backpropagate and optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

In [6]:
def train_agent(episodes=1000):
    """Train a fresh ``DQNAgent`` online on a default ``GridWorld``.

    The agent updates its network after every single environment step using
    only the transition it just observed. One summary line with the total
    reward is printed per episode.

    Args:
        episodes: Number of episodes to run; each runs until the goal is reached.
    """
    env = GridWorld()
    agent = DQNAgent(state_dim=2, action_dim=4)

    for episode_number in range(1, episodes + 1):
        state, total_reward = env.reset(), 0

        # At least one step is always taken; the loop ends on the step that
        # reports the episode as finished.
        while True:
            action = agent.act(state)

            # The environment expects an action name, the agent works with indices.
            next_state, reward, done = env.step(agent.actions[action])

            agent.learn(state, action, reward, next_state, done)

            total_reward += reward
            state = next_state
            if done:
                break

        print(f"Episode {episode_number}/{episodes}, Total Reward: {total_reward}")

# Run a short training session (10 episodes instead of the default 1000);
# train_agent prints one line with the total reward per episode.
train_agent(episodes=10)

Episode 1/10, Total Reward: -38
Episode 2/10, Total Reward: -55
Episode 3/10, Total Reward: -18
Episode 4/10, Total Reward: 7
Episode 5/10, Total Reward: -11
Episode 6/10, Total Reward: -38
Episode 7/10, Total Reward: 7
Episode 8/10, Total Reward: 7
Episode 9/10, Total Reward: 7
Episode 10/10, Total Reward: -36


# TODO

- Implement a replay buffer (no peeking below!)

## Experience Replay

In [7]:
import random
from collections import deque

class ReplayBuffer:
    """Fixed-capacity store of transitions for experience replay.

    Once ``capacity`` transitions are held, adding another one silently
    drops the oldest.
    """

    def __init__(self, capacity):
        """Create an empty buffer that keeps at most ``capacity`` transitions."""
        # A bounded deque evicts from the left when a full buffer is appended to.
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

    def add(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            Five tuples ``(states, actions, rewards, next_states, dones)``,
            each of length ``batch_size``; position ``i`` in every tuple
            belongs to the same transition.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one tuple per field.
        states, actions, rewards, next_states, dones = zip(*picked)
        return states, actions, rewards, next_states, dones

In [8]:
class DQNAgentWithReplay(DQNAgent):
    """``DQNAgent`` variant that trains on mini-batches from a replay buffer."""

    def __init__(self, state_dim, action_dim, buffer_capacity=10000, batch_size=32, **kwargs):
        """Set up the base agent plus a replay buffer.

        Args:
            state_dim: Number of input features describing a state.
            action_dim: Number of discrete actions.
            buffer_capacity: Maximum number of transitions kept in the buffer.
            batch_size: Number of transitions sampled for each training step.
            **kwargs: Forwarded unchanged to ``DQNAgent.__init__``.
        """
        super().__init__(state_dim, action_dim, **kwargs)
        self.replay_buffer = ReplayBuffer(buffer_capacity)
        self.batch_size = batch_size

    def store_experience(self, state, action, reward, next_state, done):
        """Record one transition in the replay buffer."""
        self.replay_buffer.add(state, action, reward, next_state, done)

    def learn_from_experience(self):
        """Run one optimizer step on a random mini-batch from the buffer.

        Does nothing until the buffer holds at least ``batch_size``
        transitions.
        """
        if len(self.replay_buffer) < self.batch_size:
            return

        (state_batch, action_batch, reward_batch,
         next_state_batch, done_batch) = self.replay_buffer.sample(self.batch_size)

        # Turn each field of the batch into a tensor.
        state_batch = torch.FloatTensor(state_batch)
        action_batch = torch.LongTensor(action_batch)
        reward_batch = torch.FloatTensor(reward_batch)
        next_state_batch = torch.FloatTensor(next_state_batch)
        done_batch = torch.FloatTensor(done_batch)

        # Q(s, a) for the action actually taken in each sampled transition.
        all_q = self.model(state_batch)
        taken_index = action_batch.unsqueeze(1)
        predicted_q = all_q.gather(1, taken_index).squeeze(1)

        # TD targets come from the same network (there is no separate target
        # network here) and are computed without gradient tracking. The
        # (1 - done) factor removes the bootstrap term for terminal transitions.
        with torch.no_grad():
            best_next_q = self.model(next_state_batch).max(1)[0]
            td_targets = reward_batch + (1 - done_batch) * self.gamma * best_next_q

        batch_loss = self.loss_fn(predicted_q, td_targets)

        self.optimizer.zero_grad()
        batch_loss.backward()
        self.optimizer.step()

In [13]:
def train_agent_with_replay(episodes=10):
    """Train a fresh ``DQNAgentWithReplay`` on a default ``GridWorld``.

    Every environment step is stored in the agent's replay buffer and is
    followed by one call to ``learn_from_experience``. One summary line with
    the total reward is printed per episode.

    Args:
        episodes: Number of episodes to run; each runs until the goal is reached.
    """
    env = GridWorld()
    agent = DQNAgentWithReplay(state_dim=2, action_dim=4, buffer_capacity=1000, batch_size=32)

    for episode_number in range(1, episodes + 1):
        state, total_reward = env.reset(), 0

        # At least one step is always taken; the loop ends on the step that
        # reports the episode as finished.
        while True:
            action = agent.act(state)

            # The environment expects an action name, the agent works with indices.
            next_state, reward, done = env.step(agent.actions[action])

            # Remember the transition first, then train on a sampled batch.
            agent.store_experience(state, action, reward, next_state, done)
            agent.learn_from_experience()

            total_reward += reward
            state = next_state
            if done:
                break

        print(f"Episode {episode_number}/{episodes}, Total Reward: {total_reward}")


In [15]:
# Train the replay-buffer agent for 20 episodes (the function's default is 10).
train_agent_with_replay(20)

Episode 1/20, Total Reward: -11
Episode 2/20, Total Reward: -1
Episode 3/20, Total Reward: -17
Episode 4/20, Total Reward: 6
Episode 5/20, Total Reward: 7
Episode 6/20, Total Reward: 7
Episode 7/20, Total Reward: 6
Episode 8/20, Total Reward: 7
Episode 9/20, Total Reward: 7
Episode 10/20, Total Reward: 7
Episode 11/20, Total Reward: 7
Episode 12/20, Total Reward: 7
Episode 13/20, Total Reward: 7
Episode 14/20, Total Reward: 7
Episode 15/20, Total Reward: 7
Episode 16/20, Total Reward: 7
Episode 17/20, Total Reward: 5
Episode 18/20, Total Reward: 7
Episode 19/20, Total Reward: 7
Episode 20/20, Total Reward: 7
