<a href="https://colab.research.google.com/github/bgyarbro/chopsticks-game/blob/master/chopsticks_colab_nn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%tensorflow_version 1.x

TensorFlow 1.x selected.


In [2]:
import random
import itertools
import collections
import math
import numpy as np
import tensorflow as tf

In [3]:
tf.__version__

'1.15.2'

In [18]:
def new_board():
    """This function returns a new starting board,
    with hands 1 1 1 1."""
    return ((1,1),
            (1,1))

def apply_move(board_state, move, side):
    """This function takes a move and side which turn it is to play
    and applies it to a given board state and returns the resulting
    board state. move = 1,2,3,or 4 meaning RR, RL, LR, LL"""

    state_list = list(list(s) for s in board_state)
    if side == 1:
        hand_to_move = state_list[0]
        hand_other = state_list[1]
    if side == -1:
        hand_to_move = state_list[1]
        hand_other = state_list[0]
    if move == (1,0):
        hand_other[0] = ( hand_other[0] + hand_to_move[0] ) % 5
    if move == (2,0):
        hand_other[1] = ( hand_other[1] + hand_to_move[0] ) % 5
    if move == (3,0):
        hand_other[0] = ( hand_other[0] + hand_to_move[1] ) % 5
    if move == (4,0):
        hand_other[1] = ( hand_other[1] + hand_to_move[1] ) % 5
    if side == 1:
        new_state_list = [hand_to_move, hand_other]
    if side == -1:
        new_state_list = [hand_other, hand_to_move]

    return tuple(tuple(s) for s in new_state_list)


def available_moves(board_state, side):
    """This function returns all the available moves for a
    given player. Not sure if this should return a list or
    use the yield keyword. And if using yield, should it 
    yield a tuple?"""

    if side == 1:
        if board_state[0][0] != 0:
            if board_state[1][0] != 0:
                yield (1,0)
        if board_state[0][0] != 0:
            if board_state[1][1] != 0:
                yield (2,0)
        if board_state[0][1] != 0:
            if board_state[1][0] != 0:
                yield (3,0)
        if board_state[0][1] != 0:
            if board_state[1][1] != 0:
                yield (4,0)
    if side == -1:  
        if board_state[1][0] != 0:
            if board_state[0][0] != 0:
                yield (1,0)
        if board_state[1][0] != 0:
            if board_state[0][1] != 0:
                yield (2,0)
        if board_state[1][1] != 0:
            if board_state[0][0] != 0:
                yield (3,0)
        if board_state[1][1] != 0:
            if board_state[0][1] != 0:
                yield (4,0)


def has_winner(board_state):
    """"""

    state_list = list(list(s) for s in board_state)
    if (state_list[0][0] == 0 and state_list[0][1] == 0):
        return -1
    if (state_list[1][0] == 0 and state_list[1][1] == 0):
        return 1
    else:
        return 0 # no one has won


def play_game(plus_player_func, minus_player_func):


    board_state = new_board()

    player_turn = 1

    while True:
        
        _available_moves = list(available_moves(board_state, player_turn))
        if not _available_moves:
            print("you broke it, no moves left")
            return 0

        if player_turn > 0:
            print(plus_player_func(board_state, 1, 100))
            move = plus_player_func(board_state, 1, 100)[1]
            #move = tuple(int(x.strip()) for x in input("Make your move " ).split(','))
        else:
            move = minus_player_func(board_state, -1, 100)[1]

        if move not in _available_moves:
            # if a player makes an invalid move the other player wins
            print("illergal move, yer other player wins")
            return -player_turn

        board_state = apply_move(board_state, move, player_turn)
        print("Player " + str(player_turn) + " to move")
        print("plays " + str(move))
        print(board_state)

        winner = has_winner(board_state)

        if winner != 0:
            print("we have a winner: side %s" % player_turn)
            return winner
        player_turn = -player_turn


def random_player(board_state, side, blank=""):
    moves = list(available_moves(board_state, side))
    return random.choice(moves)

In [5]:
def monte_carlo_sample(board_state, side):
    """Sample a single rollout from the current board_state and side. Moves are made to the current board_state until we
     reach a terminal state then the result and the first move made to get there is returned.
    Args:
        board_state (3x3 tuple of int): state of the board
        side (int): side currently to play. +1 for the plus player, -1 for the minus player
    Returns:
        (result(int), move(int,int)): The result from this rollout, +1 for a win for the plus player -1 for a win for
            the minus player, 0 for a draw
    """
    result = has_winner(board_state)
    if result != 0:
        return result, None
    moves = available_moves(board_state, side)
    if not moves:
        return 0, None

    # select a random move
    move = random.choice(moves)
    result, next_move = monte_carlo_sample(apply_move(board_state, move, side), -side)
    return result, move


def monte_carlo_tree_search(board_state, side, number_of_samples):
    """Evaluate the best from the current board_state for the given side using monte carlo sampling.
    Args:
        board_state (3x3 tuple of int): state of the board
        side (int): side currently to play. +1 for the plus player, -1 for the minus player
        number_of_samples (int): number of samples rollouts to run from the current position, the higher the number the
            better the estimation of the position
    Returns:
        (result(int), move(int,int)): The average result for the best move from this position and what that move was.
    """
    move_wins = collections.defaultdict(int)
    move_samples = collections.defaultdict(int)
    for _ in range(number_of_samples):
        print(_)
        result, move = monte_carlo_sample(board_state, side)
        # store the result and a count of the number of times we have tried this move
        if result == side:
            move_wins[move] += 1
        move_samples[move] += 1

    # get the move with the best average result
    move = max(move_wins, key=lambda x: move_wins.get(x) / move_samples[move])

    return move_wins[move] / move_samples[move], move


def _upper_confidence_bounds(payout, samples_for_this_machine, log_total_samples):
    return payout / samples_for_this_machine + math.sqrt((2 * log_total_samples) / samples_for_this_machine)


def monte_carlo_tree_search_uct(board_state, side, number_of_samples):
    """Evaluate the best from the current board_state for the given side using monte carlo sampling with upper
    confidence bounds for trees.
    Args:
        board_state (3x3 tuple of int): state of the board
        side (int): side currently to play. +1 for the plus player, -1 for the minus player
        number_of_samples (int): number of samples rollouts to run from the current position, the higher the number the
            better the estimation of the position
    Returns:
        (result(int), move(int,int)): The average result for the best move from this position and what that move was.
    """
    state_results = collections.defaultdict(float)
    state_samples = collections.defaultdict(float)

    for _ in range(number_of_samples):
        #print(_)
        current_side = side
        current_board_state = board_state
        first_unvisited_node = True
        rollout_path = []
        result = 0
        
        ticker = 0
        while result == 0:
            if ticker > 1000000:
                result = 0
                break
            #if ticker % 100000 == 0:
                #print(ticker / 1000000)
            ticker += 1
            move_states = {move: apply_move(current_board_state, move, current_side)
                           for move in available_moves(current_board_state, current_side)}

            if not move_states:
                result = 0
                break

            #for move in available_moves(current_board_state, current_side):
            #    print(move)

            #print(move_states)
            #print("state samples")
            #for key, val in state_samples.items():
            #    print(key, ":", val)

            if all((state in state_samples) for _, state in move_states):
                log_total_samples = math.log(sum(state_samples[s] for s in move_states.values()))
                move, state = max(move_states, key=lambda _, s: _upper_confidence_bounds(state_results[s],
                                                                                         state_samples[s],
                                                                                         log_total_samples))
            else:
                move = random.choice(list(move_states.keys()))

            current_board_state = move_states[move]

            if first_unvisited_node:
                rollout_path.append((current_board_state, current_side))
                if current_board_state not in state_samples:
                    first_unvisited_node = False

            current_side = -current_side

            result = has_winner(current_board_state)

        for path_board_state, path_side in rollout_path:
            state_samples[path_board_state] += 1.
            result *= path_side
            # normalize results to be between 0 and 1 before this it between -1 and 1
            result /= 2.
            result += .5
            state_results[path_board_state] += result

    move_states = {move: apply_move(board_state, move, side) for move in available_moves(board_state, side)}

    move = max(move_states, key=lambda x: state_results[move_states[x]] / state_samples[move_states[x]])

    return state_results[move_states[move]] / state_samples[move_states[move]], move

In [6]:
#play_game(monte_carlo_tree_search_uct, monte_carlo_tree_search_uct)

In [7]:
# hyperparameters
HIDDEN_NODES = (100, 100, 100)  # number of hidden layer neurons
INPUT_NODES = (2 * 2)
BATCH_SIZE = 100
LEARN_RATE = 1e-4
OUTPUT_NODES = INPUT_NODES
PRINT_RESULTS_EVERY_X = 200 #

In [8]:
input_placeholder = tf.placeholder("float", shape=(None, INPUT_NODES))
reward_placeholder = tf.placeholder("float", shape=(None,))
actual_move_placeholder = tf.placeholder("float", shape=(None, OUTPUT_NODES))

In [10]:
hidden_weights_1 = tf.Variable(tf.truncated_normal((INPUT_NODES, HIDDEN_NODES[0]), stddev=1. / np.sqrt(INPUT_NODES)))
hidden_weights_2 = tf.Variable(
    tf.truncated_normal((HIDDEN_NODES[0], HIDDEN_NODES[1]), stddev=1. / np.sqrt(HIDDEN_NODES[0])))
hidden_weights_3 = tf.Variable(
    tf.truncated_normal((HIDDEN_NODES[1], HIDDEN_NODES[2]), stddev=1. / np.sqrt(HIDDEN_NODES[1])))
output_weights = tf.Variable(tf.truncated_normal((HIDDEN_NODES[-1], OUTPUT_NODES), stddev=1. / np.sqrt(OUTPUT_NODES)))

hidden_layer_1 = tf.nn.relu(
    tf.matmul(input_placeholder, hidden_weights_1) + tf.Variable(tf.constant(0.01, shape=(HIDDEN_NODES[0],))))
hidden_layer_2 = tf.nn.relu(
    tf.matmul(hidden_layer_1, hidden_weights_2) + tf.Variable(tf.constant(0.01, shape=(HIDDEN_NODES[1],))))
hidden_layer_3 = tf.nn.relu(
    tf.matmul(hidden_layer_2, hidden_weights_3) + tf.Variable(tf.constant(0.01, shape=(HIDDEN_NODES[2],))))
output_layer = tf.nn.softmax(
    tf.matmul(hidden_layer_3, output_weights) + tf.Variable(tf.constant(0.01, shape=(OUTPUT_NODES,))))

In [11]:
policy_gradient = tf.reduce_sum(tf.reshape(reward_placeholder, (-1, 1)) * actual_move_placeholder * output_layer)
train_step = tf.train.RMSPropOptimizer(LEARN_RATE).minimize(-policy_gradient)

Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


In [13]:
sess = tf.Session()
sess.run(tf.global_variables_initializer())

In [14]:
board_states, actual_moves, rewards = [], [], []

In [15]:
episode_number = 1
results = collections.deque()

In [16]:
def make_move(board_state, side):
    board_state_flat = np.ravel(board_state)
    board_states.append(board_state_flat)
    probability_of_actions = sess.run(output_layer, feed_dict={input_placeholder: [board_state_flat]})[0]

    try:
        move = np.random.multinomial(1, probability_of_actions)
    except ValueError:
        # sometimes because of rounding errors we end up with probability_of_actions summing to greater than 1.
        # so need to reduce slightly to be a valid value
        move = np.random.multinomial(1, probability_of_actions / (sum(probability_of_actions) + 1e-7))

    actual_moves.append(move)

    move_index = move.argmax()
    return move_index / 4, move_index % 4

In [19]:
while True:
    reward = play_game(make_move, random_player)

    results.append(reward)
    if len(results) > PRINT_RESULTS_EVERY_X:
        results.popleft()

    last_game_length = len(board_states) - len(rewards)

    # we scale here so winning quickly is better winning slowly and loosing slowly better than loosing quick
    reward /= float(last_game_length)

    rewards += ([reward] * last_game_length)

    episode_number += 1

    if episode_number % BATCH_SIZE == 0:
        normalized_rewards = rewards - np.mean(rewards)
        normalized_rewards /= np.std(normalized_rewards)

        sess.run(train_step, feed_dict={input_placeholder: board_states,
                                        reward_placeholder: normalized_rewards,
                                        actual_move_placeholder: actual_moves})

        # clear batches
        del board_states[:]
        del actual_moves[:]
        del rewards[:]

    if episode_number % PRINT_RESULTS_EVERY_X == 0:
        print("episode: %s win_rate: %s" % (episode_number, 0.5 + sum(results) / (PRINT_RESULTS_EVERY_X * 2.)))

TypeError: ignored