In [1]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from kaggle_environments import evaluate, make, utils, register
import telebot
from telebot import types


In [2]:
def make_turn(board, mark, turn):
    columns = 7
    rows = 6
    row = max([r for r in range(rows) if board[turn + (r * columns)] == 0])
    board[turn + (row * columns)] = mark
    
def bad_move(board, turn):
    columns = 7
    rows = 6
    if len([r for r in range(rows) if board[turn + (r * columns)] == 0]) == 0:
        return True
    return False

def is_win(board, column, mark):
    columns = 7
    rows = 6
    current_board = np.array(board.copy())

    def move_board(board, r, c):
        new_board = np.zeros(board.shape)
        rows, cols = new_board.shape
        for i in range(rows):
            for j in range(cols):
                new_board[i][j] = board[(i + rows - r) % rows][(j + cols - c) % cols]
        return new_board
    
    def is_win_simple(board_, mark):
        """ Checks for a win. Taken from the Kaggle environment. """
        if board_[0][0] == mark and board_[0][1] == mark and board_[0][2] == mark and board_[0][3] == mark:
            return True
        if board_[0][0] == mark and board_[1][0] == mark and board_[2][0] == mark and board_[3][0] == mark:
            return True
        if board_[0][0] == mark and board_[1][1] == mark and board_[2][2] == mark and board_[3][3] == mark:
            return True
        if board_[3][0] == mark and board_[2][1] == mark and board_[1][2] == mark and board_[0][3] == mark:
            return True
        return False

    for k in range(rows):
        for m in range(columns):
            new_board = move_board(current_board.reshape(rows, columns), k, m)
            if is_win_simple(new_board, mark):
                return True
            
    return False


def is_tie(board):
    return not(any(mark == 0 for mark in board))

def get_reward(board, column, mark):
    if is_tie(board):
        return 0.5
    if is_win(board, column, mark):
        return 1

    return 0 # игра еще не закончена 

def find_action_taken_by_opponent(new_board, old_board, config):
    """ Given a new board state and a previous one, finds which move was taken. Used for recycling tree between moves. """
    for i, piece in enumerate(new_board):
        if piece != old_board[i]:
            return i % config.columns
    return -1  # shouldn't get here

class MCTS_Node:
    def __init__(self, board, mark, terminal, game_result = None, parent = None, parent_action = None) -> None:
        self.board = board
        self.mark = mark
        self.terminal = terminal
        self.game_result = game_result
        self.parent = parent
        self.parent_action = parent_action
        self.children: list[MCTS_Node] = []
        self.number_visits = 0
        self.score = 0
        self.untried_actions = self.available_moves()

    def available_moves(self, board = None):
        if board is None:
            board = self.board

        return [move for move in range(7) if board[move] == 0]
    # def q(self):
    #     wins = self._results[1]
    #     loses = self._results[-1]
    #     return wins - loses
    def n(self):
        return self.number_visits

    def expand(self):
        action = self.untried_actions.pop()
        new_board = self.board.copy()

        make_turn(new_board, self.mark, action)

        score = get_reward(new_board, action, self.mark)
        terminal = True

        if score == 0:
            terminal = False
        child_node = MCTS_Node(
            new_board, mark=3 - self.mark, terminal=terminal, game_result=score, parent=self, parent_action=action)
        self.children.append(child_node)
        return child_node 
    
    def backpropagate(self, result):
        self.number_visits += 1.
        self.score += result
        if result == 0:
            self.score -= 5
        if self.parent:
            self.parent.backpropagate(1 - result)

    def rollout(self):
        if self.terminal:
            return self.game_result
        mark = self.mark
        new_board = self.board.copy()
        action = self.rollout_policy(self.available_moves())
        make_turn(new_board,  mark, action)
        score = get_reward(new_board, action, mark)

        while  score == 0:
            mark = 3 - mark
            action = self.rollout_policy(self.available_moves(new_board))

            make_turn(new_board, mark, action)
            score = get_reward(new_board, action, mark)
        if mark == self.mark:
            return score
        return 1 - score
    
    def rollout_policy(self, available_moves): # may be better?
        return available_moves[np.random.randint(len(available_moves))] #may be here

    def simulate(self):
        if self.terminal:
            return self.game_result
        return 1 - self.rollout()

    def is_fully_expanded(self):
        return len(self.untried_actions) == 0

    def best_child(self, c_param=0.1):    
        choices_weights = [(c.score / c.n()) + c_param * np.sqrt((2 * np.log(self.n()) / c.n())) for c in self.children]
        return self.children[np.argmax(choices_weights)]
    
    def _tree_policy(self):
        current_node = self
        while not current_node.is_terminal_node():
            
            if not current_node.is_fully_expanded():
                return current_node.expand()
            else:
                current_node = current_node.best_child()
        return current_node
    
    def best_action(self):
        simulation_no = 100
            
        for i in range(simulation_no):
            
            v = self._tree_policy()
            reward = v.rollout()
            v.backpropagate(reward)
        
        return self.best_child(c_param=0.)
    
    def expand_and_simulate_child(self):
        self.expand()
        simulation_score = self.children[-1].simulate()
        self.children[-1].backpropagate(simulation_score)

    def tree_single_run(self):
        if self.terminal:
            self.backpropagate(self.game_result)
            return
        if not self.is_fully_expanded():
            self.expand_and_simulate_child()
            return
        self.best_child().tree_single_run()
            
    def choose_child_via_action(self, action):
        for child in self.children:
            if child.parent_action == action:
                return child
        return None

def MCTS_agent(observation, configuration):
    """
    Connect X agent based on MCTS.
    """
    import random
    import math
    import time
    global current_state  # so tree can be recycled
    board = observation.board
    mark = observation.mark
    init_time = time.time()
    EMPTY = 0
    T_max = 35  # time per move, left some overhead
    Cp_default = 13

    
    for turn in [move for move in range(7) if board[move] == 0]:
        new_board = board.copy()
        make_turn(new_board, 3 - mark, turn)
        if is_win(new_board, turn, 3 - mark):
            return turn
    # If current_state already exists, recycle it based on action taken by opponent
    try:  
        current_state = current_state.choose_child_via_action(
            find_action_taken_by_opponent(board, current_state.board, configuration))
        current_state.parent = None  # make current_state the root node, dereference parents and siblings
        
    except:  # new game or other error in recycling attempt due to Kaggle mechanism
        current_state = MCTS_Node(board=board,mark= mark, terminal=False)
   
    # Run MCTS iterations until time limit is reached.
    while time.time() - init_time <= T_max:
        current_state.tree_single_run()
        
    current_state = current_state.best_child()
    return current_state.parent_action

In [3]:

EMPTY = 0

def interpreter(state, env):
    active = state[0] if state[0].status == "ACTIVE" else state[1]

    # Specification can fully handle the reset.
    if env.done:
        return state

    # Isolate the active and inactive agents.
    active = state[0] if state[0].status == "ACTIVE" else state[1]
    inactive = state[0] if state[0].status == "INACTIVE" else state[1]
    if active.status != "ACTIVE" or inactive.status != "INACTIVE":
        active.status = "DONE" if active.status == "ACTIVE" else active.status
        inactive.status = "DONE" if inactive.status == "INACTIVE" else inactive.status
        return state

    # Keep the board in sync between both agents.
    board = active.observation.board
    inactive.observation.board = board
    action = active.action
    mark = active.observation.mark

    # Illegal move by the active agent.
    if bad_move(board, action):
        active.status = f"Invalid move: {action}"
        inactive.status = "DONE"
        return state

    # Mark the position.
    make_turn(board, mark, action)

    # Check for a win.
    if is_win(board, action, mark):
        active.reward = 1
        active.status = "DONE"
        inactive.reward = 0
        inactive.status = "DONE"
    
        return state

    # Check for a tie.
    if is_tie(board):
        active.status = "DONE"
        inactive.status = "DONE"
        return state

    # Swap active and inactive agents to switch turns.
    active.status = "INACTIVE"
    inactive.status = "ACTIVE"

    return state

In [4]:
def print_board(board):
    print("\n|-----+-----+-----+-----+-----+-----+-----|")
    for i in range(6):
        print('|  ', end = '')
        for j in range(7):
            print(board[i * 7 + j], end='  |  ')
        print("\n|-----+-----+-----+-----+-----+-----+-----|")



In [5]:
def renderer(state, env):
    # row_bar = "\n---+---+---+---+---+---+---\n"
    # marks = [" ", "X", "O"]
    def print_board(board):
        for i in range(6):
            for j in range(7):
                print(board[i * 7 + j], end='   ')
            print('\n')
    # def print_pos(pos):
    #     str = ""
    #     if pos % 3 == 0 and pos > 0:
    #         str += row_bar
    #     if pos % 3 != 0:
    #         str += "|"
    #     return str + f" {marks[state[0].observation.board[pos]]} "

    # return "".join(print_pos(p) for p in range(42))
    print_board(state[0].observation.board)

In [6]:
from random import choice

def random_agent(obs):
    a = choice([c for c in range(len(obs.board)) if obs.board[c] == EMPTY])
    return a % 7 


def reaction_agent(obs):
    # Connect 3 in a row to win.
    
    print_board(obs.board)# No 3-in-a-rows, return random unmarked.
    action = int(input())
    return choice(list(filter(lambda m: m[1] == EMPTY, enumerate(obs.board))))[0]


agents = {"random": random_agent, "reaction": reaction_agent, 'MCTS': MCTS_agent}

In [7]:
true_env = make("connectx", debug=True)

specification = true_env.specification.copy()
specification["name"] = "ConnectXOnTor"
specification["title"] = "ConnectXOnTor"
specification["description"] = "ConnectX but On Tor"
specification['observation']['board']['default'] = [0] * 42
specification['configuration']['timeout']['default'] = 100

In [8]:
register(specification["name"], {
    "agents": agents,
    "interpreter": interpreter,
    "renderer": renderer,
    "specification": specification,
})

In [9]:
def draw_board(board):
    s = ''
    s += "\n|-----+-----+-----+-----+-----+-----+-----|\n"
    for i in range(6):
        s += '|  '
        for j in range(7):
            s += str(board[i * 7 + j]) + '  |  '
        s += "\n|-----+-----+-----+-----+-----+-----+-----|\n"
    return '```' + s + '```'


In [10]:
agents = {"random": random_agent, "reaction": reaction_agent, 'MCTS': MCTS_agent}
register(specification["name"], {
    "agents": agents,
    "interpreter": interpreter,
    "renderer": renderer,
    "specification": specification,
})
env = make(specification["name"], debug=True)


In [15]:
class TGBotForConnectX:
    def __init__(self) -> None:
        self.board_for_bot =  [0] * 42
        self.bot = telebot.TeleBot('7054763245:AAFT3zRXmKc_wqQY8eUOyU4UUr2MRXsmQAI')
        self.obs = None
        self.state = None
        self.env = make('ConnectXOnTor', debug=True)
        self.config = env.configuration
    def do_all(self):
        @self.bot.message_handler(commands=['start'])
        def start(message):

            self.bot.send_message(message.from_user.id, 'Это бот для игры Connect4 на торе. Каждый ход - это число от 1 до 7. \
                                   \nБоту требуется 10-15 секунд, чтобы сделать ход',  parse_mode="Markdown")            
            self.bot.send_message(message.from_user.id, 'Если хочешь ходить первым, введи 1 \nЕсли хочешь ходить вторым, введи 2', 
                                   parse_mode="Markdown")            
            self.bot.register_next_step_handler(message, choose_colour)
        def incorrect_choose_color(message):
            self.bot.send_message(message.from_user.id, 'Некорректный ввод')
            self.bot.register_next_step_handler(message, choose_colour)
        def choose_colour(message):
            self.state = self.env.reset(2)[0]
            self.obs = self.state['observation']

            if message.text == '2':
                turn = MCTS_agent(self.obs, self.config)
                self.state = self.env.step([turn, 200])[0]
                self.obs = self.state['observation']
                self.bot.send_message(message.from_user.id, draw_board(self.obs['board']),  parse_mode="Markdown")
                self.bot.register_next_step_handler(message, user_turn_second)
                
            elif message.text == '1':
                self.bot.send_message(message.from_user.id, draw_board(self.obs['board']),  parse_mode="Markdown")
                self.bot.send_message(message.from_user.id, 'сделай первый ход',  parse_mode="Markdown")
                self.bot.register_next_step_handler(message, user_turn_first)
            else: 
                self.bot.register_next_step_handler(message, incorrect_choose_color)

        def incorrect_turn(message):
            self.bot.send_message(message.from_user.id, 'Некорректный ход')
            self.bot.register_next_step_handler(message, user_turn_second)

        def user_turn_second(message):
            available_moves = [move for move in range(7) if self.obs['board'][move] == 0]
            action_by_user = int(message.text) - 1

            if action_by_user not in available_moves:
                self.bot.register_next_step_handler(message, incorrect_turn)

            self.state = self.env.step([200, action_by_user])[0]
            self.obs = self.state['observation']
            self.bot.send_message(message.from_user.id, draw_board(self.obs['board']),  parse_mode="Markdown")
            if self.state['status'] == 'DONE':
                self.bot.send_message(message.from_user.id, 'You win!')
                self.bot.send_message(message.from_user.id, 'выбери номер хода, если хочешь сыграть еще')
                self.bot.register_next_step_handler(message, choose_colour)
            else:
                turn = MCTS_agent(self.obs, self.config)
                print(turn)
                self.state = self.env.step([turn, 200])[0]
                self.obs = self.state['observation']
                self.bot.send_message(message.from_user.id, draw_board(self.obs['board']),  parse_mode="Markdown")
                if self.state['status'] == 'DONE':
                    self.bot.send_message(message.from_user.id, 'You lose!')
                    self.bot.send_message(message.from_user.id, 'выбери номер хода, если хочешь сыграть еще')
                    self.bot.register_next_step_handler(message, choose_colour)
                else:
                    self.bot.register_next_step_handler(message, user_turn_second)
        def user_turn_first(message):
            action_by_user = int(message.text) - 1
            self.state = self.env.step([action_by_user, 200])[0]
            self.obs = self.state['observation']
            self.bot.send_message(message.from_user.id, draw_board(self.obs['board']),  parse_mode="Markdown")
            if self.state['status'] == 'DONE':
                self.bot.send_message(message.from_user.id, 'You win!')
                self.bot.send_message(message.from_user.id, 'выбери номер хода, если хочешь сыграть еще')
                self.bot.register_next_step_handler(message, choose_colour)
            else:
                turn = MCTS_agent(self.obs, self.config)
                print(turn)
                self.state = self.env.step([200, turn])[0]
                self.obs = self.state['observation']
                self.bot.send_message(message.from_user.id, draw_board(self.obs['board']),  parse_mode="Markdown")
                if self.state['status'] == 'DONE':
                    self.bot.send_message(message.from_user.id, 'You lose!')
                    self.bot.send_message(message.from_user.id, 'выбери номер хода, если хочешь сыграть еще')
                    self.bot.register_next_step_handler(message, choose_colour)
                else:
                    self.bot.register_next_step_handler(message, user_turn_first)

        self.bot.polling()    
        

In [16]:
a = TGBotForConnectX()
a.do_all()
env.configuration

3


{'timeout': 100, 'columns': 7, 'rows': 6, 'inarow': 4, 'steps': 1000}