In [1]:
import gym

In [2]:
env = gym.make("CartPole-v1")
obs = env.reset()

In [3]:
obs

array([-0.0254418 ,  0.01290775, -0.02330415,  0.02241122])

In [4]:
env.render()

True

In [5]:
img = env.render(mode="rgb_array")
img.shape

(400, 600, 3)

In [6]:
env.action_space

Discrete(2)

In [7]:
action=1

In [8]:
obs, reward, done, info = env.step(action)

In [9]:
obs

array([-0.02518364,  0.20835602, -0.02285592, -0.27753238])

In [10]:
reward

1.0

In [11]:
done

False

In [12]:
info

{}

In [13]:
env.reset()

array([-0.00322629, -0.00589353,  0.04804873, -0.02059597])

In [14]:
env.render()

True

In [15]:
env.close()

In [16]:
def basic_policy(obs):
    angle = obs[2]
    return 0 if angle <0 else 1

In [17]:
totals = []
for episode in range(500):
    episode_rewards = 0
    obs = env.reset()

    for step in range(200):
        action = basic_policy(obs)
        obs, reward, done, info = env.step(action)
        episode_rewards += reward
        if done:
            break
    totals.append(episode_rewards)

In [18]:
import numpy as np
np.mean(totals), np.std(totals), np.min(totals), np.max(totals)

(41.802, 9.272043787644664, 24.0, 72.0)

In [19]:
import matplotlib.pyplot as plt
import matplotlib.animation as animation

In [20]:
env.seed(42)
frames = []
obs = env.reset()

In [21]:
for step in range(200):
    img = env.render(mode="rgb_array")
    frames.append(img)
    action = basic_policy(obs)

    obs, reward, done, info = env.step(action)
    if done:
        break

In [22]:
def update_scene(num, frames, patch):
    patch.set_data(frames[num])
    return patch, 

In [23]:
def plot_animation(frames, repeat=False, interval=40):
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    anim = animation.FuncAnimation(fig, update_scene, fargs=(frames, patch), frames=len(frames), repeat=repeat, interval=interval)
    plt.close()
    return anim

In [24]:
plot_animation(frames)
plt.show()

In [25]:
env.close()

In [26]:
#  신경망 정책

import tensorflow as tf
import numpy as np

from tensorflow import keras


In [27]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)


In [28]:
n_inputs = 4 # == env.observation_space.shape[0]

model = keras.models.Sequential([
    keras.layers.Dense(5, activation='elu', input_shape=[n_inputs]),
    keras.layers.Dense(1, activation='sigmoid')
])

In [29]:
def render_policy_net(model, n_max_steps = 200, seed = 42):
    framse = []

    env = gym.make("CartPole-v1")
    env.seed(seed)
    np.random.seed(seed)

    obs = env.reset()

    for step in range(n_max_steps):
        frames.append(env.render(mode="rgb_array"))
        left_proba = model.predict(obs.reshape(1, -1))
        action = int(np.random.rand() > left_proba)
        obs, reward, done, info = env.step(action)
        if done:
            break
    env.close()

    return frames

In [30]:
frames = render_policy_net(model)
plot_animation(frames)

<matplotlib.animation.FuncAnimation at 0x7fe530109be0>

In [31]:
n_environments = 50
n_iterations = 5000

envs = [gym.make("CartPole-v1") for _ in range(n_environments)]

for index, env in enumerate(envs):
    env.seed(index)

np.random.seed(42)

observations = [env.reset() for env in envs]
optimizer = keras.optimizers.RMSprop()
loss_fn = keras.losses.binary_crossentropy

for iteraion in range(n_iterations):
    # if angle < 0 we want proba(left) = 1., or else proba(left) = 0
    target_probas = np.array([([1.] if obs[2] < 0 else [0.]) for obs in observations])

    with  tf.GradientTape() as tape:
        left_probas = model(np.array(observations))
        loss = tf.reduce_mean(loss_fn(target_probas, left_probas))

    print("\rIteration: {}, Loss: {:.3f}".format(iteraion, loss.numpy()), end="")
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    actions = (np.random.rand(n_environments, 1) > left_probas.numpy()).astype(np.int32)

    for env_index, env in enumerate(envs):
        obs, reward, done, info = env.step(actions[env_index][0])
        observations[env_index] = obs if not done else env.reset()

for env in envs:
    env.close()

Iteration: 4999, Loss: 0.094

In [32]:
frames = render_policy_net(model)
plot_animation(frames)

<matplotlib.animation.FuncAnimation at 0x7fe5300ef390>

### Policy Gradients

In [33]:
def play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis])
        action = (tf.random.uniform([1, 1]) > left_proba)
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    grads = tape.gradient(loss, model.trainable_variables)
    obs, reward, done, info = env.step(int(action[0, 0].numpy()))
    
    return obs, reward, done, info

In [34]:
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_reward = []
    all_grads = []
    for episode in range(n_episodes):
        current_reward = []
        current_grads = []
        obs = env.reset()
        
        for step in range(n_max_steps):
            obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
            current_reward.append(reward)
            current_grads.append(grads)
            
            if done:
                break
                
        all_reward.append(current_reward)
        all_grads.append(current_grads)
    return all_reward, all_grads

In [35]:
def discount_rewards(rewards, dicount_factor):
    discounted = np.array(rewards)
    
    for step in range(len(rewards)-2, -1, -1):
        discounted[step] += discounted[step + 1] * dicount_factor
    return discounted

In [36]:
def discount_and_normalize_rewards(all_rewards, discount_factor):
    all_discounted_rewards = [discount_rewards(rewards, discount_factor) for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    
    return [(discount_rewards - reward_mean) / reward_std for discount_rewards in all_discounted_rewards]

In [37]:
discount_rewards([10, 0, -50], 0.8)

array([-22, -40, -50])

In [38]:
discount_and_normalize_rewards([[10,0,-50],[10,20]],0.8)

[array([-0.28435071, -0.86597718, -1.18910299]),
 array([1.26665318, 1.0727777 ])]

In [39]:
n_iterations = 150
n_episodes_per_update = 10
n_max_steps = 200
discount_factor = 0.95

In [40]:
optimizer = keras.optimizers.Adam(0.01)
loss_fn = keras.losses.binary_crossentropy

In [41]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(5, activation='elu', input_shape=[4]),
    keras.layers.Dense(1, activation='sigmoid')
])

In [42]:
env = gym.make("CartPole-v1")
env.seed(42)

for iteraion in range(n_iterations):
    all_rewards, all_grads = play_multiple_episodes(env, n_episodes_per_update, n_max_steps, model, loss_fn)
    all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_factor)
    
    all_mean_grads = []
    
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index] 
            for episode_index, final_rewards in enumerate(all_final_rewards) 
            for step, final_reward in enumerate(final_rewards)], axis = 0)
        all_mean_grads.append(mean_grads)
        
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))
         
env.close()

KeyError: 0

In [None]:
# env = gym.make("CartPole-v1")
# env.seed(42);

# for iteration in range(n_iterations):
#     all_rewards, all_grads = play_multiple_episodes(
#         env, n_episodes_per_update, n_max_steps, model, loss_fn)
#     total_rewards = sum(map(sum, all_rewards))                     # Not shown in the book
#     print("\rIteration: {}, mean rewards: {:.1f}".format(          # Not shown
#         iteration, total_rewards / n_episodes_per_update), end="") # Not shown
#     all_final_rewards = discount_and_normalize_rewards(all_rewards,
#                                                        discount_factor)
#     all_mean_grads = []
#     for var_index in range(len(model.trainable_variables)):
#         mean_grads = tf.reduce_mean(
#             [final_reward * all_grads[episode_index][step][var_index]
#              for episode_index, final_rewards in enumerate(all_final_rewards)
#                  for step, final_reward in enumerate(final_rewards)], axis=0)
#         all_mean_grads.append(mean_grads)
#     optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

# env.close()

In [None]:
#Markov Chains

In [43]:
np.random.seed(42)

transition_probabilities = [ # shape=[s, s']
        [0.7, 0.2, 0.0, 0.1],  # from s0 to s0, s1, s2, s3
        [0.0, 0.0, 0.9, 0.1],  # from s1 to ...
        [0.0, 1.0, 0.0, 0.0],  # from s2 to ...
        [0.0, 0.0, 0.0, 1.0]]  # from s3 to ...

n_max_steps = 50

In [44]:
def print_sequence():
    current_state = 0
    print("State:", end=" ")
    for step in range(n_max_steps):
        print(current_state, end=" ")
        if current_state == 3:
            break
        current_state = np.random.choice(range(4), p=transition_probabilities[current_state])
    else:
        print("...", end="")
    print()

In [45]:
for _ in range(10):
    print_sequence()

State: 0 0 3 
State: 0 1 2 1 2 1 2 1 2 1 3 
State: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 
State: 0 3 
State: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 
State: 0 1 3 
State: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 ...
State: 0 0 3 
State: 0 0 0 1 2 1 2 1 3 
State: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 


In [46]:
transition_probabilities = [ # shape=[s, a, s']
        [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],
        [[0.0, 1.0, 0.0], None, [0.0, 0.0, 1.0]],
        [None, [0.8, 0.1, 0.1], None]]
rewards = [ # shape=[s, a, s']
        [[+10, 0, 0], [0, 0, 0], [0, 0, 0]],
        [[0, 0, 0], [0, 0, 0], [0, 0, -50]],
        [[0, 0, 0], [+40, 0, 0], [0, 0, 0]]]
possible_actions = [[0, 1, 2], [0, 2], [1]]

In [47]:
Q_values = np.full((3,3), -np.inf)
for state, actions in enumerate(possible_actions):
    Q_values[state, actions] = 0.0

In [48]:
Q_values

array([[  0.,   0.,   0.],
       [  0., -inf,   0.],
       [-inf,   0., -inf]])

In [49]:
gamma = 0.90

for iteration in range(50):
    Q_prev = Q_values.copy()
    for s in range(3):
        for a in possible_actions[s]:
            Q_values[s, a] = np.sum([transition_probabilities[s][a][sp] * (rewards[s][a][sp] + gamma * np.max(Q_prev[sp])) for sp in range(3)])

In [50]:
Q_values

array([[18.91891892, 17.02702702, 13.62162162],
       [ 0.        ,        -inf, -4.87971488],
       [       -inf, 50.13365013,        -inf]])

In [51]:
np.argmax(Q_values, axis=1)

array([0, 0, 1])

In [52]:
Q_values = np.full((3,3), -np.inf)
for state, actions in enumerate(possible_actions):
    Q_values[state, actions] = 0.0

In [53]:
gamma = 0.95

for iteration in range(50):
    Q_prev = Q_values.copy()
    for s in range(3):
        for a in possible_actions[s]:
            Q_values[s, a] = np.sum([transition_probabilities[s][a][sp] * (rewards[s][a][sp] + gamma * np.max(Q_prev[sp])) for sp in range(3)])

In [54]:
np.argmax(Q_values, axis=1)

array([0, 2, 1])

### Q-Learning

In [55]:
def step(state, action):
    probas = transition_probabilities[state][action]
    next_state = np.random.choice([0,1,2], p = probas)
    reward = rewards[state][action][next_state]
    
    return next_state, reward

In [56]:
def exploration_policy(state):
    return np.random.choice(possible_actions[state])

In [57]:
np.random.seed(42)

Q_values = np.full((3, 3), -np.inf)

for state, actions in enumerate(possible_actions):
    Q_values[state][actions] = 0

In [59]:
ALPHA0 = 0.05          # 초기 합습률
DECAY = 0.005          # 학습률 감쇠
GAMMA = 0.90           # 할인 계수
STATE = 0              # 초기 상태
history2 = []

In [60]:
for iteraion in range(10000):
    history2.append(Q_values.copy())
    action = exploration_policy(STATE)
    next_state, reward = step(STATE, action)
    next_value = np.max(Q_values[next_state])
    alpha = ALPHA0 / (1 + iteraion * DECAY)
    Q_values[state, action] *= 1 - alpha
    Q_values[state, action] += alpha * (reward + GAMMA * next_value)
    state = next_state

In [61]:
history2 = np.array(history2)

In [63]:
Q_values

array([[33.2635756 , 28.65384801, 26.92310168],
       [24.71728496,        -inf, 19.32345199],
       [       -inf,  0.        ,        -inf]])

In [65]:
np.argmax(Q_values, axis=1)

array([0, 0, 1])

In [66]:
import matplotlib.pyplot as plt

In [69]:
keras.backend.clear_session()

In [72]:
env = gym.make("CartPole-v0")

input_shape = [4]
n_outputs = 2

model = keras.models.Sequential([
    keras.layers.Dense(32, activation='elu', input_shape=input_shape),
    keras.layers.Dense(32, activation='elu'),
    keras.layers.Dense(n_outputs)
])

In [73]:
def epsilon_greedy_policy(state, epsilon=0):
    if np.random.rand() < epsilon:
        return np.random.randint(2)
    else:
        Q_values  = model.predict(state[np.newaxis])
        return np.argmax(Q_values[0])

In [74]:
from collections import deque

replay_buffer = deque(maxlen = 2000)

In [75]:
def sample_experiences(batch_size):
    indices = np.random.randint(len(replay_buffer), size = batch_size)
    batch = [replay_buffer[index] for index in indices]
    states, actions, rewards, next_states, dones = [
        np.array([experience[filed_index] for experience in batch])
        for filed_index in range(5)]
    
    return states, actions, rewards, next_states, done

In [81]:
def play_one_step(env, state, epsilon):
    action = epsilon_greedy_policy(state, epsilon)
    next_state, reward, done, info = env.step(action)
    replay_buffer.append((state, action, reward, next_state, done))
    
    return next_state, reward, done, info

In [77]:
BATCH_SIZE = 32
DISCOUNT_FACTOR = 0.95
OPTIMIZER = keras.optimizers.Adam(1e-3)
LOSS_FN = keras.losses.mean_squared_error

In [83]:
#DQN Training Function
def training_step(batch_size):
    experiences = sample_experiences(batch_size)
    states, actions, rewards, next_states, dones = experiences
    next_Q_values = model.predict(next_states)
    max_next_Q_values = np.max(next_Q_values, axis = 1)
    target_Q_values = (rewards + (1 - dones) * DISCOUNT_FACTOR * max_next_Q_values)
    target_Q_values = target_Q_values.reshape(-1, 1)
    mask = tf.one_hot(actions, n_outputs)
    with tf.GradientTape() as tape:
        all_Q_values = model(states)
        Q_values = tf.reduce_mean(all_Q_values * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(LOSS_FN(target_Q_values, Q_values))
    grads = tape.gradient(loss, model.trainable_variables)
    OPTIMIZER.apply_gradients(zip(grads, model.trainable_variables))

In [85]:
for episode in range(600):
    obs = env.reset()
    for step in range(200):
        epsilon = max(1 - episode / 500, 0.01)
        obs, reward, done, info = play_one_step(env, obs, epsilon)
        
        if done:
            break
    if episode > 50:
        training_step(BATCH_SIZE)