<a href="https://colab.research.google.com/github/Soy0ungPark/study_mldl/blob/master/18_reinforcement_learning_test1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Q-learning 에이전트 구현

In [1]:
import numpy as np
import pandas as pd
import random
from collections import defaultdict

class QLearning:
    """Tabular Q-learning agent with an epsilon-greedy policy.

    The Q-table maps ``str(state)`` to a list holding one Q-value per action,
    in the same order as the ``actions`` sequence given to the constructor.
    """

    def __init__(self, actions):
        """Create an agent.

        Args:
            actions: sequence of the available actions.
        """
        self.ACTIONS = np.array(actions)
        self.step_size = 0.01
        self.discount_factor = 0.9
        self.epsilon = 0.1
        self.q_table = self._new_q_table()

    def _new_q_table(self):
        """Return an empty Q-table whose missing states default to all zeros."""
        n_actions = len(self.ACTIONS)
        return defaultdict(lambda: [0.0] * n_actions)

    def _action_index(self, action):
        """Return the position of *action* inside ``self.ACTIONS``.

        Raises:
            IndexError: if *action* is not one of the known actions.
        """
        return int(np.flatnonzero(self.ACTIONS == action)[0])

    def get_action(self, state):
        """Choose an action for *state* with an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise one of the actions with the highest Q-value is returned,
        ties being broken at random.
        """
        if np.random.rand() < self.epsilon:
            return np.random.choice(self.ACTIONS)

        q_values = np.asarray(self.q_table[str(state)])
        best = np.flatnonzero(q_values == q_values.max())
        return self.ACTIONS[np.random.choice(best)]

    def save(self, root):
        """Write the Q-table to the CSV file *root* (columns: states, q_list)."""
        rows = [[state, q_list] for state, q_list in self.q_table.items()]
        df = pd.DataFrame(rows, columns=['states', 'q_list'])
        df.to_csv(root)

    def load(self, root):
        """Replace the Q-table with the one stored in the CSV file *root*.

        The file is expected to have the layout produced by :meth:`save`.
        """
        def str_to_list(s):
            # Take the text between the first '[' and the following ']' and
            # parse each comma-separated entry as a float.
            inner = s.split('[')[1].split(']')[0]
            return [float(item) for item in inner.split(',') if item.strip()]

        # Bug fix: the default factory used to read the non-existent
        # attribute ``self.Actions`` and failed on the first unseen state.
        self.q_table = self._new_q_table()

        # Read both columns as raw text so that state keys such as '3' or
        # 'None' stay identical to the ``str(state)`` keys used elsewhere.
        df = pd.read_csv(
            root,
            dtype={'states': str, 'q_list': str},
            keep_default_na=False,
        )
        for state, q_text in zip(df['states'], df['q_list']):
            self.q_table[state] = str_to_list(q_text)

    def learn(self, state, action, reward, next_state):
        """Apply one Q-learning update for the sample (S, A, R, S').

        Q(S, A) <- Q(S, A) + step_size * (R + gamma * max_a Q(S', a) - Q(S, A))
        """
        state, next_state = str(state), str(next_state)
        action_idx = self._action_index(action)

        current_q = self.q_table[state][action_idx]
        # Q-learning bootstraps from the greedy (maximum) value of S'.
        next_state_q = max(self.q_table[next_state])

        td = reward + self.discount_factor * next_state_q - current_q
        self.q_table[state][action_idx] = current_q + self.step_size * td

DQN 구현


In [2]:
import random
import numpy as np
from collections import deque
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import RandomUniform
import tensorflow as tf

class NN(tf.keras.Model):
    """Small fully connected network that outputs one value per action."""

    def __init__(self, action_size):
        """Build two 24-unit ReLU layers and a linear output layer.

        Args:
            action_size: number of units in the output layer.
        """
        super().__init__()
        hidden_units = 24
        self.fc1 = Dense(hidden_units, activation='relu')
        self.fc2 = Dense(hidden_units, activation='relu')
        # Output weights start in a narrow uniform range around zero.
        small_uniform = RandomUniform(-1e-3, 1e-3)
        self.fc_out = Dense(action_size, kernel_initializer=small_uniform)

    def call(self, x):
        """Run *x* through fc1, fc2 and fc_out and return the result."""
        hidden = self.fc2(self.fc1(x))
        return self.fc_out(hidden)

class DQN:
    """Deep Q-Network agent with replay memory and a target network."""

    def __init__(self, state_size, aciton_size):
        """Create the agent, its networks and its replay memory.

        Args:
            state_size: stored on the agent as ``self.state_size``.
            aciton_size: number of discrete actions. The misspelled name is
                kept so existing keyword callers continue to work.
        """
        self.state_size = state_size
        self.action_size = aciton_size

        # Hyper-parameters.
        self.discount_factor = 0.99
        self.learning_rate = 0.001
        self.epsilon = 1.0
        self.epsilon_decay = 0.999
        self.epsilon_min = 0.001
        self.batch_size = 64
        self.train_start = 1000

        # Bounded FIFO replay memory: the oldest samples are dropped first.
        self.memory = deque(maxlen=2000)

        # Bug fix: the networks must be instances of the Q-network class NN.
        # The previous code called DQN(...) here, which re-entered this
        # constructor with a missing argument and raised TypeError.
        self.model = NN(self.action_size)
        self.target_model = NN(self.action_size)
        self.optimizer = Adam(learning_rate=self.learning_rate)

        self.update_target_model()

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        # NOTE(review): if neither network has been called yet, the weight
        # lists may be empty and this copy does nothing -- confirm.
        self.target_model.set_weights(self.model.get_weights())

    def get_action(self, state):
        """Return an action index for *state* using an epsilon-greedy policy."""
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)

        q = self.model(state)
        # Row 0 is used, so *state* presumably carries a leading batch axis of
        # size 1 -- verify against the caller.
        return np.argmax(q[0])

    def append_sample(self, state, action, reward, next_state, done):
        """Store one transition (S, A, R, S', done) in the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def train_model(self):
        """Decay epsilon and run one gradient step on a random mini-batch.

        Raises:
            ValueError: from ``random.sample`` when the memory holds fewer
                than ``batch_size`` samples.
        """
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        mini_batch = random.sample(self.memory, self.batch_size)
        # Stored states are indexed with [0] before being stacked.
        states = np.array([sample[0][0] for sample in mini_batch])
        actions = np.array([sample[1] for sample in mini_batch])
        rewards = np.array([sample[2] for sample in mini_batch])
        next_states = np.array([sample[3][0] for sample in mini_batch])
        dones = np.array([sample[4] for sample in mini_batch])

        model_params = self.model.trainable_variables

        with tf.GradientTape() as tape:
            # Q-value of the action actually taken in each sample.
            predicts = self.model(states)
            one_hot_action = tf.one_hot(actions, self.action_size)
            predicts = tf.reduce_sum(one_hot_action * predicts, axis=1)

            # The bootstrap target comes from the target network and must not
            # receive gradients.
            target_predicts = self.target_model(next_states)
            target_predicts = tf.stop_gradient(target_predicts)

            # (1 - dones) removes the bootstrap term where done is true.
            max_q = np.amax(target_predicts, axis=-1)
            targets = rewards + (1 - dones) * self.discount_factor * max_q
            loss = tf.reduce_mean(tf.square(targets - predicts))

        grads = tape.gradient(loss, model_params)
        self.optimizer.apply_gradients(zip(grads, model_params))