In [1]:
import numpy as np

In [2]:
def positional_encoding(seq_len, d_model):
    '''
        Generate sinusoidal positional encodings for input sequences.

        Even feature columns hold sin(pos * w_i) and odd feature columns hold
        cos(pos * w_i), where w_i = 10000 ** (-2i / d_model).

        Args:
            seq_len: number of positions (rows) to encode.
            d_model: number of features (columns) per position; may be odd.

        Returns:
            Array of shape (seq_len, d_model).
    '''
    pe = np.zeros((seq_len, d_model)) # Positional encoding matrix
    position = np.arange(0, seq_len)[:, np.newaxis] # Column vector (seq_len, 1)
    div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
    angles = position * div_term # One column per frequency: (seq_len, ceil(d_model / 2))
    pe[:, 0::2] = np.sin(angles)
    # An odd d_model has one fewer odd column than even columns, so drop the
    # surplus frequency rather than failing on a shape mismatch.
    pe[:, 1::2] = np.cos(angles[:, :d_model // 2])
    return pe

$$

    \mathrm{Attention}(Q, K, V) = \mathrm{softmax}\left(\frac{QK^{T}}{\sqrt{d_k}}\right)V

$$

In [3]:
def scaled_dot_product_attention(query, key, value, mask=None):
    '''
        Compute the scaled dot-product attention

        Evaluates softmax(Q K^T / sqrt(d_k)) V over the last two axes, so any
        number of leading (batch / head) axes is accepted.

        Args:
            query: array of shape (..., len_q, d_k).
            key: array of shape (..., len_k, d_k).
            value: array of shape (..., len_k, d_v).
            mask: optional array broadcastable to (..., len_q, len_k). The mask
                is additive: every entry is multiplied by -1e9 and added to
                the scores, so non-zero entries mark positions to suppress
                and zeros leave the score untouched.

        Returns:
            Tuple (attention_output, attention_weights) with shapes
            (..., len_q, d_v) and (..., len_q, len_k).
    '''
    d_k = query.shape[-1]
    # Swap only the trailing two axes so the leading axes can have any rank.
    scores = np.matmul(query, np.swapaxes(key, -1, -2)) / np.sqrt(d_k)
    if mask is not None:
        scores = scores + (mask * -1e9)
    # Subtract the row maximum before exponentiating for numerical stability.
    exp_scores = np.exp(scores - np.max(scores, axis=-1, keepdims=True))
    attention_weights = exp_scores / np.sum(exp_scores, axis=-1, keepdims=True)
    attention_output = np.matmul(attention_weights, value)
    return attention_output, attention_weights

In [4]:
class MultiHeadAttention:
    '''
        Multi-head self-attention with randomly initialised projections.

        Queries, keys and values are all projected from the same input.
    '''
    def __init__(self, d_model, num_heads):
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads # Width of each head
        # Projections are drawn in the order query, key, value, output.
        square = (d_model, d_model)
        self.W_q = np.random.randn(*square)
        self.W_k = np.random.randn(*square)
        self.W_v = np.random.randn(*square)
        self.W_o = np.random.randn(*square)

    def _split_heads(self, projected, batch_size, seq_len):
        '''
            Reshape (batch, seq, d_model) into (batch, heads, seq, d_k)
        '''
        per_head = projected.reshape(batch_size, seq_len, self.num_heads, self.d_k)
        return per_head.transpose(0, 2, 1, 3)

    def forward(self, x, mask=None):
        '''
            Apply self-attention to x of shape (batch, seq, d_model).

            Returns the projected output (batch, seq, d_model) and the
            attention weights produced by scaled_dot_product_attention.
        '''
        batch_size, seq_len, d_model = x.shape
        Q, K, V = (
            self._split_heads(x @ weight, batch_size, seq_len)
            for weight in (self.W_q, self.W_k, self.W_v)
        )
        per_head_output, weights = scaled_dot_product_attention(Q, K, V, mask)
        # Bring the head axis back next to d_k and fold both into d_model.
        merged = per_head_output.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, d_model)
        return merged @ self.W_o, weights

In [5]:
class FeedForward:
    '''
        Position-wise feed-forward network: linear -> ReLU -> linear.
    '''
    def __init__(self, d_model, d_ff):
        # Both weight matrices are drawn first (expansion, then contraction);
        # the biases start at zero.
        self.W1 = np.random.randn(d_model, d_ff)
        self.W2 = np.random.randn(d_ff, d_model)
        self.b1, self.b2 = np.zeros((1, d_ff)), np.zeros((1, d_model))

    def forward(self, x):
        '''
            Map x from d_model to d_ff, apply ReLU, and map back to d_model.
        '''
        hidden = x @ self.W1 + self.b1
        activated = np.maximum(0, hidden) # ReLU
        return activated @ self.W2 + self.b2

In [6]:
class LayerNorm:
    '''
        Layer normalisation over the last axis with a scale and a shift.

        Args:
            d_model: size of the last axis of the inputs to normalise.
            eps: small constant added to the variance to avoid dividing by zero.
    '''
    def __init__(self, d_model, eps=1e-6):
        # Shaped (1, 1, d_model) so they broadcast over 3-D inputs.
        self.gamma = np.ones((1, 1, d_model))
        self.beta = np.zeros((1, 1, d_model))
        self.eps = eps

    def forward(self, x):
        '''
            Normalise x to zero mean and unit variance along its last axis,
            then apply gamma (scale) and beta (shift).
        '''
        mean = np.mean(x, axis=-1, keepdims=True)
        var = np.var(x, axis=-1, keepdims=True)
        # eps must be ADDED to the variance. Multiplying by it shrinks the
        # denominator by sqrt(eps), blowing the activations up, and yields
        # 0 / 0 = NaN whenever a row is constant.
        x_norm = (x - mean) / np.sqrt(var + self.eps)
        return self.gamma * x_norm + self.beta

In [7]:
class EncoderLayer:
    '''
        One encoder block: self-attention and a feed-forward network, each
        followed by a residual connection and layer normalisation.
    '''
    def __init__(self, d_model, num_heads, d_ff):
        # Attention is built before the feed-forward network.
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1, self.norm2 = LayerNorm(d_model), LayerNorm(d_model)

    def forward(self, x, mask=None):
        '''
            Run x through the block; mask is handed to the attention step.

            Returns the block output and the attention weights.
        '''
        attended, attention_weights = self.mha.forward(x, mask)
        after_attention = self.norm1.forward(x + attended)
        transformed = self.ffn.forward(after_attention)
        after_ffn = self.norm2.forward(after_attention + transformed)
        return after_ffn, attention_weights

In [8]:
class DecoderLayer:
    '''
        One decoder block: masked self-attention, cross-attention over the
        encoder output, and a feed-forward network. Each sub-layer is followed
        by a residual connection and layer normalisation.
    '''
    def __init__(self, d_model, num_heads, d_ff):
        self.mha1 = MultiHeadAttention(d_model, num_heads) # Self-attention
        self.mha2 = MultiHeadAttention(d_model, num_heads) # Cross-attention weights
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1 = LayerNorm(d_model)
        self.norm2 = LayerNorm(d_model)
        self.norm3 = LayerNorm(d_model)

    def _cross_attention(self, x, enc_output, mask=None):
        '''
            Attend from the decoder state x to the encoder output.

            Queries are projected from x; keys and values are projected from
            enc_output, using the projection matrices held by self.mha2.
            MultiHeadAttention.forward projects all three from one input, so
            the projections are applied here instead.

            Args:
                x: decoder state, shape (batch, tgt_len, d_model).
                enc_output: encoder output, shape (batch, src_len, d_model).
                mask: optional additive mask forwarded to
                    scaled_dot_product_attention; it has to broadcast against
                    scores of shape (batch, heads, tgt_len, src_len).

            Returns:
                Tuple (output, weights) with shapes (batch, tgt_len, d_model)
                and (batch, heads, tgt_len, src_len).
        '''
        mha = self.mha2
        batch_size, tgt_len, d_model = x.shape
        src_len = enc_output.shape[1]

        def split_heads(projected, length):
            # (batch, length, d_model) -> (batch, heads, length, d_k)
            return projected.reshape(batch_size, length, mha.num_heads, mha.d_k).transpose(0, 2, 1, 3)

        Q = split_heads(np.matmul(x, mha.W_q), tgt_len)
        K = split_heads(np.matmul(enc_output, mha.W_k), src_len)
        V = split_heads(np.matmul(enc_output, mha.W_v), src_len)
        output, weights = scaled_dot_product_attention(Q, K, V, mask)
        output = output.transpose(0, 2, 1, 3).reshape(batch_size, tgt_len, d_model)
        return np.matmul(output, mha.W_o), weights

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        '''
            Run the decoder block.

            Args:
                x: decoder input, shape (batch, tgt_len, d_model).
                enc_output: encoder output, shape (batch, src_len, d_model).
                src_mask: optional mask for the cross-attention step.
                tgt_mask: optional mask for the self-attention step.

            Returns:
                Tuple (x, self_attention_weights, cross_attention_weights).
        '''
        attention1_output, attention1_weights = self.mha1.forward(x, tgt_mask)
        x = self.norm1.forward(x + attention1_output)
        # Keys and values must come from the encoder; attending over x again
        # would leave the decoder with no view of the source sequence.
        attention2_output, attention2_weights = self._cross_attention(x, enc_output, src_mask)
        x = self.norm2.forward(x + attention2_output)
        ffn_output = self.ffn.forward(x)
        x = self.norm3.forward(x + ffn_output)
        return x, attention1_weights, attention2_weights

In [9]:
class Transformer:
    '''
        Encoder-decoder Transformer (forward pass only) with random weights.

        Args:
            d_model: width of the token embeddings and of every layer.
            num_heads: attention heads per layer.
            d_ff: hidden width of the feed-forward networks.
            num_layers: number of encoder layers and of decoder layers.
            vocab_size: number of rows in the embedding table and number of
                output logits per position.
            max_seq_len: number of positions in the positional-encoding table.
    '''
    def __init__(self, d_model, num_heads, d_ff, num_layers, vocab_size, max_seq_len):
        self.d_model = d_model
        self.encoder_layers = [EncoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)]
        self.decoder_layers = [DecoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)]
        self.embedding = np.random.randn(vocab_size, d_model) / np.sqrt(d_model)
        self.pos_encoding = positional_encoding(max_seq_len, d_model)
        self.final_layer = np.random.randn(d_model, vocab_size)

    def _embed(self, tokens):
        '''
            Look up token embeddings and add the positional encodings.

            Raises:
                ValueError: if the sequence is longer than the positional
                    table. Slicing would otherwise silently truncate the
                    table, and a one-row table would broadcast position 0
                    onto every token.
        '''
        seq_len = tokens.shape[1]
        max_seq_len = self.pos_encoding.shape[0]
        if seq_len > max_seq_len:
            raise ValueError(
                "sequence length %d exceeds max_seq_len %d" % (seq_len, max_seq_len)
            )
        return self.embedding[tokens] + self.pos_encoding[:seq_len, :]

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        '''
            Run the full encoder-decoder forward pass.

            Args:
                src: integer token IDs, shape (batch, src_len).
                tgt: integer token IDs, shape (batch, tgt_len).
                src_mask: optional mask passed to every encoder layer and to
                    every decoder layer.
                tgt_mask: optional mask passed to every decoder layer.

            Returns:
                Tuple (output, enc_attn_weights, dec_attn_weights1,
                dec_attn_weights2): the output of the final linear layer,
                followed by three lists holding one attention-weight entry
                per layer.

            Raises:
                ValueError: if src or tgt is longer than max_seq_len.
        '''
        # Embed and add positional encoding
        src_emb = self._embed(src)
        tgt_emb = self._embed(tgt)

        # Encoder
        enc_output = src_emb
        enc_attn_weights = []
        for layer in self.encoder_layers:
            enc_output, attn_weights = layer.forward(enc_output, src_mask)
            enc_attn_weights.append(attn_weights)

        # Decoder
        dec_output = tgt_emb
        dec_attn_weights1, dec_attn_weights2 = [], []
        for layer in self.decoder_layers:
            dec_output, attn1_weights, attn2_weights = layer.forward(dec_output, enc_output, src_mask, tgt_mask)
            dec_attn_weights1.append(attn1_weights)
            dec_attn_weights2.append(attn2_weights)

        # Final linear layer
        output = np.matmul(dec_output, self.final_layer)
        return output, enc_attn_weights, dec_attn_weights1, dec_attn_weights2

In [10]:
if __name__ == "__main__":
    # Seed the global NumPy RNG so the random weights and token IDs below are
    # identical on every run
    np.random.seed(42)

    # Hyperparameters
    d_model = 64
    num_heads = 4
    d_ff = 256
    num_layers = 2
    vocab_size = 1000
    max_seq_len = 50
    batch_size = 2
    seq_len = 10

    # Dummy input (batch_size, seq_len) with integer token IDs
    src = np.random.randint(0, vocab_size, (batch_size, seq_len))
    tgt = np.random.randint(0, vocab_size, (batch_size, seq_len))

    # Create causal mask for decoder. scaled_dot_product_attention adds
    # mask * -1e9 to the scores, so a 1 must mark a BLOCKED position: the
    # strict upper triangle (future tokens). A lower-triangular mask would
    # hide the past and expose the future instead.
    tgt_mask = np.triu(np.ones((batch_size, 1, seq_len, seq_len)), k=1)

    # Initialize and run Transformer
    transformer = Transformer(d_model, num_heads, d_ff, num_layers, vocab_size, max_seq_len)
    output, enc_attn, dec_attn1, dec_attn2 = transformer.forward(src, tgt, tgt_mask=tgt_mask)

    print("Output shape:", output.shape)  # (batch_size, seq_len, vocab_size)

Output shape: (2, 10, 1000)
