In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.distributions as distributions

import matplotlib.pyplot as plt
import numpy as np
import gym

In [6]:
class Generate_actor_critic(nn.Module):
    """Two-layer perceptron used to build both the actor and the critic network.

    The forward pass is: linear -> dropout -> ReLU -> linear. No activation is
    applied to the final layer, so the caller receives raw, unbounded outputs.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, dropout = 0.5):
        """Create the layers.

        Args:
            input_dim: number of features in each input row.
            hidden_dim: width of the single hidden layer.
            output_dim: number of values produced per input row.
            dropout: drop probability handed to ``nn.Dropout``.
        """
        super().__init__()

        # Attribute names double as state_dict keys, so they are kept stable.
        self.fully_connected_1 = nn.Linear(in_features = input_dim, out_features = hidden_dim)
        self.fully_connected_2 = nn.Linear(in_features = hidden_dim, out_features = output_dim)
        self.dropout = nn.Dropout(p = dropout)

    def forward(self, x):
        """Map ``x`` through the hidden layer and return the output layer's raw values."""
        hidden = F.relu(self.dropout(self.fully_connected_1(x)))
        return self.fully_connected_2(hidden)

  and should_run_async(code)


In [7]:
class ActorCritic(nn.Module):
    """Bundle an actor module and a critic module behind one ``nn.Module``.

    Registering both as sub-modules means ``parameters()``, ``train()`` and
    ``eval()`` called on this object reach the actor and the critic together.
    """

    def __init__(self, actor, critic):
        """Store the two sub-modules.

        Args:
            actor: module called on the state to produce the action prediction.
            critic: module called on the state to produce the value prediction.
        """
        super().__init__()
        self.actor, self.critic = actor, critic

    def forward(self, state):
        """Return ``(actor(state), critic(state))`` for the same ``state``."""
        # Tuple elements are evaluated left to right: actor first, then critic.
        return self.actor(state), self.critic(state)

In [8]:
class Agent():
    """Actor-critic agent that trains one episode at a time on a Gym environment.

    The agent owns two ``Generate_actor_critic`` networks (actor and critic)
    wrapped in an ``ActorCritic`` module, plus a single Adam optimiser over the
    parameters of both. It reads ``env.action_space.n``, so the environment must
    expose a discrete action space.
    """

    def __init__(self, env, hidden_dims = 128, learning_rate = 3e-4, discount_factor = 0.99):
        """Build the networks and the optimiser for ``env``.

        Args:
            env: environment providing ``observation_space.shape``,
                ``action_space.n``, ``reset()`` and ``step(action)``.
            hidden_dims: width of the hidden layer in both networks.
            learning_rate: learning rate passed to Adam.
            discount_factor: per-step discount used when computing returns.
        """
        self.env = env
        self.input_dims = env.observation_space.shape[0]
        self.output_dims = env.action_space.n
        self.hidden_dims = hidden_dims
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor

        self.actor = Generate_actor_critic(self.input_dims, self.hidden_dims, self.output_dims)
        self.critic = Generate_actor_critic(self.input_dims, self.hidden_dims, 1)

        self.policy = ActorCritic(self.actor, self.critic)
        # init_weights used to be defined but never applied; apply it to every sub-module.
        self.policy.apply(self.init_weights)
        self.optimizer = optim.Adam(self.policy.parameters(), lr = self.learning_rate)

    @staticmethod
    def init_weights(m):
        """Xavier-normal initialise the weight of a linear layer and zero its bias.

        Modules that are not ``nn.Linear`` are left untouched, which makes the
        function suitable for ``nn.Module.apply``.
        """
        if isinstance(m, nn.Linear):
            torch.nn.init.xavier_normal_(m.weight)
            if m.bias is not None:
                torch.nn.init.zeros_(m.bias)

    @staticmethod
    def _reset_env(env):
        """Reset ``env`` and return only the initial observation."""
        result = env.reset()
        # Newer Gym releases return (observation, info); older ones return the
        # observation alone.
        if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
            return result[0]
        return result

    @staticmethod
    def _step_env(env, action):
        """Step ``env`` and return ``(state, reward, done)`` for 4- or 5-tuple step results."""
        result = env.step(action)
        if len(result) == 5:
            # Newer Gym releases split "done" into terminated and truncated.
            state, reward, terminated, truncated, _ = result
            return state, reward, bool(terminated or truncated)
        state, reward, done, _ = result
        return state, reward, done

    @staticmethod
    def _standardize(x, eps = 1e-8):
        """Shift ``x`` to zero mean and scale it to unit standard deviation.

        With fewer than two elements the sample standard deviation is NaN, so
        only the mean is removed. ``eps`` keeps the division finite when every
        element is identical.
        """
        centered = x - x.mean()
        if x.numel() < 2:
            return centered
        return centered / (x.std() + eps)

    def train_agent(self):
        """Run one episode on ``self.env`` in training mode and apply one update.

        Actions are sampled from the categorical distribution defined by the
        actor's outputs.

        Returns:
            ``(policy_loss, value_loss, reward_per_episode)`` where the losses
            are Python floats and the last item is the sum of episode rewards.
        """
        self.policy.train()

        log_probability_actions = []
        values = []
        rewards = []
        done = False
        reward_per_episode = 0

        state = self._reset_env(self.env)
        while not done:
            # Add a leading batch dimension of size 1.
            state = torch.as_tensor(state, dtype = torch.float32).unsqueeze(0)

            action_prediction, value_prediction = self.policy(state)

            # Passing logits is equivalent to softmax followed by Categorical(probs),
            # but avoids taking the log of probabilities that underflowed to zero.
            dist = distributions.Categorical(logits = action_prediction)
            action = dist.sample()
            log_prob_action = dist.log_prob(action)

            state, reward, done = self._step_env(self.env, action.item())

            log_probability_actions.append(log_prob_action)
            values.append(value_prediction)
            rewards.append(reward)
            reward_per_episode += reward

        log_probability_actions = torch.cat(log_probability_actions)
        values = torch.cat(values).squeeze(-1)

        returns = self.calculate_returns(rewards)
        advantages = self.calculate_advantages(returns, values)

        policy_loss, value_loss = self.update_policy(advantages, log_probability_actions, returns, values)

        return policy_loss, value_loss, reward_per_episode

    def calculate_returns(self, rewards, normalize = True):
        """Compute the discounted return for every step of an episode.

        Args:
            rewards: sequence of per-step rewards in the order they were received.
            normalize: when true, standardise the returns across the episode.

        Returns:
            A 1-D float32 tensor with one entry per reward.
        """
        returns = []
        R = 0

        # Accumulate from the last step backwards, then flip once; this avoids
        # the quadratic cost of inserting at the front of the list.
        for r in reversed(rewards):
            R = r + R * self.discount_factor
            returns.append(R)
        returns.reverse()

        returns = torch.tensor(returns, dtype = torch.float32)

        if normalize:
            returns = self._standardize(returns)

        return returns

    def calculate_advantages(self, returns, values, normalize = True):
        """Return ``returns - values``, optionally standardised across the episode."""
        advantages = returns - values

        if normalize:
            advantages = self._standardize(advantages)

        return advantages

    def update_policy(self, advantages, log_prob_actions, returns, values):
        """Take one optimiser step on the policy loss and the value loss.

        ``advantages`` and ``returns`` are detached first, so gradients reach the
        networks only through ``log_prob_actions`` and ``values``.

        Returns:
            ``(policy_loss, value_loss)`` as Python floats.
        """
        advantages = advantages.detach()
        returns = returns.detach()

        policy_loss = - (advantages * log_prob_actions).sum()

        # Prediction goes first, target second. The default reduction is already
        # a mean, so the extra .sum() the code used to apply changed nothing.
        value_loss = F.smooth_l1_loss(values, returns)

        self.optimizer.zero_grad()

        # One backward pass over the sum yields the same gradients as two
        # separate passes over each loss.
        (policy_loss + value_loss).backward()

        self.optimizer.step()

        return policy_loss.item(), value_loss.item()

    def evaluate_agent(self, env = None):
        """Run one greedy episode in eval mode and return its total reward.

        Args:
            env: environment to evaluate on; ``self.env`` is used when omitted.
        """
        if env is None:
            env = self.env

        self.policy.eval()

        done = False
        episode_reward = 0

        state = self._reset_env(env)

        while not done:
            state = torch.as_tensor(state, dtype = torch.float32).unsqueeze(0)

            with torch.no_grad():
                action_pred, _ = self.policy(state)

            # Softmax is monotonic, so the argmax over the raw outputs picks the
            # same action as the argmax over the probabilities.
            action = torch.argmax(action_pred, dim = -1)

            state, reward, done = self._step_env(env, action.item())

            episode_reward += reward

        return episode_reward

In [10]:
# Training configuration.
# NOTE(review): 2 episodes cannot reach rewards_threshold; this looks like a
# smoke-test setting. Raise it for a real training run.
max_training_episodes = 2
number_of_trials = 100      # window size for the running reward means
rewards_threshold = 475
print_every = 100
seed = 1234

train_rewards = []
test_rewards = []

# Seeds torch (weight init, action sampling) and numpy. The environments'
# own random generators are not seeded here.
torch.manual_seed(seed)
np.random.seed(seed)

train_env = gym.make('CartPole-v1')
test_env = gym.make('CartPole-v1')

agent = Agent(train_env)


def evaluate_on(agent, env):
    """Evaluate ``agent`` on ``env`` and restore the agent's own env afterwards.

    ``evaluate_agent`` reads ``agent.env``, so the attribute is swapped for the
    duration of the call.
    """
    original_env = agent.env
    agent.env = env
    try:
        return agent.evaluate_agent()
    finally:
        agent.env = original_env


for episode in range(1, max_training_episodes+1):

    policy_loss, value_loss, train_reward = agent.train_agent()

    # Evaluate on the separate test environment instead of the training one.
    test_reward = evaluate_on(agent, test_env)

    train_rewards.append(train_reward)
    test_rewards.append(test_reward)

    mean_train_rewards = np.mean(train_rewards[-number_of_trials:])
    mean_test_rewards = np.mean(test_rewards[-number_of_trials:])

    # Also report on the final episode so short runs still produce output.
    if episode % print_every == 0 or episode == max_training_episodes:

        print(f'| Episode: {episode:3} | Mean Train Rewards: {mean_train_rewards:5.1f} | Mean Test Rewards: {mean_test_rewards:5.1f} |')

    if mean_test_rewards >= rewards_threshold:

        print(f'Reached reward threshold in {episode} episodes')

        break