In [None]:
# !apt-get install -y libglu1-mesa-dev freeglut3-dev mesa-common-dev
!pip install -qq gymnasium
!pip install -qq gymnasium[atari]
!pip install -qq gymnasium[accept-rom-license]

In [None]:
# Functional package
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers

# ENV package
import gymnasium as gym

# Util package
import random
import os
import json
from tqdm import tqdm
from collections import deque
from typing import Tuple, Deque

# Visualization package
import cv2
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML
from tensorflow.keras.utils import plot_model

# Code for environment

In [None]:
# Build the Atari Tetris environment (frameskip set to 4)
env = gym.make("ALE/Tetris-v5", frameskip=4)

# The observation is the game screen as an array of pixel values, so the state
# size is simply the first three entries of the observation-space shape.
state_size = tuple(env.observation_space.shape[axis] for axis in range(3))
# Number of discrete actions exposed by the environment
action_size = env.action_space.n

In [None]:
# Display the state size computed from the observation-space shape
state_size

In [None]:
# See what is in an environment step: reset the environment before the
# sample step taken in the next cell
env.reset()

In [None]:
# Take a single step with action index 4 and unpack the five values returned;
# the fourth value is discarded here.
state, reward, done, _, info = env.step(4)
# Show the raw observation returned by the step
state

# Code for Reinforcement Learning Agent

In [None]:
# Define the LSTM-based RL model
class TetrisLSTMAgent():
    '''
    Deep Q-learning agent for Tetris.

    The Q-network applies a small CNN to every frame of the input sequence
    (via TimeDistributed), feeds the per-frame features through two LSTM
    layers and outputs one Q-value per action. Experiences are stored in a
    bounded replay memory and sampled at random for training.
    '''
    def __init__(self, state_size: Tuple[int, ...], action_size: int):
        '''
        Args:
            state_size: shape of a single observation; it is unpacked into the
                model input shape, so it must be a tuple of ints.
            action_size: number of discrete actions (size of the model output).
        '''
        self.state_size = state_size
        self.action_size = action_size
        # Replay memory of (state, action, reward, next_state, done); oldest entries are dropped first
        self.memory: Deque[Tuple[np.ndarray, int, float, np.ndarray, bool]] = deque(maxlen=2000)
        self.gamma = 0.95    # discount rate
        self.epsilon = 1.0   # exploration rate
        self.epsilon_min = 0.02
        self.epsilon_decay = 0.999
        self.learning_rate = 0.001
        self.model = self._build_model()
        self.train_history = {'loss':[]}

    def _build_model(self) -> "tf.keras.Model":
        '''
        Build and compile the Q-network.

        Input shape is (batch, time, *state_size); output shape is
        (batch, action_size).
        '''
        model = tf.keras.Sequential([
            layers.Input(shape=(None, *self.state_size)),
            layers.TimeDistributed(layers.Conv2D(32, (3, 3), activation='relu')),
            layers.TimeDistributed(layers.MaxPooling2D(2, 2)),
            layers.TimeDistributed(layers.Conv2D(64, (3, 3), activation='relu')),
            layers.TimeDistributed(layers.MaxPooling2D(2, 2)),
            layers.TimeDistributed(layers.Flatten()),
            layers.LSTM(64, return_sequences=True),
            layers.LSTM(32),
            # Q-values are unbounded regression targets trained with MSE, so the
            # output must be linear; a softmax would squash them into [0, 1].
            layers.Dense(self.action_size, activation='linear'),
        ])
        model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate))
        return model

    def remember(self,
                 state: np.ndarray,
                 action: int,
                 reward: float,
                 next_state: np.ndarray,
                 done: bool) -> None:
        '''
        Save the experience to the memory
        '''
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state: np.ndarray) -> int:
        '''
        Return an action for the given state. With probability epsilon
        (exploration rate) a random action is returned, otherwise the action
        with the highest predicted Q-value.
        '''
        # Choose between random action or model action
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        # The model expects (batch, time, *state_size); use a sequence of one frame
        model_input = np.reshape(state, [1, 1, *self.state_size])
        q_values = self.model.predict(model_input, verbose=0)
        return int(np.argmax(q_values[0]))

    def replay(self, batch_size: int) -> None:
        '''
        Train the model on a random minibatch sampled from the memory.

        For each sampled transition the Q-value of the taken action is replaced
        by `reward` (terminal) or `reward + gamma * max Q(next_state)`
        (non-terminal); the other action values keep the current prediction.
        The fit loss is appended to `train_history['loss']` and epsilon is
        decayed once per call.

        Raises:
            ValueError: if `batch_size` is smaller than 1 or larger than the
                number of stored experiences.
        '''
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        minibatch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # Add the time axis expected by the model: (batch, 1, *state_size)
        state_batch = np.expand_dims(np.array(states), axis=1)
        next_state_batch = np.expand_dims(np.array(next_states), axis=1)
        action_idx = np.array(actions, dtype=np.int64)
        reward_arr = np.array(rewards, dtype=np.float32)
        # 1.0 for non-terminal transitions, 0.0 for terminal ones
        not_done = 1.0 - np.array(dones, dtype=np.float32)

        # Two batched forward passes instead of two predict() calls per sample
        next_q = np.asarray(self.model.predict(next_state_batch, verbose=0))
        q_targets = np.array(self.model.predict(state_batch, verbose=0), dtype=np.float32)

        # Only the taken action's Q-value is moved towards the Bellman target
        q_targets[np.arange(batch_size), action_idx] = (
            reward_arr + self.gamma * next_q.max(axis=1) * not_done
        )

        # Targets keep the model's output shape (batch, action_size)
        hist = self.model.fit(state_batch, q_targets, epochs=1, verbose=0, batch_size=batch_size)
        self.train_history['loss'] += hist.history['loss']
        if self.epsilon > self.epsilon_min:
            # Clamp so the exploration rate never drops below its floor
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def plot_model(self, plot_name: str='model.png', save_to: str=''):
        '''
        Save a diagram of the model to `save_to/plot_name` and display it.
        '''
        plot_path = os.path.join(save_to, plot_name)
        plot_model(self.model, to_file=plot_path, show_shapes=True, show_layer_names=True)
        # Read back the file that was just written (also when save_to is set)
        model_img = plt.imread(plot_path)
        # Create the figure before drawing so the size applies to the image
        plt.figure(figsize=(10, 10))
        plt.imshow(model_img)
        plt.axis('off')
        plt.show()

In [None]:
# Clear the Keras backend session before building a fresh model
tf.keras.backend.clear_session()
# Agent used only to demonstrate / visualize the architecture
demo_agent = TetrisLSTMAgent(state_size=state_size, action_size=action_size)

In [None]:
# File name for the architecture diagram of the demo agent
plot_name = 'model_v1.png'
demo_agent.plot_model(plot_name=plot_name)

# Training the agent

In [None]:
# Clear the Keras backend session before building the model that will be trained
tf.keras.backend.clear_session()
# Agent that is trained in the loop below
agent = TetrisLSTMAgent(state_size=state_size, action_size=action_size)

In [None]:
# Training parameters
EPISODES = 100
BATCH_SIZE = 100
EPISODE_MAX_STEPS = 5000
# Number of environment steps between two replay (training) calls
REPLAY_INTERVAL = BATCH_SIZE // 2

# Main training loop
for e in tqdm(range(EPISODES)):
    # reset() already returns the first observation, so no extra random step
    # (whose end-of-episode flag would be ignored) is needed to obtain it
    state, info = env.reset()
    print(f'\n\nthis is episode {e}')
    step_counter = 0
    episode_reward = 0.0
    for step in range(EPISODE_MAX_STEPS):  # Limit each episode number of steps
        # get action and next state with model
        action = agent.act(state)
        next_state, reward, terminated, truncated, info = env.step(action)
        # The episode is over when the game ends OR the environment cuts it short
        done = terminated or truncated
        episode_reward += float(reward)
        # Store only true termination so replay() still bootstraps from the
        # next state when the episode was merely truncated
        agent.remember(state, action, reward, next_state, terminated)
        state = next_state
        if done:
            print(f"episode: {e}/{EPISODES}, steps: {step}, score: {episode_reward}, e: {agent.epsilon}")
            break

        # Train the model using agent memory. `>=` (not `==`) so training still
        # starts if the counter passed the interval before the memory was full enough.
        if len(agent.memory) > BATCH_SIZE and step_counter >= REPLAY_INTERVAL:
            agent.replay(BATCH_SIZE)
            step_counter = 0

        step_counter += 1

    # Save the trained model every episode
    agent.model.save('tetris_lstm_model.h5')

    # Log the result after each episode
    with open('train_log.json', 'w') as f:
        json.dump(agent.train_history, f)

In [None]:
# Plot the loss recorded by every replay() call during training
fig, ax = plt.subplots()
ax.plot(agent.train_history['loss'])
ax.set(title='Training loss', xlabel='Replay update', ylabel='MSE loss')
plt.show()

# Testing

### Download a trained model for testing: <a href="https://drive.google.com/drive/folders/1mSn44sBc_bvF4sGPB6Fy4gaaZ9iRv-PM?usp=sharing">link</a> (save it next to this notebook, or update `Model_path` in the cell below)


In [None]:


def display_video(frames):
    """
    Render a sequence of image frames as an inline HTML5 video.

    Args:
        frames: non-empty sequence of images accepted by `ax.imshow`.

    Returns:
        An `IPython.display.HTML` object wrapping the encoded animation.

    Raises:
        ValueError: if `frames` is empty.
    """
    # Copied from: https://colab.research.google.com/github/deepmind/dm_control/blob/master/tutorial.ipynb
    if len(frames) == 0:
        raise ValueError("display_video() needs at least one frame")
    orig_backend = matplotlib.get_backend()
    # Create the figure under the 'Agg' backend, then restore the original
    # backend even if figure creation fails
    matplotlib.use('Agg')
    try:
        fig, ax = plt.subplots(1, 1, figsize=(5, 5))
    finally:
        matplotlib.use(orig_backend)
    ax.set_axis_off()
    ax.set_aspect('equal')
    ax.set_position([0, 0, 1, 1])
    im = ax.imshow(frames[0])
    def update(frame):
        im.set_data(frame)
        return [im]
    anim = animation.FuncAnimation(fig=fig, func=update, frames=frames,
                                    interval=50, blit=True, repeat=False)
    try:
        return HTML(anim.to_html5_video())
    finally:
        # Release the figure so repeated calls do not accumulate open figures
        plt.close(fig)

# Create the environment with rgb_array rendering so frames can be captured
env = gym.make("ALE/Tetris-v5", render_mode="rgb_array", frameskip=4)
frames = []
taken_actions = []
states = []

# Change this based on where you saved the downloaded model
Model_path = "tetris_lstm_model.h5"

# Use the saved/downloaded model when the file exists; otherwise fall back to
# the agent that is still in memory from the training cells.
if os.path.exists(Model_path):
    # compile=False: the model is only used for inference here
    test_model = tf.keras.models.load_model(Model_path, compile=False)
else:
    test_model = agent.model

# Test the trained model
try:
    # reset() returns the first observation directly
    state, info = env.reset()
    state = np.array(state, dtype=float)
    # The model expects (batch, time, *state_size); use a sequence of one frame
    state = np.reshape(state, [1, 1, *state_size])
    for t in range(5000):
        # Always act greedily during testing
        action = int(np.argmax(test_model.predict(state, verbose=0)))
        taken_actions.append(action)

        next_state, reward, terminated, truncated, info = env.step(action)
        # Stop when the game ends or the environment truncates the episode
        done = terminated or truncated
        next_state = np.array(next_state)
        states.append(next_state)

        state = np.reshape(next_state, [1, 1, *state_size])

        frames.append(env.render())
        if done:
            break
finally:
    # Release the emulator even if the rollout fails midway
    env.close()

display_video(frames)