# DQN Tetris Training - GPU L4 Version

Ce notebook entraîne un agent DQN sur Tetris

In [None]:
import numpy as np
from keras import layers
import gymnasium as gym
import ale_py
import tensorflow as tf
import random
from collections import deque
import time
import os

---

In [None]:
# GPU configuration: enable on-demand memory growth on every visible GPU,
# or report that training will fall back to the CPU.
gpus = tf.config.list_physical_devices('GPU')
if not gpus:
    print("Aucun GPU détecté, utilisation du CPU")
else:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print(f"GPU disponible(s): {len(gpus)}")
        print(f"GPU utilisé: {gpus[0].name}")
    except RuntimeError as err:
        # Report the configuration error instead of aborting the notebook.
        print(err)

---

In [None]:
# Threading configuration (tuned for an L4 GPU host)
# NOTE(review): OMP_NUM_THREADS is set here after TensorFlow has already been
# imported; confirm that the OpenMP runtime still honours it at this point.
os.environ["OMP_NUM_THREADS"] = "8"
tf.config.threading.set_intra_op_parallelism_threads(8)
tf.config.threading.set_inter_op_parallelism_threads(8)

# Register the ALE games with Gymnasium
gym.register_envs(ale_py)

---

In [None]:
# Hyperparameters tuned for an L4 GPU
env_name = "ALE/Tetris-v5"
learning_rate = 0.001
# Exploration rate: starts at `epsilon`, is multiplied by `epsilon_decay`
# after every episode and never drops below `epsilon_min`.
epsilon = 1.0
epsilon_min = 0.05
epsilon_decay = 0.995
# Discount factor applied to the bootstrapped next-state value
gamma = 0.99
batch_size = 256  # Larger batch for the GPU
memory_size = 50000
episodes = 500
# The target network is synchronised every this many episodes
update_target_frequency = 10
render_mode = None  # No rendering during GPU training
# Loss identifier handed to model.compile() in create_q_model
loss = 'huber'
train_interval = 2  # Train every 2 steps

# Custom reward system variables
one_line_clear_reward = 100         # reward for clearing one line
two_lines_clear_reward = 250        # reward for clearing two lines
three_lines_clear_reward = 450      # reward for clearing three lines
four_lines_clear_reward = 800       # reward for clearing four lines (Tetris!)

survival_reward = 1                 # reward per step (survival bonus)
game_over_penalty = -200            # penalty for game over

positive_reward_multiplier = 10     # multiplier for positive original rewards
negative_reward_multiplier = 5      # multiplier for negative original rewards
score_increase_divisor = 100        # divisor to estimate lines cleared from score

---

In [None]:
# Initialise the Tetris environment
env = gym.make(env_name, render_mode=render_mode)

observation_shape = env.observation_space.shape  # expected (210, 160, 3) — TODO confirm
action_space = env.action_space.n  # number of discrete actions (presumably 5 — TODO confirm)

---

In [None]:
# Preprocess an observation (image frame)
def preprocess_observation(obs):
    """Convert an RGB frame into a small, normalised, flat float32 vector.

    The frame is averaged over its colour channels (grayscale), resized to
    84x84 with ``tf.image.resize``, scaled to [0, 1] and flattened.

    Args:
        obs: Image array with the colour channels on axis 2.
            Assumes 8-bit pixel intensities (0-255) — TODO confirm.

    Returns:
        A 1-D ``np.float32`` array of length 84 * 84.
    """
    # Grayscale: mean over the colour-channel axis.
    obs_gray = np.mean(obs, axis=2)
    # tf.image.resize expects a channel axis, hence the added trailing dimension.
    obs_resized = tf.image.resize(obs_gray[..., np.newaxis], (84, 84))
    # Scale to [0, 1]: the function's contract is a *normalised* observation,
    # and raw 0-255 inputs make the dense layers much harder to train.
    return (obs_resized.numpy().flatten() / 255.0).astype(np.float32)

# Reset the environment once to measure the length of the flattened,
# preprocessed state vector; this sizes the network input layer.
state_shape = preprocess_observation(env.reset()[0]).shape[0]
# Number of discrete actions; sizes the network output layer.
action_shape = action_space
print(f"État aplati: {state_shape}")
print(f"Actions: {action_shape}")

---

In [None]:
# Callback configuration
# Make sure the checkpoint directory exists before any callback writes to it.
os.makedirs('./saved_models', exist_ok=True)

# NOTE(review): train_step() passes this list to a single-epoch fit() call for
# every sampled batch; the patience-based callbacks presumably never accumulate
# state across those calls — confirm against the Keras callback documentation.
callbacks = [
    # Stop when the training loss has not improved for 8 epochs.
    tf.keras.callbacks.EarlyStopping(
        monitor='loss',
        patience=8,
        restore_best_weights=True,
        verbose=0
    ),
    # Keep only the weights with the lowest training loss seen so far.
    tf.keras.callbacks.ModelCheckpoint(
        filepath='./saved_models/dqn_model_best.weights.h5',
        monitor='loss',
        save_best_only=True,
        save_weights_only=True,
        verbose=0
    ),
    # Scalar logging only: histograms, graph and images are disabled.
    tf.keras.callbacks.TensorBoard(
        log_dir='logs',
        histogram_freq=0,
        write_graph=False,
        write_images=False,
        update_freq='epoch'
    ),
    # Halve the learning rate after 5 epochs without improvement, down to 1e-6.
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor='loss',
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        verbose=0
    )
]

---

In [None]:
# Build the Q-network
def create_q_model():
    """Build and compile a fully connected Q-network.

    The network maps a flat state vector of length ``state_shape`` to one
    linear Q-value per action (``action_shape`` outputs). It is compiled with
    Adam at ``learning_rate``, the module-level ``loss`` setting, and XLA
    (``jit_compile=True``).

    Returns:
        A compiled ``tf.keras.Sequential`` model.
    """
    model = tf.keras.Sequential(
        [
            # Explicit Input layer instead of the deprecated `input_shape=`
            # keyword on the first Dense layer.
            layers.Input(shape=(state_shape,)),
            layers.Dense(512, activation='relu'),
            layers.Dropout(0.2),
            layers.Dense(256, activation='relu'),
            layers.Dropout(0.2),
            layers.Dense(128, activation='relu'),
            # Linear head: raw Q-value estimates, one per action.
            layers.Dense(action_shape, activation='linear')
        ])
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=loss,
        jit_compile=True  # XLA compilation for speed
    )
    return model

# Online network (trained) and target network (used for bootstrapped targets).
q_model = create_q_model()
target_model = create_q_model()
# Start the target network with the same weights as the online network.
target_model.set_weights(q_model.get_weights())

print("\nArchitecture du modèle:")
q_model.summary()

---

In [None]:
# Replay memory: a bounded deque, so once `memory_size` transitions are stored
# each new append discards the oldest one.
memory = deque(maxlen=memory_size)

def store_transition(state, action, reward, next_state, done):
    """Append one (state, action, reward, next_state, done) tuple to the replay memory."""
    transition = (state, action, reward, next_state, done)
    memory.append(transition)

---

In [None]:
# Sample a batch of transitions
def sample_batch():
    """Draw ``batch_size`` random transitions from the replay memory.

    Returns:
        A 5-tuple ``(state, action, reward, next_state, done)`` of NumPy
        arrays, each with the batch on axis 0.

    Raises:
        ValueError: Raised by ``random.sample`` when the memory holds fewer
            than ``batch_size`` transitions.
    """
    batch = random.sample(memory, batch_size)
    # zip(*batch) regroups the transitions by field; one np.asarray per field
    # is enough, so no second copying conversion is made on the result.
    state, action, reward, next_state, done = map(np.asarray, zip(*batch))
    return state, action, reward, next_state, done

---

In [None]:
# Epsilon-greedy policy
def epsilon_greedy_policy(state, epsilon):
    """Choose an action: random with probability ``epsilon``, otherwise greedy.

    Args:
        state: State vector; a batch axis is added before it is passed to
            ``q_model``.
        epsilon: Probability of taking a uniformly random action.

    Returns:
        The index of the chosen action.
    """
    explore = np.random.random() < epsilon
    if not explore:
        # Exploitation: take the action with the highest predicted Q-value.
        batched_state = state[np.newaxis]
        q_values = q_model(batched_state, training=False)
        return np.argmax(q_values[0])
    # Exploration: uniformly random action index.
    return np.random.choice(action_shape)

---

In [None]:
# Custom reward function
def calculate_custom_reward(info, prev_info, reward, done):
    """
    Compute the shaped reward for a single environment step.

    All tuning knobs are read from module-level variables:
    - one_line_clear_reward, two_lines_clear_reward, three_lines_clear_reward, four_lines_clear_reward
    - survival_reward, game_over_penalty
    - positive_reward_multiplier, negative_reward_multiplier
    - score_increase_divisor

    Args:
        info: Info dict of the current step; only its 'score' entry is read.
        prev_info: Info dict of the previous step (may be empty or None).
        reward: Original reward returned by the environment.
        done: True when the episode ended on this step.

    Returns:
        ``game_over_penalty`` when ``done`` is true; otherwise the sum of the
        line-clear bonus, the survival reward and the scaled original reward.
    """
    # Game over: nothing else is taken into account.
    if done:
        return game_over_penalty

    # NOTE(review): if the info dict has no 'score' key, both values default
    # to 0 and the line-clear branch below never fires — confirm what the
    # environment actually exposes.
    current_score = info.get('score', 0)
    previous_score = prev_info.get('score', 0) if prev_info else 0
    gained = current_score - previous_score

    shaped = 0
    if gained > 0:
        # Estimate the number of cleared lines from the size of the score jump.
        lines = gained // score_increase_divisor
        if lines >= 4:
            shaped += four_lines_clear_reward
        else:
            tier_bonus = {
                1: one_line_clear_reward,
                2: two_lines_clear_reward,
                3: three_lines_clear_reward,
            }
            # A jump too small to count as a line is credited as-is.
            shaped += tier_bonus.get(lines, gained)

    # Per-step survival bonus.
    shaped += survival_reward

    # Scale the environment's own reward; the sign selects the multiplier.
    if reward > 0:
        shaped += reward * positive_reward_multiplier
    elif reward < 0:
        shaped += reward * negative_reward_multiplier

    return shaped

In [None]:
# Training function; returns the batch loss
def train_step():
    """Run one DQN update on a random replay batch.

    The TD target for the action taken in each sampled transition is
    ``reward`` when the transition ended the episode, otherwise
    ``reward + gamma * max_a Q_target(next_state, a)``. The Q-values of all
    other actions keep the online network's current predictions, so they
    contribute no error.

    Returns:
        The loss reported by ``q_model.fit`` for this batch, or ``None`` when
        the replay memory holds fewer than ``batch_size`` transitions or no
        loss was recorded.
    """
    if len(memory) < batch_size:
        return None
    state, action, reward, next_state, done = sample_batch()

    # Bootstrapped value of the next state, taken from the target network.
    next_q_values = target_model(next_state, training=False)
    max_next_q_values = np.max(next_q_values, axis=1)

    # Start from the online predictions and overwrite only the taken action.
    target_q_values = q_model(state, training=False).numpy()
    bootstrapped = reward + gamma * max_next_q_values
    # Vectorised replacement of the per-sample loop: row i, column action[i].
    target_q_values[np.arange(len(action)), action] = np.where(done, reward, bootstrapped)

    # NOTE(review): the callbacks are re-run for every single-batch fit();
    # confirm this is intended.
    history = q_model.fit(state, target_q_values, verbose=0, callbacks=callbacks, batch_size=batch_size)

    losses = history.history['loss']
    return losses[0] if losses else None

---

In [None]:
# Training loop
reward_history = []               # shaped (custom) return of each episode
loss_history = []                 # mean training loss of each episode
training_steps_per_episode = []   # number of gradient updates per episode

for episode in range(episodes):
    start = time.time()
    obs, info = env.reset()
    state = preprocess_observation(obs)
    total_reward = 0
    total_custom_reward = 0
    done = False
    steps = 0
    prev_info = info.copy()
    episode_losses = []
    training_steps = 0

    while not done:
        action = epsilon_greedy_policy(state, epsilon)

        # Apply the action in the environment
        obs, reward, terminated, truncated, info = env.step(action)
        next_state = preprocess_observation(obs)

        # The episode ends on either termination or truncation
        done = terminated or truncated

        # Compute the shaped reward used for learning
        custom_reward = calculate_custom_reward(info, prev_info, reward, done)

        # Store the transition with the shaped reward
        store_transition(state, action, custom_reward, next_state, done)
        total_reward += reward
        total_custom_reward += custom_reward

        state = next_state
        prev_info = info.copy()
        steps += 1

        # Train every train_interval environment steps
        if steps % train_interval == 0:
            # Deliberately not named `loss`: that module-level name holds the
            # loss setting read by create_q_model(), and assigning to it here
            # would replace it with a float (or None).
            step_loss = train_step()
            if step_loss is not None:
                episode_losses.append(step_loss)
                training_steps += 1

    end = time.time()
    timelength = end - start

    # Decay exploration once per episode, never below epsilon_min
    epsilon = max(epsilon_min, epsilon * epsilon_decay)

    # Periodically copy the online weights into the target network
    if episode % update_target_frequency == 0:
        target_model.set_weights(q_model.get_weights())

    reward_history.append(total_custom_reward)
    training_steps_per_episode.append(training_steps)

    # Calculate episode stats (0 when no training step ran in this episode)
    avg_loss = np.mean(episode_losses) if episode_losses else 0
    min_loss = np.min(episode_losses) if episode_losses else 0
    max_loss = np.max(episode_losses) if episode_losses else 0
    loss_history.append(avg_loss)

    # Display comprehensive stats
    print(f"Episode: {episode}/{episodes} | Original: {total_reward:.0f} | Custom: {total_custom_reward:.0f} | Epsilon: {epsilon:.3f} | Time: {timelength:.2f}s | Steps: {steps} | Memory: {len(memory)}")
    print(f"  └─ Loss → Avg: {avg_loss:.6f} | Min: {min_loss:.6f} | Max: {max_loss:.6f} | Trainings: {training_steps}")

    # Save the model weights every 50 episodes
    if (episode + 1) % 50 == 0:
        q_model.save_weights(f'./saved_models/dqn_model_episode_{episode+1}.weights.h5')
        print(f"\n{'='*100}")
        print(f"Model save epoch n {episode+1}")
        print(f"Reward → Best: {max(reward_history):.0f} | Avg (50 last): {np.mean(reward_history[-50:]):.0f}")
        print(f"Loss   → Avg (50 last): {np.mean(loss_history[-50:]):.6f}")
        print(f"{'='*100}\n")

env.close()

# Save the final model, then print an end-of-training summary
banner = "=" * 100
recent_rewards = reward_history[-50:]
recent_losses = loss_history[-50:]

q_model.save_weights('./saved_models/dqn_model_final.weights.h5')
print("\n" + banner)
print("TRAINING COMPLETE")
print(banner)
print(f"Total Episodes: {episodes}")
print(f"Best Reward: {max(reward_history):.0f}")
print(f"Average Reward: {np.mean(reward_history):.0f}")
print(f"Final Average Reward (last 50): {np.mean(recent_rewards):.0f}")
print(f"Average Loss: {np.mean(loss_history):.6f}")
print(f"Final Average Loss (last 50): {np.mean(recent_losses):.6f}")
print(f"Total Training Steps: {sum(training_steps_per_episode)}")
print(f"Average Training Steps per Episode: {np.mean(training_steps_per_episode):.2f}")
print("Model saved to: ./saved_models/dqn_model_final.weights.h5")
print(banner)

---

In [None]:
# Plot the training results
import matplotlib.pyplot as plt

# One figure, two side-by-side panels: raw episode returns and their moving average.
fig, (ax_raw, ax_avg) = plt.subplots(1, 2, figsize=(12, 5))

ax_raw.plot(reward_history)
ax_raw.set_title('Récompenses par épisode')
ax_raw.set_xlabel('Épisode')
ax_raw.set_ylabel('Récompense totale')
ax_raw.grid(True)

# Moving average over `window` episodes, computed with a uniform kernel.
window = 20
moving_avg = np.convolve(reward_history, np.ones(window)/window, mode='valid')
ax_avg.plot(moving_avg)
ax_avg.set_title(f'Moyenne mobile (fenêtre={window})')
ax_avg.set_xlabel('Épisode')
ax_avg.set_ylabel('Récompense moyenne')
ax_avg.grid(True)

fig.tight_layout()
fig.savefig('./saved_models/training_progress.png')
plt.show()

last_rewards = reward_history[-20:]
print(f"\nRécompense moyenne: {np.mean(reward_history):.2f}")
print(f"Récompense maximale: {np.max(reward_history):.2f}")
print(f"Récompense finale (derniers 20 épisodes): {np.mean(last_rewards):.2f}")