In [1]:
import numpy as np

In [2]:
 import sys
!{sys.executable} --version

Python 3.9.13


In [3]:
!{sys.executable} -m pip install torch



In [4]:
!{sys.executable} -m pip install -U matplotlib

Collecting matplotlib
  Downloading matplotlib-3.6.2-cp39-cp39-win_amd64.whl (7.2 MB)
     ---------------------------------------- 7.2/7.2 MB 11.0 MB/s eta 0:00:00
Collecting contourpy>=1.0.1
  Downloading contourpy-1.0.6-cp39-cp39-win_amd64.whl (161 kB)
     -------------------------------------- 161.3/161.3 kB 9.4 MB/s eta 0:00:00
Installing collected packages: contourpy, matplotlib
  Attempting uninstall: matplotlib
    Found existing installation: matplotlib 3.5.2
    Uninstalling matplotlib-3.5.2:
      Successfully uninstalled matplotlib-3.5.2
Successfully installed contourpy-1.0.6 matplotlib-3.6.2


In [5]:
import numpy as np
import matplotlib.pylab as pt
import torch as tr

In [6]:
class Scorer:
    def __init__(self, contig):
        self.contig = contig
        self.conv2d = tr.nn.Conv2d(1, 4, contig, padding='same', bias=False)
        self.conv2d.requires_grad_(False)

        self.conv2d.weight[:] = 0
        self.conv2d.weight[0,0,0,:contig] = 1
        self.conv2d.weight[1,0,:contig,0] = 1
        self.conv2d.weight[2,0,:contig,:contig] += tr.eye(contig)
        self.conv2d.weight[3,0,:contig,:contig] += tr.rot90(tr.eye(contig))

    def check(self, board):
        board = board.reshape(-1, *board.shape[-2:]) # insert channel dim
        result = self.conv2d(board)
        #print(result)
        floored = tr.div(result, self.contig, rounding_mode='trunc')
        winner = floored.max() + floored.min() # -1, 0, +1 for lose, draw, win
        return winner


scorer = Scorer(4)

In [7]:
board = tr.zeros(6,7)
print(board)

tensor([[0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0.]])


In [9]:
print(scorer.check(board))
board[4,:] = -1
board[4,4] = 0
board[4,5] = 0
print(board)
print(scorer.check(board))


tensor(-1.)
tensor([[ 0.,  0.,  0.,  0.,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  0.,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  0.,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  0.,  0.,  0.,  0.],
        [-1., -1., -1., -1.,  0.,  0., -1.],
        [ 0.,  0.,  0.,  0.,  0.,  0.,  0.]])
tensor(-1.)


In [10]:
class Connect4State(object):
    def __init__(self, board, scorer):
        self.board = board.clone()
        self.scorer = scorer
    def __str__(self):
        markers = np.full(self.board.shape, ".")
        markers[self.board > 0] = "X"
        markers[self.board < 0] = "O"
        return "\n".join(["".join(row) for row in markers])

    def is_leaf(self):
        if self.score_for_max_player() != 0: return True
        return (self.board[:board.size(dim=1)-1,:] != 0).all()
        #return (self.board != 0).all() # tie game

    def score_for_max_player(self):
        return self.scorer.check(self.board).item()

    def is_max_players_turn(self):
        return self.board.sum() == 0 # +/-1 cancel when it is X's turn
    def is_min_players_turn(self):
        return not self.is_max_players_turn()
    
    def get_next_open_row(self, col):
        if tr.equal(tr.where(self.board[:,col] == 0)[0], tr.tensor([])) or tr.where(self.board[:,col] == 0)[0].size(dim=0) == 1:
            return None
        else:
            return tr.where(self.board[:,col] == 0)[0][-2]
        
    def valid_actions(self):
        return tr.tensor([ [self.get_next_open_row(col),col] for col in range(self.board.size(dim = 1)) if self.get_next_open_row(col) != None])
        #return tr.nonzero(self.board == 0)

    def perform(self, action):
        player = +1 if self.is_max_players_turn() else -1
        row, col = action
        new_state = Connect4State(self.board, self.scorer)
        new_state.board[row, col] = player
        return new_state

    
def initial_state(row_size, col_size, contig):
    board = tr.zeros((row_size, col_size))
    return Connect4State(board, Scorer(contig))




In [11]:
if __name__ == "__main__":
    
    from time import perf_counter
    start = perf_counter()
    state = initial_state(row_size=7, col_size = 7, contig=4)
    print(state)
    while True:
        if state.is_leaf(): break
        actions = state.valid_actions()
        print(actions.t())
        if len(actions) == 0: break
        state = state.perform(actions[0])
        print(state)
    print("max score, is over:")
    print(state.score_for_max_player())
    print(state.is_leaf())
    print(f"{perf_counter()-start}s to run")

.......
.......
.......
.......
.......
.......
.......
tensor([[5, 5, 5, 5, 5, 5, 5],
        [0, 1, 2, 3, 4, 5, 6]])
.......
.......
.......
.......
.......
X......
.......
tensor([[4, 5, 5, 5, 5, 5, 5],
        [0, 1, 2, 3, 4, 5, 6]])
.......
.......
.......
.......
O......
X......
.......
tensor([[3, 5, 5, 5, 5, 5, 5],
        [0, 1, 2, 3, 4, 5, 6]])
.......
.......
.......
X......
O......
X......
.......
tensor([[2, 5, 5, 5, 5, 5, 5],
        [0, 1, 2, 3, 4, 5, 6]])
.......
.......
O......
X......
O......
X......
.......
tensor([[1, 5, 5, 5, 5, 5, 5],
        [0, 1, 2, 3, 4, 5, 6]])
.......
X......
O......
X......
O......
X......
.......
tensor([[0, 5, 5, 5, 5, 5, 5],
        [0, 1, 2, 3, 4, 5, 6]])
O......
X......
O......
X......
O......
X......
.......
tensor([[5, 5, 5, 5, 5, 5],
        [1, 2, 3, 4, 5, 6]])
O......
X......
O......
X......
O......
XX.....
.......
tensor([[4, 5, 5, 5, 5, 5],
        [1, 2, 3, 4, 5, 6]])
O......
X......
O......
X......
OO.....
XX.....
.......
tens

In [12]:
# Code to generate training data
# Each training example is an intermediate game state, paired with an approximate utility

# The depth-limited minimax algorithm
# Returns parent and child utilities
def minimax(state, max_depth):
    if max_depth == 0 or state.is_leaf():
        return state.score_for_max_player(), []
    else:
        child_utilities = [
            minimax(state.perform(action), max_depth - 1)[0]
            for action in state.valid_actions()]
        if state.is_max_players_turn():
            return max(child_utilities), child_utilities
        else:
            return min(child_utilities), child_utilities

def random_game(row_size, col_size, contig, max_depth):
    state = initial_state(row_size, col_size, contig)
    states = [state]
    while not state.is_leaf():
        utility, child_utilities = minimax(state, max_depth)
        ties = (np.array(child_utilities) == utility)
        tie_index = np.flatnonzero(ties)
        actions = state.valid_actions()
        action = actions[np.random.choice(tie_index)]
        state = state.perform(action)
        states.append(state)
    result = state.score_for_max_player()
    return states, result

# Used to generate a training data set
# Combines results from many random games
def generate(num_examples, row_size, col_size, contig, max_depth):
    all_states = []
    all_results = []
    num_games = 0
    while len(all_states) < num_examples:
        num_games += 1
        print(f"game {num_games}, {len(all_states)} of {num_examples} examples...")
        states, result = random_game(row_size, col_size, contig, max_depth)
        all_states += states
        all_results += [result] * len(states)

    return all_states, all_results

# Used to convert a game state to a tensor encoding suitable for NN input
# Uses one-hot encoding at each grid position
def encode(state):
    # encoding[0,:,:] == 1 where there are "_"s, 0 elsewhere
    # encoding[1,:,:] == 1 where there are "O"s, 0 elsewhere
    # encoding[2,:,:] == 1 where there are "X"s, 0 elsewhere
    symbols = tr.tensor([0, -1, +1]).reshape(-1,1,1)
    onehot = (symbols == state.board).float()
    return onehot


# Run some sanity checks to make sure generation and encoding are working
if __name__ == "__main__":

    row_size, col_size, contig = 7, 7, 4

    states, results = generate(num_examples=5, row_size=row_size, col_size = col_size, contig=contig, max_depth=3)
    for n, (state, result) in enumerate(zip(states, results)):
        print("example %d:" % n)
        print(state)
        print("result = %d" % result)
    print(state.score_for_max_player())

    print("Encoding of last example state:")
    encoding = encode(state)
    print(encoding)

    # quick test for encode
    print("")
    print("Encoding of initial state:")
    encoding = encode(initial_state(row_size, col_size, contig))
    print(encoding)
    assert(type(encoding) == tr.Tensor)
    expected = tr.zeros((3,row_size,col_size))
    expected[0,:,:] = 1
    assert((encoding == expected).all())
    
    print("If you see this line, initial state was encoded correctly")



game 1, 0 of 5 examples...
example 0:
.......
.......
.......
.......
.......
.......
.......
result = -1
example 1:
.......
.......
.......
.......
.......
......X
.......
result = -1
example 2:
.......
.......
.......
.......
.......
.....OX
.......
result = -1
example 3:
.......
.......
.......
.......
.......
..X..OX
.......
result = -1
example 4:
.......
.......
.......
.......
.......
..XO.OX
.......
result = -1
example 5:
.......
.......
.......
.......
.....X.
..XO.OX
.......
result = -1
example 6:
.......
.......
.......
.......
...O.X.
..XO.OX
.......
result = -1
example 7:
.......
.......
.......
.......
..XO.X.
..XO.OX
.......
result = -1
example 8:
.......
.......
.......
.......
..XO.X.
..XOOOX
.......
result = -1
example 9:
.......
.......
.......
.......
..XOXX.
..XOOOX
.......
result = -1
example 10:
.......
.......
.......
.......
..XOXX.
.OXOOOX
.......
result = -1
example 11:
.......
.......
.......
..X....
..XOXX.
.OXOOOX
.......
result = -1
example 12:
.......
...

In [13]:
print("Training data:")
training_examples = generate(num_examples = 1500, row_size=7, col_size=7, contig=4, max_depth=3)
print("\nTesting data:")
testing_examples = generate(num_examples = 100,  row_size=7, col_size=7, contig=4, max_depth=3)

Training data:
game 1, 0 of 1500 examples...
game 2, 15 of 1500 examples...
game 3, 35 of 1500 examples...
game 4, 55 of 1500 examples...
game 5, 96 of 1500 examples...
game 6, 110 of 1500 examples...
game 7, 138 of 1500 examples...
game 8, 165 of 1500 examples...
game 9, 199 of 1500 examples...
game 10, 211 of 1500 examples...
game 11, 233 of 1500 examples...
game 12, 257 of 1500 examples...
game 13, 300 of 1500 examples...
game 14, 311 of 1500 examples...
game 15, 337 of 1500 examples...
game 16, 345 of 1500 examples...
game 17, 366 of 1500 examples...
game 18, 394 of 1500 examples...
game 19, 429 of 1500 examples...
game 20, 470 of 1500 examples...
game 21, 478 of 1500 examples...
game 22, 488 of 1500 examples...
game 23, 512 of 1500 examples...
game 24, 533 of 1500 examples...
game 25, 547 of 1500 examples...
game 26, 578 of 1500 examples...
game 27, 593 of 1500 examples...
game 28, 615 of 1500 examples...
game 29, 639 of 1500 examples...
game 30, 682 of 1500 examples...
game 31, 7

In [14]:
def augment(states, results):
    augmented_states = []
    augmented_results = []
    for state, result in zip(states, results):
        for k in range(1):
            rot = tr.rot90(state.board, k)
            augmented_states.append(Connect4State(rot, state.scorer))
            augmented_states.append(Connect4State(tr.fliplr(rot), state.scorer))
        augmented_results += [result] * 2
    return augmented_states, augmented_results

In [15]:
# augment training data
print(len(training_examples[0]))
training_examples = augment(*training_examples)
print(len(training_examples[0]))

1504
3008


In [16]:
# Baseline testing error: always predict constant
_, utilities = testing_examples
baseline_error =sum((u-0)**2 for u in utilities) / len(utilities)
print(utilities)
print(baseline_error)

[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
1.0


In [17]:
# Defines a network with two fully-connected layers and tanh activation functions
class LinNet(tr.nn.Module):
    def __init__(self, row_size, col_size, hid_features):
        super(LinNet, self).__init__()
        self.to_hidden = tr.nn.Linear(3*row_size*col_size, hid_features)
        self.to_output = tr.nn.Linear(hid_features, 1)
    def forward(self, x):
        h = tr.relu(self.to_hidden(x.reshape(x.shape[0],-1)))
        y = tr.tanh(self.to_output(h))
        return y

In [18]:
# Calculates the error on one training example
def example_error(net, example):
    state, utility = example
    x = encode(state).unsqueeze(0)
    y = net(x)
    e = (y - utility)**2
    return e

# Calculates the error on a batch of training examples
def batch_error(net, batch):
    states, utilities = batch
    u = utilities.reshape(-1,1).float()
    y = net(states)
    e = tr.sum((y - u)**2) / utilities.shape[0]
    return e

# Trains the network on some generated data
if __name__ == "__main__":

    # whether to loop over individual training examples or batch them
    batched = True

    # Make the network and optimizer
    net = LinNet(row_size=7, col_size = 7,hid_features=16)
    optimizer = tr.optim.SGD(net.parameters(), lr=0.001)

    # Convert the states and their minimax utilities to tensors
    states, utilities = training_examples
    training_batch = tr.stack(tuple(map(encode, states))), tr.tensor(utilities)

    states, utilities = testing_examples
    testing_batch = tr.stack(tuple(map(encode, states))), tr.tensor(utilities)

    # Run the gradient descent iterations
    curves = [], []
    for epoch in range(50000):
    
        # zero out the gradients for the next backward pass
        optimizer.zero_grad()

        # loop version (slow)
        if not batched:
 
            training_error, testing_error = 0, 0

            for n, example in enumerate(zip(*training_examples)):
                e = example_error(net, example)
                e.backward()
                training_error += e.item()
            training_error /= len(training_examples)

            with tr.no_grad(): # less computationally expensive
                for n, example in enumerate(zip(*testing_examples)):
                    e = example_error(net, example)
                    testing_error += e.item()
                testing_error /= len(testing_examples)

        # batch version (fast)
        if batched:
            e = batch_error(net, training_batch)
            e.backward()
            training_error = e.item()

            with tr.no_grad():
                e = batch_error(net, testing_batch)
                testing_error = e.item()

        # take the next optimization step
        optimizer.step()    
        
        # print/save training progress
        if epoch % 1000 == 0:
            print("%d: %f, %f" % (epoch, training_error, testing_error))
        curves[0].append(training_error)
        curves[1].append(testing_error)


0: 0.972889, 0.885761
1000: 0.886221, 0.989022
2000: 0.846715, 1.021583
3000: 0.815431, 1.046477
4000: 0.790805, 1.059438
5000: 0.770306, 1.065983
6000: 0.752211, 1.072201
7000: 0.736033, 1.082005
8000: 0.721391, 1.095781
9000: 0.707878, 1.111608
10000: 0.694524, 1.127945
11000: 0.680550, 1.145233
12000: 0.665354, 1.160331
13000: 0.649598, 1.173117
14000: 0.633551, 1.185358
15000: 0.617040, 1.196191
16000: 0.599418, 1.210484
17000: 0.581054, 1.223377
18000: 0.562466, 1.234669
19000: 0.544215, 1.243026
20000: 0.526691, 1.248801
21000: 0.509830, 1.252648
22000: 0.493620, 1.255179
23000: 0.477879, 1.258099
24000: 0.463263, 1.260792
25000: 0.449819, 1.263369
26000: 0.437332, 1.265562
27000: 0.425778, 1.267829
28000: 0.415140, 1.269459
29000: 0.405226, 1.272797
30000: 0.395998, 1.276805
31000: 0.387284, 1.281661
32000: 0.379167, 1.285965
33000: 0.371530, 1.289278
34000: 0.364279, 1.290873
35000: 0.357394, 1.293940
36000: 0.350855, 1.299145
37000: 0.344672, 1.305445
38000: 0.338795, 1.311154

In [None]:
pt.plot(curves[0], 'b-')
pt.plot(curves[1], 'r-')
pt.plot([0, len(curves[1])], [baseline_error, baseline_error], 'g-')
pt.plot()
pt.legend(["Train","Test","Baseline"])
pt.show()

In [18]:
# play against AI

# The depth-limited minimax algorithm with NN as eval function
# Returns parent and child utilities
def minimax_eval(state, max_depth):
    if state.is_leaf():
        return state.score_for_max_player(), []
    elif max_depth == 0:
        x = encode(state).unsqueeze(0)
        u = net(x)
        return u, []
    else:
        child_utilities = [
            minimax(state.perform(action), max_depth - 1)[0]
            for action in state.valid_actions()]
        if state.is_max_players_turn():
            return max(child_utilities), child_utilities
        else:
            return min(child_utilities), child_utilities

state = initial_state(row_size=7, col_size=7,contig=4)
while not state.is_leaf():
    print(state)
    if state.is_max_players_turn():
        choice = input('Enter zero-based col: ')
        col = int(choice)
        #row,col = map(int, choice.split(","))
        row = state.get_next_open_row(col)
        state.board[row,col] = 1
    else:
        print("AI's move:")
        utility, child_utilities = minimax_eval(state, max_depth=3)
        ties = (np.array(child_utilities) == utility)
        tie_index = np.flatnonzero(ties)
        actions = state.valid_actions()
        action = actions[np.random.choice(tie_index)]
        state = state.perform(action)
print(state)
result = state.score_for_max_player()
print(f"Final score: {result}")

.......
.......
.......
.......
.......
.......
.......
Enter zero-based col: 2
.......
.......
.......
.......
.......
..X....
.......
AI's move:
.......
.......
.......
.......
..O....
..X....
.......
Enter zero-based col: 2
.......
.......
.......
..X....
..O....
..X....
.......
AI's move:
.......
.......
.......
..X....
..O....
..X..O.
.......
Enter zero-based col: 2
.......
.......
..X....
..X....
..O....
..X..O.
.......
AI's move:
.......
.......
..X....
..X....
..O....
O.X..O.
.......
Enter zero-based col: 2
.......
..X....
..X....
..X....
..O....
O.X..O.
.......
AI's move:
..O....
..X....
..X....
..X....
..O....
O.X..O.
.......
Enter zero-based col: 3
..O....
..X....
..X....
..X....
..O....
O.XX.O.
.......
AI's move:
..O....
..X....
..X....
..X....
..OO...
O.XX.O.
.......
Enter zero-based col: 3
..O....
..X....
..X....
..XX...
..OO...
O.XX.O.
.......
AI's move:
..O....
..X....
..X....
..XX...
..OO...
O.XX.OO
.......
Enter zero-based col: 4
..O....
..X....
..X....
..XX...
..OO..