In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from Environment import Tetris
from pygame.time import Clock
from collections import deque

pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
# Fix both random number generators so that exploration (NumPy) and weight
# initialisation (TensorFlow) are repeatable from a fresh kernel.
np.random.seed(42)
tf.random.set_seed(42)

In [3]:
# Game environment shared by the training loop and the final rendered playthrough.
env = Tetris()

In [4]:
# Observations are flattened before being fed to the network, so the input
# width is the product of the first two dimensions of the observation shape.
observation_shape = env.observation_shape()
n_inputs = observation_shape[0]*observation_shape[1]

In [5]:
# Width of the network's output layer (one Q-value per action index).
# NOTE(review): presumably the number of discrete actions - confirm in Environment.
n_outputs = env.action_scope_size()

In [6]:
# Q-network: two hidden ReLU layers of 32 units, then a linear layer that
# emits one value per action index for a flattened observation.
model = keras.Sequential()
model.add(keras.layers.Dense(32, activation='relu', input_shape=[n_inputs]))
model.add(keras.layers.Dense(32, activation='relu'))
model.add(keras.layers.Dense(n_outputs))

In [7]:
def epsilon_greedy_policy(state, epsilon=0):
    """Choose an action for `state` with an epsilon-greedy rule.

    Parameters
    ----------
    state : array-like
        Current observation; it is reshaped to a single-row batch before
        being passed to `model.predict`.
    epsilon : float, default 0
        Probability of ignoring the model and exploring at random.

    Returns
    -------
    int
        With probability `epsilon`, a uniformly random action index in
        ``[0, n_outputs)``; otherwise the index of the largest predicted
        Q-value for `state`.
    """
    if np.random.rand() < epsilon:
        # Explore over the whole action space, not just the first two actions.
        return np.random.randint(n_outputs)

    # Use the state that was passed in rather than whatever global observation
    # happens to exist in the notebook namespace.
    Q_values = model.predict(np.asarray(state).reshape(1, -1))
    return np.argmax(Q_values)

In [8]:
# Bounded experience buffer: once 2000 transitions are stored, appending a
# new one discards the oldest.
replay_memory = deque(maxlen=2000)

In [9]:
def play_one_step(env, state, epsilon):
    """Act once in `env` and record the resulting transition.

    The action is chosen by `epsilon_greedy_policy(state, epsilon)`. The tuple
    ``(state, action, reward, flattened next_state, done)`` is appended to the
    global `replay_memory`.

    Returns
    -------
    tuple
        ``(next_state, reward, done)`` as produced by ``env.step``; the
        returned `next_state` is NOT flattened.
    """
    chosen_action = epsilon_greedy_policy(state, epsilon)
    next_state, reward, done = env.step(chosen_action)

    transition = (state, chosen_action, reward, next_state.flatten(), done)
    replay_memory.append(transition)

    return next_state, reward, done

In [12]:
def sample_experiences(batch_size):
    """Draw `batch_size` transitions from `replay_memory`, with replacement.

    Returns
    -------
    tuple of numpy.ndarray
        ``(states, actions, rewards, next_states, dones)``, each built by
        stacking the corresponding field of the sampled transitions.
    """
    n_fields = 5  # (state, action, reward, next_state, done)
    picks = np.random.randint(len(replay_memory), size=batch_size)
    chosen = [replay_memory[pick] for pick in picks]

    columns = []
    for field in range(n_fields):
        columns.append(np.array([transition[field] for transition in chosen]))

    states, actions, rewards, next_states, dones = columns
    return states, actions, rewards, next_states, dones

In [13]:
# Discount factor applied to the estimated value of the next state.
discount_rate = 0.95
# `learning_rate` is the supported keyword; `lr` is a deprecated alias that
# newer Keras releases reject.
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
# Element-wise MSE between target and predicted Q-values (reduced in training_step).
loss_fn = keras.losses.mean_squared_error

In [14]:
def training_step(batch_size):
    """Run one gradient update of `model` on a sampled batch of transitions.

    Targets are ``reward + (1 - done) * discount_rate * max_a' Q(next_state, a')``,
    with the next-state Q-values taken from the same `model`. The loss is the
    mean squared error between those targets and the Q-value the model
    predicts for the action that was actually taken.
    """
    states, actions, rewards, next_states, dones = sample_experiences(batch_size)

    # Targets are computed outside the tape so no gradient flows through them.
    best_next_q = model.predict(next_states).max(axis=1)
    not_done = 1 - dones
    targets = (rewards + not_done * discount_rate * best_next_q).reshape(-1, 1)

    # One-hot mask that keeps only the Q-value of the action that was taken.
    action_mask = tf.one_hot(actions, n_outputs)

    with tf.GradientTape() as tape:
        taken_q = tf.reduce_sum(model(states) * action_mask,
                                axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(targets, taken_q))

    variables = model.trainable_variables
    optimizer.apply_gradients(zip(tape.gradient(loss, variables), variables))

In [15]:
best_score = 0   # highest env.get_score() seen so far during training
batch_size = 32  # number of transitions sampled per training_step call

In [16]:
# Start from the current weights so `best_weights` is always defined, even if
# no episode ever beats the initial `best_score`.
best_weights = model.get_weights()

for episode in range(600):
    obs = env.reset()
    state = obs.flatten()
    # Exploration rate decays linearly with the episode index and is floored
    # at 0.01; it does not change within an episode.
    epsilon = max(1 - episode / 500, 0.01)

    for step in range(200):
        obs, reward, done = play_one_step(env, state, epsilon)
        # Advance the current state; otherwise every action and every stored
        # transition would keep using the observation from env.reset().
        state = obs.flatten()
        if done:
            break

    # Keep a snapshot of the weights that produced the best score so far.
    score = env.get_score()
    if score > best_score:
        best_weights = model.get_weights()
        best_score = score
    print("\rEpisode: {}, Steps: {}, eps: {:.3f}".format(episode, step + 1, epsilon), end="")

    # Let the replay buffer fill up for the first episodes before training.
    if episode > 50:
        training_step(batch_size)

model.set_weights(best_weights)



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

Episode: 599, Steps: 38, eps: 0.0102

In [17]:
# pygame clock used to cap the frame rate of the rendered playthrough.
clock = Clock()

In [20]:
# Play one rendered episode greedily (epsilon defaults to 0) with the trained model.
obs = env.reset()
done = False

try:
    while not done:
        env.render()

        state = obs.flatten()
        # epsilon_greedy_policy already returns an action index. Applying
        # np.argmax to that scalar would always give 0.
        action = epsilon_greedy_policy(state)

        obs, reward, done = env.step(action)

        # Limit playback to at most 16 frames per second.
        clock.tick(16)
finally:
    # Release the environment even if the episode is interrupted.
    env.close()