In [1]:
%pip install open-spiel==1.3 python-socketio==4.6.1 python-chess==1.999 numpy

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [None]:
URL_KRIEGSPER= "https://omega-chess.ddns.net/?"

In [None]:
from dataclasses import dataclass
import chess
import numpy as np
from copy import deepcopy
import random
import enum

from open_spiel.python.algorithms import mcts
from open_spiel.python.bots import human
from open_spiel.python.bots import uniform_random
import pyspiel


@dataclass
class MCSTPlayerConfig:
    roll_outs: int =1
    utc:int  = 25
    max_sim: int = 333


class GameType(enum.IntEnum):
    CHESS = 1#'chess'
    DARK_CHESS = 2#'dark_chess'
    KRIEGSPIEL = 3#'kriegspiel'

class Actions(enum.IntEnum):
  MAKE_BEST_MOVE = 1#"make_best_move"
  LIST_MOVE = 2#"list_move"
  MOVE = 3#"move"
  GET_VALID_MOVE= 4#"get_valid_move"

class GameStateOutput:
  white_view: str
  black_view: str
  possible_moves: list[str] | None = None
  best_move: str | None = None
  fen: str
  finish: bool
  general_message: str | None = None



@dataclass
class GameStateInput:
  game_type:GameType
  fen: str
  action:Actions
  move:str|None

KRIEGSPIEL_INVALID_MOVE = "Illegal move."

def _init_bot(bot_type, game, config):
    """Initializes a bot by type."""
    rng = np.random.RandomState(None)
    if bot_type == "mcts":
        evaluator = mcts.RandomRolloutEvaluator(
            config.roll_outs, rng
        )  # How many rollouts to do.
        return mcts.MCTSBot(
            game,
            config.utc,  # UTC exploration constant
            config.max_sim,  # max number of simulations
            evaluator,
            random_state=rng,
            solve=True,  # use MCTS-Solver
            verbose=False,
        )
    #  if bot_type == "random":
    #  return uniform_random.UniformRandomBot(1, rng)
    # if bot_type == "human":
    #  return human.HumanBot()
    raise ValueError("Invalid bot type: %s" % bot_type)


def _get_best_move(game, state):
    bot = _init_bot("mcts", game, MCSTPlayerConfig())
    if state.is_chance_node():
        outcomes = state.chance_outcomes()
        action_list, prob_list = zip(*outcomes)
        action = np.random.choice(action_list, p=prob_list)
    elif state.is_simultaneous_node():
        raise ValueError("Game cannot have simultaneous nodes.")
    else:
        action = bot.step(state)

    return action


def _create_state(game_name, fen=None):
    # it use undocumented new_initial_state(fen)
    # to generate from the fen in the case of chess
    # other variant use the fen parameter
    isChess = game_name == "chess"
    isValidFEN = fen is not None
    params = {}
    if not isChess and isValidFEN:
        params["fen"] = fen
    game_name= 'kriegspiel'
    game = pyspiel.load_game(game_name, params)
    state = (
        game.new_initial_state(fen)
        if isChess and isValidFEN
        else game.new_initial_state()
    )
    return game, state


def _legal_action_to_uci(game_type, state, fen):
    return _actions_to_uci(game_type, state, fen, state.legal_actions())


def action_to_uci(game_type, state, fen, action):
    return _actions_to_uci(game_type, state, fen, [action])[0]


def _check_castling(state: str):
    if state.startswith("O-O-O"):
        return "e1c1"
    if state.startswith("O-O"):
        return "e1g1"
    if state.startswith("o-o-o"):
        return "e8c8"
    if state.startswith("o-o"):
        return "e8g8"
    raise ValueError("Invalid Move")


def _actions_to_uci(game_type, state, fen, actions=None):
    # This code is difficult because it relays on the specific implementation
    # of the libraries rather than the actual api
    board = chess.Board(fen)

    # transform the san to uci
    transform = lambda x: board.parse_san(state.action_to_string(x)).uci()
    if game_type == "kriegspiel":
        # kriegspiel is already in uci
        transform = lambda x: state.action_to_string(x)

    try:
        return list(map(transform, actions))
    except:
        # This changes the function used to generate the move, in order to accept illegals moves
        board.generate_legal_moves = board.generate_pseudo_legal_moves

        def _handle_check_castling(action):
            try:
                return transform(action)
            except:
                return _check_castling(state.action_to_string(action))

        return list(map(_handle_check_castling, actions))


def _fen(state, game_type):
    if game_type == GameType.CHESS:
        return state.observation_string()
    else:
        return state.to_string()


def _custom_observation_string(state, player_id: int, game_type: str):
    """
    This function is just a quick way to implement the right view, but it is not mantainable,
    TODO: refactor me, use openspiel api somehow
    """

    if game_type != GameType.KRIEGSPIEL:
        return state.observation_string(player_id)

    # kriegspiel
    fen = state.to_string()
    splitted_fen = fen.split(" ")

    first_part, rest = splitted_fen[0], splitted_fen[1:]

    all_table = [["."] * 8 for _ in range(8)]
    checker = (
        lambda x: x.isalpha() and x.islower()
        if player_id == 0
        else x.isalpha() and x.isupper()
    )

    # compute correct view for the player
    rows = first_part.split("/")
    for i in range(len(rows)):
        offsets = 0
        for j in range(len(rows[i])):
            if rows[i][j].isdigit():
                for k in range(int(rows[i][j])):
                    all_table[i][offsets + k] = "."
                offsets += int(rows[i][j])
            elif checker(rows[i][j]):
                all_table[i][offsets] = rows[i][j]
                offsets += 1
            else:
                all_table[i][offsets] = "."
                offsets += 1

    return "/".join(["".join(row) for row in all_table]) + " " + " ".join(rest)


def _create_chat_bot_message(state, player_id: int, input_struct: GameStateInput):
    raise NotImplementedError("Not implemented yet")


def _create_observation_string(state, player_id: int, input_struct: GameStateInput):
    """
    Parameters
    ----------
    state : pyspiel.State
      The state of the game.

    player_id : int
      The player for which the observation string is generated.
    """

    match input_struct.game_type:
        case GameType.CHESS:
            return state.observation_string(player_id)
        case GameType.KRIEGSPIEL:
            return state.observation_string(player_id)
        case GameType.DARK_CHESS:
            return ""
    # return _create_chat_bot_message(state, player_id, input_struct)


def dispatch(game_state_input: GameStateInput) -> GameStateOutput:
    fen = game_state_input.fen
    game, state = _create_state(game_state_input.game_type, fen)
    out = GameStateOutput()

    match game_state_input.action:
        case Actions.MOVE:
            try:
                index = _legal_action_to_uci(
                    game_state_input.game_type, state, fen
                ).index(game_state_input.move)
            except ValueError:
                raise ValueError("Invalid Move")

            state.apply_action(state.legal_actions()[index])
            fen = _fen(state, game_state_input.game_type)

        case Actions.MAKE_BEST_MOVE:
            action = _get_best_move(game, state)
            out.best_move = action_to_uci(
                game_state_input.game_type, state, fen, action
            )
            state.apply_action(action)
            fen = _fen(state, game_state_input.game_type)

        case Actions.LIST_MOVE:
            out.possible_moves = _legal_action_to_uci(
                game_state_input.game_type, state, fen
            )

        case Actions.GET_VALID_MOVE:
            # make shuffle
            actions = state.legal_actions()
            random.shuffle(actions)
            for i in actions:
                _, newState = _create_state(game_state_input.game_type, fen)
                # newState = deepcopy(state)
                newState.apply_action(i)
                if newState.observation_string(1) == KRIEGSPIEL_INVALID_MOVE:
                    continue
                if newState.observation_string(0) == KRIEGSPIEL_INVALID_MOVE:
                    continue
                out.best_move = action_to_uci(game_state_input.game_type, state, fen, i)
                state = newState
                break

    # TODO: you should refactor this maybe with builder pattern??
    out.fen = _fen(state, game_state_input.game_type)


    out.finish = state.is_terminal()
    out.white_view = _custom_observation_string(state, 1, game_state_input.game_type)
    out.black_view = _custom_observation_string(state, 0, game_state_input.game_type)
    if game_state_input.game_type == GameType.KRIEGSPIEL:
        _, statechess = _create_state(GameType.CHESS, fen)
        out.finish = statechess.is_terminal()

    player_id = 0 if fen.split(" ")[1] == "w" else 1
    out.general_message = _create_observation_string(state, player_id, game_state_input)

    return out

if __name__ =='__main__':
    fen = "1n1r2k1/1p2b1q1/3B3p/5p2/b7/4K3/r7/q7 b - - 2 41"
    _, statechess = _create_state(GameType.CHESS, fen)
    out= statechess.is_terminal()
    print(out)




False


In [7]:
import time
import socketio
import random
import uuid
from enum import IntEnum


class DarkBoardStates(IntEnum):
    WAITING_FOR_MOVE = 1
    WAITING_FOR_COMPUTER_BEST_MOVE  = 2
    GAME_OVER  = 3
    ERROR = 4

game_type = 'kriegspiel'
class DarkBoard():

    def __init__(self, ):
        print("DarkBoard created")
        self.__fen: str = 'rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 0'
        self._state = DarkBoardStates.GAME_OVER
        self.__old_fen: str | None = None
        self.sio = socketio.Client()
        self.last_interaction = time.time()
        self.messages = []
        self.error_message = None

        # Socket events
        @self.sio.event
        def connect():
            print("I'm connected to darkboard")
            if self._state != DarkBoardStates.ERROR:
                self.sio.emit("ready")
                self.set_state(DarkBoardStates.WAITING_FOR_MOVE)

        @self.sio.event
        def game_over(pgn):
            self.set_state(DarkBoardStates.GAME_OVER)
            print("Game over")

        @self.sio.event
        def read_message(message):
            print('message', message)
            self.messages.append(message)
            if 'White\'s turn;' in  message:
                self.set_state(DarkBoardStates.WAITING_FOR_MOVE)
                print("It's my turn!")


        @self.sio.event
        def chessboard_changed(fen):
            self.__old_fen = self.__fen
            if fen != self.__fen:
                self.__fen = fen


        @self.sio.event
        def error(err):
            print("Error", err)
            self.set_state(DarkBoardStates.ERROR)
            self.error_message = 'Error from the other server '+err

        @self.sio.event
        def disconnect():
            print("Disconneted")
            self.set_state(DarkBoardStates.ERROR)
            self.error_message = 'The other server has disconnected'

    def set_state(self,state):
        if self._state !=state:
            self._state = state
            self.last_interaction = time.time()

    @property
    def fen(self):
        return self.__fen

    def connect(self, connection_string: str = URL_KRIEGSPER): #TODO sosituirlo con un env
        # const connectionPayloadDeveloper f 0 username, .TestBot., gameType, .developer., token: .bec635e710d6cdd5t3d3aa2b1c2e7007e45881eaf5d4g6b.
        # 5 const connectionPayload f username: .gianlo., gameType: .darkboard., room: .test.,

        connection_payload = {
            "username": "gg",
            "gameType": "developer",
            "token": "e8279621e621298662410167dce2a412981ab787529079ed"
        }

        query_string = "&".join([f"{key}={value}" for key, value in connection_payload.items()])
        self.sio.connect(f"{connection_string}{query_string}", socketio_path="/api/db/games/socket/")

    def send_move(self, move: str):
        """ A move in algebraic notation, e.g. e2e4
        For pawn promotion 5th character is the piece to promote to, e.g. e7e8q
        """
        assert len(move) == 4 or len(move) == 5, "Invalid move"
        assert move[0].isalpha() and move[1].isdigit() and move[2].isalpha() and move[3].isdigit(), "Invalid move"

        self.sio.emit("make_move", DarkBoard.move_to_san(self.__fen, move))

    def is_move_valid(self, _move: str) -> bool:
        """ Check if a move is valid

        """
        return self.__fen != self.__old_fen

    # TODO: might be a good idea to move these methods to util file
    @staticmethod
    def move_to_san(fen, move):
        # Convert FEN string to list of squares
        squares = DarkBoard.fen_to_squares(fen)

        # Parse the move
        from_square, _to_square = move[0:2], move[2:]

        from_rank, from_file = 8 - int(from_square[1]), ord(from_square[0]) - ord('a')

        # Get the piece moving
        piece = squares[from_rank][from_file]

        if piece.lower() == 'p' :
            return move

        print(f"DEbuG: piece {piece}{move}")
        return f"{piece}{move}"

    @staticmethod
    def fen_to_squares(fen: str) -> list[list[str]]:
        """ Convert FEN string to a list of lists representing the chess board

        """
        #
        curr_board: list[str] = [list(row) for row in fen.split(" ")[0].split('/')]
        final_board: list[list[str]] = [["."] * 8 for _ in range(8)]

        for i in range(len(curr_board)):
            offset = 0
            for j in range(len(curr_board[i])):
                if not curr_board[i][j].isdigit():
                    final_board[i][j + offset] = curr_board[i][j]
                else:
                    offset += int(curr_board[i][j]) - 1

        return final_board

    @property
    def state(self):
        if self._state == DarkBoardStates.WAITING_FOR_MOVE or self._state == DarkBoardStates.WAITING_FOR_COMPUTER_BEST_MOVE:
            if self.last_interaction is None or (time.time() - self.last_interaction > 60):
                self.set_state(DarkBoardStates.ERROR)
                if self._state == DarkBoardStates.WAITING_FOR_MOVE:
                    self.error_message = 'The other server has disconnected'
                else:
                    self.error_message = 'Too much time calculating the move'
        return self._state

    def make_move(self,move):
        game_input = GameStateInput(game_type,self.fen,Actions.MOVE,move)
        return dispatch(game_input)

    def best_move(self):
        invalid_counter = 0
        # Temporary value just used not to infinite loop
        self.set_state(DarkBoardStates.WAITING_FOR_COMPUTER_BEST_MOVE)
        while invalid_counter < 0:
            print("Waiting for best move", invalid_counter)
            game_input = GameStateInput(game_type, self.fen, Actions.MAKE_BEST_MOVE, None)
            state = dispatch(game_input)
            if state.white_view != KRIEGSPIEL_INVALID_MOVE or state.black_view != KRIEGSPIEL_INVALID_MOVE:
                invalid_counter += 1
                continue
            print("Best move", state.best_move)
            return state.best_move[0]

        game_input = GameStateInput(game_type, self.fen, Actions.GET_VALID_MOVE, None)
        #breakpoint()
        state = dispatch(game_input)
        print("Best move", state.best_move)
        return state.best_move


class DarkBoardSingleton:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.__init()
        return cls._instance

    def __init(self):
        self.darkboard = DarkBoard()

    def create_game(self):
        if self.darkboard is None or self.darkboard.state == DarkBoardStates.GAME_OVER or self.darkboard.state == DarkBoardStates.ERROR:
            self.darkboard = DarkBoard()
            self.darkboard.connect()

    @property
    def get_fen(self):
        return self.darkboard.fen

    @property
    def get_error_message(self):
        return self.darkboard.error_message

    @property
    def get_message(self):
        return self.darkboard.messages

    @property
    def get_state(self):
        return self.darkboard.state

    def make_best_move(self) :
        if self.get_state == DarkBoardStates.WAITING_FOR_MOVE :
            best = self.darkboard.best_move()
            self.darkboard.send_move(best)


def make_best_move(self):
    if self.get_state == DarkBoardStates.WAITING_FOR_MOVE:
        while True:
            try:
                best = self.darkboard.best_move()
                self.darkboard.send_move(best)
                break
            except ConnectionError:
                print("Connection refused by the server. Retrying...")
                time.sleep(1)

game = DarkBoardSingleton()
game.create_game()
while True:
  game.make_best_move()
  time.sleep(1)

DarkBoard created
DarkBoard created
I'm connected to darkboard
Best move h2h3
message 1. Black's turn; 
message 2. White's turn; 
It's my turn!
Best move d2d3
message 2. Black's turn; 
message 3. White's turn; 
It's my turn!
Best move b2b4
message 3. Black's turn; 1 Try; 
message 4. White's turn; Piece xb4; 
It's my turn!
Best move f2f3
message 4. Black's turn; 
message 5. White's turn; 
It's my turn!
Best move g2g3
message 5. Black's turn; 
message 6. White's turn; 
It's my turn!
Best move c1d2
DEbuG: piece Bc1d2
message 6. Black's turn; 
message 7. White's turn; 
It's my turn!
Best move d1c1
DEbuG: piece Qd1c1
message 7. Black's turn; 
message 8. White's turn; 
It's my turn!
Best move e2e4
message 8. Black's turn; 
message 9. White's turn; 
It's my turn!
Best move d2b4
DEbuG: piece Bd2b4
message 9. Black's turn; Pawn xb4; 
message 10. White's turn; 1 Try; 
It's my turn!
Best move h3h4
message 10. Black's turn; 1 Try; 
message 11. White's turn; Piece xe4; 2 Tries; 
It's my turn!
Best 