# AI boys: chess detection project

**Members [ROLES]:**
- Kridbhume Chammanard [Image Processing]
- Ting-Yi Lin [Model Development]
- Thana Wanavit [Image Processing]
- Norapath Arjanurak [Model Development]
- Pattaradanai Lakkananithiphan [Pre- & Post-processing]

### Import the necessary libraries

In [87]:
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import mediapipe as mp
from collections import Counter
import chess
import chess.pgn

import os
import random

from PIL import Image
from ultralytics import YOLO
from dotenv import dotenv_values
from ultralytics.utils.ops import scale_image

import json

### Constant Settings

In [95]:
# initialise the MediaPipe hand detector (used by hands_detected)
mp_hands = mp.solutions.hands
hands = mp_hands.Hands(min_detection_confidence=0.2, min_tracking_confidence=0.5)
# board files: a to h from left to right
X_INDEX = list("abcdefgh")
# board ranks: 8 to 1 from top to bottom
Y_INDEX = [str(i+1) for i in range(8)][::-1]
# module-level accumulators; video2pgn appends one board state per usable
# frame to state_list (pgn_list is not referenced by the visible code)
state_list = []
pgn_list = []
# noise tolerance: number of consecutive identical frames a state needs
# before summarize_states accepts it
TOLERANCE = 2
# crop distance: pixels trimmed from the top and bottom of the centred
# square crop taken in video2pgn
CROP = 50
# frame sampling stride: video2pgn only processes every RATE-th frame
RATE = 5
# colour / class tokens as they appear in the detector's
# "<colour>-<class>" labels
BLACK = "black"
WHITE = "white"
KING = "king"
# piece class -> promotion suffix appended to a UCI move
PROMOTE_KEY = {"knight":"n","king":"k","bishop":"b","queen":"q","rook":"r","pawn":""}
# piece class -> piece symbol letter used when placing pieces in generate_pgn
SYM_KEY = {"knight":"n","king":"k","bishop":"b","queen":"q","rook":"r","pawn":"p"}
# default weights file loaded by ChessPredicter
MODEL_PATH = "chess_model/runs/detect/chess_data_model_yolov11m_epoch50/weights/best.pt"
# directory of input videos; an empty string makes main() emit placeholder rows
DATA_PATH = ""
# CSV file written by main()
OUTPUT_PATH = "submission.csv"

W0000 00:00:1733814108.415938   32339 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1733814108.428249   32339 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


### Model Inference

In [89]:
class ChessPredicter:
    """Thin wrapper around a YOLO model that reports detections as tuples."""

    def __init__(self, model_path=MODEL_PATH):
        """Load the YOLO weights stored at ``model_path``."""
        self.model = YOLO(model_path)

    def predict_image(self, img):
        """Run the model on ``img`` and return the first result as a JSON string."""
        first_result = self.model(img)[0]
        return first_result.to_json()

    def format_output(self, result):
        """Turn the JSON detection string into ``(name, x, y, w, h)`` tuples.

        ``x``/``y`` are the box's ``x1``/``y1`` values and ``w``/``h`` are
        ``x2 - x1`` and ``y2 - y1``.
        """
        formatted = []
        for entry in json.loads(result):
            box = entry['box']
            formatted.append((
                entry['name'],
                box['x1'],
                box['y1'],
                box['x2'] - box['x1'],
                box['y2'] - box['y1'],
            ))
        return formatted

    def get_state(self, img):
        """Detect pieces in ``img`` and return them as ``(name, x, y, w, h)`` tuples."""
        return self.format_output(self.predict_image(img))

### Image preprocessing

In [90]:
class ChessboardProcessor:
    """Locate the chessboard in an image and map points into a top-down view.

    The board is taken to be the largest four-cornered contour of the
    thresholded image. Its corners define a perspective transform onto a
    ``WIDTH`` x ``HEIGHT`` image; the same transform is applied to the
    detected piece positions.
    """

    # Size in pixels of the warped (top-down) board image.
    WIDTH = 640
    HEIGHT = 640

    def __init__(self, image):
        """Store ``image``; the warp results are filled in by ``warp_image``."""
        self.image = image
        self.warped_image = None
        self.transformation_matrix = None

    def display_image(self, img, title="Image"):
        """Show ``img`` with matplotlib, converting it from BGR to RGB first."""
        plt.figure(figsize=(6, 6))
        plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        plt.axis('off')
        plt.title(title)
        plt.show()

    def reorder(self, pts):
        """Order four corners as top-left, top-right, bottom-left, bottom-right.

        This is the order ``warp_image`` expects for its destination points.
        Returns a float32 array of shape ``(4, 1, 2)``.
        """
        pts = pts.reshape((4, 2))
        ordered = np.zeros((4, 1, 2), dtype="float32")
        sums = pts.sum(1)              # x + y
        diffs = np.diff(pts, axis=1)   # y - x
        ordered[0] = pts[np.argmin(sums)]   # Top-left: smallest x + y
        ordered[1] = pts[np.argmin(diffs)]  # Top-right: smallest y - x
        ordered[2] = pts[np.argmax(diffs)]  # Bottom-left: largest y - x
        ordered[3] = pts[np.argmax(sums)]   # Bottom-right: largest x + y
        return ordered

    def preprocess(self):
        """Return a binary image (grayscale -> blur -> adaptive threshold)."""
        img_gray = cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)
        img_blur = cv2.GaussianBlur(img_gray, (3, 3), 1)
        # The named flags equal the literal 1, 1 that were passed before.
        return cv2.adaptiveThreshold(
            img_blur,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            15,
            2,
        )

    def find_biggest_contour(self, contours):
        """Return ``(corners, area)`` of the largest 4-point contour.

        ``corners`` is reordered with ``reorder``; it is ``None`` (and the
        area ``0``) when no contour with area above 50 approximates to a
        quadrilateral.
        """
        largest_pts = None
        max_area = 0
        for contour in contours:
            area = cv2.contourArea(contour)
            # Skip small contours and ones that cannot beat the current best
            # before doing the polygon approximation.
            if area <= 50 or area <= max_area:
                continue
            peri = cv2.arcLength(contour, True)
            approx = cv2.approxPolyDP(contour, 0.1 * peri, True)
            if len(approx) == 4:
                largest_pts = approx
                max_area = area
        if largest_pts is None:
            return None, max_area
        return self.reorder(largest_pts), max_area

    def find_board_corners(self):
        """Return the reordered board corners, or ``None`` if none are found."""
        processed_img = self.preprocess()
        contours, _ = cv2.findContours(processed_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        corners, _ = self.find_biggest_contour(contours)
        return corners

    def warp_image(self, corners):
        """Warp ``self.image`` so ``corners`` map onto a WIDTH x HEIGHT image.

        Stores the perspective matrix in ``self.transformation_matrix`` and
        the result in ``self.warped_image``, which is also returned.
        """
        original_pts = np.float32(corners)
        # Destination corners in the same order that reorder() produces.
        new_pts = np.float32([[0, 0], [self.WIDTH, 0], [0, self.HEIGHT], [self.WIDTH, self.HEIGHT]])
        self.transformation_matrix = cv2.getPerspectiveTransform(original_pts, new_pts)
        self.warped_image = cv2.warpPerspective(self.image, self.transformation_matrix, (self.WIDTH, self.HEIGHT))
        return self.warped_image

    def draw_points(self, img, pts, color=(0, 0, 255), size=10):
        """Draw a filled circle on ``img`` (in place) at every point of ``pts``."""
        for pt in pts:
            pt = tuple(map(int, pt))  # cv2.circle needs an integer (x, y) tuple
            cv2.circle(img, pt, size, color, -1)

    def generate_random_points(self, num_points=4):
        """Return ``num_points`` random ``("piece i", x, y)`` tuples inside the image."""
        height, width, _ = self.image.shape

        random_points = [(random.randint(0, width - 1), random.randint(0, height - 1)) for _ in range(num_points)]
        random_points = np.float32(random_points)

        return [
            (f"piece {i}", x_coord, y_coord)
            for i, (x_coord, y_coord) in enumerate(random_points, start=1)
        ]

    def rotate_and_warp(self, detection_cg):
        """Warp the board and transform the detected points along with it.

        Parameters:
            detection_cg: iterable of ``(piece, x, y)`` in image coordinates.

        Returns:
            ``(warped_image, [(piece, x_new, y_new), ...])`` on success (the
            transformed points are also drawn onto the warped image), or
            ``(None, None)`` when the board corners cannot be found.
        """
        corners = self.find_board_corners()
        if corners is None:
            print("Could not find the corners of the chessboard.")
            return None, None

        warped_image = self.warp_image(corners)

        # Materialise once so labels and coordinates stay aligned even when
        # the caller passes a set or a one-shot iterable.
        detections = list(detection_cg)
        if not detections:
            # Nothing to transform; do not hand an empty array to OpenCV.
            return warped_image, []

        original_points = np.array(
            [[x, y] for _, x, y in detections], dtype=np.float32
        ).reshape(-1, 1, 2)
        transformed_points = cv2.perspectiveTransform(original_points, self.transformation_matrix)

        transformed_points_list = [
            (piece, x_new, y_new)
            for (piece, _, _), (x_new, y_new) in zip(detections, transformed_points[:, 0])
        ]

        # Mark the transformed points on the warped image that is returned.
        self.draw_points(warped_image, transformed_points[:, 0])

        return warped_image, transformed_points_list

### Program loop

In [91]:
# legacy

# key for PGN: piece class -> piece letter (pawns have no letter)
KEY = {"knight":"N","king":"K","bishop":"B","queen":"Q","rook":"R","pawn":""}


def simple_pgn_from_differences(differences):
    """Convert state differences directly to a simplified PGN string.

    Parameters:
        differences (list): ``(disappeared, appeared)`` pairs, each a
            collection of ``(color, class, file, rank)`` tuples.

    Returns:
        str: numbered move pairs followed by ``*``. A move that cannot be
        classified is written as ``MOVE_ERROR`` (``CASTLING_ERROR`` for a
        castle whose king lands on neither the g nor the c file). When the
        first move is black's, white's slot in move 1 is ``..``. An empty
        ``differences`` gives ``"*"``.
    """
    pgn_moves = []

    for disappeared, appeared in differences:

        # the number of pieces that disappeared and appeared
        l_d = len(disappeared)
        l_a = len(appeared)

        print("appeared:", appeared)
        print("disappeared:", disappeared)
        print("l_a:",l_a)
        print("l_d:", l_d)

        try:
            if l_d == 1 and l_a == 1:  # movement or promotion

                old = list(disappeared)[0]
                new = list(appeared)[0]

                if old[:2] == new[:2]:  # same color and class -> movement
                    print(1.1)
                    msg = f"{KEY[new[1]]}{new[2]}{new[3]}"

                else:  # promotion
                    print(1.2)
                    msg = f"{KEY[old[1]]}{new[2]}{new[3]}={KEY[new[1]]}"

            elif l_d == 2 and l_a == 1:  # capturing

                new = list(appeared)[0]
                # the capturer is the vanished piece matching the one that appeared
                old = [piece for piece in disappeared if piece[:2] == new[:2]][0]

                if new[1] == "pawn":
                    print(2.1)
                    msg = f"{KEY[new[1]]}{old[2]}x{new[2]}{new[3]}"
                else:
                    print(2.2)
                    msg = f"{KEY[new[1]]}x{new[2]}{new[3]}"

            elif l_d == 2 and l_a == 2:  # castling: two pieces leave, two arrive

                # find king's position
                new_king = [piece for piece in appeared if piece[1] == KING][0]

                if new_king[2] == "g":  # king side
                    print(3.1)
                    msg = "O-O"
                elif new_king[2] == "c":  # queen side
                    print(3.2)
                    msg = "O-O-O"
                else:
                    msg = "CASTLING_ERROR"

            else:
                msg = "MOVE_ERROR"

        except (IndexError, KeyError):
            # no matching capturer/king, or an unknown piece class
            msg = "MOVE_ERROR"

        print(msg)

        pgn_moves.append(msg)

    # Black moves first when every piece that appeared in the first
    # difference (one piece, or two for castling) is black.
    first_appeared = differences[0][1] if differences else ()
    black_starts = (
        len(first_appeared) in (1, 2)
        and all(piece[0] == "black" for piece in first_appeared)
    )

    # add ".." in white's slot in case black starts
    if black_starts:
        pgn_moves = [".."] + pgn_moves

    # number the moves in pairs and finish with the star of incompletion
    tokens = []
    for number, start in enumerate(range(0, len(pgn_moves), 2), start=1):
        tokens.append(f"{number}.")
        tokens.extend(pgn_moves[start:start + 2])
    tokens.append("*")

    return " ".join(tokens)

W0000 00:00:1733812957.396552   31738 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [102]:
# wrap the video reader
def frame_generator(video_path):
    """Yield the frames of the video at ``video_path`` one by one.

    Prints an error and yields nothing when the file cannot be opened. The
    capture is released when the video ends, when the consumer stops
    iterating early, and when an exception interrupts the iteration.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print("Error: Could not open video file.")
            return

        while True:
            ret, frame = cap.read()
            if not ret:
                break  # End of video
            yield frame  # Hand the frame to the caller
    finally:
        # Runs on normal exhaustion and on generator close / exception alike.
        cap.release()

# Function to check if hands are detected in a frame
def hands_detected(frame):
    """Return True when the MediaPipe detector finds a hand in ``frame``.

    The frame goes through a BGR -> RGB conversion before it is handed to
    the module-level ``hands`` detector.
    """
    detection = hands.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    # multi_hand_landmarks stays None when no hand was found
    if detection.multi_hand_landmarks is None:
        return False
    return True

# summarize state list into easy to use format
def summarize_states(lst, tolerance):
    """Filter noisy frames out of ``lst`` and return the state changes.

    Parameters:
        lst (list): per-frame board states; each state must support ``==``
            and ``-`` (set difference).
        tolerance (int): number of consecutive identical frames a state
            needs before it is accepted as stable.

    Returns:
        list: ``(disappeared, appeared)`` for every pair of consecutive
        stable states.
    """
    # Step 1: keep only states that persist for `tolerance` frames
    stable_states = []
    previous = None
    run_length = 0
    for state in lst:

        # a different state starts a new run
        if previous is None or state != previous:
            run_length = 0
        run_length += 1
        previous = state

        if run_length < tolerance:
            continue

        # Record the state once. Comparing with the last recorded state also
        # drops a stable state that merely resumes after a short noisy
        # interruption, which would otherwise produce an empty difference.
        if not stable_states or stable_states[-1] != state:
            stable_states.append(state)

    # Step 2: identify differences between consecutive stable states
    differences = []
    for old_state, new_state in zip(stable_states, stable_states[1:]):
        disappeared = old_state - new_state
        appeared = new_state - old_state
        differences.append((disappeared, appeared))

    return differences

def uci_from_differences(differences):

    """
    Convert the output of summarize_states into UCI format.

    Each piece is a ``(color, class, file, rank)`` tuple. A difference is
    read as a plain move or promotion (one piece leaves, one arrives), a
    capture (two leave, one arrives) or castling (two leave, two arrive).

    Parameters:
        differences (list): List of differences as ({disappeared}, {appeared}).

    Returns:
        list: A list of UCI moves. Entries that cannot be classified are
        "MOVE_ERROR"; a castle whose king lands on neither the g nor the c
        file is "CASTLING_ERROR".
    """

    uci_moves = []

    for disappeared, appeared in differences:

        l_d = len(disappeared)
        l_a = len(appeared)

        print("appeared:", appeared)
        print("disappeared:", disappeared)
        print("l_a:",l_a)
        print("l_d:", l_d)

        try:

            if l_d == 1 and l_a == 1:  # movement or promotion

                print("triggered case 1")

                old = list(disappeared)[0]
                new = list(appeared)[0]

                msg = f"{old[2]}{old[3]}{new[2]}{new[3]}"
                if old[:2] != new[:2]:
                    # color/class changed -> promotion: append the new piece's letter
                    msg += PROMOTE_KEY[new[1]]

            elif l_d == 2 and l_a == 1:  # capturing

                print("triggered case 2")

                new = list(appeared)[0]
                # the capturer is the vanished piece matching the one that appeared
                old = [piece for piece in disappeared if piece[:2] == new[:2]][0]

                msg = f"{old[2]}{old[3]}{new[2]}{new[3]}"

            elif l_d == 2 and l_a == 2:  # castling

                print("triggered case 3")

                new_king = [piece for piece in appeared if piece[1] == KING][0]
                is_black = new_king[0] == BLACK

                if new_king[2] == "g":  # king side
                    msg = "e8g8" if is_black else "e1g1"
                elif new_king[2] == "c":  # queen side
                    msg = "e8c8" if is_black else "e1c1"
                else:
                    msg = "CASTLING_ERROR"

            else:
                msg = "MOVE_ERROR"

        except (IndexError, KeyError):
            # no matching capturer/king, or a piece class missing from PROMOTE_KEY
            msg = "MOVE_ERROR"

        uci_moves.append(msg)

    return uci_moves

def generate_pgn(moves, ori):
    """Replay UCI moves from a starting position and return the PGN movetext.

    Parameters:
        moves (list): UCI move strings, in playing order.
        ori: starting position as ``(color, class, file, rank)`` tuples, or
            ``None`` to start from a default ``chess.Board()``.

    Returns:
        tuple: ``(pgn, board_states)``. ``pgn`` is the last line of the
        exported game; ``board_states`` holds a snapshot of the board before
        the first move and after every accepted move. Moves rejected by the
        board are reported with a print and skipped.
    """
    board = chess.Board()

    if ori is not None:
        # NOTE(review): board.clear() presumably also drops castling rights,
        # so castling moves may be rejected as illegal below - confirm.
        board.clear()

        # place each piece
        for color, piece, alpha, num in ori:

            symbol = SYM_KEY[piece]

            # lowercase symbols are black pieces, uppercase are white
            if color == BLACK:
                symbol = symbol.lower()
            elif color == WHITE:
                symbol = symbol.upper()

            square = chess.parse_square(alpha + str(num))
            board.set_piece_at(square, chess.Piece.from_symbol(symbol))

        # The side to move is the owner of the piece on the first move's
        # source square (the first two characters of the UCI string).
        print(moves)
        if moves:
            try:
                mover = board.color_at(chess.parse_square(moves[0][:2]))
            except ValueError:
                mover = None  # first entry is not a well-formed move
            if mover is not None:
                board.turn = mover

    # create game
    game = chess.pgn.Game()
    node = game
    node.headers["FEN"] = board.fen()

    # Replay the moves, storing a copy of the board after each one; storing
    # `board` itself would leave every entry pointing at the final position.
    board_states = [board.copy()]
    for move in moves:
        try:
            parsed = board.parse_uci(move)
        except ValueError:
            print(f"Move {move} is invalid")
            continue
        node = node.add_variation(parsed)
        board.push(parsed)
        board_states.append(board.copy())

    pgn = str(game).split("\n")[-1]
    return pgn, board_states

# convert a chess video to pgn format
def video2pgn(video_path):
    """Convert a chess video into PGN movetext.

    Every ``RATE``-th frame is cropped, skipped if a hand is detected, run
    through the piece detector, warped to a top-down board and turned into a
    set of ``(color, class, file, rank)`` tuples. The resulting states are
    summarised into moves and replayed with ``generate_pgn``.

    Parameters:
        video_path: path of the video file to read.

    Returns:
        tuple: ``(pgn, board_states)``, or ``(None, None)`` when no usable
        frame was found or a move could not be classified.
    """
    predictor = ChessPredicter()

    # Start from a clean slate: state_list is module-level, so without this
    # a second call would inherit the previous video's states.
    state_list.clear()

    for i, frame in enumerate(frame_generator(video_path)):

        if i % RATE != 0:
            continue

        # reverse the channel order
        frame = frame[:,:,::-1]

        # Keep a centred band of rows, trimmed by CROP at the top and bottom.
        # NOTE(review): assumes the frame is taller than it is wide - confirm.
        frame_h, frame_w, _ = frame.shape
        delta = (frame_h-frame_w)//2
        frame = frame[delta+CROP:delta+frame_w-CROP,:,:]

        # skip frames in which a hand is visible
        if hands_detected(frame):
            continue

        # detect pieces and use the bottom-centre of each box as the piece's foot
        detection = predictor.get_state(frame)
        detection_cg = {(piece_class,x+w//2,y+h) for piece_class,x,y,w,h in detection}
        print(detection_cg)

        # get the transformed image and the coordinates of the transformed feet
        chessboard = ChessboardProcessor(frame)
        img, piece_cg = chessboard.rotate_and_warp(detection_cg)
        if img is None or piece_cg is None:  # if the frame is bad skip it
            continue

        # cell size along each axis: shape is (rows, columns, ...), so x
        # (columns) uses shape[1] and y (rows) uses shape[0]
        rows, cols = img.shape[:2]
        x_cell_size = cols//8
        y_cell_size = rows//8

        # [(white-knight,x,y),(black-queen,x1,y1)]
        print(piece_cg)

        # reformat the points to board squares
        # NOTE(review): "% 8" wraps points outside the board onto the
        # opposite edge rather than clamping or dropping them - confirm intent.
        detection_cell = set()
        for piece_class, x, y in piece_cg:
            parts = piece_class.split("-")
            detection_cell.add((
                parts[0],                           # color
                parts[1],                           # class
                X_INDEX[int(x//x_cell_size) % 8],   # column: a,b,c..
                Y_INDEX[int(y//y_cell_size) % 8],   # row: 8,7,...
            ))

        state_list.append(detection_cell)

    if not state_list:
        print("No usable frame found")
        return None, None

    # Original state
    ori = state_list[0]

    # Once the processing is finished
    differences = summarize_states(lst=state_list, tolerance=TOLERANCE)
    print("differences:", differences)
    uci = uci_from_differences(differences)
    if 'MOVE_ERROR' in uci:
        print("MOVE ERROR")
        return None, None
    pgn, board_states = generate_pgn(moves=uci, ori=ori)

    return pgn, board_states

# main program to take a directory of videos to a csv file
def main(path=DATA_PATH, output=OUTPUT_PATH):
    """Convert every video in ``path`` to PGN and write the results as CSV.

    Parameters:
        path: directory containing the videos. An empty string skips the
            video processing and writes two placeholder rows instead.
        output: destination CSV file.

    Returns:
        pandas.DataFrame: one row per video with columns ``row_id`` (file
        name) and ``output`` (PGN, or None when conversion failed).
    """
    dic = dict()

    if path == "":
        # placeholder rows: the "PGN" is just the reversed file name
        for index, video in enumerate(["Hello.mp4","Hi.mp4"]):
            pgn = video[::-1]
            dic[index] = (video, pgn)
    else:
        # os.listdir returns bare names in arbitrary order: sort them for a
        # reproducible CSV and join with `path` so the file can be opened.
        videos = sorted(os.listdir(path))
        for index, video in enumerate(videos):
            pgn, _ = video2pgn(os.path.join(path, video))
            dic[index] = (video, pgn)

    df = pd.DataFrame.from_dict(dic, orient='index', columns=["row_id", "output"])
    df.to_csv(output, index=False, encoding='utf-8')

    return df


In [103]:
# Run the pipeline on one sample video; video2pgn returns (pgn, board_states).
pgn, boards = video2pgn("2_move_student.mp4")


0: 384x416 2 black-knights, 3 black-pawns, 2 black-rooks, 1 white-king, 2 white-knights, 3 white-pawns, 2 white-queens, 6 white-rooks, 136.1ms
Speed: 3.9ms preprocess, 136.1ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 416)
{('black-knight', 246.65509, 894.45502), ('white-rook', 155.15, 246.44666), ('black-knight', 935.88025, 568.61945), ('black-pawn', 599.92047, 667.20331), ('black-rook', 979.03711, 890.58545), ('white-knight', 701.56879, 369.33289), ('black-pawn', 840.53876, 782.03778), ('black-pawn', 492.75937, 566.1261), ('white-queen', 370.82495, 255.59895), ('white-pawn', 258.42644, 248.33775), ('white-pawn', 597.76105, 558.40607), ('black-rook', 951.3667, 785.19885), ('white-pawn', 724.0993, 248.44209), ('white-queen', 598.58325, 156.85008), ('white-rook', 472.97589, 454.1069), ('white-rook', 925.11896, 145.91939), ('white-knight', 108.18653, 890.92322), ('white-king', 372.61035, 257.00275), ('white-rook', 702.47473, 369.66812), ('white-rook', 158.08586, 135.09

In [93]:
# Build the submission CSV using main()'s default path and output arguments.
main()

Unnamed: 0,row_id,output
0,Hello.mp4,4pm.olleH
1,Hi.mp4,4pm.iH
