In [1]:
import gymnasium as gym
import gymnasium_2048
import numpy as np
import random
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from collections import deque, namedtuple
from tqdm.notebook import tqdm
import time
import os
from tqdm.notebook import tqdm

Using device: cpu


In [2]:
# Device set to CPU
device = torch.device("cpu")

# Preprocess the environment state
def preprocess_state(state):
    """
    Converts one-hot encoded 2048 state (4,4,16) into flattened 1D vector with log2 values.

    Args:
        state (np.array): One-hot encoded (4, 4, 16) or raw (4, 4) grid.

    Returns:
        torch.Tensor: Flattened tensor of shape (16,) with float values in [0, 11].
    """
    if state.shape == (4, 4, 16):
        # One-hot to index
        indices = np.argmax(state, axis=2).astype(np.float32)
    else:
        # Convert values like 2, 4, 8 to log2
        indices = np.zeros_like(state, dtype=np.float32)
        non_zero = state > 0
        indices[non_zero] = np.log2(state[non_zero])

    flat = indices.flatten()  # (16,)
    return torch.tensor(flat, dtype=torch.float32, device=device)

In [3]:
class DQN(nn.Module):
    """
    Compact DQN for 2048 game.
    Input: 16 cells (flattened 4x4 board with log2 values)
    Output: 4 Q-values for each action (up, down, left, right)
    """
    def __init__(self, state_size=16, hidden_size=128, action_size=4):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        """
        Forward pass with log2 normalization.
        Assumes input x is already a tensor of shape (batch_size, 16).
        """
        # Clamp to avoid log(0), then normalize to [0,1]
        x = torch.clamp(x, min=1e-5)
        x = torch.log2(x) / 11.0  # log2(2048) = 11
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import random
from collections import deque, namedtuple

# Use CPU (since no GPU available)
device = torch.device("cpu")


class DQNAgent:
    """
    Deep Q-Network Agent with Double DQN, Experience Replay, and Soft Target Updates
    Suitable for 2048 game or similar discrete environments.
    """

    def __init__(self, state_size=16, action_size=4, 
                 buffer_size=10000, batch_size=64, 
                 gamma=0.99, lr=0.001, tau=0.001, 
                 update_every=4):

        # Environment dimensions
        self.state_size = state_size
        self.action_size = action_size

        # Hyperparameters
        self.batch_size = batch_size
        self.gamma = gamma          # Discount factor
        self.tau = tau              # Soft update interpolation factor
        self.update_every = update_every

        # Q-Networks
        self.qnetwork_local = DQN(state_size, 128, action_size).to(device)
        self.qnetwork_target = DQN(state_size, 128, action_size).to(device)
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=lr)

        # Replay buffer
        self.memory = deque(maxlen=buffer_size)
        self.experience = namedtuple("Experience", 
                                     field_names=["state", "action", "reward", "next_state", "done"])

        # Step counter for timing updates
        self.t_step = 0

        # Epsilon-greedy parameters
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01

    def step(self, state, action, reward, next_state, done):
        """
        Store experience and trigger learning periodically.
        """
        self.memory.append(self.experience(state, action, reward, next_state, done))

        self.t_step = (self.t_step + 1) % self.update_every
        if self.t_step == 0 and len(self.memory) > self.batch_size:
            experiences = self._sample_experiences()
            self._learn(experiences)

    def act(self, state, epsilon=None):
        """
        Select action using epsilon-greedy policy.
        """
        if epsilon is None:
            epsilon = self.epsilon

        # Preprocess and prepare state tensor
        processed_state = preprocess_state(state).unsqueeze(0)  # Shape: (1, 16)

        # Predict Q-values (no gradients needed)
        self.qnetwork_local.eval()
        with torch.no_grad():
            action_values = self.qnetwork_local(processed_state)
        self.qnetwork_local.train()

        # Epsilon-greedy choice
        if random.random() > epsilon:
            return np.argmax(action_values.cpu().data.numpy())
        else:
            return random.choice(np.arange(self.action_size))

    def _sample_experiences(self):
        """
        Randomly sample a batch from replay buffer and preprocess it.
        """
        experiences = random.sample(self.memory, self.batch_size)

        states = torch.stack([preprocess_state(e.state) for e in experiences]).to(device)
        actions = torch.tensor([e.action for e in experiences], dtype=torch.long).unsqueeze(1).to(device)
        rewards = torch.tensor([e.reward for e in experiences], dtype=torch.float32).unsqueeze(1).to(device)
        next_states = torch.stack([preprocess_state(e.next_state) for e in experiences]).to(device)
        dones = torch.tensor([e.done for e in experiences], dtype=torch.uint8).unsqueeze(1).float().to(device)

        return (states, actions, rewards, next_states, dones)

    def _learn(self, experiences):
        """
        Update Q-networks using Double DQN and soft target update.
        """
        states, actions, rewards, next_states, dones = experiences

        # Double DQN: get best next actions from local model
        next_action_values = self.qnetwork_local(next_states).detach()
        next_actions = next_action_values.max(1)[1].unsqueeze(1)

        # Get next Q-values from target model
        Q_targets_next = self.qnetwork_target(next_states).detach().gather(1, next_actions)

        # Compute target Q-values
        Q_targets = rewards + (self.gamma * Q_targets_next * (1 - dones))

        # Compute expected Q-values from local model
        Q_expected = self.qnetwork_local(states).gather(1, actions)

        # Loss (Huber is more stable for big values like 2048)
        loss = F.smooth_l1_loss(Q_expected, Q_targets)

        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Soft update target network
        self._soft_update(self.qnetwork_local, self.qnetwork_target, self.tau)

        # Decay epsilon
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def _soft_update(self, local_model, target_model, tau):
        """
        Soft update: θ_target = τ*θ_local + (1 - τ)*θ_target
        """
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(tau * local_param.data + (1.0 - tau) * target_param.data)

    def save(self, filename):
        """Save model parameters to file."""
        torch.save(self.qnetwork_local.state_dict(), filename)

    def load(self, filename):
        """Load model parameters from file."""
        self.qnetwork_local.load_state_dict(torch.load(filename))
        self.qnetwork_target.load_state_dict(torch.load(filename))


In [5]:
def plot_scores(scores, avg_window=100, filename='scores.png', save=True):
    """
    Plot the scores and their moving average.
    
    Args:
        scores (list): List of episode scores.
        avg_window (int): Window size for moving average.
        filename (str): Filename to save the plot.
        save (bool): Whether to save the plot as a file.
    """
    plt.figure(figsize=(10, 6))
    plt.plot(np.arange(len(scores)), scores, label='Score')
    
    # Compute moving average
    avg_scores = []
    window = deque(maxlen=avg_window)
    for score in scores:
        window.append(score)
        avg_scores.append(np.mean(window))
    
    plt.plot(np.arange(len(avg_scores)), avg_scores, 'r', label=f'Moving Avg ({avg_window})')
    plt.xlabel('Episode')
    plt.ylabel('Score')
    plt.title(f'Training Scores with Moving Average ({avg_window})')
    plt.legend()
    plt.grid(True)

    if save:
        plt.savefig(filename)
    plt.show()


def evaluate_agent(agent, env, n_episodes=10):
    """
    Evaluate the agent's performance without exploration.
    
    Args:
        agent: The trained DQN agent.
        env: The 2048 environment.
        n_episodes (int): Number of evaluation episodes.
    
    Returns:
        dict: Evaluation results including scores and max tiles.
    """
    scores = []
    max_tiles = []
    
    for _ in tqdm(range(n_episodes), desc="Evaluating"):
        state, _ = env.reset()
        score = 0
        done = False
        truncated = False
        
        while not (done or truncated):
            action = agent.act(state, epsilon=0.0)  # Pure greedy action
            next_state, reward, done, truncated, _ = env.step(action)
            score += reward
            state = next_state
        
        # Convert state to 4x4 grid and record max tile
        processed_state = preprocess_state(state)
        max_tile = np.max(processed_state)
        
        scores.append(score)
        max_tiles.append(max_tile)
    
    return {
        'avg_score': np.mean(scores),
        'max_score': np.max(scores),
        'avg_max_tile': np.mean(max_tiles),
        'best_max_tile': np.max(max_tiles),
        'all_scores': scores,
        'all_max_tiles': max_tiles
    }


In [6]:
def train_2048(n_episodes=500, max_steps=10000, eval_freq=100):
    """
    Train a DQN agent to play 2048
    
    Args:
        n_episodes: Maximum number of training episodes
        max_steps: Maximum number of steps per episode
        eval_freq: Frequency of evaluation during training
        
    Returns:
        tuple: (scores, max_tiles) achieved during training
    """
    env = gym.make('gymnasium_2048/TwentyFortyEight-v0')
    
    # Check state shape
    state, _ = env.reset()
    processed_state = preprocess_state(state)
    print(f"Original state shape: {state.shape}")
    print(f"Processed state shape: {processed_state.shape}")
    print(f"Sample processed state:\n{processed_state}")
    
    # Create agent with the correct parameters
    agent = DQNAgent(state_size=16, action_size=4)  # 16 = 4x4 flattened
    
    # Score tracking
    scores = []
    max_tiles = []
    scores_window = deque(maxlen=100)
    
    # Create directory for saved models
    os.makedirs('models', exist_ok=True)
    
    # Tracking best performance
    best_score = -np.inf
    best_max_tile = 0
    
    print("Starting training...")
    start_time = time.time()
    
    for i_episode in tqdm(range(1, n_episodes+1)):
        state, _ = env.reset()
        score = 0
        for t in range(max_steps):
            action = agent.act(state)
            next_state, reward, done, truncated, info = env.step(action)
            
            agent.step(state, action, reward, next_state, done or truncated)
            state = next_state
            score += reward
            
            if done or truncated:
                break
        
        # Get the max tile value
        processed_state = preprocess_state(state)
        max_tile = np.max(processed_state)
        
        # Save results
        scores.append(score)
        scores_window.append(score)
        max_tiles.append(max_tile)
        
        # Print progress
        if i_episode % 100 == 0:
            end_time = time.time()
            print(f'\rEpisode {i_episode}\tAvg Score: {np.mean(scores_window):.2f}\tMax Tile: {max_tile}\tEpsilon: {agent.epsilon:.2f}\tTime: {end_time - start_time:.2f}s')
            start_time = time.time()
        
        # Save model if it's the best so far
        if np.mean(scores_window) > best_score:
            best_score = np.mean(scores_window)
            agent.save('models/best_score_model.pth')
        
        if max_tile > best_max_tile:
            best_max_tile = max_tile
            agent.save('models/best_tile_model.pth')
        
        # Evaluate the model
        if i_episode % eval_freq == 0:
            results = evaluate_agent(agent, env)
            print(f"\nEvaluation after {i_episode} episodes:")
            print(f"Average Score: {results['avg_score']:.2f}")
            print(f"Average Max Tile: {results['avg_max_tile']}")
            print(f"Best Max Tile: {results['best_max_tile']}")
            print("")
            
            # Save periodic model
            agent.save(f'models/model_ep{i_episode}.pth')
    
    # Save final model
    agent.save('models/final_model.pth')
    
    # Plot scores
    plot_scores(scores)
    
    return scores, max_tiles

In [7]:
# Train the agent for 500 episodes
scores, max_tiles = train_2048(n_episodes=500, eval_freq=100)

print("Training completed!")
print(f"Best max tile achieved: {np.max(max_tiles)}")
print(f"Average score over last 100 episodes: {np.mean(scores[-100:]):.2f}")

Original state shape: (4, 4, 16)
Processed state shape: (4, 4)
Sample processed state:
[[0 0 0 0]
 [0 0 2 0]
 [0 0 0 0]
 [0 0 0 2]]
Starting training...


  0%|          | 0/500 [00:00<?, ?it/s]

Episode 100	Avg Score: 739.52	Max Tile: 64	Epsilon: 0.01	Time: 337.30s


KeyboardInterrupt: 

In [None]:
# Run a comprehensive evaluation
def evaluate_trained_agent(model_path='models/final_model.pth', n_episodes=10):
    """Run a comprehensive evaluation of the trained agent"""
    env = gym.make('2048-v0')
    agent = DQNAgent(state_size=16, action_size=4)
    
    # Load the model
    agent.load(model_path)
    
    results = evaluate_agent(agent, env, n_episodes=n_episodes)
    
    print(f"Results after {n_episodes} games:")
    print(f"Average Score: {results['avg_score']:.2f}")
    print(f"Maximum Score: {results['max_score']:.2f}")
    print(f"Average Max Tile: {results['avg_max_tile']}")
    print(f"Best Max Tile: {results['best_max_tile']}")
    
    return results

In [None]:
# Watch the agent play a game (if render_mode is supported)
try:
    max_tile = play_2048_visual(model_path='models/best_tile_model.pth', n_episodes=1)
    print(f"Highest tile achieved: {max_tile}")
except Exception as e:
    print(f"Could not visualize the game: {e}")
    print("This might be due to render mode not being supported in your environment.")

In [None]:
def play_2048_visual(model_path='models/final_model.pth', n_episodes=1):
    """
    Play 2048 with a trained model and visualize the game
    
    Args:
        model_path: Path to the trained model
        n_episodes: Number of episodes to play
        
    Returns:
        int: Highest tile achieved
    """
    try:
        env = gym.make('gymnasium_2048/TwentyFortyEight-v0', render_mode='human')
    except:
        env = gym.make('gymnasium_2048/TwentyFortyEight-v0')  # Fall back if visualization is not available
        
    agent = DQNAgent(state_size=16, action_size=4)
    
    # Load the model
    agent.load(model_path)
    
    for ep in range(n_episodes):
        state, _ = env.reset()
        done = False
        truncated = False
        total_reward = 0
        
        try:
            env.render()  # Show initial state
            time.sleep(0.5)
        except:
            pass
        
        while not (done or truncated):
            action = agent.act(state, epsilon=0.0)
            next_state, reward, done, truncated, info = env.step(action)
            total_reward += reward
            state = next_state
            
            try:
                env.render()
                time.sleep(0.3)  # Delay so we can see the moves
            except:
                pass
        
        # Process final state to get max tile
        processed_state = preprocess_state(state)
        max_tile = np.max(processed_state)
        
        print(f"Game {ep+1} finished. Score: {total_reward}, Max Tile: {max_tile}")
    
    return max_tile