## Data Preprocessing Pipeline: Cleaning and Tokenizing Text

In [None]:
import os
import tensorflow as tf
import keras_nlp as keras_hub 
from tensorflow.keras import layers

# Pipeline-wide constants used by the preprocessing functions below.
SEQ_LEN = 128 # Sequence length passed to build_tokenizer() in main()
BATCH_SIZE = 32 # Number of examples per batch in create_dataset()
AUTOTUNE = tf.data.AUTOTUNE # Lets tf.data choose map parallelism and prefetch depth

def load_and_clean_lines(file_path, min_words=3, max_words=250):
    """
    Load a text file and keep only the lines whose word count is in range.

    Every line is stripped of surrounding whitespace and split on whitespace
    to count its words. Blank lines are always discarded.

    Args:
        file_path (str): Path to the UTF-8 encoded text file.
        min_words (int): Exclusive lower bound; a line is kept only if it has
            strictly more than `min_words` words.
        max_words (int): Exclusive upper bound; a line is kept only if it has
            strictly fewer than `max_words` words.

    Returns:
        list: The stripped lines that passed the filter, in file order.
    """
    cleaned = []
    with open(file_path, 'r', encoding='utf-8') as file:
        # Iterate the file lazily instead of materialising it with
        # readlines(), and strip each line once rather than three times.
        for raw_line in file:
            text = raw_line.strip()
            if text and min_words < len(text.split()) < max_words:
                cleaned.append(text)
    return cleaned

def write_cleaned_lines(output_path, lines):
    """
    Save text lines to a UTF-8 file, separated by newline characters.

    The file is overwritten if it exists. No newline is written after the
    last line.

    Args:
        output_path (str): File to write.
        lines (list): Strings to store, one per output line.
    """
    with open(output_path, mode='w', encoding='utf-8') as out_file:
        content = "\n".join(lines)
        out_file.write(content)


def build_tokenizer(vocab_path, seq_len=128):
    """
    Create the WordPiece tokenizer and the start-token packer.

    The vocabulary is read from `vocab_path`: for every non-blank line, the
    text before the first tab is taken as a token. The reserved tokens
    "[PAD]", "[UNK]" and "[BOS]" are placed in front of the file's tokens.

    Args:
        vocab_path (str): Path to the vocabulary file.
        seq_len (int): Sequence length given to both the tokenizer and the
            packer.

    Returns:
        tokenizer: A WordPiece tokenizer built from the vocabulary.
        start_packer: A StartEndPacker that uses the "[BOS]" id as its start
            value and `seq_len` as its sequence length.
    """
    file_tokens = []
    with open(vocab_path, "r", encoding="utf-8") as vocab_file:
        for entry in vocab_file:
            entry = entry.strip()
            if entry:
                # Keep only the token column; anything after a tab is dropped.
                file_tokens.append(entry.split("\t")[0])

    # Reserved tokens come first in the vocabulary list.
    special_tokens = ["[PAD]", "[UNK]", "[BOS]"]
    full_vocab = special_tokens + file_tokens

    wordpiece = keras_hub.tokenizers.WordPieceTokenizer(
        vocabulary=full_vocab,
        sequence_length=seq_len,
        lowercase=False,
    )

    # The packer needs the integer id the tokenizer assigned to "[BOS]".
    bos_id = wordpiece.token_to_id("[BOS]")
    packer = keras_hub.layers.StartEndPacker(
        sequence_length=seq_len,
        start_value=bos_id,
    )

    return wordpiece, packer

def preprocess_fn(text, tokenizer, start_packer):
    """
    Turn raw text into an (inputs, labels) training pair.

    The text is tokenized once. The labels are the tokenizer output as-is and
    the inputs are that same output passed through `start_packer`.

    Args:
        text (tf.Tensor): Raw text input.
        tokenizer: Callable that maps text to token ids.
        start_packer: Callable applied to the token ids to build the inputs.

    Returns:
        tuple: (inputs, labels)
    """
    token_ids = tokenizer(text)
    # Labels are the unpacked token ids; inputs are the packed version.
    return start_packer(token_ids), token_ids

def create_dataset(file_path, tokenizer, start_packer, is_training=False):
    """
    Build the batched tf.data input pipeline for one text file.

    Args:
        file_path (str): Path to the cleaned text file, one example per line.
        tokenizer: Tokenizer handed to `preprocess_fn`.
        start_packer: Packer layer handed to `preprocess_fn`.
        is_training (bool): When True, the lines are cached and shuffled
            before tokenization.

    Returns:
        tf.data.Dataset: Dataset of (inputs, labels) batches of size
        `BATCH_SIZE`, with prefetching enabled.
    """
    def encode(line):
        # Bind the tokenizer and packer so map() only has to supply the line.
        return preprocess_fn(line, tokenizer, start_packer)

    dataset = tf.data.TextLineDataset(file_path)

    if is_training:
        # Cache the raw lines first, then shuffle with a 10000-element buffer.
        dataset = dataset.cache()
        dataset = dataset.shuffle(10000)

    dataset = dataset.map(encode, num_parallel_calls=AUTOTUNE)
    dataset = dataset.batch(BATCH_SIZE)
    return dataset.prefetch(AUTOTUNE)

def main():
    """
    Run the preprocessing pipeline end to end.

    Cleans the raw train/valid/test text files, writes the cleaned copies to
    the clean-data directory, builds the tokenizer and packer from the
    vocabulary file, and creates one dataset per split.

    Returns:
        tuple: (train_ds, val_ds, test_ds, tokenizer, start_packer)
    """
    # Base directory containing raw and vocab files
    raw_data_dir = "/content/simplebooks_data/simplebooks/simplebooks-92-raw"
    clean_data_dir = "/content/simplebooks_clean"
    os.makedirs(clean_data_dir, exist_ok=True)
    # This has to be an assignment: `vocab_path: "..."` is only an annotation
    # and would leave the name unbound when build_tokenizer() is called.
    vocab_path = "/content/simplebooks_data/simplebooks/simplebooks-92/train.vocab"

    # Clean every split and remember where its cleaned copy was written.
    clean_paths = {}
    for split in ("train", "valid", "test"):
        raw_path = os.path.join(raw_data_dir, f"{split}.txt")
        clean_path = os.path.join(clean_data_dir, f"{split}_clean.txt")
        write_cleaned_lines(clean_path, load_and_clean_lines(raw_path))
        clean_paths[split] = clean_path

    # Build tokenizer and packer
    tokenizer, start_packer = build_tokenizer(vocab_path, seq_len=SEQ_LEN)

    # Create datasets; only the training split is cached and shuffled.
    train_ds = create_dataset(clean_paths["train"], tokenizer, start_packer, is_training=True)
    val_ds = create_dataset(clean_paths["valid"], tokenizer, start_packer)
    test_ds = create_dataset(clean_paths["test"], tokenizer, start_packer)

    return train_ds, val_ds, test_ds, tokenizer, start_packer

# Run the preprocessing pipeline when this code is executed as the main module
# and keep the datasets, tokenizer and packer as module-level names.
if __name__ == "__main__":
    train_ds, val_ds, test_ds, tokenizer, start_packer = main()

## Position and Token Embeddings

In [None]:
from tensorflow import keras
from keras import ops
from keras import layers

class TokenAndPositionEmbedding(layers.Layer):
    """
    Embedding layer that adds a learned position vector to each token vector.

    Two embedding tables are held:
    - `token_emb`: one vector per vocabulary entry.
    - `pos_emb`: one vector per sequence position, up to `maxlen`.

    The layer output is the element-wise sum of the two lookups.
    """
    def __init__(self, maxlen: int, vocab_size: int, embed_dim: int):
        """
        Create the token and position embedding tables.

        Args:
            maxlen (int): Number of positions in the position table.
            vocab_size (int): Number of entries in the token table.
            embed_dim (int): Width of every embedding vector.
        """
        super().__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """
        Embed token ids and add the embedding of their positions.

        Args:
            x (tf.Tensor): Token ids of shape (batch_size, sequence_length).

        Returns:
            tf.Tensor: Tensor of shape (batch_size, sequence_length, embed_dim).
        """
        # Positions 0 .. sequence_length - 1, taken from the last input axis.
        pos_vectors = self.pos_emb(ops.arange(0, ops.shape(x)[-1]))
        tok_vectors = self.token_emb(x)

        # The position vectors broadcast over the batch axis.
        return tok_vectors + pos_vectors

## Transformer Decoder Block

In [None]:
import tensorflow as tf
from tensorflow import keras
from keras import layers, ops

def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    """
    Build a batched causal mask for self-attention.

    Entry (i, j) of the mask is set when `i >= j - n_src + n_dest`. When
    `n_dest == n_src` this is the lower triangle including the diagonal, so a
    position can attend to itself and to earlier positions only.

    Parameters:
    - batch_size (int or Tensor): Number of copies of the mask to produce.
    - n_dest (int): Number of destination (query) positions.
    - n_src (int): Number of source (key) positions.
    - dtype (tf.DType or str): Data type of the returned mask, e.g. 'bool'.

    Returns:
    - tf.Tensor: Mask of shape (batch_size, n_dest, n_src).
    """
    rows = ops.arange(n_dest)[:, None]  # (n_dest, 1) destination indices
    cols = ops.arange(n_src)            # (n_src,) source indices

    # Broadcasting the comparison yields the (n_dest, n_src) causal pattern.
    allowed = rows >= cols - n_src + n_dest
    single_mask = ops.reshape(ops.cast(allowed, dtype), [1, n_dest, n_src])

    # Tile multiples [batch_size, 1, 1]: repeat along the batch axis only.
    repeats = ops.concatenate(
        [ops.expand_dims(batch_size, -1), ops.convert_to_tensor([1, 1])],
        axis=0,
    )

    return ops.tile(single_mask, repeats)

class TransformerBlock(layers.Layer):
    """
    One decoder block: causal self-attention followed by a feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual connection
    and layer normalization, in that order.
    """
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, name=None):
        """
        Create the sub-layers of the block.

        Parameters:
        - embed_dim (int): Width of the block's inputs and outputs; also used
          as `key_dim` of the attention layer.
        - num_heads (int): Number of attention heads.
        - ff_dim (int): Width of the hidden feed-forward layer.
        - rate (float): Dropout rate used after both sub-layers.
        - name (str): Optional layer name.
        """
        super().__init__(name=name)

        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        # Feed-forward network: expand to ff_dim with ReLU, project back to embed_dim.
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=False):
        """
        Run the block on a batch of sequences.

        Parameters:
        - inputs (tf.Tensor): Tensor of shape (batch_size, seq_len, embed_dim).
        - training (bool): Forwarded to the attention and dropout layers.

        Returns:
        - tf.Tensor: Tensor with the same shape as `inputs`.
        """
        shape = ops.shape(inputs)

        # Self-attention: queries and keys cover the same seq_len positions.
        mask = causal_attention_mask(
            batch_size=shape[0],
            n_dest=shape[1],
            n_src=shape[1],
            dtype="bool",
        )

        attended = self.att(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=mask,
            training=training,
        )
        attended = self.dropout1(attended, training=training)
        normed = self.layernorm1(inputs + attended)  # first residual + norm

        projected = self.dropout2(self.ffn(normed), training=training)
        return self.layernorm2(normed + projected)  # second residual + norm


## Hyperparameters

In [None]:
# Model hyperparameters defined at module level for the cells below.
maxlen = 128  # Maximum sequence length (same value as SEQ_LEN in the preprocessing cell)
vocab_size = 98308  # Vocabulary size; presumably train.vocab plus reserved tokens (TODO confirm)
embed_dim = 256  # Embedding width
num_heads = 4  # Number of attention heads
ff_dim = 1024  # Hidden width of the feed-forward network
num_layers = 2  # Number of stacked transformer blocks; create_model() reads this module-level name

## Transformer Decoder Model

In [None]:
from tensorflow import keras
from keras import layers, ops

def create_model(maxlen, vocab_size, embed_dim, num_heads, feed_forward_dim):
    """
    Build a transformer-decoder language model (not compiled).

    The model embeds the input token ids, passes them through a stack of
    `TransformerBlock` layers and projects the result to vocabulary logits.
    Compilation is left to the caller.

    Args:
        maxlen (int): Maximum sequence length.
        vocab_size (int): Size of the vocabulary.
        embed_dim (int): Dimension of token and position embeddings.
        num_heads (int): Number of attention heads in each transformer block.
        feed_forward_dim (int): Hidden dimension of each block's feed-forward
            network.

    Returns:
        keras.Model: Functional model with two outputs: the logits of shape
        (batch, maxlen, vocab_size) and the output of the last transformer
        block.
    """
    # Input layer expecting integer token IDs
    inputs = layers.Input(shape=(maxlen,), dtype="int32", name="input_tokens")

    # Token and position embeddings
    embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
    x = embedding_layer(inputs)

    # Stack the transformer blocks. The feed-forward width comes from the
    # `feed_forward_dim` argument, not from the module-level `ff_dim`.
    # NOTE(review): `num_layers` is still read from module scope because it is
    # not part of this function's signature.
    for _ in range(num_layers):
        x = TransformerBlock(embed_dim, num_heads, feed_forward_dim)(x)

    # Final dense layer maps to vocabulary size for language modeling
    logits = layers.Dense(vocab_size, name="output_logits")(x)

    # Expose both the logits and the last block's output (for optional use)
    model = keras.Model(inputs=inputs, outputs=[logits, x], name="transformer_decoder")

    return model

## Seed

In [None]:
import os
import random
import numpy as np
import tensorflow as tf

def set_seed(seed: int = 42) -> None:
    """
    Seed Python, NumPy and TensorFlow and request deterministic TensorFlow ops.

    Args:
        seed (int): Seed value handed to every random number generator.
            Default is 42.

    Usage:
        >>> from utils.seed import set_seed
        >>> set_seed(123)
    """
    seed_text = str(seed)

    # Python: hash seed environment variable and the `random` module
    os.environ["PYTHONHASHSEED"] = seed_text
    random.seed(seed)

    # NumPy global generator
    np.random.seed(seed)

    # TensorFlow global seed, then the Keras helper
    tf.random.set_seed(seed)
    tf.keras.utils.set_random_seed(seed)

    # Op determinism is optional: warn if this TensorFlow does not expose it
    try:
        tf.config.experimental.enable_op_determinism()
    except AttributeError:
        print("[WARNING] `enable_op_determinism` not available in this TensorFlow version.")

    print(f"[INFO] Global seed set to {seed}")

## Logging Callbacks

In [None]:
import os
from datetime import datetime
import tensorflow as tf


def get_callbacks(
    base_dir: str = "experiments",
    monitor: str = "val_loss",
    model_name: str = "transformer_decoder_model"
) -> list:
    """
    Build the training callbacks and create their output directories.

    A new sub-directory of `base_dir`, named after the current timestamp, is
    created for every call. It holds the TensorBoard logs, the best-model
    checkpoint and a CSV file of metrics.

    Args:
        base_dir (str): Root directory for experiment outputs.
        monitor (str): Metric watched by the checkpoint, learning-rate
            reduction and early-stopping callbacks.
        model_name (str): Name embedded in the checkpoint filename.

    Returns:
        list: TensorBoard, ModelCheckpoint, CSVLogger, ReduceLROnPlateau and
        EarlyStopping callbacks, in that order.
    """
    # One directory per run, keyed by the time the callbacks were created
    run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
    run_dir = os.path.join(base_dir, run_id)

    tensorboard_dir = os.path.join(run_dir, "logs")
    checkpoint_file = os.path.join(run_dir, "checkpoints", f"best_{model_name}.keras")
    metrics_csv = os.path.join(run_dir, "metrics.csv")

    # Both directories are created here, before any callback is built
    os.makedirs(os.path.dirname(checkpoint_file), exist_ok=True)
    os.makedirs(tensorboard_dir, exist_ok=True)

    tensorboard = tf.keras.callbacks.TensorBoard(log_dir=tensorboard_dir)

    # Keep only the best full model according to `monitor`
    checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_file,
        monitor=monitor,
        save_best_only=True,
        save_weights_only=False,
        verbose=1,
    )

    csv_logger = tf.keras.callbacks.CSVLogger(filename=metrics_csv, append=False)

    # Multiply the learning rate by 0.1 after 3 epochs without improvement
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
        monitor=monitor,
        factor=0.1,
        patience=3,
        verbose=1,
    )

    # Stop after 6 epochs without improvement and restore the best weights
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor=monitor,
        patience=6,
        restore_best_weights=True,
        verbose=1,
    )

    return [tensorboard, checkpoint, csv_logger, reduce_lr, early_stopping]

## Metrics

In [None]:
from keras_nlp.metrics import Perplexity
from keras.metrics import SparseCategoricalAccuracy

def get_metrics(mask_token_id=0):
    """
    Create the metrics used to evaluate the language model.

    Args:
        mask_token_id (int): Token ID passed to `Perplexity` as the id to mask.

    Returns:
        list: A `Perplexity` metric computed from logits, followed by a
        `SparseCategoricalAccuracy` metric named "accuracy".
    """
    perplexity = Perplexity(from_logits=True, mask_token_id=mask_token_id)
    accuracy = SparseCategoricalAccuracy(name="accuracy")
    return [perplexity, accuracy]


## Trainer Class

In [None]:
import tensorflow as tf
import keras

class Trainer:
    """Builds, compiles and trains a Keras model on prepared datasets."""

    def __init__(
        self,
        model_fn: callable,
        train_ds: "tf.data.Dataset",
        val_ds: "tf.data.Dataset",
        config=None,
    ):
        """
        Initialize the Trainer.

        Args:
            model_fn (Callable): Zero-argument function that returns a Keras
                model instance.
            train_ds (tf.data.Dataset): Prepared training dataset.
            val_ds (tf.data.Dataset): Prepared validation dataset.
            config (dict, optional): Configuration object stored on the
                trainer as `self.config`. It is not read by this class and
                defaults to None.
        """
        # `config` is now a real (optional) parameter; it used to be read from
        # an undefined name, which raised NameError on construction.
        self.config = config
        self.model = model_fn()           # Build the model
        self._compile_model()             # Compile with optimizer, loss, metrics
        self.train_ds = train_ds
        self.val_ds = val_ds
        self.callbacks = get_callbacks(   # Initialize callbacks
            monitor="val_loss")

    def _compile_model(self):
        """
        Compile the model with a fixed optimizer, loss and metric set.

        Uses Adam (learning rate 0.01, beta_1 0.9, beta_2 0.98, weight decay
        0.01) and sparse categorical cross-entropy computed from logits. The
        loss is given as `[loss_fn, None]`, so only the first model output
        contributes to it.
        """
        loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        optimizer = tf.keras.optimizers.Adam(
            learning_rate=0.01,
            beta_1=0.9,
            beta_2=0.98,
            weight_decay=0.01,
        )

        # NOTE(review): the loss list implies a two-output model; confirm that
        # Keras attaches the flat metrics list to the logits output as intended.
        self.model.compile(
            optimizer=optimizer,
            loss=[loss_fn, None],
            metrics=get_metrics()
        )

    def train(self) -> "tf.keras.Model":
        """
        Fit the model for 5 epochs on the training data with validation.

        Returns:
            The trained Keras model (the same object as `self.model`).
        """
        self.model.fit(
            self.train_ds,
            validation_data=self.val_ds,
            epochs=5,
            verbose=1,
            callbacks=self.callbacks
        )
        # Hand the model back, as the signature and callers expect.
        return self.model


## Training Transformer Decoder Model

In [None]:
import os
import yaml
import tensorflow as tf

# Set random seed for full reproducibility.
# NOTE(review): `config` is not defined anywhere in this file; it must be
# loaded (presumably from a YAML file) before this cell runs.
# set_seed is called here; assigning to the name would replace the function
# and leave every generator unseeded.
set_seed(config.training["seed"])

# main() returns five values; the tokenizer and packer are kept as well.
train_ds, val_ds, test_ds, tokenizer, start_packer = main()

# Define model function that returns the (uncompiled) model; Trainer compiles it
def model_fn():
    """Build the transformer decoder from the model section of `config`."""
    return create_model(
        maxlen=config.model["max_sequence_length"],
        vocab_size=config.model["vocab_size"],
        embed_dim=config.model["embed_dim"],
        num_heads=config.model["num_heads"],
        feed_forward_dim=config.model["feed_forward_dim"]
    )

# Initialize Trainer and start training
trainer = Trainer(
    model_fn=model_fn,
    train_ds=train_ds,
    val_ds=val_ds
)

# Read the model from the trainer so this does not depend on train()'s return value.
trainer.train()
model = trainer.model

# Evaluate final model on validation and test datasets.
# evaluate() reports the loss plus every compiled metric, so the results are
# requested as a dict instead of being unpacked into two names.
print("\n✅ Evaluating model on validation set...")
val_metrics = model.evaluate(val_ds, return_dict=True)
val_summary = ", ".join(f"{name}={value:.4f}" for name, value in val_metrics.items())
print(f"📊 Final Validation Metrics: {val_summary}")

print("\n🧪 Evaluating model on test set...")
test_metrics = model.evaluate(test_ds, return_dict=True)
test_summary = ", ".join(f"{name}={value:.4f}" for name, value in test_metrics.items())
print(f"🧪 Test Metrics: {test_summary}")

# Save final trained model to disk
os.makedirs("exports", exist_ok=True)
model_path = "exports/transformer_decoder_model.keras"
model.save(model_path)
print(f"\n✅ Final model saved to: {model_path}")