In [1]:
import copy
import time
from random import choice, shuffle
from math import log, sqrt
from copy import deepcopy
import numpy as np
import pandas as pd
import pygame,sys

player_o = 1     # AI player; its stones are drawn in black
player_x = -1    # human player; their stones are drawn in white

pygame 2.1.0 (SDL 2.0.16, Python 3.8.8)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [3]:
### Class: Board

In [4]:
def get_opponent(player):
    """
    Return the opponent of ``player``.

    :param player: the player whose opponent is wanted
    :return: ``player_o`` when ``player`` equals ``player_x``; ``player_x`` for any other value
    """
    if player == player_x:
        return player_o
    return player_x
class Board(object):
    """
    Rectangular board for an n-in-a-row game.

    Cells are numbered row-major: cell ``m`` lies in row ``m // width`` and
    column ``m % width``.
    """
    def __init__(self, width=8, height=8, n_in_row=5):
        self.width = width
        self.height = height

        # Maps cell index -> occupant: 0 for an empty cell, otherwise the value of
        # the player whose stone is there. Filled in by init_board().
        self.states = {}
        self.n_in_row = n_in_row  # how many equal stones in a line win the game

    def init_board(self):
        """
        Reset the board: every cell becomes empty and available.

        :raises ValueError: if the board is narrower or shorter than ``n_in_row``
        """
        if self.width < self.n_in_row or self.height < self.n_in_row:
            raise ValueError('board width and height can not less than %d' % self.n_in_row)
        # Every empty cell counts as a legal move.
        self.availables = list(range(self.width * self.height))
        for m in self.availables:
            self.states[m] = 0

    def update(self, player, move):
        """
        Place ``player``'s stone on cell ``move``.

        :raises ValueError: if ``move`` is not currently available; the board is
            left unchanged in that case
        """
        # Remove first so an illegal move fails before ``states`` is touched.
        self.availables.remove(move)
        self.states[move] = player

    def has_a_winner(self):
        """
        Report whether the game is over.

        :return: ``(True, player)`` if ``player`` has ``n_in_row`` stones in a line,
            ``(True, None)`` if the board is full with no such line (draw),
            ``(False, None)`` otherwise
        """
        width = self.width
        height = self.height
        states = self.states
        n = self.n_in_row
        moved = list(set(range(width * height)) - set(self.availables))

        # With alternating turns nobody can own n stones before 2n - 1 moves were made.
        if len(moved) >= n * 2 - 1:
            for m in moved:
                h = m // width
                w = m % width
                player = states[m]
                fits_right = w in range(width - n + 1)
                fits_down = h in range(height - n + 1)
                fits_left = w in range(n - 1, width)

                # Horizontal line starting at m and running right.
                if (fits_right and
                        len(set(states[i] for i in range(m, m + n))) == 1):
                    return True, player
                # Vertical line starting at m and running down.
                if (fits_down and
                        len(set(states[i] for i in range(m, m + n * width, width))) == 1):
                    return True, player
                # Diagonal running down and to the right.
                if (fits_right and fits_down and
                        len(set(states[i] for i in range(m, m + n * (width + 1), width + 1))) == 1):
                    return True, player
                # Diagonal running down and to the left.
                if (fits_left and fits_down and
                        len(set(states[i] for i in range(m, m + n * (width - 1), width - 1))) == 1):
                    return True, player

        # Only after ruling out a line is a full board a draw; checking this first
        # would misreport a win made on the very last free cell.
        if len(self.availables) == 0:
            return True, None
        return False, None

In [33]:
def ADJ(available_actions, width=8, height=8):
    """
    Return the empty cells that touch at least one occupied cell.

    Cells are numbered row-major on a ``width`` x ``height`` board. A cell is
    treated as occupied when it is missing from ``available_actions``.

    :param available_actions: iterable of the currently empty cell indices
    :param width: number of columns (default 8)
    :param height: number of rows (default 8)
    :return: list of empty cells that are horizontal, vertical or diagonal
        neighbours of an occupied cell; on a completely empty board, the central
        cells instead (``[28, 27, 35, 36]`` for the default 8x8 board)
    """
    moved = list(set(range(width * height)) - set(available_actions))
    if not moved:
        # Empty board: offer the centre. Odd dimensions have a single middle
        # row/column, so duplicates are dropped while keeping the order.
        upper, lower = (height - 1) // 2, height // 2
        left, right = (width - 1) // 2, width // 2
        centre = [upper * width + right, upper * width + left,
                  lower * width + left, lower * width + right]
        return list(dict.fromkeys(centre))

    adjacent = []
    last_row = height - 1
    last_col = width - 1
    for i in moved:
        row = i // width
        col = i % width

        if 0 < row:  # previous row, same column
            adjacent.append(i - width)
        if row < last_row:  # next row, same column
            adjacent.append(i + width)
        if 0 < col:  # same row, previous column
            adjacent.append(i - 1)
        if col < last_col:  # same row, next column
            adjacent.append(i + 1)
        if 0 < col and row < last_row:  # next row, previous column
            adjacent.append(i + width - 1)
        if col < last_col and row < last_row:  # next row, next column
            adjacent.append(i + width + 1)
        if 0 < col and 0 < row:  # previous row, previous column
            adjacent.append(i - width - 1)
        if col < last_col and 0 < row:  # previous row, next column
            adjacent.append(i - width + 1)
    # Deduplicate and drop neighbours that are themselves occupied.
    return list(set(adjacent) - set(moved))

In [4]:
### Class: Node

In [41]:
class Node:
    """
    One node of the Monte Carlo search tree.

    A node holds a board position, the player who moves next from it, and the
    visit/value statistics backed up by the simulations that passed through it.
    """
    def __init__(self, board: Board, parent=None,player = 1):
        self.state = board   # board position of this node (stored by reference, not copied)
        self.untried_actions = ADJ(board.availables)   # candidate moves not expanded yet
        self.parent = parent    # parent node; None for a root
        self.player = player    # player who moves next from this position
        self.children = {}      # move -> child Node reached by playing that move
        self.Q = 0  # accumulated reward, from self.player's point of view
        self.N = 0  # number of simulations backed up through this node

    def weight_func(self, c_param=1.9):
        """
        Score of this node as seen from its parent (average value plus exploration bonus).

        :param c_param: weight of the exploration term
        :return: 0.0 for an unvisited node, otherwise the selection score
        """
        if self.N == 0:
            return 0.0
        # -Q because this node's reward is counted for the opponent of the player
        # who is choosing among the parent's children.
        return -self.Q / self.N + c_param * np.sqrt(2 * np.log(self.parent.N) / self.N)

    @staticmethod
    def get_random_action(available_actions):
        """
        Pick one entry of ``available_actions`` uniformly at random.

        :raises ValueError: if ``available_actions`` is empty
        """
        # Draw an index directly rather than materialising range(n) as an array
        # on every rollout step.
        return available_actions[np.random.randint(len(available_actions))]

    def select(self, c_param=1.9):
        """
        Choose the child with the highest ``weight_func`` score.

        Ties go to the child that was expanded first.

        :param c_param: exploration weight passed to ``weight_func``
        :return: ``(action, child_node)`` for the best child
        :raises ValueError: if this node has no children
        """
        children = self.children
        action = max(children, key=lambda move: children[move].weight_func(c_param))
        return action, children[action]

    def expand(self):
        """
        Expand one untried move and return the resulting child node.

        :return: the newly created child node
        """
        action = self.untried_actions.pop()
        current_player = self.player
        # The child gets its own copy of the board with the move applied.
        next_board = deepcopy(self.state)
        next_board.update(current_player, action)
        next_player = get_opponent(current_player)
        child_node = Node(next_board, self,next_player)
        self.children[action] = child_node
        return child_node

    def update(self, winner,score):
        """
        Back up one simulation result through this node and all its ancestors.

        :param winner: winner of the simulation, or None for a draw
        :param score: length measure of the simulated game; the reward is
            ``20 / score``, so shorter games weigh more
        """
        self.N += 1
        reward = 20 / score
        if winner == self.player:
            self.Q += reward
        elif winner == get_opponent(self.player):
            self.Q -= reward
        # A draw (winner is None) changes only the visit count.
        if self.parent:
            self.parent.update(winner,score)

    def rollout(self,depth):
        """
        Play uniformly random moves from this node's position until the game ends.

        :param depth: depth at which this node was reached; used as the initial score
        :return: ``(winner, score)`` where ``score`` is ``depth`` plus the number of
            random moves played
        """
        current_state = deepcopy(self.state)
        current_player = self.player
        score = depth
        while True:
            is_over, winner = current_state.has_a_winner()
            if is_over:
                break
            # Rollouts draw from every empty cell, not only the cells next to stones.
            action = Node.get_random_action(current_state.availables)
            current_state.update(current_player,action)
            current_player = get_opponent(current_player)
            score+=1
        return winner,score

    def is_full_expand(self):
        """
        Tell whether every candidate move of this node has been expanded.

        :return: True when no untried move is left
        """
        return len(self.untried_actions) == 0

    def not_root_node(self):
        """
        Tell whether this node has a parent.

        :return: the parent node (truthy) for a non-root node, None for a root
        """
        return self.parent

In [42]:
### Class: MCTS

In [43]:
class MCTS:
    """
    Monte Carlo tree search player.

    ``root`` is the root of the tree currently being searched; ``current_node``
    is the node the next search starts from.
    """
    def __init__(self):
        self.root = None
        self.current_node = None

    def __str__(self):
        return "monte carlo tree search ai"

    def simulation(self, second = 5):
        """
        Run search iterations from ``current_node`` until the time budget is spent.

        :param second: wall-clock budget in seconds
        """
        iterations = 0
        start_time = time.time()
        while time.time()-start_time<second:
            iterations += 1
            leaf_node,depth = self.simulation_policy()
            winner,score = leaf_node.rollout(depth)
            leaf_node.update(winner,score)
        print("Total simulation times:",iterations)

    def simulation_policy(self):
        """
        Walk down from ``current_node`` to the node the next rollout starts from.

        :return: ``(node, depth)``: a freshly expanded child, or a node whose
            position is already finished
        """
        current_node = self.current_node
        depth = 1
        while True:
            is_over, _ = current_node.state.has_a_winner()
            if is_over:
                return current_node,depth
            if not current_node.is_full_expand():
                return current_node.expand(),depth
            _, current_node = current_node.select()
            depth+=1

    def take_action(self, current_state):
        """
        Search from ``current_state`` and return the move to play.

        Statistics from the previous search are kept when ``current_state`` matches
        one of the positions explored below ``current_node``.

        :param current_state: the live board; the AI (player 1) is to move
        :return: the chosen move
        """
        player = 1
        reused = None
        if self.root is not None and self.current_node is not None:
            # Boards in the tree are deep copies, so they have to be matched by
            # content; comparing the Board objects themselves never succeeds.
            for child_node in self.current_node.children.values():
                if child_node.state.states == current_state.states:
                    reused = child_node
                    break

        if reused is None:
            # First call, a restarted game, or a reply that was never explored.
            self.current_node = Node(current_state, None,player)
        else:
            # Detach the subtree so back-propagation stops here and the rest of
            # the old tree can be freed.
            reused.parent = None
            self.current_node = reused
        self.root = self.current_node

        self.simulation(5)
        # c_param 0.0: judge the children on their average value only.
        action, next_node = self.current_node.select(0.0)
        # Step to the position after our move; the opponent's reply is looked up
        # among this node's children on the next call.
        self.current_node = next_node
        return action

In [44]:
### Class: Game

In [45]:
def draw_board(screen):
    """
    Paint the board background and the eight horizontal and eight vertical grid lines.

    Reads the module-level grid spacing ``d``, which ``Game.start`` assigns, so that
    global must exist before this is called.

    :param screen: pygame surface to draw on
    """
    line_color = [0, 0, 0]
    screen.fill([241, 196, 15])
    for k in range(0, 8):
        offset = 40 + k * d
        pygame.draw.line(screen, line_color, [40, offset], [600, offset], 1)  # horizontal line
        pygame.draw.line(screen, line_color, [offset, 40], [offset, 600], 1)  # vertical line
def draw_stone(screen,value_list):
    """
    Draw every stone on the board.

    Reads the module-level grid spacing ``d``.

    :param screen: pygame surface to draw on
    :param value_list: 64 cell values in row-major order; 1 is drawn as a black
        stone, -1 as a white stone, anything else is skipped
    """
    grid = np.array(value_list).reshape(8,8)
    for (i, j), value in np.ndenumerate(grid):
        if value == 1:
            stone_color = [0, 0, 0]
        elif value == -1:
            stone_color = [255, 255, 255]
        else:
            continue
        centre = [40 + d * j, 40 + d * i]
        pygame.draw.circle(screen, stone_color, centre, 18, 0)
def render_2(screen,value_list):    
    """
    Redraw the whole scene: the board first, then the stones, then refresh the display.

    :param screen: surface passed on to ``draw_board`` and ``draw_stone``
    :param value_list: cell values passed on to ``draw_stone``
    """
    draw_board(screen)
    draw_stone(screen,value_list)
    pygame.display.update()

class Game(object):
    """
    Pygame front end: a human (player -1, white) plays the MCTS AI (player 1, black).
    """
    def __init__(self, board, **kwargs):
        self.board = board
        self.player = [1, -1]  # player 1 and player -1
        # NOTE(review): the options below are parsed and stored, but start() does
        # not read them.
        self.n_in_row = int(kwargs.get('n_in_row', 5))
        self.time = float(kwargs.get('time', 5))
        self.max_actions = int(kwargs.get('max_actions', 1000))

    def _choose_first_player(self):
        """Show the start menu and return who moves first: 1 (AI) or -1 (human)."""
        pygame.init()
        menu = pygame.display.set_mode((640,640))
        pygame.display.set_caption('Five-in-a-Row')
        font = pygame.font.Font(pygame.font.get_default_font(), 36)
        menu.fill((241, 196, 15))
        menu.blit(font.render('Who start first?',True,(255,0,0)), dest=(70,70))
        menu.blit(font.render('Press 1: AI (Black)',True,(0,0,0)), dest=(100,170))
        menu.blit(font.render('Press 2: Human (White)',True,(255,255,255)), dest=(100,270))
        pygame.display.update()

        first = None
        clock = pygame.time.Clock()
        while first is None:
            for event in pygame.event.get():
                if event.type==pygame.QUIT:
                    pygame.quit()
                    raise SystemExit
                elif event.type==pygame.KEYDOWN:
                    if event.key==pygame.K_1:
                        first = 1
                    elif event.key==pygame.K_2:
                        first = -1
            clock.tick(30)  # poll at most 30 times per second instead of spinning
        pygame.quit()
        return first

    def _show_result(self, screen, winner):
        """Show the result banner until the window is closed; never returns."""
        messages = {-1: 'You Win!!', 1: 'You Lose!!', None: 'No Winner.'}
        font = pygame.font.Font(pygame.font.get_default_font(), 77)
        text = messages.get(winner)
        # Render the text once; only the blit is repeated each frame.
        banner = font.render(text,True,(255,0,0)) if text is not None else None
        clock = pygame.time.Clock()
        while True:
            for event in pygame.event.get():
                if event.type==pygame.QUIT:
                    pygame.quit()
                    raise SystemExit
            if banner is not None:
                screen.blit(banner, dest=(0,0))
                pygame.display.update()
            clock.tick(30)

    def start(self):
        """
        Run one complete game: start menu, play loop, result screen.

        Ends by raising ``SystemExit`` when the user closes the window.
        """
        # draw_board() and draw_stone() read the grid spacing from this global.
        global d
        self.board.init_board()
        ai = MCTS()

        turn = self._choose_first_player()

        pygame.init()
        d = int(560/(8-1))
        screen=pygame.display.set_mode((640,640))
        pygame.display.set_caption('Five-in-a-Row')
        draw_board(screen)
        pygame.display.update()

        done = False
        winner = None
        clock = pygame.time.Clock()
        while not done:
            for event in pygame.event.get():
                if event.type==pygame.QUIT:
                    pygame.quit()
                    raise SystemExit
                if event.type==pygame.MOUSEBUTTONDOWN and turn==-1:
                    (x,y)=event.pos
                    # Snap the click to the nearest grid intersection.
                    row = round((y - 40) / d)
                    col = round((x - 40) / d)
                    if not (0 <= row < self.board.height and 0 <= col < self.board.width):
                        continue  # off the grid; would otherwise wrap onto another row
                    move = row*self.board.width+col
                    if move in self.board.availables:
                        self.board.update(turn, move)
                        render_2(screen,list(self.board.states.values()))
                        done, winner = self.board.has_a_winner()
                        turn = -turn
            if turn==1 and not done:
                # Blocks for the whole search; window events wait until it returns.
                move = ai.take_action(self.board)
                self.board.update(turn, move)
                render_2(screen,list(self.board.states.values()))
                done, winner = self.board.has_a_winner()
                turn = -turn
            clock.tick(30)

        self._show_result(screen, winner)

In [46]:
### main 

In [47]:
def main():
    """Create a default board, wrap it in a ``Game`` and run the game."""
    board = Board()
    # Game.start() calls init_board() again, so this call is redundant but harmless.
    board.init_board()
    game = Game(board)
    game.start()
# Start the game only when this code is run directly, not when it is imported.
if __name__ == '__main__':
    main()

Total simulation times: 846
Total simulation times: 846
Total simulation times: 878
Total simulation times: 848
Total simulation times: 8554


SystemExit: 