In [1]:
import gym
import numpy as np
import matplotlib.pyplot as plt
from gym.envs.registration import register
import random

In [2]:
env = gym.make('CartPole-v1')
env = env.unwrapped

In [3]:
from collections import deque
import random 

class ReplayMemory(object):
    def __init__(self, capacity):
        self.deque = deque(maxlen=capacity)

    def append(self, state, action, reward, next_state, done):
        if np.ndim(state) and np.ndim(next_state) == 1:
            state = np.expand_dims(state, 0)
            next_state = np.expand_dims(next_state, 0)
            
        self.deque.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        state, action, reward, next_state, done = zip(*random.sample(self.deque, batch_size))
        return np.concatenate(state), action, reward, np.concatenate(next_state), done
    
    def pop_episode(self):
        state, action, reward, next_state, done = zip(*[self.deque.popleft() for _ in range(len(self))])
        return np.concatenate(state), action, reward, np.concatenate(next_state), done
    
    def reset(self):
        [self.deque.pop() for _ in range(len(self))]
        return
        
    def __len__(self):
        return len(self.deque)

# Policy Gradient

In [4]:
import numpy as np
import torch
import torch.nn as nn

In [5]:
class ActorCritic(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(ActorCritic, self).__init__()     
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        
        self.feature_stream = nn.Sequential(
            nn.Linear(input_dim, hidden_dim), nn.ReLU(),
        )
        
        self.actor_lin = nn.Linear(hidden_dim, output_dim)
        self.critic_lin = nn.Linear(hidden_dim, 1)
        
    def to_tensor(self, x):
        return torch.tensor(x, dtype=torch.float)
    
    def forward(self, state):
        if type(state) != torch.tensor:
            state = self.to_tensor(state)
        
        if state.dim() == 1:
            state.unsqueeze_(0)
        
        feature = self.feature_stream(state)
        
        action_prob = self.actor_lin(feature)
        action_prob = torch.softmax(action_prob, dim=-1)
        value_function = self.critic_lin(feature).squeeze(1) # dim=1
        return action_prob, value_function

In [9]:
class Fitter():
    def __init__(self, actor_critic):
        self.actor_critic = actor_critic
        self.gamma = 0.99
        self.lr = 0.003
        self.optim = torch.optim.Adam(self.actor_critic.parameters(), lr=self.lr)
        self.replay_memory = ReplayMemory(capacity=100000)
        
    def run_episode(self, n_episode):
        r_sum_ls = []
        
        for i in range(n_episode):
            # reset episode
            s = env.reset()
            r_sum = 0

            done = False
            while not done:
                a_prob, adv = self.actor_critic(s) # pi(a|s)
                a_dist = torch.distributions.Categorical(a_prob) # pi(a|s)
                a = a_dist.sample().item() # a
                s_new, r, done, _ = env.step(a)
                r_sum += r
                
                if r_sum > 10000:
                    print('Done, No more training!')
                    return
                    
                self.replay_memory.append(s, a, r, s_new, done)
                s = s_new
                
            print(r_sum, end='\r')   
                
            loss = self.compute_loss()
            self.train(loss)
            
            
            r_sum_ls.append(r_sum)
            if i % 100 == 0:
                print('Reward Sum = %.02f'%(np.mean(r_sum_ls)))
                r_sum_ls = []
        return
    
    def compute_loss(self):
        # get mini-batch from replay-memory
        S, A, R, S_next, D = self.replay_memory.pop_episode()
        A = torch.tensor(A, dtype=torch.long)
        R = torch.tensor(R, dtype=torch.float)
        
        # compute loss and gradient descent
        a_prob, v = actor_critic(S) # pi(a_t|s_t)z
        a_dist = torch.distributions.Categorical(a_prob) # pi(a_t|s_t)
        log_a_prob = a_dist.log_prob(A) # A : true actions
        _, v_next = actor_critic(S_next)
        
        TD_error = R + self.gamma*v_next - v
        actor_loss = torch.mean(-log_a_prob*TD_error.data)
        critic_loss = torch.mean(TD_error*TD_error.data)
        loss = actor_loss + critic_loss
        return loss
        
    def train(self, loss):
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()
        return

In [None]:
input_dim = 4
hidden_dim = 128
out_dim = env.action_space.n

actor_critic = ActorCritic(input_dim, hidden_dim, out_dim)
fitter = Fitter(actor_critic)

n_episode = 100000
fitter.run_episode(n_episode)

Reward Sum = 13.00
Reward Sum = 30.75
Reward Sum = 58.14
Reward Sum = 42.29
Reward Sum = 45.02
Reward Sum = 81.19
Reward Sum = 141.73
Reward Sum = 154.94
Reward Sum = 153.86
Reward Sum = 210.13
Reward Sum = 253.67
Reward Sum = 309.67
Reward Sum = 293.90
Reward Sum = 288.18
Reward Sum = 291.85
456.00

# Test

In [8]:
r_sum = 0
s = env.reset()

done = False
while not done:
    a_prob, adv = actor_critic(s) # pi(a|s)
    a_dist = torch.distributions.Categorical(a_prob) # pi(a|s)
    a = a_dist.sample().item() # a
    s_new, r, done, _ = env.step(a)
    r_sum += r

    if r_sum > 10000:
        print('Done, No more training!')
        break
        
print(r_sum)

15.0
