# import library

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.display import HTML
from matplotlib.animation import FuncAnimation
from matplotlib.colors import LinearSegmentedColormap
from tqdm.auto import tqdm

In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam



In [3]:
# Record the TensorFlow version this notebook was executed with.
print(tf.__version__)

2.5.3


In [4]:
# Record the NumPy version this notebook was executed with.
print(np.__version__)

1.23.1


# set tensorflow gpu

In [5]:
# Turn on memory growth for every GPU TensorFlow can see.  With no GPU the
# loop body simply never runs.  A RuntimeError raised by TensorFlow is printed
# instead of aborting the notebook.
gpus = tf.config.experimental.list_physical_devices("GPU")
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as error:
    print(error)

In [6]:
# Collect the `device_type` string of every physical device TensorFlow reports.
device_type = [device.device_type for device in tf.config.list_physical_devices()]

In [7]:
# Prefer the first GPU when one was reported above; otherwise fall back to the
# first CPU.  The chosen name is passed to `tf.device` by the training cell.
device_name = "/device:GPU:0" if "GPU" in device_type else "/device:CPU:0"
print(device_name)

/device:GPU:0


# notation

## box definitions

In [8]:
%%html
<style>
table {float:left}
</style>

| box_value | box_name     |
|-----------|--------------|
| 0         | Empty        |
| 1         | BrownWood    |
| 2         | RedWood      |
| 3         | GreenWood    |
| 4         | BrownLeather |
| 5         | BrownPaper   |
| 6         | BlueSteel    |

# environment

In [9]:
class MoveTheBoxGameEnvironment:
    """Match-three style box puzzle played on a 2D integer grid.

    Cell value 0 is an empty cell; values 1..unique_box_type are box colours.
    Row 0 is the floor: boxes fall toward lower row indices.  A separate
    one-row "above" board holds the boxes that drop onto the board next.

    A turn (`move`) swaps two cells, lets boxes fall, repeatedly removes
    straight runs of three or more equal boxes (scoring each cascade step),
    then drops the above row onto the board and draws a new above row.

    Parameters
    ----------
    board_height, board_width : int
        Size of the playable board (the above row is not included).
    unique_box_type : int
        Number of distinct box colours; box values are drawn from
        1..unique_box_type inclusive.
    p_add_3_box : float
        Probability that a new above row contains 3 boxes (otherwise 4).
    debug : bool
        When True, `remove` plots the board and prints what it removes.
    """

    # Shortest straight run of equal boxes that gets removed.
    MIN_RUN_LENGTH = 3

    def __init__(
        self,
        board_height=9,
        board_width=7,
        unique_box_type=6,
        p_add_3_box=0.77,
        debug=False,
    ):
        # init parameter
        self.board_height = board_height
        self.board_width = board_width
        self.unique_box_type = unique_box_type
        self.p_add_3_box = p_add_3_box
        self.debug = debug
        self.cumulative_score = 0

        # init color_map: one RGB colour per box value, index 0 (white) = empty
        box_color_map_list = (
            np.array(
                [
                    [255, 255, 255],
                    [169, 133, 92],
                    [183, 71, 34],
                    [156, 164, 52],
                    [102, 65, 38],
                    [189, 140, 61],
                    [149, 165, 168],
                ]
            )
            / 255
        )
        self.box_color_map = LinearSegmentedColormap.from_list(
            "box_color_map",
            box_color_map_list,
            box_color_map_list.shape[0],
        )

        # init game
        self.reset()

    def reset(self):
        """Start a new game: empty board, zero score, first boxes dropped in."""
        # The score is cleared so a reset environment matches a freshly
        # constructed one.
        self.cumulative_score = 0
        self.createBoard()
        self.createAboveBoard()
        self.addAboveBox()
        self.aboveIncludeFall()

    def createBoard(self):
        """Replace `self.board` with an all-empty (height, width) int8 grid."""
        self.board = np.zeros(
            (self.board_height, self.board_width),
            dtype=np.int8,
        )

    def createAboveBoard(self):
        """Replace `self.above_board` with an all-empty (1, width) int8 row."""
        self.above_board = np.zeros(
            (1, self.board_width),
            dtype=np.int8,
        )

    def addAboveBox(self):
        """Draw a new above row: 3 or 4 random boxes at random columns."""
        number_of_box_above = np.random.choice(
            [3, 4], p=[self.p_add_3_box, 1 - self.p_add_3_box]
        )
        # A board narrower than the drawn count cannot hold that many boxes.
        number_of_box_above = min(int(number_of_box_above), self.board_width)
        box_value = np.random.randint(
            low=1,
            high=self.unique_box_type + 1,
            size=number_of_box_above,
            dtype=np.int8,
        )
        aboveBox = np.zeros(
            self.board_width,
            dtype=np.int8,
        )
        aboveBox[:number_of_box_above] = box_value
        # Shuffling the row scatters the boxes over random columns.
        np.random.shuffle(aboveBox)
        self.above_board = aboveBox.reshape((1, self.board_width))

    def aboveIncludeFall(self):
        """Drop the above row onto the board and draw the next above row.

        Returns True (without changing anything) when the game is over, see
        `checkIsGameover`; otherwise returns False.  Points scored by the
        drop are added to `self.cumulative_score`.  If the drop leaves the
        board completely empty the step is repeated.
        """
        while True:
            if self.checkIsGameover():
                return True
            temp_board = self.fall(np.vstack((self.board, self.above_board)))
            # Only the board rows are kept.  Whatever is left in the top row
            # is discarded because addAboveBox() overwrites the above row.
            self.board = temp_board[:-1].copy()
            self.addAboveBox()
            self.cumulative_score = self.cumulative_score + self.updateBoard()
            if not self.checkIsEmptyBoard():
                return False

    # game mechanism
    def fall(self, temp_board):
        """Return a copy of `temp_board` with every column settled.

        In each column the non-empty cells keep their relative order and are
        packed against row 0; the remaining cells become empty.
        """
        temp_board = temp_board.copy()
        for y in range(temp_board.shape[1]):
            column = temp_board[:, y]  # view into temp_board
            boxes = column[column != 0]  # boolean indexing makes a copy
            column[:] = 0
            column[: boxes.size] = boxes
        return temp_board

    def remove(self, temp_board):
        """Remove runs and let boxes fall until no run is left.

        Returns
        -------
        (board, counts)
            `board` is the settled copy of `temp_board`; `counts` lists the
            number of distinct cells removed at each cascade step and always
            ends with a 0 for the final step that found nothing.
        """
        temp_board = temp_board.copy()
        numbers_of_remove = []
        while True:
            if self.debug:
                self.displayTempBoard(temp_board)
            remove_list = self.check(temp_board)
            # A cell that is part of both a row run and a column run is
            # listed twice; count and clear it once.
            unique_cells = {(int(x), int(y)) for x, y in remove_list}
            if self.debug:
                print(f"remove_list : {remove_list}")
                print(f"number of remove box (remove): {len(unique_cells)}")
            numbers_of_remove.append(len(unique_cells))
            if not unique_cells:
                return temp_board, numbers_of_remove
            for x, y in unique_cells:
                temp_board[x, y] = 0
            temp_board = self.fall(temp_board)

    def check(self, temp_board):
        """Return the [row, col] index arrays of all cells in a removable run.

        Row runs come first, then column runs; a cell may appear twice.
        """
        remove_list = []
        for x in range(temp_board.shape[0]):
            remove_list.extend(self.checkRow(temp_board, x))
        for y in range(temp_board.shape[1]):
            remove_list.extend(self.checkCol(temp_board, y))
        return remove_list

    def _findRunPositions(self, line):
        """Return positions in 1D `line` that belong to a removable run.

        A removable run is MIN_RUN_LENGTH or more consecutive equal,
        non-empty values.
        """
        positions = []
        length = len(line)
        start = 0
        for end in range(1, length + 1):
            # A run closes at the end of the line or where the colour changes.
            if end == length or not self.checkIsSameColor(line[end], line[start]):
                if (
                    not self.checkIsEmpty(line[start])
                    and end - start >= self.MIN_RUN_LENGTH
                ):
                    positions.extend(range(start, end))
                start = end
        return positions

    def checkRow(self, temp_board, x):
        """Return [x, col] index arrays for removable runs in row `x`."""
        return [np.array([x, y]) for y in self._findRunPositions(temp_board[x, :])]

    def checkCol(self, temp_board, y):
        """Return [row, y] index arrays for removable runs in column `y`."""
        return [np.array([x, y]) for x in self._findRunPositions(temp_board[:, y])]

    def updateBoard(self):
        """Settle `self.board` (fall + cascading removals) and return the points."""
        temp_board = self.fall(self.board)
        temp_board, numbers_of_remove = self.remove(temp_board)
        self.board = temp_board.copy()
        if self.debug:
            print(f"numbers_of_remove (updateBoard): {numbers_of_remove}")
        return self.calculatePoint(numbers_of_remove)

    def calculatePoint(self, numbers_of_remove_box):
        """Convert per-cascade-step removal counts into points.

        For a step that removed n > 0 boxes, let s = 0 + 1 + ... + (n - 3).
        The first step scores n + s.  A later step t scores
        n * (boxes removed in all earlier steps) + (n - 3) + s.
        Steps that removed nothing score 0.  Returns the sum over all steps.
        """
        counts = [int(number) for number in numbers_of_remove_box]
        points = []
        for t, number_of_box in enumerate(counts):
            if number_of_box <= 0:
                points.append(0)
                continue
            summation = sum(range(0, number_of_box - 3 + 1))
            if t == 0:
                point = number_of_box + summation
            else:
                main_point = number_of_box * sum(counts[:t])
                point = main_point + (number_of_box - 3) + summation
            points.append(point)
        return np.sum(points)

    # utility
    def _plotBoard(self, temp_board, top_label=None):
        """Draw `temp_board` as a coloured grid; optionally rename the top row tick."""
        fig = plt.figure(figsize=(3.5, 4.5))
        ax = fig.add_subplot(111)
        ax.pcolor(
            temp_board,
            cmap=self.box_color_map,
            vmin=0,
            vmax=self.unique_box_type,
            edgecolors="k",
            linewidth=2,
        )
        xticks_label = np.arange(0, temp_board.shape[1], 1)
        yticks_label = np.arange(0, temp_board.shape[0], 1)
        # centering of cell
        ax.set_xticks(xticks_label + 0.5, minor=False)
        ax.set_yticks(yticks_label + 0.5, minor=False)
        # label
        yticks_text = [str(value) for value in yticks_label]
        if top_label is not None and yticks_text:
            yticks_text[-1] = top_label
        ax.set_xticklabels(xticks_label)
        ax.set_yticklabels(yticks_text)
        plt.show()

    def displayBoard(self):
        """Plot the board with the above row stacked on top (tick label "above")."""
        self._plotBoard(self.getGameState(), top_label="above")

    def displayTempBoard(self, temp_board):
        """Plot an arbitrary board array using the box colour map."""
        self._plotBoard(temp_board.copy())

    def checkIsEmpty(self, box):
        """True when `box` is the empty value 0."""
        return box == 0

    def checkIsSameColor(self, box1, box2):
        """True when both cells hold the same value (two empty cells included)."""
        return box1 == box2

    def checkIsEmptyBoard(self):
        """True when every cell of `self.board` is empty."""
        return (self.board == 0).all()

    # action
    def findAllMove(self):
        """Enumerate every swap of two neighbouring board cells.

        Returns an array of shape (n_moves, 2, 2); each entry is
        [[row_a, col_a], [row_b, col_b]].  Cells are visited column by column
        (rows ascending inside a column) and each cell contributes its
        "move right" swap before its "move up" swap.  The position of a move
        in this array is used as the action index by the training code, so
        the order must stay stable.
        """
        all_move_index = []
        for col in range(self.board_width):
            for row in range(self.board_height):
                if col < self.board_width - 1:
                    # move_right
                    all_move_index.append(np.array([[row, col], [row, col + 1]]))
                if row < self.board_height - 1:
                    # move_up
                    all_move_index.append(np.array([[row, col], [row + 1, col]]))
        return np.array(all_move_index)

    def findNotDuplicateMove(self):
        """Return the moves from `findAllMove` whose two cells differ in value."""
        not_duplicate_move_index = []
        for move_index in self.findAllMove():
            point_a_index, point_b_index = move_index[0], move_index[1]
            point_a_value = self.board[point_a_index[0], point_a_index[1]]
            point_b_value = self.board[point_b_index[0], point_b_index[1]]
            if not self.checkIsSameColor(point_a_value, point_b_value):
                not_duplicate_move_index.append(move_index)
        return np.array(not_duplicate_move_index)

    def move(self, move_index):
        """Swap two cells, settle the board and advance one turn.

        Parameters
        ----------
        move_index : array-like of shape (2, 2)
            [[row_a, col_a], [row_b, col_b]] of the two cells to swap.

        Returns
        -------
        (state, reward, is_gameover)
            `state` is `getGameState()` after the turn, `reward` is the
            increase of `cumulative_score` during the turn, and
            `is_gameover` is the result of `aboveIncludeFall()`.
        """
        before_cumulative_score = self.cumulative_score
        # collect value
        point_a_index = move_index[0]
        point_a_value = self.board[point_a_index[0], point_a_index[1]].copy()

        point_b_index = move_index[1]
        point_b_value = self.board[point_b_index[0], point_b_index[1]].copy()

        # move
        self.board[point_a_index[0], point_a_index[1]] = point_b_value
        self.board[point_b_index[0], point_b_index[1]] = point_a_value
        score = self.updateBoard()
        self.cumulative_score = self.cumulative_score + score
        is_gameover = self.aboveIncludeFall()
        return (
            self.getGameState(),
            self.cumulative_score - before_cumulative_score,
            is_gameover,
        )

    def checkIsGameover(self):
        """True when some column is completely full and has a box waiting above it."""
        full_board = (self.board != 0).all(axis=0)
        full_above_board = (self.above_board != 0).all(axis=0)
        return bool(np.any(full_board & full_above_board))

    # Reinforcement
    def getGameState(self):
        """Return a fresh (height + 1, width) array: the board with the above row on top."""
        return np.vstack((self.board, self.above_board)).copy()

    def play(self):
        """Play up to 200 uniformly random non-duplicate moves.

        Returns
        -------
        (temp_board_history, move_history, score_history)
            Per move: the state *before* the move, a description string, and
            the reward returned by `move`.  Stops early on game over or when
            no non-duplicate move exists.
        """
        temp_board_history = []
        move_history = []
        score_history = []
        for i in range(200):
            temp_board = self.getGameState()
            move_index = self.findNotDuplicateMove()
            if move_index.shape[0] == 0:
                # Nothing to swap; np.random.randint(0) would raise.
                break
            select_move_index = np.random.randint(move_index.shape[0])
            move_history_str = f"move {i+1} : move from {move_index[select_move_index,0,:]} to {move_index[select_move_index,1,:]}"
            _, round_score, is_gameover = self.move(move_index[select_move_index])
            score_history.append(round_score)
            temp_board_history.append(temp_board)
            move_history.append(move_history_str)
            if is_gameover:
                break
        return temp_board_history, move_history, score_history

# deep q learning model

In [10]:
def createModel(env):
    """Build the fully connected Q-network for `env`.

    The input is one game state with a trailing channel axis, i.e. shape
    (state_rows, state_cols, 1) taken from `env.getGameState()`.  The output
    has one linear unit per entry of `env.findAllMove()`.

    Note: a stack of three Conv2D layers (32 filters, kernel 3, stride 1,
    "valid" padding, relu) used to sit in front of the Flatten layer but is
    disabled; the state is flattened directly.
    """
    num_actions = env.findAllMove().shape[0]
    state_shape = env.getGameState().shape

    inputs = layers.Input(shape=(state_shape[0], state_shape[1], 1))

    hidden = layers.Flatten()(inputs)
    # Four relu layers: 50 -> 50 -> 10 -> 10 units.
    for units in (50, 50, 10, 10):
        hidden = layers.Dense(units, activation="relu")(hidden)

    action = layers.Dense(num_actions, activation="linear")(hidden)

    return keras.Model(inputs=inputs, outputs=action)

# setup parameter

In [11]:
# for quick running
# Divisor applied (via `// reduce_step`) to the episode and frame budgets in
# the parameter cells below, so a quick run uses proportionally smaller values.
reduce_step = 100

In [12]:
# q learning parameter
gamma = 0.99  # Discount factor for past rewards
max_steps_per_episode = 10000  # upper bound of the per-episode step range
max_episode = 10000000 // reduce_step  # number of episodes the training loop iterates over
max_buffer_size = 100000  # replay buffer is trimmed once it grows past this many rows

In [13]:
# epsilon greedy parameter
epsilon = 1.0  # current probability of taking a random action; decayed by the training loop
epsilon_max = 1.0
epsilon_min = 0.1  # floor that epsilon is clamped to
epsilon_interval = epsilon_max - epsilon_min  # total amount epsilon decays by

# epsilon greedy frame
epsilon_random_frames = 50000 // reduce_step  # frames during which every action is random
epsilon_greedy_frames = 1000000 // reduce_step  # each frame lowers epsilon by epsilon_interval / this

In [14]:
# deep learning parameter
learning_rate = 0.00025  # passed to the Adam optimizer
batch_size = 32  # number of replay rows sampled per gradient update

In [15]:
# deep q learning parameter
# Both values are used as `frame_count % value` in the training loop, so they
# are clamped to at least 1 to stay valid for any reduce_step.  The built-in
# max() is used on purpose: np.max(1, k) would treat k as an axis argument.
update_after_actions = max(1, 4 // reduce_step)  # train the action model every N frames
update_target_network = max(1, 10000 // reduce_step)  # copy weights to the target model every N frames

# create object 

## environment

In [16]:
# Training environment with the class defaults (9x7 board, 6 box types).
env = MoveTheBoxGameEnvironment()

In [17]:
# Fixed table of every possible swap; an action is an index into this table.
action_list = env.findAllMove()

In [18]:
# Size of the action space (number of rows in action_list).
num_actions = action_list.shape[0]

## model

In [19]:
# Network that picks greedy actions and receives the gradient updates.
action_model = createModel(env)

In [20]:
# Second network with the same architecture, used to predict future rewards
# during training.  It starts with the action model's weights.
target_model = createModel(env)
target_model.set_weights(action_model.get_weights())

## optimizer

In [21]:
# Adam optimizer used for the action model's gradient steps; `clipnorm=1.0`
# asks Keras to clip gradients by norm.
optimizer = Adam(
    beta_1=0.9,
    beta_2=0.999,
    learning_rate=learning_rate,
    clipnorm=1.0,
)

## loss function

In [22]:
# Huber loss between the target Q-values and the predicted Q-values.
loss_function = keras.losses.Huber()

## replay memory

In [23]:
# Replay memory: one row per environment step, filled by the training loop.
replay_buffer = pd.DataFrame(
    columns=[
        "state",
        "action",
        "reward",
        "state_next",
        "is_gameover",
    ]
)
# Total reward of recent episodes; the training loop keeps at most the last
# 100 entries and averages them into running_reward.
episode_reward_history = []

# train model

In [24]:
# Counters updated by the training loop.
frame_count = 0  # incremented once per environment step, across episodes
running_reward = 0  # mean of episode_reward_history

In [25]:
# Deep Q-learning training loop.
# Each step: choose an action epsilon-greedily, apply it to the environment,
# store the transition in the replay buffer, and every `update_after_actions`
# frames fit the action model on a random batch of stored transitions.  The
# target model is refreshed every `update_target_network` frames.
with tf.device(device_name):
    for episode_count in tqdm(range(max_episode)):

        env.reset()
        state = env.getGameState()
        episode_reward = 0

        for timestep in range(1, max_steps_per_episode):

            frame_count = frame_count + 1

            # Use epsilon-greedy for exploration
            if frame_count < epsilon_random_frames or epsilon > np.random.rand(1)[0]:
                # Take random action
                action = np.random.choice(num_actions)
            else:
                # Predict action Q-values
                # From environment state
                state_tensor = tf.convert_to_tensor(state)
                state_tensor = tf.expand_dims(state_tensor, 0)
                action_probs = action_model(state_tensor, training=False)
                # Take best action
                action = tf.argmax(action_probs[0]).numpy()

            # Decay probability of taking random action
            epsilon = epsilon - (epsilon_interval / epsilon_greedy_frames)
            epsilon = max(epsilon, epsilon_min)

            # Apply the sampled action in our environment
            state_next, reward, is_gameover = env.move(action_list[action])
            episode_reward = episode_reward + reward

            # Save to replay buffer
            replay_dict = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": state_next,
                "is_gameover": is_gameover,
            }
            # DataFrame.append was deprecated in pandas 1.4 and removed in
            # 2.0; pd.concat with a one-row frame is the supported equivalent.
            replay_buffer = pd.concat(
                [replay_buffer, pd.DataFrame([replay_dict])],
                ignore_index=True,
            )

            # Remove for keep buffer size: drop the oldest row once over the cap
            if replay_buffer.shape[0] > max_buffer_size:
                replay_buffer = replay_buffer.iloc[1:, :]
            buffer_size = replay_buffer.shape[0]

            # Update state
            state = state_next

            # Update action model
            if frame_count % update_after_actions == 0 and buffer_size > batch_size:

                # Get indices of samples for replay buffers
                # (an int argument samples from arange(buffer_size) without
                # building a Python range first)
                indices = np.random.choice(
                    buffer_size, size=batch_size, replace=False
                )

                batch_replay = replay_buffer.iloc[indices, :]

                # Using list comprehension to sample from replay buffer
                state_sample = np.array(list(batch_replay["state"]))
                action_sample = np.array(list(batch_replay["action"]))
                rewards_sample = np.array(list(batch_replay["reward"]))
                state_next_sample = np.array(list(batch_replay["state_next"]))
                done_sample = tf.convert_to_tensor(
                    np.array(list(batch_replay["is_gameover"].astype(float))),
                    dtype=np.float32,
                )

                # Build the updated Q-values for the sampled future states
                # Use the target model for stability
                future_rewards = target_model.predict(state_next_sample)

                # Q value = reward + discount factor * expected future reward
                updated_q_values = rewards_sample + gamma * tf.reduce_max(
                    future_rewards, axis=1
                )

                # If final frame set the last value to -1
                updated_q_values = updated_q_values * (1 - done_sample) - done_sample

                # Create a mask so we only calculate loss on the updated Q-values
                masks = tf.one_hot(action_sample, num_actions)

                with tf.GradientTape() as tape:
                    # Train the model on the states and updated Q-values
                    q_values = action_model(state_sample)

                    # Apply the masks to the Q-values to get the Q-value for action taken
                    q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)
                    # Calculate loss between new Q-value and old Q-value
                    loss = loss_function(updated_q_values, q_action)

                # Backpropagation
                grads = tape.gradient(loss, action_model.trainable_variables)
                optimizer.apply_gradients(zip(grads, action_model.trainable_variables))

            # Update target model
            if frame_count % update_target_network == 0:
                # Update the target network with new weights
                target_model.set_weights(action_model.get_weights())
                # Log details
                template = "running reward: {:.2f} at episode {}, frame count {}"
                print(template.format(running_reward, episode_count, frame_count))

            if is_gameover:
                break

        # Update running reward (mean over at most the last 100 episodes)
        episode_reward_history.append(episode_reward)
        if len(episode_reward_history) > 100:
            del episode_reward_history[:1]
        running_reward = np.mean(episode_reward_history)

  0%|          | 0/100000 [00:00<?, ?it/s]

running reward: 7.20 at episode 5, frame count 100
running reward: 8.75 at episode 12, frame count 200
running reward: 7.94 at episode 18, frame count 300
running reward: 7.58 at episode 24, frame count 400
running reward: 8.40 at episode 30, frame count 500
running reward: 8.84 at episode 37, frame count 600
running reward: 8.91 at episode 43, frame count 700
running reward: 9.24 at episode 49, frame count 800
running reward: 10.31 at episode 55, frame count 900
running reward: 10.31 at episode 61, frame count 1000
running reward: 9.91 at episode 68, frame count 1100
running reward: 9.31 at episode 75, frame count 1200
running reward: 8.80 at episode 82, frame count 1300
running reward: 9.09 at episode 88, frame count 1400
running reward: 9.02 at episode 95, frame count 1500
running reward: 9.01 at episode 101, frame count 1600
running reward: 9.05 at episode 107, frame count 1700
running reward: 8.76 at episode 114, frame count 1800
running reward: 8.61 at episode 121, frame count 19

KeyboardInterrupt: 

# test random play

In [None]:
# Fix NumPy's global RNG so the random-play demo below is repeatable.
np.random.seed(0)

In [None]:
# Fresh environment restricted to 3 box types for the random-play demo.
# NOTE: this rebinds `env`, replacing the training environment created earlier.
env = MoveTheBoxGameEnvironment(
    unique_box_type=3,
)

In [None]:
# Play random moves; keeps the pre-move state, a description string and the
# round score of every move for the animation below.
temp_board_history, move_history, score_history = env.play()

In [None]:
# Shape of the playable board (the above row is stored separately).
env.board.shape

## animation

In [None]:
def animate(i):
    """Draw frame `i` of the random-play history onto the notebook-level `ax`.

    Uses the notebook globals `ax`, `env`, `temp_board_history`,
    `move_history` and `score_history`.  The title shows the move text, the
    score summed over the rounds before `i`, and the score of round `i`.
    """
    frame = temp_board_history[i]
    col_ticks = np.arange(0, env.board_width, 1)
    row_ticks = np.arange(0, env.board_height + 1, 1)

    ax.clear()
    ax.pcolor(
        frame,
        cmap=env.box_color_map,
        vmin=0,
        vmax=env.unique_box_type,
        edgecolors="k",
        linewidth=2,
    )

    # put each tick in the middle of its cell
    ax.set_xticks(col_ticks + 0.5, minor=False)
    ax.set_yticks(row_ticks + 0.5, minor=False)

    # the last row is the above row, labelled separately
    row_names = row_ticks.astype(np.str_)
    row_names[-1] = "top"
    ax.set_xticklabels(col_ticks)
    ax.set_yticklabels(row_names)

    total_before = np.sum(score_history[:i], dtype=int)
    round_score = score_history[i]
    ax.set_title(
        move_history[i]
        + f"\n total score : {total_before}, round score : {round_score}"
    )

In [None]:
# Figure and axes that `animate` draws into.  The figure is closed right away,
# presumably so the notebook shows only the rendered animation and not an
# extra static plot.
fig = plt.figure(figsize=(3.5, 4.5))
ax = fig.add_subplot(111)
plt.close(fig)

# One frame per recorded board state, 500 ms apart, played once.
ani = FuncAnimation(
    fig,
    animate,
    frames=len(temp_board_history),
    interval=500,
    repeat=False,
)

# Render the animation as an interactive JavaScript/HTML widget.
HTML(ani.to_jshtml())

In [None]:
from matplotlib.animation import PillowWriter

In [None]:
# Set to True to write the animation to disk in the next cell.
save_ani = False

In [None]:
# Save the animation as an animated GIF (only when save_ani is True).
# The file is written to the current working directory at 1 frame per second.
if save_ani:
    ani.save("random_play_animation.gif", dpi=300, writer=PillowWriter(fps=1))