In [15]:
import logging
import time

import numpy as np
import matplotlib.pyplot as plt

import tensorflow_datasets as tfds
import tensorflow as tf

# Import tf_text to load the ops used by the tokenizer saved model
import tensorflow_text  # pylint: disable=unused-import

# Listing 4-7.1. Positional Encoding

In [69]:
def get_angles(pos, i, dim):
    """Return the raw (pre-sin/cos) positional-encoding angles.

    Every channel index ``i`` is assigned the rate
    ``1 / 10000 ** (2 * (i // 2) / dim)`` and the result is ``pos`` multiplied
    by that rate, so ``pos`` and ``i`` broadcast under the usual NumPy rules.
    """
    # Integer division makes channels 2k and 2k + 1 share one exponent.
    exponent = (2 * (i // 2)) / np.float32(dim)
    angle_rates = 1 / np.power(10000, exponent)
    return pos * angle_rates

def positional_encoding(position, dim):
    """Build the sinusoidal positional-encoding table.

    Args:
        position: number of positions (rows) to generate.
        dim: number of channels per position.

    Returns:
        A ``tf.float32`` tensor of shape ``(1, position, dim)``; even channels
        hold the sine of the angle and odd channels hold the cosine.
    """
    row_ids = np.arange(position)[:, np.newaxis]
    col_ids = np.arange(dim)[np.newaxis, :]
    table = get_angles(row_ids, col_ids, dim)

    # Overwrite the angles in place: sine on channels 0, 2, 4, ...
    even = slice(0, None, 2)
    table[:, even] = np.sin(table[:, even])

    # ... and cosine on channels 1, 3, 5, ...
    odd = slice(1, None, 2)
    table[:, odd] = np.cos(table[:, odd])

    # Prepend a length-1 leading axis before converting to a tensor.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)


# Build the table for 2048 positions x 512 channels and display it.
n = 2048
d = 512
pos_encoding = positional_encoding(position=n, dim=d)
pos_encoding

<tf.Tensor: shape=(1, 2048, 512), dtype=float32, numpy=
array([[[ 0.0000000e+00,  1.0000000e+00,  0.0000000e+00, ...,
          1.0000000e+00,  0.0000000e+00,  1.0000000e+00],
        [ 8.4147096e-01,  5.4030228e-01,  8.2185620e-01, ...,
          1.0000000e+00,  1.0366329e-04,  1.0000000e+00],
        [ 9.0929741e-01, -4.1614684e-01,  9.3641472e-01, ...,
          1.0000000e+00,  2.0732658e-04,  1.0000000e+00],
        ...,
        [ 1.7589758e-01, -9.8440850e-01, -1.8608274e-01, ...,
          9.7595036e-01,  2.1040717e-01,  9.7761387e-01],
        [-7.3331332e-01, -6.7989087e-01,  7.0149130e-01, ...,
          9.7592694e-01,  2.1050851e-01,  9.7759205e-01],
        [-9.6831930e-01,  2.4971525e-01,  9.8535496e-01, ...,
          9.7590351e-01,  2.1060985e-01,  9.7757018e-01]]], dtype=float32)>

# Listing 4-7.2. Masking Routines for Handling Padding and Hiding Future Words in the Target

In [89]:
def create_padding_mask(seq):
    """Flag the padding tokens (id 0) in a batch of token ids.

    Returns a float32 tensor holding 1.0 wherever ``seq`` equals 0 and 0.0
    elsewhere, with two singleton axes inserted after the first axis, e.g.
    ``(batch_size, seq_len)`` -> ``(batch_size, 1, 1, seq_len)``.
    """
    is_pad = tf.math.equal(seq, 0)
    pad_flags = tf.cast(is_pad, tf.float32)

    # The extra axes let the mask be added to the attention logits.
    return pad_flags[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(size):
    """Return a ``(size, size)`` float mask that is 1.0 strictly above the diagonal.

    Row ``r`` therefore marks every column ``c > r`` -- the positions that lie
    in the future of position ``r`` -- and leaves the rest at 0.0.
    """
    # band_part(..., -1, 0) keeps the lower triangle including the diagonal.
    lower_triangle = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return 1 - lower_triangle

# Three sequences of token ids; the zeros are what create_padding_mask flags.
x = tf.constant([
    [1, 2, 0, 0, 1],
    [1, 6, 7, 0, 0],
    [0, 1, 1, 4, 5],
])
print(create_padding_mask(x))
print(create_look_ahead_mask(3))


tf.Tensor(
[[[[0. 0. 1. 1. 0.]]]


 [[[0. 0. 0. 1. 1.]]]


 [[[1. 0. 0. 0. 0.]]]], shape=(3, 1, 1, 5), dtype=float32)
tf.Tensor(
[[0. 1. 1.]
 [0. 0. 1.]
 [0. 0. 0.]], shape=(3, 3), dtype=float32)


# Listing 4-7.3. Scaled Dot Product for Attention

In [24]:
def attention_scaled_dot(Q, K, V, mask):
    """Scaled dot-product attention.

    Args:
        Q: query tensor.
        K: key tensor; the size of its last axis sets the scaling factor.
        V: value tensor.
        mask: ``None``, or a tensor that broadcasts against the ``Q @ K^T``
            logits. It is multiplied by -1e9 and added to the logits, so
            entries equal to 1 receive (near-)zero weight after the softmax.

    Returns:
        ``(out, attention_wts)`` where ``attention_wts`` is the softmax over
        the last axis of the scaled logits and ``out`` is ``attention_wts @ V``.
    """
    logits = tf.matmul(Q, K, transpose_b=True)
    key_depth = tf.cast(tf.shape(K)[-1], tf.float32)
    logits = logits / tf.math.sqrt(key_depth)

    if mask is not None:
        logits = logits + (mask * -1e9)

    attention_wts = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(attention_wts, V), attention_wts

# Listing 4-7.4. Multihead Attention

In [26]:
from tensorflow.keras.layers import Layer
from tensorflow.keras import layers

class multi_head_attention(Layer):
    """Multi-head attention: project, split into heads, attend, merge, project.

    ``dim`` must be divisible by ``num_heads``; every head then operates on
    ``dim // num_heads`` channels.
    """

    def __init__(self, *, dim, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.dim = dim

        assert self.dim % self.num_heads == 0

        self.head_dim = self.dim // self.num_heads

        # Input projections for queries, keys and values.
        self.Wq = layers.Dense(self.dim)
        self.Wk = layers.Dense(self.dim)
        self.Wv = layers.Dense(self.dim)

        # Output projection applied once the heads are merged back together.
        self.dense = layers.Dense(self.dim)

    def split_heads(self, x, batch_size):
        """Reshape ``(batch_size, seq_len, dim)`` into per-head slices.

        The last axis is split into ``(num_heads, head_dim)`` and the result is
        transposed to ``(batch_size, num_heads, seq_len, head_dim)``.
        """
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.head_dim))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, V, K, Q, mask):
        """Attend from ``Q`` over ``K``/``V``.

        Returns ``(output, attention_weights)``: ``output`` has shape
        ``(batch_size, seq_len_q, dim)`` and ``attention_weights`` has shape
        ``(batch_size, num_heads, seq_len_q, seq_len_k)``.
        """
        batch_size = tf.shape(Q)[0]

        # Project first; each result is (batch_size, seq_len, dim).
        queries = self.Wq(Q)
        keys = self.Wk(K)
        values = self.Wv(V)

        # Then split; each result is (batch_size, num_heads, seq_len, head_dim).
        queries = self.split_heads(queries, batch_size)
        keys = self.split_heads(keys, batch_size)
        values = self.split_heads(values, batch_size)

        per_head_out, attention_weights = attention_scaled_dot(
            queries, keys, values, mask)

        # Back to (batch_size, seq_len_q, num_heads, head_dim) ...
        per_head_out = tf.transpose(per_head_out, perm=[0, 2, 1, 3])
        # ... and merge the heads: (batch_size, seq_len_q, dim).
        merged = tf.reshape(per_head_out, (batch_size, -1, self.dim))

        return self.dense(merged), attention_weights

# Multi-headed self-attention: the same tensor is used as V, K and Q.
mha_layer = multi_head_attention(dim=512, num_heads=8)
x = tf.random.uniform(shape=(1, 30, 512))  # (batch_size, sequence_len, dim)
out, attn = mha_layer(V=x, K=x, Q=x, mask=None)
(out.shape, attn.shape)

(TensorShape([1, 30, 512]), TensorShape([1, 8, 30, 30]))

# Listing 4-7.5. Pointwise Feed-Forward Network

In [48]:
def pointwise_mlp(dim, hidden_dim, activation='relu'):
    """Return a two-layer feed-forward network as a ``tf.keras.Sequential``.

    The first Dense layer has ``hidden_dim`` units with ``activation``; the
    second has ``dim`` units and no activation.
    """
    expand = layers.Dense(hidden_dim, activation=activation)  # (batch_size, seq_len, hidden)
    project = layers.Dense(dim)  # (batch_size, seq_len, dim)
    return tf.keras.Sequential([expand, project])

# Listing 4-7.6. The Encoder Layer Definition

In [65]:
class encoder_layer(Layer):
    """One encoder block: self-attention followed by a pointwise MLP.

    The output of each sub-layer goes through dropout, is added to that
    sub-layer's input, and the sum is layer-normalized.
    """

    def __init__(self, *, dim, num_heads, hidden_dim, dropout=0.1):
        super().__init__()

        self.mha = multi_head_attention(dim=dim, num_heads=num_heads)
        self.mlp = pointwise_mlp(dim, hidden_dim=hidden_dim)

        self.layernorm_1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm_2 = layers.LayerNormalization(epsilon=1e-6)

        self.dropout_1 = layers.Dropout(dropout)
        self.dropout_2 = layers.Dropout(dropout)

    def call(self, x, training, mask):
        """Transform ``x`` of shape ``(batch_size, input_seq_len, dim)``; shape is preserved."""
        # Self-attention sub-layer: x supplies V, K and Q.
        attended, _ = self.mha(x, x, x, mask)
        attended = self.dropout_1(attended, training=training)
        after_attention = self.layernorm_1(x + attended)

        # Feed-forward sub-layer.
        fed_forward = self.mlp(after_attention)
        fed_forward = self.dropout_2(fed_forward, training=training)

        return self.layernorm_2(after_attention + fed_forward)
    
# Smoke-test a single encoder layer on random input, without a mask.
enc_layer = encoder_layer(dim=512, num_heads=8, hidden_dim=2048)

out_e = enc_layer(
    tf.random.uniform((32, 30, 512)),
    False,
    None,
)

print(f"Encoder Shape:{out_e.shape}")  # (batch_size, input_seq_len, dim)

Encoder Shape:(32, 30, 512)


# Listing 4-7.7. The Decoder Layer Illustration

In [75]:
class decoder_layer(Layer):
    """One decoder block: self-attention, cross-attention, pointwise MLP.

    The output of each sub-layer goes through dropout, is added to that
    sub-layer's input, and the sum is layer-normalized.
    """

    def __init__(self, *, dim, num_heads, hidden_dim, dropout=0.1):
        super().__init__()

        # mha_1 attends within the target; mha_2 attends over the encoder output.
        self.mha_1 = multi_head_attention(dim=dim, num_heads=num_heads)
        self.mha_2 = multi_head_attention(dim=dim, num_heads=num_heads)

        self.mlp = pointwise_mlp(dim, hidden_dim=hidden_dim)

        self.layernorm_1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm_2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm_3 = layers.LayerNormalization(epsilon=1e-6)

        self.dropout_1 = layers.Dropout(dropout)
        self.dropout_2 = layers.Dropout(dropout)
        self.dropout_3 = layers.Dropout(dropout)

    def call(self, x, encoder_out, training, look_ahead_mask, padding_mask):
        """Return ``(output, self_attention_weights, cross_attention_weights)``.

        ``output`` has shape ``(batch_size, target_seq_len, dim)``.
        """
        # Self-attention: x supplies V, K and Q.
        self_attn, attn_wts_block_1 = self.mha_1(x, x, x, mask=look_ahead_mask)
        self_attn = self.dropout_1(self_attn, training=training)
        out_1 = self.layernorm_1(self_attn + x)

        # Cross-attention: V and K come from the encoder, Q from the decoder.
        cross_attn, attn_wts_block_2 = self.mha_2(
            encoder_out, encoder_out, out_1, mask=padding_mask)
        cross_attn = self.dropout_2(cross_attn, training=training)
        out_2 = self.layernorm_2(cross_attn + out_1)

        # Feed-forward MLP.
        fed_forward = self.mlp(out_2)
        fed_forward = self.dropout_3(fed_forward, training=training)
        out_3 = self.layernorm_3(fed_forward + out_2)

        return out_3, attn_wts_block_1, attn_wts_block_2
    
# Smoke-test a single decoder layer against the encoder-layer output, without masks.
dec_layer = decoder_layer(dim=512, num_heads=8, hidden_dim=2048)
out_d, _, _ = dec_layer(
    tf.random.uniform((32, 30, 512)),
    out_e,
    False,
    None,
    None,
)
print("Decoder output shape", out_d.shape)  # (batch_size, target_seq_len, dim)

Decoder output shape (32, 30, 512)


# Listing 4-7.8. Define Encoder as Multiple Encoder Layer Stack

In [76]:
class encoder(Layer):
    """Encoder stack: token embedding + positional encoding + N encoder layers.

    ``max_tokens`` is the number of positions precomputed in the
    positional-encoding table.
    """

    def __init__(self, *, num_layers, dim, num_heads, hidden_dim, input_vocab_size,
                 dropout=0.1, max_tokens=2048):
        super().__init__()

        self.dim = dim
        self.num_layers = num_layers
        self.max_tokens = max_tokens
        self.num_heads = num_heads
        self.hidden_dim = hidden_dim

        self.embedding = layers.Embedding(input_vocab_size, dim)
        self.pos_encoding = positional_encoding(self.max_tokens, self.dim)

        self.encoder_layers = [
            encoder_layer(dim=self.dim, num_heads=self.num_heads,
                          hidden_dim=hidden_dim, dropout=dropout)
            for _ in range(self.num_layers)
        ]

        self.dropout = layers.Dropout(dropout)

    def call(self, x, training, mask):
        """Encode token ids ``x`` into ``(batch_size, input_seq_len, dim)``."""
        input_seq_len = tf.shape(x)[1]

        embedded = self.embedding(x)  # (batch_size, input_seq_len, dim)
        # Scale the embeddings by sqrt(dim) before adding the positions.
        embedded = embedded * tf.math.sqrt(tf.cast(self.dim, tf.float32))
        embedded = embedded + self.pos_encoding[:, :input_seq_len, :]
        hidden = self.dropout(embedded, training=training)

        for layer_idx in range(self.num_layers):
            hidden = self.encoder_layers[layer_idx](hidden, training, mask)

        return hidden  # (batch_size, input_seq_len, dim)
    
# Smoke-test a 6-layer encoder on random token ids, without a mask.
enc = encoder(
    num_layers=6,
    dim=512,
    num_heads=8,
    hidden_dim=2048,
    input_vocab_size=1000,
)

X = tf.random.uniform((32, 30), minval=0, maxval=200, dtype=tf.int64)

encoder_out = enc(X, training=False, mask=None)

print(f"Encoder output shape: {encoder_out.shape}")  # (batch_size, input_seq_len, dim)

Encoder output shape: (32, 30, 512)


# Listing 4-7.9. Decoder as a Stack of Decoder Layers

In [80]:
class decoder(Layer):
    """Decoder stack: token embedding + positional encoding + N decoder layers.

    Keyword-only arguments:
        num_layers: number of ``decoder_layer`` blocks in the stack.
        dim: embedding width, also the width of every layer's output.
        num_heads: attention heads in each decoder layer.
        hidden_dim: hidden width of each layer's pointwise MLP.
        target_vocab_size: number of rows in the embedding table.
        dropout: rate used for the embedding dropout and in every layer.
        max_tokens: number of positions precomputed in the positional-encoding
            table.
    """

    def __init__(self,*, num_layers, dim, num_heads, hidden_dim, target_vocab_size, dropout=0.1,max_tokens=2048):
        super(decoder, self).__init__()

        self.dim = dim
        self.num_layers = num_layers
        self.max_tokens = max_tokens
        # Stored for parity with `encoder`, which exposes the same attributes.
        self.num_heads = num_heads
        self.hidden_dim = hidden_dim

        self.embedding = layers.Embedding(target_vocab_size, self.dim)
        self.pos_encoding = positional_encoding(self.max_tokens, self.dim)

        self.decoder_layers = [
            decoder_layer(dim=dim, num_heads=num_heads, hidden_dim=hidden_dim,dropout=dropout)
            for _ in range(num_layers)]

        self.dropout = layers.Dropout(dropout)

    def call(self, x, encoder_out, training, look_ahead_mask, padding_mask):
        """Decode target token ids ``x`` against ``encoder_out``.

        Returns:
            ``(x, attention_wts_dict)`` where ``x`` has shape
            ``(batch_size, target_seq_len, dim)`` and the dict maps
            ``'decoder_layer{i}_block1'`` / ``'decoder_layer{i}_block2'`` to the
            self-attention / cross-attention weights of layer ``i``.
        """
        output_seq_len = tf.shape(x)[1]

        attention_wts_dict = {}

        x = self.embedding(x)  # (batch_size, target_seq_len, dim)
        # Scale the embeddings by sqrt(dim) before adding the positions.
        x *= tf.math.sqrt(tf.cast(self.dim, tf.float32))
        x += self.pos_encoding[:, :output_seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block_1, block_2 = self.decoder_layers[i](x, encoder_out, training,
                                                 look_ahead_mask, padding_mask)

            attention_wts_dict[f'decoder_layer{i}_block1'] = block_1
            attention_wts_dict[f'decoder_layer{i}_block2'] = block_2

        # x.shape == (batch_size, target_seq_len, dim)
        return x, attention_wts_dict
    
# Smoke-test a 6-layer decoder against the encoder output, without masks.
dec = decoder(
    num_layers=6,
    dim=512,
    num_heads=8,
    hidden_dim=2048,
    target_vocab_size=1200,
)
X_target = tf.random.uniform((32, 45), minval=0, maxval=200, dtype=tf.int64)

out_decoder, attn_wts_dict = dec(
    X_target,
    encoder_out=encoder_out,
    training=False,
    look_ahead_mask=None,
    padding_mask=None,
)

print(f"Decoder output shape: {out_decoder.shape}")
print(f"Cross Attention shape :{attn_wts_dict['decoder_layer0_block2'].shape}")

Decoder output shape: (32, 45, 512)
Cross Attention shape :(32, 8, 45, 30)


# Listing 4-7.10. Putting It All Together to Create the Transformer

In [110]:
from tensorflow.keras import Model

class transformer(Model):
    """Encoder-decoder transformer with a final Dense projection onto the target vocabulary.

    Keyword-only arguments:
        num_layers: number of layers in both the encoder and the decoder.
        dim: embedding / model width.
        num_heads: attention heads per layer.
        hidden_dim: hidden width of the pointwise MLPs.
        input_vocab_size: size of the encoder's embedding table.
        target_vocab_size: size of the decoder's embedding table and number of
            output units of the final Dense layer.
        dropout: dropout rate passed to the encoder and the decoder.
        max_tokens_input: positions precomputed for the encoder.
        max_tokens_output: positions precomputed for the decoder.
    """

    def __init__(self,*, num_layers, dim, num_heads, hidden_dim, input_vocab_size,
               target_vocab_size, dropout=0.1,max_tokens_input=20,max_tokens_output=20):
        super(transformer,self).__init__()
        self.encoder = encoder(num_layers=num_layers, dim=dim,
                               num_heads=num_heads, hidden_dim=hidden_dim,
                               input_vocab_size=input_vocab_size, dropout=dropout,max_tokens=max_tokens_input)

        self.decoder = decoder(num_layers=num_layers, dim=dim,
                               num_heads=num_heads, hidden_dim=hidden_dim,
                               target_vocab_size=target_vocab_size, dropout=dropout,max_tokens=max_tokens_output)

        self.final_layer = layers.Dense(target_vocab_size)

    def call(self, inputs, training):
        """Run the full model.

        Args:
            inputs: a pair ``(input_, target_)`` of token-id tensors.
            training: forwarded to the encoder and the decoder.

        Returns:
            ``(final_output, attn_wts_dict)`` where ``final_output`` has shape
            ``(batch_size, target_seq_len, target_vocab_size)`` and
            ``attn_wts_dict`` is the decoder's attention-weight dictionary.
        """
        input_, target_ = inputs

        padding_mask, look_ahead_mask = self.create_masks(input_,target_)

        encoder_output = self.encoder(input_, training, padding_mask)  # (batch_size, inp_seq_len, dim)

        # Decode the `target_` unpacked from `inputs`; a bare `target` would
        # resolve to whatever global of that name exists in the notebook.
        decoder_output, attn_wts_dict = self.decoder(
            target_, encoder_output, training, look_ahead_mask, padding_mask)

        final_output = self.final_layer(decoder_output)  # (batch_size, target_seq_len, target_vocab_size)

        return final_output, attn_wts_dict

    def create_masks(self, input_, target_):
        """Build the masks used by ``call``.

        Returns:
            ``(padding_mask, look_ahead_mask)``. ``padding_mask`` flags the
            zeros in ``input_``. ``look_ahead_mask`` is the element-wise
            maximum of the target's padding mask and the future-position mask,
            so it flags a position if either one does.
        """
        padding_mask = create_padding_mask(input_)

        look_ahead_mask = create_look_ahead_mask(tf.shape(target_)[1])
        decoder_target_padding_mask = create_padding_mask(target_)
        look_ahead_mask = tf.maximum(decoder_target_padding_mask, look_ahead_mask)

        return padding_mask, look_ahead_mask

# Smoke-test the full model on random token ids.
model = transformer(
    num_layers=3, dim=512, num_heads=8, hidden_dim=2048,
    input_vocab_size=1000, target_vocab_size=1200,max_tokens_input=30,max_tokens_output=45)

# NOTE(review): `input` shadows the Python built-in; the name is kept because
# other cells may refer to it.
input = tf.random.uniform((32, 30), dtype=tf.int64, minval=0, maxval=200)
target = tf.random.uniform((32, 45), dtype=tf.int64, minval=0, maxval=200)

model_output, _ = model([input,target], training=False)

print(f"Transformer Output :{model_output.shape}")# (batch_size, target_seq_len, target_vocab_size)
# summary() prints the table itself and returns None, so it is not wrapped in
# print(), which would emit a stray "None" line after the table.
model.summary()

Transformer Output :(32, 45, 1200)
Model: "transformer_16"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 encoder_20 (encoder)        multiple                  9969152   
                                                                 
 decoder_19 (decoder)        multiple                  13226496  
                                                                 
 dense_1256 (Dense)          multiple                  615600    
                                                                 
Total params: 23,811,248
Trainable params: 23,811,248
Non-trainable params: 0
_________________________________________________________________
None
