## Projektarbeit Mühle

### Imports

In [1]:
import PySimpleGUI as psGui
import matplotlib.pyplot as plt
import numpy as np
import time
import copy
from tensorflow import keras
import tensorflow as tf
import datetime as dt
import random
import sklearn.preprocessing as pre

### Class definitions

#### Environment

In [47]:
from typing import List
class MillEnv(object):
    """Rule engine for a mill (Nine Men's Morris) game on a 24-field board.

    Players are encoded as ``1`` and ``-1``; an empty field is ``0``.
    Per-player lists (``gamePhase``, ``inHand``, ``onBoard``,
    ``checkerPositions``) are indexed with ``1`` for player ``1`` and ``0``
    for player ``-1``.

    ``moveNeeded`` tells which kind of input ``makeMove`` expects next:
    0 = place a checker, 1 = choose a checker to move, 2 = choose a direction
    (0 up, 1 right, 2 down, 3 left), 3 = remove an opponent checker.

    ``gamePhase`` per player: 0 = placing, 1 = moving, 2 = only three checkers
    left (may move to any free field), 3 = game over.
    """
    def __init__(self):
        # All 16 lines of three fields. getMoveFields treats the first eight
        # as the left/right lines and the last eight as the up/down lines.
        self.columns: np.ndarray = np.array([[0,1,2],
                        [3,4,5],
                        [6,7,8],
                        [9,10,11],
                        [12,13,14],
                        [15,16,17],
                        [18,19,20],
                        [21,22,23],
                        [0,9,21],
                        [3,10,18],
                        [6,11,15],
                        [1,4,7],
                        [16,19,22],
                        [8,12,17],
                        [5,13,20],
                        [2,14,23],
                        ])
        # The mutable game state is created in one place only.
        self.reset()
    def makeMove(self, move: int) -> bool:
        """Apply one input for the current player and advance the game state.

        The meaning of ``move`` depends on ``self.moveNeeded``: a board
        position (0-23) for 0, 1 and 3, or a direction (0-3) for 2.

        Returns True if the input was accepted, otherwise False (the state is
        then left as it was, apart from the end-of-game check at the bottom).
        """
        valid: bool = False
        # Mill counts before the move; compared afterwards to detect a new mill.
        last_state: tuple = self.getSummary(self.isPlaying)
        last_player: int = self.isPlaying
        if self.moveNeeded == 0: # Set Checker on position
            if self.board[move] == 0:
                self.board[move] = self.isPlaying
                self.checkerPositions[1 if self.isPlaying == 1 else 0].append(move)
                valid = True
                if self.gamePhase[1 if self.isPlaying == 1 else 0] == 2:
                    # Phase 2: the placed checker is the previously selected
                    # one, so its old field is cleared.
                    self.board[self.selected] = 0
                    self.moveNeeded = 1
                    self.checkerPositions[1 if self.isPlaying == 1 else 0].remove(self.selected)
                else:
                    self.inHand[1 if self.isPlaying == 1 else 0] -= 1
                    self.onBoard[1 if self.isPlaying == 1 else 0] += 1
                    if np.array(self.inHand).sum() == 0:
                        # Both hands are empty: switch both players to moving.
                        self.gamePhase = [1,1]
                        self.moveNeeded = 1
                self.isPlaying = -self.isPlaying
        elif self.moveNeeded == 1: # choose checker to move
            # Own checker that has a free neighbour, or any own checker in phase 2.
            if self.board[move] == 1 * self.isPlaying and (~(self.getMoveFields(move).all()) or self.gamePhase[1 if self.isPlaying == 1 else 0] == 2):
                valid = True
                self.selected = move
                if self.gamePhase[1 if self.isPlaying == 1 else 0] == 2:
                    self.moveNeeded = 0
                else:
                    self.moveNeeded = 2
        elif self.moveNeeded == 2: # move checker up, down, left or right
            if self.getMoveFields(self.selected)[move] == 0:
                # (index within line, line number) for the selected field in
                # each of the two lines returned by getInRows.
                idxToMoveAxis: np.ndarray = np.where(self.getInRows(self.selected) == self.selected)
                idxToMove = list(zip(idxToMoveAxis[1], idxToMoveAxis[0]))
                order = idxToMove[0][1]
                valid = True
                self.board[self.selected] = 0
                # Recount after lifting the checker, so that closing a mill
                # again with this move is detected as a new mill.
                last_state = self.getSummary(last_player)
                self.checkerPositions[1 if self.isPlaying == 1 else 0].remove(self.selected)
                if move == 0: # up
                    self.board[self.getInRows(self.selected)[1][idxToMove[abs(order-1)][0]-1]] = self.isPlaying
                    self.checkerPositions[1 if self.isPlaying == 1 else 0].append(self.getInRows(self.selected)[1][idxToMove[abs(order-1)][0]-1])
                if move == 1: # right
                    self.board[self.getInRows(self.selected)[0][idxToMove[order][0]+1]] = self.isPlaying
                    self.checkerPositions[1 if self.isPlaying == 1 else 0].append(self.getInRows(self.selected)[0][idxToMove[order][0]+1])
                if move == 2: # down
                    self.board[self.getInRows(self.selected)[1][idxToMove[abs(order-1)][0]+1]] = self.isPlaying
                    self.checkerPositions[1 if self.isPlaying == 1 else 0].append(self.getInRows(self.selected)[1][idxToMove[abs(order-1)][0]+1])
                if move == 3: # left
                    self.board[self.getInRows(self.selected)[0][idxToMove[order][0]-1]] = self.isPlaying
                    self.checkerPositions[1 if self.isPlaying == 1 else 0].append(self.getInRows(self.selected)[0][idxToMove[order][0]-1])
                self.selected = -1
                self.moveNeeded = 1
                self.isPlaying = -self.isPlaying
        elif self.moveNeeded == 3: # delete opponent checker
            # A checker that is part of a complete line of three may not be removed.
            threeInChosenRow: np.ndarray = abs(self.board[self.getInRows(move)].sum(axis=1)) == 3
            if self.board[move] == -1 * self.isPlaying and ~threeInChosenRow.any():
                valid = True
                self.board[move] = 0
                # "1 if self.isPlaying == -1 else 0" is the opponent's list index.
                self.checkerPositions[1 if self.isPlaying == -1 else 0].remove(move)
                self.onBoard[1 if self.isPlaying == -1 else 0] -= 1
                if self.gamePhase[1 if self.isPlaying == -1 else 0] == 0:
                    self.moveNeeded = 0
                elif self.gamePhase[1 if self.isPlaying == -1 else 0] == 1:
                    if self.onBoard[1 if self.isPlaying == -1 else 0] == 3:
                        self.gamePhase[1 if self.isPlaying == -1 else 0] = 2
                    self.moveNeeded = 1
                elif self.gamePhase[1 if self.isPlaying == -1 else 0] == 2:
                    # Opponent was already down to three checkers: game over.
                    self.gamePhase = [3,3]
                    self.winner = last_player
                self.isPlaying = -self.isPlaying
        if last_state[0] < self.getSummary(last_player)[0]:
            # The mover completed a new mill: same player again, must remove a checker.
            self.isPlaying = last_player
            self.moveNeeded = 3
            canDelete: bool = False
            for pos in self.checkerPositions[1 if self.isPlaying == -1 else 0]:
                if ~(abs(self.board[self.getInRows(pos)].sum(axis=1)) == 3).any():
                    canDelete = True
                    break
            if not canDelete:
                # Every opponent checker is protected by a mill; skip the removal.
                # NOTE(review): isPlaying stays on the mover here - confirm
                # that the turn is not meant to pass back to the opponent.
                valid = True
                if self.gamePhase[1 if self.isPlaying == -1 else 0] == 0:
                    self.moveNeeded = 0
                elif self.gamePhase[1 if self.isPlaying == -1 else 0] >= 1:
                    self.moveNeeded = 1
        if self.gamePhase[1 if last_player == -1 else 0] == 1:
            # Opponent is in the moving phase: the mover wins if none of the
            # opponent's checkers has a free neighbouring field.
            finished = True
            for pos in self.checkerPositions[1 if last_player == -1 else 0]:
                if ~(self.getMoveFields(pos).all()):
                    finished = False
                    break
            if finished:
                self.winner = last_player
                self.gamePhase = [3,3]
        return valid
    def isFinished(self):
        """Return the winner (1 or -1), or 0 while the game is still running."""
        return self.winner
    def getBoard(self) -> np.ndarray:
        """Return a copy of the board, so callers cannot modify the game state."""
        return self.board.copy()
    def getInRows(self, pos: int) -> np.ndarray:
        """Return the rows of ``self.columns`` that contain ``pos``, in their original order."""
        arrayPos: np.ndarray = self.columns == pos
        return self.columns[arrayPos.any(axis=1)]
    def reset(self):
        """Put the game back into its start state (empty board, player 1 to place)."""
        self.isPlaying: int = 1
        self.gamePhase: list = [0,0]
        self.moveNeeded: int = 0
        self.inHand: list = [9,9]
        self.onBoard: list = [0,0]
        self.checkerPositions: list = [[],[]]
        self.selected: int = -1
        self.board: np.ndarray = np.zeros(24)
        self.winner = 0
    def getSummary(self, player: int) -> tuple:
        """Count lines that hold two or three checkers of one colour.

        A line counts as "two" when its three fields sum to +-2 (two checkers
        of one player and one empty field) and as "three" when they sum to
        +-3 (a complete mill).

        Returns ``(threes of player, threes of opponent, twos of player,
        twos of opponent)``.
        """
        numTwoPlayerActual: int = 0
        numTwoPlayerOpponent: int = 0
        numThreePlayerActual: int = 0
        numThreePlayerOpponent: int = 0
        for column in self.columns:
            columnSum = self.board[column[0]] + self.board[column[1]] + self.board[column[2]]
            if columnSum == -2 * player:
                numTwoPlayerOpponent += 1
            elif columnSum == 2 * player:
                numTwoPlayerActual += 1
            elif columnSum == -3 * player:
                # An opponent mill belongs in the "three" counter, not the "two" counter.
                numThreePlayerOpponent += 1
            elif columnSum == 3 * player:
                numThreePlayerActual += 1
        return numThreePlayerActual, numThreePlayerOpponent, numTwoPlayerActual, numTwoPlayerOpponent
    def getMoveFields(self, pos: int) -> np.ndarray:
        """Describe the four neighbouring fields of ``pos``.

        Returns an array indexed by direction (0 up, 1 right, 2 down, 3 left).
        Each entry is the board value of the neighbour in that direction
        (0 empty, 1 / -1 occupied) or 2 when there is no field in that
        direction. A checker can therefore move exactly where the entry is 0.
        """
        moveFields: np.ndarray = np.zeros(4)
        chosenRows: np.ndarray = self.getInRows(pos)
        # np.where reports matches row by row, so idx[0] belongs to
        # chosenRows[0] and idx[1] to chosenRows[1]; each entry is
        # (index of pos within the line, line number).
        idxAxis: np.ndarray = np.where(chosenRows == pos)
        idx = list(zip(idxAxis[1], idxAxis[0]))
        order = idx[0][1]
        # Left/right neighbours come from the first line.
        if idx[order][0] != 1:
            if idx[order][0] == 0:
                moveFields[3] = 2
                moveFields[1] = self.board[chosenRows[0][1]]
            elif idx[order][0] == 2:
                moveFields[1] = 2
                moveFields[3] = self.board[chosenRows[0][1]]
        else:
            moveFields[3] = self.board[chosenRows[0][0]]
            moveFields[1] = self.board[chosenRows[0][2]]
        # Up/down neighbours come from the second line.
        if idx[abs(order-1)][0] != 1:
            if idx[abs(order-1)][0] == 0:
                moveFields[0] = 2
                moveFields[2] = self.board[chosenRows[1][1]]
            elif idx[abs(order-1)][0] == 2:
                moveFields[2] = 2
                moveFields[0] = self.board[chosenRows[1][1]]
        else:
            moveFields[0] = self.board[chosenRows[1][0]]
            moveFields[2] = self.board[chosenRows[1][2]]
        return moveFields


#### Agent

In [3]:
class Agent(object):
    """Chooses the next input for the environment, randomly or from a network."""
    def getPos(self, state: np.ndarray, temp: float, moveNeeded: int,network:keras.Model=None) -> int:
        """Return the index of the action to play.

        Without a network the action is drawn uniformly: 0-3 when
        ``moveNeeded`` is 2, otherwise 0-23. With a network, its output for
        ``state`` is divided by the temperature ``temp``, passed through a
        softmax, and an action index is drawn from that distribution.
        ``temp`` must be non-zero when a network is given.
        """
        if network is None:
            if moveNeeded == 2:
                return np.random.randint(0,4)
            else:
                return np.random.randint(0,24)
        softmaxed_output = keras.backend.softmax(network(state.reshape(1,-1))/ temp)
        probabilities = np.array(softmaxed_output[0])
        # Draw the index itself. Drawing a probability value and looking it up
        # again would always resolve equal probabilities to the first index.
        return int(np.random.choice(len(probabilities), p=probabilities))

#### Graphics

In [8]:
class MillDisplayer(object):
    """PySimpleGUI window that draws a MillEnv board and turns clicks into moves."""
    def __init__(self, MillEnvironment: MillEnv = None):
        """Build the window; a new MillEnv is created when none is passed in."""
        psGui.theme("dark")
        self.millImage: str = "MühleBrett.png"
        self.blackCheckerImage: str  = "Schwarz.png"
        self.whiteCheckerImage: str = "Weiss.png"
        self.millEnv: MillEnv = MillEnv()
        if MillEnvironment is not None :
            self.millEnv = MillEnvironment
        # Figure IDs of the checker images currently drawn. An integer dtype
        # keeps the IDs returned by DrawImage as integers instead of floats.
        self.ImageIDArray = np.array([], dtype=int)
        # Top-left corner of the checker image for each of the 24 fields.
        self.imageLocations = [(10,490), (225, 490), (440, 490),
                                (75,415), (225, 415), (375, 415),
                                (150,340), (225, 340), (310, 340),
                                (10,265), (75, 265), (150, 265),
                                (310,265), (375, 265), (440, 265),
                                (150,190), (225, 190), (310, 190),
                                (75,115), (225, 115), (375, 115),
                                (10,55), (225, 55), (440, 55)]
        self.graph = psGui.Graph(
                        canvas_size=(500, 500),
                        graph_bottom_left=(0, 0),
                        graph_top_right=(500, 500),
                        )
        self.statusTextBox = psGui.Text("Player "+self.getPlayerName(self.millEnv.isPlaying)+" is playing", size=(50, 1))
        self.layout_ = [[psGui.Button("Player vs. Player"),psGui.Button("Player vs. Agent"),psGui.Button("Agent vs. Agent")],
                        [self.statusTextBox],
                       [self.graph],
                       [psGui.Button("Close")]]
        self.window  = psGui.Window("Mill AI", layout=self.layout_)
        self.window.finalize()
        self.graph.DrawImage(filename=self.millImage, location=(0,500))
        self.activateClick()
        self.reloadEnv()
    def windowsLoop(self):
        """Run a stand-alone event loop; any button except Close resets the game."""
        while True:
            event, values = self.window.read()
            if event == psGui.WIN_CLOSED or event == 'Close': # if user closes window or clicks cancel
                break
            elif not event == "":
                self.reset()
        self.window.close()
    def makeMove(self, pos: int) -> bool:
        """Forward ``pos`` to the environment and redraw if it was accepted."""
        valid: bool = self.millEnv.makeMove(pos)
        if valid:
            self.reloadEnv()
        return valid
    def reloadEnv(self):
        """Update the status line and redraw all checkers from the environment's board."""
        self.setStatus("Player " + self.getPlayerName(self.millEnv.isPlaying) + " is playing - move needed: " + str(self.millEnv.moveNeeded))
        for imageID in self.ImageIDArray:
            self.graph.DeleteFigure(imageID)
        # Start a fresh ID list. np.delete returns a new array, so calling it
        # without assigning the result left the old IDs in place forever.
        self.ImageIDArray = np.array([], dtype=int)
        for case, location in zip(self.millEnv.getBoard(), self.imageLocations):
            if case == 1:
                self.ImageIDArray = np.append(self.ImageIDArray,self.graph.DrawImage(filename=self.blackCheckerImage, location=location))
            elif case == -1:
                self.ImageIDArray = np.append(self.ImageIDArray,self.graph.DrawImage(filename=self.whiteCheckerImage, location=location))
        self.window.refresh()
    def getClicked(self, event) -> int:
        """Return the index of the field hit by a canvas click, or -1 if none was hit."""
        for index, location in enumerate(self.imageLocations):
            x2, y2 = location
            # The click's y value is flipped so that it can be compared with
            # the graph coordinates stored in imageLocations.
            if self.isInArea(event.x, -event.y + 500, x2, y2, 50, 50):
                return index
        return -1
    def setAfterClicked(self, event):
        """Click handler: translate the clicked field into an environment input."""
        pos = self.getClicked(event)
        if pos == -1:
            return False
        if self.millEnv.moveNeeded == 2:
            # The environment expects a direction here, derived from the index
            # difference between the selected and the clicked field:
            # +-1 means left/right, anything else means up/down.
            dif = self.millEnv.selected - pos
            if dif == 0:
                return False
            if dif == -1:
                pos = 1
            elif dif == 1:
                pos = 3
            elif dif < 0:
                pos = 2
            elif dif > 0:
                pos = 0
        return self.makeMove(pos)
    def isInArea(self, posX1: int, posY1: int, posX2: int, posY2: int, width: int, height: int) -> bool:
        """Return True if point 1 lies in the box that extends ``width`` to the right of and ``height`` below point 2."""
        if posX2 <= posX1 <= posX2 + width:
            if posY2 >= posY1 >= posY2 - height:
                return True
        return False
    def setStatus(self, status: str):
        """Show ``status`` in the text line above the board."""
        self.statusTextBox.Update(status)
    def close(self):
        """Close the window."""
        self.window.close()
    def activateClick(self):
        """Bind left mouse clicks on the board to setAfterClicked."""
        self.graph.TKCanvas.bind("<Button-1>",self.setAfterClicked)
    def deactivateClick(self):
        """Remove the left-click binding from the board."""
        self.graph.TKCanvas.unbind("<Button-1>")
    def read(self, timout: bool=False):
        """Read one window event; with ``timout`` the read uses a timeout of 1 instead of blocking."""
        return self.window.read(1 if timout else None)
    def reset(self):
        """Reset the environment and redraw the empty board."""
        self.millEnv.reset()
        self.reloadEnv()
    def getPlayerName(self, player: int) -> str:
        """Map a player code to its display name (1 -> black, -1 -> white)."""
        if player == 1:
            return "black"
        elif player == -1:
            return "white"
        else:
            return "not a player"

#### Moderated Graphics

In [54]:
class ModeratedGraphics(object):
    """Couples a MillEnv, an Agent and a MillDisplayer and runs the selectable game modes."""
    def __init__(self, modelList: List[keras.Model] = None):
        """Create the environment, the agent and the window.

        ``modelList`` holds one network per ``moveNeeded`` value (index 0-3).
        ``None`` entries, and the default of four ``None`` entries, make the
        agent play randomly for that kind of move.
        """
        self.env = MillEnv()
        self.agent = Agent()
        self.graphics = MillDisplayer(self.env)
        self.graphics.reloadEnv()
        # A fresh list per instance: a list literal as the default argument
        # would be one object shared by every instance.
        self.modelList = [None, None, None, None] if modelList is None else modelList
    def _showResult(self, finished):
        """Write the outcome of a finished game into the status line."""
        if not finished == 2:
            self.graphics.setStatus("player " + self.graphics.getPlayerName(finished) +" won")
        else:
            self.graphics.setStatus("The game ended in a draw")
    def agentPlay(self):
        """Let the agent play both sides until the game ends or the window is closed."""
        self.env.reset()
        self.graphics.deactivateClick()
        finished = 0
        while finished == 0:
            pos = self.agent.getPos(self.env.getBoard(),1, self.env.moveNeeded,self.modelList[self.env.moveNeeded])
            self.graphics.makeMove(pos)
            # Non-blocking read keeps the window responsive between agent moves.
            event, values = self.graphics.read(True)
            if self.eventHandler(event):
                return
            finished = self.env.isFinished()
        self._showResult(finished)
    def playersVSPlayer(self):
        """Run a game in which both sides are played by mouse clicks."""
        self.graphics.activateClick()
        self.graphics.reset()
        finished = 0
        while finished == 0:
            event, values = self.graphics.read(True)
            if self.eventHandler(event):
                return
            self.graphics.reloadEnv()
            finished = self.env.isFinished()
        self._showResult(finished)
        self.graphics.deactivateClick()
    def playerVSAgent(self):
        """Run a game in which player 1 clicks and the agent plays player -1."""
        self.graphics.activateClick()
        self.graphics.reset()
        finished = 0
        while finished == 0:
            event, values = self.graphics.read(True)
            if self.eventHandler(event):
                return
            elif self.env.isPlaying == 1:
                self.graphics.activateClick()
            else:
                # Agent's turn: ignore clicks while it moves.
                self.graphics.deactivateClick()
                pos = self.agent.getPos(self.env.getBoard(),1, self.env.moveNeeded,self.modelList[self.env.moveNeeded])
                self.graphics.makeMove(pos)
            self.graphics.reloadEnv()
            finished = self.env.isFinished()
        self._showResult(finished)
        self.graphics.deactivateClick()
    def playLoop(self):
        """Start with a player-vs-agent game, then handle window events until the window is closed."""
        self.graphics.deactivateClick()
        self.playerVSAgent()
        finished = False
        while not finished:
            event, values = self.graphics.read()
            finished = self.eventHandler(event)
    def eventHandler(self, event) -> bool:
        """React to a window event.

        Closes the window and returns True for a close request. A mode button
        starts that game mode from within this call and returns False once it
        ends; any other event returns False.
        """
        if event == psGui.WIN_CLOSED or event == 'Close': # if user closes window or clicks cancel
            self.graphics.close()
            return True
        elif event == "Agent vs. Agent":
            self.agentPlay()
        elif event == "Player vs. Player":
            self.playersVSPlayer()
        elif event == "Player vs. Agent":
            self.playerVSAgent()
        return False

#### Memory

In [None]:
class Memory(object):
    """Fixed-size ring buffer of experiences with priority-proportional sampling.

    Priorities live in a sum tree built by ``create_tree``; the tree has one
    leaf per buffer slot.
    """

    def __init__(self, size: int):
        self.size = size
        self.curr_write_idx = 0
        self.available_samples = 0
        # Every slot starts as a placeholder row of
        # (24 zeros, 0.0, 0.0, 24 zeros, False).
        self.buffer = np.array(
            [
                (np.zeros(24, dtype=np.float32), 0.0, 0.0,
                 np.zeros(24, dtype=np.float32), False)
                for _ in range(self.size)
            ],
            dtype=object,
        )
        # All priorities start at zero.
        self.base_node, self.leaf_nodes = create_tree([0] * self.size)
        # NOTE(review): these index constants are not used in this class, and
        # terminal_idx = 3 does not match the 5-element rows above, where the
        # boolean sits at position 4 - confirm against the callers.
        self.frame_idx = 0
        self.action_idx = 1
        self.reward_idx = 2
        self.terminal_idx = 3
        self.beta = 0.4
        self.alpha = 0.6
        self.min_priority = 0.01

    def append(self, experience: tuple, priority: float):
        """Store ``experience`` in the current write slot and set its priority."""
        slot = self.curr_write_idx
        self.buffer[slot] = experience
        self.update(slot, priority)
        # Advance the write position, wrapping around at the end of the buffer.
        self.curr_write_idx = (slot + 1) % self.size
        # The sample counter stops growing at size - 1.
        self.available_samples = min(self.available_samples + 1, self.size - 1)

    def update(self, idx: int, priority: float):
        """Write the adjusted ``priority`` into the tree leaf of slot ``idx``."""
        leaf = self.leaf_nodes[idx]
        update(leaf, self.adjust_priority(priority))

    def adjust_priority(self, priority: float):
        """Return ``(priority + min_priority) ** alpha``."""
        return np.power(priority + self.min_priority, self.alpha)

    def sample(self, num_samples: int):
        """Draw ``num_samples`` slots with probability proportional to their priority.

        Draws that land on a slot index of ``available_samples - 1`` or
        higher are rejected and repeated.

        Returns ``(experiences, slot indices, importance-sampling weights)``;
        the weights are scaled so that the largest one equals 1.
        """
        chosen = []
        weights = []
        while len(chosen) < num_samples:
            draw = np.random.uniform(0, self.base_node.value)
            node = retrieve(draw, self.base_node)
            if not node.idx < self.available_samples - 1:
                continue
            chosen.append(node.idx)
            probability = node.value / self.base_node.value
            weights.append((self.available_samples + 1) * probability)
        # Apply the beta exponent, then normalise by the maximum weight.
        weights = np.power(np.array(weights), -self.beta)
        weights = weights / np.max(weights)
        return self.buffer[chosen], chosen, weights


#### Model

In [2]:
class DQModel(keras.Model):
    """Fully connected Q-network with an optional dueling (value + advantage) head."""

    def __init__(self, hidden_size: int, num_actions: int, dueling: bool, dropoutRate: float):
        super().__init__()
        self.dueling = dueling
        self.dropoutRate = dropoutRate

        def relu_dense(units):
            # Hidden layer: ReLU activation with He-normal initialised weights.
            return keras.layers.Dense(units, activation='relu',
                                      kernel_initializer=keras.initializers.he_normal())

        # Shared trunk.
        self.dense1 = relu_dense(hidden_size * 3)
        self.dense2 = relu_dense(hidden_size * 4)
        self.dense3 = relu_dense(hidden_size * 2)
        # Advantage stream; its linear output has one unit per action.
        self.adv_dense1 = relu_dense(hidden_size * 3)
        self.adv_dense2 = relu_dense(hidden_size * 3)
        self.adv_out = keras.layers.Dense(num_actions,
                                          kernel_initializer=keras.initializers.he_normal())
        if dueling:
            # Value stream with a single linear output unit.
            self.v_dense = relu_dense(hidden_size * 3)
            self.v_out = keras.layers.Dense(1, kernel_initializer=keras.initializers.he_normal())
            # NOTE(review): reduce_mean without an axis averages over every
            # element of the tensor, the batch dimension included - confirm
            # that a per-sample mean over the actions is not what is wanted.
            self.lambda_layer = keras.layers.Lambda(lambda x: x- tf.reduce_mean(x))
            self.combine = keras.layers.Add()

    def call(self, input, **kwargs):
        """Return the network output for ``input``.

        Dropout with ``self.dropoutRate`` is applied after the first two
        trunk layers on every call; ``kwargs`` are not used here.
        """
        hidden = keras.backend.dropout(self.dense1(input), self.dropoutRate)
        hidden = keras.backend.dropout(self.dense2(hidden), self.dropoutRate)
        hidden = self.dense3(hidden)
        advantage = self.adv_out(self.adv_dense2(self.adv_dense1(hidden)))
        if not self.dueling:
            return advantage
        state_value = self.v_out(self.v_dense(hidden))
        centred_advantage = self.lambda_layer(advantage)
        return self.combine([state_value, centred_advantage])

    @tf.function
    def traceable(self, input, **kwargs):
        """``tf.function``-wrapped variant of calling the model."""
        return self(input, **kwargs)

    def changeDropoutRate(self, rate):
        """Set the dropout rate used by ``call``."""
        self.dropoutRate = rate

NameError: name 'keras' is not defined

##### Node

In [None]:
class Node:
    """Node of a sum tree: an inner node's value is the sum of its children's values."""

    def __init__(self, left, right, is_leaf: bool = False, idx = None):
        self.left = left
        self.right = right
        self.is_leaf = is_leaf
        self.value = sum(n.value for n in (left, right) if n is not None)
        self.parent = None
        self.idx = idx  # this value is only set for leaf nodes
        if left is not None:
            left.parent = self
        if right is not None:
            right.parent = self

    @classmethod
    def create_leaf(cls, value, idx):
        """Create a child-less leaf holding ``value`` for buffer slot ``idx``."""
        leaf = cls(None, None, is_leaf=True, idx=idx)
        leaf.value = value
        return leaf


def create_tree(input: list):
    """Build a sum tree over ``input`` and return ``(root, leaves)``.

    ``leaves[i]`` holds ``input[i]``. Any number of values is supported: a
    level with an odd node count passes its last node up unpaired, so no leaf
    is left outside the tree. Raises IndexError for an empty ``input``.
    """
    nodes = [Node.create_leaf(v, i) for i, v in enumerate(input)]
    leaf_nodes = nodes
    while len(nodes) > 1:
        paired = [Node(nodes[i], nodes[i + 1]) for i in range(0, len(nodes) - 1, 2)]
        if len(nodes) % 2:
            # Odd count: keep the last node for the next level instead of dropping it.
            paired.append(nodes[-1])
        nodes = paired

    return nodes[0], leaf_nodes


def retrieve(value: float, node: Node):
    """Return the leaf below ``node`` whose cumulative value range contains ``value``."""
    while not node.is_leaf:
        if node.left.value >= value:
            node = node.left
        else:
            # Skip the left subtree and continue relative to the right one.
            value -= node.left.value
            node = node.right
    return node


def update(node: Node, new_value: float):
    """Set ``node.value`` to ``new_value`` and add the difference to all ancestors."""
    change = new_value - node.value

    node.value = new_value
    # A tree with a single leaf has no parent to update.
    if node.parent is not None:
        propagate_changes(change, node.parent)


def propagate_changes(change: float, node: Node):
    """Add ``change`` to ``node`` and to every ancestor up to the root."""
    while node is not None:
        node.value += change
        node = node.parent


### Tests

#### Environment

In [5]:
# Smoke test of the environment: player 1 places 0, 2 and 1 (a complete line),
# player -1 places 3 and 4 in between.
env = MillEnv()
print(env.getInRows(14))

for position in (0, 3, 2, 4, 1):
    env.makeMove(position)
print(env.getBoard())
print(env.moveNeeded)
# True for each line through field 0 that holds three checkers of one colour.
ThreeInChosenRow = abs(env.board[env.getInRows(0)].sum(axis=1)) == 3
print(~ThreeInChosenRow.any())
print(env.getMoveFields(4))
print(env.getSummary(1))

[[12 13 14]
 [ 2 14 23]]
[ 1.  1.  1. -1. -1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
  0.  0.  0.  0.  0.  0.]
3
False
[ 1.  0.  0. -1.]
(1, 0, 0, 1)


#### Agent

In [6]:
# Let the random agent (no network) play ten complete games against itself
# and print the winner of each game.
env = MillEnv()
ag = Agent()
for i in range(10):
    env.reset()
    while env.isFinished() == 0:
        position = ag.getPos(env.getBoard(), 0, env.moveNeeded)
        valid = env.makeMove(position)
    print(f"Gewonnen hat {env.isFinished()}")

Gewonnen hat 1
Gewonnen hat -1
Gewonnen hat -1
Gewonnen hat 1
Gewonnen hat 1
Gewonnen hat -1
Gewonnen hat -1
Gewonnen hat 1
Gewonnen hat 1
Gewonnen hat 1


#### Displayer

In [11]:
# Open the board window on a fresh environment and run its event loop until
# the window is closed.
displayer = MillDisplayer()
displayer.windowsLoop()

#### Moderated Play

In [55]:
# Start the moderated GUI without any models, so the agent chooses its
# positions at random; playLoop begins with a player-vs-agent game.
moderateGraphics = ModeratedGraphics()
moderateGraphics.playLoop()