In [None]:
import os
import time 

import numpy as np

In [None]:
def movable_condition(first, second):
    """Tell whether two adjacent tiles can be combined by a swipe.

    `first` is the tile that receives the result and `second` is the
    neighbour that moves onto it. The pair is movable when `first` is empty
    and `second` is not, when both hold the same value greater than 2, or
    when they are a 1 and a 2 in either order.
    """
    if first == 0:
        # An empty slot accepts any non-empty neighbour.
        return second != 0
    if first == second:
        # Equal tiles only merge from 3 upwards (1+1 and 2+2 never merge).
        return first > 2
    # The only unequal pair that merges is a 1 with a 2.
    return (first, second) in ((1, 2), (2, 1))


def can_move_col(array):
    """Report whether a swipe towards index 0 would change this line of tiles.

    Only the three adjacent pairs of a four-tile line (indices 0-3) are
    inspected; the line is movable as soon as one pair satisfies
    `movable_condition`.
    """
    return any(movable_condition(array[pos], array[pos + 1]) for pos in range(3))


def allowed_moves(state):
    """List the swipe keys that would change the given board.

    The result keeps the fixed order 'w' (up), 's' (down), 'a' (left),
    'd' (right) and contains only the directions in which at least one
    column or row can move.
    """
    columns, rows = state.T, state
    # (key, lines to scan, step): a step of -1 reverses each line so that the
    # edge being swiped towards always sits at index 0.
    directions = (
        ('w', columns, 1),
        ('s', columns, -1),
        ('a', rows, 1),
        ('d', rows, -1),
    )
    return [key for key, lines, step in directions
            if any(can_move_col(line[::step]) for line in lines)]


def try_move_col(array):
    """Return the line that results from swiping `array` towards index 0.

    The first adjacent pair (scanning from index 0) that satisfies
    `movable_condition` is combined into the leading slot of the pair; every
    tile behind it shifts one position forward and a 0 fills the vacated last
    slot. At most one pair is combined per call.

    Args:
        array: one row or column of the board; expected to be a 1-D numpy
            array of four tiles (it must support `.copy()` and slicing).

    Returns:
        A new array with the move applied; `array` itself is not modified.
        When no pair can move, an unchanged copy is returned.
    """
    new_array = array.copy()
    for i in range(3):
        first, second = array[i], array[i + 1]
        if movable_condition(first, second):
            # Covers both a real merge (a + b) and a slide into a gap (0 + b).
            new_array[i] = first + second
            # Close the gap: pull the tail forward and pad the end with 0.
            new_array[i + 1:] = np.append(new_array[i + 2:], 0)
            return new_array
    # Nothing can move: hand back the untouched copy rather than falling off
    # the end of the function and implicitly returning None.
    return new_array


def get_reward(current_state, next_state):
    """Compute the reward for the transition `current_state` -> `next_state`.

    The reward is the growth of the largest tile on the board plus the growth
    of the board score as computed by `get_score`.
    """
    # How much the biggest tile grew.
    max_tile_gain = np.max(next_state) - np.max(current_state)
    # How much the score grew, i.e. how much merging happened.
    score_gain = get_score(next_state) - get_score(current_state)
    return max_tile_gain + score_gain


def get_score(state):
    """Compute the score of a board.

    Every tile greater than 3 contributes 3 ** (log2(tile / 3) + 1); all
    other tiles contribute nothing. An empty selection sums to 0.0.
    """
    tile_points = []
    for row in state:
        for tile in row:
            # NOTE(review): the strict comparison also leaves out tiles equal
            # to 3 - confirm that this is intended.
            if tile > 3:
                exponent = np.log2(tile / 3) + 1
                tile_points.append(3 ** exponent)
    return np.sum(tile_points)


def try_move(current_state, action):
    """Simulate one action on a board without modifying the board itself.

    Args:
        current_state: 2-D numpy array holding the board.
        action: 'w' (up), 's' (down), 'a' (left), 'd' (right) or 'stop'.

    Returns:
        A `(next_state, reward)` tuple in every case:
        * for a legal swipe, the new board and the reward from `get_reward`;
        * for 'stop', the unchanged board and its current score;
        * for any other action, the unchanged board and a reward of 0 (a
          message is printed as well).
    """
    # 'stop' is never part of allowed_moves(), so it has to be handled before
    # the legality check or it could never be reached.
    if action == 'stop':
        return current_state, get_score(current_state)

    if action not in allowed_moves(current_state):
        print(f'Can not move {action}')
        # Keep the (state, reward) shape so callers that index or unpack the
        # result keep working on a full board.
        return current_state, 0

    next_state = current_state.copy()

    # Vertical swipes work on columns (via the transposed view), horizontal
    # swipes on rows; writing through the view updates next_state in place.
    if action in ('w', 's'):
        source_lines, target_lines = current_state.T, next_state.T
    else:
        source_lines, target_lines = current_state, next_state

    # Down/right are handled by reversing each line, moving it towards
    # index 0 and reversing the result back.
    reverse = action in ('s', 'd')

    for i, line in enumerate(source_lines):
        oriented = line[::-1] if reverse else line
        if can_move_col(oriented):
            moved = try_move_col(oriented)
            target_lines[i] = moved[::-1] if reverse else moved

    reward = get_reward(current_state, next_state)

    return next_state, reward


class Threes:
    """
    This is a simulated environment of game Threes.
    Swipe direction: {left: 'a', right: 'd', up: 'w', down: 's'}.
    There are two levels for this game: ['hard', 'easy'], in which default level is 'hard'.
    The level name is compared case-insensitively; any value other than 'hard' is played as 'easy'.
    """

    def __init__(self, level='hard'):
        """Start a new game: an empty 4x4 board with a single 1 or 2 tile in a random cell."""
        self.state = np.zeros((4, 4))
        x, y = np.random.choice(4, 2)
        self.state[x, y] = np.random.choice([1, 2])
        self.score = get_score(self.state)
        self.level = level

    def playable(self):
        """Check whether the game is still playable, i.e. at least one swipe is allowed."""
        return bool(allowed_moves(self.state))

    def gen_new_tile(self):
        """Draw the value of the tile that is added after a move.

        Candidates are always 1, 2 and 3. When the largest tile on the board
        is a multiple of 3, every value 3 * 2**i up to that tile is appended
        as well (so 3 appears twice in the candidate list). On 'hard' all
        candidates are equally likely; otherwise earlier candidates get
        linearly larger weights than later ones.

        Returns:
            The drawn tile value.
        """
        # Basic list of numbers that can be selected
        choice_list = [1, 2, 3]

        # More numbers can be selected when the maximum number on the grid gets larger.
        # The `>= 3` guard keeps an all-zero board from reaching log2(0).
        top_tile = np.max(self.state)
        if top_tile >= 3 and top_tile % 3 == 0:
            # Built-in int() replaces the np.int alias, which no longer exists
            # in current NumPy; round() protects against float noise in log2.
            max_power = int(round(float(np.log2(top_tile / 3))))
            choice_list += [3 * 2 ** i for i in range(max_power + 1)]

        # Generate the probabilities for each candidate
        if str(self.level).lower() == 'hard':
            norm_prob = [1 / len(choice_list)] * len(choice_list)
        else:
            prob = [i + 1 for i in range(len(choice_list))][::-1]
            total = sum(prob)
            norm_prob = [num / total for num in prob]

        # return next number
        return np.random.choice(choice_list, p=norm_prob)

    def make_move(self, action):
        """Apply `action` to the game, then add a new tile and refresh the score.

        Args:
            action: 'w', 's', 'a', 'd' or 'stop'.

        Returns:
            `(state, score)` for 'stop'; None otherwise. An action that is not
            currently allowed prints a message and leaves the game untouched.
        """
        if action == 'stop':
            self.score = get_score(self.state)
            return self.state, self.score

        # Reject illegal actions up front so the board is never replaced by
        # something that is not a full board and no tile is spawned for a
        # move that did not happen.
        if action not in allowed_moves(self.state):
            print(f'Can not move {action}')
            return None

        self.state = try_move(self.state, action)[0]

        # generate new tile for the current state; errors are no longer
        # swallowed, so a failure here is visible instead of silently
        # skipping the new tile
        new_tile = self.gen_new_tile()
        loc_0 = np.argwhere(self.state == 0)
        if len(loc_0) > 0:
            x, y = loc_0[np.random.choice(len(loc_0))]
            self.state[x, y] = new_tile

        # Update the game score
        self.score = get_score(self.state)
        return None


In [None]:
def hashable(state):
    """Flatten a board into a comma-separated string of integers.

    The string can serve as (part of) a dictionary key, which the board
    itself cannot when it is a numpy array.
    """
    cells = (int(value) for line in state for value in line)
    return ', '.join(map(str, cells))


def select_best_move_(game):
    """Pick one of the allowed moves with the highest immediate reward.

    Every allowed move is simulated with `try_move`; ties for the best reward
    are broken uniformly at random.
    """
    reward_by_move = {}
    for candidate in allowed_moves(game.state):
        reward_by_move[candidate] = try_move(game.state, candidate)[1]

    top_reward = max(reward_by_move.values())
    top_moves = [move for move, gain in reward_by_move.items() if gain == top_reward]
    return np.random.choice(top_moves)


class Agent:
    """
    This is an agent to play the game "Threes". There are two main modes: human mode and computer mode (demo game).
    In computer mode there are currently three policies: ['random', 'max', 'q-learning'].
    The functions here are inspired by
    "https://github.com/brianspiering/rl-course/blob/master/labs/lab_4_tic_tac_toe/lab_4_tic_tac_toe.ipynb"
    """

    def __init__(self, threes, epsilon=0.1, alpha=1.0):
        """Initialise the agent.

        Args:
            threes: zero-argument callable (e.g. the game class) that creates a new game.
            epsilon: probability of picking a random move while learning.
            alpha: learning rate of the value update.
        """
        # Learned values, keyed by (hashable(state), action).
        self.V = {}
        self.NewGame = threes
        self.epsilon = epsilon
        self.alpha = alpha

    def state_value(self, game_state, action):
        """Look up the value of a (state, action) pair. If never seen, assume 0."""
        return self.V.get((hashable(game_state), action), 0.0)

    def state_values(self, game_state, actions):
        """Return {(hashable(state), action): value} for the given actions.

        It is used to find the action that maximises the learned value.
        """
        state_key = hashable(game_state)
        return {(state_key, action): self.state_value(game_state, action) for action in actions}

    def learn_game(self, n_episodes=1000):
        """Play `n_episodes` complete games, learning from every move."""
        for _ in range(n_episodes):
            game = self.NewGame()
            # Stay None when a game is over before any move was made, so the
            # final bookkeeping below is skipped instead of failing.
            action = reward = None
            while game.playable():
                action, reward = self.learn_from_move(game)
            if action is not None:
                # Record the last reward for the final state and last action.
                self.V[(hashable(game.state), action)] = reward

    def learn_from_move(self, game):
        """The heart of Q-learning: pick a move, update its value, then play it.

        Returns:
            `(selected_move, reward)` for the move that was played.
        """
        current_state = game.state
        # Select next action with epsilon-greedy method
        selected_move = self.learn_select_move(game)

        # Next state s(t+1) and reward r (simulated; no new tile is added here)
        next_state, reward = try_move(current_state, selected_move)

        # Current state Q value Q(s, a)
        old_value = self.state_value(current_state, selected_move)

        # Largest learned value available from the next state, Q(s(t+1), a*)
        next_max_V, _ = self.select_best_move(game, next_state)

        # Q-learning update of the value (no discount factor is applied)
        self.V[(hashable(current_state), selected_move)] = (
            (1 - self.alpha) * old_value + self.alpha * (reward + next_max_V)
        )

        game.make_move(selected_move)
        return selected_move, reward

    def learn_select_move(self, game):
        """Exploration and exploitation: random move with probability epsilon, else greedy."""
        if np.random.uniform(0, 1) < self.epsilon:
            return np.random.choice(allowed_moves(game.state))
        return self.select_best_move(game, game.state)[1]

    def select_best_move(self, game, game_state):
        """Select the best move for the given state (greedy on the learned values).

        `game` is accepted for interface compatibility and is not used.

        Returns:
            `(max_value, move)`; ties are broken uniformly at random.
        """
        state_action_values = self.state_values(game_state, allowed_moves(game_state))
        max_V = max(state_action_values.values())
        max_move = np.random.choice(
            [state_action[1] for state_action, v in state_action_values.items() if v == max_V])
        return max_V, max_move

    def demo_game(self, level='hard', mode='random', delay=0.5, verbose=True):
        """Let the agent play one game with a given policy.

        Args:
            level: game level, 'hard' or 'easy'.
            mode: 'random', 'max' (best immediate reward) or 'q-learning'.
            delay: seconds to pause before each move; 0 disables the pause
                (useful when many games are played for statistics).
            verbose: print the board and the chosen action after every move.

        Returns:
            The final score, or the string "No such mode" for an unknown mode.
        """
        game = self.NewGame()
        game.level = level
        if verbose:
            print(game.state)
        while game.playable():
            if delay:
                time.sleep(delay)
            if mode == 'random':
                next_action = np.random.choice(allowed_moves(game.state))
            elif mode == 'max':
                next_action = select_best_move_(game)
            elif mode == 'q-learning':
                next_action = self.select_best_move(game, game.state)[1]
            else:
                return "No such mode"

            if verbose:
                print(f'Action: {next_action}')
            game.make_move(next_action)
            if verbose:
                print(game.state)
        return game.score

    def human_mode(self):
        """Interactive mode: read the level and every move from standard input.

        Returns:
            A message with the final score, either when the player types
            'stop' or when no move is possible any more.
        """
        game = self.NewGame()
        # Normalise the answer so that e.g. 'Hard' selects the 'hard' level.
        level = input('level: easy or hard? \r').strip().lower()
        game.level = level
        print(game.state)
        while game.playable():
            human_allowed_moves = allowed_moves(game.state) + ['stop']
            human_move = input(f'You can input {human_allowed_moves} \r').strip()
            if human_move == 'stop':
                return f'Game over! Your score is {game.score}'
            if human_move not in human_allowed_moves:
                # Ask again instead of handing an invalid key to the game.
                print(f'Can not move {human_move}')
                continue
            game.make_move(human_move)
            print(game.state)
        return f'Game over! Your score is {game.score}'

In [None]:
# Interactive session: a human plays one game through the console prompts.
# The returned "Game over" message is the value of the last expression.
human_game = Agent(Threes)

print("Demo - Human Mode:")
human_game.human_mode()

In [None]:
# Demo game with the defaults of demo_game(): level 'hard', mode 'random'
# (a uniformly random allowed move at every step).
random_game = Agent(Threes)

print("Demo - Random Mode:")
random_game.demo_game()

In [None]:
# Demo game with the greedy policy (mode 'max'): at every step the move with
# the highest immediate reward is played. The agent created here is the one
# that plays, rather than an agent left over from another cell.
greedy_game = Agent(Threes)

print("Demo - Greedy Mode:")
greedy_game.demo_game(mode='max')

In [None]:
# Reference scores of a human player, hard-coded for comparison.
# NOTE(review): presumably collected from human_mode() sessions - confirm the source.
human_scores = [135, 2259, 4104, 693, 459, 10809, 20781, 2043, 15120, 486]

In [None]:
# Average of the human reference scores.
np.mean(human_scores)

In [None]:
# Best of the human reference scores.
np.max(human_scores)

In [None]:
import pandas as pd

In [None]:
def print_demo_game_stats(agent, n_games=100, level='Hard', mode='random'):
    """Play `n_games` demo games and print/return their mean and max score.

    Args:
        agent: object providing `demo_game(level=..., mode=...)` that returns
            the final score of one game.
        n_games: number of games to play.
        level: game level, 'hard' or 'easy' in any capitalisation.
        mode: policy name forwarded to `demo_game`.

    Returns:
        `(mean_score, max_score)` over the played games.
    """
    # The game spells its levels in lowercase; normalise so that the
    # capitalised default 'Hard' is not mistaken for a non-hard level.
    level = str(level).lower()
    results = [agent.demo_game(level=level, mode=mode) for _ in range(n_games)]
    mean_score, max_score = np.mean(results), np.max(results)
    # The carriage return lets the next status line overwrite this one.
    print(f"mean score: {mean_score} | max score: {max_score} ", end='\r')
    return mean_score, max_score


def train_qlearning(agent, n_games=100, n_episodes=100, n_training_blocks=10, level='Hard'):
    """Train the agent in blocks and evaluate it after every block.

    Each block runs `agent.learn_game(n_episodes)` and then plays `n_games`
    demo games in 'q-learning' mode via `print_demo_game_stats`.

    Args:
        agent: the agent to train (modified in place).
        n_games: demo games played per evaluation.
        n_episodes: learning games per training block.
        n_training_blocks: number of train/evaluate rounds.
        level: game level, 'hard' or 'easy' in any capitalisation.

    Returns:
        `(agent, mean_score, max_score)` with the statistics of the last
        evaluation; both scores are None when no block was run.
    """
    # The game spells its levels in lowercase; normalise once up front.
    level = str(level).lower()
    # Defined before the loop so the return below also works for zero blocks.
    mean_score = max_score = None
    for block_number in range(1, n_training_blocks + 1):
        agent.learn_game(n_episodes)
        print(f"After {n_episodes * block_number:,} learning games:")
        mean_score, max_score = print_demo_game_stats(agent, n_games=n_games, level=level, mode='q-learning')
    return agent, mean_score, max_score

In [None]:
# Benchmark the random policy: repeat a batch of 100 demo games 135 times and
# keep every batch's mean and max score.
n_random_runs = 135
random_scores_mean = []
random_scores_max = []
for run in range(n_random_runs):
    # A fresh agent per batch.
    random_game = Agent(Threes)
    mean_score_random, max_score_random = print_demo_game_stats(random_game, n_games=100, level='Hard', mode='random')
    # Progress indicator, overwritten in place thanks to the carriage return.
    print(f'{run:04d} / {n_random_runs - 1}', end='\r')
    random_scores_mean.append(mean_score_random)
    random_scores_max.append(max_score_random)

In [None]:
# One row per batch of random-policy games, with that batch's mean and max score.
random_df = pd.DataFrame([random_scores_mean, random_scores_max]).T
random_df.columns = ['mean_score', 'max_score']
# Sort by the column *label*; the bare name `mean_score` is not defined here.
random_df.sort_values('mean_score', ascending=False)

In [None]:
# Column-wise averages over all random-policy batches.
random_df.mean()

In [None]:
# Benchmark the greedy policy (mode 'max'): repeat a batch of 100 demo games
# 135 times and keep every batch's mean and max score.
n_greedy_runs = 135
greedy_scores_mean = []
greedy_scores_max = []
for run in range(n_greedy_runs):
    # A fresh agent per batch.
    greedy_game = Agent(Threes)
    mean_score_greedy, max_score_greedy = print_demo_game_stats(greedy_game, n_games=100, level='Hard', mode='max')
    # Progress indicator, overwritten in place thanks to the carriage return.
    print(f'{run:04d} / {n_greedy_runs - 1}', end='\r')
    # Both values are already scalars, so they are stored as they are.
    greedy_scores_mean.append(mean_score_greedy)
    greedy_scores_max.append(max_score_greedy)

In [None]:
# One row per batch of greedy-policy games, with that batch's mean and max
# score, shown from the best mean score downwards.
greedy_df = pd.DataFrame({
    'mean_score': greedy_scores_mean,
    'max_score': greedy_scores_max,
})
greedy_df.sort_values(by='mean_score', ascending=False)

In [None]:
# Column-wise averages over all greedy-policy batches.
greedy_df.mean()

In [None]:
# Tag each result table (in place) with the policy that produced it, then
# stack both into one comparison frame. The original row indices are kept,
# so index values repeat across the two policies.
random_df['mode'] = 'random'
greedy_df['mode'] = 'greedy'
com_df = pd.concat([random_df, greedy_df])
com_df