In [33]:
# Shell step: create (or refresh the timestamp of) the multi-head attention
# script file, then mark every .py file in the working directory executable.
!touch 6-multihead_attention.py
!chmod +x *.py

In [30]:
import tensorflow as tf

class RNNEncoder(tf.keras.layers.Layer):
    """Encoder half of a GRU-based machine translation model."""

    def __init__(self, vocab, embedding, units, batch):
        """
        Build the embedding and recurrent sub-layers.

        @vocab: integer size of the input vocabulary
        @embedding: integer dimensionality of the embedding vectors
        @units: integer number of hidden units in the GRU
        @batch: integer batch size
        """
        super().__init__()
        # Plain hyper-parameters, exposed as public attributes.
        self.vocab, self.units, self.batch = vocab, units, batch

        # Token index -> dense vector lookup.
        self.embedding = tf.keras.layers.Embedding(vocab, embedding)

        # The GRU hands back the whole output sequence and the final state.
        gru_options = {
            "kernel_initializer": "glorot_uniform",
            "recurrent_initializer": "glorot_uniform",
            "return_sequences": True,
            "return_state": True,
        }
        self.gru = tf.keras.layers.GRU(units, **gru_options)

    def initialize_hidden_state(self):
        """
        Initializes the hidden state of the RNN cell.

        Returns: a (batch, units) tensor filled with zeros
        """
        state_shape = (self.batch, self.units)
        return tf.zeros(state_shape)

    def call(self, x, initial):
        """
        Embed the input indices and run them through the GRU.

        @x: tensor of word indices fed to the embedding layer
        @initial: tensor used as the GRU's initial hidden state
        Returns: whatever the GRU returns; because it was built with
                 return_sequences and return_state, that is the full
                 output sequence together with the last hidden state
        """
        embedded = self.embedding(x)
        return self.gru(embedded, initial_state=initial)

In [31]:
#!/usr/bin/env python3

import numpy as np
import tensorflow as tf
#RNNEncoder = __import__('0-rnn_encoder').RNNEncoder

# Smoke test for RNNEncoder: vocab=1024, embedding=128, units=256, batch=32.
encoder = RNNEncoder(1024, 128, 256, 32)
print(encoder.batch)
print(encoder.units)
print(type(encoder.embedding))
print(type(encoder.gru))

# Zero initial state of shape (batch, units).
initial = encoder.initialize_hidden_state()
print(initial)
# 320 random word indices in [0, 1024) arranged as 32 sequences of length 10.
x = tf.convert_to_tensor(np.random.choice(1024, 320).reshape((32, 10)))
# The encoder returns the GRU's (full output sequence, last hidden state).
outputs, hidden = encoder(x, initial)
print(outputs)
print(hidden)

32
256
<class 'tensorflow.python.keras.layers.embeddings.Embedding'>
<class 'tensorflow.python.keras.layers.recurrent.GRU'>
Tensor("zeros_14:0", shape=(32, 256), dtype=float32)
Tensor("rnn_encoder_5/gru_14/transpose_1:0", shape=(32, 10, 256), dtype=float32)
Tensor("rnn_encoder_5/gru_14/while/Exit_3:0", shape=(32, 256), dtype=float32)


In [37]:
import tensorflow as tf

class SelfAttention(tf.keras.layers.Layer):
    """Additive attention layer used by the translation decoder."""

    def __init__(self, units):
        """
        Create the three Dense layers of the alignment model.

        @units: integer number of hidden units in the alignment model
        """
        super().__init__()
        # W projects the previous decoder hidden state.
        self.W = tf.keras.layers.Dense(units)
        # U projects the encoder hidden states.
        self.U = tf.keras.layers.Dense(units)
        # V reduces tanh(W(...) + U(...)) to a single score per position.
        self.V = tf.keras.layers.Dense(1)

    def call(self, s_prev, hidden_states):
        """
        Compute the context vector and the attention weights.

        @s_prev: tensor of shape (batch, units) holding the previous
                 decoder hidden state
        @hidden_states: tensor of shape (batch, input_seq_len, units)
                        holding the encoder outputs
        Returns:
                context: tensor of shape (batch, units), the weighted sum
                         of the encoder outputs
                weights: tensor of shape (batch, input_seq_len, 1) with
                         the attention weights
        """
        # Insert a length-1 sequence axis so the projected decoder state
        # broadcasts against every encoder position.
        query = tf.expand_dims(s_prev, 1)
        projected_query = self.W(query)
        projected_keys = self.U(hidden_states)
        scores = self.V(tf.nn.tanh(projected_query + projected_keys))

        # Normalise the scores over the input sequence axis.
        weights = tf.nn.softmax(scores, axis=1)
        context = tf.reduce_sum(weights * hidden_states, axis=1)
        return context, weights




In [38]:
#!/usr/bin/env python3

import numpy as np
import tensorflow as tf
#SelfAttention = __import__('1-self_attention').SelfAttention

# Smoke test for SelfAttention with a 256-unit alignment model.
attention = SelfAttention(256)
print(attention.W)
print(attention.U)
print(attention.V)
# Random stand-ins: one decoder state per batch item (32, 256) and ten
# encoder outputs per batch item (32, 10, 256).
s_prev = tf.convert_to_tensor(np.random.uniform(size=(32, 256)), preferred_dtype='float32')
hidden_states = tf.convert_to_tensor(np.random.uniform(size=(32, 10, 256)), preferred_dtype='float32')
context, weights = attention(s_prev, hidden_states)
print(context)
print(weights)

<tensorflow.python.keras.layers.core.Dense object at 0x7f5a0ffc6ba8>
<tensorflow.python.keras.layers.core.Dense object at 0x7f5a0ffc69e8>
<tensorflow.python.keras.layers.core.Dense object at 0x7f5a0f267390>
Tensor("self_attention_1/Sum:0", shape=(32, 256), dtype=float32)
Tensor("self_attention_1/transpose_1:0", shape=(32, 10, 1), dtype=float32)


In [10]:
#!/usr/bin/env python3
""" RNN Decoder """
import tensorflow as tf

SelfAttention = __import__('1-self_attention').SelfAttention


class RNNDecoder(tf.keras.layers.Layer):
    """Decoder for machine translation: attention, GRU and output layer."""

    def __init__(self, vocab, embedding, units, batch):
        """
        *********************************************
        *****************Constructor*****************
        *********************************************
        @vocab: is an integer representing the size of
                the output vocabulary
        @embedding: is an integer representing the
                    dimensionality of the embedding vector
        @units: is an integer representing the number
                of hidden units in the RNN cell
        @batch: is an integer representing the batch size
                (not used by this layer; kept in the signature)
        """
        super(RNNDecoder, self).__init__()
        # keras Embedding layer that converts words from
        # the vocabulary into an embedding vector
        self.embedding = tf.keras.layers.Embedding(vocab, embedding)
        # keras GRU layer with units units
        # return both the full sequence of output
        # as well as the last hidden state
        self.gru = tf.keras.layers.GRU(units, return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        # Dense layer with vocab units
        self.F = tf.keras.layers.Dense(vocab)
        # Attention layer owned by the decoder. It is created once here
        # instead of inside call(), so the same alignment weights are
        # reused on every decoding step and are tracked as part of this
        # layer. Its Dense sub-layers are still built on first use.
        self.attention = SelfAttention(units)

    def call(self, x, s_prev, hidden_states):
        """
        **********************************************************
        *****************calls the decoder layers*****************
        **********************************************************
        @x: is a tensor of shape (batch, 1) containing the previous
            word in the target sequence as an index of the target vocabulary
        @s_prev: is a tensor of shape (batch, units) containing the
                 previous decoder hidden state; it is only used to
                 compute the attention context
        @hidden_states: is a tensor of shape (batch, input_seq_len,
                        units)containing the outputs of the encoder
        Returns:
                y: is a tensor of shape (batch, vocab) containing the
                   output of the final Dense layer (no activation is
                   applied) over the target vocabulary
                s: is a tensor of shape (batch, units) containing
                   the new decoder hidden state
        """
        # Context vector from attending over the encoder outputs; the
        # attention weights themselves are not needed here.
        context, _ = self.attention(s_prev, hidden_states)
        x = self.embedding(x)
        # Prepend the context to the embedded word along the feature axis.
        x = tf.concat([tf.expand_dims(context, 1), x], -1)
        output, s = self.gru(x)
        # Drop the length-1 sequence axis: (batch, 1, units) -> (batch, units)
        output = tf.reshape(output, (-1, output.shape[2]))
        y = self.F(output)
        return y, s


In [11]:
#!/usr/bin/env python3

import numpy as np
import tensorflow as tf
#RNNDecoder = __import__('2-rnn_decoder').RNNDecoder

# Smoke test for RNNDecoder: vocab=2048, embedding=128, units=256, batch=32.
decoder = RNNDecoder(2048, 128, 256, 32)
print(decoder.embedding)
print(decoder.gru)
print(decoder.F)
# One previous target word index per batch item, shape (32, 1).
x = tf.convert_to_tensor(np.random.choice(2048, 32).reshape((32, 1)))
# Random stand-ins for the previous decoder state and the encoder outputs.
s_prev = tf.convert_to_tensor(np.random.uniform(size=(32, 256)).astype('float32'))
hidden_states = tf.convert_to_tensor(np.random.uniform(size=(32, 10, 256)).astype('float32'))
y, s = decoder(x, s_prev, hidden_states)
print(y)
print(s)

<tensorflow.python.keras.layers.embeddings.Embedding object at 0x7fced955c860>
<tensorflow.python.keras.layers.recurrent.GRU object at 0x7fced955cb00>
<tensorflow.python.keras.layers.core.Dense object at 0x7fced955ce80>
Tensor("rnn_decoder_6/dense_24/BiasAdd:0", shape=(32, 2048), dtype=float32)
Tensor("rnn_decoder_6/gru_6/while/Exit_3:0", shape=(32, 256), dtype=float32)


In [13]:
#!/usr/bin/env python3

import numpy as np
import tensorflow as tf
#RNNDecoder = __import__('2-rnn_decoder').RNNDecoder

# Seed both NumPy and TensorFlow before building the decoder and its inputs.
# NOTE(review): tf.set_random_seed is the TF 1.x spelling - confirm the
# notebook is pinned to a 1.x release.
np.random.seed(0)
tf.set_random_seed(0)
decoder = RNNDecoder(2048, 128, 256, 32)
print(type(decoder.embedding), decoder.embedding.input_dim, decoder.embedding.output_dim)
print(type(decoder.gru), decoder.gru.units)
print(type(decoder.F), decoder.F.units)

# Build the inputs, run one decoder step, evaluate both results to NumPy
# arrays and write their shapes and values (5 digits of precision) to the
# file '1-test'. The order of the np.random calls determines the inputs.
with open('1-test', 'w+') as f:
    x = tf.convert_to_tensor(np.random.choice(2048, 32).reshape((32, 1)))
    s_prev = tf.convert_to_tensor(np.random.uniform(size=(32, 256)).astype('float32'))
    hidden_states = tf.convert_to_tensor(np.random.uniform(size=(32, 10, 256)).astype('float32'))
    y, s = decoder(x, s_prev, hidden_states)
    Y = tf.keras.backend.eval(y)
    S = tf.keras.backend.eval(s)
    f.write(str(Y.shape) + '\n' + np.array2string(Y, precision=5) + '\n')
    f.write(str(S.shape) + '\n' + np.array2string(S, precision=5) + '\n')

<class 'tensorflow.python.keras.layers.embeddings.Embedding'> 2048 128
<class 'tensorflow.python.keras.layers.recurrent.GRU'> 256
<class 'tensorflow.python.keras.layers.core.Dense'> 2048


In [15]:
#!/usr/bin/env python3
""" Positional Encoding """
import numpy as np


def positional_encoding(max_seq_len, dm):
    """
    calculates the positional encoding for a transformer

    Entry (pos, j) is sin(pos / 10000 ** (j / dm)) for even j and
    cos(pos / 10000 ** ((j - 1) / dm)) for odd j, so each sin/cos pair
    of columns shares one frequency.

    @max_seq_len: is an integer representing the maximum
                  sequence length
    @dm: is the model depth
    Returns:
            a numpy.ndarray of shape (max_seq_len, dm)
            containing the positional encoding vectors
    """
    # Column vector of positions so it broadcasts against the depth axis.
    positions = np.arange(max_seq_len, dtype=np.float64)[:, np.newaxis]
    depth_index = np.arange(dm)
    # Odd columns reuse the exponent of the even column just before them.
    exponents = (depth_index - depth_index % 2) / dm
    angles = positions / np.power(10000, exponents)

    pEncoding = np.empty((max_seq_len, dm))
    pEncoding[:, 0::2] = np.sin(angles[:, 0::2])
    pEncoding[:, 1::2] = np.cos(angles[:, 1::2])
    return pEncoding


In [17]:
#!/usr/bin/env python3

import numpy as np
#positional_encoding = __import__('4-positional_encoding').positional_encoding

# Positional encodings for 30 positions at model depth 512.
PE = positional_encoding(30, 512)
print(PE.shape)
print(PE)

(30, 512)
[[ 0.00000000e+00  1.00000000e+00  0.00000000e+00 ...  1.00000000e+00
   0.00000000e+00  1.00000000e+00]
 [ 8.41470985e-01  5.40302306e-01  8.21856190e-01 ...  9.99999994e-01
   1.03663293e-04  9.99999995e-01]
 [ 9.09297427e-01 -4.16146837e-01  9.36414739e-01 ...  9.99999977e-01
   2.07326584e-04  9.99999979e-01]
 ...
 [ 9.56375928e-01 -2.92138809e-01  7.91416314e-01 ...  9.99995791e-01
   2.79890525e-03  9.99996083e-01]
 [ 2.70905788e-01 -9.62605866e-01  9.53248145e-01 ...  9.99995473e-01
   2.90256812e-03  9.99995788e-01]
 [-6.63633884e-01 -7.48057530e-01  2.94705106e-01 ...  9.99995144e-01
   3.00623096e-03  9.99995481e-01]]


In [21]:
#!/usr/bin/env python3
""" Scaled Dot Product Attention """
import tensorflow as tf


def sdp_attention(Q, K, V, mask=None):
    """
    Calculate scaled dot product attention
    @Q: is a tensor with its last two dimensions
        as (..., seq_len_q, dk) containing the query matrix
    @K: is a tensor with its last two dimensions
        as (..., seq_len_v, dk) containing the key matrix
    @V: is a tensor with its last two dimensions
        as (..., seq_len_v, dv) containing the value matrix
    @mask: is a tensor that can be broadcast into
           (..., seq_len_q, seq_len_v) containing the optional
           mask, or defaulted to None
    *** If mask is not None, multiply -1e9 to the mask and add
        it to the scaled matrix multiplication
    *** The preceding dimensions of Q, K, and V are the same
    Returns:
            output: a tensor with its last two dimensions as
                    (..., seq_len_q, dv) containing the scaled dot
                    product attention
            weights: a tensor with its last two dimensions as
                     (..., seq_len_q, seq_len_v) containing the
                     attention weights
    """
    scores = tf.matmul(Q, K, transpose_b=True)
    # Cast dk to the dtype of the scores rather than a fixed float32, so
    # float64 inputs no longer hit a dtype mismatch in the division.
    dk = tf.cast(tf.shape(K)[-1], scores.dtype)
    scaled = scores / tf.math.sqrt(dk)
    if mask is not None:
        # Masked positions (mask == 1) receive a large negative score and
        # end up with a weight of ~0 after the softmax. The mask is cast
        # to the score dtype for the same reason as dk above.
        scaled = tf.cast(mask, scaled.dtype) * -1e9 + scaled
    weights = tf.nn.softmax(scaled, axis=-1)
    return tf.matmul(weights, V), weights


In [22]:
#!/usr/bin/env python3

import numpy as np
import tensorflow as tf
#sdp_attention = __import__('5-sdp_attention').sdp_attention

# Seeded smoke test for sdp_attention: 10 queries and 15 key/value pairs per
# batch item, key depth 256 and value depth 512.
np.random.seed(0)
Q = tf.convert_to_tensor(np.random.uniform(size=(50, 10, 256)).astype('float32'))
K = tf.convert_to_tensor(np.random.uniform(size=(50, 15, 256)).astype('float32'))
V = tf.convert_to_tensor(np.random.uniform(size=(50, 15, 512)).astype('float32'))
# No mask is passed, so every key position can be attended to.
output, weights = sdp_attention(Q, K, V)
print(output)
print(weights)

Tensor("MatMul_1:0", shape=(50, 10, 512), dtype=float32)
Tensor("Softmax:0", shape=(50, 10, 15), dtype=float32)


In [31]:
#!/usr/bin/env python3
""" Multi Head Attention """
import tensorflow as tf

#sdp_attention = __import__('5-sdp_attention').sdp_attention


class MultiHeadAttention(tf.keras.layers.Layer):
    """ perform multi head attention """

    def __init__(self, dm, h):
        """
        *********************************************
        *****************Constructor*****************
        *********************************************
        @dm: is an integer representing the dimensionality
             of the model (divisible by h)
        @h: is an integer representing the number of heads
        Raises:
                ValueError: if dm is not divisible by h
        """
        # Without this check the head-splitting reshape (which infers the
        # sequence axis with -1) could silently mix features across
        # positions instead of failing.
        if dm % h != 0:
            raise ValueError(
                "dm ({}) must be divisible by h ({})".format(dm, h))
        super(MultiHeadAttention, self).__init__()
        # the number of heads
        self.h = h
        # the dimensionality of the model
        self.dm = dm
        # the depth of each attention head
        self.depth = dm // h
        # a Dense layer with dm units, used to generate the query matrix
        self.Wq = tf.keras.layers.Dense(dm)
        # a Dense layer with dm units, used to generate the key matrix
        self.Wk = tf.keras.layers.Dense(dm)
        # a Dense layer with dm units, used to generate the value matrix
        self.Wv = tf.keras.layers.Dense(dm)
        # a Dense layer with dm units, used to generate the attention output
        self.linear = tf.keras.layers.Dense(dm)

    def _split_heads(self, x, batch_size):
        """
        Reshape (batch, seq_len, dm) into (batch, h, seq_len, depth) so
        that every head attends independently.
        """
        x = tf.reshape(x, (batch_size, -1, self.h, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, Q, K, V, mask=None):
        """
        ************************************************
        ****************Keras layer call****************
        ************************************************
        @Q: is a tensor of shape (batch, seq_len_q, dk)
            containing the input to generate the query matrix
        @K: is a tensor of shape (batch, seq_len_v, dk)
            containing the input to generate the key matrix
        @V: is a tensor of shape (batch, seq_len_v, dv)
            containing the input to generate the value matrix
        @mask: optional mask forwarded unchanged to sdp_attention;
               defaults to None (no masking)
        Returns:
                output: a tensor with its last two dimensions as
                        (..., seq_len_q, dm) containing the scaled
                        dot product attention
                weights: a tensor with its last three dimensions as
                        (..., h, seq_len_q, seq_len_v) containing
                        the attention weights
        """
        batches = tf.shape(Q)[0]
        # Project the inputs to dm features, then split them across heads.
        Q = self._split_heads(self.Wq(Q), batches)
        K = self._split_heads(self.Wk(K), batches)
        V = self._split_heads(self.Wv(V), batches)

        # sdp_attention must already be defined in the enclosing scope.
        output, weights = sdp_attention(Q, K, V, mask)

        # Undo the head split: (batch, h, seq_len_q, depth) ->
        # (batch, seq_len_q, dm), then apply the final linear layer.
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        output = tf.reshape(output, [batches, -1, self.dm])
        return self.linear(output), weights


In [32]:
#!/usr/bin/env python3

import numpy as np
import tensorflow as tf
#MultiHeadAttention = __import__('6-multihead_attention').MultiHeadAttention

# Smoke test for MultiHeadAttention: model depth 512 split over 8 heads.
mha = MultiHeadAttention(512, 8)
print(mha.dm)
print(mha.h)
print(mha.depth)
print(mha.Wq)
print(mha.Wk)
print(mha.Wv)
print(mha.linear)
# Random inputs with 256 features; the Wq/Wk/Wv Dense layers project them
# to dm=512 inside the layer.
Q = tf.convert_to_tensor(np.random.uniform(size=(50, 15, 256)).astype('float32'))
K = tf.convert_to_tensor(np.random.uniform(size=(50, 15, 256)).astype('float32'))
V = tf.convert_to_tensor(np.random.uniform(size=(50, 15, 256)).astype('float32'))
# The mask is passed explicitly as None (no masking).
output, weights = mha(Q, K, V, None)
print(output)
print(weights)

512
8
64
<tensorflow.python.keras.layers.core.Dense object at 0x7fcec13522b0>
<tensorflow.python.keras.layers.core.Dense object at 0x7fcec00cd240>
<tensorflow.python.keras.layers.core.Dense object at 0x7fcec00cd7f0>
<tensorflow.python.keras.layers.core.Dense object at 0x7fcec00cdbe0>
Tensor("multi_head_attention_1/dense_47/BiasAdd:0", shape=(50, 15, 512), dtype=float32)
Tensor("multi_head_attention_1/Softmax:0", shape=(50, 8, 15, 15), dtype=float32)
