In [None]:
import numpy as np
import tensorflow as tf

import tf_agents as tfa

In [None]:
class TetrisEnv(tfa.environments.PyEnvironment):

    def __init__(self):
        # 0 - move left
        # 1 - move right
        # 2 - move down
        # 3 - rotate
        self._action_spec = tfa.specs.array_spec.BoundedArraySpec(
            shape = (),
            dtype = np.int32,
            minimum = 0,
            maximum = 3,
            name = 'action'
        )
        self._observation_spec = tfa.specs.array_spec.BoundedArraySpec(
            shape = (10,18),
            dtype = np.int32,
            minimum = 0,
            maximum = 17,
            name = 'observation'
        )
        
        self._state = np.random.randint(30, 40)
        
        self._step_counter = 0

    def action_spec(self):
        return self._action_spec
    
    def observation_spec(self):
        return self._observation_spec
    
    def _observe(self):
        return np.array([self._position], dtype = np.int32)
    
    def _reset(self):
        self._position = 0
        self._step_counter = 0
        return tfa.trajectories.time_step.restart(self._observe())
    
    def _step(self, action):
        if abs(self._position) >= 3 or self._step_counter >= 10:
            return self.reset()
        
        self._step_counter += 1
        
        if action == 0:
            self._position -= 1
        elif action == 1:
            pass
        elif action == 2:
            self._position += 1
        else:
            raise ValueError('`action` should be 0 (left), 1 (do nothing) or 2 (right). You gave `%s`' % action)
        
        reward = 0
        if self._position >= 3:
            reward = 1
        elif self._position <= -3 or self._step_counter >= 10:
            reward = -1
        
        if reward != 0:
            return tfa.trajectories.time_step.termination(
                self._observe(),
                reward
            )
        else:
            return tfa.trajectories.time_step.transition(
                self._observe(),
                reward = 0, 
                discount = 1.0 
            )

tfa.environments.utils.validate_py_environment(TetrisEnv(), episodes=10)

In [None]:
train_py_env = TetrisEnv()
test_py_env = TetrisEnv()
train_env = tfa.environments.tf_py_environment.TFPyEnvironment(train_py_env)
test_env = tfa.environments.tf_py_environment.TFPyEnvironment(test_py_env)

#### Hier ist der entgültige Agent. Er muss nur noch mit dem Tetris-Spiel kompatibel gemacht werden. Das heißt:
- "self._state" muss block_grid entsprechen
- "self._step()" muss Blöcke bewegen und rotieren können
- Restliche Kompatibilitätsprobleme müssen behoben werden sofern welche vorhanden sind

In [None]:
q_network = tfa.networks.sequential.Sequential([
    tf.keras.layers.Dense(16, activation = 'relu'),
    tf.keras.layers.Dense(3, activation = None) 
])

agent = tfa.agents.dqn.dqn_agent.DqnAgent(
    train_env.time_step_spec(),
    train_env.action_spec(),
    q_network = q_network,
    optimizer = tf.keras.optimizers.Adam(),
    td_errors_loss_fn = tfa.utils.common.element_wise_squared_loss,
    n_step_update = 1
)

agent.initialize()

agent.train = tfa.utils.common.function(agent.train)

In [None]:
def evaluate_policy(env, policy, episodes = 10):
    total_reward = 0.0
    total_steps = 0
    
    for ep in range(episodes):
        time_step = env.reset()
        
        total_reward += time_step.reward.numpy()[0]
        
        while not time_step.is_last(): 
            action_step = policy.action(time_step)
            action_tensor = action_step.action
            action = action_tensor.numpy()[0]
            
            time_step = env.step(action)
            
            total_reward += time_step.reward.numpy()[0]
            total_steps += 1
        
    average_reward = total_reward / episodes
    average_ep_length = total_steps / episodes
    return average_reward, average_ep_length

avg_reward, avg_length = evaluate_policy(test_env, agent.policy)
print("initial policy gives average reward of %.2f after an average %d steps" % (avg_reward, avg_length))

In [None]:
replay_buffer = tfa.replay_buffers.tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec = agent.collect_data_spec,
    batch_size = train_env.batch_size,
    max_length = 10000
)

replay_dataset = replay_buffer.as_dataset(
    num_parallel_calls = 3, 
    sample_batch_size = 64,
    num_steps = 2
).prefetch(3)

def record_experience(buffer, time_step, action_step, next_time_step):
    buffer.add_batch(
        tfa.trajectories.trajectory.from_transition(time_step, action_step, next_time_step)
    )

replay_dataset_iterator = iter(replay_dataset)

In [None]:
time_step = train_env.reset()

episode_length_history = []
reward_history = []

for step in range(10 + 1):
    
    for _ in range(10):
        action_step = agent.collect_policy.action(time_step)
        
        action_tensor = action_step.action
        action = action_tensor.numpy()[0]
        new_time_step = train_env.step(action)
        
        reward = new_time_step.reward.numpy()[0]
        
        record_experience(replay_buffer, time_step, action_step, new_time_step)
        
        time_step = new_time_step
    
    training_experience, unused_diagnostics_info = next(replay_dataset_iterator)
    
    train_step = agent.train(training_experience)
    loss = train_step.loss
    
    print("step: %d, loss: %d" % (step, loss))
    
    if step % 100 == 0:
        avg_reward, avg_length = evaluate_policy(test_env, agent.policy)
        print("average reward: %.2f average steps: %d" % (avg_reward, avg_length))
        
        reward_history.append(avg_reward)
        episode_length_history.append(avg_length)