# Imports

In [None]:
import os
import numpy as np
import gym
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim

# Define some global variables

In [None]:
# maximum number of training episodes
NUM_EPISODES = 90
# maximum number of steps per episode
# (CartPole-v0 has a maximum of 200 steps per episode)
MAX_EP_STEPS = 200
# reward discount factor
GAMMA = 0.6
# number of test episodes to run once the NUM_EPISODES training episodes are over
NUM_TEST_EPISODES = 3
# batch size used for the training
BATCH_SIZE = 1000
# maximum number of transitions stored in the replay buffer
MAX_REPLAY_BUFFER_SIZE = BATCH_SIZE * 10
# reward substituted when the episode terminates early (i.e. the controller fails)
FAILURE_REWARD = -10.

# setting the random seeds makes things reproducible: NumPy drives action
# sampling and replay-buffer sampling, PyTorch drives the weight initialization
random_seed = 2
np.random.seed(random_seed)
torch.manual_seed(random_seed)

# Define the Actor class

In [None]:
class Actor(nn.Module):

  def __init__(self, state_dim, action_dim):
    """An actor for Actor-Critic reinforcement learning.

    This actor represents a stochastic policy. It predicts a distribution over
    actions conditioned on a given state. The distribution can then be sampled
    to produce a single control action.

    Arguments:
        state_dim: an integer, number of states of the system
        action_dim: an integer, number of possible actions of the system
    Returns:
        nothing
    """
    super().__init__()
    self.state_dim = state_dim
    self.action_dim = action_dim

    # the neural network (input is the state, output is the unnormalized
    # action logits)
    # NOTE(review): an earlier comment stated the network "must be entirely
    # linear to support verification", yet a ReLU is used - confirm the intent
    self.nn = nn.Sequential(
        nn.Linear(state_dim, 128),
        nn.ReLU(),
        nn.Linear(128, action_dim)
    )

  def forward(self, state, action, td_error):
    """Computes the policy-gradient objective log pi(action | state) * td_error.

    Arguments:
        state: a tensor representing a batch of states (batch_size X
        state_dim)
        action: a tensor representing a batch of actions (batch_size X 1)
        holding the action numbers (0 indexed); it is cast to integers
        td_error: a tensor of floats (batch_size X 1), the temporal
        differences
    Returns:
        expected_v: a tensor (batch_size X 1) with, for each sample, the
        log-probability of the taken action weighted by its TD error
    """
    action_logits = self.nn(state)
    # explicit dim: normalize over the actions of each sample
    action_probs = torch.softmax(action_logits, dim=1)
    # clip away exact zeros so that the log below stays finite
    clipped_action_probs = torch.clip(action_probs, 1e-14, 1.0)
    # num_classes must be fixed: otherwise a batch that happens not to contain
    # the highest action index yields a narrower one-hot tensor that silently
    # broadcasts against the probabilities
    action_oh = torch.nn.functional.one_hot(
        action[:, 0].long(), num_classes=self.action_dim
    )
    # the one-hot mask keeps only the log-probability of the taken action
    log_action_probs = (torch.log(clipped_action_probs) * action_oh).sum(
        dim=1, keepdim=True
    )
    expected_v = log_action_probs * td_error
    return expected_v

  def get_action(self, state):
    """Get an action for a given state by predicting a probability distribution over actions and sampling one.

    Arguments:
        state: a tensor of size (state_dim) representing the current
        state
    Returns:
        action: an integer (0 indexed) corresponding to the action taken
        by the actor, sampled with NumPy's global random generator
    """
    # sampling an action never needs gradients
    with torch.no_grad():
      logits = self.nn(state.unsqueeze(0))
      action_probs = torch.softmax(logits, dim=1)
    action = np.random.choice(self.action_dim, p=action_probs.numpy()[0, :])
    return action

# Define the Critic class

In [None]:
class Critic(nn.Module):

  def __init__(self, state_dim, gamma=None):
    """A critic for Actor-Critic reinforcement learning.

    This critic works by estimating a value function (expected reward-to-go) for
    given states. It is trained using Temporal Difference error learning (TD
    error).

    Arguments:
        state_dim: an integer, number of states of the system
        gamma: optional float, the reward discount factor; when left to None
        the module-level GAMMA is used
    Returns:
        nothing
    """
    super().__init__()
    self.state_dim = state_dim
    # GAMMA is only looked up when no explicit discount factor is given
    self.gamma = GAMMA if gamma is None else gamma

    # the value network mirrors the actor's architecture, with a single
    # scalar output: the estimated value of the input state
    self.nn = nn.Sequential(
        nn.Linear(state_dim, 128),
        nn.ReLU(),
        nn.Linear(128, 1)
    )

  def forward(self, state, reward, state_next):
    """Computes the TD error reward + gamma * V(state_next) - V(state).

    Arguments:
        state: a tensor representing a batch of initial states
        (batch_size X state_dim)
        reward: a tensor representing a batch of rewards (batch_size X
        1)
        state_next: a tensor representing a batch of 'future states'
        (batch_size X state_dim)
        each sample (state, reward, state_next) corresponds to a given
        transition
    Returns:
        td_error: the TD errors of the batch, as a tensor (batch_size X 1)
        that is differentiable with respect to V(state)
    """
    value = self.nn(state)
    # the bootstrapped target is treated as a constant (semi-gradient TD(0))
    # NOTE(review): assumed from the standard TD formulation - confirm
    # against the writeup whether the target should carry gradients
    with torch.no_grad():
      value_next = self.nn(state_next)
    td_error = reward + self.gamma * value_next - value
    return td_error

# Inference function

In [None]:
def run_actor(env, actor, num_episodes, render=True):
  """Rolls out the actor's policy on the environment and prints episode rewards.

  Each episode lasts at most MAX_EP_STEPS steps. The accumulated reward is
  printed when the environment reports that the episode is done.

  Arguments:
      env: the openai gym environment
      actor: an instance of the Actor class
      num_episodes: number of episodes to run the actor for
      render: a boolean, whether to render the environment before every step
  Returns:
      nothing
  """
  for _ in range(num_episodes):
    obs = torch.Tensor(env.reset())
    total_reward = 0.0
    for _step in range(MAX_EP_STEPS):
      if render:
        env.render()
      # sample an action from the policy and advance the environment
      raw_obs, step_reward, finished, _info = env.step(actor.get_action(obs))
      obs = torch.Tensor(raw_obs)
      total_reward += step_reward
      if not finished:
        continue
      print("Reward: ", str(total_reward))
      break

# Training Loop

In [None]:
def train_actor_critic():
  """Trains an actor-critic agent on CartPole-v0, then plots and tests it.

  Observed transitions (state, action, reward, next state) are appended to a
  replay buffer. As soon as the buffer holds at least BATCH_SIZE transitions,
  every environment step is followed by one critic update (mean squared TD
  error) and one actor update (log-probability of the taken action weighted
  by the detached TD error). After NUM_EPISODES episodes the reward history
  is plotted and saved to '../plots/p2_reward_history.png', and the actor is
  run for NUM_TEST_EPISODES test episodes.

  The environment is closed even when training fails or is interrupted.

  Arguments:
      none
  Returns:
      nothing
  """
  # setup the OpenAI gym environment
  env = gym.make('CartPole-v0')
  try:
    env.seed(random_seed)
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    print("state dim: ", state_dim)
    print("action dim: ", action_dim)

    # create an actor and a critic network and initialize their variables
    actor = Actor(state_dim, action_dim)
    critic = Critic(state_dim)

    actor_optim = optim.Adam(actor.parameters(), lr=3e-4)
    critic_optim = optim.Adam(critic.parameters(), lr=3e-4)

    # the replay buffer will store observed transitions, one per row:
    # [state (state_dim), action (1), reward (1), next state (state_dim)]
    replay_buffer = np.zeros((0, 2 * state_dim + 2))

    # allocate memory to keep track of episode rewards
    reward_history = np.zeros(NUM_EPISODES)

    for i_episode in range(NUM_EPISODES):
      # very inefficient way of making sure the buffer isn't too full
      # (it is only trimmed once per episode, keeping the newest transitions)
      if replay_buffer.shape[0] > MAX_REPLAY_BUFFER_SIZE:
        replay_buffer = replay_buffer[-MAX_REPLAY_BUFFER_SIZE:, :]

      # reset the OpenAI gym environment to a random initial state for each episode
      state = env.reset()
      state = torch.Tensor(state)
      episode_reward = 0.0

      for t in range(MAX_EP_STEPS):
        # uses the actor to get an action at the current state
        action = actor.get_action(state)
        # call gym to get the next state and reward, given we are taking action at the current state
        state_next, reward, done, info = env.step(action)
        state_next = torch.Tensor(state_next)

        # done=True means either the cartpole failed OR we've reached the maximum number of episode steps
        if done and t < (MAX_EP_STEPS - 1):
          reward = FAILURE_REWARD
        # accumulate the reward for this whole episode
        episode_reward += reward
        # store the observed transition in our replay buffer for training
        replay_buffer = np.vstack(
            (replay_buffer, np.hstack((state, action, reward, state_next)))
        )

        # if our replay buffer has accumulated enough samples, we start learning the actor and the critic
        if replay_buffer.shape[0] >= BATCH_SIZE:
          # we sample BATCH_SIZE transitions from our replay buffer
          samples_i = np.random.choice(
              replay_buffer.shape[0], BATCH_SIZE, replace=False
          )
          state_samples = torch.Tensor(replay_buffer[samples_i, 0:state_dim])
          action_samples = torch.Tensor(replay_buffer[samples_i, state_dim : state_dim + 1])
          reward_samples = torch.Tensor(replay_buffer[samples_i, state_dim + 1 : state_dim + 2])
          state_next_samples = torch.Tensor(replay_buffer[
              samples_i, state_dim + 2 : 2 * state_dim + 2
          ])

          # compute the TD error using the critic
          actor_optim.zero_grad()
          critic_optim.zero_grad()

          td_error = critic(
              state_samples, reward_samples, state_next_samples
          )
          critic_loss = torch.square(td_error).mean()

          critic_loss.backward()
          critic_optim.step()

          # train the actor (we don't need the expected value unless you want to log it)
          # the TD error is detached so the actor loss does not backpropagate into the critic
          expected_v = actor(state_samples, action_samples, td_error.detach())
          actor_loss = -expected_v.mean()
          actor_loss.backward()
          actor_optim.step()

          if done:
            # print how well we did on this episode
            # (only reported once training has started)
            print(episode_reward)

        # update current state for next iteration
        state = state_next

        if done:
          break
      # recorded for every episode, whether or not training has started
      reward_history[i_episode] = episode_reward

    # plot reward history
    plt.figure()
    plt.plot(reward_history)
    plt.xlabel('Number of Episodes')
    plt.ylabel('Episode Reward')
    plt.title('History of Episode Reward')
    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs('../plots', exist_ok=True)
    plt.savefig('../plots/p2_reward_history.png')
    plt.show()
    run_actor(env, actor, NUM_TEST_EPISODES)
  finally:
    # closes the environment, even if training raised or was interrupted
    env.close()

# Run everything together

In [None]:
# entry point: run training, plot the reward history and run the test episodes
train_actor_critic()