In [1]:
#!pip install gymnasium[atari]
#!pip install gymnasium[accept-rom-license]
#!pip install --upgrade gym ale-py
#!pip install keyboard
#!pip install keras
#!pip install tensorflow
#!pip install wandb
import tensorflow as tf
from tensorflow import keras
import numpy as np
from collections import deque

from ale_py import ALEInterface
from ale_py.roms import SpaceInvaders
import pathlib
import gymnasium as gym
import wandb



In [2]:
# Initialize the Arcade Learning Environment (ALE) interface
ale = ALEInterface()


In [3]:
# Load the Space Invaders ROM into the ALE interface
ale.loadROM(SpaceInvaders)

# Gymnasium environment the agent interacts with
env = gym.make('ALE/SpaceInvaders-v5')

# First dimension of the observation shape (presumably the frame height — TODO confirm)
n_inputs = env.observation_space.shape[0]
# Number of discrete actions; used as the width of the Q-network's output layer
n_outputs = env.action_space.n


In [4]:
# Online Q-network: three convolutional layers followed by a dense head that
# emits one Q-value per action for a (210, 160, 3) input frame.
main_nn = keras.Sequential([
    keras.layers.Conv2D(32, (8, 8), strides=4, activation='relu', input_shape=(210, 160, 3)),
    keras.layers.Conv2D(64, (4, 4), strides=2, activation='relu'),
    keras.layers.Conv2D(64, (3, 3), strides=1, activation='relu'),
    keras.layers.Flatten(),
    keras.layers.Dense(512, activation='relu'),
    keras.layers.Dense(n_outputs)
])

# Target network used to compute the bootstrapped Q-targets.
# clone_model() copies only the architecture and creates freshly initialised
# weights, so the weights are copied explicitly to start both networks equal.
target_nn = keras.models.clone_model(main_nn)
target_nn.set_weights(main_nn.get_weights())

# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
optimizer = keras.optimizers.Adam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error

# Experience replay memory; the oldest transitions are evicted once 10000
# entries are stored.
replay_buffer = deque(maxlen=10000)



In [5]:
def epsilon_greedy_policy(state, epsilon=0):
    """Choose an action with an epsilon-greedy rule.

    Args:
        state: Either an observation array, or a 2-tuple
            ``(observation array, dict)``; in the latter case only the
            array is fed to the network.
        epsilon: Probability of picking a uniformly random action.

    Returns:
        A random action index in ``[0, n_outputs)`` when exploring, otherwise
        the index of the largest Q-value predicted by ``main_nn``.
    """
    # Exploration branch: ignore the network entirely.
    if np.random.rand() < epsilon:
        return np.random.randint(n_outputs)

    is_obs_info_pair = (
        isinstance(state, tuple)
        and len(state) == 2
        and isinstance(state[0], np.ndarray)
        and isinstance(state[1], dict)
    )
    frame = state[0] if is_obs_info_pair else state

    # Add a leading batch axis so the network sees a batch of one frame.
    Q_values = main_nn.predict(frame[np.newaxis])
    return np.argmax(Q_values[0])

In [6]:
def sample_experiences(batch_size):
    """Draw a uniform random mini-batch (with replacement) from ``replay_buffer``.

    Each stored experience is a 5-tuple
    ``(state, action, reward, next_state, done)``.

    Args:
        batch_size: Number of experiences to draw.

    Returns:
        Five arrays ``(states, actions, rewards, next_states, dones)``, each
        with ``batch_size`` entries along the first axis. States keep the
        dtype of the stored frames; actions are int64; rewards and dones are
        float32 (done is 1.0 for a finished episode, else 0.0).
    """
    def _frame(state):
        # A state that came straight from env.reset() is an
        # (observation, info) tuple; only the observation is a frame.
        return state[0] if isinstance(state, tuple) else state

    indices = np.random.randint(len(replay_buffer), size=batch_size)
    batch = [replay_buffer[index] for index in indices]

    # Build real numeric arrays instead of dtype=object ones: object arrays
    # box every pixel as a Python object and turn ragged as soon as one state
    # is a tuple.
    states = np.array([_frame(experience[0]) for experience in batch])
    actions = np.array([experience[1] for experience in batch], dtype=np.int64)
    rewards = np.array([experience[2] for experience in batch], dtype=np.float32)
    next_states = np.array([_frame(experience[3]) for experience in batch])
    dones = np.array([experience[4] for experience in batch], dtype=np.float32)
    return states, actions, rewards, next_states, dones



In [7]:
def play_one_step(env, state, epsilon):
    """Advance the environment by exactly one step and record the transition.

    Args:
        env: Environment whose ``step(action)`` returns a 5-tuple
            ``(next_state, reward, terminated, truncated, info)``.
        state: Current observation; an ``(observation, info)`` tuple as
            returned by ``env.reset()`` is also accepted.
        epsilon: Exploration probability passed to ``epsilon_greedy_policy``.

    Returns:
        ``(next_state, reward)`` for the step that was taken.
    """
    # Store plain frames only, so every entry in the replay buffer has the
    # same structure regardless of whether it followed a reset.
    if isinstance(state, tuple):
        state = state[0]

    action = epsilon_greedy_policy(state, epsilon)
    # Step once. Calling env.step() twice per action silently drops the first
    # transition together with its reward.
    next_state, reward, terminated, _truncated, _info = env.step(action)
    # Only uint8 frames are stored in the replay buffer.
    if next_state.dtype == np.uint8:
        replay_buffer.append((state, action, reward, next_state, terminated))
    return next_state, reward


In [8]:
# Discount factor applied to the bootstrapped value of the next state.
discount_rate = 0.99


def training_step(batch_size):
    """Run one gradient update of ``main_nn`` on a sampled mini-batch.

    Targets are ``reward + (1 - done) * discount_rate * max_a Q_target(s', a)``
    with ``Q_target`` given by ``target_nn``; the loss is ``loss_fn`` between
    those targets and the Q-values ``main_nn`` assigns to the actions taken.

    Args:
        batch_size: Number of experiences to sample from the replay buffer.

    Returns:
        The scalar loss of this update as a NumPy value.
    """
    states, actions, rewards, next_states, dones = sample_experiences(batch_size)

    # A state recorded right after env.reset() may be an (observation, info)
    # tuple. Use its own observation: substituting a neighbouring batch entry
    # would train on a frame that does not belong to this transition.
    state_batch = np.stack([
        np.asarray(state[0] if isinstance(state, tuple) else state)
        for state in states
    ]).astype('float32')

    # The targets need no gradients, so they are computed outside the tape.
    next_Q_values = target_nn.predict(next_states.astype('float32'), verbose=0)
    max_next_Q_values = np.max(next_Q_values, axis=1)
    target_Q_values = (rewards.astype('float32')
                       + (1 - dones.astype('float32')) * discount_rate * max_next_Q_values)
    target_Q_values = target_Q_values.reshape(-1, 1).astype('float32')

    # One-hot mask that keeps only the Q-value of the action actually taken.
    mask = tf.one_hot(actions.astype('int32'), n_outputs)
    with tf.GradientTape() as tape:
        all_Q_values = main_nn(tf.convert_to_tensor(state_batch))
        Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))
    grads = tape.gradient(loss, main_nn.trainable_variables)
    optimizer.apply_gradients(zip(grads, main_nn.trainable_variables))
    return loss.numpy()

In [None]:
# Main game / training loop
wandb.init(project="my-space-invaders-project")

# Copy the online weights into the target network every this many episodes;
# without periodic syncing the targets would come from a network that never
# learns.
target_sync_every = 50

for episode in range(600):

    # Switch to a rendering environment for the last 10 episodes, releasing
    # the previous environment first.
    if episode == 590:
        env.close()
        env = gym.make('ALE/SpaceInvaders-v5', render_mode='human')

    if episode % target_sync_every == 0:
        target_nn.set_weights(main_nn.get_weights())

    # Gymnasium's reset() returns (observation, info); keep the frame only.
    obs, _ = env.reset()
    total_reward = 0
    # Linearly decay exploration from 1.0 to a floor of 0.01 over 500 episodes.
    epsilon = max(1 - episode / 500, 0.01)
    for step in range(200):
        obs, reward = play_one_step(env, obs, epsilon)
        total_reward += reward

        # play_one_step() does not return the termination flag, so read it
        # from the transition it just stored (matched by object identity so a
        # stale buffer entry is never mistaken for this step).
        last = replay_buffer[-1] if replay_buffer else None
        episode_over = last is not None and last[3] is obs and bool(last[4])

        # Let the buffer fill for the first episodes before training.
        if episode > 50:
            loss = training_step(32)
            wandb.log({"loss": loss})

        # Stepping an environment after it has terminated is invalid.
        if episode_over:
            break

    wandb.log({"episode": episode, "total_reward": total_reward})
    print(f"Episode: {episode}")

main_nn.save('my_dqn.h5')

[34m[1mwandb[0m: W&B API key is configured. Use [1m`wandb login --relogin`[0m to force relogin


VBox(children=(Label(value='Waiting for wandb.init()...\r'), FloatProgress(value=0.016933333333327028, max=1.0…

Episode: 0
Episode: 1
Episode: 2
Episode: 3
Episode: 4
Episode: 5
Episode: 6
Episode: 7
Episode: 8
Episode: 9
Episode: 10
Episode: 11
Episode: 12
Episode: 13
Episode: 14
Episode: 15
Episode: 16
Episode: 17
Episode: 18
Episode: 19
Episode: 20
Episode: 21
Episode: 22
Episode: 23
Episode: 24
Episode: 25
Episode: 26


Episode: 27
Episode: 28
Episode: 29
Episode: 30
Episode: 31
Episode: 32
Episode: 33
Episode: 34
Episode: 35
Episode: 36
Episode: 37


Episode: 38
Episode: 39
Episode: 40
Episode: 41
Episode: 42
Episode: 43
Episode: 44
Episode: 45
Episode: 46


Episode: 47
Episode: 48
Episode: 49
Episode: 50


Episode: 51




Episode: 52
