In [1]:
import csv

In [28]:
import chess
import chess.pgn
import io

In [3]:
print(chess.__version__)

1.10.0


In [8]:
pgn_file = "CHUNK/chunk_1.pgn"

In [48]:
def analyze_game(game):
    """分析棋局以提取棋手風格特徵"""
    board = game.board()
    #print(board)
    moves = list(game.mainline_moves())
    features = {
        "early_center_control": 0,  # 開局中心控制
        "piece_sacrifices": 0,      # 犧牲
        "king_activity": 0,         # 王的活躍性
        "passed_pawns": 0,          # 通路兵
        "tactical_moves": 0         # 戰術進攻
    }
    
    for move_index, move in enumerate(moves):
        board.push(move)
        #print(board)
        #move_san = board.san(move)

        # 開局中心控制 (e4, d4, c4 等)
        if move_index < 10 and move.uci()[:2] in {"e2", "d2", "c2", "e7", "d7", "c7"}:
            features["early_center_control"] += 1

        # 戰術犧牲檢測 (是否存在犧牲)
        if board.is_capture(move) and board.is_attacked_by(not board.turn, move.to_square):
            features["piece_sacrifices"] += 1

        # 殘局王的活躍性 (王進入敵方半區)
        if board.fullmove_number > 40:
            king_square = board.king(board.turn)
            if board.turn == chess.WHITE and chess.square_rank(king_square) > 4:
                features["king_activity"] += 1
            elif board.turn == chess.BLACK and chess.square_rank(king_square) < 3:
                features["king_activity"] += 1

        # 推進通路兵
        piece = board.piece_at(move.to_square)
        if piece and piece.piece_type == chess.PAWN and not board.is_attacked_by(not board.turn, move.to_square):
            features["passed_pawns"] += 1

        # 簡單戰術判斷（如將軍與連續攻擊）
        if board.is_check():
            features["tactical_moves"] += 1

    # 統計結果
    return features

def determine_style(features):
    """根據特徵數據判斷棋手風格"""
    if features["piece_sacrifices"] > 2 or features["tactical_moves"] > 5:
        return "進攻型棋手"
    elif features["early_center_control"] > 5:
        return "控制型棋手"
    elif features["passed_pawns"] > 3 or features["king_activity"] > 2:
        return "殘局型棋手"
    else:
        return "均衡型棋手"

with open(pgn_file, "r", encoding="utf-8") as pgn:
    game = chess.pgn.read_game(pgn)
    features = analyze_game(game)
    style = determine_style(features)

print("特徵數據：", features)
print("棋手風格：", style)

特徵數據： {'early_center_control': 4, 'piece_sacrifices': 63, 'king_activity': 6, 'passed_pawns': 11, 'tactical_moves': 8}
棋手風格： 進攻型棋手


In [22]:
# 定义输入和输出文件路径
input_file = "CHUNK/chunk_1.pgn"  # 上传的文件路径
output_file = "chess_games.csv"  # 输出的CSV文件路径
output_file_test = "chess_games_test.csv"  # 输出的CSV文件路径
number = 0
# 统计棋局总数
total_games = 0
with open(input_file, "r", encoding="utf-8") as pgn:
    while chess.pgn.read_game(pgn) is not None:
        total_games += 1
# 设置只读取总数减 1 的棋局
max_games = total_games - 1
games_processed = 0
fieldnames = []  # 保持原顺序
games = []  # 用于存储每盘棋局数据
with open(input_file, "r", encoding="utf-8") as pgn:
    while games_processed < max_games:
        game = chess.pgn.read_game(pgn)
        if game is None:  # 如果没有更多棋局
            break

        # 收集字段名
        headers = game.headers
        for key in headers.keys():
            if key not in fieldnames:  # 按照出现顺序添加字段名
                fieldnames.append(key)

        # 添加棋局数据
        game_data = headers.copy()
        game_data["Moves"] = game.mainline_moves()  # 移动序列
        games.append(game_data)
        games_processed += 1

# 最后添加 "Moves" 列
if "Moves" not in fieldnames:
    fieldnames.append("Moves")

# 写入 CSV 文件
with open(output_file_test, "w", newline="", encoding="utf-8") as csv_file:
    csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
    csv_writer.writeheader()  # 写入表头
    for game_data in games:
        csv_writer.writerow(game_data)

print(f"全部棋局（不含最后一局）已保存到 CSV 文件：{output_file}")

全部棋局（不含最后一局）已保存到 CSV 文件：chess_games.csv


In [10]:
games_processed = 0
with open(input_file, "r", encoding="utf-8") as pgn:
    while games_processed < 1:
        game = chess.pgn.read_game(pgn)
        if game is None:  # 如果没有更多棋局
            break

        # 收集字段名
        headers = game.headers
        for key in headers.keys():
            if key not in fieldnames:  # 按照出现顺序添加字段名
                fieldnames.append(key)

        # 添加棋局数据
        game_data = headers.copy()
        game_data["Moves"] = game.mainline_moves()  # 移动序列
        games.append(game_data)
        games_processed += 1
        print(game_data["Moves"])

1. d4 g6 2. c4 Bg7 3. Nc3 b6 4. e4 Bb7 5. f3 c6 6. Be3 a5 7. Qd2 f6 8. Nge2 g5 9. Ng3 e6 10. Nh5 Bh6 11. h4 gxh4 12. Bxh6 Nxh6 13. Qxh6 d6 14. Nxf6+ Ke7 15. Rxh4 e5 16. Qg7+ Ke6 17. d5+ cxd5 18. cxd5+ Bxd5 19. exd5+ Kf5 20. Bd3+ e4 21. Bxe4+ Ke5 22. Nh5+ Qf6 23. Qxf6#
