#2048 Deep Q-Learning implementation

This Google Colab Jupyter Notebook consists of the game source code and fully implemented deep q-learning neural network designed to win it

### Imports

In [1]:
import numpy as np
# import keras.backend.tensorflow_backend as backend
import logic_2048
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense, Conv2D, Flatten
from keras.optimizers import Adam
from keras.callbacks import TensorBoard
from collections import deque
import time
import random
from tqdm import tqdm
import os
from PIL import Image
import cv2
# import sys
import tracemalloc as tm

### Initial parameters

In [2]:
DISCOUNT = 0.99
REPLAY_MEMORY_SIZE = 20_000  # How many last steps to keep for model training
MIN_REPLAY_MEMORY_SIZE = 1_000  # Minimum number of steps in a memory to start training
MINIBATCH_SIZE = 16  # How many steps (samples) to use for training    tweak
UPDATE_TARGET_EVERY = 5  # Terminal states (end of episodes)
MODEL_NAME = '2048_ALPHA'
MIN_REWARD = 20_000  # For model save
MEMORY_FRACTION = 0.20

# Environment settings
EPISODES = 2_000

# Exploration settings
epsilon = 1  # not a constant, going to be decayed
EPSILON_DECAY = 0.99975
MIN_EPSILON = 0.001

#  Stats settings
AGGREGATE_STATS_EVERY = 50  # episodes
SHOW_PREVIEW = True

### Game Environment

In [3]:
class GameEnv:
    SIZE = 4
    RETURN_IMAGES = True
    OBSERVATION_SPACE_VALUES = (SIZE, SIZE, 5)              # tweak
    ACTION_SPACE_SIZE = 4
    GAME_BOARD = np.zeros((4, 4), dtype=np.uint8)
    SCORE = 0
    # Tile color keys
    TILE_2 = 1
    TILE_4 = 2
    TILE_8 = 3
    TILE_16 = 4
    TILE_32 = 5
    TILE_64 = 6
    TILE_128 = 7
    TILE_256 = 8
    TILE_512 = 9
    TILE_1024 = 10
    TILE_2048 = 11
    # the dict! (colors)
    d = {1: (54, 142, 173),
        2: (192, 236, 252),
        3: (135, 255, 211),
        4: (0, 255, 127),
        5: (0, 117, 58),
        6: (229, 255, 59),
        7: (247, 186, 2),
        8: (181, 118, 2),
        9: (250, 122, 2),
        10: (250, 43, 2),
        11: (77, 13, 0)}

    def reset(self):
        self.GAME_BOARD = np.zeros((4, 4), dtype=np.uint8)
        self.episode_step = 0

        self.GAME_BOARD = logic_2048.place_new(self.GAME_BOARD)

        return self.get_one_hot_board()

    def get_one_hot_board(self):
        one_hot_board = np.empty(self.OBSERVATION_SPACE_VALUES, dtype=np.uint8)

        for ids, tile in np.ndenumerate(self.GAME_BOARD):
            match tile:
                case 2:
                    one_hot_board[ids[0]][ids[1]] = np.array([0, 0, 0, 0, 1], dtype=np.uint8)
                case 4:
                    one_hot_board[ids[0]][ids[1]] = np.array([0, 0, 0, 1, 0], dtype=np.uint8)
                case 8:
                    one_hot_board[ids[0]][ids[1]] = np.array([0, 0, 1, 0, 0], dtype=np.uint8)
                case 16:
                    one_hot_board[ids[0]][ids[1]] = np.array([0, 1, 0, 0, 0], dtype=np.uint8)
                case 32:
                    one_hot_board[ids[0]][ids[1]] = np.array([1, 0, 0, 0, 0], dtype=np.uint8)
                case _:
                    one_hot_board[ids[0]][ids[1]] = np.array([0, 0, 0, 0, 0], dtype=np.uint8)

        return one_hot_board

    def step(self, action):
        self.episode_step += 1
        pre_score = self.SCORE

        # Action is one-hot vector ex.  [0(a), 0(w), 1(s), 0(d)]
        if action[0] == 1:
            direction = "a"
        elif action[1] == 1:
            direction = "w"
        elif action[2] == 1:
            direction = "s"
        else:
            direction = "d"

        if self.RETURN_IMAGES and SHOW_PREVIEW:
            self.render()

        new_observation, success, _, self.SCORE = logic_2048.transform_matrix(
            self.GAME_BOARD, direction, self.SCORE, False
        )

        self.GAME_BOARD = new_observation
        winner = logic_2048.win_check(self.GAME_BOARD)

        # Reward part
        if winner:
            reward = 20_000
        elif not success:
            reward = -1_000
        else:
            reward = self.SCORE - pre_score

        done = False
        if not success or winner or self.episode_step >= 2000:
            done = True

        return self.get_one_hot_board(), reward, done

    def render(self):
        img = self.get_image()
        img = cv2.resize(np.array(img), (300, 300), interpolation=cv2.INTER_NEAREST)   # resizing so we can see our agent in all its glory.
        cv2.imshow("image", np.array(img))  # show it!
        cv2.waitKey(1)

    # FOR CNN #
    def get_image(self):
        env = np.zeros((self.SIZE, self.SIZE, 3), dtype=np.uint8)  # starts a rbg of our size

        for ids, tile in np.ndenumerate(self.GAME_BOARD):
            match tile:
                case 2:
                    env[ids[0]][ids[1]] = self.d[self.TILE_2]
                case 4:
                    env[ids[0]][ids[1]] = self.d[self.TILE_4]
                case 8:
                    env[ids[0]][ids[1]] = self.d[self.TILE_8]
                case 16:
                    env[ids[0]][ids[1]] = self.d[self.TILE_16]
                case 32:
                    env[ids[0]][ids[1]] = self.d[self.TILE_32]
                # case 64:
                #     env[ids[0]][ids[1]] = self.d[self.TILE_64]
                # case 128:
                #     env[ids[0]][ids[1]] = self.d[self.TILE_128]
                # case 256:
                #     env[ids[0]][ids[1]] = self.d[self.TILE_256]
                # case 512:
                #     env[ids[0]][ids[1]] = self.d[self.TILE_512]
                # case 1024:
                #     env[ids[0]][ids[1]] = self.d[self.TILE_1024]
                # case 2048:
                #     env[ids[0]][ids[1]] = self.d[self.TILE_2048]
                case _:
                    env[ids[0]][ids[1]] = (0, 0, 0)

        img = Image.fromarray(env, 'RGB')  # reading to rgb. Apparently. Even tho color definitions are bgr. ???
        return img

tm.start()

env = GameEnv()
tf.keras.utils.disable_interactive_logging()

### Override Tensorboard class

In [4]:
# For stats
ep_rewards = [MIN_REWARD]

# For more repetitive results
random.seed(1)
np.random.seed(1)
tf.random.set_seed(1)

# Memory fraction, used mostly when trai8ning multiple agents
#gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=MEMORY_FRACTION)
#backend.set_session(tf.Session(config=tf.ConfigProto(gpu_options=gpu_options)))

# Create models folder
if not os.path.isdir('models'):
    os.makedirs('models')


# Own Tensorboard class
class ModifiedTensorBoard(TensorBoard):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.step = 1
        self.writer = tf.summary.create_file_writer(self.log_dir)
        self._log_write_dir = self.log_dir

    def set_model(self, model):
        self.model = model

        self._train_dir = os.path.join(self._log_write_dir, 'train')
        self._train_step = self.model._train_counter

        self._val_dir = os.path.join(self._log_write_dir, 'validation')
        self._val_step = self.model._test_counter

        self._should_write_train_graph = False

    def on_epoch_end(self, epoch, logs=None):
        self.update_stats(**logs)

    def on_batch_end(self, batch, logs=None):
        pass

    def on_train_end(self, _):
        pass

    def update_stats(self, **stats):
        with self.writer.as_default():
            for key, value in stats.items():
                tf.summary.scalar(key, value, step = self.step)
                self.writer.flush()

### Implement the Deep Q-Learning Agent

In [5]:
for episode in tqdm(range(1, EPISODES + 1), ascii=True, unit='episodes'):

    # Update tensorboard step every episode
    agent.tensorboard.step = episode

    # Restarting episode - reset episode reward and step number
    episode_reward = 0
    step = 1

    # Reset environment and get initial state
    current_state = env.reset()

    # Reset flag and start iterating until episode ends
    done = False

    while not done:

        action = np.zeros(env.ACTION_SPACE_SIZE, dtype=np.uint8)
        # This part stays mostly the same, the change is to query a model for Q values
        if np.random.random() > epsilon:
            # Get action from Q table
            action[np.argmax(agent.get_qs(current_state))] = 1
        else:
            # Get random action
            action[np.random.randint(0, env.ACTION_SPACE_SIZE)] = 1

        new_state, reward, done = env.step(action)

        # Transform new continous state to new discrete state and count reward
        episode_reward += reward

        if SHOW_PREVIEW and not episode % AGGREGATE_STATS_EVERY:
            env.render()

        # Every step we update replay memory and train main network
        agent.update_replay_memory((current_state, action, reward, new_state, done))
        agent.train(done, step)

        current_state = new_state
        step += 1

    # Append episode reward to a list and log stats (every given number of episodes)
    ep_rewards.append(episode_reward)
    if episode % AGGREGATE_STATS_EVERY or episode == 1:
        average_reward = sum(ep_rewards[-AGGREGATE_STATS_EVERY:])/len(ep_rewards[-AGGREGATE_STATS_EVERY:])
        min_reward = min(ep_rewards[-AGGREGATE_STATS_EVERY:])
        max_reward = max(ep_rewards[-AGGREGATE_STATS_EVERY:])
        agent.tensorboard.update_stats(reward_avg=average_reward, reward_min=min_reward, reward_max=max_reward, epsilon=epsilon)

        # Save model, but only when min reward is greater or equal a set value
        if min_reward >= MIN_REWARD:
            agent.model.save(f'models/{MODEL_NAME}__{max_reward:_>7.2f}max_{average_reward:_>7.2f}avg_{min_reward:_>7.2f}min__{int(time.time())}.model')

    # Decay epsilon
    if epsilon > MIN_EPSILON:
        epsilon *= EPSILON_DECAY
        epsilon = max(MIN_EPSILON, epsilon)

    ss = tm.take_snapshot()

    for stats in ss.statistics("lineno")[:10]:
      print(stats)

You must install pydot (`pip install pydot`) and install graphviz (see instructions at https://graphviz.gitlab.io/download/) for plot_model to work.
You must install pydot (`pip install pydot`) and install graphviz (see instructions at https://graphviz.gitlab.io/download/) for plot_model to work.


### Train the neural network

In [None]:
for episode in tqdm(range(1, EPISODES + 1), ascii=True, unit='episodes'):

    # Update tensorboard step every episode
    agent.tensorboard.step = episode

    # Restarting episode - reset episode reward and step number
    episode_reward = 0
    step = 1

    # Reset environment and get initial state
    current_state = env.reset()

    # Reset flag and start iterating until episode ends
    done = False
    while not done:

        action = np.zeros(env.ACTION_SPACE_SIZE, dtype=np.uint8)
        # This part stays mostly the same, the change is to query a model for Q values
        if np.random.random() > epsilon:
            # Get action from Q table
            action[np.argmax(agent.get_qs(current_state))] = 1
        else:
            # Get random action
            action[np.random.randint(0, env.ACTION_SPACE_SIZE)] = 1

        new_state, reward, done = env.step(action)

        # Transform new continous state to new discrete state and count reward
        episode_reward += reward

        if SHOW_PREVIEW and not episode % AGGREGATE_STATS_EVERY:
            env.render()

        # Every step we update replay memory and train main network
        agent.update_replay_memory((current_state, action, reward, new_state, done))
        agent.train(done, step)

        current_state = new_state
        step += 1

    # Append episode reward to a list and log stats (every given number of episodes)
    ep_rewards.append(episode_reward)
    if episode % AGGREGATE_STATS_EVERY or episode == 1:
        average_reward = sum(ep_rewards[-AGGREGATE_STATS_EVERY:])/len(ep_rewards[-AGGREGATE_STATS_EVERY:])
        min_reward = min(ep_rewards[-AGGREGATE_STATS_EVERY:])
        max_reward = max(ep_rewards[-AGGREGATE_STATS_EVERY:])
        agent.tensorboard.update_stats(reward_avg=average_reward, reward_min=min_reward, reward_max=max_reward, epsilon=epsilon)

        # Save model, but only when min reward is greater or equal a set value
        if min_reward >= MIN_REWARD:
            agent.model.save(f'models/{MODEL_NAME}__{max_reward:_>7.2f}max_{average_reward:_>7.2f}avg_{min_reward:_>7.2f}min__{int(time.time())}.model')

    # Decay epsilon
    if epsilon > MIN_EPSILON:
        epsilon *= EPSILON_DECAY
        epsilon = max(MIN_EPSILON, epsilon)



INFO:tensorflow:Assets written to: models/2048_ALPHA__20128.00max_10064.00avg____0.00min__1688291994.model\assets


INFO:tensorflow:Assets written to: models/2048_ALPHA__20128.00max_10064.00avg____0.00min__1688291994.model\assets
  1%|#4                                                                                                                                        | 21/2000 [00:12<18:27,  1.79episodes/s]



  1%|#9                                                                                                                                        | 28/2000 [00:16<16:03,  2.05episodes/s]



  1%|#9                                                                                                                                      | 29/2000 [00:23<1:17:42,  2.37s/episodes]



  2%|##                                                                                                                                      | 30/2000 [00:29<1:55:22,  3.51s/episodes]



  2%|##1                                                                                                                                     | 31/2000 [00:34<2:08:29,  3.92s/episodes]



  2%|##1                                                                                                                                     | 32/2000 [00:40<2:32:46,  4.66s/episodes]



  2%|##2                                                                                                                                     | 33/2000 [00:48<2:57:35,  5.42s/episodes]



  2%|##3                                                                                                                                     | 34/2000 [00:55<3:19:34,  6.09s/episodes]



  2%|##3                                                                                                                                     | 35/2000 [01:02<3:23:50,  6.22s/episodes]

