# Connect X

In [1]:
import itertools
import numpy as np
import pandas as pd

In [2]:
from platform import python_version
python_version()

'3.6.10'

In [3]:
import numpy as np

In [6]:
from gamelearner import Environment

class Connect4(Environment):
    """Simulates a game of Connect 4.

    Class attributes:
        Connect4.name (str): The game's name ('Connect 4').
        Connect4.shape (int): Width and height of board (6, 7).
        roles [int, int]: The player roles ([1, 2]).
        Connect4.possible_n_players (list): List of allowed
            numbers of players ([2]).
        Connect4.marks (list): The characters used to represent
            each role's move on the board (['S', 'O']).
        Connect4.connect (int): Number of discs in a row to win (4).
        Connect4.help_text (dict): Various messages (strings)
            to help user.
    """

    name = 'Connect 4'
    shape = (6, 7)
    roles = [1, 2]
    possible_n_players = [2]
    marks = ['S', 'O']
    connect = 4
    terminal_rewards = {'win': 1.0, 'lose': 0.0, 'draw': 0.5}

    help_text = {
        'Move format': "row, col from bottom left",
        'Move not available': "That position is not available.",
        'Number of players': "This game requires 2 players.",
        'Out of range': "slot must be in range 0 to %d." % (shape[1] - 1)
    }

    # Data objects for analyzing board
    _steps = {
        'u': (1, 0),
        'd': (-1, 0),
        'r': (0, 1),
        'l': (0, -1),
        'ur': (1, 1),
        'dr': (-1, 1),
        'ul': (1, -1),
        'dl': (-1, -1)
    }

    # Function used by _check_positions method
    _fcum = lambda x1, x2: (x1 + x2)*x2
    
    def __init__(self, moves=None):
        """Initialize a game.
        Args:
            moves (list): This is optional. Provide a list of completed
                moves. Each move should be a list or tuple of length 2
                where the first item is the player role and the second is
                the board position (col).
        """
        self.n_players = 2
        self._board_full, self._state = self._empty_board_state()
        self.winner = None
        self.player_iterator = itertools.cycle(self.roles)
        self.turn = next(self.player_iterator)
        super().__init__(start_state=self._state, moves=moves)

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, state):
        self._state[:] = state

    def _empty_board_state(self):
        """Initialize board_full and state."""
        # board_full has a border set to -1
        board_full = -np.ones(np.array(self.shape) + (2, 2), dtype='int8')
        state = board_full[1:1+self.shape[0], 1:1+self.shape[1]]
        state[:] = 0
        return board_full, state

    def reset(self):
        """Set the state of the game back to the beginning
        (no moves made).
        """

        super().reset()
        self.player_iterator = itertools.cycle(self.roles)
        self.turn = next(self.player_iterator)
        self._board_full, self.state = self._empty_board_state()
        self.winner = None

    def show_state(self):
        """Display the current state of the board."""

        chars = '_' + ''.join(self.marks)
        for row in reversed(self.state):
            print(" ".join(list(chars[i] for i in row)))

    @staticmethod
    def _fill_levels(state):
        # Note: This assumes proper filling!
        return (state > 0).sum(axis=0)

    def available_moves(self, state=None):
        """Returns list of available (empty) moves (slots).
        Args:
            state (np.ndarray): Array (shape (6, 7)) of game state or if
                                not provided the current game state will
                                be used.
        """

        if state is None:
            state = self.state

        spaces_left = self._fill_levels(state) < self.shape[0]
        
        return np.nonzero(spaces_left)[0]

    @staticmethod
    def _get_neighbours(board_full, pos):
        neighbours = {d: board_full[(step[0]+pos[0], step[1]+pos[1])] 
                      for d, step in self._steps.items()}
        return neighbours

    @staticmethod
    def _chain_in_direction(board_full, pos, direction, role):
        """Finds number of matching discs in one direction."""
        step = steps[direction]
        for i in range(connect):
            pos = (step[0]+pos[0], step[1]+pos[1])
            x = board_full[pos]
            if x != role:
                break
        return i

    def _check_game_state_after_move(self, board_full, move):

        role, column = move
        #TODO: Need to implement fill_levels with state
        assert board_full[pos] == 0
        results = {}
        for direction, step in steps.items():
            n = self._chain_in_direction(board_full, pos, direction, role)
            if n == self.connect - 1:
                return True
            results[direction] = n

        for d1, d2 in [('u', 'd'), ('l', 'r'), ('ul', 'dr'), ('dl', 'ur')]:
            if results[d1] + results[d2] >= self.connect-1:
                return True
        return False

    @staticmethod
    def _next_available_position(state, col):
        # Note: This assumes proper filling!
        return (state[:, col] > 0).sum()

    def _check_positions(self, positions, role, connect=None):
        """Check bool array positions for a connect x."""
        if connect is None:
            connect = self.connect
        _fcum = lambda x1, x2: (x1 + x2)*x2
        positions = positions.astype('int8')
        temp = np.empty_like(positions)
        temp[:] = list(itertools.accumulate(positions, _fcum))
        max_vert = temp.max()
        temp.T[:] = list(itertools.accumulate(positions.T, _fcum))
        max_horiz = temp.max()
        diagonals = [np.diagonal(positions, offset=k) for k in range(-2, 4)]
        max_diag = max(max(itertools.accumulate(x, _fcum)) for x in diagonals)
        return max(max_horiz, max_vert, max_diag) >= connect
    
    def check_game_state(self, state=None, role=None):
        
        game_over, winner = False, None

        if state is None:
            state = self.state

        # If role specified, only check for a win by role
        if role:
            roles = [role]
        else:
            roles = self.roles

        for role in roles:
            positions = (state == role)
            if self._check_positions(positions, role):
                game_over, winner = True, role
                break

        if winner is None and np.all(state > 0):
            game_over = True

        return game_over, winner

    def generate_state_key(self):
        raise NotImplementedError()

    def get_rewards(self):
        raise NotImplementedError()

    def get_terminal_rewards(self):
        raise NotImplementedError()

    def next_state(self):
        raise NotImplementedError()

    def next_state(self, state, move, role_check=True):
        """Returns the next state of the game when move is
        taken from current game state or from state if 
        provided.
        
        Args:
            state (np.ndarray): Array (shape (6, 7)) of board state
                or if not provided the current game state will be 
                used.
            move (tuple): Tuple of length 2 containing the player 
                role and the move (role, position). Position is also
                a tuple (row, col).
        Returns:
            next_state (np.ndarray): copy of state after move made.
        Raises:
            ValueError if it is not role's turn.
            AssertionError if the position is out of bounds or if
            there is already a move in that position.
        """

        role, position = move
        if role_check:
            if role != self.turn:
                raise ValueError(f"It is not player {role}'s turn.")

        assert 0 <= position < self.shape[1], self.help_text['Out of range']
        fill_level = self._fill_levels(state)[position]
        assert fill_level < self.shape[0], self.help_text['Move not available']

        next_state = state.copy()
        next_state[fill_level, position] = role

        return next_state

    def make_move(self, move, show=False):
        """Update the game state with a new move.
        Args:
            move (tuple): Tuple of length 2 containing a
                player role and action (role, action).
            show (bool): Print a message if True.
        """
        position = move[1]
        fill_level = self._fill_levels(self.state)[position]
        self._pos_last = (position, fill_level)
        super().make_move(move, show)
        self.turn = next(self.player_iterator)

    def reverse_move(self):
        raise NotImplementedError()


In [7]:
# Initialization tests
game = Connect4()
assert np.array_equal(game.state , np.zeros((6, 7), dtype='int8'))
assert game._board_full.shape == (8, 9)
game.make_move((1, 0))
game.make_move((2, 1))
game.make_move((1, 2))
game.make_move((2, 0))

test_state = np.array(
    [[1, 2, 1, 0, 0, 0, 0],
     [2, 0, 0, 0, 0, 0, 0],
     [0, 0, 0, 0, 0, 0, 0],
     [0, 0, 0, 0, 0, 0, 0],
     [0, 0, 0, 0, 0, 0, 0],
     [0, 0, 0, 0, 0, 0, 0]], dtype='int8'
)
assert np.array_equal(game.state, test_state)

moves = [
    (1, 0), (2, 1), (1, 2), (2, 0)
]
game = Connect4(moves=moves)
assert np.array_equal(game.state, test_state)

game.make_move((1, 6))
assert np.array_equal(
    game.state, 
    np.array(
        [[1, 2, 1, 0, 0, 0, 1],
         [2, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0]], dtype='int8')
)
assert game.check_game_state() == (False, None)

moves = [
    (1, 0), (2, 1), (1, 0), (2, 1),
    (1, 0), (2, 1)
]
game = Connect4(moves=moves)
assert game.check_game_state() == (False, None)
game.make_move((1, 0))
assert game.check_game_state() == (True, 1)
game.show_state()

_ _ _ _ _ _ _
_ _ _ _ _ _ _
S _ _ _ _ _ _
S O _ _ _ _ _
S O _ _ _ _ _
S O _ _ _ _ _


In [29]:
state = np.zeros(game.shape)
assert game.check_game_state(state) == (False, None)

state = np.array([[2, 2, 1, 2, 0, 2, 1],
         [1, 0, 0, 0, 2, 2, 0],
         [1, 2, 1, 2, 0, 1, 1],
         [0, 2, 1, 2, 1, 1, 1],
         [0, 2, 0, 2, 1, 0, 1],
         [2, 1, 0, 2, 2, 0, 2]])
assert game.check_game_state(state) == (True, 2)

state = np.array([
    [0, 0, 2, 0, 2, 2, 2],
    [1, 1, 2, 1, 1, 2, 0],
    [1, 1, 1, 1, 2, 0, 2],
    [0, 1, 2, 0, 0, 2, 0],
    [2, 1, 2, 0, 0, 2, 0],
    [0, 1, 2, 2, 1, 2, 0]
])
assert game.check_game_state(state) == (True, 1)

state = np.array([
    [2, 0, 2, 1, 0, 2, 1],
    [0, 1, 1, 0, 1, 2, 0],
    [1, 0, 0, 0, 2, 0, 2],
    [2, 1, 0, 0, 1, 0, 2],
    [0, 2, 1, 0, 1, 2, 0],
    [0, 0, 1, 2, 1, 0, 1]
])
assert game.check_game_state(state) == (False, None)

state = np.array([
    [0, 1, 0, 2, 2, 1, 1],
    [2, 1, 1, 1, 0, 1, 2],
    [2, 2, 0, 1, 1, 1, 0],
    [1, 2, 1, 0, 0, 2, 1],
    [1, 2, 2, 2, 1, 0, 1],
    [0, 0, 2, 2, 1, 0, 1]
])
assert game.check_game_state(state) == (True, 2)

state = np.array([
    [2, 2, 2, 1, 2, 2, 2],
    [1, 1, 1, 2, 1, 1, 1],
    [2, 2, 2, 1, 2, 2, 2],
    [1, 1, 1, 2, 1, 1, 1],
    [2, 2, 2, 1, 2, 2, 2],
    [1, 1, 1, 2, 1, 1, 1]
])  # draw
assert game.check_game_state(state) == (True, None)

state = np.random.randint(1, 3, size=(game.shape[0]*game.shape[1])).reshape(game.shape)
print(game.check_game_state(state))
state

(True, 1)


array([[2, 2, 1, 2, 2, 1, 1],
       [2, 1, 1, 2, 2, 2, 2],
       [1, 1, 1, 2, 1, 1, 2],
       [1, 1, 2, 2, 1, 2, 2],
       [2, 1, 2, 1, 1, 1, 2],
       [1, 2, 2, 1, 2, 1, 1]])

In [9]:
game = Connect4()
while not game.game_over:
    role = game.turn
    moves = game.available_moves()
    move = np.random.choice(moves)
    game.make_move((role, move))

game.show_state()
print(game.winner)

O O S _ _ _ _
S S S _ _ S _
O O O O _ O S
S S S O O S S
O S O S O O S
O S S O S O O
2


In [10]:
moves = []
game = Connect4()
for move in moves:
    game.make_move(move)
    game.show_state()
    k = input().lower()
    if k == 'd':
        import pdb; pdb.set_trace()
    if game.game_over or k == 'q':
        break

In [None]:
def _check_positions(positions, role, connect=4):
    """Check bool array positions for a connect x."""
    _fcum = lambda x1, x2: (x1 + x2)*x2
    positions = positions.astype('int8')
    temp = np.empty_like(positions)
    temp[:] = list(itertools.accumulate(positions, _fcum))
    max_vert = temp.max()
    temp.T[:] = list(itertools.accumulate(positions.T, _fcum))
    max_horiz = temp.max()
    diagonals = [np.diagonal(positions, offset=k) for k in range(-2, 4)]
    max_diag = max(max(itertools.accumulate(x, _fcum)) for x in diagonals)
    return max_horiz, max_vert, max_diag

In [None]:
state = np.random.randint(3, size=6*7).reshape(game.shape)
positions = (state == 1).astype('int8')
print(positions)
_check_positions(positions, 1)

In [None]:
%timeit _check_positions(positions, 1)

In [None]:
# TODO: quicker to just check game state from last move
last_move = game.moves[-1]
role, position = last_move
row = game._fill_levels(game.state)[position] - 1
game._check_game_state_after_move(game._board_full, last_move)

In [None]:
state[:] = np.zeros(size, dtype='int8')
assert available_cols(state).tolist() == [0, 1, 2, 3, 4, 5, 6]
state[0,3] = 1
assert available_cols(state).tolist() == [0, 1, 2, 3, 4, 5, 6]
state[0:5,1] = 2
assert available_cols(state).tolist() == [0, 1, 2, 3, 4, 5, 6]
state[0:6,size[1]-1] = 1
assert available_cols(state).tolist() == [0, 1, 2, 3, 4, 5]
state[:,:] = 1
assert available_cols(state).tolist() == []

In [None]:

state[:] = np.zeros(size, dtype='int8')
state[0,3:6] = 1
state[0:4,1] = 2

show_state(state)

In [None]:

pos = (1, 1)  # Note: This is actually (0, 0)
get_neighbours(board_full, pos)

In [None]:

state[:] = 2
show_state(state)
pos = (1, 1)
for direction in steps.keys():
    print(direction, chain_in_direction(board_full, pos, direction, 2))

In [None]:

state[:] = np.array([
    [0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0]
])
move = (2, (1, 1))
assert check_game_over(board_full, move) == False
move = (1, (1, 1))
assert check_game_over(board_full, move) == False

state[:] = np.array([
    [1, 1, 1, 0, 0, 0, 0],
    [0, 1, 1, 0, 0, 0, 0],
    [0, 0, 1, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0]
])
move = (1, (4, 4))
assert check_game_over(board_full, move) == True
move = (1, (4, 3))
assert check_game_over(board_full, move) == True
move = (2, (4, 4))
assert check_game_over(board_full, move) == False
move = (1, (2, 1))
assert check_game_over(board_full, move) == False
move = (1, (3, 2))
assert check_game_over(board_full, move) == False
move = (1, (4, 3))
assert check_game_over(board_full, move) == True
move = (2, (4, 3))
assert check_game_over(board_full, move) == False

state[:] = np.array([
    [1, 1, 1, 0, 0, 0, 0],
    [0, 1, 1, 2, 0, 0, 0],
    [0, 0, 1, 2, 2, 0, 0],
    [0, 0, 2, 0, 2, 0, 0],
    [0, 0, 2, 0, 0, 0, 2],
    [0, 0, 0, 0, 0, 0, 0]
])
move = (2, (4, 4))
assert check_game_over(board_full, move) == False
move = (1, (4, 4))
assert check_game_over(board_full, move) == True
move = (2, (4, 6))
assert check_game_over(board_full, move) == True

In [None]:
state[:] = np.array([
    [0, 1, 1, 1, 2, 2, 1],
    [0, 0, 1, 0, 2, 1, 0],
    [0, 0, 0, 0, 2, 2, 0],
    [0, 0, 0, 0, 0, 2, 0],
    [0, 0, 0, 0, 0, 2, 0],
    [0, 0, 0, 0, 0, 1, 0]
])
fill_levels(state)

In [None]:
# Find winning moves
role = 1
for col in available_cols(state):
    pos = (next_available_position(state, col), col)
    move = (role, (pos[0]+1, pos[1]+1))
    print((role, pos), check_game_over(board_full, move))

role = 2
for col in available_cols(state):
    pos = (next_available_position(state, col), col)
    move = (role, (pos[0]+1, pos[1]+1))
    print((role, pos), check_game_over(board_full, move))

In [None]:
fl = fill_levels(state)
fl

In [None]:
size[0]

In [None]:
fl

In [None]:
cols = np.arange(1,size[1]+1)
rows = [ for i in range(4)]
rows

In [None]:
fl

In [None]:
rows = np.empty((4, size[1]), dtype='int8')
for i in range(4):
    rows[i] = (fl-i).clip(-1,)
rows

In [None]:
cols = np.empty((4, size[1]), dtype='int8')
cols[:] = np.arange(1,size[1]+1)
cols

In [None]:
board_full

In [None]:
board_full[(rows, cols)]

In [None]:
m_level = int(np.median(fl))
m_level

In [None]:
state_key = np.empty((5, size[1]), dtype='int8')
state_key[0, :] = (fl - m_level).clip(-1, 2)
state_key[1:, :] = board_full[(rows, cols)]
state_key

### Find all possible (valid) combinations of states for 4x4 segment

In [None]:
from itertools import product

combinations = np.array(list(product(range(5), repeat=4))).sum(axis=1)
assert len(combinations) == 625

In [None]:
n_discs = pd.Series(combinations).value_counts().sort_index().rename('No. of boards')
n_discs.index.name = 'No. of discs'
summary = pd.concat([
    n_discs, 
    pd.Series(2**n_discs.index.values, index=n_discs.index, name='Disc combinations')
], axis=1)
summary['Total combinations'] = summary['No. of boards'] * summary['Disc combinations']
summary

In [None]:
summary['Total combinations'].sum()

### Generate a compact, hashable state representation

In [None]:
a1 = np.array([64, 16, 4, 1])
a2 = np.array([16777216, 65536, 256, 1])

def generate_state_key_uint32(grid):
    """Convert 4x4 int8 array to int64 value.
    """
    assert grid.dtype == 'int8'
    assert grid.shape == (4, 4)
    return (np.sum(grid*a1, axis=1)*a2).sum().astype('uint32')

grid = 3*np.ones((4,4), dtype='int8') 
assert generate_state_key_uint32(grid) == 2**32-1
assert generate_state_key_uint32(grid).dtype == 'uint32'
assert (generate_state_key_uint32(grid) + 1).dtype == 'int64'
grid = np.zeros((4,4), dtype='int8') 
assert generate_state_key_uint32(grid) == 0
grid[:, 3] = 1
assert generate_state_key_uint32(grid) == 1 + 256 + 256**2 + 256**3
grid = np.ones((4,3), dtype='int8') 
try:
    generate_state_key_uint32(grid)
except AssertionError:
    pass

grid = np.random.choice([0, 1, 2], size=16).astype('int8').reshape((4,4))
print(grid)
generate_state_key_uint32(grid)

In [None]:
# sum([30.9, 31, 31.1, 28.9])/4 == 30.5
%timeit generate_state_key_uint32(grid)

In [None]:
a1 = np.array([6, 4, 2, 0])
a2 = np.array([24, 16, 8, 0])

def generate_state_key_uint32(grid):
    """Convert 4x4 int8 array to int64 value.
    """
    assert grid.dtype == 'int8'
    return np.sum(np.sum(grid << a1, axis=1) << a2).astype('uint32')

grid = 3*np.ones((4,4), dtype='int8') 
assert generate_state_key_uint32(grid) == 2**32-1
assert generate_state_key_uint32(grid).dtype == 'uint32'
assert (generate_state_key_uint32(grid) + 1).dtype == 'int64'
grid = np.zeros((4,4), dtype='int8') 
assert generate_state_key_uint32(grid) == 0
grid[:, 3] = 1
assert generate_state_key_uint32(grid) == 1 + 256 + 256**2 + 256**3
grid = np.ones((4,3), dtype='int8') 
try:
    generate_state_key_uint32(grid)
except ValueError:
    pass

grid = np.random.choice([0, 1, 2], size=16).astype('int8').reshape((4,4))
print(grid)
generate_state_key_uint32(grid)

In [None]:
# sum([31.2, 31.9, 31.8, 32.4])/4 == 31.8
%timeit generate_state_key_uint32(grid)

In [None]:
a = np.array([1, 2, 3, 0], dtype='int8')

v = np.sum(a << np.array([6, 4, 2, 0])).astype('uint32')

v

In [None]:
(v >> 6) & 3, (v >> 4) & 3, (v >> 2) & 3, v & 3

In [None]:
np.array([v >> 6, v >> 4, v >> 2, v], dtype='int8') & 3