In [13]:
import numpy as np
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import KFold
from collections import Counter


# Load & Prepare Dataset

In [14]:
# Load the two pickled datasets used throughout the notebook.
# NOTE: pickle.load can execute arbitrary code while unpickling, so both files
# must come from a trusted source.
save_path = "connect4_dataset_parallel.pkl"
save_path_skill = "connect4_rand.pkl"

# Main dataset (weighted and de-duplicated further down).
with open(save_path, mode="rb") as f:
    dataset_original = pickle.load(f)

# Second dataset, referred to as the "skill" dataset in later cells.
with open(save_path_skill, mode="rb") as f:
    dataset_skill = pickle.load(f)

# Report how many entries each file contained.
print(f"Loaded dataset from {save_path} with {len(dataset_original)} entries.")
print(f"Loaded dataset from {save_path_skill} with {len(dataset_skill)} entries.")

Loaded dataset from connect4_dataset_parallel.pkl with 443661 entries.
Loaded dataset from connect4_rand.pkl with 141164 entries.


In [16]:
# Group entries by `state` and determine the most common `best_move`
def filter_duplicates(dataset):
    """
    Collapse repeated states into one entry carrying the majority best move.

    Args:
        dataset: iterable of dicts with a hashable 'state' and a hashable
            'best_move'.

    Returns:
        A list of {'state': ..., 'best_move': ...} dicts, one per distinct
        state, in order of first appearance. When several moves tie for a
        state, the move that was seen first wins.
    """
    # Tally the moves per state as we go; dicts keep first-seen order.
    votes_by_state = {}
    for record in dataset:
        votes_by_state.setdefault(record['state'], Counter())[record['best_move']] += 1

    # Keep only the top-voted move for every state.
    return [
        {'state': board, 'best_move': votes.most_common(1)[0][0]}
        for board, votes in votes_by_state.items()
    ]


# Weight the dataset towards the centre of the board before de-duplicating.
# Each entry receives extra copies according to its ideal move:
#   column 2 or 4 -> one extra copy
#   column 3      -> two extra copies
# filter_duplicates() keeps the most common move per state, so the extra
# copies give centre-column moves more votes.
extra_copies = {2: 1, 3: 2, 4: 1}

dataset_weighted = dataset_original.copy()
for entry in dataset_original:
    ideal_move = entry['best_move']
    # Moves outside columns 2-4 get no extra copies.
    dataset_weighted.extend([entry] * extra_copies.get(ideal_move, 0))

filtered_dataset = filter_duplicates(dataset_weighted)

print(f"Original dataset size: {len(dataset_original)}")
print(f"Filtered dataset size: {len(filtered_dataset)}")


Original dataset size: 443661
Filtered dataset size: 334489


In [17]:
# Working copy of the de-duplicated dataset.
dataset = filtered_dataset.copy()

# Size summary. Each line is labelled with the dataset it reports; previously
# the weighted and original sizes were both printed as "Filtered dataset size".
print(f"Weighted dataset size: {len(dataset_weighted)}")
print(f"Original dataset size: {len(dataset_original)}")
print(f"Filtered dataset size: {len(filtered_dataset)}")
print(f"usable dataset size: {len(dataset)}")
print(f"skilled dataset size: {len(dataset_skill)}")



Filtered dataset size: 711921
Filtered dataset size: 443661
Filtered dataset size: 334489
usable dataset size: 334489
skilled dataset size: 141164


In [18]:
def bitboard_to_matrix(bitboard):
    """
    Unpack the low 42 bits of an integer bitboard into a 6x7 array of 0/1.

    Bit number ``row * 7 + col`` of ``bitboard`` becomes element
    ``[row, col]`` of the result; any higher bits are ignored.

    Returns:
        np.ndarray of shape (6, 7) and dtype int32.
    """
    n_rows, n_cols = 6, 7
    # Read the bits in row-major order, then fold the flat list into a grid.
    bits = [(bitboard >> index) & 1 for index in range(n_rows * n_cols)]
    return np.array(bits, dtype=np.int32).reshape(n_rows, n_cols)

def preprocess_dataset(dataset):
    """
    Build model inputs and labels from dataset entries.

    For every entry, the two bitboards in ``entry['state'][0]`` and
    ``entry['state'][1]`` are decoded with ``bitboard_to_matrix`` and stacked
    into a 2-channel board; ``entry['best_move']`` is the label.

    Returns:
        (X, y): ``X`` is the array of stacked boards, ``y`` the array of best
        moves, both in dataset order.
    """
    boards = []
    moves = []
    for sample in dataset:
        state = sample['state']
        # Channel 0 is decoded from state[0], channel 1 from state[1].
        planes = [bitboard_to_matrix(state[player]) for player in (0, 1)]
        boards.append(np.stack(planes, axis=0))
        moves.append(sample['best_move'])
    return np.array(boards), np.array(moves)



In [19]:
# Convert each dataset into (X, y) arrays via preprocess_dataset.
# Training arrays come from the weighted dataset (duplicates included).
X_train, y_train = preprocess_dataset(dataset_weighted)
# Arrays for the second ("skill") dataset.
X_train_skill, y_train_skill = preprocess_dataset(dataset_skill)
# NOTE(review): filtered_dataset is filter_duplicates(dataset_weighted), so every
# state in X_test also appears in X_train. Metrics on X_test are therefore not a
# held-out estimate.
X_test, y_test = preprocess_dataset(filtered_dataset)

In [20]:
# Report the array shapes of every split.
# Expected: X is (num_samples, 2, 6, 7) and y is (num_samples,).
for split_X, split_y in (
    (X_train, y_train),              # weighted training arrays
    (X_test, y_test),                # de-duplicated arrays
    (X_train_skill, y_train_skill),  # skill arrays
):
    print("Input shape (X):", split_X.shape)
    print("Target shape (y):", split_y.shape)

Input shape (X): (711921, 2, 6, 7)
Target shape (y): (711921,)
Input shape (X): (334489, 2, 6, 7)
Target shape (y): (334489,)
Input shape (X): (141164, 2, 6, 7)
Target shape (y): (141164,)


# Define CNN

In [21]:
# Define the CNN model
class Connect4CNN(nn.Module):
    """
    Convolutional network mapping a Connect 4 position to per-column logits.

    Input:  float tensor of shape (batch, 2, 6, 7), one channel per player.
    Output: float tensor of shape (batch, 7) holding unnormalised scores
            (logits), one per column.
    """

    def __init__(self):
        super().__init__()
        # A 7x7 kernel needs padding=3 (kernel_size // 2) to keep the 6x7
        # spatial size. With padding=1 the board shrinks to 2x3 after conv1,
        # and conv2 then fails because its kernel exceeds the padded input.
        # fc1 below relies on the 6x7 size being preserved. Padding does not
        # affect parameter shapes.
        self.conv1 = nn.Conv2d(2, 32, kernel_size=7, padding=3)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=7, padding=3)
        self.fc1 = nn.Linear(64 * 6 * 7, 128)
        self.fc2 = nn.Linear(128, 7)  # 7 outputs for the 7 columns (best move)

    def forward(self, x):
        """Return the (batch, 7) logits for a (batch, 2, 6, 7) input."""
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.flatten(x, start_dim=1)  # (batch, 64 * 6 * 7) for the fully connected layer
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)  # No activation here; handled by loss function
        return x


In [22]:
def evaluate_model(model, dataloader, criterion):
    """
    Compute the loss and accuracy of ``model`` over ``dataloader``.

    The model is switched to eval mode and left in it; gradients are disabled
    while iterating.

    Returns:
        (mean_loss, accuracy): ``mean_loss`` is the sum of the per-batch
        criterion values divided by the number of batches; ``accuracy`` is the
        fraction of samples whose highest-scoring output equals the target.
    """
    model.eval()
    loss_sum = 0
    hits = 0
    seen = 0

    with torch.no_grad():
        for inputs, targets in dataloader:
            logits = model(inputs)
            loss_sum += criterion(logits, targets).item()
            # Predicted class = index of the largest logit in each row.
            guesses = logits.max(dim=1).indices
            hits += (guesses == targets).sum().item()
            seen += targets.size(0)

    accuracy = hits / seen
    return loss_sum / len(dataloader), accuracy


# Train CNN

## Large, weighted dataset only

In [40]:
# Wrap the weighted training arrays in a dataset and a shuffling mini-batch loader
train_features = torch.tensor(X_train, dtype=torch.float32)
train_targets = torch.tensor(y_train, dtype=torch.long)
train_dataset = TensorDataset(train_features, train_targets)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Fresh network, cross-entropy loss on the raw logits, Adam optimizer
model = Connect4CNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train for a fixed number of epochs, reporting the mean batch loss after each
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        loss = criterion(model(inputs), targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch + 1}: Training Loss = {running_loss / len(train_loader):.4f}")


Epoch 1: Training Loss = 1.6058
Epoch 2: Training Loss = 1.5003
Epoch 3: Training Loss = 1.4461
Epoch 4: Training Loss = 1.4048
Epoch 5: Training Loss = 1.3708
Epoch 6: Training Loss = 1.3408
Epoch 7: Training Loss = 1.3162
Epoch 8: Training Loss = 1.2941
Epoch 9: Training Loss = 1.2745
Epoch 10: Training Loss = 1.2575


In [41]:
# Build an ordered (unshuffled) loader over the de-duplicated arrays
test_features = torch.tensor(X_test, dtype=torch.float32)
test_targets = torch.tensor(y_test, dtype=torch.long)
test_dataset = TensorDataset(test_features, test_targets)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)

# Score the model trained on the weighted dataset
test_loss, test_accuracy = evaluate_model(model, test_loader, criterion)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")


Test Loss: 1.2451, Test Accuracy: 0.5241


## Skilled dataset

In [11]:
# Wrap the skill arrays in a dataset and a shuffling mini-batch loader
skill_features = torch.tensor(X_train_skill, dtype=torch.float32)
skill_targets = torch.tensor(y_train_skill, dtype=torch.long)
train_dataset_skill = TensorDataset(skill_features, skill_targets)
train_loader_skill = DataLoader(train_dataset_skill, batch_size=32, shuffle=True)

# Fresh network, cross-entropy loss on the raw logits, Adam optimizer
model_skill = Connect4CNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_skill.parameters(), lr=0.001)

# Train for a fixed number of epochs, reporting the mean batch loss after each
num_epochs = 10
for epoch in range(num_epochs):
    model_skill.train()
    running_loss = 0
    for inputs, targets in train_loader_skill:
        optimizer.zero_grad()
        loss = criterion(model_skill(inputs), targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch + 1}: Training Loss = {running_loss / len(train_loader_skill):.4f}")


Epoch 1: Training Loss = 1.7778
Epoch 2: Training Loss = 1.6895
Epoch 3: Training Loss = 1.6443
Epoch 4: Training Loss = 1.6035
Epoch 5: Training Loss = 1.5630
Epoch 6: Training Loss = 1.5214
Epoch 7: Training Loss = 1.4786
Epoch 8: Training Loss = 1.4382
Epoch 9: Training Loss = 1.3988
Epoch 10: Training Loss = 1.3632


In [12]:
# Build an ordered (unshuffled) loader over the de-duplicated arrays
test_features = torch.tensor(X_test, dtype=torch.float32)
test_targets = torch.tensor(y_test, dtype=torch.long)
test_dataset = TensorDataset(test_features, test_targets)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)

# Score the model trained on the skill dataset
test_loss, test_accuracy = evaluate_model(model_skill, test_loader, criterion)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")


Test Loss: 1.8927, Test Accuracy: 0.3114


## Train on large dataset, validate on skilled dataset

In [23]:
# Training data: the weighted arrays, reshuffled every epoch
train_features = torch.tensor(X_train, dtype=torch.float32)
train_targets = torch.tensor(y_train, dtype=torch.long)
train_dataset = TensorDataset(train_features, train_targets)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Validation data: the skill arrays, iterated in a fixed order
val_features = torch.tensor(X_train_skill, dtype=torch.float32)
val_targets = torch.tensor(y_train_skill, dtype=torch.long)
val_dataset_skill = TensorDataset(val_features, val_targets)
val_loader_skill = DataLoader(val_dataset_skill, batch_size=32, shuffle=False)

# Fresh network, cross-entropy loss on the raw logits, Adam optimizer
model_ls = Connect4CNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_ls.parameters(), lr=0.001)

# Train, validating after every epoch and checkpointing on improvement
num_epochs = 10
best_val_loss = float('inf')  # lowest validation loss seen so far

for epoch in range(num_epochs):
    # evaluate_model() leaves the network in eval mode, so switch back each epoch
    model_ls.train()
    running_loss = 0
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        loss = criterion(model_ls(inputs), targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Measure loss and accuracy on the skill dataset
    val_loss, val_accuracy = evaluate_model(model_ls, val_loader_skill, criterion)
    print(f"Epoch {epoch + 1}: Training Loss = {running_loss / len(train_loader):.4f}, "
            f"Validation Loss = {val_loss:.4f}, Validation Accuracy = {val_accuracy:.4f}")

    # Keep the weights of the epoch with the lowest validation loss
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model_ls.state_dict(), "best_connect4_model.pth")


Epoch 1: Training Loss = 1.6048, Validation Loss = 1.7614, Validation Accuracy = 0.2909
Epoch 2: Training Loss = 1.5037, Validation Loss = 1.7242, Validation Accuracy = 0.3085
Epoch 3: Training Loss = 1.4484, Validation Loss = 1.7335, Validation Accuracy = 0.3143
Epoch 4: Training Loss = 1.4059, Validation Loss = 1.7431, Validation Accuracy = 0.3187
Epoch 5: Training Loss = 1.3718, Validation Loss = 1.7727, Validation Accuracy = 0.3154
Epoch 6: Training Loss = 1.3428, Validation Loss = 1.7964, Validation Accuracy = 0.3191
Epoch 7: Training Loss = 1.3183, Validation Loss = 1.8133, Validation Accuracy = 0.3165
Epoch 8: Training Loss = 1.2957, Validation Loss = 1.8737, Validation Accuracy = 0.3161
Epoch 9: Training Loss = 1.2780, Validation Loss = 1.8542, Validation Accuracy = 0.3187
Epoch 10: Training Loss = 1.2610, Validation Loss = 1.8908, Validation Accuracy = 0.3146


In [24]:
# Prepare the test loader (fixed order, de-duplicated arrays)
test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32),
                                torch.tensor(y_test, dtype=torch.long))
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Load the best model and test it.
# weights_only=True restricts unpickling to tensors and plain containers,
# which is all a state_dict holds. It avoids running arbitrary pickled code
# from the checkpoint file and silences torch.load's FutureWarning about the
# default value.
model_ls.load_state_dict(torch.load("best_connect4_model.pth", weights_only=True))
test_loss, test_accuracy = evaluate_model(model_ls, test_loader, criterion)
print(f"Final Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")


  model_ls.load_state_dict(torch.load("best_connect4_model.pth"))


Final Test Loss: 1.5376, Test Accuracy: 0.3863
