In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import chess
import chess.pgn
import chess.engine
import io
import json
from tqdm import tqdm
import os





In [None]:
# Đọc file CSV
df = pd.read_csv('data/games_wgm.csv')

# Hiển thị 5 dòng đầu tiên để kiểm tra
print(len(df))
print(df.columns)
print(df.head())

In [None]:
df[['pgn']]

In [None]:
import re
df_png = df[['pgn']].copy()
df_png = df_png[df_png['pgn'].apply(lambda x: isinstance(x, str))]

# Hàm để parse PGN và lấy thông tin cần thiết: Kết quả, Elo của người chơi trắng và đen, vị trí hiện tại, các nước đi
def parse_pgn(pgn_str):
    info = {}

    # Nếu không phải string thì trả về NaN cho toàn bộ
    if not isinstance(pgn_str, str):
        return {
            "Result": None,
            "WhiteElo": None,
            "BlackElo": None,
            "CurrentPosition": None,
            "Moves": None
        }

    fields = ['Result', 'WhiteElo', 'BlackElo', 'CurrentPosition']
    
    # Tìm các thông tin trong PGN
    for field in fields:
        match = re.search(rf'\[{field} "(.*?)"\]', pgn_str)
        info[field] = match.group(1) if match else None

    return info

# Hàm phân tích các nước đi từ cột PGN
def parse_moves(pgn):
    if not isinstance(pgn, str):
        return None

    moves = ""
    game = chess.pgn.read_game(io.StringIO(pgn))

    if game is None:
        return None

    for move in game.mainline_moves():
        moves += str(move) + " "
    return moves.strip()



In [None]:
# Không chạy cell này vì nó có tác dụng làm clean data thành file pgn_chess_data.csv và chạy lâu
# Bỏ các dòng không có PGN hợp lệ
df_valid = df[df['pgn'].apply(lambda x: isinstance(x, str))].copy()

# Phân tích header
df_pgn = df_valid['pgn'].apply(parse_pgn)
df_pgn = pd.DataFrame(df_pgn.tolist())

# Phân tích nước đi
df_pgn["Moves"] = df_valid['pgn'].apply(parse_moves)

In [None]:
table = df_pgn.copy()
print(table.columns)
print(table.shape)
print(table.describe())
print(table.info())
print(table.head(10))


In [None]:

# Hàm thống kê cơ bản cho các cột
def summarize_column(df, column):
    unique_values = df[column].nunique()
    total_values = len(df[column])
    non_null_values = df[column].notna().sum()
    null_values = df[column].isna().sum()
    return {
        'Cột': column,
        'Giá trị duy nhất': unique_values,
        'Tổng số giá trị': total_values,
        'Không null': non_null_values,
        'Null/NaN': null_values
    }

# Áp dụng cho tất cả các cột
summary = pd.DataFrame([summarize_column(table, col) for col in table.columns])

# Xử lý riêng cho cột Moves: đếm số bước đi trong mỗi ván
table['MoveCount'] = table['Moves'].fillna('').apply(lambda x: len(x.strip().split()))

# Thống kê số bước đi
move_stats = {
    'Tổng số ván': table.shape[0],
    'Tổng số bước đi': table['MoveCount'].sum(),
    'Trung bình số bước mỗi ván': table['MoveCount'].mean(),
    'Số bước ít nhất': table['MoveCount'].min(),
    'Số bước nhiều nhất': table['MoveCount'].max()
}

# In kết quả
print("Thống kê tổng quát các cột:")
print(summary.to_string(index=False))
print("\nThống kê số bước đi từ cột Moves:")
for k, v in move_stats.items():
    print(f"{k}: {v}")

In [None]:
table

In [None]:
table["MoveCount"] = pd.to_numeric(table["MoveCount"], errors="coerce")
table["MoveCount"] = table["MoveCount"].fillna(0)
table = table[table["MoveCount"] > 40]
table


In [None]:
summary = pd.DataFrame([summarize_column(table, col) for col in table.columns])
move_stats = {
    'Tổng số ván': table.shape[0],
    'Tổng số bước đi': table['MoveCount'].sum(),
    'Trung bình số bước mỗi ván': table['MoveCount'].mean(),
    'Số bước ít nhất': table['MoveCount'].min(),
    'Số bước nhiều nhất': table['MoveCount'].max()
}

# In kết quả
print("Thống kê tổng quát các cột:")
print(summary.to_string(index=False))
print("\nThống kê số bước đi từ cột Moves:")
for k, v in move_stats.items():
    print(f"{k}: {v}")

In [None]:

# Chuyển đổi Elo thành số
table["WhiteElo"] = pd.to_numeric(table["WhiteElo"], errors="coerce")
table["BlackElo"] = pd.to_numeric(table["BlackElo"], errors="coerce")
# Loại bỏ các dòng có giá trị NaN trong cột Elo
table = table.dropna(subset=["WhiteElo", "BlackElo"])
# Chuyển đổi Elo thành số nguyên
table["WhiteElo"] = table["WhiteElo"].astype(int)
table["BlackElo"] = table["BlackElo"].astype(int)

table



In [None]:

def is_valid_move(move_str):
    """Kiểm tra một nước đi có hợp lệ (dạng UCI) không."""
    if len(move_str) < 4:
        return False
    try:
        # kiểm tra tọa độ from và to có nằm trong 64 ô không
        move_from = move_str[:2]
        move_to = move_str[2:4]
        return move_from in chess.SQUARE_NAMES and move_to in chess.SQUARE_NAMES
    except:
        return False

def filter_invalid_moves(df, move_column='Moves'):
    """Lọc bỏ các dòng có chứa nước đi không hợp lệ trong cột Moves."""
    cleaned_rows = []
    for idx, row in df.iterrows():
        try:
            moves = row[move_column].split()
            if all(is_valid_move(m) for m in moves):
                cleaned_rows.append(row)
        except:
            continue  # Bỏ qua dòng lỗi

    return pd.DataFrame(cleaned_rows)
table = filter_invalid_moves(table, move_column='Moves')
print(table.shape)

In [None]:
#Đến đây thì đã có thể phân tích được các thông tin trong file PGN, bao gồm cả các nước đi của ván cờ.
# Ta lưu data sau khi được clean vào file pgn_chess_data.csv để sử dụng cho các bước sau
table.to_csv('ai/data/pgn_chess_data.csv', index=False)

In [3]:
table_pgn = pd.read_csv('ai/data/pgn_chess_data.csv')
table_pgn = table_pgn.drop(columns=['CurrentPosition'])

# Kiểm tra lại bảng sau khi xóa cột
print(table_pgn.head())

print( table_pgn.columns)
print( table_pgn.shape)
print( table_pgn.describe())
print( table_pgn.info())

table_pgn

   Result  WhiteElo  BlackElo  \
0       1      2577      2262   
1       1      2345      2262   
2       1      2429      2246   
3       0      2429      2225   
4       1      2592      2429   

                                               Moves  MoveCount  
0  e2e4 e7e6 d2d4 d7d5 b1d2 f8e7 g1f3 g8f6 e4e5 f...         65  
1  d2d4 g8f6 g1f3 g7g6 c2c4 f8g7 g2g3 e8g8 f1g2 d...         95  
2  e2e4 c7c5 g1f3 e7e6 d2d4 c5d4 f3d4 b8c6 b1c3 d...         87  
3  e2e4 g8f6 e4e5 f6d5 d2d4 d7d6 g1f3 g7g6 f1c4 c...         80  
4  d2d4 g8f6 c2c4 e7e6 g1f3 c7c5 d4d5 d7d6 b1c3 e...         81  
Index(['Result', 'WhiteElo', 'BlackElo', 'Moves', 'MoveCount'], dtype='object')
(271044, 5)
              Result       WhiteElo       BlackElo      MoveCount
count  271044.000000  271044.000000  271044.000000  271044.000000
mean        0.048527    2201.091598    2200.469105      82.786614
std         0.968379     268.592204     268.103664      26.767227
min        -1.000000     116.000000     100.00000

Unnamed: 0,Result,WhiteElo,BlackElo,Moves,MoveCount
0,1,2577,2262,e2e4 e7e6 d2d4 d7d5 b1d2 f8e7 g1f3 g8f6 e4e5 f...,65
1,1,2345,2262,d2d4 g8f6 g1f3 g7g6 c2c4 f8g7 g2g3 e8g8 f1g2 d...,95
2,1,2429,2246,e2e4 c7c5 g1f3 e7e6 d2d4 c5d4 f3d4 b8c6 b1c3 d...,87
3,0,2429,2225,e2e4 g8f6 e4e5 f6d5 d2d4 d7d6 g1f3 g7g6 f1c4 c...,80
4,1,2592,2429,d2d4 g8f6 c2c4 e7e6 g1f3 c7c5 d4d5 d7d6 b1c3 e...,81
...,...,...,...,...,...
271039,-1,2534,2423,e2e4 e7e5 g1f3 d7d6 d2d4 g8f6 b1c3 b8d7 f1c4 f...,60
271040,-1,2526,2524,e2e4 c7c5 f1e2 d7d6 f2f4 g7g6 g1f3 f8g7 e1g1 b...,128
271041,-1,2516,2534,e2e4 c7c5 g1f3 d7d6 d2d4 c5d4 f3d4 g8f6 b1c3 a...,60
271042,-1,2526,2524,e2e4 e7e5 g1f3 b8c6 f1b5 a7a6 b5a4 g8f6 e1g1 f...,120


In [4]:
# Hàm chuyển đổi bàn cờ thành ma trận (8x8x12)
def board_to_matrix(board):
    """Chuyển trạng thái bàn cờ thành ma trận (8, 8, 12) dạng one-hot"""
    piece_map = board.piece_map()
    board_matrix = np.zeros((8, 8, 12), dtype=np.int8)
    piece_to_index = {
        'P': 0, 'N': 1, 'B': 2, 'R': 3, 'Q': 4, 'K': 5,
        'p': 6, 'n': 7, 'b': 8, 'r': 9, 'q': 10, 'k': 11,
    }
    for square, piece in piece_map.items():
        row, col = divmod(square, 8)
        index = piece_to_index.get(piece.symbol())
        if index is not None:
            board_matrix[row][col][index] = 1
    return board_matrix

def uci_to_index(uci_move):
    try:
        from_square = chess.SQUARE_NAMES.index(uci_move[:2])
        to_square = chess.SQUARE_NAMES.index(uci_move[2:4])
        promotion = None
        if len(uci_move) == 5:  # Nước đi phong cấp
            promotion_piece = uci_move[4].lower()
            promotion_map = {'q': chess.QUEEN, 'r': chess.ROOK, 'b': chess.BISHOP, 'n': chess.KNIGHT}
            promotion = promotion_map.get(promotion_piece)
        return (from_square, to_square, promotion)
    except Exception as e:
        print(f"Invalid UCI move: {uci_move}, error: {e}")
        return None

In [5]:
def process_game_data(row, n=4):
    """Xử lý 1 ván cờ thành danh sách (X, y)"""
    white_elo = row['WhiteElo']
    black_elo = row['BlackElo']
    moves = row['Moves'].split()
    
    board = chess.Board()
    X, y = [], []
    history = []

    for i in range(len(moves) - 1):  # Trừ bước cuối vì không có next_move
        move_uci = moves[i]
        next_move = uci_to_index(moves[i+1])
        try:
            board.push_uci(move_uci)
        except:
            break  # Bỏ ván nếu có lỗi push

        board_matrix = board_to_matrix(board)

        # Lưu X: [board hiện tại, n bước lịch sử trước (None nếu thiếu)]
        history_trimmed = history[-n:]  # giữ n bước
        while len(history_trimmed) < n:
            history_trimmed.insert(0, None)  # pad None

        # Lưu lại mảng
        data_point = {
            "board": board_matrix,
            "history": history_trimmed,
            "white_elo": white_elo,
            "black_elo": black_elo
        }

        if next_move:
            X.append(data_point)
            y.append(next_move)

        # Cập nhật lịch sử
        history.append(uci_to_index(move_uci))

    return X, y


In [6]:
# Hàm xử lý toàn bộ bảng table và lưu dưới dạng npy, chia theo số ván cờ mỗi file
def process_and_save_table_as_npy(table_pgn, n=4, games_per_file=1000, output_folder="ai/data"):

    all_X, all_y = [], []
    games_in_file = 0
    file_id = 0

    for idx, row in tqdm(table_pgn.iterrows(), total=len(table_pgn)):
        X_game, y_game = process_game_data(row, n=n)

        if X_game and y_game:
            all_X.extend(X_game)
            all_y.extend(y_game)
            games_in_file += 1

        if games_in_file >= games_per_file:
            np.save(f"{output_folder}/chess_data_{file_id}.npy", {"X": all_X, "y": all_y}, allow_pickle=True)
            print(f"✅ Đã lưu file: chess_data_{file_id}.npy (games {idx - games_in_file + 1} đến {idx})")
            file_id += 1
            all_X, all_y = [], []
            games_in_file = 0

    # Lưu phần còn lại
    if all_X and all_y:
        np.save(f"{output_folder}/chess_data_{file_id}.npy", {"X": all_X, "y": all_y}, allow_pickle=True)
        print(f"✅ Đã lưu file: chess_data_{file_id}.npy (cuối)")


In [7]:
sample_table = table_pgn.head(2000)
process_and_save_table_as_npy(sample_table, n=4, games_per_file=1000, output_folder="ai/data")


  0%|          | 0/2000 [00:00<?, ?it/s]

 50%|█████     | 1005/2000 [00:09<00:32, 30.80it/s]

✅ Đã lưu file: chess_data_0.npy (games 0 đến 999)


100%|██████████| 2000/2000 [00:19<00:00, 100.55it/s]

✅ Đã lưu file: chess_data_1.npy (games 1000 đến 1999)





In [34]:
data = np.load("data/chess_data_0.npy", allow_pickle=True).item()
X = data["X"]
y = data["y"]

# Ví dụ:
print(len(X), len(y))
print(X[0].keys())  # 'board', 'history', 'white_elo', 'black_elo'
print(y[0])         # (from_square, to_square)


77023 77023
dict_keys(['board', 'history', 'white_elo', 'black_elo'])
(52, 44)
