In [1]:
# Package
import numpy as np
import pandas as pd
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [2]:
def empty_columns(A):
    """List the columns that can still accept a piece.
    ======
    A column is playable when its top cell (row 0) is still 0.

    Parameters
    ----------
    A : numpy array, int
        Board state matrix: 1 means Player 1, 2 means Player 2, 0 means empty.

    Returns
    -------
    playable : list, int
        Indices of the columns whose top cell is empty, in ascending order.
    """
    top_is_open = A[0, :] == 0
    return [col for col, is_open in enumerate(top_is_open) if is_open]

def check_winning(A, connectcount=4):
    """Test whether a given boardstate has a winner.
    ======
    Scan every occupied cell and look for a run of `connectcount` equal
    pieces starting there, going down, down-right, down-left or right.

    Parameters
    ----------
    A : numpy array, int
        Board state matrix: 1 means Player 1, 2 means Player 2, 0 means empty.
    connectcount : int
        Number of equal pieces in a line required for a win.

    Returns
    -------
    w : int
        1 : Player 1 wins.
        2 : Player 2 wins.
        0 : There is no winner in the current boardstate.
    """
    rows = np.size(A, 0)
    cols = np.size(A, 1)
    span = connectcount - 1

    # (row step, column step) for: column below, down-right diagonal,
    # down-left diagonal, row to the right. Every direction is tested on its
    # own, so a cell from which both diagonals fit gets both of them checked.
    directions = ((1, 0), (1, 1), (1, -1), (0, 1))

    for i in range(rows):
        for j in range(cols):
            if A[i, j] == 0:  # No line can start on an empty cell
                continue

            for di, dj in directions:
                last_i = i + span * di
                last_j = j + span * dj
                # Skip directions whose far end would leave the board
                if not (0 <= last_i < rows and 0 <= last_j < cols):
                    continue

                line = [A[i + k * di, j + k * dj] for k in range(connectcount)]
                for player in (1, 2):
                    if np.allclose(line, player):
                        return player
    return 0

def generate_policy(A, turn, connectcount=4):
    """Play one random move, or report that the game has already ended.
    ======
    If `turn` already encodes a finished game, or the board holds a winner or
    has no open column, the outcome is printed and the board is returned
    untouched with the matching end state. Otherwise a random open column is
    chosen and the move is applied through `generate_boardstate`.

    Parameters
    ----------
    A : numpy array, int
        Board state matrix: 1 means Player 1, 2 means Player 2, 0 means empty.
    turn : int
        Game state: 1, Player 1 moves next; 2, Player 2 moves next;
        3, Player 1 won; 4, Player 2 won; 5, tie (no available move, no winner).
    connectcount : int
        Number of equal pieces in a line required for a win.

    Returns
    -------
    [A, turn] : list
        A: the board after the move (or unchanged if the game is over);
        turn: the updated game state.
    """
    end_messages = {3: "Player 1 wins", 4: "Player 2 wins", 5: "It's a tie!"}

    # The caller already knows the game is finished: just announce it again.
    if turn > 2:
        print("Game is over!")
        if turn in end_messages:
            print(end_messages[turn])
        return [A, turn]

    # A winner on the board ends the game before any move is made.
    winner = check_winning(A, connectcount)
    for player, end_state in ((1, 3), (2, 4)):
        if winner == player:
            print("Game is over!")
            print(end_messages[end_state])
            return [A, end_state]

    open_columns = empty_columns(A)
    if len(open_columns) == 0:
        print("Game is over!")
        print(end_messages[5])
        return [A, 5]

    chosen_column = open_columns[random.randint(0, len(open_columns) - 1)]
    board, next_turn = generate_boardstate(A, turn, chosen_column)
    return [board, next_turn]

def generate_boardstate(A, turn, policy):
    """Drop the current player's piece into a column and switch turns.
    ======
    The piece lands on the lowest empty cell of column `policy`. The board is
    modified in place.

    Parameters
    ----------
    A : numpy array, int
        Board state matrix: 1 means Player 1, 2 means Player 2, 0 means empty.
    turn : int
        Game state: 1, Player 1 moves next; 2, Player 2 moves next;
        3, Player 1 won; 4, Player 2 won; 5, tie (no available move, no winner).
    policy : int
        Index of the column to play.

    Returns
    -------
    [A, turn] : list
        A: the updated board (same object as the input); turn: the player who
        moves next.
    str
        An error message when the column is full or when `turn` is not below 3.
    """
    if not (turn < 3):
        return "The game is already over! Turn count too high"

    # Walk the column from the bottom row upwards to find the landing cell.
    for row in range(np.size(A, 0) - 1, -1, -1):
        if A[row, policy] != 0:
            continue
        A[row, policy] = turn
        next_turn = 2 if turn == 1 else (1 if turn == 2 else turn)
        return [A, next_turn]

    return "Error - Bad Policy choice!"

def generate_random_boardstate(turncount, rows=6, columns=7):
    """Randomly simulate the first `turncount` moves from an empty board.
    ======
    Moves are produced by `generate_policy`. Whenever a connect-4 winner
    appears after a move, the attempt is discarded and the simulation starts
    over from an empty board.

    Parameters
    ----------
    turncount : int
        Number of moves to simulate.
    rows : int
        Number of board rows.
    columns : int
        Number of board columns.

    Returns
    -------
    [A, turn] : list
        A: rows*columns matrix holding the simulated board; turn: the game
        state after the last simulated move (Player 1 starts).
    """
    while True:
        A = np.zeros((rows, columns))
        turn = 1

        for _ in range(turncount):
            A, turn = generate_policy(A, turn)
            if check_winning(A, 4) != 0:
                # A winner showed up: throw this attempt away and retry.
                break
        else:
            return [A, turn]

def boardstate_flatten(A):
    """Flatten a board into a 1-D vector, column by column, bottom cell first.
    ======
    The rows are reversed so that the last row of `A` comes first, then the
    matrix is read in column-major order.

    Parameters
    ----------
    A : numpy array
        2-D board state matrix with any number of rows and columns.

    Returns
    -------
    flat : numpy array
        New 1-D array of length rows*columns; entry `c*rows + r` is the cell in
        column `c`, `r` cells above the last row.
    """
    # Reverse the row order with a slice so any board height is handled.
    return A[::-1, :].flatten(order='F')

In [3]:
def get_user_input(user_boardstate=None, user_turn=None, user_turncount=None):
    """Ask the user for a board and whose turn it is, with a random fallback.
    ======
    The user is prompted for 42 whitespace-separated cell values (read as a
    6x7 board, row by row) and then for the next turn. If the board is left
    empty, cannot be parsed, holds values other than 0/1/2, or the turn is
    not 1 or 2, a random board is generated instead after asking for the
    number of moves to simulate.

    Parameters
    ----------
    user_boardstate, user_turn, user_turncount :
        Unused placeholders kept so existing call signatures keep working;
        the values are always obtained from the prompts.

    Returns
    -------
    [board, turn] : list
        board: 6x7 numpy array; turn: the player who moves next.
    """
    raw_board = input("Please give me a board state (6x7 matrix, use 0 for empty):\n")

    if raw_board.strip():
        try:
            # Accept float-looking tokens such as "1." or "2.0" by keeping
            # only the integer part before the dot.
            cells = [int(token.split(".")[0]) for token in raw_board.split()]
            board = np.array(cells).reshape(6, 7)
        except ValueError:
            # Non-numeric token, or not exactly 42 values.
            print("Invalid board state.")
        else:
            if np.all(np.isin(board, [0, 1, 2])):
                try:
                    turn = int(input("Please give me the next turn (1 or 2): "))
                except ValueError:
                    turn = None
                if turn in (1, 2):
                    return [board, turn]
                print("Invalid value for turn.")
            else:
                print("Invalid board state.")

    # Reaching this point means no usable board/turn pair was entered.
    print("Using random board state and turn.")
    raw_turncount = input("Please provide the turn count (0-42): ")
    try:
        turncount = int(raw_turncount)
    except ValueError:
        print("Invalid input. Using a random turn count instead.")
        turncount = random.randint(0, 42)
    else:
        if not (0 <= turncount <= 42):
            print("Invalid turn count. Using a random turn count instead.")
            turncount = random.randint(0, 42)

    board, turn = generate_random_boardstate(turncount=turncount, rows=6, columns=7)
    return [board, turn]

def next_step():
    """Enumerate every legal next move for the board obtained from the user.
    ======
    For each playable column two boards are built: one where the player to
    move drops a piece there, and one where the opponent does.

    Returns
    -------
    (own_moves, opponent_moves, user_turn, next_user_turn) : tuple
        own_moves: list of boards after the current player plays each open
        column; opponent_moves: the same positions filled by the opponent
        instead; user_turn: the player to move; next_user_turn: the opponent.
    str
        "Board is full. No valid step." when no column is playable.
    """
    user_boardstate, user_turn = get_user_input()
    print(user_boardstate, user_turn)

    open_columns = empty_columns(user_boardstate)
    if user_turn == 2:
        opponent = 1
    elif user_turn == 1:
        opponent = 2
    else:
        opponent = None

    if not open_columns:
        return "Board is full. No valid step."

    own_moves = []
    opponent_moves = []
    for col in open_columns:
        # Independent copies so the user's board is never modified.
        own_board = user_boardstate.copy()
        opponent_board = user_boardstate.copy()

        # The lowest empty cell of the column is the last zero from the top.
        empty_rows = np.where(user_boardstate[:, col] == 0)[0]
        landing_row = empty_rows[-1]

        own_board[landing_row, col] = user_turn
        own_moves.append(own_board)

        opponent_board[landing_row, col] = opponent
        opponent_moves.append(opponent_board)

    return own_moves, opponent_moves, user_turn, opponent

In [4]:
class ClassifyNet(nn.Module):
    """Fully connected classifier with two hidden layers.

    Each hidden layer is followed by ReLU and dropout (p=0.33); the output
    layer is followed by a log-softmax over the class dimension.
    """

    def __init__(self, num_in, num_hidden_1, num_hidden_2, num_out):
        """Create the layers.

        Parameters
        ----------
        num_in : int
            Number of input features.
        num_hidden_1 : int
            Width of the first hidden layer.
        num_hidden_2 : int
            Width of the second hidden layer.
        num_out : int
            Number of output classes.
        """
        super().__init__()
        drop_probability = 0.33
        self.fc1 = nn.Linear(num_in, num_hidden_1)
        self.dropout1 = nn.Dropout(drop_probability)
        self.fc2 = nn.Linear(num_hidden_1, num_hidden_2)
        self.dropout2 = nn.Dropout(drop_probability)
        self.fc3 = nn.Linear(num_hidden_2, num_out)

    def forward(self, x):
        """Return per-class log-probabilities for a batch `x` (classes on dim 1)."""
        hidden = self.dropout1(F.relu(self.fc1(x)))
        hidden = self.dropout2(F.relu(self.fc2(hidden)))
        logits = self.fc3(hidden)
        return F.log_softmax(logits, dim=1)

# Hidden-layer widths of the classifier and the mini-batch size used in training.
num_hidden_1 = 1024
num_hidden_2 = 512
batch_size = 1000

#GPU
cuda = True
use_cuda = cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print('Using ',device)

#Load Data from CSV and train test split
dataset = pd.read_csv('CleanedData _pytorch.csv')
dataset = np.array(dataset)

# Every column but the last is a feature; the last column is the class label.
data = dataset[:, :-1]
target = dataset[:, -1]

# Shuffle the indices of ALL rows (the last row included); the first 50000
# shuffled rows form the training set and the remainder the test set.
data_size = len(data)
data_ind = np.random.permutation(data_size)

#Convert data to torch and device
data_train = torch.from_numpy(data[data_ind[:50000]]).float().to(device)
target_train = torch.from_numpy(target[data_ind[:50000]]).long().to(device)
data_test = torch.from_numpy(data[data_ind[50000:]]).float().to(device)
target_test = torch.from_numpy(target[data_ind[50000:]]).long().to(device)

#Setup model and optimizer
model = ClassifyNet(42,num_hidden_1,num_hidden_2,3).to(device)
# optimizer = optim.Adam(model.parameters(), lr=0.005)  #Learning rates

# The commented-out section below is the training run that produced the saved
# checkpoint; it is kept for reference.

# #Training
# print('Iteration,Testing Accuracy,Training Accuracy')
# for i in range(100):

#     #Model evaluation
#     model.eval()
#     with torch.no_grad():
#         pred = torch.argmax(model(data_test),axis=1)
#         test_accuracy = torch.sum(pred == target_test)/len(pred)
#         pred = torch.argmax(model(data_train),axis=1)
#         train_accuracy = torch.sum(pred == target_train)/len(pred)
#         print(i,test_accuracy.item()*100,train_accuracy.item()*100)

#     #Training mode, run data through neural network in mini-batches (SGD)
#     model.train()
#     for j in range(0,len(target_train),batch_size):
#         optimizer.zero_grad()
#         loss = F.nll_loss(model(data_train[j:j+batch_size,:]), target_train[j:j+batch_size])
#         loss.backward()
#         optimizer.step()

# # Model Saved
# torch.save(model.state_dict(), 'Connect_4_Classification.pth')

# Model loaded
# map_location remaps the stored tensors onto the active device, so a
# checkpoint saved on a GPU still loads on a CPU-only machine.
model.load_state_dict(torch.load('Connect_4_Classification.pth', map_location=device))
# Set model in evaluation
model.eval()

Using  cuda


ClassifyNet(
  (fc1): Linear(in_features=42, out_features=1024, bias=True)
  (dropout1): Dropout(p=0.33, inplace=False)
  (fc2): Linear(in_features=1024, out_features=512, bias=True)
  (dropout2): Dropout(p=0.33, inplace=False)
  (fc3): Linear(in_features=512, out_features=3, bias=True)
)

In [5]:
def gameplay():
    """Recommend the next move for the board obtained from the user.
    ======
    Priority of the recommendation:
    1. a move that wins immediately;
    2. a move that blocks a cell where the opponent would win immediately;
    3. otherwise the first move for which the classifier predicts the
       favourable class, falling back to the first move predicted as class 2.

    NOTE(review): the class labels are inferred from the branch logic only --
    0 appears to mean "Player 1 wins", 1 "Player 2 wins" and 2 "tie";
    confirm against the training data.

    Returns
    -------
    numpy array
        The resulting board, when an immediate win or a forced block exists.
    [board, next_user_turn] : list
        The recommended board and the player who moves after it, when the
        recommendation comes from the classifier.
    str
        A message when the board is full or every move is predicted as lost.
    None
        When the turn is neither 1 nor 2.
    """
    outcome = next_step()
    if isinstance(outcome, str):
        # next_step reports a full board as a plain message; pass it on
        # instead of failing while unpacking it.
        return outcome
    next_boardstate_list, next_boardstate_compete_list, user_turn, next_user_turn = outcome

    # 1. An immediate win in any column beats everything else.
    for candidate in next_boardstate_list:
        if check_winning(candidate) == user_turn:
            print(f"Player {user_turn} wins!")
            return candidate

    # 2. No win available: occupy a cell where the opponent would win next.
    for candidate, threat in zip(next_boardstate_list, next_boardstate_compete_list):
        if check_winning(threat) == next_user_turn:
            print("You have to put the next step here or you lose!")
            return candidate

    # 3. Ask the classifier about every candidate board (inference only, so
    # gradient tracking is switched off).
    win_predict_list = []
    with torch.no_grad():
        for candidate in next_boardstate_list:
            features = torch.from_numpy(boardstate_flatten(candidate)).float().to(device).view(1, -1)
            output = model(features)
            win_predict_list.append(torch.argmax(output, dim=1).item())

    print(win_predict_list)

    if user_turn == 1:
        favourable, unfavourable = 0, 1
    elif user_turn == 2:
        favourable, unfavourable = 1, 0
    else:
        return None

    if all(pred == unfavourable for pred in win_predict_list):
        return "You lost anyway."

    best_index = next((i for i, pred in enumerate(win_predict_list) if pred == favourable), None)
    if best_index is not None:
        return [next_boardstate_list[best_index], next_user_turn]

    # Not every prediction is unfavourable and none is favourable, so at
    # least one candidate was predicted as class 2.
    two_index = next((i for i, pred in enumerate(win_predict_list) if pred == 2), None)
    print("Now the best we can do is to find a tie.")
    return [next_boardstate_list[two_index], next_user_turn]

# Example usage: run one interactive recommendation (prompts on stdin) and
# print whatever gameplay() returned -- a board, a [board, turn] pair or a message.
result = gameplay()
print(result)

Using random board state and turn.
[[0. 2. 0. 0. 0. 0. 0.]
 [1. 2. 0. 0. 0. 0. 0.]
 [1. 2. 0. 0. 0. 2. 0.]
 [2. 1. 0. 0. 0. 1. 0.]
 [1. 1. 0. 2. 1. 2. 0.]
 [2. 2. 1. 2. 1. 1. 0.]] 1
[0, 0, 0, 0, 0, 0]
[array([[1., 2., 0., 0., 0., 0., 0.],
       [1., 2., 0., 0., 0., 0., 0.],
       [1., 2., 0., 0., 0., 2., 0.],
       [2., 1., 0., 0., 0., 1., 0.],
       [1., 1., 0., 2., 1., 2., 0.],
       [2., 2., 1., 2., 1., 1., 0.]]), 2]
