In [1]:
import tensorflow as tf
import numpy as np

In [2]:
class InputEmbeddings(tf.keras.layers.Layer):
    def __init__(self, d_model, vocab_size, **kwargs):
        """
        :param d_model: The size of each embedding vector.
        :param vocab_size: The size of the vocabulary, defining the number of unique tokens.
        :param kwargs: pass 
        """
        super(InputEmbeddings, self).__init__(**kwargs)
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.embeddings= tf.keras.layers.Embedding(vocab_size, d_model)
        
    def call(self, x):
        return self.embeddings(x) * np.sqrt(self.d_model)
    
    

In [3]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences


sentences = [
    "This is the first sentence.",
    "Here is another sentence.",
    "Yet another example sentence.",
    "This is the final test sentence."
]

# Tokenize the sentences
tokenizer = Tokenizer()
tokenizer.fit_on_texts(sentences)
sequences = tokenizer.texts_to_sequences(sentences)

# Get the maximum sequence length for padding
max_length = max(len(seq) for seq in sequences)

# Pad the sequences
padded_sequences = pad_sequences(sequences, maxlen=max_length, padding='post')

# Vocabulary size
vocab_size = len(tokenizer.word_index) + 1  # +1 for the padding token

# Define the model parameters
d_model = 16

# Create an instance of the InputEmbeddings layer
embedding_layer = InputEmbeddings(d_model, vocab_size)

# Convert sentences to embeddings
embeddings_output = embedding_layer(padded_sequences)

# Print the output embeddings
print(embeddings_output)
print(embeddings_output.shape)


tf.Tensor(
[[[ 0.11257984 -0.08856998  0.0926858  -0.03721432 -0.07346258
   -0.06355219  0.0730051  -0.06409092  0.12110634 -0.10530062
    0.06210451  0.18152653  0.01275554 -0.17367946  0.15521078
    0.16727619]
  [-0.01022848 -0.17847104  0.07384934 -0.03298935  0.10027124
    0.07477613 -0.18374644  0.16860901 -0.19943972  0.0578679
    0.02327628 -0.03379402  0.14394106 -0.06908861 -0.07943449
   -0.19171272]
  [-0.07945347 -0.12175112 -0.12724987 -0.02764472  0.17385592
    0.09522177 -0.00285444  0.14376272  0.18644036 -0.05858894
   -0.12442055 -0.13338466  0.18802051  0.09808816 -0.1432445
   -0.1927384 ]
  [ 0.12398507  0.0307304  -0.03567472  0.19479193 -0.07170348
    0.00606112  0.18673457 -0.14331993  0.18821092 -0.06485137
    0.0019834  -0.0861259   0.06197052 -0.13573009 -0.1991003
   -0.04486199]
  [-0.07074657 -0.14192633  0.08297123  0.00757518 -0.16485281
    0.01533937  0.00699195  0.08012719  0.10672225  0.19394918
    0.02697448 -0.08011355  0.12759005 -0.1704

The shape `(4, 6, 16)` can be interpreted as follows:

1. **4**: This dimension represents the number of samples or batches. In this context, it corresponds to the 4 sentences being processed.
2. **6**: This dimension represents the sequence length, which is the maximum number of tokens (words) in the padded sentences. In this case, each sentence has been padded to a length of 6 tokens.
3. **16**: This dimension represents the size of each embedding vector, denoted by `d_model` in the `InputEmbeddings` class. Each token in the input sequences is mapped to a 16-dimensional vector.

So, the tensor with shape `(4, 6, 16)` represents the embeddings of 4 sentences, each with a sequence length of 6 tokens, where each token is represented by a 16-dimensional embedding vector.

In [4]:
embeddings.numpy()[0]

NameError: name 'embeddings' is not defined

# Positional Encoding

The main purpose of positional encodings is to give the model some information about the order of words or the relative positions of words within a sequence. This is crucial for tasks involving language where the meaning of a sentence can change dramatically based on the order of words (e.g., "I like dogs more than cats" vs "I like cats more than dogs").



In [None]:
class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, d_model:int, max_len:int, dropout:float,**kwargs):
        """
        Initializes the PositionalEncoding layer.

        :param d_model: The size of each embedding vector.
        :param max_len: The maximum number of positions for which embeddings will be created.
        :param droupout: The dropout rate to apply to the output of this layer.
        :param kwargs: pass
        """
        super(PositionalEncoding, self).__init__(**kwargs)
        self.d_model = d_model
        self.max_len = max_len
        self.dropout = tf.keras.layers.Dropout(rate=dropout)
        self.positional_encoding = self._get_positional_encoding()
        
    def _get_positional_encoding(self):
        """
        Generates the positional encodings using sinusoidal patterns.
        
        Returns:
        Tensor: A tensor containing positional encodings of shape (1, max_len, d_model).
        """
        positions = np.arange(self.max_len)[:, np.newaxis]
        div_term = np.exp(np.arange(0, self.d_model, 2) * -(np.log(10000.0) / self.d_model))  # Shape (d_model/2,)
        
        pe = np.zeros((self.max_len, self.d_model))
        pe[:, 0::2] = np.sin(positions * div_term)
        pe[:, 1::2] = np.cos(positions * div_term)
        
        pe = pe[np.newaxis, ...]  # Add a new batch dimension (1, max_len, d_model)
        return tf.cast(pe, tf.float32)
    
    
    def call(self, x):
        seq_len = tf.shape(x)[1]
        assert seq_len <= self.max_len, "Input sequence length exceeds the maximum length"
        
        x = x + self.positional_encoding[:, :seq_len]
        return self.dropout(x)
    


In [None]:
# Define parameters for the model
d_model = 64  # Dimensionality of the embeddings
max_len = 10  # Assume max sentence length
dropout_rate = 0.1  # Dropout rate

# Initialize the PositionalEncoding layer
pos_encoding_layer = PositionalEncoding(d_model, max_len, dropout_rate)

# Generate dummy embeddings for 4 sentences, each with varying lengths
dummy_embeddings = tf.random.normal((4, max_len, d_model))  # Shape (batch_size, sequence_length, d_model)

# Apply the PositionalEncoding layer
encoded_embeddings = pos_encoding_layer(dummy_embeddings)

# Print the shape and some values to verify
print("Shape of encoded embeddings:", encoded_embeddings.shape)
print("Sample values from encoded embeddings:", encoded_embeddings[0, :3, :5])  # Print first 3 positions of the first sentence


# Add & Norm Layer

1. **Residual Connection (Add)**: This step adds the original input of the sub-layer to the output of the sub-layer (like self-attention or feed-forward network), creating a shortcut connection.
2. **Layer Normalization (Norm)**: This step normalizes the sum to stabilize and accelerate training by normalizing the output across the features.


In [None]:
class AddAndNorm(tf.keras.layers.Layer):
    def __init__(self, epsilon=1e-6, **kwargs):
        """
        Initializes the AddAndNorm layer.

        :param epsilon: Small float added to variance to avoid dividing by zero.
        :param kwargs: Additional arguments for the layer.
        """
        super(AddAndNorm, self).__init__(**kwargs)
        self.epsilon = epsilon
        self.layer_norm = tf.keras.layers.LayerNormalization(epsilon=epsilon)
        
    def call(self, x, sub_layer_output):
        return self.layer_norm(x + sub_layer_output)
        

In [None]:
x = tf.random.uniform((4, 6, 16))  # Example input tensor
sub_layer_output = tf.random.uniform((4, 6, 16))  # Example sub-layer output

add_and_norm_layer = AddAndNorm()
output = add_and_norm_layer(x, sub_layer_output)
print(output)


## Position-wise Feed-Forward Networks

The "Feed Forward" layer in transformer architectures refers to a fully connected feed-forward network applied to each position separately and identically. Typically, it consists of two linear transformations with a ReLU activation in between. The purpose of this layer is to introduce non-linearity and expand the dimensionality of the embeddings before reducing it back to the original size.



In [None]:
class FeedForward(tf.keras.layers.Layer):
    def __init__(self, d_model:int, d_ff:int, dropout:float, **kwargs):
        """
        Initializes the FeedForward layer.

        :param d_model: The size of the input and output embeddings.
        :param d_ff: The hidden layer size of the feed-forward network.
        :param dropout_rate: The dropout rate to apply to the output of the feed-forward network.
        """    
        super(FeedForward, self).__init__()
        self.dense_1 = tf.keras.layers.Dense(d_ff, activation='relu')
        self.dense_2 = tf.keras.layers.Dense(d_model)
        self.dropout = tf.keras.layers.Dropout(dropout)
        
    def call(self, x):
        """
        Forward pass for the FeedForward layer.

        :param x: Input tensor.
        :return: Output tensor after applying the feed-forward network and dropout.
        """
        x = self.dense_1(x)
        x = self.dense_2(x)
        x = self.dropout(x)
        return x
    

In [None]:
sentences = [
    "The quick brown fox jumps.",
    "Over the lazy dog.",
    "And runs away swiftly."
]

# Tokenize the sentences
tokenizer = Tokenizer()
tokenizer.fit_on_texts(sentences)
sequences = tokenizer.texts_to_sequences(sentences)

# Pad the sequences to the maximum sequence length
max_length = max(len(seq) for seq in sequences)
padded_sequences = pad_sequences(sequences, maxlen=max_length, padding='post')

# Vocabulary size
vocab_size = len(tokenizer.word_index) + 1  # +1 for padding token

# Define the embedding size (d_model)
d_model = 16

embedding_layer = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=d_model)

embeddings_output = embedding_layer(padded_sequences)
embeddings_output.shape

In [None]:
d_ff = 64
dropout_rate = 0.1

# Create an instance of the FeedForward layer
feed_forward_layer = FeedForward(d_model, d_ff, dropout_rate)

# Apply the FeedForward layer to the embeddings
feed_forward_output = feed_forward_layer(embeddings_output)

print(feed_forward_output)


In [None]:
input_tensor = tf.random.uniform((4, 6, d_model))  # Shape (batch_size, sequence_length, d_model)
input_tensor[0].shape

In [None]:
A = np.random.randn(3, 5, 16)
W = np.random.randn(16, 4)

tf.matmul(A, W)

# Multi-Head Attention

Multi-Head Attention is a key component of the transformer architecture, enabling the model to focus on different parts of the input sequence to capture contextual relationships. Here is a detailed explanation with the relevant equations and an algorithm in words.


### Notations
- $\mathbf{X} $: Input matrix of shape $(\text{batch\_size}, \text{seq\_len}, \text{d\_model}$)
- $ \mathbf{Q}$, $ \mathbf{K} $, $ \mathbf{V} $: Query, Key, and Value matrices
- $ \mathbf{W}_Q $, $ \mathbf{W}_K $, $ \mathbf{W}_V $: Weight matrices for transforming input into queries, keys, and values
- $ \text{d\_model} $: Dimension of the model (embedding size)
- $ \text{num\_heads} $: Number of attention heads
- $\text{depth} = \frac{\text{d\_model}}{\text{num\_heads}} $: Dimension of each head
- $ \mathbf{W}_O $: Final output weight matrix
- $ \mathbf{O} $: Final output after multi-head attention

### 1. Linear Projections
The input $\mathbf{X}$ is linearly projected into three different matrices: queries ($\mathbf{Q}$), keys ($\mathbf{K}$), and values ($\mathbf{V}$). These are obtained by multiplying $\mathbf{X}$ with three learned weight matrices:

$$
\mathbf{Q} = \mathbf{X} \mathbf{W}_Q, \quad \mathbf{K} = \mathbf{X} \mathbf{W}_K, \quad \mathbf{V} = \mathbf{X} \mathbf{W}_V
$$

where:
- $ \mathbf{W}_Q, \mathbf{W}_K, \mathbf{W}_V \in \mathbb{R}^{\text{d\_model} \times \text{d\_model}} $ are weight matrices.

### 2. Split into Multiple Heads
The query, key, and value matrices are split into multiple heads to allow the model to jointly attend to information from different representation subspaces at different positions. Each head has its own set of Q, K, V matrices:

$$
\mathbf{Q}_i = \mathbf{Q}[:, :, i \times \text{depth} : (i+1) \times \text{depth}]
$$

$$
\mathbf{K}_i = \mathbf{K}[:, :, i \times \text{depth} : (i+1) \times \text{depth}]
$$

$$
\mathbf{V}_i = \mathbf{V}[:, :, i \times \text{depth} : (i+1) \times \text{depth}]
$$

for $ i = 0, 1, \ldots, \text{num\_heads} - 1 $.

### 3. Scaled Dot-Product Attention
For each head, the attention scores are computed using the scaled dot-product attention mechanism:

$$
\text{Attention}(\mathbf{Q}, \mathbf{K}, \mathbf{V}) = \text{softmax}\left(\frac{\mathbf{Q} \mathbf{K}^\top}{\sqrt{\text{depth}}}\right) \mathbf{V}
$$

where:
- The scaling factor $sqrt{\text{depth}}$ prevents the dot products from growing too large.

### 4. Concatenation of Heads
The outputs of each attention head are concatenated:

$$
\mathbf{O} = \text{Concat}(\text{head}_1, \text{head}_2, \ldots, \text{head}_{\text{num\_heads}})
$$

### 5. Final Linear Transformation
The concatenated output $\mathbf{O}$ is then linearly transformed with a final weight matrix $\mathbf{W}_O$:

$$
\text{Output} = \mathbf{O} \mathbf{W}_O
$$

where $ \mathbf{W}_O \in \mathbb{R}^{\text{num\_heads} \cdot \text{depth} \times \text{d\_model}} $.




In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
  def __init__(self, d_model, num_heads, **kwargs):
    self.num_heads = num_heads
    self.d_model = d_model
    self.depth = d_model // num_heads

    self.wq = tf.keras.layers.Dense(d_model)
    self.wk = tf.keras.layers.Dense(d_model)
    self.wv = tf.keras.layers.Dense(d_model)
    self.dense = tf.keras.layers.Dense(d_model)

  
  def split_into_heads(self, x, batch_size):

    """
    Split the last dimension into (num_heads, depth).
    Transpose the result to shape (batch_size, num_heads, seq_len, depth).
    """
    x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth)) # shape: (batch_size, seq_len_q, num_heads, depth)
    return tf.transpose(x, perm=[0, 2, 1, 3]) # shape: (batch_size, num_heads, seq_len_q, depth)

  def call(self, v, k, q, mask):
    batch_size = tf.shape(v)[0]

    q = self.wq(q)
    v = self.wv(v)
    k = self.wk(k)

    q_splitted = self.split_into_heads(q, batch_size)
    v_splitted = self.split_into_heads(v, batch_size)
    k_splitted = self.split_into_heads(k, batch_size)

    scaled_attention, attention_weights = self.scaled_dot_product_attention(q_splitted, k_splitted, v_splitted, mask)
    scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])  # (batch_size, seq_len_q, num_heads, depth)
    concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))  # (batch_size, seq_len_q, d_model)
    output = self.dense(concat_attention)  # (batch_size, seq_len_q, d_model)

    return output, attention_weights

  
  def scaled_dot_product_attention(self, q, k, v, mask):
    """
    Calculate the attention weights.
        q, k, v must have matching leading dimensions.
        k, v must have matching penultimate dimension, i.e.: seq_len_k = seq_len_v.
        The mask has different shapes depending on its type(padding or look ahead) but it must be
        broadcastable for addition.
    """
    q_k_dot = tf.matmul(q, k, transpose_b=True) # (batch_size, num_heads, seq_len_q, seq_len_k)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_q_k_dot = q_k_dot / np.sqrt(dk)

    if mask is not None:
      scaled_q_k_dot += (mask * -1e9) # Add the mask to the scaled tensor.
    
    attention_weights = tf.nn.softmax(scaled_q_k_dot, axis=-1) # (batch_size, num_heads, seq_len_q, seq_len_k)
    output = tf.matmul(attention_weights, v)

    return output, attention_weights


In [None]:
x = np.random.randn(3, 2, 4)
x



In [None]:
x.reshape(3, -1, 2, 2)

## Test Encoder

In [5]:
import tensorflow as tf 
import numpy as np
from implementation.encoder.EncoderLayer import EncoderLayer

dummy_input = tf.random.uniform((1, 10, 512), dtype=tf.float32)
dummy_mask = np.zeros((1, 1, 10))  # No actual masking for simplicity

# Instantiate the EncoderLayer
encoder_layer = EncoderLayer(d_model=512, num_heads=8, dff=2048)

# Pass the dummy input through the encoder layer
output = encoder_layer(dummy_input, dummy_mask)

print("Output shape:", output.shape)


Output shape: (1, 10, 512)


## Test decoder

In [2]:
from implementation.decoder.DecoderLayer import DecoderLayer
import tensorflow as tf
import numpy as np

dummy_decoder_input = tf.random.uniform((1, 10, 512), dtype=tf.float32)
dummy_encoder_output = tf.random.uniform((1, 10, 512), dtype=tf.float32)
look_ahead_mask = np.ones((1, 1, 10))  # Simplified mask for demonstration
padding_mask = np.zeros((1, 1, 10))  # Simplified mask for demonstration

decoder_layer = DecoderLayer(d_model=512, num_heads=8, d_ff=2048)
decoder_output, block1, block2 = decoder_layer(
    dummy_decoder_input, dummy_encoder_output, look_ahead_mask, padding_mask
)

print("Decoder output shape:", decoder_output.shape)


Decoder output shape: (1, 10, 512)
