In [None]:
from tetris_game import Tetris
from pg import PG
from dqn import DQN
import numpy as np
from tensorflow import keras
import matplotlib.pyplot as plt

## Manual gameplay

In [None]:
# create Tetris environment with UI enabled
env = Tetris(gui=True)


def _read_clamped_int(prompt, upper):
    """Prompt until the user types an integer and return it clamped to [0, upper].

    A non-integer entry prints a hint and asks again instead of raising
    ValueError, so a typo does not abort the running game.
    """
    while True:
        try:
            value = int(input(prompt))
        except ValueError:
            print('Please enter a whole number.')
            continue
        return max(0, min(value, upper))


while True:
    # input move
    position = _read_clamped_int('\nPosition: ', 9)
    rotation = _read_clamped_int('Rotation: ', 3)
    # encode the move as one action index: four rotation slots per position
    # NOTE(review): position 9 yields indices up to 39, while the agents in this
    # notebook are configured with 36 actions - confirm the valid range in tetris_game
    action = position * 4 + rotation
    _, reward, done, _ = env.step(action)
    # print calculated reward
    print('Reward:', reward)
    # ...until the game is over
    if done:
        break

## Policy Gradients

In [None]:
# set neural network parameters
# NOTE(review): presumably input size followed by (units, activation) per layer,
# ending in a softmax over the 36 actions - confirm against pg.PG
layers = [200, (128, 'elu'), (128, 'elu'), (36, 'softmax')]
loss_pg = keras.losses.categorical_crossentropy
# `learning_rate` is the supported keyword; the old `lr` alias is deprecated
# and rejected by current Keras releases
optimizer_pg = keras.optimizers.Adam(learning_rate=1e-3)
# set reinforcement learning parameter
discount_factor = 0.97
# select file path to import from, None yields a freshly initialized model
file = None

agent_pg = PG(layers, loss_pg, optimizer_pg, discount_factor, file)

In [None]:
# training schedule: iterations to run, episodes played per iteration,
# and the cap on steps within one episode
iterations = 500
episodes = 50
max_steps = 2000
# True trains the agent; False only plays (the environment is created with gui=not train)
train = True

best = None
mean_rewards = []
for iteration in range(iterations):
    # per-iteration collections, one entry per episode
    all_rewards, all_grads, all_lines = [], [], []
    for episode in range(episodes):
        episode_rewards, episode_grads = [], []
        # fresh environment for every episode; read its initial observation and invalid moves
        env = Tetris(gui=not train)
        obs, invalid = env.obs(), env.invalid()
        for step in range(max_steps):
            # ask the policy for an action and the matching gradient
            action, grads = agent_pg.run_policy(obs, invalid)
            # apply the action; the environment returns the next observation and reward data
            obs, reward, done, invalid = env.step(action)
            # record this step's reward and gradient
            episode_rewards.append(reward)
            episode_grads.append(grads)
            # stop playing once the game is over
            if done:
                break
        # file the finished episode under the current iteration
        all_rewards.append(episode_rewards)
        all_grads.append(episode_grads)
        all_lines.append(env.lines)
    # reward summed over every step of every episode, divided by the episode count
    mean_reward = np.sum(
        [step_reward for rewards in all_rewards for step_reward in rewards]
    ) / episodes
    mean_rewards.append(mean_reward)
    if train:
        print('Iteration {0}/{1} - mean reward, lines: {2}, {3}'.format(
            iteration + 1, iterations, mean_reward, np.mean(all_lines)))
        # keep the model whenever it matches or beats the best mean reward so far
        if best is None or mean_reward >= best:
            agent_pg.save('tetris_pg')
            best = mean_reward
            print('Model saved.')
        # train the agent on the rewards and gradients gathered in this iteration
        agent_pg.apply_grads(all_rewards, all_grads)
# Plot mean reward per iteration
plt.plot(range(iterations), mean_rewards)
plt.xlabel('Iteration')
plt.ylabel('Mean reward')

## Deep Q-Learning

In [None]:
# the input is of size 200 and there are 36 different actions (tetriminos, rotation states)
n_obs = 200
n_actions = 36
# set neural network parameters
hidden_layers = [(512, 'elu'), (512, 'elu')]
# `learning_rate` is the supported keyword; the old `lr` alias is deprecated
# and rejected by current Keras releases
optimizer_dqn = keras.optimizers.Adam(learning_rate=5e-4)
# set reinforcement learning parameters
discount_factor = 0.97
buffer_size = 250000
# select file path to import from, None yields a freshly initialized model
file = None

agent_dqn = DQN(n_obs, hidden_layers, n_actions, optimizer_dqn, discount_factor, buffer_size, file)

In [None]:
# set number of episodes and maximum number of steps per episode
episodes = 100000
max_steps = 2000
# number of episodes played before agent is trained (to fill replay buffer)
n_pretrain = 500
# update target model every ... episodes
update_target = 500
# set batch size for replay buffer
batch_size = 256
# exploration (epsilon) decays linearly from ... to ... over ... episodes
epsilon_decay = 1, 0.01, 60000
# define whether to train agent or not
train = True

best = None
total_rewards = []
for e in range(episodes):
    total_reward = 0
    # Epsilon depends only on the episode index, so compute it once per episode.
    # Linear interpolation from the start value to the end value over the
    # configured number of episodes, then held at the end value; the slope is
    # scaled by (start - end) so the floor is reached exactly at that episode.
    # No exploration when the agent is not being trained.
    if train:
        eps_start, eps_end, eps_episodes = epsilon_decay
        epsilon = max(eps_start - (eps_start - eps_end) * e / eps_episodes, eps_end)
    else:
        epsilon = 0
    # Initialize environment, get initial state and invalid moves
    env = Tetris(gui=not train)
    state = env.obs()
    invalid = env.invalid()
    for s in range(max_steps):
        # Get agent's action
        action = agent_dqn.play_one_step(state, epsilon, invalid)
        # Let environment perform action
        next_state, reward, done, invalid = env.step(action)
        total_reward += reward
        # Add experience to agent's replay buffer and update current state
        # (`invalid` is the value just returned by env.step together with next_state)
        agent_dqn.add_experience(state, action, reward, next_state, done, invalid)
        state = next_state
        # Exit loop if game over
        if done:
            break
    # Save and print game data
    total_rewards.append(total_reward)
    if train:
        print('Episode {0}/{1} - total reward, score: {2}, {3}'.format(e + 1, episodes, total_reward, env.score))
        # Save model if the highest reward has been collected
        if best is None or total_reward >= best:
            agent_dqn.save('tetris_dqn')
            best = total_reward
            print('Model saved.')
        # Perform one training step per episode once the pretraining episodes are done
        if e >= n_pretrain:
            agent_dqn.training_step(batch_size)
            if e % update_target == 0:
                agent_dqn.update_target_model()
# Plot total rewards; the x-axis follows the data actually collected
plt.plot(range(len(total_rewards)), total_rewards)
plt.xlabel('Episode')
plt.ylabel('Reward')