## Base code

In [None]:
import torch


class Model(torch.nn.Module):
  """
    Q-network used by the DQN: a two-hidden-layer MLP that maps a state vector to
    one Q-value per action. Each hidden layer is Linear -> BatchNorm1d -> PReLU.
  """

  def __init__(self, num_states: int, num_actions: int, hidden_layer_size: int) -> None:
    """
      args:
        num_states: size of the input (observation) vector.
        num_actions: number of outputs, one Q-value per action.
        hidden_layer_size: width of both hidden layers.
    """
    super().__init__()

    def hidden_block(in_features: int) -> torch.nn.Sequential:
      # Linear -> BatchNorm -> PReLU. Attribute names and creation order are kept
      # stable so state_dict keys (and existing checkpoints) stay compatible.
      return torch.nn.Sequential(
          torch.nn.Linear(in_features, hidden_layer_size),
          torch.nn.BatchNorm1d(hidden_layer_size),
          torch.nn.PReLU(),
      )

    self.layer1 = hidden_block(num_states)
    self.layer2 = hidden_block(hidden_layer_size)
    self.final_layer = torch.nn.Linear(hidden_layer_size, num_actions)

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    """Run x (a 2-D batch of states) through both hidden layers and the output layer."""
    for stage in (self.layer1, self.layer2, self.final_layer):
      x = stage(x)
    return x


In [None]:
import random
import numpy as np
from collections import namedtuple
from typing import List, Tuple, NamedTuple


class ReplayMemory(object):
  """
    Prioritized experience replay (proportional prioritization) backed by a Sumtree.

    New transitions are stored with the current maximum leaf priority. Their priority
    is later replaced through update() with
    (min(|td_error| + err, absolute_error_upper)) ** alpha.
  """

  def __init__(self, capacity: int) -> None:
    self.capacity = capacity
    self.sum_tree = Sumtree(capacity=capacity)
    self.transition = namedtuple("Transition",
                                 field_names=["prev_state", "action",
                                              "reward", "curr_state",
                                              "done"])
    # Added to every |error| so that no stored experience gets probability 0
    self.err = 0.01
    # Trade-off between sampling purely by priority (1.0) and uniformly (0.0)
    self.alpha = 0.6
    # Importance-sampling exponent; annealed towards 1 on every sample() call
    self.beta = 0.4
    self.beta_increment_per_sampling = 0.001
    # Upper clip applied to |error| + err before exponentiation
    self.absolute_error_upper = 1.0

  def __len__(self) -> int:
    return self.sum_tree.n_entries

  def _get_priority(self, abs_err: float) -> float:
    """
      Map a TD error to a priority: (min(|abs_err| + err, absolute_error_upper)) ** alpha.

      abs_err may be a Python/NumPy scalar or a single-element array (.item() below
      requires exactly one element).
    """
    abs_errs = np.abs(abs_err) + self.err
    clipped_errors = np.minimum(abs_errs, self.absolute_error_upper).item()
    return clipped_errors ** self.alpha

  def update(self, tree_idx: int, abs_err: float) -> None:
    """Set the priority of the leaf at tree index `tree_idx` from a new TD error."""
    priority = self._get_priority(abs_err=abs_err)
    self.sum_tree.update(tree_idx=tree_idx, priority=priority)

  def push(self, prev_state: np.ndarray, action: int,
           reward: int, curr_state: np.ndarray, done: bool) -> None:
    """Store a transition with the current maximum leaf priority."""
    # Find the max priority among the leaves
    max_priority = np.max(self.sum_tree.tree[-self.sum_tree.capacity:]).item()

    # A priority of 0 would mean the experience is never sampled,
    # so fall back to the clip value while the tree is still empty.
    if max_priority == 0:
      max_priority = self.absolute_error_upper

    new_exp = self.transition(prev_state, action, reward, curr_state, done)
    self.sum_tree.add(max_priority, new_exp)

  def sample(self, batch_size: int) -> Tuple[List, List, List]:
    """
      Sample `batch_size` transitions, one from each of batch_size equal slices of
      the total priority mass (stratified sampling).

      returns:
        batch: list of stored transitions.
        indices: tree indices of the sampled leaves, for update().
        is_weights: importance-sampling weights (list of floats), normalized so the
          largest weight is 1.
    """
    batch = []
    indices = []
    priorities = []

    priority_segment = self.sum_tree.total_priority / batch_size
    self.beta = min(1.0, self.beta + self.beta_increment_per_sampling)

    for i in range(batch_size):
      a = priority_segment * i
      b = priority_segment * (i + 1)
      uniform_sample = random.uniform(a, b)

      idx, priority, data = self.sum_tree.get(uniform_sample)
      indices.append(idx)
      priorities.append(priority)
      batch.append(data)

    # P(j): explicit array so the division never relies on scalar/list coercion
    sampling_probabilities = (np.asarray(priorities, dtype=np.float64)
                              / self.sum_tree.total_priority)
    # IS weight w_j = (N * P(j)) ** -beta, normalized by max_i w_i
    is_weights = np.power(self.sum_tree.n_entries * sampling_probabilities,
                          -self.beta)
    is_weights /= is_weights.max()

    return batch, indices, is_weights.tolist()


class Sumtree(object):
  """
    Array-backed binary sum tree. Every internal node holds the sum of its two
    children, and the last `capacity` slots of `tree` are the leaves. There is one
    priority per stored item: leaf tree index `i + capacity - 1` pairs with data[i].
  """

  def __init__(self, capacity: int) -> None:
    """
      args:
        capacity: maximum number of stored items (leaves). Once full, add()
          overwrites the oldest items in ring-buffer order.
    """
    self.capacity = capacity
    self.n_entries = 0
    self.data_pointer = 0

    # a binary tree with `capacity` leaves has 2*capacity - 1 nodes
    self.tree = np.zeros(2 * capacity - 1)
    # the stored items, one per leaf
    self.data = np.zeros(capacity, dtype=object)

  def add(self, priority: float, new_data: NamedTuple) -> None:
    """
      Store `new_data` with `priority` in the next slot, overwriting the oldest
      entry once the tree is full.
    """
    tree_idx = self.data_pointer + self.capacity - 1
    self.data[self.data_pointer] = new_data
    self.update(tree_idx, priority)

    self.data_pointer += 1
    # past the last slot: wrap around and start overwriting from the beginning
    if self.data_pointer >= self.capacity:
      self.data_pointer = 0

    if self.n_entries < self.capacity:
      self.n_entries += 1

  def update(self, tree_idx: int, priority: float) -> None:
    """
      Set the leaf at `tree_idx` to `priority` and propagate the difference up to the root.
    """
    change = priority - self.tree[tree_idx]
    self.tree[tree_idx] = priority

    # Walk up until the root has been updated. Testing before stepping also covers
    # capacity == 1, where the only leaf *is* the root (the parent of index 0 would
    # be -1, which wraps around to the root and never terminates).
    while tree_idx != 0:
      tree_idx = (tree_idx - 1) // 2
      self.tree[tree_idx] += change

  def get(self, value: float) -> Tuple[int, float, object]:
    """
      Find the leaf whose cumulative-priority interval contains `value`.

      returns:
        (tree index of the leaf, priority of that leaf, item stored for that leaf)
    """
    parent_idx = 0
    left_child_idx = 2 * parent_idx + 1

    while left_child_idx < len(self.tree):
      right_child_idx = left_child_idx + 1
      # Go left when value falls inside the left subtree, and also when the right
      # subtree carries no priority. The latter only happens if value has drifted
      # to/above the total (e.g. uniform sampling hit the upper bound, or float
      # error accumulated in the sums); without the check the search would end
      # on an empty leaf.
      if value < self.tree[left_child_idx] or self.tree[right_child_idx] <= 0:
        parent_idx = left_child_idx
      else:
        value -= self.tree[left_child_idx]
        parent_idx = right_child_idx

      left_child_idx = 2 * parent_idx + 1

    data_idx = parent_idx - self.capacity + 1
    return parent_idx, self.tree[parent_idx], self.data[data_idx]

  @property
  def total_priority(self) -> np.float64:
    """
      Sum of all leaf priorities (the root node).
    """
    return self.tree[0]


In [None]:
import os
import gym
from gym.envs.classic_control import CartPoleEnv
import torch
import numpy as np
from typing import List, Dict, Tuple


class Agent():
  def __init__(self, env: gym.envs.classic_control.CartPoleEnv, debug: bool, checkpoint_path: str,
               hidden_layer_size: int, replay_memory_cap: int, batch_size: int,
               learning_rate: float, learning_rate_decay: float, discount_factor: float) -> None:
    """
      Agent that owns a Q-network, its Adam optimizer and LR scheduler, and a
      prioritized replay memory, and trains the network with DQN updates.

      args:
        env: environment. observation_space.shape[0] and action_space.n set the
          network's input and output sizes.
        debug: print extra messages when saving/loading weights.
        checkpoint_path: file used by save_weights() / load_weights().
        hidden_layer_size: width of both hidden layers of the network.
        replay_memory_cap: capacity of the prioritized replay memory.
        batch_size: minibatch size; training starts once the memory holds this many
          transitions. Must be > 1 because BatchNorm runs in training mode.
        learning_rate: initial Adam learning rate.
        learning_rate_decay: ExponentialLR gamma; <= 0 disables decay.
        discount_factor: gamma of the Bellman target.
    """
    self.env = env
    self.num_states = env.observation_space.shape[0]
    self.num_actions = env.action_space.n

    self.debug = debug

    # use the GPU if one is available
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("Using device: %s" % self.device)

    self.__model = Model(num_states=self.num_states, num_actions=self.num_actions,
                         hidden_layer_size=hidden_layer_size).to(self.device)

    self.learning_rate_decay = learning_rate_decay
    self.optimizer = torch.optim.Adam(
        self.__model.parameters(), lr=learning_rate)
    # ExponentialLR multiplies the learning rate by `gamma` on every scheduler step
    self.__scheduler = torch.optim.lr_scheduler.ExponentialLR(
        optimizer=self.optimizer, gamma=learning_rate_decay)
    # element-wise squared error, so each sample can be scaled by its IS weight
    self.loss_function = torch.nn.MSELoss(reduction='none')
    self.checkpoint_path = checkpoint_path
    self.discount_factor = discount_factor

    self.replay_memory = ReplayMemory(replay_memory_cap)
    self.batch_size = batch_size

  @property
  def model(self) -> Model:
    return self.__model

  def preprocess_observation(self, observation: np.ndarray) -> torch.Tensor:
    """Convert an array of observations to a float32 tensor on the agent's device."""
    return torch.as_tensor(observation, dtype=torch.float32, device=self.device)

  def predict(self, input_data: np.ndarray) -> torch.Tensor:
    """
      Q-values for one or more states, computed in eval mode (BatchNorm uses its
      running statistics). Returns a (n_states, num_actions) tensor.
    """
    processed_data = self.preprocess_observation(
        input_data.reshape(-1, self.num_states))
    self.__model.train(mode=False)
    return self.__model(processed_data)

  def get_action(self, observation: np.ndarray, epsilon: float) -> int:
    """
      Epsilon-greedy action: random with probability epsilon, otherwise the action
      with the highest predicted Q-value.
      Only 2 actions, left and right.
      0 = move black box thing left.
      1 = move black box thing right

      args:
        observation: List of floats. List[cart position, cart velocity, pole angle, pole velocity at tip].
    """
    if np.random.rand() < epsilon:
      return self.env.action_space.sample()

    # inference only: no autograd graph needed
    with torch.no_grad():
      scores = self.predict(observation)
    return int(scores.argmax(dim=1).item())

  def decay_learning_rate(self) -> None:
    """Step the LR scheduler once training has started and decay is enabled."""
    if len(self.replay_memory) >= self.batch_size and self.learning_rate_decay > 0.00:
      self.__scheduler.step()

  def get_last_lr(self) -> float:
    return self.__scheduler.get_last_lr()[0]

  def save_weights(self) -> None:
    """Save the model state_dict to checkpoint_path, creating parent directories."""
    # Best-effort: a failed save is reported but must not abort training.
    try:
      directory = os.path.dirname(self.checkpoint_path)
      # a bare file name has no directory part, and os.makedirs('') would raise
      if directory:
        os.makedirs(directory, exist_ok=True)

      if self.debug:
        print("Saving weights to: " + str(self.checkpoint_path))

      torch.save(self.__model.state_dict(), self.checkpoint_path)
    except Exception as e:
      print("Could not save weights to: " + str(self.checkpoint_path))
      print("ERROR: %s" % e)

  def load_weights(self, name: str = '') -> None:
    """Load the model state_dict from checkpoint_path (best-effort, errors are printed)."""
    try:
      # map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine
      self.__model.load_state_dict(
          torch.load(self.checkpoint_path, map_location=self.device))
      if self.debug:
        print("Loaded weights for " + name +
              ", from: " + str(self.checkpoint_path))
    except Exception as e:
      print("Could not load weights for " + name +
            ", from: " + str(self.checkpoint_path))
      print("ERROR: %s" % e)

  def copy_weights(self, agent_to_copy: 'Agent') -> None:
    self.__model.load_state_dict(agent_to_copy.model.state_dict())

  def add_experience(self, prev_state: np.ndarray, action: int,
                     reward: int, curr_state: np.ndarray, done: bool) -> None:
    self.replay_memory.push(prev_state, action, reward, curr_state, done)

  def train(self, target_agent: 'Agent') -> Tuple[float, float, float]:
    """
      Do one DQN update from a prioritized minibatch. Nothing is trained, and
      (0.0, 0.0, 0.0) is returned, while the replay memory holds fewer than
      batch_size transitions.

      The policy network (this agent) predicts Q(prev_state, action). The target
      network (target_agent) gives max_a Q(curr_state, a), which feeds the Bellman
      target r + gamma * max_a Q * (1 - done). The absolute TD errors refresh the
      sampled priorities, and the loss is the importance-sampling-weighted mean
      squared TD error.

      returns:
        (loss, mean Bellman target, mean absolute TD error) as floats.
    """
    # only start training once we have enough experiences in the replay memory
    if len(self.replay_memory) < self.batch_size:
      return 0.00, 0.00, 0.00

    (minibatch,
     minibatch_indices,
     minibatch_is_weights) = self.replay_memory.sample(self.batch_size)

    prev_states = np.vstack([x.prev_state for x in minibatch])
    actions = torch.LongTensor(
        np.array([x.action for x in minibatch]).reshape(-1, 1)).to(self.device)
    rewards = torch.FloatTensor(
        np.array([x.reward for x in minibatch]).reshape(-1, 1)).to(self.device)
    curr_states = np.vstack([x.curr_state for x in minibatch])
    dones = torch.FloatTensor(
        np.array([x.done for x in minibatch]).reshape(-1, 1)).to(self.device)

    # Forward pass in *training* mode so BatchNorm normalizes with batch statistics
    # and updates its running averages (predict() would switch to eval mode).
    self.__model.train(mode=True)
    q_predict = self.__model(self.preprocess_observation(
        prev_states.reshape(-1, self.num_states))).gather(1, actions)

    # Bellman target from the target network; no gradients flow into it
    with torch.no_grad():
      q_curr_state_values = target_agent.predict(
          curr_states).max(dim=1, keepdim=True)[0]
    mask = 1 - dones
    q_target = rewards + self.discount_factor * q_curr_state_values * mask

    # refresh the priorities of the sampled transitions with their new |TD error|
    errors = torch.abs(q_predict - q_target).detach().cpu().numpy()
    for tree_idx, err in zip(minibatch_indices, errors):
      self.replay_memory.update(tree_idx, err)

    # Per-sample IS weighting: weights are reshaped to (batch, 1) to line up with
    # the element-wise loss (a (batch,) vector would broadcast to (batch, batch)).
    is_weights = torch.as_tensor(minibatch_is_weights, dtype=torch.float32,
                                 device=self.device).reshape(-1, 1)
    loss = (is_weights * self.loss_function(q_predict, q_target)).mean()

    self.optimizer.zero_grad()
    loss.backward()

    # gradient clipping (use if you want)
    # for param in self.__model.parameters():
    #   param.grad.data.clamp_(-1, 1)

    self.optimizer.step()

    # scalar summaries for later analysis
    return loss.item(), q_target.mean().item(), float(np.mean(errors))


In [None]:
import os
import gym
from gym.envs.classic_control import CartPoleEnv
from tqdm.auto import tqdm
import numpy as np
import matplotlib.pyplot as plt
from typing import Tuple, Dict
from collections import deque


def train_agent(env: gym.envs.classic_control.CartPoleEnv, train_agent: Agent, target_agent: Agent,
                progress_print_per_iter: int, total_episodes: int, episode_epsilon: float,
                min_epsilon: float, max_epsilon_episodes: int, epsilon_decay: float,
                copy_max_count: int, saved_results_path: str, saved_results_name: str,
                hyperparams_dict: Dict) -> None:
  """
    Train the agent on `total_episodes` games/episodes.

    Epsilon uses multiplicative decay by `epsilon_decay` when that is > 0. Otherwise it
    follows the linear schedule of epsilon_decay_formula, starting from the initial
    `episode_epsilon`. train_agent.decay_learning_rate() is called once per episode,
    and weights are saved after every episode. Per-episode metrics are plotted and
    saved with save_results() at the end.
  """
  total_steps = 0
  avg_reward = deque(maxlen=100)
  avg_reward_key = 'avg_rewards[last_%s]' % avg_reward.maxlen
  progress_bar = tqdm(total=total_episodes)
  solved_game = False
  start_epsilon = episode_epsilon

  plotting_data = {
      avg_reward_key: np.empty(total_episodes),
      'total_rewards': np.empty(total_episodes),
      'epsilon': np.empty(total_episodes),
      'loss': np.empty(total_episodes),
      'bellman_eq': np.empty(total_episodes),
      'errors': np.empty(total_episodes),
      'learning_rate': np.empty(total_episodes),
  }

  for episode in range(total_episodes):
    # The linear schedule is "start + slope * episode", so it needs the *starting*
    # epsilon. Feeding back the already-decayed value would compound the slope each
    # episode and reach min_epsilon long before max_epsilon_episodes. Multiplicative
    # decay does compound on the current value.
    episode_epsilon = epsilon_decay_formula(
        episode=episode, max_episode=max_epsilon_episodes, min_epsilon=min_epsilon,
        epsilon=episode_epsilon if epsilon_decay > 0 else start_epsilon,
        epsilon_decay=epsilon_decay)

    # train game/episode, save weights, decay learning rate
    (total_rewards, total_steps,
     total_loss, total_bellman_eq,
     total_errs) = train_single_game(env=env,
                                     train_agent=train_agent,
                                     target_agent=target_agent,
                                     epsilon=episode_epsilon,
                                     copy_max_count=copy_max_count,
                                     total_steps=total_steps)
    train_agent.decay_learning_rate()
    train_agent.save_weights()

    # update matplotlib data
    avg_reward.append(total_rewards)
    mean_reward = np.mean(avg_reward)
    plotting_data[avg_reward_key][episode] = mean_reward
    plotting_data['total_rewards'][episode] = total_rewards
    plotting_data['epsilon'][episode] = episode_epsilon
    plotting_data['loss'][episode] = total_loss
    plotting_data['bellman_eq'][episode] = total_bellman_eq
    plotting_data['errors'][episode] = total_errs
    plotting_data['learning_rate'][episode] = train_agent.get_last_lr()

    # Advance one episode at a time so the bar reaches 100% even when total_episodes
    # is not a multiple of progress_print_per_iter; only the postfix is throttled.
    progress_bar.update(1)
    if ((episode + 1) % progress_print_per_iter) == 0:
      progress_bar.set_postfix({
          'episode reward': total_rewards,
          'avg reward (last %s)' % avg_reward.maxlen: mean_reward,
          'epsilon': episode_epsilon,
      })

    # "solved" = average reward >= 195 over a full window of consecutive episodes,
    # so wait until the window is full before checking
    if (not solved_game and len(avg_reward) == avg_reward.maxlen
            and mean_reward >= 195):
      solved_game = True
      print("Solved in %s games/episodes" % (episode + 1))

  env.close()
  save_results(plotting_data=plotting_data, progress_bar=progress_bar,
               name=saved_results_name, directory_name=saved_results_path,
               hyperparams_dict=hyperparams_dict)
  # close after save_results, which writes str(progress_bar) to a file
  progress_bar.close()


def train_single_game(env: gym.envs.classic_control.CartPoleEnv,
                      train_agent: Agent, target_agent: Agent,
                      epsilon: float, copy_max_count: int,
                      total_steps: int) -> Tuple[int, int, float, float, float]:
  """
    Train the agent on one game/episode.

    Every environment step is stored in the replay memory and followed by one call to
    train_agent.train(target_agent). Whenever the global step counter `total_steps`
    is a multiple of `copy_max_count`, the train agent's weights are copied into
    target_agent.

    returns:
      (episode reward, updated total_steps, loss of the last training step,
       latest positive mean Bellman target seen in the episode,
       mean |TD error| of the last training step).
      The last three stay 0 while the replay memory is still smaller than a batch.

    observation := [cart position, cart velocity, pole angle, pole velocity at tip]
  """
  prev_observation = env.reset()
  total_rewards = 0
  last_loss = 0
  last_bellman_eq = 0
  last_errs = 0
  game_done = False

  while not game_done:
    # Get our agent's action and record the environment
    action = train_agent.get_action(
        observation=prev_observation, epsilon=epsilon)
    observation, reward, game_done, info = env.step(action)
    total_rewards += reward
    total_steps += 1

    # gym's TimeLimit wrapper ends the episode after a fixed number of steps and
    # flags that with info['TimeLimit.truncated']. Running out of time is not a
    # failure: it must not be penalized or stored as terminal, otherwise the
    # Bellman target for a good state is cut to the immediate reward.
    truncated = isinstance(info, dict) and bool(info.get('TimeLimit.truncated', False))
    terminal = game_done and not truncated
    if terminal:
      reward -= 1

    # Add the observations we got from the environment
    train_agent.add_experience(prev_observation, action, reward,
                               observation, terminal)
    # Get the loss and bellman equation values from training
    last_loss, bellman_eq, last_errs = train_agent.train(target_agent)
    if bellman_eq > 0:
      last_bellman_eq = bellman_eq
    # adjust prev state to curr state for next iteration
    prev_observation = observation

    # copy weights of policy net to our target net after a certain amount of steps
    if total_steps % copy_max_count == 0:
      target_agent.copy_weights(train_agent)

  return total_rewards, total_steps, last_loss, last_bellman_eq, last_errs


def save_results(plotting_data: Dict[str, np.ndarray], progress_bar: tqdm,
                 name: str, directory_name: str, hyperparams_dict: Dict) -> None:
  """
    Save one PNG figure per metric, plus a text file with the hyperparameters and the
    progress bar line, into `directory_name`.

    Files are written as directory_name + "/" + name + <metric> + ".png" and
    directory_name + "/" + name + "pbar.txt", so `name` acts as a file-name prefix.
  """
  # makedirs creates every missing ancestor, not only the immediate parent
  os.makedirs(directory_name, exist_ok=True)
  file_prefix = directory_name + "/" + name

  # plot all our data
  def plot_figure(data: np.ndarray, xlabel: str, ylabel: str, save_path: str) -> None:
    plt.clf()
    plt.plot(data)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.savefig(save_path)
    plt.show()

  for metric, plot_data in plotting_data.items():
    plot_figure(data=plot_data, xlabel='Episode', ylabel=metric,
                save_path=file_prefix + metric + '.png')

  # Save the hyperparams and progress bar in a text file
  with open(file_prefix + 'pbar.txt', 'w', encoding="utf-8") as filetowrite:
    filetowrite.write("==== Hyperparams: ====\n")
    for key, val in hyperparams_dict.items():
      # "%s: %s" % (key, val) also handles tuple values and non-str keys, both of
      # which make `key + ": %s" % val` raise TypeError
      filetowrite.write("%s: %s\n" % (key, val))
    filetowrite.write("\n")
    filetowrite.write(str(progress_bar))


def play_game(env: gym.envs.classic_control.CartPoleEnv, agent: Agent,
              epsilon: float, game_render: bool = False) -> None:
  """
    Play one episode with `agent` choosing actions epsilon-greedily, optionally
    rendering each frame, then close the environment and print the number of
    steps the episode lasted.

    observation := [cart position, cart velocity, pole angle, pole velocity at tip]
  """
  observation = env.reset()
  steps_taken = 0
  finished = False

  while not finished:
    if game_render:
      env.render()
    chosen = agent.get_action(observation=observation, epsilon=epsilon)
    observation, _, finished, _ = env.step(chosen)
    steps_taken += 1

  env.close()
  summary = "\nTotal rewards/time steps: {0}.".format(steps_taken)
  print(summary)


def epsilon_decay_formula(episode: int, max_episode: int, min_epsilon: float,
                          epsilon: float, epsilon_decay: float) -> float:
  r"""
    Return the epsilon (for epsilon-greedy exploration) to use at `episode`.

    If epsilon_decay > 0: multiplicative decay of the current value,
      max(min_epsilon, epsilon * epsilon_decay).

    Otherwise: linear decay with slope (min_epsilon - 1.0) / max_episode, i.e. a line
    going from 1.0 at episode 0 to min_epsilon at max_episode (for epsilon == 1.0):
      max(min_epsilon, epsilon + slope * episode)
    Here `epsilon` should be the *starting* epsilon, not the previously decayed one.
    A non-positive max_episode means there is no decay period, so min_epsilon is returned.

    1.0---|\
          | \
          |  \
    min_e +---+------->
              |
              max_episode
  """
  if epsilon_decay > 0:
    return max(min_epsilon, epsilon * epsilon_decay)

  if max_episode <= 0:
    # avoid ZeroDivisionError: a zero-length decay period drops straight to the floor
    return min_epsilon

  slope = (min_epsilon - 1.0) / max_episode
  return max(min_epsilon, slope * episode + epsilon)


## Main

In [None]:
import os

curr_try = 1
hyperparams_dict = {
    # ---- run length, progress printing and replay memory ----
    'total_episodes': 500,
    'replay_memory_cap': 50000,
    'progress_per_iteration': 10,

    # ---- learning rate ----
    'learning_rate': 0.001,
    # Used as the ExponentialLR gamma: the learning rate is multiplied by this value
    # on every decay step, and 0 disables decay. A useful value is close to 1
    # (e.g. 0.999), not a small "rate" such as 0.001.
    'learning_rate_decay': 0.00,

    # ---- epsilon-greedy exploration ----
    'epsilon': 1.0,
    'max_epsilon_episodes': 50,
    'min_epsilon': 0.01,
    # multiplicative per-episode decay (e.g. 0.995); 0 selects the linear schedule
    'epsilon_decay': 0.00,

    # ---- other factors ----
    'discount_factor': 0.99,
    'batch_size': 32,
    'copy_max_step': 25,
    'hidden_layer_size': 48,

    # where the network weights are saved/loaded
    'checkpoint_path': os.path.join(os.getcwd(), 'nn_saved_weights',
                                    'training_%s.pth' % curr_try),

    # directory that receives the plots and pbar.txt
    'saved_results_path': os.path.join(os.getcwd(), 'saved_results',
                                       'training_%s' % curr_try),

    # file-name prefix for the saved results
    'saved_results_name': '',
}


In [None]:
import os
import gym
from gym.envs.classic_control import CartPoleEnv


def print_all_hyperparams() -> None:
  """
    Print every entry of the module-level hyperparams_dict as "key: value",
    framed by blank lines and a header.
  """
  print()
  print("==== Hyperparams: ====")
  for key, val in hyperparams_dict.items():
    # "%s: %s" % (key, val) also handles tuple values and non-str keys, both of
    # which make `key + ": %s" % val` raise TypeError
    print("%s: %s" % (key, val))
  print()


def get_agent(env: gym.envs.classic_control.CartPoleEnv, agent_debug: bool) -> Agent:
  """
    Build an Agent for `env`, configured from the module-level hyperparams_dict
    (values are cast to the types Agent expects).
  """
  params = hyperparams_dict
  return Agent(
      env=env,
      debug=agent_debug,
      checkpoint_path=str(params['checkpoint_path']),
      hidden_layer_size=int(params['hidden_layer_size']),
      batch_size=int(params['batch_size']),
      learning_rate=float(params['learning_rate']),
      learning_rate_decay=float(params['learning_rate_decay']),
      discount_factor=float(params['discount_factor']),
      replay_memory_cap=int(params['replay_memory_cap']),
  )


def main(agent_debug: bool = False, train_model: bool = False) -> None:
  """
    Train on multiple episodes (train_model=True) or play a single greedy game.

    The training agent loads weights from hyperparams_dict['checkpoint_path'] when that
    file exists. The target agent always starts as an exact copy of the training agent.
    agent_debug turns on the agents' debug messages (save/load logging).

    observation space (input size) = 4 --> env.observation_space.shape[0]
    action space (output size) = 2 --> env.action_space.n
  """
  my_env = gym.make('CartPole-v0')
  # forward agent_debug (it was previously ignored and hard-coded to False)
  my_train_agent = get_agent(env=my_env, agent_debug=agent_debug)
  my_target_agent = get_agent(env=my_env, agent_debug=agent_debug)

  if os.path.exists(str(hyperparams_dict['checkpoint_path'])):
    my_train_agent.load_weights(name="training agent")

  # DQN starts with target weights == policy weights; without this, two independently
  # initialized networks would be used until the first periodic copy.
  my_target_agent.copy_weights(my_train_agent)

  if train_model:
    print_all_hyperparams()
    train_agent(env=my_env, train_agent=my_train_agent, target_agent=my_target_agent,
                progress_print_per_iter=int(
                    hyperparams_dict['progress_per_iteration']),
                total_episodes=int(hyperparams_dict['total_episodes']),
                episode_epsilon=float(hyperparams_dict['epsilon']),
                min_epsilon=float(hyperparams_dict['min_epsilon']),
                max_epsilon_episodes=int(
                    hyperparams_dict['max_epsilon_episodes']),
                epsilon_decay=float(hyperparams_dict['epsilon_decay']),
                copy_max_count=int(hyperparams_dict['copy_max_step']),
                saved_results_path=str(hyperparams_dict['saved_results_path']),
                saved_results_name=str(hyperparams_dict['saved_results_name']),
                hyperparams_dict=hyperparams_dict)
  else:
    play_game(env=my_env, agent=my_train_agent, epsilon=0.00)


if __name__ == "__main__":
  # quiet gym's logger; 40 is presumably its ERROR level (logging-style numbering) — confirm
  gym.logger.set_level(40)
  # play one greedy game with the saved weights; use main(train_model=True) to train instead
  main(train_model=False)
