# Attention is All You Need

Coding a transformer from scratch


In [258]:
import numpy as np
import pandas as pd
import tensorflow as tf

### Data preprocessing pipeline


In [259]:
# Get Shakespeares work from Andrej Karpathy's website

url = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'
filepath = tf.keras.utils.get_file('shakespeare.txt', url)

with open(filepath) as f:
    shakespeare_text = f.read()

In [260]:
# Print the first few characters
print(shakespeare_text[:148])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?



In [261]:
unique_chars = sorted(set(shakespeare_text))
char_to_int = {char: idx for idx, char in enumerate(unique_chars)}

# How many number of distinct characters has the vocabulary:
tokens_len = len(unique_chars)
print(f'Number of tokens in vocabulary: {tokens_len}')

# How many characters has the dataset:
text_length = len(shakespeare_text)
print(f'Total length of text dataset: {text_length}')

Number of tokens in vocabulary: 65
Total length of text dataset: 1115394


### Embedding Layer


In [262]:
embedding_dim = 10

model = tf.keras.Sequential()
model.add(tf.keras.layers.Embedding(tokens_len, embedding_dim))

input_array = np.random.randint(tokens_len, size=(1, 1))
model.compile('rmsprop', 'sparse_categorical_crossentropy')

output_array = model.predict(input_array)
print(output_array.shape)

model.summary()
# (1, 1, 10)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
(1, 1, 10)


### Positional Encoding


In [263]:
class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, max_pos_enc, embedding_dim, **kwargs):
        super(PositionalEncoding, self).__init__(**kwargs)
        self.max_len = max_pos_enc # maximum sequence length that the model can handle
        self.embedding_dim = embedding_dim
        
        # Create the positional encodings
        position = np.arange(max_pos_enc)[:, np.newaxis]
        div_term = np.exp(np.arange(0, embedding_dim, 2) * -(np.log(10000.0) / embedding_dim))
        pe = np.zeros((max_pos_enc, embedding_dim))
        pe[:, 0::2] = np.sin(position * div_term)
        pe[:, 1::2] = np.cos(position * div_term)
        
        # Add batch dimension e.g. (max_pos_enc,embedding_dim) -> (1,max_pos_enc,embedding_dim)
        self.pe = tf.constant(pe[np.newaxis, :, :], dtype=tf.float32)
    
    def call(self, inputs):
        seq_len = tf.shape(inputs)[1]  # Get sequence length from input
        return inputs + self.pe[:, :seq_len, :]

In [264]:
embedding_dim = 10  # Embedding dimension
max_len = 50  # Maximum sequence length

# Define the model
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(tokens_len, embedding_dim),
    PositionalEncoding(max_len, embedding_dim)
])

input_array = np.random.randint(tokens_len, size=(1, 10))  # Example input
output_array = model(input_array)

print(output_array.shape)  # Should be (1, 10, embedding_dim)

(1, 10, 10)


In [265]:
model.summary()

## Multi-Head Attention


### Scaled Dot-Product Attention


In [266]:
class ScaledDotProductAttention(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super(ScaledDotProductAttention, self).__init__(**kwargs)
        
    def call(self, inputs, mask=None):
        q, k, v = inputs
        
        dk = tf.cast(tf.shape(k)[-1], tf.float32)
        
        # dot product attention
        matmul_qk = tf.matmul(q, k, transpose_b=True) # (bs, q_len, k_len)
        
        # scale dot product
        scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
        
        # apply mask when necessary
        if mask is not None:
            # adding very large negative values 
            # so they go to zero after softmax
            scaled_attention_logits += (mask * -1e9) 
        
        # apply softmax to attention weights (scores)
        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
        
        # multiply by V (values)
        out = tf.matmul(attention_weights, v)
        
        return out, attention_weights
        

### Multi Head Attention Head


In [267]:
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, num_heads, embedding_dim, **kwargs):
        super(MultiHeadAttention, self).__init__(**kwargs)
        
        self.num_heads = num_heads
        self.embedding_dim = embedding_dim
        assert embedding_dim % num_heads == 0, "embedding_dim must be divisible by num_heads"
        self.depth = embedding_dim // num_heads # depth per head
        
        # linear projection layers
        self.wq = tf.keras.layers.Dense(embedding_dim)
        self.wk = tf.keras.layers.Dense(embedding_dim)
        self.wv = tf.keras.layers.Dense(embedding_dim)
        
        # output projection
        self.dense = tf.keras.layers.Dense(embedding_dim)
        
        self.attention = ScaledDotProductAttention()
    
    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])
        
    def call(self, inputs, mask=None):
        q, k, v = inputs
        
        batch_size = tf.shape(q)[0]
        
        # linear projections
        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)
        
        # reshaping q, k, v
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)
        
        scaled_attention, attention_weights = self.attention([q, k, v], mask)
        
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.embedding_dim))
        
        out = self.dense(concat_attention)
        
        return out, attention_weights

## Position-wise Feed-Forward Network

$$\text{FFN(x)} = \text{max}(0,~ xW_1 + b_1)W_2 + b_2$$


In [268]:
class PositionwiseFeedForward(tf.keras.layers.Layer):
    def __init__(self, embedding_dim, hidden_dim, **kwargs):
        super(PositionwiseFeedForward, self).__init__(**kwargs)
        
        # hidden_dim (dff) - feed forward network hidden 
        # layer dimension a.k.a inner layer dimensionality
        
        self.dense1 = tf.keras.layers.Dense(hidden_dim, activation="relu")
        self.dense2 = tf.keras.layers.Dense(embedding_dim)
        
    def call(self, inputs):
        
        x = self.dense1(inputs)
        
        return self.dense2(x)

## Encoder Layer


In [269]:
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, embedding_dim, hidden_dim, num_heads, dropout_rate = 0.1, **kwargs):
        super(EncoderLayer, self).__init__(**kwargs)
        
        self.mha = MultiHeadAttention(num_heads, embedding_dim)
        self.ffn = PositionwiseFeedForward(embedding_dim, hidden_dim)
        
        self.layer_norm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layer_norm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)
        
    def call(self, inputs, training=False, mask=None):
        
        # multi-head attention
        attention_output, _ = self.mha([inputs, inputs, inputs], mask)
        attention_output = self.dropout1(attention_output, training=training)
        out1 = self.layer_norm1(inputs + attention_output)
        
        # feed-forward network
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layer_norm2(out1 + ffn_output)
        
        return out2
        

## Encoder

The Encoder stacks multiple encoder layer to create the full encoder. It includes the Embedding layer and Positional Encoding layer as well.


In [270]:
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, embedding_dim, hidden_dim, num_heads, 
                 tokens_len, max_pos_enc, dropout_rate=0.1, **kwargs):
        super(Encoder, self).__init__(**kwargs)
            
        self.embedding_dim = embedding_dim
        self.num_layers = num_layers
        
        self.embedding = tf.keras.layers.Embedding(tokens_len, embedding_dim)
        self.pos_encoding = PositionalEncoding(max_pos_enc, embedding_dim)
        
        self.encoding_layers = [
            EncoderLayer(embedding_dim, hidden_dim, num_heads, dropout_rate=dropout_rate)
            for _ in range(num_layers)
        ]
        
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        
    def call(self, x, training=False, mask=None):
        
        seq_len = tf.shape(x)[1]
        
        x = self.embedding(x)
        x = self.pos_encoding(x)
        
        x = self.dropout(x, training=training)
        
        for i in range(self.num_layers):
            x = self.encoding_layers[i](x, training, mask)
            
        return x

## DecoderLayer

In [271]:
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, embedding_dim, hidden_dim, num_heads, dropout_rate=0.1, **kwargs):
        super(DecoderLayer, self).__init__(**kwargs)
        
        # Self-attention
        self.self_attention = MultiHeadAttention(num_heads, embedding_dim)
        self.norm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        
        # Cross-attention (encoder-decoder attention)
        self.cross_attention = MultiHeadAttention(num_heads, embedding_dim)
        self.norm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)
        
        # Feed-forward
        self.ffn = PositionwiseFeedForward(embedding_dim, hidden_dim)
        self.norm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout3 = tf.keras.layers.Dropout(dropout_rate)
    
    def call(self, inputs, enc_output, training=False, look_ahead_mask=None, padding_mask=None):
        # Self attention with look-ahead mask
        self_attn_output, _ = self.self_attention([inputs, inputs, inputs], look_ahead_mask)
        self_attn_output = self.dropout1(self_attn_output, training=training)
        out1 = self.norm1(inputs + self_attn_output)
        
        # Cross attention with encoder output
        cross_attn_output, _ = self.cross_attention([out1, enc_output, enc_output], padding_mask)
        cross_attn_output = self.dropout2(cross_attn_output, training=training)
        out2 = self.norm2(out1 + cross_attn_output)
        
        # Feed forward
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.norm3(out2 + ffn_output)
        
        return out3

## Decoder

In [272]:
class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, embedding_dim, hidden_dim, num_heads,
                 tokens_len, max_pos_enc, dropout_rate=0.1, **kwargs):
        super(Decoder, self).__init__(**kwargs)
        self.embedding_dim = embedding_dim
        self.num_layers = num_layers
        
        self.embedding = tf.keras.layers.Embedding(tokens_len, embedding_dim)
        self.pos_encoding = PositionalEncoding(max_pos_enc, embedding_dim)
        
        self.decoder_layers = [
            DecoderLayer(embedding_dim, hidden_dim, num_heads, dropout_rate)
            for _ in range(num_layers)
        ]
        
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
    
    def call(self, x, enc_output, training=False, look_ahead_mask=None, padding_mask=None):
        seq_len = tf.shape(x)[1]
        
        # embedding and positional encoding
        x = self.embedding(x)
        x = self.pos_encoding(x)
        
        x = self.dropout(x, training=training)
        
        # decoder layers
        for i in range(self.num_layers):
            x = self.decoder_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)
        
        return x

## Transformer

In [273]:
class Transformer(tf.keras.Model):
    def __init__(self, num_layers, embedding_dim, hidden_dim, num_heads,
                 input_vocab_size, target_vocab_size, max_pos_enc,
                 dropout_rate=0.1, **kwargs):
        super(Transformer, self).__init__(**kwargs)
        
        self.encoder = Encoder(
            num_layers, embedding_dim, hidden_dim, num_heads,
            input_vocab_size, max_pos_enc, dropout_rate
        )
        
        self.decoder = Decoder(
            num_layers, embedding_dim, hidden_dim, num_heads,
            target_vocab_size, max_pos_enc, dropout_rate
        )
        
        self.final_layer = tf.keras.layers.Dense(target_vocab_size)
    
    def create_padding_mask(self, seq):
        """Creates a mask for padding tokens (value 0)"""
        seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
        return seq[:, tf.newaxis, tf.newaxis, :]  # (batch_size, 1, 1, seq_len)
    
    def create_look_ahead_mask(self, seq_len):
        """Creates a mask to prevent attention to future tokens"""
        mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
        return mask  # (seq_len, seq_len)
    
    def call(self, inputs, training=False):
        inp, tar = inputs
        
        # Create masks
        enc_padding_mask = self.create_padding_mask(inp)
        dec_padding_mask = self.create_padding_mask(inp)
        
        # Look ahead mask for decoder
        look_ahead_mask = self.create_look_ahead_mask(tf.shape(tar)[1])
        dec_target_padding_mask = self.create_padding_mask(tar)
        combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)
        
        # Encoder output
        enc_output = self.encoder(inp, training, enc_padding_mask)
        
        # Decoder output
        dec_output = self.decoder(
            tar, enc_output, training, combined_mask, dec_padding_mask
        )
        
        # Final output
        final_output = self.final_layer(dec_output)
        
        return final_output

In [279]:
# Example usage
if __name__ == "__main__":
    # Example parameters
    num_layers = 4
    embedding_dim = 512
    hidden_dim = 2048
    num_heads = 8
    input_vocab_size = 8000
    target_vocab_size = 8000
    max_pos_enc = 10000
    dropout_rate = 0.1
    
    # Create model
    transformer = Transformer(
        num_layers, embedding_dim, hidden_dim, num_heads,
        input_vocab_size, target_vocab_size, max_pos_enc, dropout_rate
    )
    
    # Compile model
    transformer.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )
    
    # Print model summary
    print(transformer.summary())

None


In [280]:
import tensorflow as tf
import numpy as np

# Assuming you've already defined your Transformer class from the previous code
# This example shows how to generate text with an untrained model

def generate_text_untrained(model, tokenizer, start_string, max_length=50, temperature=1.0):
    """
    Generate text using an untrained transformer model.
    
    Args:
        model: The untrained transformer model
        tokenizer: A tokenizer to convert between text and token IDs
        start_string: The initial seed text
        max_length: Maximum length of the generated text
        temperature: Controls randomness (higher = more random)
    
    Returns:
        The generated text
    """
    # Convert start string to tokens
    input_tokens = tokenizer.encode(start_string)
    input_tensor = tf.convert_to_tensor([input_tokens], dtype=tf.int32)
    
    # Create an empty target sequence with just the start token
    target_tensor = tf.convert_to_tensor([[tokenizer.token_to_id('<start>')]], dtype=tf.int32)
    
    generated_text = start_string
    
    # Generate one token at a time
    for i in range(max_length):
        # Get predictions
        predictions = model([input_tensor, target_tensor], training=False)
        
        # Predictions shape: (batch_size, seq_len, vocab_size)
        # We want the prediction for the last token in the sequence
        predictions = predictions[:, -1, :]  # (batch_size, vocab_size)
        
        # Apply temperature
        predictions = predictions / temperature
        
        # Sample from the distribution
        predicted_id = tf.random.categorical(predictions, num_samples=1)[0, 0].numpy()
        
        # Break if we predict the end token
        if predicted_id == tokenizer.token_to_id('<end>'):
            break
            
        # Convert the predicted token ID to text
        predicted_word = tokenizer.id_to_token(predicted_id)
        generated_text += " " + predicted_word
        
        # Update the target tensor
        new_target = tf.concat([target_tensor, [[predicted_id]]], axis=-1)
        target_tensor = new_target
    
    return generated_text


# Example usage:
if __name__ == "__main__":
    # Create a simple tokenizer for demonstration
    # In a real application, you'd use a proper tokenizer
    class SimpleTokenizer:
        def __init__(self, vocab_size=1000):
            self.vocab_size = vocab_size
            # This would normally be a real vocabulary
            self.special_tokens = {
                '<start>': 0,
                '<end>': 1,
                '<pad>': 2,
                '<unk>': 3
            }
            
        def token_to_id(self, token):
            if token in self.special_tokens:
                return self.special_tokens[token]
            # For demonstration, just return a random ID
            return np.random.randint(4, self.vocab_size)
            
        def id_to_token(self, id):
            for token, token_id in self.special_tokens.items():
                if id == token_id:
                    return token
            # For demonstration, just return a placeholder
            return f"word_{id}"
            
        def encode(self, text):
            # For demonstration, return some random IDs
            return [self.special_tokens['<start>']] + [self.token_to_id(t) for t in text.split()] + [self.special_tokens['<end>']]
    
    
    # Parameters
    num_layers = 2
    embedding_dim = 64
    hidden_dim = 128
    num_heads = 4
    input_vocab_size = 1000
    target_vocab_size = 1000
    max_pos_enc = 100
    
    # Create model (untrained)
    transformer = Transformer(
        num_layers, embedding_dim, hidden_dim, num_heads,
        input_vocab_size, target_vocab_size, max_pos_enc
    )
    
    # Create simple tokenizer
    tokenizer = SimpleTokenizer(vocab_size=input_vocab_size)
    
    # Generate text with the untrained model
    seed_text = "hello world"
    generated_text = generate_text_untrained(
        transformer, tokenizer, seed_text, max_length=20, temperature=1.2
    )
    
    print("Generated text from untrained model:")
    print(generated_text)
    print("\nNote: This text is random and doesn't make sense because the model is untrained.")

1. The `call()` method of your layer may be crashing. Try to `__call__()` the layer eagerly on some test input first to see if it works. E.g. `x = np.random.random((3, 4)); y = layer(x)`
2. If the `call()` method is correct, then you may need to implement the `def build(self, input_shape)` method on your layer. It should create all variables used by the layer (e.g. by calling `layer.build()` on all its children layers).
Exception encountered: ''Only input tensors may be passed as positional arguments. The following argument value should be passed as a keyword argument: False (of type <class 'bool'>)''


ValueError: Exception encountered when calling Transformer.call().

[1mOnly input tensors may be passed as positional arguments. The following argument value should be passed as a keyword argument: False (of type <class 'bool'>)[0m

Arguments received by Transformer.call():
  • inputs=['tf.Tensor(shape=(1, 4), dtype=int32)', 'tf.Tensor(shape=(1, 1), dtype=int32)']
  • training=False