In [None]:
import tensorflow as tf
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2
import numpy as np
import random
import copy
from tqdm import tqdm
import matplotlib.pyplot as plt

# Very roughly coded ruleset for the game with manually defined pieces, not super important to code this bit cleanly

# Finding legal actions takes significant computing power, this dictionary will spit back any previously computed
# legal actions. Will only overflow when given larger cases.
all_legal_actions = {}
class Polyomino():
    def __init__(self, h, w, idx):
        # 1d Encoding of board makes logic much simpler
        self.idx = idx
        self.h = h
        self.w = w
        self.state = np.zeros(h*w)
        self.position = 0
        # Representation not unique for pieces longer than the width of the board, does not scale but works for our cases
        # Currently commented out this set of tetrominoes, used when testing easier cases
        # self.pieces = np.array([
        #     [0,1,2,w+2], [0,2*w-1,w,2*w], [0,w,w+1,w+2], [0,w,2*w,1], [0,w,1,2], [0,1,w+1,2*w+1], [0,w-2,w-1,w], [0,w,2*w,2*w+1], # L piece
        #     [0,1,w+1,w+2], [0,2*w-1,w-1,w], [0,w-1,w,1], [0,w,w+1,2*w+1], # Z piece
        #     [0,1,w+1,2], [0,w-1,w,2*w], [0,w-1,w,w+1], [0,w,2*w,w+1], # T piece
        #     [0,1,2,3], [0,w,2*w,3*w], [0,1,w,w+1]]) # I and square piece
        # self.max_pieces = [2,2,2,2,2]
        # self.available_pieces = self.max_pieces.copy()
        # self.kernel = [(1,0),(2,0),(3,0),(-2,1),(-1,1),(0,1),(1,1),(2,1),(-1,2),(0,2),(1,2),(0,3)]
        # self.flat_kernel = [grid[1]*self.w + grid[0] for grid in self.kernel]

        # Sets of pentominoes and all of their rotations/reflections. Small optimization can be made by requiring
        # that these representations all start with 0. Other useful precomputed values are just manually coded.
        self.pieces = np.array([
            [0,1,2,w+2,w+3], [0,w,2*w-1,2*w,3*w-1], [0,1,w+1,w+2,w+3], [0,w-1,2*w-1,3*w-1,w], # Z piece
            [0,1,w-2,w-1,w], [0,w,2*w,2*w+1,3*w+1], [0,1,2,w-1,w], [0,w,w+1,2*w+1,3*w+1], # Z piece reflected
            [0,1,w+1,2*w+1,2*w+2], [0,w-2,2*w-2,w-1,w], [0,2*w-1,w,2*w,1], [0,w,w+1,w+2,2*w+2], # S piece
            [0,w,2*w,w+1,w+2], [0,1,w+1,2*w+1,2], [0,w-2,w-1,w,2*w], [0,2*w-1,w,2*w,2*w+1], # T piece
            [0,1,2,3,4], [0,w,2*w,3*w,4*w], [0,w-1,w,2*w,w+1]]) # I and cross piece
        self.max_pieces = [2,2,2,6,4]
        self.available_pieces = self.max_pieces.copy()
        self.kernel = [(1,0),(2,0),(3,0),(4,0),(-2,1),(-1,1),(0,1),(1,1),(2,1),(3,1),
                        (-2,2),(-1,2),(0,2),(1,2),(2,2),(-1,3),(0,3),(1,3),(0,4)]
        self.flat_kernel = [grid[1]*self.w + grid[0] for grid in self.kernel]

        # Retrieve correct piece for a given action and vice versa
        self.piece_divider = {0:0, 1:8, 2:12, 3:16, 4:18, 5:19}
        self.piece_from_action = {0:0,1:0,2:0,3:0,4:0,5:0,6:0,7:0,8:1,9:1,10:1,11:1,12:2,13:2,14:2,15:2,16:3,17:3,18:4}

    # Get all possible actions from a given state
    def get_legal_actions(self):
        state_piece = tuple(np.concatenate([self.state, self.available_pieces]))
        if state_piece in all_legal_actions:
            return all_legal_actions[state_piece]

        actions = []
        available_pieces = []
        prev_position = self.position
        for i, num_pieces in enumerate(self.available_pieces):
            if num_pieces != 0:
                available_pieces.extend(self.pieces[self.piece_divider[i] : self.piece_divider[i+1]])
        available_pieces = set(tuple(piece) for piece in available_pieces)
        legal_grids = self.generate_legal_grids()
        while (self.position < self.h*self.w) and (self.state[self.position] == 1 or
                    (actions := [(i, self.position) for i, piece in enumerate(self.pieces)
                        if tuple(piece) in available_pieces and all(x in legal_grids for x in piece[1:])]) == []):
            self.position += 1
            legal_grids = self.generate_legal_grids()

        self.position = prev_position
        all_legal_actions[state_piece] = actions
        return actions

    # Checks the general area around where the piece is to be placed and returns which grids are free
    def generate_legal_grids(self):
        x = self.position % self.w
        y = self.position // self.w
        kernel = [self.flat_kernel[i] for i, grid in enumerate(self.kernel)
                  if 0 <= grid[0]+x < self.w and grid[1]+y < self.h and self.state[self.position + self.flat_kernel[i]] == 0]
        return kernel

    # Place a piece
    def step(self, action):
        self.position = action[1]
        self.state[self.pieces[action[0]] + self.position] = 1
        self.available_pieces[self.piece_from_action[action[0]]] -= 1

    # Unplace a piece
    def undo(self, action):
        self.state[self.pieces[action[0]] + self.position] = 0
        self.position = action[1]
        self.available_pieces[self.piece_from_action[action[0]]] += 1

    # Resulting value of the game, only called once it has been determined that there are no legal actions remaining
    def value(self):
        value = np.sum(self.state)
        while (self.position < self.h*self.w) and (self.state[self.position] == 1):
            self.position += 1
        # Place a remaining piece allowing for overlaps or protruding edges to get a more continuous value function
        if self.position < self.h*self.w:
            piece = np.argmax(np.array(self.available_pieces))
            max_grids_covered = 0
            for action in self.pieces[self.piece_divider[piece] : self.piece_divider[piece+1]]:
                grids_covered = 0
                left_x = 0
                for i, grid in enumerate(action):
                    if grid + self.position < self.h*self.w:
                        offset = self.position % self.w
                        if i == 0 and grid > 0:
                            left_x = grid % self.w - self.w
                            if left_x + offset >= 0 and self.state[grid + self.position] == 0:
                                grids_covered += 1
                        else:
                            if grid % self.w + offset < self.w and self.state[grid + self.position] == 0:
                                grids_covered += 1
                if grids_covered > max_grids_covered:
                    max_grids_covered = grids_covered
            value += max_grids_covered
        return value / (self.h*self.w)

    # Method to reset the game to avoid creating too many game instances
    def reset_game(self):
        self.state = np.zeros(self.h*self.w)
        self.position = 0
        self.available_pieces = self.max_pieces.copy()

    def get_board(self):
        return np.reshape(self.state.copy(), (self.h, self.w,1))

    def get_available_pieces(self):
        return self.available_pieces.copy()

In [None]:
# Monte Carlo Tree Search in this code is modified such that it can play any specified number of games in tandem.
# Code becomes more complex to understand but saves significant time on computation

# Root node of tree search is defined differently in order to add Dirichlet noise and because it has no parents
class RootNode:
    def __init__(self, priors, legal_actions):
        self.parent = None
        self.children = None
        self.children_priors = None
        self.children_values = None
        self.children_visits = None
        self.children_legal_actions = None
        self.visits = 0
        self.priors = priors
        self.noise = []

    # Model assumes all actions are possible at all points in time, must be normalized before using
    def get_normalized_priors(self):
        priors = self.priors[self.children_legal_actions]
        priors = priors / np.sum(priors)
        if len(self.noise) == 0: self.noise = np.random.dirichlet([0.03]*len(priors))
        if len(priors) > 1: priors = priors = 0.75*priors + 0.25*self.noise
        return priors

# Information in tree is stored in a somewhat roundabout way, in order to access certain pieces of information about
# your current node, you have to go to its parent and then that parent has information about all its children
# Unsure if efficient but the code I was referencing from had this node structure so I left it as is
class Node:
    def __init__(self, idx, parent, action):
        self.children = None
        self.children_priors = None
        self.children_values = None
        self.children_visits = None
        self.children_legal_actions = None
        self.idx = idx
        self.parent = parent
        self.action = action

    def get_normalized_priors(self):
        priors = self.priors[self.children_legal_actions]
        return priors / np.sum(priors)
    @property
    def priors(self):
        return self.parent.children_priors[self.idx]
    @priors.setter
    def priors(self, x):
        self.parent.children_priors[self.idx] = x
    @property
    def visits(self):
        return self.parent.children_visits[self.idx]
    @visits.setter
    def visits(self, x):
        self.parent.children_visits[self.idx] = x
    @property
    def value(self):
        return self.parent.children_values[self.idx]
    @value.setter
    def value(self, x):
        self.parent.children_values[self.idx] = x

# Choosing function for search tree, exploration constant c can be modified freely
def get_ucb_scores(node, c=0.1):
    priors = node.get_normalized_priors()
    return node.children_values + c*priors*node.visits**0.5 / (node.children_visits+1)

# Traverse the tree until you reach an unexpanded node
def select(root, game):
    current = root
    while current.children is not None:
        ucb_scores = get_ucb_scores(current)
        current = current.children[np.argmax(ucb_scores)]
        game.step(current.action)
    return current

def expand(games, leaves, model, children_actions):
    states = []
    pieces = []
    # Record all the states for a single action taken in each direction, this way a call to the model to perform a prediction
    # only needs to be performed a single time.
    for i, game in enumerate(games):
        leaves[i].children_legal_actions = [action[0] for action in children_actions[i]]
        for action in children_actions[i]:
            position = game.position
            game.step(action)
            states.append(game.get_board())
            pieces.append(game.get_available_pieces())
            game.undo((action[0], position))
    states = tf.convert_to_tensor(np.array(states), dtype="float32")
    pieces = tf.convert_to_tensor(np.array(pieces), dtype="float32")

    children_priors, children_values = model(states, pieces)
    children_values = children_values[:,0]
    # Cannot embed multiple dimensions into prediction batch, so we must seperate the dimensions manually
    num_branches = np.cumsum([0]+[len(children_actions[i]) for i in range(len(games))])
    for i, leaf in enumerate(leaves):
        leaf.children = [Node(idx, leaf, action) for idx, action in enumerate(children_actions[i])]
        leaf.children_priors = children_priors.numpy()[num_branches[i]:num_branches[i+1]]
        leaf.children_values = children_values.numpy()[num_branches[i]:num_branches[i+1]]
        leaf.children_visits = np.zeros(len(children_actions[i]))

def backpropagate(leaf, value):
    current = leaf
    while current.parent is not None:
        current.value = (current.value * current.visits + value) / (current.visits + 1)  # incremental mean update
        current.visits += 1
        current = current.parent
    current.visits += 1

# Monte Carlo Tree search
def search(games, model, iterations):
    states = tf.convert_to_tensor(np.array([game.get_board() for game in games]), dtype="float32")
    pieces = tf.convert_to_tensor(np.array([game.get_available_pieces() for game in games]), dtype="float32")
    priors = model(states, pieces)[0].numpy()
    roots = [RootNode(priors[i], games[i].get_legal_actions()) for i in range(len(games))]

    positions = [game.position for game in games]
    states = [game.state.copy() for game in games]
    available_pieces = [game.available_pieces.copy() for game in games]
    for _ in range(iterations):
        leaves = np.array([select(roots[i], games[i]) for i in range(len(games))])
        children_actions = [games[i].get_legal_actions() for i in range(len(games))]
        while (unfinished_games:=[i for i in range(len(games)) if children_actions[i] != []]) != []:
            expand(games[unfinished_games], leaves[unfinished_games], model,
                            [actions for actions in children_actions if actions != []])
            children_actions = [[] for _ in range(len(games))]
            for game_idx in unfinished_games:
                leaves[game_idx] = leaves[game_idx].children[np.argmax(leaves[game_idx].children_values)]
                games[game_idx].step(leaves[game_idx].action)
                children_actions[game_idx] = games[game_idx].get_legal_actions()
        for i, game in enumerate(games):
            backpropagate(leaves[i], game.value())
            game.position = positions[i]
            game.state = states[i].copy()
            game.available_pieces = available_pieces[i].copy()
    return roots

In [None]:
ITERATIONS = 100 # Number of times to swap between self-play and neural net training
SELFPLAY_GAMES = 40 # Number of self-play games before training the neural net on a minibatch
SEARCH_ITERATIONS = 50 # Number of whole game simulations per tree
BATCH_SIZE = 256 # Number of states to train on per epoch
height = 8
width = 10
games = [Polyomino(height, width, idx) for idx in range(SELFPLAY_GAMES)]
action_size = len(games[0].pieces)

# Model takes in the number of remaining pieces as input once the convolution layers are finished
input1 = tf.keras.layers.Input(shape=(height,width,1))
input2 = tf.keras.layers.Input(shape=(5))
conv1 = tf.keras.layers.Conv2D(64,(3,3), activation = 'relu', padding = 'same')(input1)
conv1 = tf.keras.layers.Conv2D(128,(3,3), activation = 'relu', padding = 'same')(conv1)
conv1 = tf.keras.layers.Conv2D(256,(3,3), activation = 'relu', padding = 'same')(conv1)
flat = tf.keras.layers.Flatten()(conv1)
flat2 = tf.keras.layers.Dense(128, activation="relu")(flat)
merged = tf.keras.layers.Concatenate()([flat2, input2])
dense = tf.keras.layers.Dense(128, activation="relu")(merged)
output1 = tf.keras.layers.Dense(action_size, activation="softmax", name='pi')(dense)
dense = tf.keras.layers.Dense(128, activation="relu")(dense)
output2= tf.keras.layers.Dense(1, activation="sigmoid", name='v',
                               kernel_initializer=tf.keras.initializers.RandomNormal(mean=0.1))(dense)
model = tf.keras.models.Model(inputs=[input1,input2], outputs=[output1, output2])
model.compile(loss=['categorical_crossentropy', "mean_squared_error"], loss_weights=[1., 100.],
              optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001))

examples = []
avg_values = []
history = []
waiting_for_result = []
for _ in tqdm(range(ITERATIONS)):
    # Conversion to a frozen function increases prediction efficiency
    full_model = tf.function(lambda x: model(x))
    full_model = full_model.get_concrete_function([tf.TensorSpec(model.inputs[0].shape, model.inputs[0].dtype),
                                                   tf.TensorSpec(model.inputs[1].shape, model.inputs[1].dtype)])
    frozen_func = convert_variables_to_constants_v2(full_model)
    frozen_func.graph.as_graph_def()

    while (unfinished_games:=[game for game in games if game.get_legal_actions() != []]) != []:
        roots = search(np.array(unfinished_games), frozen_func, SEARCH_ITERATIONS)
        for idx, root in enumerate(roots):
            pi_example = root.children_visits
            actions = [child.action for child in root.children]

            # In order to train the model, we have to put back the invalid actions we took out
            # during tree search and give them a value of 0
            invalid_actions = [i for i in range(action_size) if i not in [action[0] for action in actions]]
            for action in invalid_actions:
                if action < len(pi_example):
                    pi_example = np.insert(pi_example, action, 0)
                else:
                    pi_example = np.append(pi_example, 0)

            waiting_for_result.append((unfinished_games[idx].get_board(), pi_example/np.sum(pi_example),
                                       unfinished_games[idx].get_available_pieces(), unfinished_games[idx].idx))
            # Stochastically determine the next piece to place
            choice = random.choices(population=[i for i in range(len(actions))],
                                    weights=root.children_visits/np.sum(root.children_visits))[0]
            action = root.children[choice].action
            unfinished_games[idx].step(action)

    # Resulting game outcome of each state is not known until the games are finished
    examples += [(s, a, p, games[idx].value()) for s, a, p, idx in waiting_for_result]
    waiting_for_result = []
    for i in range(SELFPLAY_GAMES):
        games[i].reset_game()

    # Model is trained on most recent 3000 states
    recent_examples = examples[-3000:].copy()
    random.shuffle(recent_examples)
    states = np.reshape(np.array([example[0] for example in recent_examples[0:BATCH_SIZE]]), (BATCH_SIZE,height,width,1))
    actions = np.array([example[1] for example in recent_examples[0:BATCH_SIZE]])
    pieces = np.array([example[2] for example in recent_examples[0:BATCH_SIZE]])
    values = np.array([example[3] for example in recent_examples[0:BATCH_SIZE]])
    print(values)
    avg_values.append(np.mean(np.array([example[3] for example in recent_examples])))
    if len(examples) >= 3000:
        history.append(model.fit([states, pieces], [actions, values], epochs=1))



In [None]:
# Play games manually after model is trained
games = [Polyomino(height, width, idx) for idx in range(1)]
game = games[0]
# game.step((12,0))
# game.step((0,1))
# game.step((2,4))
# game.step((8,7))
# game.step((15,21))
# game.step((11,22))
# game.step((17,19))
# game.step((12,24))
# game.step((17,30))
# game.step((18,34))
# game.step((8,35))
# game.step((8,47))
# game.step((10,51))
# game.step((14,57))
root = search(np.array(games), frozen_func, 500)[0]
print(root.children_values)
pi_example = root.children_visits
actionss = [child.action for child in root.children]
invalid_actions = [i for i in range(action_size) if i not in [action[0] for action in actionss]]
for action in invalid_actions:
    if action < len(pi_example):
        pi_example = np.insert(pi_example, action, 0)
    else:
        pi_example = np.append(pi_example, 0)
print(pi_example)
print(root.get_normalized_priors())
print(model([np.reshape(game.get_board(), (1,8,10,1)), np.reshape(game.get_available_pieces(), (1,5))]))

print(np.reshape(game.get_board(), (8,10)))
print(game.get_legal_actions())

In [None]:
# Section to visualize results post-training

plt.rcParams["figure.figsize"] = [6.4, 4.8]
plt.rcParams["figure.autolayout"] = True
data1 = [1 - value for value in avg_values]
data2 = [loss.history["loss"][0] for loss in history]
t = [i for i in range(len(data1))]
t2 = [i for i in range(len(data1)-len(data2),len(data1))]
fig, ax1 = plt.subplots()

ax1.set_xlabel('Iterations')
ax1.set_ylabel('1 - Game Result', color='red')
# ax1.set_ylim(0,0.65)
ax1.plot(t, data1, color='red')
ax1.tick_params(axis='y', labelcolor='red')
ax2 = ax1.twinx()

ax2.set_ylabel('Network Loss', color="blue")
ax2.plot(t2, data2, color="blue")
ax2.tick_params(axis='y', labelcolor="blue")
# plt.title("Training results over 8x10 board")
plt.show()