In [1]:
import numpy as np
import tensorflow as tf # For Keras layers and comparison

# Your custom layers
from sequential import Sequential # Adjust path if needed
from layers.embedding import Embedding
from layers.lstm import LSTM
from layers.dense import Dense
from layers.bidirectional import Bidirectional
# from src.layers.text_vectorization import TextVectorizationWrapper # If testing this too
# from src.layers.dropout import Dropout # If testing this too

In [2]:
VOCAB_SIZE = 100       # Small vocabulary for testing Embedding
EMBEDDING_DIM = 8      # Small embedding dimension
SEQUENCE_LENGTH = 5    # Short sequences
BATCH_SIZE = 2         # Small batch size
LSTM_UNITS = 16        # Small number of LSTM units
DENSE_UNITS = 3        # For a final Dense layer
INPUT_FEATURES_FOR_DENSE = LSTM_UNITS # Or LSTM_UNITS * 2 if from Bidirectional non-sequence

In [3]:
def compare_outputs(keras_out, custom_out, layer_name):
    print(f"\n--- Comparing: {layer_name} ---")
    print(f"Keras Output Shape: {keras_out.shape}")
    print(f"Custom Output Shape: {custom_out.shape}")
    if keras_out.shape != custom_out.shape:
        print("❌ SHAPES MISMATCH!")
        return False

    if np.allclose(keras_out, custom_out, atol=1e-5): # Adjust tolerance
        print("✅ Outputs MATCH.")
        return True
    else:
        print("❌ Outputs MISMATCH!")
        print("   Keras output:", keras_out.flatten())
        print("   Custom output:", custom_out.flatten())
        diff = np.abs(keras_out - custom_out)
        print(f"   Max difference: {np.max(diff):.2e} at index {np.unravel_index(np.argmax(diff), diff.shape)}")
        return False

In [4]:
print("\n\n=== TESTING EMBEDDING LAYER ===")
# Sample input: batch of tokenized integer sequences
sample_input_tokens = np.random.randint(0, VOCAB_SIZE, size=(BATCH_SIZE, SEQUENCE_LENGTH))
print("Sample Input Tokens:\n", sample_input_tokens)

keras_input_emb_mask = tf.keras.layers.Input(shape=(SEQUENCE_LENGTH,), dtype='int32', name="input_emb_mask")
keras_embedding_layer_mask = tf.keras.layers.Embedding(
    input_dim=VOCAB_SIZE,
    output_dim=EMBEDDING_DIM,
    mask_zero=True,  # Set mask_zero to True
    name="keras_embedding_mask_true"
)
keras_output_emb_mask = keras_embedding_layer_mask(keras_input_emb_mask)
keras_emb_model_mask = tf.keras.Model(inputs=keras_input_emb_mask, outputs=keras_output_emb_mask)

# c. Weight Synchronization (Keras -> Custom)
# Important: Get weights *after* the layer is built (implicitly by creating the model)
keras_embedding_weights_mask = keras_embedding_layer_mask.get_weights()
# keras_embedding_weights_mask is a list: [embedding_matrix]
# For mask_zero=True, Keras's get_weights() for Embedding will still return the
# potentially non-zero weights for the 0-th index. The masking is a computational step.

sample_input_tokens_with_zeros = np.random.randint(1, VOCAB_SIZE, size=(BATCH_SIZE, SEQUENCE_LENGTH)) # Start with non-zeros
sample_input_tokens_with_zeros[0, 1] = 0  # Introduce a zero in the first sample
sample_input_tokens_with_zeros[0, 3] = 0  # Another zero
sample_input_tokens_with_zeros[1, 0] = 0

# b. Custom Model with mask_zero=True
custom_embedding_layer_test_mask = Embedding(
    input_dim=VOCAB_SIZE,
    output_dim=EMBEDDING_DIM,
    mask_zero=True  # Set mask_zero to True for your custom layer
)
# Your load_keras_weights method should handle the mask_zero logic
# (i.e., set the 0-th row of its internal weights to zeros)
custom_embedding_layer_test_mask.load_keras_weights(keras_embedding_weights_mask)
custom_emb_model_mask = Sequential([custom_embedding_layer_test_mask])

# d. Forward Pass
keras_pred_emb_mask = keras_emb_model_mask.predict(sample_input_tokens_with_zeros, verbose=0)
custom_pred_emb_mask = custom_emb_model_mask.predict(sample_input_tokens_with_zeros)

# e. Compare
compare_outputs(keras_pred_emb_mask, custom_pred_emb_mask, "Embedding Layer (mask_zero=True)")

# a. Keras Model
keras_input_emb = tf.keras.layers.Input(shape=(SEQUENCE_LENGTH,), dtype='int32', name="input_emb")
keras_embedding_layer = tf.keras.layers.Embedding(
    input_dim=VOCAB_SIZE, 
    output_dim=EMBEDDING_DIM, 
    mask_zero=False, # Test both True and False scenarios eventually
    name="keras_embedding"
)
keras_output_emb = keras_embedding_layer(keras_input_emb)
keras_emb_model = tf.keras.Model(inputs=keras_input_emb, outputs=keras_output_emb)

# c. Weight Synchronization (Keras -> Custom)
keras_embedding_weights = keras_embedding_layer.get_weights() 
# keras_embedding_weights is a list: [embedding_matrix]

# b. Custom Model
custom_embedding_layer_test = Embedding(
    input_dim=VOCAB_SIZE, 
    output_dim=EMBEDDING_DIM, 
    mask_zero=False # Match Keras setting
)
custom_embedding_layer_test.load_keras_weights(keras_embedding_weights) # Load weights from Keras layer
custom_emb_model = Sequential([custom_embedding_layer_test])

# d. Forward Pass
keras_pred_emb = keras_emb_model.predict(sample_input_tokens, verbose=0)
custom_pred_emb = custom_emb_model.predict(sample_input_tokens)

# e. Compare
compare_outputs(keras_pred_emb, custom_pred_emb, "Embedding Layer")



=== TESTING EMBEDDING LAYER ===
Sample Input Tokens:
 [[11 69 62 19 19]
 [88 38 47 99 92]]

--- Comparing: Embedding Layer (mask_zero=True) ---
Keras Output Shape: (2, 5, 8)
Custom Output Shape: (2, 5, 8)
❌ Outputs MISMATCH!
   Keras output: [ 0.02268467  0.01151795 -0.00092236  0.03119672  0.03414085 -0.01620958
  0.0031811   0.04107949 -0.03582083 -0.0316715  -0.00423247  0.01530648
  0.04588783  0.03455812 -0.01255903 -0.03345547  0.03453484  0.02269815
  0.01000663  0.0467765  -0.01871107  0.01247843  0.04292778  0.04911131
 -0.03582083 -0.0316715  -0.00423247  0.01530648  0.04588783  0.03455812
 -0.01255903 -0.03345547  0.01145571  0.00808764  0.02318955  0.0172548
 -0.04399084 -0.02273153 -0.02674059 -0.01996217 -0.03582083 -0.0316715
 -0.00423247  0.01530648  0.04588783  0.03455812 -0.01255903 -0.03345547
  0.02585098  0.01948514  0.01391545  0.01730097 -0.02558062  0.01476032
  0.03769923  0.01493018  0.03031636 -0.00662817  0.03439753  0.01058507
  0.04843768 -0.0160344  -0.

True

In [5]:
print("\n\n=== TESTING DENSE LAYER ===")
# Sample input: batch of flat vectors (e.g., output of an LSTM with return_sequences=False)
sample_input_flat_vectors = np.random.rand(BATCH_SIZE, LSTM_UNITS).astype(np.float32)

# a. Keras Model
keras_input_dense = tf.keras.layers.Input(shape=(LSTM_UNITS,), name="input_dense")
keras_dense_layer = tf.keras.layers.Dense(DENSE_UNITS, activation='softmax', name="keras_dense")
keras_output_dense = keras_dense_layer(keras_input_dense)
keras_dense_model = tf.keras.Model(inputs=keras_input_dense, outputs=keras_output_dense)

# c. Weight Synchronization
keras_dense_weights = keras_dense_layer.get_weights()
# keras_dense_weights is [kernel, bias]

# b. Custom Model
custom_dense_layer_test = Dense(units=DENSE_UNITS, activation='softmax')
custom_dense_layer_test.load_keras_weights(keras_dense_weights)
custom_dense_model = Sequential([custom_dense_layer_test])

# d. Forward Pass
keras_pred_dense = keras_dense_model.predict(sample_input_flat_vectors, verbose=0)
custom_pred_dense = custom_dense_model.predict(sample_input_flat_vectors)

# e. Compare
compare_outputs(keras_pred_dense, custom_pred_dense, "Dense Layer (activation='softmax')")



=== TESTING DENSE LAYER ===

--- Comparing: Dense Layer (activation='softmax') ---
Keras Output Shape: (2, 3)
Custom Output Shape: (2, 3)
✅ Outputs MATCH.


True

In [6]:
print("\n\n=== TESTING LSTM LAYER (Unidirectional) ===")
# Sample input: batch of sequences of embedding vectors
sample_input_vectors = np.random.rand(BATCH_SIZE, SEQUENCE_LENGTH, EMBEDDING_DIM).astype(np.float32)

# --- Test with return_sequences=True ---
print("\nTesting LSTM with return_sequences=True...")
# a. Keras Model
keras_input_lstm_seq = tf.keras.layers.Input(shape=(SEQUENCE_LENGTH, EMBEDDING_DIM), name="input_lstm_seq")
keras_lstm_layer_seq = tf.keras.layers.LSTM(LSTM_UNITS, return_sequences=True, name="keras_lstm_seq")
keras_output_lstm_seq = keras_lstm_layer_seq(keras_input_lstm_seq)
keras_lstm_model_seq = tf.keras.Model(inputs=keras_input_lstm_seq, outputs=keras_output_lstm_seq)

# c. Weight Synchronization
keras_lstm_weights = keras_lstm_layer_seq.get_weights() 
# keras_lstm_weights is [kernel, recurrent_kernel, bias]

# b. Custom Model
custom_lstm_layer_test_seq = LSTM(units=LSTM_UNITS, return_sequences=True)
custom_lstm_layer_test_seq.load_keras_weights(keras_lstm_weights)
custom_lstm_model_seq = Sequential([custom_lstm_layer_test_seq])

# d. Forward Pass
keras_pred_lstm_seq = keras_lstm_model_seq.predict(sample_input_vectors, verbose=0)
custom_pred_lstm_seq = custom_lstm_model_seq.predict(sample_input_vectors)

# e. Compare
compare_outputs(keras_pred_lstm_seq, custom_pred_lstm_seq, "LSTM Layer (return_sequences=True)")

# --- Test with return_sequences=False ---
print("\nTesting LSTM with return_sequences=False...")
keras_input_lstm_final = tf.keras.layers.Input(shape=(SEQUENCE_LENGTH, EMBEDDING_DIM), name="input_lstm_final")
keras_lstm_layer_final = tf.keras.layers.LSTM(LSTM_UNITS, return_sequences=False, name="keras_lstm_final") # Default
keras_output_lstm_final = keras_lstm_layer_final(keras_input_lstm_final)
keras_lstm_model_final = tf.keras.Model(inputs=keras_input_lstm_final, outputs=keras_output_lstm_final)

keras_lstm_weights_final = keras_lstm_layer_final.get_weights()

custom_lstm_layer_test_final = LSTM(units=LSTM_UNITS, return_sequences=False)
custom_lstm_layer_test_final.load_keras_weights(keras_lstm_weights_final)
custom_lstm_model_final = Sequential([custom_lstm_layer_test_final])

keras_pred_lstm_final = keras_lstm_model_final.predict(sample_input_vectors, verbose=0)
custom_pred_lstm_final = custom_lstm_model_final.predict(sample_input_vectors)
compare_outputs(keras_pred_lstm_final, custom_pred_lstm_final, "LSTM Layer (return_sequences=False)")



=== TESTING LSTM LAYER (Unidirectional) ===

Testing LSTM with return_sequences=True...

--- Comparing: LSTM Layer (return_sequences=True) ---
Keras Output Shape: (2, 5, 16)
Custom Output Shape: (2, 5, 16)
✅ Outputs MATCH.

Testing LSTM with return_sequences=False...

--- Comparing: LSTM Layer (return_sequences=False) ---
Keras Output Shape: (2, 16)
Custom Output Shape: (2, 16)
✅ Outputs MATCH.


True

In [7]:
print("\n\n=== TESTING BIDIRECTIONAL(LSTM) LAYER ===")
# Sample input: batch of sequences of embedding vectors
sample_input_vectors_bidi = np.random.rand(BATCH_SIZE, SEQUENCE_LENGTH, EMBEDDING_DIM).astype(np.float32)

# --- Test with return_sequences=True ---
print("\nTesting Bidirectional(LSTM) with return_sequences=True...")
# a. Keras Model
keras_input_bidi_seq = tf.keras.layers.Input(shape=(SEQUENCE_LENGTH, EMBEDDING_DIM), name="input_bidi_seq")
keras_bidi_layer_seq = tf.keras.layers.Bidirectional(
    tf.keras.layers.LSTM(LSTM_UNITS, return_sequences=True), name="keras_bidi_seq"
)
keras_output_bidi_seq = keras_bidi_layer_seq(keras_input_bidi_seq)
keras_bidi_model_seq = tf.keras.Model(inputs=keras_input_bidi_seq, outputs=keras_output_bidi_seq)

# c. Weight Synchronization
keras_bidi_weights = keras_bidi_layer_seq.get_weights()
# keras_bidi_weights is [fwd_kernel, fwd_recurrent_kernel, fwd_bias, bwd_kernel, bwd_recurrent_kernel, bwd_bias]

# b. Custom Model
# NOTE: Your Bidirectional takes an *instance* of your custom LSTM
custom_bidi_lstm_inner_seq = LSTM(units=LSTM_UNITS, return_sequences=True)
custom_bidi_layer_test_seq = Bidirectional(layer=custom_bidi_lstm_inner_seq, merge_mode="concat")
custom_bidi_layer_test_seq.load_keras_weights(keras_bidi_weights)
custom_bidi_model_seq = Sequential([custom_bidi_layer_test_seq])

# d. Forward Pass
keras_pred_bidi_seq = keras_bidi_model_seq.predict(sample_input_vectors_bidi, verbose=0)
custom_pred_bidi_seq = custom_bidi_model_seq.predict(sample_input_vectors_bidi)

# e. Compare
compare_outputs(keras_pred_bidi_seq, custom_pred_bidi_seq, "Bidirectional(LSTM) (return_sequences=True)")

# Extract what Keras's forward LSTM part produced (assuming LSTM_UNITS is the units of the inner LSTM)
keras_y_forward_component = keras_pred_bidi_seq[:, :, :LSTM_UNITS] # Or :keras_bidi_layer_seq.forward_layer.units
custom_y_forward_component = custom_bidi_layer_test_seq.forward_layer.forward(sample_input_vectors_bidi)

# Now compare these two:
compare_outputs(keras_y_forward_component, custom_y_forward_component, "Bidirectional's Forward LSTM Component")

LSTM_UNITS_FOR_THIS_TEST = keras_bidi_layer_seq.forward_layer.units # Get units from the Keras layer
keras_y_backward_component = keras_pred_bidi_seq[:, :, LSTM_UNITS_FOR_THIS_TEST:]

# Custom side - CORRECTED: No manual flipping needed
# The corrected LSTM implementation handles the output ordering internally
custom_y_backward_component = custom_bidi_layer_test_seq.backward_layer.forward(sample_input_vectors_bidi)

# Now compare these two - they should match directly
compare_outputs(keras_y_backward_component, custom_y_backward_component, "Bidirectional's Backward LSTM Component")

# --- Test with return_sequences=False ---
# (Similar structure as above, just set return_sequences=False in both Keras LSTM and custom LSTM)
print("\nTesting Bidirectional(LSTM) with return_sequences=False...")
keras_input_bidi_final = tf.keras.layers.Input(shape=(SEQUENCE_LENGTH, EMBEDDING_DIM), name="input_bidi_final")
keras_bidi_layer_final = tf.keras.layers.Bidirectional(
    tf.keras.layers.LSTM(LSTM_UNITS, return_sequences=False), name="keras_bidi_final"
)
keras_output_bidi_final = keras_bidi_layer_final(keras_input_bidi_final)
keras_bidi_model_final = tf.keras.Model(inputs=keras_input_bidi_final, outputs=keras_output_bidi_final)

keras_bidi_weights_final = keras_bidi_layer_final.get_weights()

custom_bidi_lstm_inner_final = LSTM(units=LSTM_UNITS, return_sequences=False)
custom_bidi_layer_test_final = Bidirectional(layer=custom_bidi_lstm_inner_final, merge_mode="concat")
custom_bidi_layer_test_final.load_keras_weights(keras_bidi_weights_final)
custom_bidi_model_final = Sequential([custom_bidi_layer_test_final])

keras_pred_bidi_final = keras_bidi_model_final.predict(sample_input_vectors_bidi, verbose=0)
custom_pred_bidi_final = custom_bidi_model_final.predict(sample_input_vectors_bidi)
compare_outputs(keras_pred_bidi_final, custom_pred_bidi_final, "Bidirectional(LSTM) (return_sequences=False)")



=== TESTING BIDIRECTIONAL(LSTM) LAYER ===

Testing Bidirectional(LSTM) with return_sequences=True...

--- Comparing: Bidirectional(LSTM) (return_sequences=True) ---
Keras Output Shape: (2, 5, 32)
Custom Output Shape: (2, 5, 32)
✅ Outputs MATCH.

--- Comparing: Bidirectional's Forward LSTM Component ---
Keras Output Shape: (2, 5, 16)
Custom Output Shape: (2, 5, 16)
✅ Outputs MATCH.

--- Comparing: Bidirectional's Backward LSTM Component ---
Keras Output Shape: (2, 5, 16)
Custom Output Shape: (2, 5, 16)
✅ Outputs MATCH.

Testing Bidirectional(LSTM) with return_sequences=False...

--- Comparing: Bidirectional(LSTM) (return_sequences=False) ---
Keras Output Shape: (2, 32)
Custom Output Shape: (2, 32)
✅ Outputs MATCH.


True