In [1]:
! pip install gym



In [73]:
import gym
import torch
import matplotlib.pyplot as plt
import torch.nn as nn 
import numpy as np

from torch.distributions.categorical import Categorical

In [3]:
env = gym.make("CartPole-v1")
s_dim = env.observation_space.shape[0]
a_dim = env.action_space.n

In [4]:
#MLP
class MultiLayerPerceptron(nn.Module):
  def __init__(self, 
               input_dim:int, output_dim:int,
               hidden_act:str, out_act: str):

    super().__init__()
    self.input_dim = input_dim
    self.output_dim = output_dim
    self.hidden_act = getattr(nn, hidden_act)()
    self.out_act = getattr(nn, out_act)()

    self.layers = nn.ModuleList()
    self.layers.append(nn.Linear(self.input_dim, 16))
    self.layers.append(self.hidden_act)
    self.layers.append(nn.Linear(16, 32))
    self.layers.append(self.hidden_act)
    self.layers.append(nn.Linear(32, 64))
    self.layers.append(self.hidden_act)
    self.layers.append(nn.Linear(64, self.output_dim))
    self.layers.append(self.out_act)

  def forward(self, xs):
    for layer in self.layers:
      xs = layer(xs)
    return xs

In [70]:
class TDActorCritic(nn.Module):
  def __init__(self, 
               policy_net: nn.Module, 
               value_net: nn.Module,
               gamma: float=1.0,
               lr: float=0.0002):
    super(TDActorCritic, self).__init__()
    self.policy_net = policy_net
    self.value_net = value_net
    self.gamma = gamma
    self.lr = lr

    total_params = list(policy_net.parameters()) + list(value_net.parameters())
    self.opt = torch.optim.Adam(params=total_params, lr=lr)
    
    self._eps = 1e-25
    self._mse = nn.MSELoss()
  
  def get_action(self, state):
    with torch.no_grad():
      logits = self.policy_net(state)
      dist = Categorical(logits=logits) #softmax
      action = dist.sample() #sample action from softmax
      return action

    """
    A2C는 return을 사용하지 않고 TD기법으로 추산한 
    Adavantage를 사용하므로 에피소드 단위가 아닌 
    에피소드 내의 샘플 단위로 업데이트가 가능
    """  

  def update_sample(self, state, action, reward, next_state, done):
    with torch.no_grad():
      td_target = reward + self.gamma*self.value_net(next_state)*(1-done) 
      td_err = td_target - self.value_net(state)
    
    #확률 계산
    logits = self.policy_net(state)[0]
    dist = Categorical(logits=logits)
    prob = dist.probs[action]

    #현재 상태 가치 계산
    v = self.value_net(state) 

    loss = -torch.log(prob+self._eps)*td_err + self._mse(v, td_target)
    loss = loss.mean()
    
    self.opt.zero_grad()
    loss.backward()
    self.opt.step()

In [71]:
#에이전트의 성능 평가를 위한 이동평균 계산기
class EMA:
  def __init__(self, alpha:float = 0.5):
    self.s = None
    self.alpha = alpha

  def update(self, y):
    if self.s is None:
      self.s = y
    else:
      self.s = self.alpha*y + (1-self.alpha)*self.s

In [72]:
#텐서 변환
def to_tensor(data, size):
  return torch.tensor(data).float().view(size)

In [76]:
policy_net = MultiLayerPerceptron(input_dim=s_dim,
                                  output_dim=a_dim,
                                  hidden_act="ReLU",
                                  out_act="Identity")

value_net = MultiLayerPerceptron(input_dim=s_dim,
                                 output_dim=a_dim,
                                 hidden_act="ReLU",
                                 out_act="Identity")

agent = TDActorCritic(policy_net=policy_net,
                      value_net=value_net)

In [77]:
ema = EMA()

In [78]:
n_eps = 2000
print_every = 500

for ep in range(n_eps):
  s = env.reset()
  cum_r = 0

  while True:
    s = to_tensor(s, size=(1, 4))
    a = agent.get_action(s)
    ns, r, done, info = env.step(a.item())

    ns = to_tensor(ns, size=(1, 4))
    agent.update_sample(s, a.view(-1, 1), r, ns, done)
    
    s = ns.numpy()
    cum_r += r
    if done:
      break
    
  ema.update(cum_r)
  if ep % print_every == 0:
    print("Episode {} || EMA: {}".format(ep, ema.s))

Episode 0 || EMA: 15.0
Episode 500 || EMA: 111.59106461767445
Episode 1000 || EMA: 39.49019248746902
Episode 1500 || EMA: 47.86723963381681
