<a href="https://colab.research.google.com/github/2003Yash/complete-decoder/blob/main/Complete_Decoder_Code_for_Transformer_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
import math

def scaled_dot_product(q, k, v, mask=None):
    """Compute scaled dot-product attention.

    Scores are ``q @ k^T / sqrt(d_k)`` where ``d_k`` is the size of the last
    axis of ``q``; a softmax over the last axis turns them into attention
    weights, which are then used to mix ``v``.

    Args:
        q: Query tensor, e.g. [batch, heads, q_len, head_dim].
        k: Key tensor whose last axis matches ``q``,
            e.g. [batch, heads, k_len, head_dim].
        v: Value tensor, e.g. [batch, heads, k_len, head_dim].
        mask: Optional additive mask that broadcasts against the score
            tensor (for instance [q_len, k_len]). Use 0 for positions that
            may be attended to and a large negative value / -inf otherwise.

    Returns:
        A ``(values, attention)`` tuple: the attention-weighted values and
        the attention weights themselves.
    """
    # transpose_b swaps only the last two axes of k, so this works for any
    # rank >= 2 instead of being tied to rank-4 inputs.
    scores = tf.matmul(q, k, transpose_b=True)
    # tf.shape yields a tensor; math.sqrt on it only works in eager mode and
    # fails inside tf.function graphs, so do the square root with TF ops in
    # the dtype of the scores.
    d_k = tf.cast(tf.shape(q)[-1], scores.dtype)
    scaled = scores / tf.sqrt(d_k)
    print(f"scaled.shape : {scaled.shape}")
    if mask is not None:
        print(f"-- ADDING MASK of shape {mask.shape} --")
        # Broadcasting applies the same mask to every leading
        # (batch, head) slice of the score tensor.
        scaled = scaled + mask
    attention = tf.nn.softmax(scaled, axis=-1)
    values = tf.matmul(attention, v)
    return values, attention

class PositionwiseFeedForward(tf.keras.layers.Layer):
    """Feed-forward sub-block: Dense -> ReLU -> Dropout -> Dense.

    The first Dense layer has ``hidden`` units and the second has
    ``d_model`` units. Each stage prints the shape of its output.
    """

    def __init__(self, d_model, hidden, drop_prob=0.1):
        """Create the sub-layers.

        Args:
            d_model: Number of units of the second (output) Dense layer.
            hidden: Number of units of the first Dense layer.
            drop_prob: Dropout rate applied after the ReLU.
        """
        super().__init__()
        # Hyper-parameters are kept on the instance for later inspection.
        self.d_model, self.hidden, self.drop_prob = d_model, hidden, drop_prob
        # Sub-layers, created in the order linear1, linear2, relu, dropout.
        self.linear1 = tf.keras.layers.Dense(hidden)
        self.linear2 = tf.keras.layers.Dense(d_model)
        self.relu = tf.keras.layers.ReLU()
        self.dropout = tf.keras.layers.Dropout(rate=drop_prob)

    def call(self, x, training=False):
        """Run ``x`` through the feed-forward stack and return the result.

        Args:
            x: Input tensor; the original notes give its layout as
                [batch_size, seq_len, d_model].
            training: Forwarded to the dropout layer.
        """
        # Expand to the hidden width.
        x = self.linear1(x)
        print(f"x after first linear layer: {x.shape}")

        # Non-linearity (shape unchanged).
        x = self.relu(x)
        print(f"x after relu layer: {x.shape}")

        # Dropout honours the training flag.
        x = self.dropout(x, training=training)
        print(f"x after dropout layer: {x.shape}")

        # Project back down with the second Dense layer.
        x = self.linear2(x)
        print(f"x after 2nd linear layer: {x.shape}")
        return x

class LayerNormalization(tf.keras.layers.Layer):
    """Hand-written layer normalization over the last axis.

    Each vector along the last axis is shifted to zero mean and scaled to
    unit variance, then multiplied by the learnable ``gamma`` and shifted by
    the learnable ``beta``.
    """

    def __init__(self, parameters_shape, eps=1e-5):
        """Create the learnable scale and offset.

        Args:
            parameters_shape: Shape of ``gamma`` and ``beta``.
            eps: Constant added to the variance before the square root.
        """
        super().__init__()
        self.parameters_shape = parameters_shape
        self.eps = eps
        # Scale starts at one and offset at zero.
        self.gamma = self.add_weight(
            name='gamma',
            shape=parameters_shape,
            initializer='ones',
            trainable=True,
        )
        self.beta = self.add_weight(
            name='beta',
            shape=parameters_shape,
            initializer='zeros',
            trainable=True,
        )

    def call(self, inputs, training=False):
        """Normalize ``inputs`` along the last axis.

        Args:
            inputs: Tensor to normalize; the original notes give its layout
                as [batch_size, seq_len, d_model].
            training: Accepted for API compatibility; not used.
        """
        # Statistics are taken per vector along the last axis; keepdims lets
        # them broadcast back against the input.
        mean = tf.reduce_mean(inputs, axis=-1, keepdims=True)
        centered = inputs - mean
        variance = tf.reduce_mean(centered ** 2, axis=-1, keepdims=True)
        normalized = centered / tf.sqrt(variance + self.eps)
        return self.gamma * normalized + self.beta

class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head self-attention.

    A single Dense layer produces q, k and v for every head; attention is
    computed per head by the module-level ``scaled_dot_product`` and the
    heads are concatenated and passed through an output Dense layer.
    """

    def __init__(self, d_model, num_heads):
        """Create the projection layers.

        Args:
            d_model: Width of the output and of the concatenated heads.
            num_heads: Number of attention heads.

        Raises:
            ValueError: If ``d_model`` is not divisible by ``num_heads``.
        """
        super().__init__()
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = tf.keras.layers.Dense(3 * d_model)
        self.linear_layer = tf.keras.layers.Dense(d_model)

    def call(self, x, mask=None, training=False):
        """Apply self-attention to ``x``.

        Args:
            x: Input tensor of shape [batch, seq_len, features].
            mask: Optional additive mask broadcastable to
                [batch, heads, seq_len, seq_len].
            training: Accepted for API compatibility; not used.

        Returns:
            Tensor of shape [batch, seq_len, d_model].
        """
        # Index the dynamic shape instead of tuple-unpacking it: unpacking a
        # tensor only works eagerly and fails inside tf.function graphs.
        input_shape = tf.shape(x)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        print(f"x.shape: {x.shape}")
        qkv = self.qkv_layer(x)  # [batch, seq_len, 3 * d_model]
        print(f"qkv.shape: {qkv.shape}")
        qkv = tf.reshape(qkv, (batch_size, sequence_length, self.num_heads, 3 * self.head_dim))
        print(f"qkv after reshape .shape: {qkv.shape}")
        qkv = tf.transpose(qkv, perm=[0, 2, 1, 3])  # [batch, heads, seq_len, 3 * head_dim]
        print(f"qkv after permutation: {qkv.shape}")
        q, k, v = tf.split(qkv, num_or_size_splits=3, axis=-1)
        print(f"q: {q.shape}, k:{k.shape}, v:{v.shape}")
        values, attention = scaled_dot_product(q, k, v, mask)  # values: [batch, heads, seq_len, head_dim]
        print(f"values: {values.shape}, attention:{attention.shape}")
        # Move the head axis back next to head_dim BEFORE merging them.
        # Reshaping [batch, heads, seq_len, head_dim] directly would mix
        # different positions and heads into the same output vector.
        values = tf.transpose(values, perm=[0, 2, 1, 3])  # [batch, seq_len, heads, head_dim]
        values = tf.reshape(values, (batch_size, sequence_length, self.num_heads * self.head_dim))
        print(f"values after reshaping: {values.shape}")
        out = self.linear_layer(values)
        print(f"out after passing through linear layer: {out.shape}")
        return out

class MultiHeadCrossAttention(tf.keras.layers.Layer):
    """Multi-head cross-attention.

    Keys and values are projected from ``x`` and queries from ``y``, so each
    position of ``y`` attends over the positions of ``x``. ``DecoderLayer``
    passes the encoder output as ``x`` and the decoder stream as ``y``.
    """

    def __init__(self, d_model, num_heads):
        """Create the projection layers.

        Args:
            d_model: Width of the output and of the concatenated heads.
            num_heads: Number of attention heads.

        Raises:
            ValueError: If ``d_model`` is not divisible by ``num_heads``.
        """
        super().__init__()
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.kv_layer = tf.keras.layers.Dense(2 * d_model)
        self.q_layer = tf.keras.layers.Dense(d_model)
        self.linear_layer = tf.keras.layers.Dense(d_model)

    def call(self, x, y, mask=None, training=False):
        """Attend from ``y`` (queries) over ``x`` (keys and values).

        Args:
            x: Source of keys/values, shape [batch, kv_len, features].
            y: Source of queries, shape [batch, q_len, features].
            mask: Optional additive mask broadcastable to
                [batch, heads, q_len, kv_len].
            training: Accepted for API compatibility; not used.

        Returns:
            Tensor of shape [batch, q_len, d_model].
        """
        # Index the dynamic shapes instead of tuple-unpacking them so the
        # layer also works inside tf.function graphs. x and y keep separate
        # lengths: they need not be equally long.
        x_shape = tf.shape(x)
        batch_size, kv_length = x_shape[0], x_shape[1]
        q_length = tf.shape(y)[1]

        print(f"x.shape: {x.shape}")
        kv = self.kv_layer(x)  # [batch, kv_len, 2 * d_model]
        print(f"kv.shape: {kv.shape}")
        q = self.q_layer(y)  # [batch, q_len, d_model]
        print(f"q.shape: {q.shape}")

        # Split the feature axis into heads.
        kv = tf.reshape(kv, [batch_size, kv_length, self.num_heads, 2 * self.head_dim])
        q = tf.reshape(q, [batch_size, q_length, self.num_heads, self.head_dim])

        # Bring the head axis forward: [batch, heads, len, ...].
        kv = tf.transpose(kv, perm=[0, 2, 1, 3])
        q = tf.transpose(q, perm=[0, 2, 1, 3])

        k, v = tf.split(kv, num_or_size_splits=2, axis=-1)  # each [batch, heads, kv_len, head_dim]

        values, attention = self.scaled_dot_product(q, k, v, mask)  # values: [batch, heads, q_len, head_dim]
        print(f"values: {values.shape}, attention: {attention.shape}")

        # Undo the head transpose BEFORE merging heads; reshaping
        # [batch, heads, q_len, head_dim] directly would mix different
        # positions and heads into the same output vector.
        values = tf.transpose(values, perm=[0, 2, 1, 3])  # [batch, q_len, heads, head_dim]
        values = tf.reshape(values, [batch_size, q_length, self.num_heads * self.head_dim])
        out = self.linear_layer(values)
        print(f"out after passing through linear layer: {out.shape}")
        return out

    def scaled_dot_product(self, q, k, v, mask=None):
        """Compute scaled dot-product attention for rank-4 q, k and v.

        Args:
            q: Queries, [batch, heads, q_len, head_dim].
            k: Keys, [batch, heads, kv_len, head_dim].
            v: Values, [batch, heads, kv_len, head_dim].
            mask: Optional additive mask broadcastable to the score tensor.

        Returns:
            A ``(values, attention)`` tuple.
        """
        scaled = tf.matmul(q, tf.transpose(k, perm=[0, 1, 3, 2]))
        # Scale in the dtype of the scores rather than a hard-coded float32,
        # so non-float32 inputs do not cause a dtype mismatch.
        d_k = tf.cast(tf.shape(q)[-1], scaled.dtype)
        scaled = scaled / tf.sqrt(d_k)
        if mask is not None:
            scaled = scaled + mask
        attention = tf.nn.softmax(scaled, axis=-1)
        values = tf.matmul(attention, v)
        return values, attention

class DecoderLayer(tf.keras.layers.Layer):
    """One decoder layer built from three residual sub-blocks.

    The sub-blocks are masked self-attention, cross-attention and a
    position-wise feed-forward network. Each is followed by dropout, a
    residual add and layer normalization.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        """Create the sub-layers.

        Args:
            d_model: Model width passed to the attention and feed-forward
                sub-layers.
            ffn_hidden: Hidden width of the feed-forward sub-layer.
            num_heads: Number of attention heads.
            drop_prob: Dropout rate used by every dropout in this layer.
        """
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate=drop_prob)
        self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
        self.norm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout2 = tf.keras.layers.Dropout(rate=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout3 = tf.keras.layers.Dropout(rate=drop_prob)

    def call(self, x, y, decoder_mask, training=False):
        """Run one decoder layer.

        Args:
            x: Tensor used as the key/value source in cross-attention
                (the encoder output).
            y: Decoder stream that is transformed and returned.
            decoder_mask: Additive mask for the self-attention scores.
            training: Forwarded explicitly to every sub-layer so dropout
                (including the one inside the feed-forward block) follows
                the caller's flag instead of relying on implicit Keras
                call-context propagation.

        Returns:
            Tensor with the same layout as ``y``.
        """
        # --- Sub-block 1: masked self-attention -------------------------
        residual = y
        print("MASKED SELF ATTENTION")
        y = self.self_attention(y, mask=decoder_mask, training=training)
        print("DROP OUT 1")
        y = self.dropout1(y, training=training)
        print("ADD + LAYER NORMALIZATION 1")
        y = self.norm1(y + residual)

        # --- Sub-block 2: cross-attention (queries from y, keys/values from x)
        residual = y
        print("CROSS ATTENTION")
        y = self.encoder_decoder_attention(x, y, mask=None, training=training)
        print("DROP OUT 2")
        y = self.dropout2(y, training=training)
        print("ADD + LAYER NORMALIZATION 2")
        y = self.norm2(y + residual)

        # --- Sub-block 3: position-wise feed-forward --------------------
        residual = y
        print("FEED FORWARD 1")
        y = self.ffn(y, training=training)
        print("DROP OUT 3")
        y = self.dropout3(y, training=training)
        print("ADD + LAYER NORMALIZATION 3")
        y = self.norm3(y + residual)
        return y

class SequentialDecoder(tf.keras.layers.Layer):
    """Container that applies a sequence of decoder layers in order."""

    def __init__(self, *layers):
        """Store the given layers; they are applied in the order passed."""
        super().__init__()
        self.layers = layers

    def call(self, x, y, mask, training=False):
        """Feed ``y`` through every stored layer and return the final result.

        ``x`` (the value coming from the encoder) and ``mask`` are handed
        unchanged to each layer; only ``y`` is updated from one layer to the
        next.
        """
        decoded = y
        for decoder_layer in self.layers:
            decoded = decoder_layer(x, decoded, mask, training=training)
        return decoded

class Decoder(tf.keras.layers.Layer):
    """Stack of ``num_layers`` identically configured decoder layers."""

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers=1):
        """Build the layer stack.

        Every ``DecoderLayer`` is constructed with the same hyper-parameters;
        ``num_layers`` (default 1) controls how many are created.
        """
        super().__init__()
        decoder_stack = [
            DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
            for _ in range(num_layers)
        ]
        self.layers = SequentialDecoder(*decoder_stack)

    def call(self, x, y, mask, training=False):
        """Run ``y`` through the whole stack.

        In the accompanying demo ``x`` and ``y`` are 30 x 200 x 512 tensors
        and ``mask`` is 200 x 200.
        """
        return self.layers(x, y, mask, training=training)


MASKED SELF ATTENTION
MASKED SELF ATTENTION
MASKED SELF ATTENTION
MASKED SELF ATTENTION
MASKED SELF ATTENTION
MASKED SELF ATTENTION
MASKED SELF ATTENTION
MASKED SELF ATTENTION
x.shape: (30, 200, 512)
qkv.shape: (30, 200, 1536)
qkv after reshape .shape: (30, 200, 8, 192)
qkv after permutation: (30, 8, 200, 192)
q: (30, 8, 200, 64), k:(30, 8, 200, 64), v:(30, 8, 200, 64)
scaled.shape : (30, 8, 200, 200)
-- ADDING MASK of shape (200, 200) --
values: (30, 8, 200, 64), attention:(30, 8, 200, 200)
values after reshaping: (30, 200, 512)
out after passing through linear layer: (30, 200, 512)
DROP OUT 1
ADD + LAYER NORMALIZATION 1
CROSS ATTENTION
x.shape: (30, 200, 512)
kv.shape: (30, 200, 1024)
q.shape: (30, 200, 512)
values: (30, 8, 200, 64), attention: (30, 8, 200, 200)
out after passing through linear layer: (30, 200, 512)
DROP OUT 2
ADD + LAYER NORMALIZATION 2
FEED FORWARD 1
x after first linear layer: (30, 200, 2048)
x after relu layer: (30, 200, 2048)
x after dropout layer: (30, 200, 204

In [None]:
# --- Model architecture ---
d_model = 512      # width of every token vector flowing through the decoder
num_heads = 8      # attention heads per attention layer
ffn_hidden = 2048  # hidden width of the feed-forward block (value from "Attention Is All You Need")
num_layers = 5     # number of decoder layers stacked on top of each other
drop_prob = 0.1    # dropout rate used throughout the decoder

# --- Demo batch dimensions ---
batch_size = 30            # number of sentences per batch
max_sequence_length = 200  # sequence length of the demo tensors and side of the square mask

In [None]:

# Inputs
x = tf.random.normal((batch_size, max_sequence_length, d_model))  # stand-in for the encoder output (positionally encoded English sentence)
y = tf.random.normal((batch_size, max_sequence_length, d_model))  # stand-in for the decoder input (positionally encoded Telugu sentence)

# Look-ahead mask, added to the attention scores before the softmax:
#   0    where the key position is <= the query position (attention allowed)
#   -inf strictly above the diagonal (future positions are hidden)
# The diagonal must stay 0: if it were -inf, the first row would be entirely
# -inf and its softmax would be NaN, and no token could attend to itself.
allowed = tf.linalg.band_part(
    tf.ones([max_sequence_length, max_sequence_length]), -1, 0
)  # lower triangle including the diagonal
mask = tf.where(allowed > 0, 0.0, float('-inf'))

# Decoder
decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)  # build the stack from the hyper-parameters
out = decoder(x, y, mask, training=True)  # run one forward pass with dropout active

In [None]:
mask  # bare expression so the notebook shows the mask tensor as the cell output

<tf.Tensor: shape=(200, 200), dtype=float32, numpy=
array([[-inf, -inf, -inf, ..., -inf, -inf, -inf],
       [  0., -inf, -inf, ..., -inf, -inf, -inf],
       [  0.,   0., -inf, ..., -inf, -inf, -inf],
       ...,
       [  0.,   0.,   0., ..., -inf, -inf, -inf],
       [  0.,   0.,   0., ...,   0., -inf, -inf],
       [  0.,   0.,   0., ...,   0.,   0., -inf]], dtype=float32)>