In [1]:
%pip install -q -U gym
%pip install -q -U gym[box2d,atari,accept-rom-license]
!apt update &> /dev/null && apt install -y xvfb &> /dev/null
%pip install -q -U pyvirtualdisplay

[?25l[K     |▌                               | 10 kB 17.9 MB/s eta 0:00:01[K     |█                               | 20 kB 5.6 MB/s eta 0:00:01[K     |█▍                              | 30 kB 7.8 MB/s eta 0:00:01[K     |█▉                              | 40 kB 3.0 MB/s eta 0:00:01[K     |██▎                             | 51 kB 3.4 MB/s eta 0:00:01[K     |██▊                             | 61 kB 3.9 MB/s eta 0:00:01[K     |███▏                            | 71 kB 4.1 MB/s eta 0:00:01[K     |███▋                            | 81 kB 4.6 MB/s eta 0:00:01[K     |████                            | 92 kB 5.1 MB/s eta 0:00:01[K     |████▌                           | 102 kB 3.9 MB/s eta 0:00:01[K     |█████                           | 112 kB 3.9 MB/s eta 0:00:01[K     |█████▍                          | 122 kB 3.9 MB/s eta 0:00:01[K     |█████▉                          | 133 kB 3.9 MB/s eta 0:00:01[K     |██████▎                         | 143 kB 3.9 MB/s eta 0:00:01[K    

In [2]:
import pyvirtualdisplay as pvd
# Start a non-visible (visible=0) 1400x900 display and keep its handle in `display`.
display = pvd.Display(visible=0, size=(1400, 900)).start()

In [4]:
import gym
# Create the CartPole-v1 environment used by every cell below.
env = gym.make("CartPole-v1")

  "Initializing wrapper in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."
  "Initializing environment in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."


In [5]:
# Reset with a fixed seed; with this gym version reset() returns only the observation.
obs = env.reset(seed=42)
obs  # display the initial observation (4 float32 values in the recorded output)

array([ 0.0273956 , -0.00611216,  0.03585979,  0.0197368 ], dtype=float32)

In [6]:
# Request the current frame as an array; the recorded output shows that passing
# `mode` to render() already triggers a deprecation warning.
img = env.render(mode="rgb_array")
img.shape  # (400, 600, 3) in the recorded output

See here for more information: https://www.gymlibrary.ml/content/api/[0m
  "The argument mode in render method is deprecated; "


(400, 600, 3)

In [7]:
env.action_space  # Discrete(2) in the recorded output: two possible actions, 0 and 1

Discrete(2)

In [8]:
action = 1
# Old-style step API: the result is a 4-tuple with a single `done` flag.
obs, reward, done, info = env.step(action)
obs  # display the observation after the step

array([ 0.02727336,  0.18847767,  0.03625453, -0.26141977], dtype=float32)

In [9]:
reward  # reward returned by the env.step() call in the previous cell

1.0

In [10]:
done  # episode-finished flag returned by the env.step() call

False

In [11]:
info  # extra info dict returned by the env.step() call (empty in the recorded output)

{}

In [12]:
def basic_policy(obs):
  """Hard-coded policy driven only by the third observation component.

  Args:
    obs: indexable observation; only `obs[2]` (named `angle` by the original
      author) is read.

  Returns:
    0 when `obs[2]` is strictly negative, 1 otherwise.
  """
  if obs[2] < 0:
    return 0
  return 1

# Evaluate the hard-coded policy: 500 episodes of at most 200 steps each,
# recording the total reward collected per episode in `totals`.
totals = []
for episode in range(500):
  episode_rewards = 0
  obs = env.reset(seed=episode)  # a distinct, fixed seed per episode
  for step in range(200):
    action = basic_policy(obs)
    obs, reward, done, info = env.step(action)
    episode_rewards += reward
    if done:
      break  # the environment reported the end of the episode
  totals.append(episode_rewards)

In [13]:
import numpy as np
# Summary of the per-episode totals: (mean, standard deviation, min, max).
np.mean(totals), np.std(totals), min(totals), max(totals)

(41.698, 8.389445512070509, 24.0, 63.0)

In [14]:
import tensorflow as tf
# Policy network: one hidden layer of 5 ReLU units and a single sigmoid output.
# play_one_step() reads that output as the probability of picking action 0
# (it is bound to the name `left_proba` there).
model = tf.keras.Sequential([
    tf.keras.layers.Dense(5, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])

In [15]:
def play_one_step(env, obs, model, loss_fn):
  """Play one environment step with the policy network and compute gradients.

  The action is sampled from the model's output, and the loss is computed as
  if the sampled action were the correct one, so the returned gradients make
  that action more likely. The caller is expected to weight them afterwards.

  Args:
    env: environment exposing the old-style `step(action)` 4-tuple API.
    obs: current observation; a leading batch axis is added before the model call.
    model: Keras model whose single output is read as P(action == 0).
    loss_fn: loss called as `loss_fn(y_target, left_proba)`.

  Returns:
    Tuple `(obs, reward, done, grads)`: the next observation, the step reward,
    the episode-finished flag, and the gradients of the loss with respect to
    `model.trainable_variables`.
  """
  with tf.GradientTape() as tape:
    left_proba = model(obs[np.newaxis])
    # Sample the action: True (action 1) with probability 1 - left_proba.
    action = (tf.random.uniform([1, 1]) > left_proba)
    # Target probability of action 0: 1 if action 0 was sampled, else 0.
    y_target = tf.constant([1.]) - tf.cast(action, tf.float32)
    loss = tf.reduce_mean(loss_fn(y_target, left_proba))

  grads = tape.gradient(loss, model.trainable_variables)
  obs, reward, done, info = env.step(int(action))
  # Return `done`, not `info`: the caller unpacks the third element as `done`
  # and uses it to end the episode. Returning the (empty, falsy) info dict
  # there meant episodes were never cut short.
  return obs, reward, done, grads

In [16]:
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
  """Play several episodes and collect per-step rewards and gradients.

  Args:
    env: environment; `reset()` is called once per episode.
    n_episodes: number of episodes to play.
    n_max_steps: upper bound on the number of steps per episode.
    model: policy model forwarded to `play_one_step`.
    loss_fn: loss function forwarded to `play_one_step`.

  Returns:
    Tuple `(all_rewards, all_grads)`: two lists with one entry per episode,
    each entry being the list of per-step rewards (respectively per-step
    gradients) of that episode.
  """
  reward_log, grad_log = [], []
  for _ in range(n_episodes):
    episode_rewards, episode_grads = [], []
    obs = env.reset()
    steps_taken = 0
    while steps_taken < n_max_steps:
      obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
      episode_rewards.append(reward)
      episode_grads.append(grads)
      steps_taken += 1
      if done:
        break
    reward_log.append(episode_rewards)
    grad_log.append(episode_grads)
  return reward_log, grad_log

In [17]:
def discount_rewards(rewards, discount_factor):
    """Compute the discounted sum of future rewards at every step.

    Args:
        rewards: sequence of per-step rewards for one episode.
        discount_factor: multiplier applied to the following step's return.

    Returns:
        Float NumPy array `d` of the same length as `rewards`, where
        `d[t] = rewards[t] + discount_factor * d[t + 1]` and the last entry
        equals the last reward.
    """
    # Force a float array: with integer rewards np.array() yields an int array,
    # and the in-place accumulation below would silently truncate every
    # fractional discounted value.
    discounted = np.array(rewards, dtype=float)
    # Walk backwards from the second-to-last step so each entry can reuse the
    # already-discounted value of the step that follows it.
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_factor
    return discounted

def discount_and_normalize_rewards(all_rewards, discount_factor):
    """Discount each episode's rewards, then standardize across all episodes.

    Args:
        all_rewards: list of per-episode reward sequences.
        discount_factor: forwarded to `discount_rewards`.

    Returns:
        List with one array per episode: the discounted rewards minus the mean
        and divided by the standard deviation, both computed over the
        discounted rewards of all episodes pooled together.
    """
    per_episode = []
    for episode_rewards in all_rewards:
        per_episode.append(discount_rewards(episode_rewards, discount_factor))

    # Statistics are pooled over every step of every episode.
    pooled = np.concatenate(per_episode)
    mu = pooled.mean()
    sigma = pooled.std()

    normalized = []
    for returns in per_episode:
        normalized.append((returns - mu) / sigma)
    return normalized

In [18]:
# Sanity check: 0 + 0.8 * (-50) = -40, then 10 + 0.8 * (-40) = -22.
discount_rewards([10, 0, -50], discount_factor=0.8)

array([-22, -40, -50])

In [20]:
# Two episodes of different lengths, normalized with statistics pooled over both.
discount_and_normalize_rewards([[10,0,-50], [10,20]], discount_factor=0.8)

[array([-0.28435071, -0.86597718, -1.18910299]),
 array([1.26665318, 1.0727777 ])]

In [21]:
n_iterations = 150  # number of policy-gradient updates in the training loop
n_episodes_per_update = 10  # episodes played before each update
n_max_steps = 200  # step cap per episode
discount_factor = 0.95  # passed to discount_and_normalize_rewards

In [22]:
# Optimizer that applies the reward-weighted mean gradients in the training loop.
optimizer = tf.keras.optimizers.Nadam(learning_rate=0.01)
# Binary cross-entropy matches the single sigmoid output of the policy model.
loss_fn = tf.keras.losses.binary_crossentropy

In [24]:
# Policy-gradient training: play a batch of episodes, weight every step's
# gradients by its normalized discounted reward, average, and apply.
for iteration in range(n_iterations):
  all_rewards, all_grads = play_multiple_episodes(
      env, n_episodes_per_update, n_max_steps, model, loss_fn)
  all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_factor)

  all_mean_grads = []
  # One averaged gradient per trainable variable.
  for var_index in range(len(model.trainable_variables)):
    # Mean over every step of every episode of (weight * gradient).
    mean_grads = tf.reduce_mean(
        [final_reward * all_grads[episode_index][step][var_index]
         for episode_index, final_rewards in enumerate(all_final_rewards)
            for step, final_reward in enumerate(final_rewards)], axis=0)
    all_mean_grads.append(mean_grads)
  optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

In [25]:
# Three-state MDP. Both tables are indexed [state][action][next_state].
# A `None` entry marks a (state, action) pair that is absent from
# `possible_actions` below.
transition_probabilities = [  # shape=[s, a, s']
    [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],
    [[0.0, 1.0, 0.0], None, [0.0, 0.0, 1.0]],
    [None, [0.8, 0.1, 0.1], None]
]
# Reward received for each (state, action, next_state) transition.
rewards = [  # shape=[s, a, s']
    [[+10, 0, 0], [0, 0, 0], [0, 0, 0]],
    [[0, 0, 0], [0, 0, 0], [0, 0, -50]],
    [[0, 0, 0], [+40, 0, 0], [0, 0, 0]]
]
# possible_actions[s] lists the actions available in state s.
possible_actions = [[0, 1, 2], [0, 2], [1]]

In [26]:
# Q-table indexed [state, action]; -inf marks actions that are not possible.
Q_values = np.full((3, 3), -np.inf)
for state, actions in enumerate(possible_actions):
  Q_values[state, actions] = 0.0  # possible actions start at 0

In [27]:
gamma = 0.90  # discount factor

# Q-value iteration: 50 sweeps over every possible (state, action) pair.
for iteration in range(50):
  # Snapshot so that every update in this sweep reads the previous sweep's values.
  Q_prev = Q_values.copy()
  for s in range(3):
    for a in possible_actions[s]:
      # Expected value over next states sp of: reward + discounted best next Q-value.
      Q_values[s, a] = np.sum([
          transition_probabilities[s][a][sp]
          * (rewards[s][a][sp] + gamma * Q_prev[sp].max())
          for sp in range(3)])

In [28]:
Q_values  # Q-table after the 50 sweeps; -inf entries are the impossible actions

array([[18.91891892, 17.02702702, 13.62162162],
       [ 0.        ,        -inf, -4.87971488],
       [       -inf, 50.13365013,        -inf]])

In [29]:
Q_values.argmax(axis=1)  # index of the highest-valued action for each state

array([0, 0, 1])

In [30]:
def step(state, action):
  """Sample one transition of the MDP.

  Args:
    state: index of the current state.
    action: index of the action taken in `state`.

  Returns:
    Tuple `(next_state, reward)`: the next state drawn according to
    `transition_probabilities[state][action]`, and the matching entry of
    `rewards`.
  """
  next_state = np.random.choice(
      [0, 1, 2], p=transition_probabilities[state][action])
  return next_state, rewards[state][action][next_state]

In [31]:
def exploitation_policy(state):
  """Pick uniformly at random among the actions possible in `state`.

  Note: despite its name, this policy never consults any Q-value; the choice
  is purely random over `possible_actions[state]`.

  Args:
    state: index of the current state.

  Returns:
    One of the entries of `possible_actions[state]`.
  """
  candidates = possible_actions[state]
  return np.random.choice(candidates)

In [32]:
# Start Q-learning from a fresh table. Without this, the loop would begin from
# the Q-values already computed by the Q-value iteration cell, so the learning
# run would not demonstrate anything.
Q_values = np.full((3, 3), -np.inf)  # -inf marks impossible actions
for state, actions in enumerate(possible_actions):
  Q_values[state, actions] = 0.0

alpha0 = 0.05  # initial learning rate
decay = 0.005  # learning-rate decay per iteration
gamma = 0.90  # discount factor
state = 0  # initial state

for iteration in range(10_000):
  action = exploitation_policy(state)
  next_state, reward = step(state, action)
  # Value of the next state under a greedy policy.
  next_value = Q_values[next_state].max()
  alpha = alpha0 / (1 + iteration * decay)
  # Running average: Q <- (1 - alpha) * Q + alpha * (reward + gamma * next_value).
  Q_values[state, action] *= 1 - alpha
  Q_values[state, action] += alpha * (reward + gamma * next_value)
  state = next_state

In [33]:
input_shape = [4]  # == env.observation_space.shape
n_outputs = 2  # == env.action_space.n

# Q-network with one linear output per action. This rebinds `model`, which
# previously referred to the policy-gradient network.
model = tf.keras.Sequential([
    tf.keras.layers.Dense(32, activation="elu", input_shape=input_shape),
    tf.keras.layers.Dense(32, activation="elu"),
    tf.keras.layers.Dense(n_outputs)
])

In [34]:
def epsilon_greedy_policy(state, epsilon=0):
    """Choose an action with an epsilon-greedy rule over the model's outputs.

    Args:
        state: single observation; a leading batch axis is added before the
            model call.
        epsilon: probability of picking a uniformly random action instead of
            the highest-valued one. The default 0 is fully greedy.

    Returns:
        Index of the chosen action, in `range(n_outputs)`.
    """
    if np.random.rand() < epsilon:
        return np.random.randint(n_outputs)  # random action
    # verbose=0: this runs once per environment step, so any per-call Keras
    # progress logging would flood the notebook output.
    Q_values = model.predict(state[np.newaxis], verbose=0)[0]
    return Q_values.argmax()  # optimal action according to the DQN

In [35]:
from collections import deque

# Bounded buffer of experience tuples; once 2000 entries are stored, appending
# a new one discards the oldest.
replay_buffer = deque(maxlen=2000)

In [36]:
def sample_experiences(batch_size):
    """Draw a random batch from `replay_buffer`, sampling with replacement.

    Args:
        batch_size: number of experiences to draw.

    Returns:
        Tuple `(states, actions, rewards, next_states, dones)` of NumPy
        arrays, each stacking one field of the sampled experience tuples.
    """
    picks = np.random.randint(len(replay_buffer), size=batch_size)
    sampled = [replay_buffer[pick] for pick in picks]

    # Transpose the list of 5-field experiences into 5 per-field arrays.
    columns = []
    for field in range(5):
        columns.append(np.array([entry[field] for entry in sampled]))
    states, actions, rewards, next_states, dones = columns
    return states, actions, rewards, next_states, dones

In [37]:
def play_one_step(env, state, epsilon):
    """Take one epsilon-greedy step and record it in the replay buffer.

    Note: this definition rebinds the name `play_one_step`, replacing the
    earlier policy-gradient function of the same name.

    Args:
        env: environment exposing the old-style `step(action)` 4-tuple API.
        state: current observation.
        epsilon: exploration probability forwarded to `epsilon_greedy_policy`.

    Returns:
        Tuple `(next_state, reward, done, info)` exactly as returned by
        `env.step`.
    """
    chosen_action = epsilon_greedy_policy(state, epsilon)
    outcome = env.step(chosen_action)
    next_state, reward, done, info = outcome
    # Stored as (state, action, reward, next_state, done).
    replay_buffer.append((state, chosen_action, reward, next_state, done))
    return next_state, reward, done, info

In [38]:
batch_size = 32  # experiences sampled per training step
discount_factor = 0.95  # weight of the best next Q-value in the target
# These rebind `optimizer` and `loss_fn`, previously set up for the policy-gradient run.
optimizer = tf.keras.optimizers.Nadam(learning_rate=1e-2)
loss_fn = tf.keras.losses.mean_squared_error

def training_step(batch_size):
    """Run one gradient-descent step of the DQN on a sampled batch.

    Samples `batch_size` experiences, builds the targets
    `reward + (1 - done) * discount_factor * max_a' Q(next_state, a')`, and
    minimizes `loss_fn` between those targets and the Q-values the model
    predicts for the actions that were actually taken.

    Args:
        batch_size: number of experiences to sample from the replay buffer.
    """
    experiences = sample_experiences(batch_size)
    states, actions, rewards, next_states, dones = experiences
    # Targets are computed outside the tape, so no gradient flows through them.
    # verbose=0 suppresses any per-call Keras progress logging.
    next_Q_values = model.predict(next_states, verbose=0)
    max_next_Q_values = next_Q_values.max(axis=1)
    # (1 - dones) zeroes the bootstrap term for transitions that ended an episode.
    target_Q_values = (rewards +
                       (1 - dones) * discount_factor * max_next_Q_values)
    target_Q_values = target_Q_values.reshape(-1, 1)
    # One-hot mask that selects, per row, the Q-value of the action taken.
    mask = tf.one_hot(actions, n_outputs)
    with tf.GradientTape() as tape:
        all_Q_values = model(states)
        Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

In [39]:
# DQN training: play 600 episodes of at most 200 steps each, and run one
# training step after every episode once more than 50 have been played.
for episode in range(600):
    obs = env.reset()
    # Exploration rate: decays linearly from 1.0 with a floor of 0.01. It only
    # depends on the episode index, so it is computed once per episode.
    epsilon = max(1 - episode / 500, 0.01)
    # The loop variable is deliberately not called `step`: at top level that
    # would rebind the MDP `step()` function defined earlier in the notebook
    # and break any re-run of the Q-learning cell.
    for step_index in range(200):
        obs, reward, done, info = play_one_step(env, obs, epsilon)
        if done:
            break

    if episode > 50:
        training_step(batch_size)

In [43]:
print(obs)  # last observation left over from the DQN training loop

[2.429335   2.3622649  0.20011337 0.25885397]
