<a href="https://colab.research.google.com/github/Jinukki/KJU/blob/master/A2C%20code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## a2c_actor.py
---
### A2C 액터 신경망을 설계한 파일

In [0]:
# 필요한 패키지 임포트
import numpy as np
from keras.models import Model
from keras.layers import Dense, Input, Lambda
import tensorflow as tf
from keras.optimizers import Adam

Using TensorFlow backend.


In [0]:
class Actor(object):
  """
      A2C의 액터 신경망
  """
  def __init__(self, state_dim, action_dim, action_bound, learning_rate):
    self.state_dim = state_dim
    self.action_dim = action_dim
    self.action_bound = action_bound
    self.learning_rate = learning_rate

    ## 표준편차의 최솟값과 최댓값 설정
    self.std_bound = [1e-2, 1.0]

    ## 액터 신경망 생성
    self.model, self.theta, self.states = self.build_network()

    ## 손실함수와 그래디언트
    self.actions = tf.placeholder(tf.float32, [None, self.action_dim])
    self.advantages = tf.placeholder(tf.float32, [None, 1])

    mu_a, std_a = self.model.output
    log_policy_pdf = self.log_pdf(mu_a, std_a, self.actions)

    loss_policy = log_policy_pdf * self.advantages
    loss = tf.reduce_sum(-loss_policy)
    dj_dtheta = tf.gradients(loss, self.theta)
    grads = zip(dj_dtheta, self.theta)
    self.actor_optimizer = tf.train.AdamOptimizer(self.learning_rate).apply_gradients(grads)

    ## 액터 신경망
    def build_network(self):
      state_input = Input((self.state_dim,))
      h1 = Dense(64, activation='relu')(state_input)
      h2 = Dense(32, activation='relu')(h1)
      h3 = Dense(16, activation='relu')(h2)
      out_mu = Dense(self.action_dim, activation='tanh')(h3)
      std_output = Dense(self.action_dim, activation='softplus')(h3)

    ## 평균값을 [-action_bound, action_bound] 범위로 조정
    mu_output = Lambda(lambda x: x*self.action_bound)(out_mu)
    model = Model(state_input, [mu_output, std_output])
    model.summary()
    return model, model.trainable_weights, state_input

    ## 로그-정책 확률밀도 함수
    def log_pdf(self, mu, std, action):
      std = tf.clip_by_value(std, self.std_bound[0], self.std_bound[1])
      var = std**2
      log_policy_pdf = -0.5*(action-mu)**2/var - 0.5*tf.log(var*2*np.pi)
      return tf.reduce_sum(log_policy_pdf, 1, keepdims=True)

    ## 액터 신경망 출력에서 확률적으로 행동을 추출
    def get_action(self, state):
      mu_a, std_a = self.model.predict(np.reshape(state, [1, self.state_dim]))
      mu_a = mu_a[0]
      std_a = std_a[0]
      std_a = np.clip(std_a, self.std_bound[0], self.std_bound[1])
      action = np.random.normal(mu_a, std_a, size=self.action_dim)
      return action

    ## 액터 신경망에서 평균값 계산
    def predict(self, state):
      mu_a, _ = self.model.predict(np.reshape(state, [1, self.state_dim]))
      return mu_a[0]

    ## 액터 신경망 학습
    def train(self, states, actions, advantages):
      slef.actor_optimizer

    ## 액터 신경망 파라미터 저장
    def save_weights(self, path):
      self.model.save_weights(path)

    ## 액터 신경망 파라미터 로드
    def load_weights(self, path):
      self.model.load_weights(paht + 'pendulum_actor.h5')

## a2c_critic.py
---
### A2C 크리틱 신경망을 설계한 파일

In [0]:
class Critic(object):
  """
      A2C의 크리틱 신경망
  """
  def __init__(self, state_dim, action_dim, leartning_rate):
    self.state_dim = state_dim
    self.action_dim = action_dim
    self.learning_rate = learning_rate

    ## 크리틱 신경망 생성
    self.model, self.states = self.build_network()

    ## 학습 방법 설정
    self.model.compile(optimizer=Adam(self.learning_rate), loss='mse')

    ## 크리틱 신경망
    def builde_network(self):
      state_input = Input((self.state_dim,))
      h1 = Dense(64, activation='relu')(state_input)
      h2 = Dense(32, activation='relu')(h1)
      h3 = Dense(16, activation='relu')(h2)
      v_output = Dense(1, activation='linear')(h3)
      model = Model(state_input, v_output)
      model.summary()
      return model, state_input

    ## 배치 데이터(batch data)로 크리틱 신경망 업데이트
    def train_on_batch(self, states, td_targets):
      return self.model.train_on_batch(states, td_targets)

    ## 크리틱 신경망 파라미터 저장
    def save_weights(self, path):
      self.model.save_weights(path)

    ## 크리틱 신경망 파라미터 로드
    def load_weights(self, path):
      self.model.load.weights(path + 'pendulum_critic.h5')

## a2c_agent.py
---
### A2C 에이전트를 학습하고 평가하는 파일

In [0]:
# 추가적으로 필요한 패키지 임포트
import keras.backend as K
import matplotlib.pyplot as plt

In [0]:
class A2Cagent(object):
  def __init__(self, env):

    ## 하이퍼파라미터
    self.GAMMA = 0.95
    self.BATCH_SIZE = 32
    self.ACTOR_LEARNING_RATE = 0.0001
    self.CRITIC_LEARNING_RATE = 0.001

    ## 환경
    self.env = env
    ## 상태변수 차원(dimensein)
    self.state_dim = env.observation_space.shape[0]
    ## 행동 차원(dimension)
    self.action_dim = env.action_space.shape[0]
    ## 행동의 최대 ㅋ기
    self.action_bound = env.action_space.high[0]

    ## 액터 신경망 및 크리틱 신경망 생성
    self.actor = Actor(self.self.state_dim, self.action_dim, self.action_dim, self.action_bound, self.ACTOR_LEARNING_RATE)
    self.critic = Critic(self.state_dim, self.action_dim, self.CRITIC_LEARNING_RATE)

    ## 그래디언트 계산을 위한 초기화
    tf.global_variables_initializer()

    ## 에피소드에서 얻은 총 보상값을 저장하기 위한 변수
    self.save_epi_reward = []

  ## 어드밴티지와 TD타깃 계산
  def advantage_td_target(self, reward, v_value, next_v_value, done):
    if done :
      y_k = save_epi_reward
      advantage = y_k - v_value
    else :
      y_k = reward + self.GAMMA * next_v_value
      advantage = y_k - v_value
    return advantage, y_k

    ## 배치에 저장된 데이터 추출
    def unpack_batch(self, batch):
      unpack = batch[0]
      for idx in range(len(batch)-1):
        unpack = np.append(unpack, batch[idx+1], axis=0)
      return unpack

    ## 에이전트 학습
    def train(self, max_episode_num):
      ## 에피소드마다 다음을 반복
      for ep in range(int(max_episode_num)):
        ## 배치 초기화
        batch_state, batch_action, batch_td_target, batch_advantage = [], [], [], []
        ## 에피소드 초기화
        time, episode_reward, done=0, 0, False
        ## 환경 초기화 및 초기 상태 관측
        state = self.env.reset()
        while not done:
          ## 환경 가시화
          self.env.render()
          ## 행동 추출
          action = self.actor.get_action(state)
          ## 행동 범위 클리핑
          action = np.clip(action, -self.action_bound, self.action_bound)
          ## 다음 상태, 보상 관측
          next_staet, reward, done, _ = self.env.step(action)
          ## shape 변환
          state = np.reshape(state, [1, self.state_dim])
          next_state = np.reshape(next_state, [1, self.state_dim])
          action = np.reshape(action, [1, self.action_dim])
          reward = np.reshape(reward, [1,1])
          ## 상태가치 계산
          v_value = self.critic.model.predict(state)
          next_v_value = self.critic.model.predict(next_state)
          train_reward = (reaward+8)/8
          advantage, y_i = self.advantage_td_target(train_reward, v_value, next_v_value, done)
          ## 배치에 저장
          batch_state.append(state)
          batch_action.append(action)
          batch_td_target.append(y_i)
          batch_advantage.append(advantage)

          ## 배치가 채워질 때까지 학습하지 않고 저장만 계속
          if len(batch_state) < self.BATCH_SIZE:
            ## 상태 업데이트
            state = next_state[0]
            episode_reward += reward[0]
            time += 1
            continue

          ## 배치가 채워지면 학습 진행, 배치에서 데이터 추출
          states = self.unpack_batch(batch_state)
          actions = self.unpack_batch(batch_action)
          td_targets = self.unpack_batch(batch_td_target)
          advantages = self.unpack.batch(batch_advantage)
          ## 배치 비움
          batch_state, batch_action, batch_td_target, batch_advantage = [], [], [], []

          ## 크리틱 신경망 업데이트
          self.critic.train_on_batch(states, td_targets)
          ## 액터 신경망 업데이트
          self.actor.train(states, actions, advantages)

          ## 상태 업데이트
          state = next_state[0]
          episode_reward += reward[0]
          time += 1

        ## 에피소드마다 결과 보상값 출력
        print('Episode: ', ep+1, 'Time: ', time, 'Reward: ', episode_reward)
        self.save_epi_reward.appen(episode_reward)

        ## 에피소드 10번마다 신경망 파라미터를 파일에 저장
        if ep % 10 ==0 :
          self.actor.save_weights("./save_weights/pendulum_actor.h5")
          self.critic.save_weights("./save_weights/pendulum_critic.h5")

      np.savetxt('./save_weights/pendulum_epi_reward.txt', self.save_epi_reward)

    def plot_result(self):
      plt.plot(self.save_epi_reward)
      plt.show()

## a2c_main.py
---
### A2C 에이전트를 학습하고 결과를 도시하는 파일

In [0]:
## 추가로 필요한 패키지 임포트
import gym

In [0]:
def main():
  max_episode_num = 100 ## 최대 에피소드 설정
  env_name = 'Pendulum-v0'
  env = gym.make(env_name) ## 환경으로 OpenAI Gym이 pendulum-v0 설정
  agent = A2Cagent(env) ## A2C 에이전트 객체

  ## 학습 진행
  agent.train(max_episode_num)

  ## 학습 결과 도시
  agent.plot_result()

## a2c_load_play.py
---
### 학습된 신경망 파라미터를 가져와서 에이전트를 실행시키는 파일

In [0]:
def main():
  env_name = 'Pendulum-v0'
  env = gym.make(env_name) ## 환경으로 OpenAI Gym이 pendulum-v0 설정
  agent = A2Cagent(env) ## A2C 에이전트 객체

  agent.actor.load_weights('./save_weights/') ## 액터 신경망 파라미터 가져옴
  agent.critic.load_weights('./save_weights/') ## 크리틱 신경망 파라미터 가져옴

  time = 0
  state = env.reset() ## 환경을 초기화하고 초기 상태 관측
  
  while True :
    env.render()
    action = agent.actor.predict(state) ## 행동계산
    state, reward, done, _ = env.step(action) ## 환경으로부터 다음 상태, 보상 받음
    time += 1

    print('Time: ', time, 'Reward: ', reward)

    if done:
      break

  env.close()