In [None]:
import numpy as np
import random
import tensorflow as tf
from tensorflow.keras import layers
from collections import deque
import tkinter as tk
import time
from typing import List, Tuple
from datetime import datetime
from collections import deque
import pickle
import os

# Fonction pour initialiser la grille avec deux tuiles au départ
def initialize_game() -> List[List[int]]:
    """
    Initialise une grille de jeu 4x4 avec deux tuiles (2 ou 4) placées aléatoirement.

    Returns:
        List[List[int]]: Grille de jeu initialisée avec deux tuiles.
    """
    board = [[0] * 4 for _ in range(4)]
    add_new_tile(board)
    add_new_tile(board)
    return board

# Fonction pour ajouter une nouvelle tuile (2 ou 4) à une position vide
def add_new_tile(board: List[List[int]]) -> None:
    """
    Ajoute une nouvelle tuile (2 ou 4) à une position vide dans la grille.

    Args:
        board (List[List[int]]): La grille de jeu où ajouter une nouvelle tuile.
    """
    empty_tiles = [(r, c) for r in range(4) for c in range(4) if board[r][c] == 0]
    if empty_tiles:
        r, c = random.choice(empty_tiles)
        board[r][c] = 2 if random.random() < 0.85 else 4

# Fonction pour fusionner une ligne ou une colonne de tuiles
def merge(line: List[int]) -> Tuple[List[int], int]:
    """
    Fusionne les tuiles identiques dans une ligne ou colonne, selon les règles du jeu.
    Les tuiles adjacentes de même valeur fusionnent et le score est mis à jour en conséquence.

    Args:
        line (List[int]): Ligne ou colonne de tuiles à fusionner (une liste d'entiers).

    Returns:
        Tuple[List[int], int]: La ligne fusionnée avec les tuiles à gauche, et le score total obtenu.
    """
    non_zero = [num for num in line if num != 0]  # Supprime les zéros
    merged = []
    score = 0  # Score pour cette ligne/colonne
    skip = False

    for i in range(len(non_zero)):
        if skip:
            skip = False
            continue
        if i + 1 < len(non_zero) and non_zero[i] == non_zero[i + 1]:
            merged.append(non_zero[i] * 2)
            score += non_zero[i] * 2  # Ajoute la valeur fusionnée au score
            skip = True  # Saute la case suivante car elle a fusionné
        else:
            merged.append(non_zero[i])

    # Complète avec des zéros à droite pour maintenir la taille de la ligne
    merged.extend([0] * (len(line) - len(merged)))
    return merged, score  # Retourne la ligne fusionnée et le score


# Fonction pour déplacer et fusionner les tuiles à gauche
def move_left(board: List[List[int]]) -> Tuple[List[List[int]], int]:
    """
    Déplace et fusionne toutes les tuiles du plateau vers la gauche, en suivant les règles du jeu.
    Calcule également le score obtenu suite aux fusions.

    Args:
        board (List[List[int]]): Grille de jeu (4x4) avant le déplacement.

    Returns:
        Tuple[List[List[int]], int]: La grille mise à jour après le déplacement et le score total obtenu.
    """
    score = 0  # Initialiser le score pour ce mouvement
    for i in range(4):
        new_row, gained_score = merge(board[i])  # Fusionner les tuiles et obtenir le score
        board[i] = new_row  # Mettre à jour la ligne
        score += gained_score  # Ajouter le score obtenu lors de la fusion
    return board, score  # Retourner le plateau mis à jour et le score total

# Fonction pour déplacer et fusionner les tuiles à droite
def move_right(board: List[List[int]]) -> Tuple[List[List[int]], int]:
    """
    Déplace et fusionne toutes les tuiles du plateau vers la droite.
    Calcule le score total pour les fusions réalisées pendant le déplacement.

    Args:
        board (List[List[int]]): Grille de jeu (4x4) avant le déplacement.

    Returns:
        Tuple[List[List[int]], int]: La grille mise à jour après le déplacement et le score total obtenu.
    """
    score = 0
    for i in range(4):
        new_row, gained_score = merge(board[i][::-1])  # Fusionner après avoir inversé la ligne
        board[i] = new_row[::-1]  # Réinverser après la fusion pour maintenir l'ordre
        score += gained_score
    return board, score

# Fonction pour déplacer et fusionner les tuiles vers le haut
def move_up(board: List[List[int]]) -> Tuple[List[List[int]], int]:
    """
    Déplace et fusionne toutes les tuiles du plateau vers le haut.
    Calcule le score pour les fusions réalisées lors du déplacement.

    Args:
        board (List[List[int]]): Grille de jeu (4x4) avant le déplacement.

    Returns:
        Tuple[List[List[int]], int]: La grille mise à jour après le déplacement et le score total obtenu.
    """
    score = 0
    for col in range(4):
        column = [board[row][col] for row in range(4)]  # Récupère la colonne actuelle
        new_column, gained_score = merge(column)  # Fusionner les tuiles de la colonne
        score += gained_score  # Ajouter le score de cette colonne
        for row in range(4):
            board[row][col] = new_column[row]  # Mettre à jour la colonne dans la grille
    return board, score

# Fonction pour déplacer et fusionner les tuiles vers le bas
def move_down(board: List[List[int]]) -> Tuple[List[List[int]], int]:
    """
    Déplace et fusionne toutes les tuiles du plateau vers le bas.
    Calcule le score pour les fusions réalisées pendant le déplacement.

    Args:
        board (List[List[int]]): Grille de jeu (4x4) avant le déplacement.

    Returns:
        Tuple[List[List[int]], int]: La grille mise à jour après le déplacement et le score total obtenu.
    """
    score = 0
    for col in range(4):
        column = [board[row][col] for row in range(4)]  # Récupère la colonne actuelle
        new_column, gained_score = merge(column[::-1])  # Fusionner après avoir inversé la colonne
        score += gained_score  # Ajouter le score obtenu
        for row in range(4):
            board[row][col] = new_column[::-1][row]  # Réinverser pour revenir à l'ordre initial
    return board, score


class Game2048Env:
    """
    Environnement du jeu 2048, gérant la logique de déplacement, fusion des tuiles et suivi du score.
    """

    def __init__(self) -> None:
        """
        Initialise une instance de l'environnement du jeu 2048.
        """
        self.reset()

    def reset(self) -> np.ndarray:
        """
        Réinitialise le plateau de jeu en créant un nouveau plateau avec deux tuiles initiales.
        Réinitialise également le score total.

        Returns:
            np.ndarray: Le plateau de jeu (4x4) sous forme de tableau numpy.
        """
        self.board = initialize_game()
        self.total_score = 0  # Réinitialise le score total du jeu
        return np.array(self.board)

    def step(self, action: int) -> Tuple[np.ndarray, int, bool]:
        """
        Effectue une action de déplacement dans la direction indiquée, met à jour le plateau,
        ajoute une nouvelle tuile si le déplacement est valide, et met à jour le score.

        Args:
            action (int): L'action à exécuter (0: haut, 1: bas, 2: gauche, 3: droite).

        Returns:
            Tuple[np.ndarray, int, bool]: Le plateau de jeu mis à jour, le score obtenu sur ce mouvement, 
            et un booléen indiquant si la partie est terminée.
        """
        prev_board = np.copy(self.board)
        score = 0  # Initialiser le score pour ce mouvement

        if action == 0:
            self.board, score = move_up(self.board)
        elif action == 1:
            self.board, score = move_down(self.board)
        elif action == 2:
            self.board, score = move_left(self.board)
        elif action == 3:
            self.board, score = move_right(self.board)

        if not np.array_equal(prev_board, self.board):
            add_new_tile(self.board)  # Ajoute une nouvelle tuile si le plateau a changé

        self.total_score += score  # Met à jour le score total

        done = check_win(self.board) or check_game_over(self.board)  # Vérifie si la partie est terminée

        return np.array(self.board), score, done
    def get_score(self) -> int:
        """Retourne le score total actuel."""
        return self.total_score

    def get_board(self) -> np.ndarray:
        """Retourne le plateau de jeu actuel."""
        return np.array(self.board)

# Fonction pour vérifier si le joueur a atteint la tuile 2048 (condition de victoire)
def check_win(board: List[List[int]]) -> bool:
    """
    Vérifie si le joueur a gagné la partie en atteignant une tuile de valeur 2048.

    Args:
        board (List[List[int]]): Le plateau de jeu.

    Returns:
        bool: True si le joueur a gagné, sinon False.
    """
    return any(2048 in row for row in board)

# Fonction pour vérifier s'il reste des mouvements possibles (condition de défaite)
def check_game_over(board: List[List[int]]) -> bool:
    """
    Vérifie si la partie est terminée, c'est-à-dire s'il ne reste plus de mouvements possibles.

    Args:
        board (List[List[int]]): Le plateau de jeu.

    Returns:
        bool: True si le jeu est terminé, sinon False.
    """
    # Vérifie s'il reste des cases vides
    if any(0 in row for row in board):
        return False
    # Vérifie les fusions possibles dans les lignes
    for row in board:
        for i in range(3):
            if row[i] == row[i + 1]:
                return False
    # Vérifie les fusions possibles dans les colonnes
    for col in range(4):
        for i in range(3):
            if board[i][col] == board[i + 1][col]:
                return False
    return True


# Classe DQNAgent pour l'entraînement par renforcement
class DQNAgent:
    def __init__(self, input_shape, num_actions, model_file='dqn_model.keras', tau=1.0):
        self.num_actions = num_actions
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95  # Discount rate
        self.epsilon = 1.0  # Exploration rate
        self.epsilon_min = 0.1
        self.epsilon_decay = 0.985
        self.tau = max(tau, 1e-3) # Boltzmann temperature
        self.learning_rate = 0.001  # Learning rate

        # Création du modèle de Q-Network et du Target Network
        self.model = self.create_model(input_shape, num_actions)
        self.target_model = self.create_model(input_shape, num_actions)

        # Fichiers pour sauvegarder la mémoire et les poids
        self.model_file = model_file
        
        # Chargement de la mémoire et des poids si disponibles
        self.load_model(self.model_file)
        
    def load_model(self, filename):
        """Charge un modèle depuis un fichier, si disponible."""
        if os.path.exists(filename):
            self.model = tf.keras.models.load_model(filename)
            # Recrée le modèle cible pour qu'il soit identique au modèle chargé
            self.target_model = tf.keras.models.clone_model(self.model)
            self.target_model.build(self.model.input_shape)  # Initialise le modèle avec la même forme d'entrée
            self.update_target_model() # Synchronise les poids entre le modèle et le modèle cible
        
    def create_model(self, input_shape, num_actions):
        model = tf.keras.Sequential([
            layers.Input(shape=input_shape),
            layers.Flatten(),
            layers.Dense(256, activation='relu'),
            layers.Dense(128, activation='relu'),
            layers.Dense(128, activation='relu'),
            layers.Dense(num_actions, activation='linear')  # Output: Q-values for each action
        ])
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate), loss='mse')
        return model

    def update_target_model(self):
        # Met à jour le modèle cible avec les poids du modèle principal
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        # Sauvegarde la transition dans la mémoire
        self.memory.append((state, action, reward, next_state, done))
    import tensorflow as tf

    @staticmethod
    def boltzmann_policy(q_values, tau=1.0):
        # Avoid NaN by setting tau to a minimum threshold
            tau = max(tau, 1e-3)
            
            # Calculate exponentials for Boltzmann probabilities
            exp_q = np.exp(q_values / tau - np.max(q_values / tau))  # Subtract max for numerical stability
            sum_exp_q = np.sum(exp_q)
            
            # Handle the case where sum_exp_q might be zero to avoid division by zero
            if sum_exp_q == 0:
                probs = np.ones(len(q_values)) / len(q_values)  # Assign uniform probabilities if all are zero
            else:
                probs = exp_q / sum_exp_q  # Calculate probabilities

            # Ensure no NaNs in the final probabilities
            probs = np.nan_to_num(probs, nan=1.0 / len(q_values))  # Replace NaNs with uniform probabilities
            
            return np.random.choice(len(q_values), p=probs)

    def act(self, state):
        # Politique ε-greedy : explore avec probabilité epsilon, sinon exploite
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.num_actions)
        q_values = self.model.predict(state, verbose=0)[0]
        return self.boltzmann_policy(q_values, self.tau)
        # q_values = self.model.predict(state, verbose=0)  # Prédit les Q-values pour chaque action
        # return np.argmax(q_values[0])  # Choisit l'action avec la Q-value la plus élevée

    def train(self):
        # Sample a minibatch and perform prioritized experience replay if implemented
        # Update model based on Q-learning or Double Q-learning
        # Decay epsilon to reduce exploration over time
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
            
    def replay(self):
        if len(self.memory) < 64:  # Attends que la mémoire ait assez d'expériences
            return
        minibatch = random.sample(self.memory, 64)  # Sélectionne un échantillon de la mémoire
        for state, action, reward, next_state, done in minibatch:
            target = self.model.predict(state, verbose=0)
            if done:
                target[0][action] = reward  # Récompense si l'épisode est terminé
            else:
                t = self.target_model.predict(next_state, verbose=0)
                target[0][action] = reward + self.gamma * np.amax(t[0])  # Q-value cible

            self.model.fit(state, target, epochs=1, verbose=0)

        # Réduit epsilon (exploration) au fil du temps
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
    
    def save_model(self):
        """Enregistre le modèle actuel dans un fichier."""
        #self.model.save(self.model_file)
        self.model.save("dqn_model.keras")
            
    def load_memory(self):
        """Charge la mémoire depuis un fichier."""
        memory_file = "memory.pkl"
        if os.path.exists(memory_file):
            with open(memory_file, "rb") as f:
                self.memory = pickle.load(f)       
    def save_memory(self):
        """Sauvegarde la mémoire dans un fichier."""
        with open(self.memory_file, 'wb') as f:
            pickle.dump(self.memory, f)
            
            
class Game2048EnvGUI(Game2048Env):
    """
    Classe pour l'interface graphique du jeu 2048, héritant de Game2048Env.

    Attributes:
        root (tk.Tk): La fenêtre principale de l'application Tkinter.
        canvas (tk.Canvas): Le canevas où le jeu est dessiné.
        tiles (List[List[Tuple[int, int]]]): Liste contenant les identifiants des tuiles sur le canevas.
        high_scores (List[dict]): Liste des meilleurs scores, où chaque score est associé à une date et un chiffre maximal.
    """

    def __init__(self, root: tk.Tk) -> None:
        """
        Initialise l'interface graphique du jeu 2048.

        Args:
            root (tk.Tk): La fenêtre principale de l'application Tkinter.
        """
        super().__init__()
        self.root = root
        self.root.title("2048 Training")
        # Définir une couleur de fond aléatoire
        self.root.configure(bg=random.choice(['#F0E68C', '#ADD8E6', '#90EE90', '#FFB6C1', '#FFDEAD', '#FFE4B5']))
        
        self.canvas = tk.Canvas(root, width=400, height=400, bg='white')  # Couleur de fond blanche pour le canvas
        self.canvas.pack()
        
        self.tiles = []  # Initialiser la liste des tuiles
        for i in range(4):
            row_tiles = []
            for j in range(4):
                x1, y1 = j * 100, i * 100
                x2, y2 = x1 + 100, y1 + 100
                # Créer un rectangle pour chaque case avec une bordure
                tile = self.canvas.create_rectangle(x1, y1, x2, y2, fill='lightgrey', outline='black', width=2)
                text = self.canvas.create_text(x1 + 50, y1 + 50, text='', font=("Helvetica", 30))
                row_tiles.append((tile, text))  # Stocke le rectangle et le texte ensemble
            self.tiles.append(row_tiles)

        self.high_scores: List[dict] = []  # Historique des meilleurs scores

    def update_gui(self) -> None:
        """
        Met à jour l'interface graphique pour refléter l'état actuel du plateau de jeu.
        """
        tile_colors = {
            0: "#CDC1B4",   # Couleur pour 0
            2: "#EEE4DA",   # Couleur pour 2
            4: "#EDE0C8",   # Couleur pour 4
            8: "#F2B179",   # Couleur pour 8
            16: "#F59563",  # Couleur pour 16
            32: "#F67C5F",  # Couleur pour 32
            64: "#F67C5F",  # Couleur pour 64
            128: "#EDCF72", # Couleur pour 128
            256: "#EDCC61", # Couleur pour 256
            512: "#EDC850", # Couleur pour 512
            1024: "#EDC53F",# Couleur pour 1024
            2048: "#EDC22E",# Couleur pour 2048
        }

        for i in range(4):
            for j in range(4):
                value = self.board[i][j]
                color = tile_colors.get(value, "#CDC1B4")  # Couleur par défaut pour les valeurs non définies
                
                # Mise à jour de la couleur et du texte des tuiles
                tile, text = self.tiles[i][j]
                self.canvas.itemconfig(tile, fill=color)  # Applique la couleur de fond à la tuile
                self.canvas.itemconfig(text, text=str(value) if value != 0 else '')  # Met à jour le texte
        
        self.root.update()

    def record_high_score(self, score: int, agent: DQNAgent) -> None:
        """
        Enregistre le score et la case avec le plus haut chiffre.

        Args:
            score (int): Le score total réalisé dans l'épisode.
        """
        # Recherche la case avec le plus haut chiffre
        max_tile = max(max(row) for row in self.board)
        num_layers = len(agent.model.layers)  # Nombre de couches du modèle
        total_params = agent.model.count_params()  # Nombre total de paramètres
        current_epsilon = agent.epsilon  # Valeur actuelle de epsilon
        
        high_score_entry = {
            "date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "score": score,
            "max_tile": max_tile,
            "num_layers": num_layers,
            "total_params": total_params,
            "current_epsilon": current_epsilon,
            "learning_rate": agent.learning_rate,
            "gamma": agent.gamma,
        }
        # Enregistre l'entrée dans un fichier
        with open("high_scores.txt", "a") as f:
            f.write(f"{high_score_entry['date']} - Score: {high_score_entry['score']}, Max Tile: {high_score_entry['max_tile']}, "
                    f"Layers: {high_score_entry['num_layers']}, Params: {high_score_entry['total_params']}, "
                    f"Epsilon: {high_score_entry['current_epsilon']}, "
                    f"Learning Rate: {high_score_entry['learning_rate']}, "
                    f"Gamma: {high_score_entry['gamma']}\n")

        # # Ajouter le score à l'historique et garder les 10 meilleurs
        # self.high_scores.append(high_score_entry)
        # self.high_scores.sort(key=lambda x: x['score'], reverse=True)
        # self.high_scores = self.high_scores[:10]

if __name__ == "__main__":
    root = tk.Tk()
    env_gui = Game2048EnvGUI(root)
    agent = DQNAgent(input_shape=(4, 4), num_actions=4)

    episodes = 50  # Réduit pour visualiser rapidement

    for e in range(episodes):
        state = env_gui.reset().reshape(1, 4, 4)
        done = False
        total_reward = 0
        total_score = 0  # Pour suivre le score total du jeu
        #max_tile = 0

        while not done:
            action = agent.act(state)
            next_state, reward, done = env_gui.step(action)
            next_state = next_state.reshape(1, 4, 4)

            total_reward += reward
            total_score = env_gui.total_score  # Suivre le score total
            #max_tile = env_gui.max_tile
            
            
            agent.remember(state, action, reward, next_state, done)
            state = next_state

            env_gui.update_gui()  # Met à jour l'interface graphique avec le nouvel état du jeu
            # time.sleep(0.5)  # Délai pour visualiser les actions

            if done:
                print(f"Episode: {e + 1}/{episodes}, Total Reward: {total_reward}, Total Score: {total_score}")
                agent.update_target_model()
                
                # Enregistre le score total de cet épisode
                env_gui.record_high_score(total_score, agent)
                
        agent.replay()

        # Sauvegarder le modèle après chaque épisode
        agent.save_model()  # Changer cette ligne pour appeler la sauvegarde du modèle

    root.mainloop()


Episode: 1/50, Total Reward: 616, Total Score: 616
Episode: 2/50, Total Reward: 608, Total Score: 608
Episode: 3/50, Total Reward: 1248, Total Score: 1248
Episode: 4/50, Total Reward: 540, Total Score: 540
Episode: 5/50, Total Reward: 1048, Total Score: 1048
Episode: 6/50, Total Reward: 648, Total Score: 648
Episode: 7/50, Total Reward: 576, Total Score: 576
Episode: 8/50, Total Reward: 564, Total Score: 564
Episode: 9/50, Total Reward: 2160, Total Score: 2160
Episode: 10/50, Total Reward: 1096, Total Score: 1096
Episode: 11/50, Total Reward: 1064, Total Score: 1064
Episode: 12/50, Total Reward: 988, Total Score: 988
Episode: 13/50, Total Reward: 732, Total Score: 732
Episode: 14/50, Total Reward: 1376, Total Score: 1376
Episode: 15/50, Total Reward: 1960, Total Score: 1960
Episode: 16/50, Total Reward: 512, Total Score: 512
Episode: 17/50, Total Reward: 408, Total Score: 408
Episode: 18/50, Total Reward: 564, Total Score: 564
Episode: 19/50, Total Reward: 200, Total Score: 200
Episode

In [5]:
if __name__ == "__main__":
    # Suppression de l'initialisation Tkinter
    env = Game2048Env()  # Remplacer Game2048EnvGUI par Game2048Env
    agent = DQNAgent(input_shape=(4, 4), num_actions=4)

    episodes = 5  # Réduit pour visualiser rapidement

    for e in range(episodes):
        state = env.reset().reshape(1, 4, 4)  # Appel à reset de Game2048Env
        done = False
        total_reward = 0
        total_score = 0  # Pour suivre le score total du jeu

        while not done:
            action = agent.act(state)
            next_state, reward, done = env.step(action)  # Appel à step de Game2048Env
            next_state = next_state.reshape(1, 4, 4)

            total_reward += reward
            total_score = env.total_score  # Suivre le score total
            # max_tile = env.max_tile (si cette variable existe dans Game2048Env)
            
            agent.remember(state, action, reward, next_state, done)
            state = next_state

            # Supprimer env_gui.update_gui() car il n'est plus nécessaire
        agent.replay()

        # Sauvegarder le modèle après chaque épisode
        agent.save_model()  # Changer cette ligne pour appeler la sauvegarde du modèle

    root.mainloop()