<a href="https://colab.research.google.com/github/VK-VCS/NLP/blob/main/Encoder_Decoder_with_Attention_Mechanism.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Embedding, GRU, Dense
import numpy as np

# Define hyperparameters
# Number of sequences per batch; used to size the example batches below.
BATCH_SIZE = 64
# Size of each token embedding vector passed to the Encoder and Decoder.
EMBEDDING_DIM = 256
# Number of GRU units used for both the Encoder and the Decoder.
UNITS = 512
VOCAB_SIZE = 10000  # Define based on your dataset
MAX_LEN = 50        # Maximum sequence length

# Encoder class
class Encoder(tf.keras.Model):
    """Embed token ids and run them through a single GRU layer.

    Args:
        vocab_size: Number of entries in the embedding table.
        embedding_dim: Size of each embedding vector.
        enc_units: Number of units in the GRU.
    """

    def __init__(self, vocab_size, embedding_dim, enc_units):
        super().__init__()
        self.enc_units = enc_units
        self.embedding = Embedding(vocab_size, embedding_dim)
        self.gru = GRU(
            enc_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )

    def call(self, x):
        """Return the GRU's per-step outputs and its final hidden state."""
        embedded = self.embedding(x)
        sequence_output, final_state = self.gru(embedded)
        return sequence_output, final_state

# Attention mechanism
class Attention(tf.keras.layers.Layer):
    """Additive attention: score = V(tanh(W1(values) + W2(query))).

    Args:
        units: Width of the two projection layers ``W1`` and ``W2``.
    """

    def __init__(self, units):
        super().__init__()
        self.W1 = Dense(units)
        self.W2 = Dense(units)
        self.V = Dense(1)

    def call(self, query, values):
        """Return ``(context_vector, attention_weights)``.

        The query gets an extra axis at position 1 so it broadcasts against
        ``values``; the weights are normalised with a softmax over axis 1 and
        the context vector is the weighted sum of ``values`` over that axis.
        """
        expanded_query = tf.expand_dims(query, 1)
        projected = self.W1(values) + self.W2(expanded_query)
        score = self.V(tf.nn.tanh(projected))
        attention_weights = tf.nn.softmax(score, axis=1)
        context_vector = tf.reduce_sum(attention_weights * values, axis=1)
        return context_vector, attention_weights

# Decoder class
class Decoder(tf.keras.Model):
    """Single-step GRU decoder that attends over the encoder outputs.

    Args:
        vocab_size: Size of the target vocabulary; also the width of the
            output logits.
        embedding_dim: Size of each target-token embedding vector.
        dec_units: Number of GRU units; also the width of the attention
            projections.
    """

    def __init__(self, vocab_size, embedding_dim, dec_units):
        super(Decoder, self).__init__()
        self.dec_units = dec_units
        self.embedding = Embedding(vocab_size, embedding_dim)
        self.gru = GRU(dec_units, return_sequences=True, return_state=True, recurrent_initializer='glorot_uniform')
        self.fc = Dense(vocab_size)
        self.attention = Attention(dec_units)

    def call(self, x, hidden, enc_output):
        """Run one decoding step.

        Args:
            x: Token ids for the current step (assumed shape ``(batch, 1)``
                -- confirm against callers).
            hidden: Previous decoder state. Used as the attention query and
                as the GRU's initial state, so its last dimension must equal
                ``dec_units``.
            enc_output: Encoder outputs that attention is computed over.

        Returns:
            Tuple ``(logits, state, attention_weights)``.
        """
        context_vector, attention_weights = self.attention(hidden, enc_output)
        x = self.embedding(x)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
        # Carry the recurrent state across steps. Without initial_state the
        # GRU restarts from zeros on every call and `hidden` would only
        # influence the output through the attention context.
        output, state = self.gru(x, initial_state=hidden)
        # Merge the batch and time axes so the Dense layer sees 2-D input.
        output = tf.reshape(output, (-1, output.shape[2]))
        x = self.fc(output)
        return x, state, attention_weights

# Training step
# Adam optimizer with default settings; train_step uses it to apply gradients.
optimizer = tf.keras.optimizers.Adam()

# reduction='none' keeps one loss value per example so loss_function can
# mask individual entries before averaging; from_logits=True means the
# predictions are treated as unnormalised scores.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    """Return the mean cross-entropy with entries whose target id is 0 zeroed.

    Args:
        real: Target token ids; id 0 is excluded from the loss.
        pred: Logits passed to ``loss_object``.

    Returns:
        Scalar mean over all entries, where masked entries contribute 0.
    """
    per_example = loss_object(real, pred)
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_example.dtype)
    return tf.reduce_mean(per_example * keep)

@tf.function
def train_step(inp, targ, enc_hidden, encoder, decoder, targ_lang_tokenizer):
    """Run one teacher-forced training step and apply the gradients.

    Args:
        inp: Batch of input token ids fed to the encoder.
        targ: Batch of target token ids; its second dimension must be
            statically known because it drives a Python loop.
        enc_hidden: Unused. Kept so existing callers keep working; the
            encoder call returns its own final state, which replaces it.
        encoder: Encoder model returning ``(outputs, final_state)``.
        decoder: Decoder model returning ``(logits, state, attention)``.
        targ_lang_tokenizer: Object whose ``word_index`` maps ``'<start>'``
            to the start-token id.

    Returns:
        The accumulated loss divided by the target sequence length.
    """
    start_id = targ_lang_tokenizer.word_index['<start>']
    loss = 0
    with tf.GradientTape() as tape:
        enc_output, enc_state = encoder(inp)
        dec_hidden = enc_state
        # Size the first decoder input from the batch itself rather than the
        # global BATCH_SIZE, so smaller batches (e.g. the last one of an
        # epoch) do not cause a shape mismatch.
        dec_input = tf.fill([tf.shape(targ)[0], 1], start_id)

        for t in range(1, targ.shape[1]):
            predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
            loss += loss_function(targ[:, t], predictions)
            # Teacher forcing: feed the ground-truth token as the next input.
            dec_input = tf.expand_dims(targ[:, t], 1)

    batch_loss = loss / int(targ.shape[1])
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    return batch_loss

# Initialize the model
# Encoder and decoder share the same vocabulary size, embedding size and
# number of GRU units.
encoder = Encoder(VOCAB_SIZE, EMBEDDING_DIM, UNITS)
decoder = Decoder(VOCAB_SIZE, EMBEDDING_DIM, UNITS)

# Example input data: random token ids of shape (BATCH_SIZE, MAX_LEN).
# minval=1 keeps id 0 out of the batches; loss_function masks id 0.
example_input_batch = tf.random.uniform((BATCH_SIZE, MAX_LEN), minval=1, maxval=VOCAB_SIZE, dtype=tf.int32)
example_target_batch = tf.random.uniform((BATCH_SIZE, MAX_LEN), minval=1, maxval=VOCAB_SIZE, dtype=tf.int32)

# Tokenizer (mock example for simplicity; replace with actual tokenizer)
class MockTokenizer:
    """Minimal stand-in for a fitted tokenizer.

    Ids 1 and 2 are reserved for ``<start>`` and ``<end>``; ordinary mock
    words ``word3 .. word{vocab_size - 1}`` take the remaining ids, so no two
    tokens share an id. Id 0 is left unassigned.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, vocab_size):
        # token -> id
        self.word_index = {"<start>": 1, "<end>": 2}
        # Start at 3 so "word1"/"word2" do not alias the special-token ids.
        self.word_index.update({f"word{i}": i for i in range(3, vocab_size)})
        # id -> token, the exact inverse of word_index (needed for decoding).
        self.index_word = {index: word for word, index in self.word_index.items()}

# Mock target-language tokenizer; train_step reads its '<start>' id.
targ_lang_tokenizer = MockTokenizer(VOCAB_SIZE)

# Train one step
# sample_hidden is passed as enc_hidden, but train_step replaces that value
# with the state returned by the encoder, so it does not affect the result.
sample_hidden = tf.zeros((BATCH_SIZE, UNITS))
batch_loss = train_step(example_input_batch, example_target_batch, sample_hidden, encoder, decoder, targ_lang_tokenizer)
print(f"Batch loss: {batch_loss.numpy()}")


Batch loss: 9.02610969543457


In [None]:
example_input_batch

<tf.Tensor: shape=(64, 50), dtype=int32, numpy=
array([[7016, 3918, 5876, ..., 3926, 8196, 5857],
       [6662, 5179, 1326, ..., 4975, 8471, 3869],
       [8777, 5281, 6305, ..., 3312, 4697, 9123],
       ...,
       [4687, 9361, 4526, ..., 7059, 7347, 9896],
       [1665, 2001, 9261, ...,  139, 8026, 9279],
       [7482, 5396, 7223, ...,  275, 1239, 2462]], dtype=int32)>

In [None]:
# example_input_batch holds 64 mock sentences (BATCH_SIZE rows),
# each 50 token ids long (MAX_LEN columns).

In [None]:
example_target_batch

<tf.Tensor: shape=(64, 50), dtype=int32, numpy=
array([[2766, 2663, 3783, ..., 1871, 2723, 6020],
       [3782, 5800, 9923, ..., 8229, 2056, 3285],
       [4578, 9453, 5416, ..., 9048,  210, 6338],
       ...,
       [5693, 1072, 2105, ...,  658, 8544, 7083],
       [5870, 7969, 8796, ..., 7422,  495, 2927],
       [6037, 4862, 3418, ..., 1906, 5308, 7958]], dtype=int32)>

In [None]:
def evaluate(sentence, encoder, decoder, inp_lang_tokenizer, targ_lang_tokenizer, max_length_input, max_length_target):
    """
    Translate a given input sentence using the trained encoder-decoder model.

    Decoding is greedy: at every step the highest-scoring token is chosen and
    fed back as the next decoder input.

    Args:
    - sentence: The input sentence to translate; split on whitespace.
    - encoder: Trained encoder model.
    - decoder: Trained decoder model.
    - inp_lang_tokenizer: Tokenizer for the input language (needs `word_index`).
    - targ_lang_tokenizer: Tokenizer for the target language (needs
      `word_index` and `index_word`).
    - max_length_input: Maximum length of the input sequence.
    - max_length_target: Maximum number of decoding steps.

    Returns:
    - Translation as a string.
    - List of attention weights, one entry per decoding step, for visualization.
    """
    # Preprocess the input sentence; words missing from the vocabulary map to 0.
    inputs = [inp_lang_tokenizer.word_index.get(word, 0) for word in sentence.split()]
    inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs], maxlen=max_length_input, padding='post')
    inputs = tf.convert_to_tensor(inputs)

    # Encode the sentence; the encoder's final state seeds the decoder.
    enc_out, enc_hidden = encoder(inputs)
    dec_hidden = enc_hidden

    # Start decoding with the '<start>' token
    dec_input = tf.expand_dims([targ_lang_tokenizer.word_index['<start>']], 0)

    words = []
    attention_weights = []

    for _ in range(max_length_target):
        predictions, dec_hidden, attention_weight = decoder(dec_input, dec_hidden, enc_out)

        # Save attention weights for visualization
        attention_weights.append(attention_weight)

        # Get the token with the highest score as a plain Python int.
        predicted_id = int(tf.argmax(predictions[0]).numpy())

        # The model can emit ids with no index_word entry (e.g. id 0), so look
        # the word up without raising KeyError.
        predicted_word = targ_lang_tokenizer.index_word.get(predicted_id)

        # Stop if the '<end>' token is predicted
        if predicted_word == '<end>':
            break

        # Ids without a word are left out of the text but still fed back below.
        if predicted_word is not None:
            words.append(predicted_word)

        # Use the predicted token as the next decoder input
        dec_input = tf.expand_dims([predicted_id], 0)

    return " ".join(words), attention_weights

# Translation example
def translate(sentence, encoder, decoder, inp_lang_tokenizer, targ_lang_tokenizer, max_length_input, max_length_target):
    """
    Run `evaluate` on a sentence and print the input next to its translation.

    All arguments are forwarded unchanged to `evaluate`; the attention
    weights it returns are discarded. Returns nothing.
    """
    translation, _ = evaluate(
        sentence,
        encoder,
        decoder,
        inp_lang_tokenizer,
        targ_lang_tokenizer,
        max_length_input,
        max_length_target,
    )
    print(f"Input: {sentence}")
    print(f"Predicted translation: {translation}")

# Example tokenizers
class MockTokenizer:
    """Minimal stand-in for a fitted tokenizer with both lookup directions.

    Ids 1 and 2 are reserved for ``<start>`` and ``<end>``; ordinary mock
    words ``word3 .. word{vocab_size - 1}`` take the remaining ids, so no two
    tokens share an id. Id 0 is left unassigned.
    """

    def __init__(self, vocab_size):
        # token -> id
        self.word_index = {"<start>": 1, "<end>": 2}
        # Start at 3 so "word1"/"word2" do not alias the special-token ids.
        self.word_index.update({f"word{i}": i for i in range(3, vocab_size)})
        # id -> token, built as the exact inverse so the two maps always agree.
        self.index_word = {index: word for word, index in self.word_index.items()}

# Example input tokenizer and target tokenizer
# Separate mock tokenizers for the input and target languages; both are
# built from the same vocabulary size.
inp_lang_tokenizer = MockTokenizer(VOCAB_SIZE)
targ_lang_tokenizer = MockTokenizer(VOCAB_SIZE)

# Parameters
# Input padding length and the cap on the number of decoding steps.
MAX_LEN_INPUT = 50
MAX_LEN_TARGET = 50

# Translate a sample sentence
# The words below exist in the mock vocabulary (word10, word20, word30).
sample_sentence = "word10 word20 word30"
translate(sample_sentence, encoder, decoder, inp_lang_tokenizer, targ_lang_tokenizer, MAX_LEN_INPUT, MAX_LEN_TARGET)


Input: word10 word20 word30
Predicted translation: word2315 word5689 word8276 word6753 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867 word7121 word6788 word2867
