In [1]:
import tensorflow as tf

tf.__version__

'2.10.0'

In [2]:
import numpy as np

In [3]:
def positional_encoding(length, depth):
    depth = depth/2

    positions = np.arange(length)[:, np.newaxis]     # (seq, 1)
    depths = np.arange(depth)[np.newaxis, :]/depth   # (1, depth)

    angle_rates = 1 / (10000**depths)         # (1, depth)
    angle_rads = positions * angle_rates      # (pos, depth)

    pos_encoding = np.concatenate(
      [np.sin(angle_rads), np.cos(angle_rads)],
      axis=-1) 

    return tf.cast(pos_encoding, dtype=tf.float32)

In [4]:
positional_encoding(2, 5) # 2 embeddings of dim 5

<tf.Tensor: shape=(2, 6), dtype=float32, numpy=
array([[0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 1.0000000e+00,
        1.0000000e+00, 1.0000000e+00],
       [8.4147096e-01, 2.5116222e-02, 6.3095731e-04, 5.4030228e-01,
        9.9968451e-01, 9.9999982e-01]], dtype=float32)>

In [5]:
class PositionalEmbedding(tf.keras.layers.Layer):
    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.d_model= d_model
        self.vocab_size= vocab_size
        self.embedding= tf.keras.layers.Embedding(vocab_size, d_model, mask_zero= True)
        self.pos_encoding= positional_encoding(length= 2048, depth= d_model)
        
    def compute_mask(self, *args, **kwargs):
        return self.embedding.compute_mask(*args, **kwargs)
    
    def call(self, x):
        length= tf.shape(x)[1]
        x= self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = x + self.pos_encoding[tf.newaxis, :length, :]
        return x

In [6]:
pos= PositionalEmbedding(100, 128)
example_input = np.array([[1, 2, 3, 0, 0], [4, 5, 0, 0, 0]])
pos(example_input)

<tf.Tensor: shape=(2, 5, 128), dtype=float32, numpy=
array([[[-0.37939352, -0.40388265,  0.5624964 , ...,  1.0007817 ,
          0.53589094,  1.2288162 ],
        [ 0.56170964,  0.89567804,  0.88385195, ...,  1.5125477 ,
          0.59909   ,  1.4567634 ],
        [ 0.8892498 ,  1.3177016 ,  1.4150374 , ...,  0.5758087 ,
          1.3145587 ,  0.7270895 ],
        [-0.11647467,  0.14756337,  1.2834562 , ...,  0.8859288 ,
          0.6709136 ,  1.2637984 ],
        [-1.0143971 , -0.68645775,  0.6467226 , ...,  0.88592875,
          0.6709135 ,  1.2637982 ]],

       [[ 0.35521013, -0.4288043 , -0.01201109, ...,  1.0538578 ,
          1.2682743 ,  0.68189716],
        [ 0.46764663,  0.8095582 ,  0.65483797, ...,  1.0071988 ,
          1.0201993 ,  1.1092464 ],
        [ 0.65170276,  0.61730385,  1.5026636 , ...,  0.88592887,
          0.6709136 ,  1.2637985 ],
        [-0.11647467,  0.14756337,  1.2834562 , ...,  0.8859288 ,
          0.6709136 ,  1.2637984 ],
        [-1.0143971 , -0.68

In [8]:
class BaseAttention(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super().__init__()
        self.mha= tf.keras.layers.MultiHeadAttention(**kwargs)
        self.layernorm= tf.keras.layers.LayerNormalization()
        self.add= tf.keras.layers.Add()

In [10]:
class CrossAttention(BaseAttention):
    def call(self, x, context):
        attn_output, attn_scores= self.mha(
            query= x,
            key= context,
            value= context,
            return_attention_scores= True
        )

        self.last_attn_scores= attn_scores

        x= self.add([x, attn_output])
        x= self.layernorm(x)

        return x
    
    # output length will be query length

In [16]:
class GlobalSelfAttention(BaseAttention):
    def call(self, x):
        attn_output= self.mha(
            query= x,
            key= x, 
            value= x
        )

        x= self.add([x, attn_output])
        x= self.layernorm(x)

        return x

In [21]:
x = tf.random.normal(shape=(32, 10, 64))

# Create an instance of GlobalSelfAttention
attention_layer = GlobalSelfAttention(num_heads=8, key_dim=512)

# Call the attention layer with the input tensor
output = attention_layer(x)

In [25]:
x.shape

TensorShape([32, 10, 64])

In [None]:
output

In [27]:
class CausalSelfAttention(BaseAttention):
    def call(self, x):
        attn_output= self.mha(
            query= x,
            key= x,
            value= x,
            use_causal_mask = True
        )

        x= self.add([x, attn_output])
        x= self.layernorm(x)

        return x

In [31]:
sample_ca= CausalSelfAttention(num_heads= 2, key_dim= 512)
out3= sample_ca(x)

In [32]:
out3.shape

TensorShape([32, 10, 64])

In [34]:
class FeedForward(tf.keras.layers.Layer):
    def __init__(self, d_model, dff, dropout_rate= 0.1):
        super().__init__()
        self.seq= tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation= 'relu'),
            tf.keras.layers.Dense(d_model),
            tf.keras.layers.Dropout(dropout_rate)
        ])

        self.add= tf.keras.layers.Add()
        self.layer_norm= tf.keras.layers.LayerNormalization()

    def call(self, x):
        x= self.add([x, self.seq(x)])
        x= self.layer_norm(x)

        return x

In [39]:
sample_ffn= FeedForward(64, 2048)
x = tf.random.normal(shape=(32, 10, 64))
out4= sample_ffn(x)

In [40]:
out4.shape

TensorShape([32, 10, 64])

### The encoder layer

The encoder contains a stack of N encoder layers. Where each EncoderLayer contains a GlobalSelfAttention and FeedForward layer:

In [43]:
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self,*, d_model, num_heads, dff, dropout_rate= 0.1):
        super().__init__()

        # Attention layer
        self.self_attention= GlobalSelfAttention(
            num_heads= num_heads,
            key_dim= d_model,
            dropout= dropout_rate
        )

        self.fnn= FeedForward(d_model, dff)

    def call(self, x):
        x= self.self_attention(x)
        x= self.fnn(x)

        return x

In [45]:
sample_encoder_layer = EncoderLayer(d_model=64, num_heads=8, dff=2048)
x = tf.random.normal(shape=(32, 10, 64))
out5= sample_encoder_layer(x)

In [47]:
out5.shape

TensorShape([32, 10, 64])

In [48]:
class Encoder(tf.keras.layers.Layer):
    def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size, dropout_rate= 0.1):
        super().__init__()
        self.d_model= d_model
        self.num_layers= num_layers

        self.pos_embedding= PositionalEmbedding(
            vocab_size= vocab_size, d_model= d_model
        )

        self.enc_layers= [ 
            EncoderLayer(d_model= d_model, num_heads= num_heads, dff= dff, dropout_rate= dropout_rate)
            for _ in range(num_layers)
        ]

        self.dropout= tf.keras.layers.Dropout(dropout_rate)

    def call(self, x):
        x= self.pos_embedding(x)
        x= self.dropout(x)
        
        for i in range(self.num_layers):
            x= self.enc_layers[i](x)
        
        return x

In [51]:
import tensorflow as tf

# Example input parameters
batch_size = 32
seq_length = 20
vocab_size = 10000  # Example vocabulary size
d_model = 512  # Example dimensionality of the model
num_layers = 6  # Example number of layers in the encoder
num_heads = 8  # Example number of attention heads
dff = 2048  # Example dimensionality of the feedforward layer
dropout_rate = 0.1  # Example dropout rate

# Create a random batch of sequences
input_sequences = tf.random.uniform(shape=(batch_size, seq_length), minval=0, maxval=vocab_size, dtype=tf.int32)

# Create an instance of the Encoder class
encoder = Encoder(num_layers=num_layers, d_model=d_model, num_heads=num_heads, dff=dff, vocab_size=vocab_size, dropout_rate=dropout_rate)

# Call the Encoder with the input sequences
encoder_output = encoder(input_sequences)

# Print the shape of the encoder output
print("Encoder output shape:", encoder_output.shape)


Encoder output shape: (32, 20, 512)


In [None]:
encoder_output

In [53]:
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, *, d_model, num_heads, dff, dropout_rate= 0.1):
        super(DecoderLayer, self).__init__()

        self.causal_self_attention= CausalSelfAttention(num_heads= num_heads, key_dim= d_model, dropout= dropout_rate)

        self.cross_attention= CrossAttention(num_heads= num_heads, key_dim= d_model, dropout= dropout_rate)

        self.fnn= FeedForward(d_model, dff)

    def call(self, x, context):
        x= self.causal_self_attention(x= x)
        x= self.cross_attention(x=x, context= context)

        self.last_attn_scores= self.cross_attention.last_attn_scores

        x= self.fnn(x)

        return x


In [54]:
class Decoder(tf.keras.layers.Layer):
    def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size, dropout_rate= 0.1):
        super(Decoder, self).__init__()

        self.d_model= d_model
        self.num_layers= num_layers

        self.pos_embedding= PositionalEmbedding(vocab_size=vocab_size, d_model=d_model)

        self.dropout= tf.keras.layers.Dropout(dropout_rate)

        self.dec_layers= [
            DecoderLayer(d_model= d_model, num_heads= num_heads, dff= dff, dropout_rate= dropout_rate)
            for _ in range(num_layers)
        ]

        self.last_attn_scores= None

    def call(self, x, context):
        x= self.pos_embedding(x)

        x= self.dropout(x)

        for i in range(self.num_layers):
            x= self.dec_layers[i](x, context)

        self.last_attn_scores= self.dec_layers[-1].last_attn_scores

        return x

In [55]:
class Transformer(tf.keras.Model):
    def __init__(self, *, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, dropout_rate=0.1):
        super().__init__()
        self.encoder = Encoder(num_layers=num_layers, d_model=d_model,
                            num_heads=num_heads, dff=dff,
                            vocab_size=input_vocab_size,
                            dropout_rate=dropout_rate)

        self.decoder = Decoder(num_layers=num_layers, d_model=d_model,
                            num_heads=num_heads, dff=dff,
                            vocab_size=target_vocab_size,
                            dropout_rate=dropout_rate)
        
        self.final_layer= tf.keras.layers.Dense(target_vocab_size)

    def call(self, inputs):
        context, x= inputs
        context= self.encoder(context)
        x= self.decoder(x, context)
        logits= self.final_layer(x)
        try:
            del logits._keras_mask
        except AttributeError:
            pass

        return logits


In [56]:
import tensorflow as tf

# Define example input parameters
batch_size = 32
context_len = 20
target_len = 10
input_vocab_size = 10000  # Example input vocabulary size
target_vocab_size = 8000  # Example target vocabulary size
d_model = 512  # Example dimensionality of the model
num_layers = 6  # Example number of layers in the encoder and decoder
num_heads = 8  # Example number of attention heads
dff = 2048  # Example dimensionality of the feedforward layer
dropout_rate = 0.1  # Example dropout rate

# Create example input tensors
context_input = tf.random.uniform(shape=(batch_size, context_len), minval=0, maxval=input_vocab_size, dtype=tf.int32)
target_input = tf.random.uniform(shape=(batch_size, target_len), minval=0, maxval=target_vocab_size, dtype=tf.int32)

# Create an instance of the Transformer model
transformer_model = Transformer(num_layers=num_layers, d_model=d_model, num_heads=num_heads,
                                dff=dff, input_vocab_size=input_vocab_size,
                                target_vocab_size=target_vocab_size, dropout_rate=dropout_rate)

# Call the Transformer model with the input tensors
logits = transformer_model((context_input, target_input))

# Print the shape of the logits
print("Logits shape:", logits.shape)


Logits shape: (32, 10, 8000)


In [57]:
logits

<tf.Tensor: shape=(32, 10, 8000), dtype=float32, numpy=
array([[[ 8.41520652e-02,  2.92397216e-02,  5.18674016e-01, ...,
          1.52535826e-01,  1.02633379e-01,  3.63305628e-01],
        [ 1.76194888e-02, -1.13686271e-01,  6.17882371e-01, ...,
          1.80696160e-01, -2.19565853e-01, -4.77595342e-04],
        [-6.94560528e-01,  2.60134041e-01,  3.11307967e-01, ...,
          2.75901377e-01,  2.05914587e-01, -1.62046179e-01],
        ...,
        [-3.16444814e-01,  1.37907758e-01,  1.55345887e-01, ...,
          1.69166118e-01,  9.86725790e-04,  4.04311746e-01],
        [-2.98415542e-01, -5.99588314e-03,  3.34141366e-02, ...,
         -1.59954831e-01, -2.80424118e-01,  3.57763052e-01],
        [-4.37622964e-01,  2.85261869e-01,  3.31564933e-01, ...,
          1.12829329e-02, -6.89318776e-02,  3.78182709e-01]],

       [[ 4.77251597e-02,  5.93714528e-02,  3.74049842e-01, ...,
          2.54449308e-01,  1.70889527e-01,  4.49835777e-01],
        [ 2.19382450e-01,  2.37897485e-02,  2.6

In [58]:
transformer_model.summary()

Model: "transformer"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 encoder_3 (Encoder)         multiple                  68139008  
                                                                 
 decoder (Decoder)           multiple                  117529600 
                                                                 
 dense_66 (Dense)            multiple                  4104000   
                                                                 
Total params: 189,772,608
Trainable params: 189,772,608
Non-trainable params: 0
_________________________________________________________________
