In [1]:
# Transformer Model (TensorFlow/Keras)

import tensorflow as tf
from tensorflow.keras import layers
import numpy as np

In [2]:
# Positional Encoding

def get_positional_encoding(seq_len, d_model):
    pos = np.arange(seq_len)[:, np.newaxis] # Create a column vector for positions
    i = np.arange(d_model)[np.newaxis, :] # Create a row vector for dimensions
    # Calculate the angle rates
    angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model)) # Apply the formula for positional encoding
    # Calculate the positional encoding - using sine for even indices and cosine for odd indices
    angle_rads = pos * angle_rates 
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    return tf.cast(angle_rads[np.newaxis, ...], dtype=tf.float32) # Add batch dimension   

In [3]:
# Scaled Dot-Product Attention

def scaled_dot_product_attention(q, k, v, mask):
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
    if mask is not None:
        scaled_attention_logits += (mask * -1e9)
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    output = tf.matmul(attention_weights, v)
    return output, attention_weights


In [4]:
# Multi-Head Attention

class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0
        self.num_heads = num_heads
        self.d_model = d_model
        self.depth = d_model // num_heads
        self.wq = layers.Dense(d_model)
        self.wk = layers.Dense(d_model)
        self.wv = layers.Dense(d_model)
        self.dense = layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]
        q = self.split_heads(self.wq(q), batch_size)
        k = self.split_heads(self.wk(k), batch_size)
        v = self.split_heads(self.wv(v), batch_size)
        scaled_attention, _ = scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))
        output = self.dense(concat_attention)
        return output

In [5]:
# Transformer Encoder Layer

class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = tf.keras.Sequential([
            layers.Dense(dff, activation='relu'),
            layers.Dense(d_model)
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(dropout_rate)
        self.dropout2 = layers.Dropout(dropout_rate)

    def call(self, x, *, training=None, mask=None):
        attn_output = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

In [6]:
# Full Transformer Encoder

class Transformer(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 maximum_position_encoding, dropout_rate=0.1):
        super().__init__()
        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = get_positional_encoding(maximum_position_encoding, d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, dropout_rate) 
                           for _ in range(num_layers)]

        self.dropout = layers.Dropout(dropout_rate)
        self.final_layer = layers.Dense(input_vocab_size)

    def call(self, x, *, training=None, mask=None):
        seq_len = tf.shape(x)[1]
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training=training, mask=mask)

        return self.final_layer(x)

In [7]:
# Example Usage

# Dummy parameters and input
sample_transformer = Transformer(
    num_layers=2,
    d_model=128,
    num_heads=4,
    dff=512,
    input_vocab_size=1000,
    maximum_position_encoding=100
)

dummy_input = tf.constant([[1, 2, 3, 4, 0, 0]])
dummy_mask = None
output = sample_transformer(dummy_input, training=False, mask=dummy_mask)
print("Output :", output) 
print("Output shape:", output.shape)  # (batch_size, input_seq_len, vocab_size)

Output : tf.Tensor(
[[[-0.34750703  0.16808674  0.25967413 ... -0.4543587  -0.38159454
    0.10776502]
  [-0.68258923  0.2985749  -0.03120044 ... -0.35497564 -0.07257999
   -0.00656354]
  [-0.5882651   0.12710622  0.20479527 ... -0.42233342 -0.1015323
    0.12235051]
  [-0.9115173   0.44931298  0.14421995 ... -0.6026541  -0.16720936
    0.09520782]
  [-1.0670317   0.49854913  0.411912   ... -0.48383534  0.02246449
   -0.00252168]
  [-0.9720975   0.4970778   0.53814995 ... -0.60057867  0.03045533
    0.01129379]]], shape=(1, 6, 1000), dtype=float32)
Output shape: (1, 6, 1000)


In [8]:
# Generate predictions (argmax over vocab dimension)
def predict(model, input_tensor, mask=None):
    logits = model(input_tensor, training=False, mask=mask)
    return tf.argmax(logits, axis=-1)

In [9]:
# Calculate loss (sparse categorical crossentropy)
def compute_loss(labels, logits):
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
    loss = loss_fn(labels, logits)
    mask = tf.cast(tf.not_equal(labels, 0), dtype=loss.dtype)
    loss *= mask
    return tf.reduce_sum(loss) / tf.reduce_sum(mask)

In [10]:
# Calculate accuracy
def compute_accuracy(labels, predictions):
    mask = tf.not_equal(labels, 0)
    predictions = tf.cast(predictions, labels.dtype)  # Ensure same dtype
    matches = tf.equal(labels, predictions)
    matches = tf.logical_and(mask, matches)
    return tf.reduce_sum(tf.cast(matches, tf.float32)) / tf.reduce_sum(tf.cast(mask, tf.float32))

In [11]:
# Example usage of the above methods
predictions = predict(sample_transformer, dummy_input, mask=dummy_mask)
print("Predictions:", predictions.numpy())

Predictions: [[ 70 468 200  44  44  44]]


In [12]:
# For demonstration, use dummy_input as labels (normally use real labels)
loss = compute_loss(dummy_input, output)
accuracy = compute_accuracy(dummy_input, predictions)
print("Loss:", loss.numpy())
print("Accuracy:", accuracy.numpy())

Loss: 6.736805
Accuracy: 0.0
