In [None]:
import numpy as np
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import namedtuple

In [None]:
# Hyperparameters read by QACAgent.
CRITIC_LEARNING_RATE = 1e-3  # Adam learning rate for the critic network
ACTOR_LEARNING_RATE = 1e-4   # Adam learning rate for the actor network
GAMMA = 0.9                  # discount applied to the next-state value in the critic target

In [None]:
class QACAgent:
    """Q actor-critic agent for a discrete action space.

    Two independent MLPs are trained online, one transition per ``train`` call:

    * critic -- maps a state to one value per action and is regressed with a
      smooth-L1 loss onto
      ``reward + GAMMA * (1 - done) * max_a' Q(next_state, a')``.
    * actor -- maps a state to a softmax distribution over actions and is
      updated with the loss ``-log pi(action | state) * Q(state, action)``.

    The learning rates and the discount factor are read from the module-level
    constants ``CRITIC_LEARNING_RATE``, ``ACTOR_LEARNING_RATE`` and ``GAMMA``.
    """

    # Lower bound applied to the chosen action's probability before the log is
    # taken: log(0) is -inf and would turn the actor's gradients into NaN.
    _MIN_PROB = 1e-8

    def __init__(self, num_states, num_actions):
        """Build the critic and actor networks and their Adam optimizers.

        Args:
            num_states: Length of the flattened state vector (input width of
                both networks).
            num_actions: Number of discrete actions (output width of both
                networks).
        """
        self.num_states = num_states
        self.num_actions = num_actions
        self.action_list = np.arange(num_actions)

        # The critic is created before the actor so that parameter
        # initialisation consumes the torch RNG in a fixed order.
        self.critic_model = self._get_critic_model()
        self.critic_optimizer = torch.optim.Adam(self.critic_model.parameters(), lr=CRITIC_LEARNING_RATE)

        self.actor_model = self._get_actor_model()
        self.actor_optimizer = torch.optim.Adam(self.actor_model.parameters(), lr=ACTOR_LEARNING_RATE)

    @staticmethod
    def _as_row(values, dtype):
        """Convert ``values`` to a tensor of ``dtype`` reshaped to (1, -1)."""
        return torch.as_tensor(values, dtype=dtype).reshape(1, -1)

    def get_action(self, state, train=True, episode = None):
        """Sample an action from the actor's current policy.

        Args:
            state: Array-like state; it is flattened into a single row.
            train: Unused; kept so existing callers keep working.
            episode: Unused; kept so existing callers keep working.

        Returns:
            One element of ``self.action_list`` drawn with the probabilities
            produced by the actor network.
        """
        state = self._as_row(state, torch.float32)
        # Acting needs no gradients, so skip building the autograd graph.
        with torch.no_grad():
            policy = self.actor_model(state)
        return np.random.choice(self.action_list, p=policy.numpy().ravel())

    def train(self, state, action, next_state, reward, done, episode=None):
        """Run one critic update and one actor update from a single transition.

        Args:
            state: Array-like state in which ``action`` was taken.
            action: Integer index of the action taken.
            next_state: Array-like state reached after the action.
            reward: Scalar reward for the transition.
            done: Truthy when the transition ended the episode; the next-state
                value is then excluded from the critic target.
            episode: Unused; kept so existing callers keep working.

        Returns:
            The actor loss for this transition as a Python float.
        """
        state = self._as_row(state, torch.float32)
        action = self._as_row([int(action)], torch.long)
        next_state = self._as_row(next_state, torch.float32)
        reward = self._as_row([float(reward)], torch.float32)
        # Float mask (1.0 on terminal transitions) instead of a uint8 tensor,
        # so the target arithmetic stays in float32 throughout.
        done = self._as_row([float(done)], torch.float32)

        state_action_value = self.critic_model(state).gather(1, action)

        # The TD target is treated as a constant: without no_grad the critic
        # loss would also back-propagate through the next-state estimate.
        with torch.no_grad():
            next_state_action_value = self.critic_model(next_state).max(1)[0].view(-1, 1)
            critic_target = reward + GAMMA * (1.0 - done) * next_state_action_value

        critic_loss = F.smooth_l1_loss(state_action_value, critic_target)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # The pre-update Q(state, action) weights the policy gradient and is
        # detached so the actor loss cannot reach the critic's parameters.
        state_action_value = state_action_value.detach()

        policy = self.actor_model(state)
        selected_action_prob = policy.gather(1, action)
        log_selected_action_prob = torch.log(selected_action_prob.clamp(min=self._MIN_PROB))

        actor_loss = -(log_selected_action_prob * state_action_value)
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        return actor_loss.item()

    def _build_mlp(self):
        """Return the layer stack shared by both networks.

        Layout: ``num_states -> 32 -> 32 -> num_actions`` with ReLU after each
        hidden layer. Module names are kept stable because they become the
        keys of the model's ``state_dict``.
        """
        model = nn.Sequential()
        model.add_module('fc1', nn.Linear(self.num_states, 32))
        model.add_module('relu1', nn.ReLU())
        model.add_module('fc2', nn.Linear(32, 32))
        model.add_module('relu2', nn.ReLU())
        model.add_module('fc3', nn.Linear(32, self.num_actions))
        return model

    def _get_critic_model(self):
        """Return the critic: an MLP producing one unbounded value per action."""
        return self._build_mlp()

    def _get_actor_model(self):
        """Return the actor: the same MLP followed by a softmax over actions."""
        model = self._build_mlp()
        model.add_module('softmax1', nn.Softmax(dim=1))
        return model

In [None]:
# Environment the agent is trained on; the training loop drives it through
# its reset() and step() methods.
env = gym.make('CartPole-v0')

In [None]:
# Size the networks from the environment's own spaces instead of hard-coding
# the dimensions, so changing `env` cannot silently mismatch them.
agent = QACAgent(int(env.observation_space.shape[0]), int(env.action_space.n))

In [None]:
# Number of episodes the training loop runs (it iterates `range(N_EPOCH)`).
N_EPOCH = 5000

In [None]:
# Online training: the agent is updated after every environment step.
for episode in range(N_EPOCH):
    done, step = False, 0
    state = env.reset()
    while not done:
        action = agent.get_action(state)
        # The environment's own reward is discarded and replaced by the
        # shaped reward computed below.
        next_state, _, done, _ = env.step(action)

        # Shaped reward: 0.0 on every non-terminal step; on the terminal step
        # -1.0 if the episode ended before step index 195, otherwise 1.0.
        if not done:
            reward = 0.0
        elif step < 195:
            reward = -1.0
        else:
            reward = 1.0

        agent.train(state, action, next_state, reward, done)

        state = next_state
        step += 1

    print("Episode {} Step {}".format(episode, step))