In [81]:
import numpy as np
import random
import heapq
import math
import time
from collections import deque

In [82]:

class PuzzleBoardState(object):
    """华容道棋盘类（固定 3×3 终局）"""

    def __init__(self, data=None, parent=None, random_seed=2022):
        """
        固定 3×3 8-puzzle，不需要维度输入；目标终局唯一：
        1 2 3
        8 0 4
        7 6 5
        """
        self.dim = 3
        self.default_dst_data = self._goal_board()

        if data is None:
            init_solvable = False
            init_count = 0
            while not init_solvable and init_count < 500:
                init_data = self._get_random_data(random_seed=random_seed + init_count)
                init_count += 1
                init_solvable = self._if_solvable(init_data, self.default_dst_data)
            data = init_data
        else:
            # 规范化输入为 3×3
            data = np.array(data, dtype=int).reshape(3, 3)
        self.data = data
        self.parent = parent
        self.piece_x, self.piece_y = self._get_piece_index()

    @staticmethod
    def _goal_board() -> np.ndarray:
        # 固定终局（顺时针螺旋），0 表示空格
        arr = [1, 2, 3,
               8, 0, 4,
               7, 6, 5]
        return np.array(arr, dtype=int).reshape(3, 3)

    def _get_random_data(self, random_seed):
        """生成 3×3 的随机棋盘"""
        random.seed(random_seed)
        init_data = [i for i in range(self.dim ** 2)]
        random.shuffle(init_data)
        init_data = np.array(init_data).reshape((self.dim, self.dim))
        return init_data

    def _get_piece_index(self):
        """返回当前空格(0)的位置 (x, y)"""
        x, y = np.argwhere(self.data == 0)[0]
        return int(x), int(y)

    def is_final(self):
        """是否到达目标终止状态"""
        return np.array_equal(self.data, self.default_dst_data)

    def next_states(self):
        """返回当前状态的相邻状态（4 邻域：上下左右移动空格）"""
        res = []
        for dx, dy in ((-1, 0), (1, 0), (0, -1), (0, 1)):
            x2, y2 = self.piece_x + dx, self.piece_y + dy
            if 0 <= x2 < self.dim and 0 <= y2 < self.dim:
                new_data = self.data.copy()
                # 交换空格与目标格
                new_data[self.piece_x][self.piece_y] = new_data[x2][y2]
                new_data[x2][y2] = 0
                # 关键：生成子状态时记录 parent=self，便于回溯路径
                res.append(PuzzleBoardState(data=new_data, parent=self))
        return res

    def get_data(self):
        return self.data

    def get_data_hash(self):
        """将棋盘数据转为可哈希的 tuple 形式以便存入集合/字典"""
        return hash(tuple(self.data.flatten()))

    def get_parent(self):
        return self.parent


In [83]:
# -------------------- 启发式函数1：错位方块数--------------------

def linear_heuristic(state: PuzzleBoardState) -> int:
    dim = state.dim  # 固定 3
    board = state.data
    target = state.default_dst_data

    # 1) 错位方块数（不计 0）
    misplaced = 0
    for i in range(dim):
        for j in range(dim):
            v = int(board[i][j])
            if v != 0 and v != int(target[i][j]):
                misplaced += 1

    return misplaced

# -------------------- 启发式函数2：曼哈顿距离 --------------------
def manhattan_heuristic(state: PuzzleBoardState) -> int:
    dim = state.dim
    board = state.data
    target = state.default_dst_data
    manhattan = 0
    for i in range(dim):
        for j in range(dim):
            v = board[i][j]
            if v == 0:
                continue
            ti, tj = np.argwhere(target == v)[0]
            manhattan += abs(i - int(ti)) + abs(j - int(tj))
    return manhattan

In [84]:
# -------------------- A*（支持自定义启发式函数） --------------------
def astar(start_state: PuzzleBoardState, heuristic_func=None):
    start_hash = start_state.get_data_hash()
    g_score = {start_hash: 0}
    f_start = heuristic_func(start_state)
    f_score = {start_hash: f_start}

    open_heap = []
    counter = 0
    heapq.heappush(open_heap, (f_start, 0, counter, start_state))
    counter += 1

    closed = set()

    while open_heap:
        f, g, _, current = heapq.heappop(open_heap)
        cur_hash = current.get_data_hash()
        if g != g_score.get(cur_hash, math.inf):
            continue

        if current.is_final():
            path = []
            node = current
            while node is not None:
                path.append(node)
                node = node.get_parent()
            return path[::-1]

        closed.add(cur_hash)

        for neighbor in current.next_states():
            n_hash = neighbor.get_data_hash()
            tentative_g = g + 1
            if n_hash in closed and tentative_g >= g_score.get(n_hash, math.inf):
                continue
            if tentative_g < g_score.get(n_hash, math.inf):
                g_score[n_hash] = tentative_g
                f_new = tentative_g + heuristic_func(neighbor)
                heapq.heappush(open_heap, (f_new, tentative_g, counter, neighbor))
                counter += 1

    return None


In [85]:
def astar_with_stats(start_state: PuzzleBoardState, heuristic_func=None):
    """
    简化策略（更新）：按层推进。每一步在当前所有前沿节点（currents）上：
    - 生成并统计所有邻居（4 邻域），全部计入 expansions；
    - 计算 f = (g+1) + h；
    - 优先在“未访问”的候选中选择最小 f；若出现并列（相同最小 f），则所有并列项一起进入下一步；
      并以列表形式打印被选中的候选索引。

    返回：
      (path, {"expansions": int})，path 为按 parent 回溯的解路径（若找到）；若未找到返回 (None, {...}).
    """

    def board_to_lines(m: np.ndarray):
        return [" ".join(str(int(x)) for x in row) for row in m]

    def arrow(dx: int, dy: int) -> str:
        # 表示 0 的移动方向：从 current 的 (cx,cy) -> neighbor 的 (nx,ny)
        if dx == -1 and dy == 0:
            return "↑"
        if dx == 1 and dy == 0:
            return "↓"
        if dx == 0 and dy == -1:
            return "←"
        if dx == 0 and dy == 1:
            return "→"
        return "?"

    # 初始化（按层推进）
    currents = [start_state]
    expansions = 0
    g = 0
    visited = {start_state.get_data_hash()}

    # 安全阈值，防止极端情况下的无限循环
    max_steps = 50000

    for step_idx in range(1, max_steps + 1):
        # 成功判定：若当前层任一节点为目标，则直接回溯该节点得到路径
        for node in currents:
            if node.is_final():
                full_path = []
                cur = node
                while cur is not None:
                    full_path.append(cur)
                    cur = cur.get_parent()
                return full_path[::-1], {"expansions": expansions}

        # 聚合当前层所有候选
        all_cands = []
        print(f"\n— Step {step_idx}（g={g}→{g+1}）候选分支：")
        global_idx = 1
        for current in currents:
            cx, cy = current.piece_x, current.piece_y
            neighbors = current.next_states()
            if neighbors:
                # expansions：累加本步被“考虑”的候选数量（此处为全部邻居）
                expansions += len(neighbors)
            for nb in neighbors:
                h_val = heuristic_func(nb)
                f_val = (g + 1) + h_val
                dx, dy = nb.piece_x - cx, nb.piece_y - cy  # 0 的移动方向
                mv_val = current.data[nb.piece_x][nb.piece_y]
                mv_tag = f"0{arrow(dx,dy)} 与 {int(mv_val)} 交换"
                all_cands.append({
                    "idx": global_idx,
                    "nb": nb,
                    "f": f_val,
                    "h": h_val,
                    "mv_tag": mv_tag
                })
                global_idx += 1

        if not all_cands:
            # 理论上不会发生（至少有一个邻居），防御性退出
            break

        # 逐个竖排打印矩阵
        for info in all_cands:
            print(f"[{info['idx']}] {info['mv_tag']}, f={info['f']}, h={info['h']}")
            for line in board_to_lines(info['nb'].data):
                print(f"    {line}")

        # 选择：优先未访问的最小 f，若并列则全部入选
        non_visited = [c for c in all_cands if c['nb'].get_data_hash() not in visited]
        pool = non_visited if non_visited else all_cands
        min_f = min(c['f'] for c in pool)
        winners = [c for c in pool if c['f'] == min_f]

        chosen_idxs = [c['idx'] for c in winners]
        print(f"→ 选择: {chosen_idxs} 作为下一步 (f={min_f})")

        # 前进一步：按哈希去重，避免同一状态重复进入下一层
        next_currents = []
        seen_hash = set()
        for c in winners:
            hsh = c['nb'].get_data_hash()
            if hsh in seen_hash or hsh in visited:
                continue
            next_currents.append(c['nb'])
            seen_hash.add(hsh)

        if not next_currents:
            # 兜底：若 winners 全部已访问或去重后为空，则退化为选择 (f,h) 最优的单个
            best = min(pool, key=lambda it: (it['f'], it['h']))
            next_currents = [best['nb']]
            print(f"→ 由于候选均已访问/重复，退化为单一选择: [{best['idx']}]")

        # 批量标记访问，更新当前层
        for node in next_currents:
            visited.add(node.get_data_hash())
        currents = next_currents
        g += 1

    # 未在阈值内找到解
    return None, {"expansions": expansions}

# -------------------- 可解性判定（针对固定螺旋终局） --------------------
def is_solvable_to_spiral(arr_like) -> bool:
    arr = np.array(arr_like, dtype=int).reshape(3, 3)
    goal = PuzzleBoardState._goal_board()
    def inv_count(m):
        flat = m.flatten()
        cnt = 0
        for i in range(len(flat)):
            if flat[i] == 0:
                continue
            for j in range(i):
                if flat[j] != 0 and flat[j] > flat[i]:
                    cnt += 1
        return cnt
    # 3×3（奇数阶）规则：初始与目标的逆序数之和为偶数才可解
    return (inv_count(arr) + inv_count(goal)) % 2 == 0


def run_case(name, arr, heuristic_func=None):
    print(f"\n=== {name} ===")
    init = np.array(arr, dtype=int).reshape(3, 3)
    s = PuzzleBoardState(data=init)

    # 可解性预检查（避免无解时搜索遍历 9!/2）
    if not is_solvable_to_spiral(init):
        print("该初始局面对当前螺旋终局不可解（奇偶性不匹配），跳过 BFS/A*。")
        return

    # A*
    t0 = time.perf_counter()
    astar_path, astar_stat = astar_with_stats(PuzzleBoardState(data=init), heuristic_func=heuristic_func)
    t1 = time.perf_counter()
    if astar_path:
        print(f"A*:  steps={len(astar_path)-1}, expansions={astar_stat['expansions']}, time={(t1-t0):.3f}s")
    else:
        print(f"A*:  无解/超时，expansions={astar_stat['expansions']}, time={(t1-t0):.3f}s")

In [86]:
case1 = [8,1,3, 
         2,4,0, 
         7,6,5]

# 用例A：仅曼哈顿距离
run_case("曼哈顿", case1, heuristic_func=manhattan_heuristic)

# 用例B：错位数+逆序数
run_case("错位数", case1, heuristic_func=linear_heuristic)


=== 曼哈顿 ===

— Step 1（g=0→1）候选分支：
[1] 0↑ 与 3 交换, f=7, h=6
    8 1 0
    2 4 3
    7 6 5
[2] 0↓ 与 5 交换, f=7, h=6
    8 1 3
    2 4 5
    7 6 0
[3] 0← 与 4 交换, f=5, h=4
    8 1 3
    2 0 4
    7 6 5
→ 选择: [3] 作为下一步 (f=5)

— Step 2（g=1→2）候选分支：
[1] 0↑ 与 1 交换, f=7, h=5
    8 0 3
    2 1 4
    7 6 5
[2] 0↓ 与 6 交换, f=7, h=5
    8 1 3
    2 6 4
    7 0 5
[3] 0← 与 2 交换, f=5, h=3
    8 1 3
    0 2 4
    7 6 5
[4] 0→ 与 4 交换, f=7, h=5
    8 1 3
    2 4 0
    7 6 5
→ 选择: [3] 作为下一步 (f=5)

— Step 3（g=2→3）候选分支：
[1] 0↑ 与 8 交换, f=5, h=2
    0 1 3
    8 2 4
    7 6 5
[2] 0↓ 与 7 交换, f=7, h=4
    8 1 3
    7 2 4
    0 6 5
[3] 0→ 与 2 交换, f=7, h=4
    8 1 3
    2 0 4
    7 6 5
→ 选择: [1] 作为下一步 (f=5)

— Step 4（g=3→4）候选分支：
[1] 0↓ 与 8 交换, f=7, h=3
    8 1 3
    0 2 4
    7 6 5
[2] 0→ 与 1 交换, f=5, h=1
    1 0 3
    8 2 4
    7 6 5
→ 选择: [2] 作为下一步 (f=5)

— Step 5（g=4→5）候选分支：
[1] 0↓ 与 2 交换, f=5, h=0
    1 2 3
    8 0 4
    7 6 5
[2] 0← 与 1 交换, f=7, h=2
    0 1 3
    8 2 4
    7 6 5
[3] 0→ 与 3 交换, f=7, h=2
    1 3 0