In [1]:
import tensorflow as tf
import numpy as np
from tensorflow import keras
import matplotlib as mpl
import matplotlib.pyplot as plt
from gym_game import Game
#import gym_2048
import gym

In [2]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

env = Game()
input_shape = [4,4] # == env.observation_space.shape
n_outputs = 4 # == env.action_space.n

model = keras.models.Sequential([
    keras.layers.Flatten(input_shape = (4,4)),
    keras.layers.Dense(32, activation="elu"),
    keras.layers.Dense(32, activation="elu"),
    keras.layers.Dense(n_outputs)
])
#model.summary()

In [3]:
def epsilon_greedy_policy(state, epsilon=0):
    if np.random.rand() < epsilon:
        return np.random.randint(n_outputs)
    else:
        Q_values = model.predict(state[np.newaxis], verbose=0)
        return np.argmax(Q_values[0])

In [4]:
class ReplayMemory:
    def __init__(self, max_size):
        self.buffer = np.empty(max_size, dtype=np.object)
        self.max_size = max_size
        self.index = 0
        self.size = 0

    def append(self, obj):
        self.buffer[self.index] = obj
        self.size = min(self.size + 1, self.max_size)
        self.index = (self.index + 1) % self.max_size

    def sample(self, batch_size):
        indices = np.random.randint(self.size, size=batch_size)
        return self.buffer[indices]

In [5]:
from collections import deque

replay_memory = ReplayMemory(max_size = 1000000)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.buffer = np.empty(max_size, dtype=np.object)


In [6]:
def sample_experiences(batch_size):
    batch = replay_memory.sample(batch_size)
    states, actions, rewards, next_states, dones = [
        np.array([experience[field_index] for experience in batch])
        for field_index in range(5)]
    return states, actions, rewards, next_states, dones

In [7]:
def play_one_step(env, state, epsilon):
    action = epsilon_greedy_policy(state, epsilon)
    next_state, reward, done, info = env.step(action)
    replay_memory.append((state, action, reward, next_state, done))
    return next_state, reward, done, info

In [8]:
batch_size = 32
discount_rate = 0.95
optimizer = keras.optimizers.RMSprop(learning_rate=2.5e-4, rho=0.95, momentum=0.0,
                                     epsilon=0.00001, centered=True)
loss_fn = keras.losses.mean_squared_error

def training_step(batch_size):
    experiences = sample_experiences(batch_size)
    states, actions, rewards, next_states, dones = experiences
    next_Q_values = model.predict(next_states, verbose=0)
    max_next_Q_values = np.max(next_Q_values, axis=1)
    target_Q_values = (rewards +
                       (1 - dones) * discount_rate * max_next_Q_values)
    target_Q_values = target_Q_values.reshape(-1, 1)
    mask = tf.one_hot(actions, n_outputs)
    with tf.GradientTape() as tape:
        all_Q_values = model(states)
        Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

In [9]:
np.random.seed(42)
tf.random.set_seed(42)
episode_nums = 2000
lengths = []
rewards = []
best_score = 0

In [None]:
for episode in range(episode_nums):
    obs = env.reset()   
    epsilon = max(1 - episode / (episode_nums * 0.9), 0.01)
    for step in range(1000):
        obs, reward, done, info = play_one_step(env, obs, epsilon)
        if done:
            break
    lengths.append(step) # Not shown in the book
    avg_length = 0.0
    if len(lengths) > 100:
        avg_length = sum(lengths[-100:]) / 100 
    if step >= best_score: # Not shown
        best_weights = model.get_weights() # Not shown
        best_score = step # Not shown
    print("\rEpisode: {}, Steps: {}, eps: {:.3f}, Avg length: {:.3f}".format(episode, step + 1, epsilon, avg_length), end="") # Not shown
    if episode > 50:
        training_step(batch_size)

model.set_weights(best_weights)

Episode: 1794, Steps: 83, eps: 0.010, Avg length: 71.3800

In [None]:
plt.figure(figsize=(8, 4))
plt.plot(lengths)
plt.xlabel("Episode", fontsize=14)
plt.ylabel("Sum of rewards", fontsize=14)
x = list(range(episode_nums))
z = np.polyfit(x, lengths, 10)
p = np.poly1d(z)
plt.plot(x, p(x))
plt.show()

In [None]:
state = env.reset()
state

In [None]:
action = np.argmax(model.predict(state))
print(action)
state, reward, done, _ = env.step(action)
print(state)
print(reward)
print(done)