In [None]:
from collections import namedtuple
from typing import List
from random import choices
from gymnasium import Env
import numpy as np
import torch
from torch import nn
from torch import Tensor
from torch.nn import functional as F

In [249]:
class Agent:
  """Interface shared by the agents in this notebook.

  Every method here is an empty placeholder that returns ``None``;
  subclasses provide the actual behaviour.
  """

  def __init__(self, env):
    """Accept the environment. The base class does not store it."""

  def process_transition(self, observation, action, reward, next_observation, done):
    """Receive one observed transition. No-op in the base class."""

  def get_action(self, observation, learning):
    """Choose an action for ``observation``. No-op in the base class."""

In [250]:
def select_greedy_actions(states: Tensor, model: nn.Module) -> Tensor:
  """Pick, for every row of `states`, the action index with the largest model output.

  The maximum is taken along dimension 1 of `model(states)` and that dimension
  is kept, so the result has one column holding the chosen indices.
  """
  q_values = model(states)
  return q_values.max(dim=1, keepdim=True).indices

def evaluate_actions(states: Tensor, actions: Tensor, rewards: Tensor, dones: Tensor, gamma: float, model: nn.Module) -> Tensor:
  """Build one-step targets: reward plus the discounted model value of the given actions.

  The value of each action in `actions` is read from `model(states)` along
  dimension 1. It is scaled by `gamma` and multiplied by `1 - dones`, so rows
  whose `dones` entry is 1 contribute only their reward.
  """
  bootstrap_values = model(states).gather(dim=1, index=actions)
  continuing = 1 - dones
  return rewards + (gamma * bootstrap_values * continuing)

def single_update(states: Tensor, rewards: Tensor, dones: Tensor, gamma: float, network: nn.Module) -> Tensor:
  """Q-Learning target in which one network both selects and evaluates the greedy action."""
  greedy_actions = select_greedy_actions(states, network)
  return evaluate_actions(states, greedy_actions, rewards, dones, gamma, network)

def double_update(states: Tensor, rewards: Tensor, dones: Tensor, gamma: float, online: nn.Module,
                  target: nn.Module) -> Tensor:
  """Double Q-Learning target: `online` selects the greedy action, `target` evaluates it."""
  greedy_actions = select_greedy_actions(states, online)
  return evaluate_actions(states, greedy_actions, rewards, dones, gamma, target)

In [251]:
class Configuration(dict):
  """A dict whose keys can also be read, written and deleted as attributes.

  Reading an attribute that is not a key yields ``None`` (``dict.get``
  semantics); deleting one that is not a key raises ``KeyError``.
  """

  def __getattr__(self, name):
    # Only reached when normal attribute lookup fails.
    return self.get(name)

  def __setattr__(self, name, value):
    self[name] = value

  def __delattr__(self, name):
    del self[name]

# A single environment transition, in the order the agent records it.
Experience = namedtuple(
  "Experience",
  ["state", "action", "reward", "next_state", "done"],
)
class MemoryModule(object):
  """Replay buffer with an optional upper bound on the number of stored items.

  While the buffer is below `capacity`, items are appended to `contents`.
  Once it is full, each new item overwrites the oldest stored one in place
  (ring buffer), so an append is O(1) instead of the O(n) cost of removing
  the head of a list. As a consequence `contents` is no longer in
  chronological order after the first overwrite; sampling is uniform, so the
  order does not affect it.

  A falsy `capacity` (``None`` or ``0``) means the buffer is unbounded.
  """

  def __init__(self, capacity: int = None) -> None:
    self.capacity = capacity
    self.contents = []
    # Index inside `contents` of the oldest item; only used once the buffer is full.
    self._oldest = 0

  def __len__(self) -> int:
    return len(self.contents)

  def append(self, experience: 'Experience') -> None:
    """Store `experience`, evicting the oldest item when the buffer is full."""
    if self.capacity and len(self.contents) >= self.capacity:
      self.contents[self._oldest] = experience
      # Wrap on the actual length so the cursor always stays inside `contents`.
      self._oldest = (self._oldest + 1) % len(self.contents)
    else:
      self.contents.append(experience)

  def sample(self, count: int) -> List['Experience']:
    """Draw `count` items uniformly at random, with replacement.

    `random.choices` raises ``IndexError`` when the buffer is empty.
    """
    return choices(self.contents, k=count)


In [252]:
class ModelModule(torch.nn.Module):
  """Fully connected network: Linear/ReLU pairs followed by a final Linear layer.

  Args:
    input_size: number of input features.
    hidden_sizes: width of each hidden layer, in order. May be empty, in
      which case the network is a single Linear layer.
    output_size: number of outputs.
  """

  def __init__(self, input_size: int, hidden_sizes: List[int], output_size: int):
    super().__init__()

    # Consecutive entries of `sizes` are the (in, out) widths of each Linear layer.
    sizes = [input_size, *hidden_sizes, output_size]
    layers = []
    for fan_in, fan_out in zip(sizes[:-2], sizes[1:-1]):
      layers.append(torch.nn.Linear(fan_in, fan_out))
      layers.append(torch.nn.ReLU())
    # The output layer has no activation after it.
    layers.append(torch.nn.Linear(sizes[-2], sizes[-1]))

    self.model = torch.nn.Sequential(*layers)

  def forward(self, x):
    """Apply the layer stack to `x`."""
    return self.model(x)

  def sync_with(self, other: 'ModelModule'):
    """Copy every entry of `other`'s state dict into this module."""
    self.load_state_dict(other.state_dict())

  def update_params(self, other: 'ModelModule', influence: float):
    """Blend `other`'s parameters into this module's parameters.

    Each parameter becomes ``own * (1 - influence) + other * influence``.
    Parameters are paired in `parameters()` order.
    """
    # The blend is bookkeeping, not part of any loss, so autograd is switched off
    # and the values are written in place rather than rebinding `.data`.
    with torch.no_grad():
      for owned, influencer in zip(self.parameters(), other.parameters()):
        owned.copy_(owned * (1.0 - influence) + influencer * influence)


In [253]:
class NeuralQLearningAgent(Agent):
  """Epsilon-greedy Q-learning agent with optional replay memory, Double-Q targets and target freezing.

  Args:
    env: environment providing `observation_space.shape` and a discrete
      `action_space` (`.n`, `.sample()`).
    network: configuration whose `layers` entry lists the hidden-layer sizes.
    use_memory: falsy to learn from each transition as it arrives; otherwise a
      configuration with `capacity`, `update_frequency` and `batch_size`.
    use_double: falsy to build targets with the target network alone;
      otherwise a configuration with `influence`, the blend weight used for
      soft target updates.
    use_freeze: falsy to soft-update the target network after every training
      step; otherwise a configuration with `frequency`, the number of
      transitions between hard copies of the online network into the target.

  The target network is maintained by exactly one strategy: hard copies when
  `use_freeze` is given, soft updates otherwise. When no `influence` is
  available the blend weight is 1.0, i.e. the target mirrors the online
  network after each training step.
  """

  def __init__(
      self, env: Env,
      network: Configuration,
      use_memory: Configuration = None,
      use_double: Configuration = None,
      use_freeze: Configuration = None
  ):
    super().__init__(env)
    self.env = env

    def create_model():
      return ModelModule(env.observation_space.shape[0], network.layers, env.action_space.n)

    self.uses_memory = bool(use_memory)
    self.uses_double = bool(use_double)
    self.uses_freeze = bool(use_freeze)

    # Memory-parameters
    if self.uses_memory:
      self.memory = MemoryModule(use_memory.capacity)
      self.update_frequency = use_memory.update_frequency
      self.batch_size = use_memory.batch_size
    else:
      # Without replay the buffer is never written to; each transition is a batch of one.
      self.memory = MemoryModule()
      self.update_frequency = 1
      self.batch_size = 1

    # Double-parameters
    self.online_model = create_model()
    self.target_model = create_model()
    self.target_model.sync_with(self.online_model)
    influence = use_double.influence if use_double else None
    self.influence = 1.0 if influence is None else influence

    # Freeze-parameters
    self.freeze_frequency = use_freeze and use_freeze.frequency

    # Hyper-parameters
    self.optimizer = torch.optim.RMSprop(self.online_model.parameters(), lr=0.001)
    self.gamma = 0.99
    self.epsilon = 1
    self.epsilon_decay = 0.99
    self.iterations = 0

  def get_action(self, observation, learning):
    """Return a random action while exploring or before training is possible, else the greedy one."""
    if not self.can_train or (learning and np.random.random() < self.epsilon):
      return self.env.action_space.sample()

    # Cast to float32 so observations of other float widths match the network weights.
    state = torch.as_tensor(observation, dtype=torch.float32).unsqueeze(dim=0)
    with torch.no_grad():
      return self.online_model(state).argmax().item()

  def train(self, experiences: List[Experience]) -> None:
    """Run one optimiser step of the online network on `experiences`."""
    states, actions, rewards, next_states, dones = zip(*experiences)

    # Stack through numpy first; building a tensor straight from a tuple of arrays is slow.
    states = torch.as_tensor(np.array(states), dtype=torch.float32)
    next_states = torch.as_tensor(np.array(next_states), dtype=torch.float32)
    actions = torch.as_tensor(np.array(actions), dtype=torch.int64).unsqueeze(dim=1)
    rewards = torch.as_tensor(np.array(rewards), dtype=torch.float32).unsqueeze(dim=1)
    dones = torch.as_tensor(np.array(dones), dtype=torch.float32).unsqueeze(dim=1)

    # Targets are constants for the regression; no gradient should reach the target network.
    with torch.no_grad():
      if self.uses_double:
        target = double_update(
          next_states,
          rewards,
          dones,
          self.gamma,
          self.online_model,
          self.target_model
        )
      else:
        target = single_update(
          next_states,
          rewards,
          dones,
          self.gamma,
          self.target_model
        )

    predicted = self.online_model(states).gather(dim=1, index=actions)
    loss = F.mse_loss(predicted, target)

    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

  @property
  def should_train(self) -> bool:
    """True on every transition without replay, else on every `update_frequency`-th one."""
    return not self.uses_memory or self.iterations % self.update_frequency == 0

  @property
  def can_train(self) -> bool:
    """True without replay, else once the buffer holds at least `batch_size` items."""
    return not self.uses_memory or len(self.memory) >= self.batch_size

  @property
  def should_freeze(self):
    """True when freezing is enabled and the transition count is a multiple of `freeze_frequency`."""
    return self.uses_freeze and self.iterations % self.freeze_frequency == 0

  def process_transition(self, observation, action, reward, next_observation, done):
    """Record one transition, train if due, maintain the target network and decay epsilon."""
    experience = Experience(observation, action, reward, next_observation, done)
    if self.uses_memory:
      self.memory.append(experience)
    self.iterations += 1

    if self.should_train and self.can_train:
      if self.uses_memory:
        self.train(self.memory.sample(self.batch_size))
      else:
        self.train([experience])

      # Soft updates are the alternative to freezing; this is the only place they happen.
      if not self.uses_freeze:
        self.target_model.update_params(self.online_model, self.influence)

    if self.should_freeze:
      self.target_model.sync_with(self.online_model)

    if done: self.update_epsilon()

  def update_epsilon(self) -> float:
    """Multiply epsilon by `epsilon_decay`, never going below 0.01, and return the new value."""
    self.epsilon = max(0.01, self.epsilon * self.epsilon_decay)
    return self.epsilon


The code for the training loop remains unchanged from the previous post.

In [254]:
import gymnasium as gym
from tqdm.notebook import tqdm

def train(agent: Agent, env: Env, episodes: int) -> List[float]:
  """Run `agent` in `env` for `episodes` episodes and return the score of each one.

  The score of an episode is the sum of its rewards. After every 100 episodes
  the mean score of the most recent (up to) 100 episodes is printed.
  """
  def run_episode() -> float:
    state, _ = env.reset()
    score = 0
    episode_over = False
    while not episode_over:
      action = agent.get_action(state, True)
      # gymnasium's step() returns (observation, reward, terminated, truncated, info).
      next_state, reward, terminated, truncated, _ = env.step(action)
      # Either flag ends the episode; the agent receives the combined flag.
      episode_over = terminated or truncated
      agent.process_transition(state, action, reward, next_state, episode_over)
      state = next_state
      score += reward
    return score

  scores = []
  for episode in tqdm(range(episodes)):
    scores.append(run_episode())

    if (episode + 1) % 100 == 0:
      average = np.mean(scores[-100:])
      print(f"\rEpisode {episode + 1:<0}\tAverage Score: {average:.2f}")

  return scores


In [255]:
# Environment shared by every agent trained below.
env = gym.make("CartPole-v1")

### Comparing DQN and Double DQN

To make it a bit easier to compare the overall performance of the two algorithms, I will now re-train both agents for the same number of episodes (rather than training for the minimum number of episodes required to achieve a target score).

In [258]:
# Hidden-layer sizes of the Q-network.
network = Configuration(layers=[128, 128])

# Replay memory: train on every 4th transition with batches of 64 sampled
# from a buffer holding at most 1,000,000 transitions.
memory = Configuration(
  update_frequency=4,
  batch_size=64,
  capacity=1_000_000,
)

# Weight given to the online parameters when they are blended into the target network.
double = Configuration(influence=0.001)

# Copy the online network into the target network every 25 transitions.
freeze = Configuration(frequency=25)

In [259]:
# Double DQN: replay memory, Double-Q targets and a periodically synced target network.
use_double_agent = NeuralQLearningAgent(
  env,
  network=network,
  use_double=double,
  use_memory=memory,
  use_freeze=freeze,
)

use_double_scores = train(use_double_agent, env, episodes=2000)

  0%|          | 0/2000 [00:00<?, ?it/s]

Episode 100	Average Score: 84.77
Episode 200	Average Score: 299.44
Episode 300	Average Score: 240.36
Episode 400	Average Score: 212.32
Episode 500	Average Score: 270.42
Episode 600	Average Score: 326.61
Episode 700	Average Score: 184.58
Episode 800	Average Score: 238.25
Episode 900	Average Score: 250.40
Episode 1000	Average Score: 245.03
Episode 1100	Average Score: 269.88
Episode 1200	Average Score: 362.81
Episode 1300	Average Score: 334.78
Episode 1400	Average Score: 281.05
Episode 1500	Average Score: 263.78
Episode 1600	Average Score: 247.43
Episode 1700	Average Score: 306.47
Episode 1800	Average Score: 260.90
Episode 1900	Average Score: 297.36
Episode 2000	Average Score: 144.83


In [None]:
# Baseline DQN: same replay and freeze settings, with Double-Q targets switched off.
use_single_agent = NeuralQLearningAgent(
  env,
  network=network,
  use_double=False,
  use_memory=memory,
  use_freeze=freeze,
)

use_single_scores = train(use_single_agent, env, episodes=2000)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
% matplotlib inline

In [None]:
# Wrap both score lists in pandas Series so they can be plotted and smoothed.
use_single_scores, use_double_scores = (
  pd.Series(episode_scores, name="scores")
  for episode_scores in (use_single_scores, use_double_scores)
)

In [None]:
# One panel per agent, sharing both axes so the curves are directly comparable.
fig, axes = plt.subplots(2, 1, figsize=(10, 6), sharex=True, sharey=True)

panels = [
  (axes[0], use_single_scores, "Single DQN Scores"),
  (axes[1], use_double_scores, "Double DQN Scores"),
]
for ax, episode_scores, label in panels:
  # Raw per-episode scores with their 100-episode rolling mean on top.
  episode_scores.plot(ax=ax, label=label)
  episode_scores.rolling(window=100).mean().rename("Rolling Average").plot(ax=ax)
  ax.legend()
  ax.set_ylabel("Score")

axes[1].set_xlabel("Episode Number")
