In [2]:
import random
import numpy as np 
import gymnasium as gym
from gymnasium import spaces
import copy
import sys

import torch as th
from stable_baselines3 import A2C, DQN, PPO

from game_2048 import Game2048


In [2]:
N_DISCRETE_ACTIONS = 4
OBS_SPACE_LIM = [[8193 for _ in range(4)] for _ in range(4)]
LOGDIR = "./tensorboards"


In [60]:
class GameEnv(gym.Env):
    """Custom Environment that follows gym interface."""
    def __init__(self):
            super().__init__()

            self.game = Game2048()
            self.action_space = spaces.Discrete(4)
            self.observation_space = spaces.Box(low=-1, high=1, shape=(88,), dtype=np.float32)

            self.last_state = np.array(self.game.board)
            self.last_states = [copy.deepcopy(self.last_state) for _ in range(10)]
            self.total_steps = 0

    def step(self, action):

        current_state, reward_increment, done = self.game.move(action)
        current_state = np.array(current_state)
        self.update_memory(current_state)
        self.total_steps += 1


        future_outcomes = [
                        Game2048.left(current_state)[0], 
                        Game2048.right(current_state)[0],
                        Game2048.up(current_state)[0], 
                        Game2048.down(current_state)[0]
                        ]
        
        future_rewards = [
                        Game2048.left(current_state)[1],
                        Game2048.right(current_state)[1],
                        Game2048.up(current_state)[1],
                        Game2048.down(current_state)[1]
                        ]
        
        stuck_future_outcomes = [np.array_equal(current_state, future_outcomes[i]) * 1 for i in range(4)]


        reward = self.count_empty_cells(current_state) - self.count_empty_cells(self.last_states[-2])

        observation = self.scale_observation(future_outcomes, stuck_future_outcomes, current_state, future_rewards)

        done =  np.array_equal(stuck_future_outcomes, np.ones(4))
        truncated =  all(np.array_equal(state, current_state) for state in self.last_states)

        if 2048 in current_state:
            reward += 1000
            done = True

        info = {}
        return observation, reward, done, truncated, info


    def reset(self, seed=None, options=None):
        if seed is not None:
            self.game = Game2048(seed)
        else:
            self.game = Game2048()

        self.last_state = np.array(self.game.board)
        self.last_states = [copy.deepcopy(self.last_state) for _ in range(10)]
        current_state = np.array(self.game.board)
        self.update_memory(current_state)

        future_outcomes = [
            Game2048.left(current_state)[0],
            Game2048.right(current_state)[0],
            Game2048.up(current_state)[0],
            Game2048.down(current_state)[0]
        ]

        future_rewards = [
            Game2048.left(current_state)[1],
            Game2048.right(current_state)[1],
            Game2048.up(current_state)[1],
            Game2048.down(current_state)[1]
        ]

        stuck_future_outcomes = [np.array_equal(current_state, future_outcomes[i]) * 1 for i in range(4)]

        observation = self.scale_observation(future_outcomes, stuck_future_outcomes, current_state, future_rewards)
        info = {}
        return observation, info


    def render(self):
        self.game.render_board(self.game.board)

    def close(self):
        pass

    @staticmethod
    def count_empty_cells(board):
        return np.count_nonzero(np.array(board) == 0)

    def valid_action_mask(self):
        if 0 in self.game.board:
            return np.ones(4).astype(np.float32)
        
        valid_left = Game2048.left(self.game.board)[0] != self.game.board
        valid_right = Game2048.right(self.game.board)[0] != self.game.board
        valid_up = Game2048.up(self.game.board)[0] != self.game.board
        valid_down = Game2048.down(self.game.board)[0] != self.game.board
        return np.array([valid_left, valid_right, valid_up, valid_down]).astype(np.float32)
    
    def update_memory(self, new_state):
        self.last_states.pop(0)
        self.last_states.append(copy.deepcopy(new_state))


    def scale_observation(self, future_outcomes, stuck_future_outcomes, current_state, future_rewards):
        future_outcomes = np.array(future_outcomes)
        stuck_future_outcomes = np.array(stuck_future_outcomes)
        current_state = np.array(current_state)
        future_rewards = np.array(future_rewards)

        current_state = (current_state - 1024) / 1024
        future_rewards = (future_rewards - 1024) / 1024

        for i, outcome in enumerate(future_outcomes):
            outcome = (outcome - 1024) / 1024
            future_outcomes[i] = outcome

        observation = np.concatenate([current_state.ravel(), np.concatenate(future_outcomes).ravel(),
                        np.array(stuck_future_outcomes).astype(np.float32), future_rewards]).astype(np.float32)
        return observation


In [4]:
from stable_baselines3.common.env_checker import check_env
env = GameEnv()
check_env(env, warn=True)

In [5]:
def evaluate(model, num_steps=100):
    scores = []
    boards = []

    for i in range(num_steps):
        env = GameEnv()
        obs, info = env.reset()
        done = False
        rewards = []
        while not done:
            action, info = model.predict(obs, deterministic=True)
            action = int(model.predict(obs, deterministic=True)[0])
            obs, reward, done, truncated, info = env.step(action)
            done = done or truncated
            rewards.append(reward)
        boards.append(env.game.board)
        scores.append(env.game.total_score)


    scores = np.array(scores)
    print(f"Average score: {scores.mean()}")

    best_board = boards[np.argmax(scores)]
    # print max score
    print(f'Max score: {np.max(scores)}')
    env.game.render_board(best_board)

# Random baseline is ~1000

In [6]:
from sb3_contrib.common.maskable.policies import MaskableActorCriticPolicy
from sb3_contrib.common.wrappers import ActionMasker
from sb3_contrib.ppo_mask import MaskablePPO


def mask_fn(env: gym.Env) -> np.ndarray:
    # Do whatever you'd like in this function to return the action mask
    # for the current env. In this example, we assume the env has a
    # helpful method we can rely on.
    return env.valid_action_mask()

for i in range(4):

    env = GameEnv()
    env = ActionMasker(env, mask_fn)  # Wrap to enable masking

    # MaskablePPO behaves the same as SB3's PPO unless the env is wrapped
    # with ActionMasker. If the wrapper is detected, the masks are automatically
    # retrieved and used when learning. Note that MaskablePPO does not accept
    # a new action_mask_fn kwarg, as it did in an earlier draft.

    # modify network architecture
    #policy_kwargs = dict(activation_fn=th.nn.ReLU,
    #                        net_arch=dict(pi=[84*2, 84*3, 84, 10], vf=[84*2, 84*3, 84, 10]))


    model = MaskablePPO(MaskableActorCriticPolicy, env, verbose=0, tensorboard_log="./tensorboards_maskable")
    model.learn(total_timesteps=1_000_000 * 1.5 * 2 * 6)
    model.save(f"models/2048_maskable_{i}")
    evaluate(model, num_steps=1000)




Average score: 5024.168
Max score: 15312
+-------------------+
| 8  | 2  | 8  | 2  |
+-------------------+
|1024| 32 | 2  | 4  |
+-------------------+
|256 | 16 | 64 | 8  |
+-------------------+
| 2  |512 | 8  | 4  |
+-------------------+
Average score: 5194.404
Max score: 14704
+-------------------+
| 4  | 8  | 4  | 2  |
+-------------------+
| 8  | 16 | 32 | 4  |
+-------------------+
| 4  |128 |1024| 2  |
+-------------------+
|128 |512 | 2  | 4  |
+-------------------+
Average score: 5384.308
Max score: 16640
+-------------------+
| 2  | 4  | 16 |256 |
+-------------------+
| 4  | 32 |128 |1024|
+-------------------+
| 8  | 16 | 32 |512 |
+-------------------+
| 2  | 4  | 16 |128 |
+-------------------+


KeyboardInterrupt: 

In [8]:
# load 3rd model
model = MaskablePPO.load("models/2048_maskable_2")

In [41]:
evaluate(model, num_steps=100)

Average score: 5152.48
Max score: 14056
+-------------------+
| 2  | 4  | 64 | 4  |
+-------------------+
| 8  | 32 |128 |256 |
+-------------------+
| 4  | 16 | 64 |1024|
+-------------------+
| 2  | 4  |256 | 2  |
+-------------------+


In [58]:
env = GameEnv()
env = ActionMasker(env, mask_fn)  # Wrap to enable masking


model.set_env(env)
model.learn(total_timesteps=1_000_000 * 2 * 2.5, progress_bar=True)

Output()

<sb3_contrib.ppo_mask.ppo_mask.MaskablePPO at 0x104f82d30>

In [59]:
evaluate(model, num_steps=10_00)

Average score: 5183.496
Max score: 15024
+-------------------+
| 2  | 4  | 64 |128 |
+-------------------+
| 8  | 16 |256 |1024|
+-------------------+
| 4  | 32 | 64 |256 |
+-------------------+
| 2  | 16 | 32 |128 |
+-------------------+


In [61]:
model.save("models/2048_MaskablePPO_Solved_TT")

In [62]:
for i in range(1,9):
    model.learn(total_timesteps=1_000_000 * 2 * 2.5)
    model.save(f"models/2048_MaskablePPO_Solved_TT_{i}")
    evaluate(model, num_steps=1000)

Average score: 4621.532
Max score: 13704
+-------------------+
| 2  | 8  | 64 |128 |
+-------------------+
| 4  | 16 |128 |256 |
+-------------------+
| 2  | 8  | 64 |1024|
+-------------------+
| 8  | 2  | 16 |128 |
+-------------------+
Average score: 5491.924
Max score: 20332
+-------------------+
| 2  | .  | 2  | .  |
+-------------------+
| .  | .  | 4  | 4  |
+-------------------+
| .  | 16 | 32 |2048|
+-------------------+
| 4  | 4  | 8  | 2  |
+-------------------+
Average score: 4227.528
Max score: 13604
+-------------------+
| 2  | 4  | 16 |128 |
+-------------------+
| 4  | 32 | 64 |1024|
+-------------------+
| 2  | 16 |128 |256 |
+-------------------+
| 4  | 8  | 32 |128 |
+-------------------+
Average score: 4677.488
Max score: 13848
+-------------------+
| 2  | 4  | 32 | 64 |
+-------------------+
| 8  | 32 |256 |1024|
+-------------------+
| 4  | 8  | 64 |256 |
+-------------------+
| 2  | 4  | 32 | 64 |
+-------------------+
Average score: 4886.896
Max score: 15800
+--

In [63]:
model

<sb3_contrib.ppo_mask.ppo_mask.MaskablePPO at 0x104f82d30>

In [66]:
help(model)

Help on MaskablePPO in module sb3_contrib.ppo_mask.ppo_mask object:

class MaskablePPO(stable_baselines3.common.on_policy_algorithm.OnPolicyAlgorithm)
 |  MaskablePPO(policy: Union[str, Type[sb3_contrib.common.maskable.policies.MaskableActorCriticPolicy]], env: Union[gymnasium.core.Env, stable_baselines3.common.vec_env.base_vec_env.VecEnv, str], learning_rate: Union[float, Callable[[float], float]] = 0.0003, n_steps: int = 2048, batch_size: Optional[int] = 64, n_epochs: int = 10, gamma: float = 0.99, gae_lambda: float = 0.95, clip_range: Union[float, Callable[[float], float]] = 0.2, clip_range_vf: Union[NoneType, float, Callable[[float], float]] = None, normalize_advantage: bool = True, ent_coef: float = 0.0, vf_coef: float = 0.5, max_grad_norm: float = 0.5, target_kl: Optional[float] = None, stats_window_size: int = 100, tensorboard_log: Optional[str] = None, policy_kwargs: Optional[Dict[str, Any]] = None, verbose: int = 0, seed: Optional[int] = None, device: Union[torch.device, str] 

In [3]:


%load_ext tensorboard

% tensorboard --LOGDIR './tensorboards_maskable'


UsageError: Line magic function `%` not found.
