In [1]:
import sys

try:
    import websockets
except:
    !{sys.executable} -m pip install websockets

In [2]:
from collections import deque
from typing import List, Dict, Tuple, Set, Optional


class MinesweeperGame:
    def __init__(self, board: List[List[str]]):
        self.board = board
        self.rows = len(board)
        self.cols = len(board[0])
        self.probabilities = {}
        self.relevant_unrevealed = set()

    def find_neighbors(self, row: int, col: int) -> List[Tuple[int, int]]:
        neighbors = []
        for i in range(row - 1, row + 2):
            for j in range(col - 1, col + 2):
                if 0 <= i < self.rows and 0 <= j < self.cols and (i, j) != (row, col):
                    neighbors.append((i, j))
        return neighbors

    def calculate_probabilities(self, iterations: int = 5):
        self.probabilities.clear()
        self.relevant_unrevealed.clear()

        # Initialize probabilities for known cells and identify relevant '?' cells
        for row in range(self.rows):
            for col in range(self.cols):
                cell = self.board[row][col]
                if cell == 'M':
                    self.probabilities[(row, col)] = 1.0
                elif isinstance(cell, int):
                    self.probabilities[(row, col)] = 0.0
                    for r, c in self.find_neighbors(row, col):
                        if self.board[r][c] == '?':
                            self.relevant_unrevealed.add((r, c))

        # Initialize and solve constraints
        self.solve_constraints(iterations)

    def solve_constraints(self, iterations: int):
        constraints = []
        for iteration in range(iterations):
            new_probabilities = {}
            unsolved_constraints = []

            for row in range(self.rows):
                for col in range(self.cols):
                    self.identify_constraints(row, col, constraints)

            # Solve Constraints
            for constraint, remaining_mines in constraints:
                num_unrevealed = len(constraint)
                if remaining_mines == 0:
                    for r, c in constraint:
                        new_probabilities[(r, c)] = 0.0
                elif remaining_mines == num_unrevealed:
                    for r, c in constraint:
                        new_probabilities[(r, c)] = 1.0
                else:
                    unsolved_constraints.append((constraint, remaining_mines))

            constraints = unsolved_constraints
            self.probabilities.update(new_probabilities)

    def identify_constraints(self, row: int, col: int, constraints: List[Tuple[Set[Tuple[int, int]], int]]):
        cell = self.board[row][col]
        if isinstance(cell, int):
            neighbors = self.find_neighbors(row, col)
            known_mines = sum(self.probabilities.get((r, c), 0) for r, c in neighbors)
            unrevealed_neighbors = {(r, c) for r, c in neighbors if (r, c) in self.relevant_unrevealed}
            remaining_mines = cell - known_mines
            if unrevealed_neighbors:
                constraints.append((unrevealed_neighbors, remaining_mines))


def is_valid_move(board: List[List[str]], x: int, y: int, visited: set, target: Tuple[int, int]) -> bool:
    rows, cols = len(board), len(board[0])
    return (
        0 <= x < rows 
        and 0 <= y < cols 
        and (x, y) not in visited 
        and (board[x][y] not in ('M', '?') or (x, y) == target)
    )


def bfs_find_directions(board: List[List[str]], start: Tuple[int, int], target: Tuple[int, int]) -> List[str]:
    rows, cols = len(board), len(board[0])
    visited = set()
    queue = deque([(start, [])])
    move_to_direction = {
        (-1, 0): "up",
        (1, 0): "down",
        (0, -1): "left",
        (0, 1): "right"
    }
    while queue:
        (cur_x, cur_y), directions_so_far = queue.popleft()
        visited.add((cur_x, cur_y))
        if (cur_x, cur_y) == target:
            return directions_so_far
        for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
            new_x, new_y = cur_x + dx, cur_y + dy
            if is_valid_move(board, new_x, new_y, visited, target):
                new_directions = directions_so_far + [move_to_direction[(dx, dy)]]
                queue.append(((new_x, new_y), new_directions))
                visited.add((new_x, new_y))


def next_best_move_from_hero(board: List[List[str]], hero_pos: Tuple[int, int]) -> Optional[Tuple[int, int]]:
    game = MinesweeperGame(board)
    game.calculate_probabilities()
    for row in range(game.rows):
        for col in range(game.cols):
            if game.board[row][col] == 'K':
                path_to_key = bfs_find_directions(board, hero_pos, (row, col))
                if path_to_key:
                    return (row, col), 0, path_to_key
    probabilities = {cell: prob for cell, prob in game.probabilities.items() if game.board[cell[0]][cell[1]] in ['?', 'K']}
    if not probabilities:
        raise ValueError("No options")
    sorted_cells = sorted(probabilities.items(), key=lambda x: x[1])
    for cell, probability in sorted_cells:
        path_to_cell = bfs_find_directions(board, hero_pos, cell)
        if path_to_cell:
            return cell, probability, path_to_cell

# Test the code
board = [
    [0, 1, 'M', '?'],
    [0, 1, 1, '?'],
    ['?', '?', '?', '?'],
    ['K', '?', '?', '?']
]

hero_pos = (0, 0)

result = next_best_move_from_hero(board, hero_pos)
print("Next best move:", result)


Next best move: ((2, 0), 0.0, ['down', 'down'])


In [3]:
from tqdm.auto import trange
import asyncio
import websockets
import json
from collections import Counter
import itertools

class TileMapping:
    def __init__(self):
        self.tile_mapping = {
            'covered': '?',
            'c0': 0,
            'c1': 1,
            'c2': 2,
            'c3': 3,
            'c4': 4,
            'c5': 5,
            'c6': 6,
            'c7': 7,
            'c8': 8,
            'bomb': 'M',
            'key': 'K',
        }
        
    def map_tiles(self, game_map):
        return [
            [
                self.tile_mapping[tile]
                for tile in row
            ] for row in game_map
        ]

class GameServer:
    def __init__(self, uri, debug=False):
        self.uri = uri
        self.websocket = None
        self.debug = debug
        self.message_queue = asyncio.Queue(maxsize=1)

    async def connect(self):
        self.websocket = await websockets.connect(self.uri)
        if self.debug:
            print("Connected!")
        asyncio.create_task(self.receive_messages())

    async def receive_messages(self):
        while True:
            try:
                message = await self.websocket.recv()
                if self.message_queue.full():
                    _ = self.message_queue.get_nowait()
                await self.message_queue.put(message)
            except websockets.exceptions.ConnectionClosedOK:
                return
    
    async def send_moves(self, moves_chain):
        moves_str = '\n'.join(moves_chain)
        await self.websocket.send(moves_str)
        if self.debug:
            print(f"Sent moves:\n{moves_str}")

class Game:
    def __init__(self, game_server, debug=False):
        self.game_server = game_server
        self.tile_mapping = TileMapping()
        self.debug = debug

    async def parse_message(self, message):
        data = json.loads(message)
        hero = data.get('hero', {})
        x, y = hero.get('x', -1), hero.get('y', -1)
        game_map = data.get('map', [])
        num_keys = data.get('numKeysRetrieved', 0)
        lives_remaining = data.get('livesRemaining', 0)
        flag = data.get('flag', 0)
        
        game_map = self.tile_mapping.map_tiles(game_map)
        return x, y, game_map, num_keys, lives_remaining, flag
    
    async def play(self):
        await self.game_server.connect()
        t = trange(1000, desc='Bar desc', leave=True)

        for i in t:
            message = await self.game_server.message_queue.get()
            x, y, game_map, num_keys, lives_remaining, flag = await self.parse_message(message)
            covered_count = Counter(itertools.chain.from_iterable(game_map)).get('?', 0)

            x, y = y, x
            if flag:
                print(flag)
                return "STOP"
                
            if lives_remaining == 0:
                break
            
            target, probability, path = next_best_move_from_hero(game_map, (x, y))
            t.set_description(
                f"Keys collected {num_keys} / Lives {lives_remaining} / Covered {covered_count} / P {probability:.02f}")
            if self.debug:
                print(target, probability)

            await self.game_server.send_moves(path)
        return

async def main():
    for i in range(100):
        try:
            game_server = GameServer(uri="ws://mikusweeper.chals.sekai.team/socket", debug=False)
            game = Game(game_server, debug=False)
            result = await game.play()
            if result == "STOP":
                break
        except ValueError as e:
            print(e)

if __name__ == '__main__':
    await main()

Bar desc:   0%|          | 0/1000 [00:00<?, ?it/s]

Bar desc:   0%|          | 0/1000 [00:00<?, ?it/s]

Bar desc:   0%|          | 0/1000 [00:00<?, ?it/s]

SEKAI{M1ku_K1ngd0m_h4s_b33n_54v3d_OwO<3}
