In [33]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers


import pickle
import pandas as pd
import numpy as np
from IPython.display import clear_output
import time
import random
import matplotlib.pyplot as plt

# Import data and fix up for transformer

In [53]:
# # Replace 'your_file.pkl' with the path to your pickle file
# file_path = "Connect4Dataset_RandomOpp_Firstmove.pkl"

# # Open the pickle file in read mode
# with open(file_path, "rb") as file:
#     dataset_progressive_skill = pickle.load(file)

# # Convert the dataset to a DataFrame, including board state
# data = []
# for record in dataset_progressive_skill:
#     game_id, board, move, player1_skill, player2_skill, player = record
#     data.append(
#         {
#             "Game ID": game_id,
#             "Board State": board,
#             "Move": move,
#             "Player 1 Skill": player1_skill,
#             "Player 2 Skill": player2_skill,
#             "Player": player,
#         }
#     )

# connect4 = pd.DataFrame(data)

connect4 = pd.read_pickle("Connect4Dataset_SmartRandom_combined.pkl")

In [54]:
def standardize_board_state(board, player):
    """
    Swap [0,1] to [1,0] and [1,0] to [0,1] in a given board only if player == -1.
    """

    # if isinstance(player, str):
    #     player = int(player)

    if player == "minus":
        board = np.array(board)  # Ensure it's a NumPy array

        # Create masks
        mask_01 = (board[:, :, 0] == 0) & (board[:, :, 1] == 1)  # Find [0,1]
        mask_10 = (board[:, :, 0] == 1) & (board[:, :, 1] == 0)  # Find [1,0]

        # Swap values
        board[mask_01] = [1, 0]
        board[mask_10] = [0, 1]

    return board  # Return the modified or original board


# Apply to each board where the player is -1
connect4_standardized = connect4.copy()
connect4_standardized["Board State"] = connect4_standardized.apply(
    lambda row: standardize_board_state(row["Board State"], row["Player"]), axis=1
)

In [55]:
def flip_board(board):
    """Flip the board horizontally."""
    return np.flip(board, axis=1)


def add_flipped_boards(df):
    """
    Add flipped boards to the dataset, updating the Game ID for flipped boards.
    Args:
        df: A Pandas DataFrame with columns "Game ID", "Board State", "Move",
             "Player 1 Skill", "Player 2 Skill", and "Player".
    Returns:
        A new DataFrame with original and flipped boards, where flipped boards have updated Game IDs.
    """
    flipped_rows = []
    max_game_id = df[
        "Game ID"
    ].max()  # Start new Game IDs after the max ID in the original DataFrame

    for game_id, group in df.groupby("Game ID"):  # Group by each game
        for _, row in group.iterrows():
            # Extract board state, move, and other columns
            board = row["Board State"]
            move = row["Move"]
            other_columns = row.drop(
                ["Game ID", "Board State", "Move"]
            ).to_dict()  # Extract other columns

            # Add the original board state
            flipped_rows.append(
                {
                    "Game ID": game_id,  # Keep the original Game ID
                    "Board State": board,
                    "Move": move,
                    **other_columns,
                }
            )

            # Flip the board and adjust the move
            flipped_board = flip_board(board)
            flipped_move = 6 - move  # Adjust move for flipped board

            # Add the flipped board state with a new Game ID
            flipped_rows.append(
                {
                    "Game ID": max_game_id
                    + 1,  # Increment Game ID for flipped board states
                    "Board State": flipped_board,
                    "Move": flipped_move,
                    **other_columns,
                }
            )

        # Increment the max_game_id for the next game
        max_game_id += 1

    # Create a new DataFrame from the rows
    return pd.DataFrame(flipped_rows)


# Apply the function to your DataFrame
connect4_wflip = add_flipped_boards(connect4_standardized)
connect4_wflip.shape

(1046484, 6)

In [56]:
connect4_wflip = connect4_wflip[connect4_wflip["Player"] == "plus"]
connect4_wflip.shape

(525190, 6)

# Transformer architecture Dans code

In [None]:
# Use CNN convultions to filter the data before putting it into the transformer
# Look at actual ViT model

In [57]:
class PositionalIndex(tf.keras.layers.Layer):
    def call(self, x):
        bs = tf.shape(x)[0]  # extract batch size
        number_of_vectors = tf.shape(x)[
            1
        ]  # how many vectors - we know it should be m*n, but let's count to be sure...
        indices = tf.range(number_of_vectors)  # index for each vector
        indices = tf.expand_dims(indices, 0)  # reshape it appropriately
        return tf.tile(indices, [bs, 1])  # repeat for each batch

In [58]:
class ClassTokenIndex(tf.keras.layers.Layer):
    def call(self, x):
        bs = tf.shape(x)[0]  # extract batch size
        number_of_vectors = 1  # how many vectors - we just want 1 (which is @ 0) ... we only want to generate 1 vector for the class token
        # now just get it to be the right size
        indices = tf.range(number_of_vectors)  # index for each vector
        indices = tf.expand_dims(indices, 0)  # reshape it appropriately
        return tf.tile(indices, [bs, 1])  # repeat for each batch

In [59]:
def build_ViT(
    size,
    block_size,
    hidden_dim,
    num_layers,
    num_heads,
    key_dim,
    value_dim,
    mlp_dim,
    dropout_rate,
    num_classes,
):
    # n is number of rows of blocks
    # m is number of cols of blocks
    # block_size is number of pixels (with rgb) in each block
    inp = tf.keras.layers.Input(shape=(size, block_size))
    mid = tf.keras.layers.Dense(hidden_dim)(
        inp
    )  # transform to vectors with different dimension
    # the positional embeddings
    inp2 = PositionalIndex()(inp)
    emb = tf.keras.layers.Embedding(input_dim=size, output_dim=hidden_dim)(
        inp2
    )  # learned positional embedding for each of the n*m possible possitions
    mid = tf.keras.layers.Add()(
        [mid, emb]
    )  # for some reason, tf.keras.layers.Add causes an error, but + doesn't?
    # create and append class token to beginning of all input vectors
    tokenInd = ClassTokenIndex()(mid)
    token = tf.keras.layers.Embedding(input_dim=1, output_dim=hidden_dim)(tokenInd)
    mid = tf.keras.layers.Concatenate(axis=1)([token, mid])

    for l in range(num_layers):  # how many Transformer Head layers are there?
        ln = tf.keras.layers.LayerNormalization()(mid)  # normalize
        mha = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=key_dim, value_dim=value_dim
        )(
            ln, ln, ln
        )  # self attention!
        add = tf.keras.layers.Add()([mid, mha])  # add and norm
        ln = tf.keras.layers.LayerNormalization()(add)
        den = tf.keras.layers.Dense(mlp_dim, activation="gelu")(
            ln
        )  # maybe should be relu...who knows...
        den = tf.keras.layers.Dropout(dropout_rate)(den)  # regularization
        den = tf.keras.layers.Dense(hidden_dim)(
            den
        )  # back to the right dimensional space
        den = tf.keras.layers.Dropout(dropout_rate)(den)
        mid = tf.keras.layers.Add()([den, add])  # add and norm again

    fl = mid[:, 0, :]  # just grab the class token for each image in batch
    ln = tf.keras.layers.LayerNormalization()(fl)
    clas = tf.keras.layers.Dense(num_classes, activation="softmax")(
        ln
    )  # probability that the image is in each category
    mod = tf.keras.models.Model(inp, clas)
    mod.compile(
        optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
    )
    return mod

In [65]:
size = 69
block_size = 8
hidden_dim = 128
num_layers = 16
num_heads = 16
key_dim = (
    hidden_dim // num_heads
)  # usually good practice for key_dim to be hidden_dim//num_heads...this is why we do Multi-Head attention
value_dim = key_dim * 2
mlp_dim = hidden_dim * 4
dropout_rate = 0.1
num_classes = 7


trans = build_ViT(
    size,
    block_size,
    hidden_dim,
    num_layers,
    num_heads,
    key_dim,
    value_dim,
    mlp_dim,
    dropout_rate,
    num_classes,
)
trans.summary()

In [66]:
(
    ((hidden_dim * key_dim + key_dim) * 2 + hidden_dim * value_dim + value_dim)
    * num_heads
    + (value_dim * num_heads) * hidden_dim
    + hidden_dim
)

23344

# Prep data for testing

In [67]:
# def subset_games(df, num_games):
#     """
#     Subset a DataFrame based on a random sample of unique game IDs.

#     Parameters:
#         df (pd.DataFrame): The input DataFrame with a 'game id' column.
#         num_games (int): The number of unique game IDs to subset.

#     Returns:
#         pd.DataFrame: A subset of the original DataFrame.
#     """
#     # Get the unique game IDs
#     unique_game_ids = df["Game ID"].unique()

#     # Ensure the requested number of games doesn't exceed the available unique IDs
#     if num_games > len(unique_game_ids):
#         raise ValueError(
#             f"num_games ({num_games}) exceeds the total unique game IDs ({len(unique_game_ids)})."
#         )

#     # Randomly sample the desired number of unique game IDs
#     sampled_game_ids = pd.Series(unique_game_ids).sample(num_games, random_state=42)

#     # Subset the DataFrame
#     subset_df = df[df["Game ID"].isin(sampled_game_ids)]

#     return subset_df


# num_games = 20000  # Specify the number of games to include
connect4_testing = connect4_wflip.copy()

# Print some details
print(f"Subset contains {len(connect4_testing)} board states")

Subset contains 525190 board states


In [68]:
# Extract features (board states) and labels (recommended moves)
X = np.stack(
    connect4_testing["Board State"].values
)  # Convert board states into a NumPy array
y = connect4_testing["Move"].values

# Normalize the board states (optional for CNNs)
X = X.astype("float32") / 1.0  # Assuming board states are binary (0 or 1)

# Convert labels to one-hot encoding (required for multi-class classification)
num_classes = 7  # Moves are in columns 0-6
y = to_categorical(y, num_classes)

# Split into training, validation, and test sets
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.3, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42
)

print(
    f"Training set: {X_train.shape}, Validation set: {X_val.shape}, Test set: {X_test.shape}"
)


ndata_train = X_train.shape[0]
ndata_test = X_test.shape[0]

Training set: (367633, 6, 7, 2), Validation set: (78778, 6, 7, 2), Test set: (78779, 6, 7, 2)


In [69]:
y_train = np.argmax(y_train, axis=1)  # Convert from one-hot to class indices
y_test = np.argmax(y_test, axis=1)  # Convert from one-hot to class indices

In [70]:
X_train_test = np.zeros(
    (1, 6, 7, 2), dtype=int
)  # Only one board (ndata_train=1), 6x7 size, 2 players

# Fill the board with increasing numbers for players
X_train_test[0, :, :, 0] = np.arange(1, 7).reshape(6, 1) * np.ones(
    (1, 7)
)  # Player 1: Increase across rows
X_train_test[0, :, :, 1] = np.arange(1, 8).reshape(
    1, 7
)  # Player 2: Increase across columns

In [71]:
import numpy as np

# Initialize example board size
n, m = 6, 7  # Rows and columns for Connect Four
block_size = 8  # Length of a winning combination (4-in-a-row)

# Initialize x_train_ravel with the correct shape
ndata_train = X_train.shape[0]
x_train_ravel = np.zeros(
    (ndata_train, 69, block_size)
)  # 69 possible 4-in-a-row combinations per board, each flattened to length 4


# Function to extract all winning combinations from the board
def extract_combinations(board):
    combinations = []

    # Horizontal combinations
    for row in range(n):
        for col in range(m - 3):  # can extract 4 combinations horizontally
            combinations.append(board[row, col : col + 4])

    # Vertical combinations
    for col in range(m):
        for row in range(n - 3):  # can extract 4 combinations vertically
            combinations.append(board[row : row + 4, col])

    # Diagonal combinations (top-left to bottom-right)
    for row in range(n - 3):
        for col in range(
            m - 3
        ):  # can extract 4 combinations diagonally (top-left to bottom-right)
            combinations.append([board[row + i, col + i] for i in range(4)])

    # Diagonal combinations (top-right to bottom-left)
    for row in range(n - 3):
        for col in range(
            3, m
        ):  # can extract 4 combinations diagonally (top-right to bottom-left)
            combinations.append([board[row + i, col - i] for i in range(4)])

    return combinations


# Process each image/board and extract all winning combinations
for img in range(ndata_train):
    # Extract the board data for both players (using both channels)
    player1_board = X_train[img, :, :, 0]  # Board data for player 1
    player2_board = X_train[img, :, :, 1]  # Board data for player 2

    # Extract all winning combinations for player 1 and player 2
    combinations_player1 = extract_combinations(player1_board)
    combinations_player2 = extract_combinations(player2_board)

    # Combine player 1 and player 2 information into a single 8-length vector (4 positions x 2 players)
    for ind in range(69):  # 69 possible combinations
        combined_combination = []
        for i in range(4):  # 4 positions in the combination
            combined_combination.append(combinations_player1[ind][i])  # Player 1
            combined_combination.append(combinations_player2[ind][i])  # Player 2

        # Flatten and store in x_train_ravel
        x_train_ravel[img, ind, :] = np.array(combined_combination).ravel()

# Now x_train_ravel has the shape (ndata_train, 69, 4) as requested
x_train_ravel.shape

(367633, 69, 8)

In [72]:
import numpy as np

# Initialize example board size
n, m = 6, 7  # Rows and columns for Connect Four
block_size = 8  # Length of a winning combination (4-in-a-row)

# Initialize x_train_ravel with the correct shape
ndata_test = X_test.shape[0]
x_test_ravel = np.zeros(
    (ndata_test, 69, block_size)
)  # 69 possible 4-in-a-row combinations per board, each flattened to length 4


# Function to extract all winning combinations from the board
def extract_combinations(board):
    combinations = []

    # Horizontal combinations
    for row in range(n):
        for col in range(m - 3):  # can extract 4 combinations horizontally
            combinations.append(board[row, col : col + 4])

    # Vertical combinations
    for col in range(m):
        for row in range(n - 3):  # can extract 4 combinations vertically
            combinations.append(board[row : row + 4, col])

    # Diagonal combinations (top-left to bottom-right)
    for row in range(n - 3):
        for col in range(
            m - 3
        ):  # can extract 4 combinations diagonally (top-left to bottom-right)
            combinations.append([board[row + i, col + i] for i in range(4)])

    # Diagonal combinations (top-right to bottom-left)
    for row in range(n - 3):
        for col in range(
            3, m
        ):  # can extract 4 combinations diagonally (top-right to bottom-left)
            combinations.append([board[row + i, col - i] for i in range(4)])

    return combinations


# Process each image/board and extract all winning combinations
for img in range(ndata_test):
    # Extract the board data for both players (using both channels)
    player1_board = X_test[img, :, :, 0]  # Board data for player 1
    player2_board = X_test[img, :, :, 1]  # Board data for player 2

    # Extract all winning combinations for player 1 and player 2
    combinations_player1 = extract_combinations(player1_board)
    combinations_player2 = extract_combinations(player2_board)

    # Combine player 1 and player 2 information into a single 8-length vector (4 positions x 2 players)
    for ind in range(69):  # 69 possible combinations
        combined_combination = []
        for i in range(4):  # 4 positions in the combination
            combined_combination.append(combinations_player1[ind][i])  # Player 1
            combined_combination.append(combinations_player2[ind][i])  # Player 2

        # Flatten and store in x_train_ravel
        x_test_ravel[img, ind, :] = np.array(combined_combination).ravel()

# Now x_train_ravel has the shape (ndata_train, 69, 4) as requested
x_test_ravel.shape

(78779, 69, 8)

### Train 

In [50]:
trans.fit(x_train_ravel, y_train, epochs=3, batch_size=40, validation_split=0.15)

Epoch 1/3
[1m9000/9000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2158s[0m 239ms/step - accuracy: 0.2947 - loss: 1.7427 - val_accuracy: 0.4103 - val_loss: 1.4848
Epoch 2/3
[1m9000/9000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2203s[0m 245ms/step - accuracy: 0.4237 - loss: 1.4583 - val_accuracy: 0.4442 - val_loss: 1.4007
Epoch 3/3
[1m9000/9000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2200s[0m 244ms/step - accuracy: 0.4522 - loss: 1.3849 - val_accuracy: 0.4596 - val_loss: 1.3659


<keras.src.callbacks.history.History at 0x39edef200>

In [128]:
out = trans.evaluate(x_test_ravel, y_test) # Dans 

In [None]:
out[1] 


[1m3782/3782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 10ms/step - accuracy: 0.2225 - loss: 1.8723
Test Accuracy: 0.2249


In [144]:
# trans.save("connect4_transformer.h5") Dans
trans.save("connect4_transformer_test.h5")



