In [None]:
import sys, os
# One-time environment bootstrap: runs only when the notebook is executing inside
# Google Colab and the '.setup_complete' marker file does not exist yet.
if 'google.colab' in sys.modules and not os.path.exists('.setup_complete'):
    # Download the course's Colab setup script and pipe it straight into bash.
    !wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/spring20/setup_colab.sh -O- | bash

    # Fetch the grading helper (saved one directory up) and this week's submit script.
    !wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/coursera/grading.py -O ../grading.py
    !wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/coursera/week5_policy_based/submit.py

    # Drop the marker file so that re-running this cell skips the downloads.
    !touch .setup_complete

# This code creates a virtual display to draw game images on.
# It will have no effect if your machine has a monitor.
if type(os.environ.get("DISPLAY")) is not str or len(os.environ.get("DISPLAY")) == 0:
    !bash ../xvfb start
    os.environ['DISPLAY'] = ':1'

Neural Network as a policy


In [None]:
import sys
import torch  
import gym
import numpy as np  
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import matplotlib.pyplot as plt

# Constants
# Discount factor: update_policy weights a reward k steps in the future by GAMMA**k
# when it accumulates the return for each time step.
GAMMA = 0.9999

class PolicyNetwork(nn.Module):
    """Tanh MLP whose first two outputs parameterise a 1-D Gaussian policy.

    ``forward`` returns a ``(batch, num_actions)`` tensor squashed to ``[-1, 1]``.
    ``get_action`` interprets column 0 as the mean and the absolute value of
    column 1 as the standard deviation of a normal distribution over actions,
    so ``num_actions`` must be at least 2 for ``get_action`` to work.

    Args:
        num_inputs: size of the observation vector.
        num_actions: number of network outputs.
        hidden_size: base width; hidden layers are 1x, 2x, 4x and 2x this size.
        learning_rate: step size of the Adam optimizer stored in ``self.optimizer``.
    """

    # Lower bound on the standard deviation. The raw output |tanh(.)| can be
    # exactly 0, which would make the Gaussian log-density divide by zero.
    MIN_STD = 1e-6

    def __init__(self, num_inputs, num_actions, hidden_size, learning_rate=3e-6):
        super(PolicyNetwork, self).__init__()

        self.num_actions = num_actions
        # Attribute names (and creation order) are kept as in the original so
        # that existing state_dicts and seeded initialisation stay compatible.
        self.linear1 = nn.Linear(num_inputs, hidden_size)
        self.linear3 = nn.Linear(hidden_size, hidden_size*2)
        self.linear4 = nn.Linear(hidden_size*2, hidden_size*4)
        self.linear5 = nn.Linear(hidden_size*4, hidden_size*2)
        self.linear2 = nn.Linear(hidden_size*2, num_actions)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, state):
        """Map a ``(batch, num_inputs)`` float tensor to ``(batch, num_actions)`` in [-1, 1]."""
        # torch.tanh replaces the deprecated F.tanh; layer order is
        # linear1 -> linear3 -> linear4 -> linear5 -> linear2.
        x = torch.tanh(self.linear1(state))
        x = torch.tanh(self.linear3(x))
        x = torch.tanh(self.linear4(x))
        x = torch.tanh(self.linear5(x))
        return torch.tanh(self.linear2(x))

    def get_action(self, state):
        """Sample an action for a single observation.

        Args:
            state: 1-D numpy array holding one observation.

        Returns:
            ``(action, log_prob)``: two tensors of shape ``(1,)``. ``action`` is
            detached from the autograd graph; ``log_prob`` is the Gaussian
            log-density of that action and is differentiable w.r.t. the
            network parameters.
        """
        state = torch.from_numpy(state).float().unsqueeze(0)
        params = self.forward(state)
        mean = params[:, 0]
        std = params[:, 1].abs().clamp(min=self.MIN_STD)

        noise = torch.tensor(np.random.normal(size=params.shape[0]), dtype=torch.float32)
        # The sample must be treated as a constant. If it stayed attached,
        # (action - mean) would equal noise * std inside the graph and the
        # gradient of the log-density w.r.t. the mean would cancel to zero.
        action = (mean + noise * std).detach()

        # log N(a | mean, std^2), written out directly instead of log(exp(...))
        # for numerical stability. The normaliser sqrt(2*pi*var) is *divided*
        # out of the density, hence the minus sign on its log.
        var = std ** 2
        log_prob = -((action - mean) ** 2) / (2 * var) - 0.5 * torch.log(2 * np.pi * var)
        return action, log_prob

In [None]:
def update_policy(policy_network, rewards, log_probs, gamma=None):
    """Run one REINFORCE gradient step from a single recorded episode.

    For every time step t the discounted return
    ``G_t = sum_k gamma**k * rewards[t + k]`` is computed, the returns are
    standardised across the episode, and the loss ``sum_t -log_probs[t] * G_t``
    is minimised with ``policy_network.optimizer``.

    Args:
        policy_network: object exposing an ``optimizer`` with ``zero_grad()``
            and ``step()``; its parameters must have produced ``log_probs``.
        rewards: sequence of per-step rewards.
        log_probs: sequence of tensors, one per step, aligned with ``rewards``.
        gamma: discount factor; defaults to the module-level ``GAMMA``.

    Returns:
        A one-element list holding the loss as a detached tensor (the list
        wrapper is kept for backward compatibility with existing callers).
    """
    if gamma is None:
        gamma = GAMMA

    # Accumulate returns back-to-front: G_t = r_t + gamma * G_{t+1}. This is
    # O(n) instead of re-summing the whole reward tail for every t.
    returns = []
    running_return = 0.0
    for reward in reversed(rewards):
        running_return = reward + gamma * running_return
        returns.append(running_return)
    returns.reverse()

    returns = torch.tensor(returns)
    # Standardise only when there are at least two samples: the sample std of
    # a single value is NaN and would propagate NaNs into every weight.
    if returns.numel() > 1:
        returns = (returns - returns.mean()) / (returns.std() + 1e-9)

    step_losses = [-log_prob * Gt for log_prob, Gt in zip(log_probs, returns)]

    policy_network.optimizer.zero_grad()
    policy_gradient = torch.stack(step_losses).sum()
    policy_gradient.backward()
    policy_network.optimizer.step()

    # Detach before handing the value back so that callers who store it do not
    # keep the episode's autograd graph alive.
    return [policy_gradient.detach()]

In [None]:
# Train the policy on MountainCarContinuous: play one episode, then perform a
# single policy update from the rewards and log-probabilities recorded in it.
env = gym.make("MountainCarContinuous-v0")
# Two network outputs: get_action reads column 0 and column 1 of the output.
policy_net = PolicyNetwork(env.observation_space.shape[0], 2, 128)

max_episode_num = 5000
max_steps = 10000
log_every = 1  # print a progress line every `log_every` episodes
numsteps = []
avg_numsteps = []
all_rewards = []
l = []
for episode in range(max_episode_num):
    state = env.reset()
    log_probs = []
    rewards = []

    for steps in range(max_steps):
        action, log_prob = policy_net.get_action(state)

        new_state, reward, done, _ = env.step(action.detach().numpy())
        log_probs.append(log_prob)
        rewards.append(reward)

        # End the episode as soon as the environment reports termination;
        # stepping an environment after `done` without a reset is not valid.
        # The max_steps cap is kept as a safety net.
        if done or steps == max_steps - 1:
            loss = update_policy(policy_net, rewards, log_probs)
            l.append(loss)
            numsteps.append(steps)
            avg_numsteps.append(np.mean(numsteps[-10:]))
            all_rewards.append(np.sum(rewards))
            if episode % log_every == 0:
                sys.stdout.write("episode: {}, total reward: {}, average_reward: {}, length: {}\n".format(episode, np.round(np.sum(rewards), decimals = 3),  np.round(np.mean(all_rewards[-10:]), decimals = 3), steps))
                print(loss)
            # Leave the step loop regardless of whether this episode was logged.
            break

        state = new_state

plt.plot(numsteps, label='steps')
plt.plot(avg_numsteps, label='mean of last 10 episodes')
plt.xlabel('Episode')
plt.ylabel('Episode length (index of last step)')
plt.legend()
plt.show()