In [1]:
from gamec import *
from display import GomokuUI
from copy import deepcopy
import pygame
import numpy as np
import copy
from multiprocessing import Pool, cpu_count
from functools import partial

# Sentinel magnitude used as "infinity": win/loss scores and the initial
# alpha-beta bounds in the searches below are expressed as +/-INF.
INF = 999999999

def advantage_f(l3, l4, my_player):
    """
    Static evaluation of a position from ``my_player``'s point of view.

    ``l3`` and ``l4`` are mappings from player id to a pattern count
    (presumably "three" and "four" line patterns -- confirm against the game
    core); a missing player counts as 0.  The result is the l4 count
    difference weighted by 20 plus the l3 count difference weighted by 1.

    Note: consider raising the l4 weight substantially, since an open four
    usually means a forced win / forced defence.

    :param l3: mapping player id -> l3 pattern count
    :param l4: mapping player id -> l4 pattern count
    :param my_player: player id (1 or 2) the score is computed for
    :return: positive when ``my_player`` is ahead, negative when behind
    """
    rival = 3 - my_player
    four_gap = l4.get(my_player, 0) - l4.get(rival, 0)
    three_gap = l3.get(my_player, 0) - l3.get(rival, 0)
    return four_gap * 20 + three_gap * 1

def advantage_vector(l3, l4, my_player):
    """
    Pack both players' pattern counts into a length-4 numpy array.

    Layout: ``[l4 of my_player, l4 of opponent, l3 of my_player, l3 of opponent]``.
    Players missing from a mapping contribute 0.  Callers subtract two such
    vectors to see which counts a move changed.

    :param l3: mapping player id -> l3 pattern count
    :param l4: mapping player id -> l4 pattern count
    :param my_player: player id (1 or 2) whose counts come first
    :return: 1-D numpy array with four entries
    """
    sides = (my_player, 3 - my_player)
    # Outer loop over the tables keeps the l4 entries ahead of the l3 entries.
    counts = [table.get(side, 0) for table in (l4, l3) for side in sides]
    return np.array(counts)

# ============ 用于多进程的 Worker 函数 ============

def _evaluate_kill_move(args):
    """
    多进程 Worker: 评估一个杀招候选点
    返回: (r, c, result, depth) 或 None
    """
    game_state, r, c, adv_saved, my_player, kill_depth = args
    
    next_game = copy.deepcopy(game_state)
    success = next_game.place_stone(r, c)
    
    if not success:
        return None
    
    # 直接赢了
    if next_game.winner == my_player:
        return (r, c, INF, kill_depth + 1, True)  # True 表示直接胜利
    
    chaa = advantage_vector(next_game.l3_count, next_game.l4_count, my_player) - adv_saved
    judgement = chaa[0] != 0 or chaa[2] != 0
    
    if judgement or next_game.game_over:
        res, dpt = kill(next_game, kill_depth, -INF, INF, False, my_player)
        return (r, c, res, dpt, False)
    
    return None

def _evaluate_minimax_move(args):
    """
    Pool worker: score one candidate square with the beam minimax search.

    ``args`` is the tuple ``(game_state, r, c, my_player, depth)``.  The game
    state is deep-copied before the trial move is played.

    Returns ``(r, c, score)`` where ``score`` is

    * ``-INF - 1`` if ``place_stone`` rejects the move (below any real score),
    * ``INF + 1`` if the move wins immediately (above any real score),
    * otherwise the value of ``minimax`` on the resulting position, searched
      to ``depth - 1`` with the opponent to move.
    """
    board, row, col, my_player, depth = args

    trial = copy.deepcopy(board)
    if not trial.place_stone(row, col):
        # Illegal move: sentinel strictly below -INF.
        return (row, col, -INF - 1)

    if trial.winner == my_player:
        # Immediate win: sentinel strictly above INF.
        return (row, col, INF + 1)

    return (row, col, minimax(trial, depth - 1, -INF, INF, False, my_player))

# ============ 多进程版本的 AI 入口 ============

def basic_ai_move_parallel(game: GomokuCore, my_player, depth=3, num_workers=None):
    """
    Choose a move for ``my_player``, evaluating candidates in worker processes.

    Phase 1 runs ``_evaluate_kill_move`` on every candidate from
    ``game.recommand_positions()``.  An immediately winning move is returned
    at once; otherwise, among candidates whose kill search returned ``INF``,
    the one with the largest reported depth value is returned.

    Phase 2 (only if phase 1 found nothing) runs ``_evaluate_minimax_move`` on
    every candidate and returns the first candidate with the highest score.
    Results flagged as illegal by the worker (score below ``-INF``) are never
    selected while at least one legal candidate exists.

    A single process pool is shared by both phases.

    NOTE(review): the worker functions must be importable by the child
    processes; with the "spawn" start method, functions defined in a notebook
    cell may not be -- confirm on the target platform.

    :param game: current GomokuCore instance (workers operate on deep copies)
    :param my_player: the AI's stone colour (1 or 2)
    :param depth: minimax search depth for phase 2
    :param num_workers: number of worker processes; defaults to CPU count - 1
    :return: ``(row, col)`` of the chosen move, or ``None`` if there are no
        candidate positions
    """
    if num_workers is None:
        num_workers = max(1, cpu_count() - 1)  # leave one core for the main process

    candidates = game.recommand_positions()

    if not candidates:
        return None

    adv_saved = advantage_vector(game.l3_count, game.l4_count, my_player)

    kill_depth = 5
    kill_args = [
        (game, r, c, adv_saved, my_player, kill_depth)
        for r, c in candidates
    ]
    minimax_args = [
        (game, r, c, my_player, depth)
        for r, c in candidates
    ]

    # Never start more processes than there are tasks, and always at least one.
    pool_size = max(1, min(num_workers, len(candidates)))

    with Pool(processes=pool_size) as pool:
        # ===== Phase 1: parallel kill search =====
        kill_results = pool.map(_evaluate_kill_move, kill_args)

        best_kaisha = -1
        best_rc = (None, None)

        for result in kill_results:
            if result is None:
                continue
            r, c, res, dpt, is_direct_win = result
            print(f"评估: ({r}, {c})")
            if is_direct_win:
                return (r, c)
            if res == INF and dpt > best_kaisha:
                best_kaisha = dpt
                best_rc = (r, c)
                print(f"发现杀招 at depth {dpt} : ({r}, {c})")

        if best_kaisha != -1:
            return best_rc

        print("没杀掉")

        # ===== Phase 2: parallel minimax evaluation (same pool) =====
        minimax_results = pool.map(_evaluate_minimax_move, minimax_args)

    # Last-resort fallback, used only if no worker reported a legal move.
    best_move = candidates[0]
    best_score = None
    for r, c, score in minimax_results:
        if score > INF:  # immediate win sentinel
            return (r, c)
        if score < -INF:  # illegal-move sentinel: never pick it
            continue
        if best_score is None or score > best_score:
            best_score = score
            best_move = (r, c)

    return best_move

# ============ 原有的 kill 和 minimax 函数保持不变 ============

def kill(game: GomokuCore, depth, alpha, beta, is_maximizing, my_player):
    """
    Forcing-move ("kill") search with alpha-beta pruning.

    Only a subset of the candidate moves is explored at each node:

    * maximizing node: moves that change ``my_player``'s l4 or l3 count,
    * minimizing node: moves that change ``my_player``'s l4 count, the
      opponent's l4 count, or ``my_player``'s l3 count,
    * plus, in both cases, any move that ends the game.

    If a maximizing node has no such move the search stops there with score 0.
    If a minimizing node has no such move, the last legal candidate is
    searched instead so that the line continues.

    :param game: position to search (never mutated; children are deep copies)
    :param depth: remaining search depth; 0 returns the static evaluation
    :param alpha: alpha bound
    :param beta: beta bound
    :param is_maximizing: True when the side to move is treated as ``my_player``
    :param my_player: player id (1 or 2) the score is relative to
    :return: ``(score, depth_marker)``.  At a terminal or leaf node
        ``depth_marker`` is the ``depth`` argument of that node; at an inner
        node it is the largest marker reported by any explored child.
    """
    opponent = 3 - my_player

    if game.game_over:
        if game.winner == my_player:
            return INF, depth
        elif game.winner == opponent:
            return -INF, depth
        else:
            return 0, depth

    if depth == 0:
        return advantage_f(game.l3_count, game.l4_count, my_player), depth

    # Baseline pattern counts; computed only when children are actually expanded.
    adv_saved = advantage_vector(game.l3_count, game.l4_count, my_player)

    scored_moves = []
    last_legal = None  # most recent (score, state) pair whose move was accepted

    for r, c in game.recommand_positions():
        next_game = copy.deepcopy(game)
        if not next_game.place_stone(r, c):
            continue

        # Ordering score, consistent with the terminal branch above
        # (a finished game without a winner counts as 0).
        if not next_game.game_over:
            current_score = advantage_f(next_game.l3_count, next_game.l4_count, my_player)
        elif next_game.winner == my_player:
            current_score = INF
        elif next_game.winner == opponent:
            current_score = -INF
        else:
            current_score = 0

        last_legal = (current_score, next_game)

        chaa = advantage_vector(next_game.l3_count, next_game.l4_count, my_player) - adv_saved
        if is_maximizing:
            forcing = chaa[0] != 0 or chaa[2] != 0
        else:
            forcing = chaa[0] != 0 or chaa[1] != 0 or chaa[2] != 0

        if forcing or next_game.game_over:
            scored_moves.append(last_legal)

    if not scored_moves:
        # No forcing continuation for the attacker, or no legal move at all.
        if is_maximizing or last_legal is None:
            return 0, depth
        # Defender has no forcing reply: continue with one ordinary legal move.
        scored_moves.append(last_legal)

    dpt_best = -1

    if is_maximizing:
        scored_moves.sort(key=lambda x: x[0], reverse=True)
        max_eval = -INF
        for _, next_game_state in scored_moves:
            eval_score, dpt = kill(next_game_state, depth - 1, alpha, beta, False, my_player)
            dpt_best = max(dpt_best, dpt)
            max_eval = max(max_eval, eval_score)
            alpha = max(alpha, eval_score)
            if beta <= alpha:
                break
        return max_eval, dpt_best
    else:
        scored_moves.sort(key=lambda x: x[0], reverse=False)
        min_eval = INF
        for _, next_game_state in scored_moves:
            eval_score, dpt = kill(next_game_state, depth - 1, alpha, beta, True, my_player)
            dpt_best = max(dpt_best, dpt)
            min_eval = min(min_eval, eval_score)
            beta = min(beta, eval_score)
            if beta <= alpha:
                break
        return min_eval, dpt_best

def minimax(game: GomokuCore, depth, alpha, beta, is_maximizing, my_player, top_k=3):
    """
    Minimax with alpha-beta pruning over a beam of the ``top_k`` best-ordered moves.

    Every legal candidate from ``game.recommand_positions()`` is played on a
    deep copy and given an ordering score (static evaluation, or the terminal
    value if the move ends the game).  Only the ``top_k`` moves that look best
    for the side to move are searched recursively.

    :param game: position to search (never mutated; children are deep copies)
    :param depth: remaining search depth; 0 returns the static evaluation
    :param alpha: alpha bound
    :param beta: beta bound
    :param is_maximizing: True when the side to move is treated as ``my_player``
    :param my_player: player id (1 or 2) the score is relative to
    :param top_k: beam width, i.e. number of ordered moves searched per node
    :return: ``INF`` / ``-INF`` for a won / lost position, 0 for a finished
        game without a winner or a node without legal moves, otherwise the
        backed-up evaluation
    """
    opponent = 3 - my_player

    if game.game_over:
        if game.winner == my_player:
            return INF
        elif game.winner == opponent:
            return -INF
        else:
            return 0

    if depth == 0:
        return advantage_f(game.l3_count, game.l4_count, my_player)

    scored_moves = []
    for r, c in game.recommand_positions():
        next_game = copy.deepcopy(game)
        if not next_game.place_stone(r, c):
            continue

        # Ordering score, consistent with the terminal branch above: a move
        # that finishes the game without a winner is worth 0, not a loss, so
        # the beam cannot discard it as if it were losing.
        if not next_game.game_over:
            current_score = advantage_f(next_game.l3_count, next_game.l4_count, my_player)
        elif next_game.winner == my_player:
            current_score = INF
        elif next_game.winner == opponent:
            current_score = -INF
        else:
            current_score = 0

        scored_moves.append((current_score, next_game))

    if not scored_moves:
        return 0

    if is_maximizing:
        # Highest ordering score first; keep only the beam.
        scored_moves.sort(key=lambda x: x[0], reverse=True)
        best_moves = scored_moves[:top_k]
        max_eval = -INF
        for _, next_game_state in best_moves:
            eval_score = minimax(next_game_state, depth - 1, alpha, beta, False, my_player, top_k)
            max_eval = max(max_eval, eval_score)
            alpha = max(alpha, eval_score)
            if beta <= alpha:
                break
        return max_eval
    else:
        # Lowest ordering score first (best for the opponent); keep only the beam.
        scored_moves.sort(key=lambda x: x[0], reverse=False)
        best_moves = scored_moves[:top_k]
        min_eval = INF
        for _, next_game_state in best_moves:
            eval_score = minimax(next_game_state, depth - 1, alpha, beta, True, my_player, top_k)
            min_eval = min(min_eval, eval_score)
            beta = min(beta, eval_score)
            if beta <= alpha:
                break
        return min_eval

# ============ 兼容性：保留原函数名 ============
def basic_ai_move(game: GomokuCore, my_player, depth=3):
    """
    Backward-compatible entry point under the original function name.

    Simply forwards to ``basic_ai_move_parallel`` with its default worker
    count, so the search still runs in worker processes.

    :param game: current GomokuCore instance
    :param my_player: the AI's stone colour (1 or 2)
    :param depth: minimax search depth
    :return: whatever ``basic_ai_move_parallel`` returns
    """
    return basic_ai_move_parallel(game=game, my_player=my_player, depth=depth)

pygame 2.6.1 (SDL 2.28.4, Python 3.13.9)
Hello from the pygame community. https://www.pygame.org/contribute.html


  from pkg_resources import resource_stream, resource_exists


In [2]:

def main():
    """
    Run the human-vs-AI game loop until the window is closed.

    The human plays the side that does not move first
    (``3 - game.current_player`` at start-up); the AI plays the other side.
    Each frame processes input events, redraws the board, and lets the AI
    move when it is its turn and the game is still running.  A left click
    after the game has ended resets the game, regardless of whose turn it
    nominally is.
    """
    # 1. Instantiate the logic layer and the display layer.
    game = GomokuCore(board_size=15)
    ui = GomokuUI(board_size=15, cell_size=40)
    player = 3 - game.current_player
    clock = pygame.time.Clock()
    running = True

    try:
        while running:
            # --- Event handling (input) ---
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False

                elif event.type == pygame.MOUSEBUTTONDOWN and event.button == 1:  # left click
                    if game.game_over:
                        # Must not depend on the turn, or a finished game
                        # could never be restarted.
                        game.reset()
                    elif game.current_player == player:
                        # The UI converts the click into grid coordinates.
                        row, col = ui.convert_mouse_to_grid(event.pos)
                        # The logic layer validates the move and updates state.
                        game.place_stone(row, col)

            # --- Rendering (output) ---
            # The display layer only needs the current state.
            ui.draw(
                board_array=game.get_board(),
                current_player=game.current_player,
                game_over=game.game_over,
                winner=game.winner,
                l3_count=game.l3_count,
                l4_count=game.l4_count,
                last_move=game.last_move
            )

            # --- AI turn ---
            # Skip when quitting or when the game is over; otherwise the
            # expensive search would be re-run on every frame.
            if running and not game.game_over and game.current_player != player:
                move = basic_ai_move_parallel(game, my_player=game.current_player, depth=5)
                if move is not None:  # None means there were no candidate positions
                    row, col = move
                    print("AI落子:", row, col)
                    game.place_stone(row, col)

            clock.tick(30)  # cap at 30 FPS to save resources
    finally:
        # Always release pygame, even if the loop raised.
        pygame.quit()

# Start the game loop only when this code is executed directly, not when it is imported.
if __name__ == "__main__":
    main()

: 