# First test app using the NES Tetris environment

In [None]:
import gym 

#important wrapper to change the action_space from 256 to 12
from nes_py.wrappers import JoypadSpace
from gym_tetris.actions import MOVEMENT

# Smoke test: drive the NES Tetris environment with random actions.
test = gym.make('TetrisA-v2')
# JoypadSpace swaps the raw controller action space for the MOVEMENT list.
test = JoypadSpace(test, MOVEMENT)

try:
    done = True  # forces a reset on the very first iteration
    for step in range(1000):
        if done:
            state = test.reset()
        x = test.action_space.sample()
        print(x)
        # Step with the action that was just sampled and printed, so the
        # logged action is the one the emulator actually receives.
        state, reward, done, info = test.step(x)
        test.render()
finally:
    # Release the emulator/window even if stepping or rendering fails.
    test.close()

# Setup

In [None]:
#Configure to use GPU

import tensorflow as tf

# Cap TensorFlow's memory use on the first GPU (if any) at 6144 MB.
gpus = tf.config.list_physical_devices('GPU')
print(len(gpus))
if gpus:
    memory_cap = tf.config.LogicalDeviceConfiguration(memory_limit=6144)
    try:
        tf.config.set_logical_device_configuration(gpus[0], [memory_cap])
    except RuntimeError as err:
        # Virtual devices must be set before GPUs have been initialised
        print(err)
    else:
        logical_gpus = tf.config.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")

In [None]:
# Necessary pip installations (IPython `%pip` magics; run once per kernel).

%pip install gym-tetris
%pip install gym 
%pip install keras
# NOTE(review): keras-rl2 and JSAnimation are not imported anywhere in the
# visible notebook — confirm they are still needed.
%pip install keras-rl2
%pip install JSAnimation
# NOTE(review): `tensorflow.keras` is a TensorFlow submodule, not obviously a
# PyPI distribution name — verify this line installs what was intended.
%pip install tensorflow.keras
%pip install tf-agents

### Stuff from the Colab Tutorial

In [None]:
# System packages for headless rendering, left commented out.
# The original note says they apply "when using WLS" — presumably WSL
# (Windows Subsystem for Linux); TODO confirm.
#!sudo apt-get update
#!sudo apt-get install -y xvfb ffmpeg freeglut3-dev

# Python packages used by the cells below (imageio, pyvirtualdisplay, reverb).
%pip install 'imageio==2.4.0'
%pip install pyvirtualdisplay
%pip install tf-agents[reverb]
%pip install pyglet
%pip install tf-keras
%pip install ale-py==0.8

In [None]:
# `from __future__` imports must be the first statement of the cell/module;
# placing them after another statement raises a SyntaxError.
from __future__ import absolute_import, division, print_function

import os

# Keep using keras-2 (tf-keras) rather than keras-3 (keras).
# Set ahead of the TensorFlow / TF-Agents imports that follow in this cell.
os.environ['TF_USE_LEGACY_KERAS'] = '1'

import base64
import imageio
import IPython
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import PIL.Image
import pyvirtualdisplay
import reverb
import gym 
import tensorflow as tf

from tf_agents.agents.dqn import dqn_agent
from tf_agents.drivers import py_driver
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.eval import metric_utils
from tf_agents.metrics import tf_metrics
from tf_agents.networks import sequential
from tf_agents.policies import py_tf_eager_policy
from tf_agents.policies import random_tf_policy
from tf_agents.replay_buffers import reverb_replay_buffer
from tf_agents.replay_buffers import reverb_utils
from tf_agents.trajectories import trajectory
from tf_agents.specs import tensor_spec
from tf_agents.utils import common

# Set up a virtual display for rendering OpenAI gym environments.
# NOTE(review): in a notebook the name `display` presumably shadows IPython's
# built-in `display()` helper — confirm nothing later relies on it.
display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()

In [None]:
# Hyperparameters

# Number of collect-and-train iterations of the training loop.
num_iterations = 30_000  # @param {type:"integer"}

# Experience collection and replay memory sizes.
initial_collect_steps = 1_000  # @param {type:"integer"}
collect_steps_per_iteration = 500  # @param {type:"integer"}
replay_buffer_max_length = 100_000  # @param {type:"integer"}

# Optimisation and logging.
batch_size = 32  # @param {type:"integer"}
learning_rate = 1e-3  # @param {type:"number"}
log_interval = 200  # @param {type:"integer"}

# Evaluation.
num_eval_episodes = 10  # @param {type:"integer"}
eval_interval = 1_000  # @param {type:"integer"}

In [None]:
import gym 
from nes_py.wrappers import JoypadSpace
from gym_tetris.actions import SIMPLE_MOVEMENT
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.environments.gym_wrapper  import GymWrapper
from tf_agents.environments import TFPyEnvironment
import tf_agents

# Load the NES Tetris environment in two different ways:
#   * `tester`: via suite_gym.load(), the TF-Agents loader, which makes the
#     environment usable by a TF-Agents agent. The action space can then no
#     longer be restricted, because the agent works from action_spec.
#   * `env`: via plain gym.make(), restricted to SIMPLE_MOVEMENT by JoypadSpace.

env_name = 'TetrisA-v2'
tester = suite_gym.load(env_name)
env = gym.make(env_name)
env = JoypadSpace(env, SIMPLE_MOVEMENT)
# Abandoned attempts to wrap the restricted Gym environment for TF-Agents:
#env = tf_agents.environments.gym_wrapper.GymWrapper(env)
#env = tf_agents.environments.TimeLimit(env) 
#tf_env = TFPyEnvironment(env)

We cannot really preprocess the Tetris environment above (e.g. by restricting the action_space) when using tf-agents later.
That is why we switched to the OpenAI Gym Tetris environment from the Atari games, which has an action_space of 5 and therefore needs less preprocessing.

In [None]:
import gym 

# One big issue we thought is the retunred reward being 0, even if you lose the game -> done = True
# However you will not be able to use the enviroment properly if try and apply these wrappers

class ModifiedRewardEnv(gym.Wrapper):
    """Gym wrapper that penalises every step without a positive reward.

    Args:
        env: The Gym environment to wrap.
        penalty: Amount subtracted from any non-positive step reward
            (defaults to 0.2).
    """

    def __init__(self, env, penalty=0.2):
        super().__init__(env)
        self.penalty = penalty

    def step(self, action):
        """Step the wrapped environment and shape the reward.

        Works with both the 4-tuple ``(obs, reward, done, info)`` and the
        5-tuple ``(obs, reward, terminated, truncated, info)`` step APIs:
        everything after the reward is passed through untouched.

        Returns:
            The wrapped environment's step tuple, with the observation given
            a leading axis of size 1 and the reward reduced by ``penalty``
            whenever it is not positive.
        """
        observation, reward, *rest = self.env.step(action)

        # Apply the penalty for each time step without positive reward.
        if reward <= 0:
            reward -= self.penalty

        # NOTE(review): reset() is not overridden, so its observations do not
        # get this extra leading axis, and observation_space is not updated
        # either — confirm downstream code expects that.
        observation = np.expand_dims(observation, axis=0)

        return (observation, reward, *rest)


In the following we try two different approaches to loading the OpenAI Gym Tetris environment, with different preprocessing steps. However, all of these approaches either raise errors later or leave us with an agent that cannot learn anything at all.

In [None]:
# Approach 1: plain Gym environment for Atari Tetris.
env_name = 'ALE/Tetris-v5'
env = gym.make(env_name)
# Reward-shaping wrapper left disabled.
#env = ModifiedRewardEnv(env)

In [None]:
import tensorflow as tf
from tf_agents.environments import TFPyEnvironment
from tf_agents.environments import suite_atari
from tf_agents.environments.atari_preprocessing import AtariPreprocessing
from tf_agents.environments.atari_wrappers import FrameStack4

# Approach 2: load Atari Tetris through TF-Agents' suite_atari with the
# AtariPreprocessing and FrameStack4 Gym wrappers, capped at 27000 steps per
# episode, then wrap the Python environment as a TFPyEnvironment.
env_name = 'ALE/Tetris-v5'
env = suite_atari.load(env_name,max_episode_steps=27000,
     gym_env_wrappers=[AtariPreprocessing, FrameStack4])
env = TFPyEnvironment(env)

It is common to have a train_env and an eval_env

In [None]:
def _load_atari_py_env():
    """Build one wrapped Python environment for the current `env_name`."""
    return suite_atari.load(
        env_name,
        max_episode_steps=27000,
        gym_env_wrappers=[AtariPreprocessing, FrameStack4])


# Independent environment instances for experience collection and evaluation.
train_py_env = _load_atari_py_env()
eval_py_env = _load_atari_py_env()

# TensorFlow-facing views of the two Python environments.
train_env = tf_py_environment.TFPyEnvironment(train_py_env)
eval_env = tf_py_environment.TFPyEnvironment(eval_py_env)

Here we try to use a network inspired by this paper: https://cs231n.stanford.edu/reports/2016/pdfs/121_Report.pdf.
However, we do not have enough computational power to implement and use the actual QNetwork structure.

In [None]:
import tensorflow as tf 
from tf_agents.networks import network

# input_shape = (210, 160, 3) without Atari preprocessing.
# Atari preprocessing makes it grayscale and crops it to 84x84
# (per the original note; TODO confirm crop vs. resize).

# Tensor specs of the environment's observations and actions.
input_tensor_spec = tensor_spec.from_spec(env.observation_spec())
action_tensor_spec = tensor_spec.from_spec(env.action_spec())
# Number of discrete actions, from the inclusive bounds of the action spec.
num_actions = action_tensor_spec.maximum - action_tensor_spec.minimum + 1


class CustomQNetwork(network.Network):
    """Convolutional Q-network: seven Conv2D layers followed by a dense head.

    The width of the output layer comes from the module-level `num_actions`,
    which must be defined before the network is constructed.

    Args:
        input_tensor_spec: Spec of the observations fed to the network.
        name: Name of the network.
    """

    def __init__(self, input_tensor_spec, name='CustomQNetwork'):
        super(CustomQNetwork, self).__init__(
            input_tensor_spec=input_tensor_spec,
            state_spec=(),  # stateless network
            name=name)

        # Conv stack -> Flatten -> Dense head with one linear output per action.
        self._layers = [
            tf.keras.layers.Conv2D(16, (3, 3), activation='relu'),
            tf.keras.layers.Conv2D(16, (3, 3), activation='relu'),
            tf.keras.layers.Conv2D(32, (3, 3), activation='relu'),
            tf.keras.layers.Conv2D(32, (1, 1), activation='relu'),
            tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
            tf.keras.layers.Conv2D(64, (1, 1), activation='relu'),
            tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(256, activation='relu'),
            tf.keras.layers.Dropout(0.25),
            tf.keras.layers.Dense(num_actions, activation='linear')
        ]

    def call(self, observations, step_type=None, network_state=(),
             training=False):
        """Compute Q-values for a batch of observations.

        Args:
            observations: Batch of observations; cast to float32 here.
            step_type: Unused.
            network_state: Returned unchanged (the network is stateless).
            training: Forwarded to every layer so that Dropout is only
                active when the caller asks for training mode.

        Returns:
            A `(q_values, network_state)` tuple.
        """
        del step_type  # unused
        output = tf.cast(observations, tf.float32)
        for layer in self._layers:
            output = layer(output, training=training)
        return output, network_state



# Q-network built from the observation spec computed above.
q_net = CustomQNetwork(input_tensor_spec)

# Optimizer (using the `learning_rate` hyperparameter) and the counter the
# agent increments on every train step.
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
train_step_counter = tf.Variable(0)

# DQN agent over the specs of `env`, with an element-wise squared TD loss.
agent = dqn_agent.DqnAgent(
    time_step_spec=env.time_step_spec(),
    action_spec=env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    td_errors_loss_fn=common.element_wise_squared_loss,
    train_step_counter=train_step_counter,
)

agent.initialize()


From here on, several alternative QNetwork approaches follow:

In [None]:
import tensorflow as tf
from tf_agents.networks import network
from tf_agents.utils import common
from tf_agents.specs import tensor_spec
from tf_agents.agents.dqn import dqn_agent

class QNetwork(network.Network):
    """Three-layer convolutional Q-network with a 512-unit dense head.

    Args:
        input_tensor_spec: Spec of the observations fed to the network.
        num_actions: Number of discrete actions (size of the output layer).
        name: Optional name of the network.
    """

    def __init__(self, input_tensor_spec, num_actions, name=None):
        super(QNetwork, self).__init__(
            input_tensor_spec=input_tensor_spec,
            state_spec=(),  # No internal state
            name=name)

        # Stored under the same private name that call() iterates over.
        self._sub_layers = [
            tf.keras.layers.Conv2D(32, (8, 8), strides=(4, 4), activation='relu'),
            tf.keras.layers.Conv2D(64, (4, 4), strides=(2, 2), activation='relu'),
            tf.keras.layers.Conv2D(64, (3, 3), strides=(1, 1), activation='relu'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(512, activation='relu'),
            tf.keras.layers.Dense(num_actions)
        ]

    def call(self, observation, step_type=None, network_state=(),
             training=False):
        """Compute Q-values for a batch of observations.

        Args:
            observation: Batch of observations; cast to float32 here, as
                CustomQNetwork does, so the convolutions get float inputs.
            step_type: Unused.
            network_state: Returned unchanged (the network is stateless).
            training: Forwarded to every layer.

        Returns:
            A `(q_values, network_state)` tuple.
        """
        del step_type  # unused
        # NOTE(review): pixel values are not rescaled (e.g. divided by 255);
        # confirm whether normalisation is wanted.
        x = tf.cast(observation, tf.float32)
        for layer in self._sub_layers:
            x = layer(x, training=training)
        return x, network_state


# Build the Q-network and DQN agent from the training environment's specs.
# Requires `train_env` and the `learning_rate` hyperparameter from above.
input_tensor_spec = tensor_spec.from_spec(train_env.observation_spec())
# Number of discrete actions, from the inclusive bounds of the action spec.
num_actions = train_env.action_spec().maximum - train_env.action_spec().minimum + 1

q_net = QNetwork(input_tensor_spec, num_actions)

# DqnAgent expects a tf_agents.networks.Network, which QNetwork subclasses.
# The optimizer uses the shared `learning_rate` hyperparameter rather than a
# separate hard-coded value, so changing the hyperparameter takes effect here.
optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate)

train_step_counter = tf.Variable(0)

agent = dqn_agent.DqnAgent(
    train_env.time_step_spec(),
    train_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    td_errors_loss_fn=common.element_wise_squared_loss,
    train_step_counter=train_step_counter)

agent.initialize()

Finally we come to the policies of our tf-agent, the DqnAgent named "agent".

In [None]:
# The agent exposes two policies: `policy` (used for evaluation below) and
# `collect_policy` (used by the collect driver in the training loop).
eval_policy = agent.policy
collect_policy = agent.collect_policy

In [None]:
# Random baseline policy built from the specs of `env`.
random_policy = random_tf_policy.RandomTFPolicy(env.time_step_spec(),
                                                env.action_spec())

In [None]:
# Sanity check: ask the random policy for one action on a fresh time step.
time_step = env.reset()
random_policy.action(time_step)

In [None]:
# Computes the average return of a policy over several simulated episodes.
def compute_avg_return(environment, policy, num_episodes=10):
    """Return the mean undiscounted episode return of `policy`.

    Args:
        environment: Environment whose `reset()` / `step(action)` return time
            steps exposing `is_last()` and `reward`.
        policy: Policy whose `action(time_step)` returns an object with an
            `action` attribute.
        num_episodes: Number of episodes to average over.

    Returns:
        The first element of the averaged return (`avg_return.numpy()[0]`).
    """
    total_return = 0.0
    for _ in range(num_episodes):
        time_step = environment.reset()
        episode_return = 0.0

        # Exactly one environment step per policy decision; stepping more
        # often would skip frames and drop their rewards.
        while not time_step.is_last():
            action_step = policy.action(time_step)
            time_step = environment.step(action_step.action)
            episode_return += time_step.reward
        total_return += episode_return

    avg_return = total_return / num_episodes
    return avg_return.numpy()[0]


When the agent plays Tetris with a random policy, the average return is always 0. This is odd, as it means there is no punishment for failing (game over) when the last time step is reached.

In [None]:
# Checking the reward_space
env.reward_spec()
# `env` is a TF-Agents environment here (it exposes reward_spec()), so step()
# returns a single TimeStep rather than a Gym-style 5-tuple.
sample_step = env.step(1)
observation, reward = sample_step.observation, sample_step.reward

In [None]:
# Average return of the random policy over 20 episodes.
# One can change the 20 to run more simulations, but it will always return 0
compute_avg_return(env, random_policy, 20)

Next comes the setup of the memory before we come to the training

In [None]:
# Setting up the replay memory (Reverb table + replay buffer + observer).

table_name = 'uniform_table'
# Signature of one stored item: the agent's collect data spec with an extra
# outer dimension added.
replay_buffer_signature = tensor_spec.from_spec(
      agent.collect_data_spec)
replay_buffer_signature = tensor_spec.add_outer_dim(
    replay_buffer_signature)

# Table sampled uniformly, evicting first-in-first-out once
# `replay_buffer_max_length` items are stored.
table = reverb.Table(
    table_name,
    max_size=replay_buffer_max_length,
    sampler=reverb.selectors.Uniform(),
    remover=reverb.selectors.Fifo(),
    rate_limiter=reverb.rate_limiters.MinSize(1),
    signature=replay_buffer_signature)

# Local Reverb server hosting the table.
reverb_server = reverb.Server([table])

# Replay buffer reading length-2 sequences from the local server.
replay_buffer = reverb_replay_buffer.ReverbReplayBuffer(
    agent.collect_data_spec,
    table_name=table_name,
    sequence_length=2,
    local_server=reverb_server)

# Observer handed to the drivers; writes length-2 trajectories to the table.
rb_observer = reverb_utils.ReverbAddTrajectoryObserver(
  replay_buffer.py_client,
  table_name,
  sequence_length=2)

In [None]:
# Display the spec of the data the agent collects.
agent.collect_data_spec


In [None]:
# Display the field names of the collect data spec.
agent.collect_data_spec._fields

In [None]:
# Seed the replay buffer with `initial_collect_steps` random-policy steps.
# The driver steps `train_py_env`, the same Python environment whose reset()
# supplies the initial time step, so the two cannot get out of sync.
py_driver.PyDriver(
    train_py_env,
    py_tf_eager_policy.PyTFEagerPolicy(
      random_policy, use_tf_function=True),
    [rb_observer],
    max_steps=initial_collect_steps).run(train_py_env.reset())

In [None]:
# Peek at one element of the replay buffer's default dataset.
iter(replay_buffer.as_dataset()).next()

In [None]:
# Dataset generates trajectories with shape [Bx2x...]
# (B = batch_size, 2 = num_steps), prefetching 3 batches.
dataset = replay_buffer.as_dataset(
    num_parallel_calls=3,
    sample_batch_size=batch_size,
    num_steps=2).prefetch(3)

# Display the dataset object.
dataset

In [None]:
# Iterator over the batched dataset; the training loop draws from it.
iterator = iter(dataset)
print(iterator)

In [None]:
# Pull one batch from the iterator to inspect it (this consumes the batch).
iterator.next()

This is the training loop. None of the DQN agents and QNetworks we were able to get running learned anything at all, even over multiple hours.

In [None]:
# Training loop: alternate between collecting experience and one train step.
# Relies on `iterator`, `rb_observer`, the environments and the
# hyperparameters defined in earlier cells.

# (Optional) Optimize by wrapping some of the code in a graph using TF function.
agent.train = common.function(agent.train)

# Reset the train step.
agent.train_step_counter.assign(0)

# Evaluate the agent's policy once before training.
avg_return = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
returns = [avg_return]

# Reset the environment.
time_step = train_py_env.reset()

# Create a driver to collect experience. It steps `train_py_env`, the same
# Python environment that produced `time_step`, so the time step passed to
# run() always belongs to the environment being stepped.
collect_driver = py_driver.PyDriver(
    train_py_env,
    py_tf_eager_policy.PyTFEagerPolicy(
      agent.collect_policy, use_tf_function=True),
    [rb_observer],
    max_steps=collect_steps_per_iteration)

for _ in range(num_iterations):

  # Collect a few steps and save to the replay buffer.
  time_step, _ = collect_driver.run(time_step)

  # Sample a batch of data from the buffer and update the agent's network.
  experience, unused_info = next(iterator)
  train_loss = agent.train(experience).loss

  step = agent.train_step_counter.numpy()

  if step % log_interval == 0:
    print('step = {0}: loss = {1}'.format(step, train_loss))

  if step % eval_interval == 0:
    avg_return = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
    print('step = {0}: Average Return = {1}'.format(step, avg_return))
    returns.append(avg_return)

After the training, a plot shows the training progress, and a video of our agent in the environment is created.

In [None]:
# Metric: average return recorded at each evaluation point.

# One x value per entry in `returns`: the evaluation before training plus
# one every `eval_interval` train steps.
iterations = range(0, num_iterations + 1, eval_interval)
plt.plot(iterations, returns)
plt.ylabel('Average Return')
plt.xlabel('Iterations')
# NOTE(review): the fixed upper y-limit of 250 may not suit Tetris returns —
# confirm or remove.
plt.ylim(top=250)

In [None]:
#Creates a Video of the DQN playing Tetris, but nothing is learned

def embed_mp4(filename):
  """Embeds an mp4 file in the notebook.

  Args:
    filename: Path of the mp4 file to embed.

  Returns:
    An `IPython.display.HTML` object holding a <video> tag with the file
    inlined as a base64 data URI.
  """
  # Read through a context manager so the file handle is always closed.
  with open(filename, 'rb') as video_file:
    video = video_file.read()
  b64 = base64.b64encode(video)
  tag = '''
  <video width="640" height="480" controls>
    <source src="data:video/mp4;base64,{0}" type="video/mp4">
  Your browser does not support the video tag.
  </video>'''.format(b64.decode())

  return IPython.display.HTML(tag)

In [None]:
def create_policy_eval_video(policy, filename, num_episodes=5, fps=30):
  """Record `policy` acting in the evaluation environment as an mp4.

  Steps the module-level `eval_env` and captures frames from the
  module-level `eval_py_env`.

  Args:
    policy: Policy whose `action(time_step)` result has an `action` field.
    filename: Output path without the ".mp4" extension.
    num_episodes: Number of episodes to record.
    fps: Frame rate of the written video.

  Returns:
    The result of `embed_mp4` for the written file.
  """
  video_path = filename + ".mp4"
  with imageio.get_writer(video_path, fps=fps) as writer:
    for _episode in range(num_episodes):
      current = eval_env.reset()
      # First frame of the episode, before any action is taken.
      writer.append_data(eval_py_env.render())
      while not current.is_last():
        decision = policy.action(current)
        current = eval_env.step(decision.action)
        writer.append_data(eval_py_env.render())
  return embed_mp4(video_path)

# Record the agent's policy; writes "trained-agent.mp4".
create_policy_eval_video(agent.policy, "trained-agent")

In [None]:
# Record the random baseline policy; writes "random-agent.mp4".
create_policy_eval_video(random_policy, "random-agent")

This was our attempt to make a TF-Agents agent learn Tetris.

Below we tried to build a DQN from hand-written classes, but our actual approach was via TF-Agents.

In [None]:
import gym 
import tensorflow as tf
from nes_py.wrappers import JoypadSpace
from gym_tetris.actions import SIMPLE_MOVEMENT
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.environments.gym_wrapper  import GymWrapper
from tf_agents.environments import TFPyEnvironment
import tf_agents


# Load Atari Tetris through TF-Agents' suite_gym loader for the hand-written
# DQN below.
env_name = 'ALE/Tetris-v5'
env = suite_gym.load(env_name)

In [None]:
from collections import deque
import random

class DQNAgent:
    """Minimal DQN agent with a replay memory and a target network.

    Args:
        state_shape: Tuple whose first entry is the size of the flat state
            vector fed to the network.
        action_size: Number of discrete actions.
    """

    def __init__(self, state_shape, action_size):
        self.state_shape = state_shape
        self.action_size = action_size
        self.memory = deque(maxlen=30000)  # oldest transitions drop out first
        self.gamma = 0.99  # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()

    def _build_model(self):
        """Build and compile the dense network used for Q-value estimates."""
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(128, input_dim=self.state_shape[0], activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(self.action_size, activation='linear')
        ])
        # `learning_rate` is the supported keyword; `lr` is a deprecated alias.
        model.compile(
            loss='mse',
            optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate))
        return model

    def update_target_model(self):
        """Copy the online model's weights into the target model."""
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Pick an action epsilon-greedily.

        Returns:
            A random action index with probability `epsilon`, otherwise the
            index of the largest predicted Q-value for `state`.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        # verbose=0 keeps predict() from printing a progress bar per call.
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0])

    def replay(self, batch_size):
        """Fit the online model on `batch_size` randomly sampled transitions.

        Each transition is fitted on its own. The target for the taken action
        is the reward, plus the discounted best target-network value of the
        next state unless the transition ended the episode. Afterwards
        epsilon is decayed once, while it is above `epsilon_min`.

        Raises:
            ValueError: From `random.sample` if the memory holds fewer than
                `batch_size` transitions.
        """
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                next_q = self.target_model.predict(next_state, verbose=0)[0]
                target = reward + self.gamma * np.amax(next_q)
            # Only the taken action's value is moved towards the target.
            target_f = self.model.predict(state, verbose=0)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        """Load the online model's weights from `name`."""
        self.model.load_weights(name)

    def save(self, name):
        """Save the online model's weights to `name`."""
        self.model.save_weights(name)


In [None]:
# Train the hand-written DQNAgent on `env`, which was loaded with
# suite_gym.load(). Such environments follow the TF-Agents PyEnvironment API:
# reset() and step() each return a TimeStep, not Gym's
# (observation, reward, done, info) tuple.
observation_spec = env.observation_spec()
action_spec = env.action_spec()

# Flatten the observation so it fits the agent's Dense input layer.
state_size = int(np.prod(observation_spec.shape))
state_shape = (state_size,)
# Number of discrete actions, from the inclusive bounds of the action spec.
action_size = int(action_spec.maximum - action_spec.minimum + 1)
print(state_size)
print(action_size)
agent = DQNAgent(state_shape, action_size)
batch_size = 32

EPISODES = 2000
for e in range(EPISODES):
    ts = env.reset()
    state = np.reshape(ts.observation, [1, state_size])
    for t in range(500):  # Adjust max time based on your environment
        # env.render()
        action = agent.act(state)
        ts = env.step(action)
        done = bool(ts.is_last())
        # Penalise the step that ends the episode.
        reward = float(ts.reward) if not done else -10
        next_state = np.reshape(ts.observation, [1, state_size])
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        if done:
            # Sync the target network once per finished episode.
            agent.update_target_model()
            print("episode: {}/{}, score: {}, e: {:.2}"
                  .format(e, EPISODES, t, agent.epsilon))
            break
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)