In [None]:
!pip install keras_tuner
import os
import shutil
import tensorflow as tf
import keras_tuner as kt
import numpy as np
from tensorflow.keras.layers import Dense, LayerNormalization, MultiHeadAttention, Dropout, Add, Concatenate, GlobalAveragePooling1D, Embedding
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from sklearn.model_selection import train_test_split



In [None]:
import zipfile

In [None]:
##############################################################################
# ✅ 1) Upload and Extract Dataset
##############################################################################

from google.colab import files

zip_path = "/content/final_deduplicated_X.zip"  # Ensure correct file name
extract_path = "/content/"
x_npy_path = "/content/final_deduplicated_X.npy"
y_npy_path = "/content/final_deduplicated_Y.npy"  # Ensure labels are also available

# The zip is deleted once it has been unpacked, so only ask for an upload when
# the extracted arrays are not on disk yet. This keeps the cell re-runnable.
if os.path.exists(x_npy_path) and os.path.exists(y_npy_path):
    uploaded = {}
else:
    # Opens the Colab file picker.
    # NOTE(review): assumes Colab stores the upload under its original name in
    # the working directory (/content by default) - confirm if cwd was changed.
    uploaded = files.upload()

    if not os.path.exists(zip_path):
        raise FileNotFoundError(
            f"Expected the uploaded archive at {zip_path}, "
            f"but the upload contained: {sorted(uploaded)}"
        )

    # Extract the file
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)

# Load the unzipped `.npy` files
full_comb_X = np.load(x_npy_path)
full_comb_Y = np.load(y_npy_path)

# Boards and labels are paired by position, so their first dimensions must agree.
if full_comb_X.shape[0] != full_comb_Y.shape[0]:
    raise ValueError(
        f"X has {full_comb_X.shape[0]} rows but Y has {full_comb_Y.shape[0]}"
    )

print("Shape of full_comb_X:", full_comb_X.shape)
print("Shape of full_comb_Y:", full_comb_Y.shape)

# Optional: Delete the zip file to free space (it may already be gone on a re-run)
if os.path.exists(zip_path):
    os.remove(zip_path)


Shape of full_comb_X: (2234188, 6, 7, 2)
Shape of full_comb_Y: (2234188,)


In [None]:
##############################################################################
# 1) Load Data
##############################################################################
# Read the boards (N,6,7,2) and their labels (N,) back from the extracted files.
full_comb_X, full_comb_Y = (
    np.load(npy_file)
    for npy_file in (
        "/content/final_deduplicated_X.npy",
        "/content/final_deduplicated_Y.npy",
    )
)

# Quick sanity print of both array shapes.
print("Dataset Loaded:", full_comb_X.shape, full_comb_Y.shape)


Dataset Loaded: (2234188, 6, 7, 2) (2234188,)


In [None]:
##############################################################################
# 2) Patch Extraction (3×4) with stride (3×3)
##############################################################################
def extract_patches(inputs, patch_size=(3,4), stride=(3,3)):
    """
    Cut every board into a sequence of flattened rectangular patches.

    The patch grid is scanned row by row (top-to-bottom, then left-to-right),
    and the values inside a patch are flattened in (row, column, channel)
    order. No padding is applied: only windows that fit completely inside the
    board are produced.

    The slicing is done with NumPy on the host, so the whole dataset does not
    have to be materialised as a single TensorFlow tensor.

    Args:
        inputs: array-like of shape (batch, height, width, channels).
        patch_size: (patch_height, patch_width).
        stride: (row_step, column_step) between consecutive patch origins.

    Returns:
        numpy.ndarray of shape (batch, num_patches, patch_dim) with the same
        dtype as `inputs`, where
        num_patches = ((height - patch_height) // row_step + 1)
                      * ((width - patch_width) // column_step + 1)
        and patch_dim = patch_height * patch_width * channels.
        With the defaults a (6,7,2) board becomes (4,24).

    Raises:
        ValueError: if `inputs` is not 4-D, or if the patch does not fit
            inside the board at least once.
    """
    boards = np.asarray(inputs)
    if boards.ndim != 4:
        raise ValueError(
            f"inputs must have shape (batch, height, width, channels); got {boards.shape}"
        )

    batch_size, height, width, channels = boards.shape
    patch_h, patch_w = patch_size
    step_h, step_w = stride

    num_patches_h = (height - patch_h) // step_h + 1
    num_patches_w = (width - patch_w) // step_w + 1
    if num_patches_h < 1 or num_patches_w < 1:
        raise ValueError(
            f"patch_size {tuple(patch_size)} does not fit inside a "
            f"({height}, {width}) board"
        )

    num_patches = num_patches_h * num_patches_w
    patch_dim = patch_h * patch_w * channels

    # Fill a preallocated output one patch position at a time; each assignment
    # copies that window for the whole batch at once.
    patches = np.empty((batch_size, num_patches, patch_dim), dtype=boards.dtype)
    patch_index = 0
    for grid_row in range(num_patches_h):
        top = grid_row * step_h
        for grid_col in range(num_patches_w):
            left = grid_col * step_w
            window = boards[:, top:top + patch_h, left:left + patch_w, :]
            patches[:, patch_index, :] = window.reshape(batch_size, patch_dim)
            patch_index += 1

    return patches

# Turn every board into its sequence of flattened patches.
full_comb_X_patches = extract_patches(full_comb_X)

# With the default patch size and stride this should report (N,4,24).
print("Patches Shape:", full_comb_X_patches.shape)


Patches Shape: (2234188, 4, 24)


In [None]:
##############################################################################
# 3) Subset for Hyperparameter Tuning
##############################################################################
# Main train/validation split (85% / 15%); fixed seed so it can be reproduced.
X_train, X_val, y_train, y_val = train_test_split(
    full_comb_X_patches,
    full_comb_Y,
    test_size=0.15,
    random_state=42
)

# Tune on at most 300k training rows (all of them if the dataset is smaller).
subset_size = min(300_000, X_train.shape[0])

# Draw the subset from a seeded generator so the hyperparameter search sees the
# same rows on every run, matching the fixed random_state of the splits.
subset_rng = np.random.default_rng(42)
indices = subset_rng.choice(X_train.shape[0], subset_size, replace=False)
X_tune_train, y_tune_train = X_train[indices], y_train[indices]

# Another split for tuning validation. It is carved out of the training rows
# only, so X_val / y_val are never seen during the search.
X_tune_train, X_tune_val, y_tune_train, y_tune_val = train_test_split(
    X_tune_train, y_tune_train,
    test_size=0.15,
    shuffle=True,
    random_state=42
)

print("Tuning Data Shapes:",
      X_tune_train.shape, y_tune_train.shape,
      X_tune_val.shape, y_tune_val.shape)

Tuning Data Shapes: (255000, 4, 24) (255000,) (45000, 4, 24) (45000,)


In [None]:
##############################################################################
# 4) Custom CLS Token & Positional Embedding
##############################################################################
class CLSTokenLayer(tf.keras.layers.Layer):
    """Prepends one learned [CLS] vector to every sequence in the batch.

    Input:  (batch, num_patches, hidden_dim)
    Output: (batch, num_patches + 1, hidden_dim), with the CLS vector at index 0.

    Args:
        hidden_dim: size of the last axis of the incoming token embeddings.
        **kwargs: standard Keras layer arguments (name, trainable, dtype, ...).
            They must be accepted here because Keras passes them back to the
            constructor when a saved model is deserialized.

    To reload a saved model that contains this layer, pass
    custom_objects={"CLSTokenLayer": CLSTokenLayer} to the loader.
    """

    def __init__(self, hidden_dim, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can report the constructor argument.
        self.hidden_dim = hidden_dim
        # A learned parameter: shape (1,1,hidden_dim)
        self.cls_token = self.add_weight(
            shape=(1,1,hidden_dim),
            initializer="random_normal",
            trainable=True
        )

    def call(self, x):
        # x shape: (batch, num_patches, hidden_dim)
        # Use the dynamic batch size so the layer works with any batch length.
        batch_size = tf.shape(x)[0]
        # Tile the single (1,1,hidden_dim) to (batch,1,hidden_dim)
        cls_tkn = tf.tile(self.cls_token, [batch_size,1,1])
        # Concat along axis=1 => new shape (batch, num_patches+1, hidden_dim)
        return tf.concat([cls_tkn, x], axis=1)

    def get_config(self):
        """Return the constructor arguments needed to re-create this layer."""
        config = super().get_config()
        config.update({"hidden_dim": self.hidden_dim})
        return config

class PositionalEncoding(tf.keras.layers.Layer):
    """Adds a learned positional embedding to a token sequence.

    The embedding table has num_patches + 1 rows: one per patch plus one extra
    slot for a CLS token placed in front of the patches.

    Input / output: (batch, num_patches + 1, hidden_dim)

    Args:
        num_patches: number of patch tokens (excluding the CLS token).
        hidden_dim: size of the last axis of the token embeddings.
        **kwargs: standard Keras layer arguments (name, trainable, dtype, ...).
            They must be accepted here because Keras passes them back to the
            constructor when a saved model is deserialized.

    To reload a saved model that contains this layer, pass
    custom_objects={"PositionalEncoding": PositionalEncoding} to the loader.
    """

    def __init__(self, num_patches, hidden_dim, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can report the constructor arguments.
        self.num_patches = num_patches
        self.hidden_dim = hidden_dim
        # A learned param for each patch plus the CLS token => (num_patches+1)
        self.pos_embed = self.add_weight(
            shape=(1, num_patches+1, hidden_dim),
            initializer="random_normal",
            trainable=True
        )

    def call(self, x):
        # x shape: (batch, num_patches+1, hidden_dim)
        # The leading axis of size 1 broadcasts the same embedding over the batch.
        return x + self.pos_embed

    def get_config(self):
        """Return the constructor arguments needed to re-create this layer."""
        config = super().get_config()
        config.update({
            "num_patches": self.num_patches,
            "hidden_dim": self.hidden_dim,
        })
        return config


In [None]:
##############################################################################
# 5) Build the Transformer
##############################################################################
def build_ViT(hp):
    """
    KerasTuner model-builder for a small patch-based Vision Transformer.

    Pipeline:
      patches (num_patches, patch_dim) -> linear projection -> prepend CLS
      token -> add learned positions -> `num_layers` encoder blocks ->
      LayerNorm on the CLS position -> 7-way softmax.

    Searched hyperparameters: hidden_dim, num_layers, num_heads,
    dropout_rate and learning_rate. The feed-forward width is fixed at
    2 * hidden_dim.

    Args:
        hp: the KerasTuner hyperparameter container for the current trial.

    Returns:
        A compiled Keras model (Adam, sparse categorical cross-entropy,
        accuracy metric).
    """
    layers = tf.keras.layers

    # Sequence geometry is read from the patch array built earlier in the notebook.
    num_patches, patch_dim = full_comb_X_patches.shape[1:3]   # 4, 24
    num_classes = 7  # size of the softmax output

    # --- search space (registration order kept stable) ---
    width = hp.Int("hidden_dim", min_value=128, max_value=384, step=64)
    depth = hp.Int("num_layers", min_value=4, max_value=8, step=1)
    heads = hp.Int("num_heads", min_value=2, max_value=8, step=2)
    drop = hp.Choice("dropout_rate", [0.0, 0.1, 0.2])
    lr = hp.Choice("learning_rate", [1e-4, 3e-4, 1e-3])

    ff_width = width * 2
    per_head_dim = width // heads

    # --- embedding stage: (batch,4,24) -> (batch,5,width) ---
    board_patches = tf.keras.Input(shape=(num_patches, patch_dim))
    tokens = layers.Dense(width)(board_patches)
    tokens = CLSTokenLayer(width)(tokens)
    tokens = PositionalEncoding(num_patches, width)(tokens)

    # --- encoder stack ---
    for _ in range(depth):
        # Self-attention on a normalised copy, then residual add + LayerNorm.
        normed = layers.LayerNormalization()(tokens)
        attended = layers.MultiHeadAttention(
            num_heads=heads,
            key_dim=per_head_dim,
            dropout=drop,
        )(normed, normed, normed)
        tokens = tokens + attended
        tokens = layers.LayerNormalization()(tokens)

        # Position-wise feed-forward, then residual add + LayerNorm.
        expanded = layers.Dense(ff_width, activation="gelu")(tokens)
        expanded = layers.Dropout(drop)(expanded)
        projected = layers.Dense(width)(expanded)
        projected = layers.Dropout(drop)(projected)
        tokens = tokens + projected
        tokens = layers.LayerNormalization()(tokens)

    # --- classification head: read out the CLS position (index 0) ---
    cls_vector = tokens[:, 0, :]  # (batch, width)
    cls_vector = layers.LayerNormalization()(cls_vector)
    class_probs = layers.Dense(num_classes, activation="softmax")(cls_vector)

    model = tf.keras.Model(board_patches, class_probs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model

In [None]:
##############################################################################
# 6) Hyperband Tuner
##############################################################################
# Start every search from a clean directory so stale trials are not resumed.
tuner_dir = "/content/vit_tuner"
if os.path.exists(tuner_dir):
    shutil.rmtree(tuner_dir)

tuner = kt.Hyperband(
    build_ViT,
    objective="val_accuracy",
    max_epochs=10,
    factor=3,
    seed=42,  # repeatable sampling of hyperparameter configurations
    directory=tuner_dir,
    project_name="patch_transformer"
)

checkpoint_path = "/content/best_vit_patches.keras"
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=2,
    restore_best_weights=True
)

# No ModelCheckpoint is passed to the search: KerasTuner hands each trial its
# own copy of the callbacks, so a shared save_best_only checkpoint is reset and
# overwritten by every trial and ends up holding the last trial, not the best.
# The tuner keeps its own per-trial checkpoints; the winner is exported below.
tuner.search(
    X_tune_train, y_tune_train,
    validation_data=(X_tune_val, y_tune_val),
    epochs=10,
    batch_size=64,
    callbacks=[early_stop],
    shuffle=True
)

best_hps = tuner.get_best_hyperparameters(1)[0]
print("Best HPs from Tuning:")
print(" - hidden_dim   =", best_hps.get('hidden_dim'))
print(" - num_layers   =", best_hps.get('num_layers'))
print(" - num_heads    =", best_hps.get('num_heads'))
print(" - dropout_rate =", best_hps.get('dropout_rate'))
print(" - learning_rate=", best_hps.get('learning_rate'))

# Write the best trial's model (by the tuner objective) to checkpoint_path.
tuner.get_best_models(num_models=1)[0].save(checkpoint_path)

Trial 30 Complete [00h 13m 32s]
val_accuracy: 0.3565555512905121

Best val_accuracy So Far: 0.39464443922042847
Total elapsed time: 02h 24m 26s
Best HPs from Tuning:
 - hidden_dim   = 384
 - num_layers   = 6
 - num_heads    = 2
 - dropout_rate = 0.1
 - learning_rate= 0.0001


In [None]:
##############################################################################
# 7) Final Training on Full Data
##############################################################################
# Fresh, untrained model built from the winning hyperparameters.
best_model = tuner.hypermodel.build(best_hps)

# Reuse the 85/15 split created before tuning. Calling train_test_split again
# with the same arrays, test_size and random_state would return the same rows
# and only add another full copy of the dataset to memory.
X_train_full, X_val_full = X_train, X_val
y_train_full, y_val_full = y_train, y_val

checkpoint_path_final = "/content/best_vit_patches_final.keras"
# Keep only the epoch with the lowest validation loss on disk.
final_ckpt = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path_final,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1
)
# Stop after 5 epochs without val_loss improvement and roll back to the best weights.
final_early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)

history = best_model.fit(
    X_train_full, y_train_full,
    validation_data=(X_val_full, y_val_full),
    epochs=30,
    batch_size=64,
    shuffle=True,
    callbacks=[final_ckpt, final_early_stop]
)

# NOTE: this score is measured on the same validation split that drove early
# stopping and checkpoint selection, so it is not an independent test estimate.
val_loss, val_acc = best_model.evaluate(X_val_full, y_val_full)
print(f"Final Val Accuracy: {val_acc:.4f}")

# `files` is the google.colab helper imported in the upload cell.
files.download(checkpoint_path_final)

Epoch 1/30
[1m29673/29673[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step - accuracy: 0.2994 - loss: 1.7860
Epoch 1: val_loss improved from inf to 1.65332, saving model to /content/best_vit_patches_final.keras
[1m29673/29673[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m825s[0m 26ms/step - accuracy: 0.2994 - loss: 1.7860 - val_accuracy: 0.3838 - val_loss: 1.6533
Epoch 2/30
[1m29673/29673[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step - accuracy: 0.3861 - loss: 1.6468
Epoch 2: val_loss improved from 1.65332 to 1.58782, saving model to /content/best_vit_patches_final.keras
[1m29673/29673[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m744s[0m 25ms/step - accuracy: 0.3861 - loss: 1.6468 - val_accuracy: 0.4203 - val_loss: 1.5878
Epoch 3/30
[1m29673/29673[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step - accuracy: 0.4196 - loss: 1.5923
Epoch 3: val_loss improved from 1.58782 to 1.54999, saving model to /content/best_vit_patches_final.k

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>