In [1]:
from c4 import Board, AgentRandom, AgentMinMaxMC, AgentMC
import os
import time

import matplotlib.pyplot as plt
import numpy as np
from IPython.display import clear_output
from tqdm.auto import tqdm

import torch
import torch.nn.functional as F
from torch import nn
from torch import optim
import torchvision
import wandb

In [2]:
gameplays = []
with open("games3.txt") as inputfile:
    gameplays = inputfile.readlines()

gameplays = [list(s.strip('S\n')) for s in gameplays]
print(gameplays[0])

gameplays = [(xs[:-1:], xs[-1]) for xs in gameplays]
print(gameplays[0])

['3', '3', '1', '2', '1', '1', '5', '6', '5', '5', '1', '0', '3', '5', '3', '3', '6', '5', '5', '1', '2', '0', '6', '6', '0', '2', '2', '0', '2', '3', '4', '0', '4', 'B']
(['3', '3', '1', '2', '1', '1', '5', '6', '5', '5', '1', '0', '3', '5', '3', '3', '6', '5', '5', '1', '2', '0', '6', '6', '0', '2', '2', '0', '2', '3', '4', '0', '4'], 'B')


Let's assume that game state evaluated to 1.0 -> A wins, -1.0 -> B wins

In [3]:
def extractAB_from_board(board):
    DX = 7
    DY = 6
    A_board = [DX * [0] for y in range(DY)]
    B_board = [DX * [0] for y in range(DY)]
    for i in range(DY):
        for j in range(DX):
            if board[i][j] == 1:
                A_board[i][j] = 1
            if board[i][j] == -1:
                B_board[i][j] = 1

    return A_board, B_board

In [4]:
boardslist = []
winnerslist = []

for game, winner in gameplays:
    current_board = Board()
    for move in game:
        current_board.apply_move(int(move))
        if winner == 'A':
            winnerslist.append(torch.Tensor([1.0, 0.0]))
        elif winner == 'B':
            winnerslist.append(torch.Tensor([0.0, 1.0]))
        else:
            continue

        a_board, b_board = extractAB_from_board(current_board.board)
        boardslist.append(torch.Tensor([current_board.board, a_board, b_board]))
        
    del current_board

data_boards = np.array(boardslist)
data_winners = np.array(winnerslist)

In [5]:
#print(data_boards)
#print(data_winners)

In [6]:
class GameDataset(torch.utils.data.Dataset):
    def __init__(self, gamelist, winnerlist):
        self.gamelist = gamelist
        self.winnerlist = winnerlist
    def __len__(self):
        return len(self.gamelist)
    def __getitem__(self, idx):
        return self.gamelist[idx], self.winnerlist[idx]

shuffled_indexes = np.random.permutation(len(data_boards))

train_size = int(0.8 * len(data_boards))
print(train_size)
train_dataset = GameDataset(data_boards[shuffled_indexes[ : train_size ]], data_winners[shuffled_indexes[ : train_size ]])
test_dataset = GameDataset(data_boards[shuffled_indexes[ train_size : ]], data_winners[shuffled_indexes[ train_size : ]])

print(f"train length: {len(train_dataset)}, test length: {len(test_dataset)}")

batch_size = 32

game_dataloaders = {
    "train": torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, num_workers=4, shuffle=True
    ),
    "test": torch.utils.data.DataLoader(
        test_dataset, batch_size=batch_size, num_workers=4, shuffle=False
    ),
}

471489
train length: 471489, test length: 117873


In [7]:
def plot_history(history):
    plt.figure(figsize=(16, 4))
    plt.subplot(1, 2, 1)
    train_loss = np.array(history["train_losses"])
    plt.semilogy(np.arange(train_loss.shape[0]), train_loss, label="batch train loss")
    plt.legend()

    plt.subplot(1, 2, 2)
    train_errs = np.array(history["train_errs"])
    plt.plot(np.arange(train_errs.shape[0]), train_errs, label="batch train error rate")
    val_errs = np.array(history["val_errs"])
    plt.plot(val_errs[:, 0], val_errs[:, 1], label="validation error rate", color="r")
    plt.legend()

def compute_error_rate(model, data_loader, cuda=True):
    model.eval()
    num_errs = 0.0
    num_examples = 0
    for x, y in data_loader:
        if cuda:
            x = x.cuda()
            y = y.cuda()

        with torch.no_grad():
            outputs = model.forward(x)
            _, predictions = outputs.max(dim=1)
            num_errs += (predictions != y).sum().item()
            num_examples += x.size(0)
    return num_errs / num_examples


def train(
    model, data_loaders, learning_rate, momentum, epochs=1, log_every=100, device="cuda", wandblogging=False
):
    model.to(device)

    optimizer = optim.SGD(model.parameters(), learning_rate, momentum)
        
    history = {"train_losses": [], "train_errs": [], "val_errs": []}
    best_params = None
    best_valid_err = np.inf
    iter_ = 0
    tstart = time.time()

    for epoch in range(epochs):
        model.train()

        for x, y in data_loaders["train"]:         
            x = x.to(device)
            y = y.to(device)

            #print(f"x shape:{x.shape}")
            
            pred = model(x)
            loss = model.loss(pred, y)

            print(f"y shape: {y.shape}, y: {y}")
            print(f"pred: {pred}")
            _ , predictions = pred.max(dim=1)
            batch_err_rate = (predictions != y).sum().item() / pred.size(0)

            history["train_losses"].append(loss.item())
            history["train_errs"].append(batch_err_rate)
            if wandblogging:
                wandb.log({"train_error_rate" : batch_err_rate})

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            iter_ += 1
            if log_every != -1 and iter_ % log_every == 0:
                num_iter = iter_
                print(
                    "Minibatch {0: >6}  | loss {1: >5.2f} | err rate {2: >5.2f}%, steps/s {3: >5.2f}".format(
                        iter_,
                        loss.item(),
                        batch_err_rate * 100.0,
                        num_iter / (time.time() - tstart),
                    )
                )
                tstart = time.time()

        valid_err_rate = compute_error_rate(model, data_loaders["test"], device)
        
        history["val_errs"].append((iter_, valid_err_rate))
        if wandblogging:
            wandb.log({"valid_error_rate" : valid_err_rate})

        if valid_err_rate < best_valid_err:
            best_epoch = epoch
            best_valid_err = valid_err_rate
            best_params = [p.detach().cpu() for p in model.parameters()]
            
        #clear_output(True)
        m = "After epoch {0: >2} | valid err rate: {1: >5.2f}% | doing {2: >3} epochs".format(
            epoch, valid_err_rate * 100.0, epochs
        )
        print("{0}\n{1}\n{0}".format("-" * len(m), m))

    if best_params is not None:
        print("\nLoading best params on validation set (epoch %d)\n" % (best_epoch))
        with torch.no_grad():
            for param, best_param in zip(model.parameters(), best_params):
                param[...] = best_param


    #print("history[train loss] length:" + str(len(history["train_losses"])) + str(history["train_losses"]))
    #print("history[train errs] shape:" + str(len(history["train_errs"])) + str(history["train_errs"]))
    #print("history[val errs] shape:" + str(len(history["val_errs"])) + str(history["val_errs"]))
    plot_history(history)
    return history

In [8]:
class ModelConv(nn.Module):
    def __init__(self, *args, **kwargs):
        super(ModelConv, self).__init__()
        self.layers = nn.Sequential(nn.Conv2d(3, 128, 4, stride=1, padding=3),
                                    nn.ReLU(),
                                    nn.MaxPool2d(2,1),
                                    nn.Conv2d(128, 128, 2, stride=1, padding=1),
                                    nn.ReLU(),
                                    #nn.AvgPool2d(2,1),
                                    #nn.Dropout(0.5),
                                    nn.Conv2d(128, 128, 2, stride=1, padding=1),
                                    nn.ReLU(),
                                    nn.Conv2d(128, 1, 1, stride=1),
                                    nn.ReLU(),
                                    nn.Flatten(),
                                    nn.Linear(110, 20),
                                    nn.ReLU(),
                                    #nn.Dropout(0.5),
                                    nn.Linear(20, 2))

    def forward(self, X):
        X = X.view(X.size(0), 3, 6, 7)
        return self.layers.forward(X)

    def loss(self, Out, Targets):
        return F.cross_entropy(Out, Targets)

modelConv = ModelConv()

In [9]:
with torch.no_grad():
    # Initialize parameters
    for name, p in modelConv.named_parameters():
        if "weight" in name:
            nn.init.xavier_normal_(p)
        elif "bias" in name:
            p.zero_()
        else:
            raise ValueError('Unknown parameter name "%s"' % name)
            
t_start = time.time()
train(modelConv, game_dataloaders, 0.002, 0.98, epochs=20, log_every=10000)
print(f"training took {time.time() - t_start:.0f}s.")

y shape: torch.Size([32, 2]), y: tensor([[0., 1.],
        [1., 0.],
        [1., 0.],
        [0., 1.],
        [1., 0.],
        [1., 0.],
        [1., 0.],
        [0., 1.],
        [0., 1.],
        [1., 0.],
        [0., 1.],
        [0., 1.],
        [0., 1.],
        [0., 1.],
        [0., 1.],
        [0., 1.],
        [1., 0.],
        [1., 0.],
        [1., 0.],
        [0., 1.],
        [0., 1.],
        [0., 1.],
        [1., 0.],
        [0., 1.],
        [0., 1.],
        [1., 0.],
        [1., 0.],
        [1., 0.],
        [1., 0.],
        [1., 0.],
        [1., 0.],
        [1., 0.]], device='cuda:0')
pred: tensor([[-0.0098, -0.0170],
        [-0.0051, -0.0231],
        [-0.0023, -0.0172],
        [-0.0150, -0.0237],
        [ 0.0119, -0.0230],
        [ 0.0062, -0.0219],
        [ 0.0229, -0.0288],
        [ 0.0174, -0.0050],
        [ 0.0066, -0.0142],
        [-0.0329, -0.0460],
        [ 0.0288, -0.0037],
        [ 0.0098, -0.0110],
        [ 0.0424, -0.0393],
   

RuntimeError: The size of tensor a (32) must match the size of tensor b (2) at non-singleton dimension 1

Agent definition:

In [None]:
from c4 import Board, AgentRandom, AgentMinMaxMC, AgentMC

In [None]:
def game(agent_a, agent_b):
    b = Board()
    agents = [agent_a, agent_b]
    moves = []
    
    who = 0
    
    while not b.end():
        m = agents[who].best_move(b)
        b.apply_move(m) 
                          
        who = 1-who
    
    #b.print() 
    print (b.result)
    print ("X" * 20 + "\nEND OF GAME\n")
    print ()
    
    return b.result
    
def duel(agent_a, agent_b, N):
    score = {1:0, -1:0, 0:0}
    
    for i in range(N):
        r1 = game(agent_a, agent_b)
        score[r1] += 1
        r2 = game(agent_b, agent_a)
        score[-r2] += 1
    
    s = sum(score.values())
    
    for k in score:
        score[k] /= s    
    print (f'{agent_a.name}: {score[+1]}, {agent_b.name}: {score[-1]}, Draw: {score[0]}')     

In [None]:
class AgentCNN:
    def __init__(self, model):
        self.model = model
        self.model.to("cuda")
        self.name = 'AgentCNN'
        
    def best_move(self, b):
        boardresults = []
        
        bestresult = -2.0
        bestmove = 0
        #print(b.moves())
        for m in b.moves():
            b.apply_move(m)

            a_board, b_board = extractAB_from_board(current_board.board)
            
            netinput = torch.Tensor([[b.board, a_board, b_board]])
            netinput = netinput.to("cuda")

            res = self.model(netinput).item()
            boardresults.append(res)
            if res > bestresult:
                bestresult = res
                bestmove = m
            
            b.undo_move(m)

        #b.printchars()
        #print(boardresults)
        #print(bestmove)
            
        return bestmove

myAgent = AgentCNN(modelConv)

In [None]:
A = myAgent
B = AgentRandom()
#B = AgentMC(10)

duel(A, B, 500)    