In [1]:
import gym

In [2]:
env = gym.make('CartPole-v1')
obs = env.reset()
obs

array([-0.01274851, -0.03042583,  0.04722203,  0.02659708])

In [4]:
def basic_policy(obs):
    angle = obs[2]
    return 0 if angle < 0 else 1

totals = []
for episode in range(500):
    episode_rewards = 0
    obs = env.reset()
    for step in range(200):
        action = basic_policy(obs)
        obs, reward, done, info = env.step(action)
        episode_rewards += reward
        if done:
            break
    totals.append(episode_rewards)

In [5]:
totals

[45.0,
 56.0,
 43.0,
 52.0,
 44.0,
 43.0,
 44.0,
 38.0,
 43.0,
 39.0,
 44.0,
 42.0,
 61.0,
 45.0,
 36.0,
 25.0,
 25.0,
 40.0,
 41.0,
 25.0,
 36.0,
 47.0,
 35.0,
 38.0,
 51.0,
 53.0,
 52.0,
 31.0,
 34.0,
 52.0,
 34.0,
 51.0,
 32.0,
 32.0,
 34.0,
 25.0,
 35.0,
 52.0,
 38.0,
 25.0,
 54.0,
 45.0,
 46.0,
 40.0,
 46.0,
 51.0,
 51.0,
 36.0,
 25.0,
 37.0,
 45.0,
 35.0,
 38.0,
 43.0,
 51.0,
 54.0,
 41.0,
 26.0,
 46.0,
 62.0,
 37.0,
 51.0,
 38.0,
 40.0,
 40.0,
 42.0,
 45.0,
 57.0,
 47.0,
 44.0,
 34.0,
 57.0,
 59.0,
 42.0,
 52.0,
 31.0,
 64.0,
 39.0,
 35.0,
 35.0,
 31.0,
 44.0,
 35.0,
 41.0,
 50.0,
 41.0,
 31.0,
 37.0,
 36.0,
 62.0,
 25.0,
 39.0,
 43.0,
 31.0,
 35.0,
 38.0,
 40.0,
 37.0,
 50.0,
 52.0,
 49.0,
 47.0,
 45.0,
 25.0,
 55.0,
 34.0,
 45.0,
 47.0,
 40.0,
 38.0,
 62.0,
 54.0,
 62.0,
 40.0,
 41.0,
 50.0,
 56.0,
 39.0,
 34.0,
 37.0,
 39.0,
 45.0,
 43.0,
 41.0,
 42.0,
 39.0,
 39.0,
 52.0,
 37.0,
 40.0,
 51.0,
 35.0,
 35.0,
 38.0,
 47.0,
 45.0,
 40.0,
 56.0,
 34.0,
 58.0,
 38.0,
 33.0,
 56.0,

In [40]:
import tensorflow as tf
from tensorflow import keras
from tqdm import tqdm

In [28]:
n_inputs = 4  # == env.observation_space.shape[0]

model = keras.models.Sequential([
    keras.layers.Dense(5, activation='elu', input_shape=[n_inputs]),
    keras.layers.Dense(1, activation='sigmoid')
])

In [7]:
# 한 스텝을 진행할 함수 (아래 play_mutliple_episodes 함수안에서 사용됨)

def play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis]) 
            # 배치를 기대하므로 np.newaxis 넣어서 하나의 햄플이 있는 배치가 
            # 되도록 만들어 준 후, 왼쪽으로 이동할 확률구하기
            
        action = (tf.random.uniform([1, 1]) > left_proba)
            # 0~1 사이에서 랜덤한 실수를 샘플링하고 이 값이 left_prob보다 큰지
            # 확인 (이 불리언값을 숫자로 변환하면 0(True), 1(False))
            
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
            # 왼쪽으로 이동할 타깃 확률 정의 1 - (행동)
            # 행동이 0이면 왼쪽으로 이동할 타깃 확률이 1
            
        loss = tf.reduce_mean(loss_fn(y_target, left_proba)) # 손실 계산
            
    grads = tape.gradient(loss, model.trainable_variables)
        # 훈련 가능한 변수마다 그레디언트 계산
    obs, reward, done, info = env.step(int(action[0, 0].numpy())) 
    return obs, reward, done, grads

In [36]:
# 여러 에피소드를 플레이하는 함수

def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = []
    all_grads = []
    for episode in range(n_episodes):
        current_rewards = []
        current_grads = []
        obs = env.reset()
        for step in range(n_max_steps):
            obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done:
                break
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)
    return all_rewards, all_grads

In [37]:
# 각 스텝에서 할인된 미래 보상의 합을 계산하는 함수 

def discount_rewards(rewards, discount_factor): # discount_factor : 할인계수 감마
    discounted = np.array(rewards)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_factor
    return discounted

In [38]:
# 여러 에피소드에 걸쳐 계산된 할인된 이 모든 보상(대가)을 정규화하는 함수

def discount_and_normalize_rewards(all_rewards, discount_factor):
    all_discounted_rewards = [discount_rewards(rewards, discount_factor)
                             for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean) / reward_std
           for discounted_rewards in all_discounted_rewards]

In [15]:
# 작동확인
discount_rewards([10, 0, -50], discount_factor=0.8)

array([-22, -40, -50])

In [16]:
discount_and_normalize_rewards([[10, 0, -50], [10, 20]],
                              discount_factor=0.8)

[array([-0.28435071, -0.86597718, -1.18910299]),
 array([1.26665318, 1.0727777 ])]

In [17]:
# 하이퍼파라미터 정의

n_iterations = 150
n_episodes_per_update = 10  # 각 반복은 에피소드 10개를 진행
n_max_steps = 200
discount_factor = 0.95

In [21]:
optimizer = tf.keras.optimizers.Adam(lr=0.01)
loss_fn = tf.keras.losses.binary_crossentropy

In [41]:
# 실행

for iteration in tqdm(range(n_iterations)):
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn)
    all_final_rewards = discount_and_normalize_rewards(all_rewards,
                                                      discount_factor)
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        # 각 훈련 가능한 변수를 순회하면서 모든 에피소드와 모든 스텝에 대한 
        # 그레디언트를 final_reward로 가중치를 두어 평균
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index]
                for episode_index, final_rewards in enumerate(all_final_rewards)
                for step, final_reward in enumerate(final_rewards)], axis=0)
        all_mean_grads.append(mean_grads)
        
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))
        # 평균 그레디언트를 옵티마이저에 적용 (trainable 변수가 변경되고 정책 더 나아질 것)

100%|████████████████████████████████████████████████████████████████████████████████| 150/150 [13:18<00:00,  5.32s/it]


### 7. 마르코프 결정 과정

In [68]:
# MDP 정의

transition_probabilities = [  # shape=[s, a, s']
    [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],
    [[0.0, 1.0, 0.0], None, [0.0, 0.0, 1.0]],
    [None, [0.8, 0.1, 0.1], None]
]
rewards = [  # shape=[s, a, s']
    [[+10, 0, 0], [0, 0, 0], [0, 0, 0]],
    [[0, 0, 0], [0, 0, 0], [0, 0, -50]],
    [[0, 0, 0], [+40, 0, 0], [0, 0, 0]]
]
possible_actions = [[0, 1, 2], [0, 2], [1]]

# 행동 a1을 통해 s2에서 s0으로 전이할 확률 : transition_probabilities[2][1][0]
# 비슷하게 이에 해당하는 보상  : rewards[2][1][0]

In [62]:
ex = np.array(transition_probabilities)
ex

array([list([[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]]),
       list([[0.0, 1.0, 0.0], None, [0.0, 0.0, 1.0]]),
       list([[None, [0.8, 0.1, 0.1], None]])], dtype=object)

In [45]:
# Q-가치 0으로 초기화 (불가는한 행동은 -np.Inf로)

Q_values = np.full((3, 3), -np.inf)  # 불가능한 행동에 대해서
for state, actions in enumerate(possible_actions):
    Q_values[state, actions] = 0.0   # 모든 가능한 행동에 대해서

In [71]:
# Q-가치 반복 알고리즘

gamma = 0.90  # 할인 계수

for iteration in range(50):
    Q_prev = Q_values.copy()
    for s in range(3):
        for a in possible_actions[s]:
            Q_values[s, a] = np.sum([
                transition_probabilities[s][a][sp] 
                * (rewards[s][a][sp] + gamma * np.max(Q_prev[sp]))
                for sp in range(3)])

In [72]:
Q_values

array([[18.91891892, 17.02702703, 13.62162162],
       [ 0.        ,        -inf, -4.87971488],
       [       -inf, 50.13365013,        -inf]])

In [73]:
# 각 상태에 대해 가장 높은 Q-가치를 갖는 행동 확인하기

np.argmax(Q_values, axis=1)

array([0, 0, 1], dtype=int64)

### 9. Q-러닝

In [77]:
# 에이전트가 한 행동을 실행하고 결과 상태와 보상을 받을 수 있는 스텝함수 만들기

def step(state, action):
    probas = transition_probabilities[state][action]
    next_state = np.random.choice([0, 1, 2], p=probas)
    reward = rewards[state][action][next_state]
    return next_state, reward

In [75]:
# 에이전트의 탐색 정책 구현하기 (여기서는 상태공간이 작으므로 단순한 랜덤정책)

def exploration_policy(state):
    return np.random.choice(possible_actions[state])

In [78]:
# Q-가치를 초기화 한 후, 학습률 감쇠(거듭제곱 기반 스케줄링)를 사용해 Q-러닝 알고리즘 실행

alpha0 = 0.05  # 초기 학습률
decay = 0.005  # 학습률 감쇠
gamma = 0.90   # 할인 계수
state = 0      # 초기 상태

for iteration in range(10000):
    action = exploration_policy(state)
    next_state, reward = step(state, action)
    next_value = np.max(Q_values[next_state])
    alpha = alpha0 / (1 + iteration * decay) 
    Q_values[state, action] *= 1 - alpha
    Q_values[state, action] += alpha * (reward + gamma * next_value)
    state = next_state
    
# 많은 반복과 하이퍼파라미터 튜닝이 필요

### 10. 심층 Q-러닝 구현하기
- 이론적으로 상태-행동 쌍을 입력으로 받고 근사 Q-가치를 출력하는 신경망이 필요
- But, 실전에서는 상태를 받고 가능한 모든 행동에 대한 근사 Q-가치를 각각 출력하는 것이 훨씬 효율적

In [79]:
env = gym.make('CartPole-v0')
input_shape = [4]  # == env.observation_space.shape
n_outputs = 2      # == env.action_space.n

In [80]:
model = keras.models.Sequential([
    keras.layers.Dense(32, activation='elu', input_shape=input_shape),
    keras.layers.Dense(32, activation='elu'),
    keras.layers.Dense(n_outputs)
])

In [81]:
# epsilon-greedy policy

def epsilon_greedy_policy(state, epsilon=0):
    if np.random.rand() < epsilon:
        return np.random.randint(2)
    else:
        Q_values = model.predict(state[np.newaxis])
        return np.argmax(Q_values[0])

In [82]:
# 재생 버퍼를 만들어 모든 경험을 저장하고 훈련 반복마다 여기서 랜덤한 훈련 배치를
# 샘플링할 수 있도록 (deque으로)
from collections import deque

replay_buffer = deque(maxlen=2000)

In [83]:
# 경험 원소 5개에 상응하는 넘파이 배열 5개 반환
# (상태, 선택한 행동, 결과보상, 도달한 상태, 종료되었는지를 나타내는 불리언 값)

def sample_experiences(batch_size):
    indices = np.random.randint(len(replay_buffer), size=batch_size)
    batch = [replay_buffer[index] for index in indices]
    states, actions, rewards, next_states, dones = [
        np.array([experience[field_index] for experience in batch])
            for field_index in range(5)]
    return states, actions, rewards, next_states, dones

In [89]:
# epsilon-greedy 정책을 사용해 하나의 스텝을 플레이하고 반환된 경험을
# 재생 버퍼에 저장하는 함수

def play_one_step(env, state, epsilon):
    action = epsilon_greedy_policy(state, epsilon)
    next_state, reward, done, info = env.step(action)
    replay_buffer.append((state, action, reward, next_state, done))
    return next_state, reward, done, info

In [98]:
# 재생 버퍼에서 경험 배치를 샘플링하고 이 배치에서 경사 하강법 한 스텝을 수행하여 
# DQN을 훈련하는 함수 만들기

batch_size = 32
discount_factor = 0.95
optimizer = keras.optimizers.Adam(lr=1e-3)
loss_fn  = keras.losses.mean_squared_error

def training_step(batch_size):
    experiences = sample_experiences(batch_size)
    states, actions, rewards, next_states, dones = experiences
    next_Q_values = model.predict(next_states)
    max_next_Q_values = np.max(next_Q_values, axis=1)
    target_Q_values = (rewards + 
                      (1 - dones) * discount_factor * max_next_Q_values)
    target_Q_values = target_Q_values.reshape(-1, 1)
    mask = tf.one_hot(actions, n_outputs)
    with tf.GradientTape() as tape:
        all_Q_values = model(states)
        Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

In [99]:
for episode in tqdm(range(600)):
    obs = env.reset()
    for step in range(200):
        epsilon = max(1 - episode / 500, 0.01)
        obs, reward, done, info = play_one_step(env, obs, epsilon)
        if done:
            break
    if episode > 50:
        training_step(batch_size)

100%|████████████████████████████████████████████████████████████████████████████████| 600/600 [06:40<00:00,  1.50it/s]
