In [1]:
# Import libraries

import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Embedding, LSTM, Attention, Input, Flatten, Lambda, dot, Activation, concatenate
from tensorflow.keras.datasets import imdb
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.constraints import NonNeg
from matplotlib import pyplot as plt
from matplotlib import cm
from matplotlib.colors import rgb2hex
from sklearn.preprocessing import minmax_scale
import numpy as np
from PIL import Image
from skimage import io
from IPython.core.display import HTML
import os
import time
from attention import Attention

ModuleNotFoundError: No module named 'attention'

In [None]:
# Fetch the Harry Potter text with tf.keras.utils.get_file and keep the
# returned local path for the read step in the next cell.

path_to_file = tf.keras.utils.get_file('harrypotter.txt', "https://raw.githubusercontent.com/amephraim/nlp/master/texts/J.%20K.%20Rowling%20-%20Harry%20Potter%201%20-%20Sorcerer's%20Stone.txt")

In [None]:
# Read the raw bytes and decode them as UTF-8. The context manager closes the
# file handle as soon as the read finishes instead of leaving it open.
with open(path_to_file, 'rb') as corpus_file:
    text = corpus_file.read().decode(encoding='utf-8')
# The length of the text is the number of characters in it.
print(f'Length of text: {len(text)} characters')

In [None]:
# Preview the first 250 characters to sanity-check the download and decoding.
print(text[:250])

In [None]:
# Sorted list of the distinct characters in the corpus; it is passed to the
# StringLookup layer below as the vocabulary.
vocab = sorted(set(text))
print(f'{len(vocab)} unique characters')

In [None]:
# Split the corpus string into a tensor of individual UTF-8 characters.

chars = tf.strings.unicode_split(text, input_encoding='UTF-8')
chars

In [None]:
# Lookup layer that maps each character to an integer ID.
# mask_token=None: no mask token is configured for this layer.

ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=list(vocab), mask_token=None)

In [None]:
# Convert the character tensor into integer IDs and display the result.
ids = ids_from_chars(chars)
ids

In [None]:
# Inverse lookup (invert=True): maps integer IDs back to characters.
# It reuses the forward layer's vocabulary so both directions agree.

chars_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None)

In [None]:
# Round-trip check: map the IDs back to characters (this rebinds `chars`).
chars = chars_from_ids(ids)
chars

In [None]:
# Join the characters along the last axis into one string tensor, convert it
# to a NumPy value, and print its first 200 elements.
output = tf.strings.reduce_join(chars, axis=-1).numpy()
print(output[:200])

In [None]:
# Function to convert IDs back to text

def text_from_ids(ids):
  """Look up the characters for `ids` and join them along the last axis.

  Args:
    ids: integer IDs accepted by the `chars_from_ids` lookup layer.

  Returns:
    A string tensor produced by `tf.strings.reduce_join(..., axis=-1)`.
  """
  looked_up_chars = chars_from_ids(ids)
  joined_text = tf.strings.reduce_join(looked_up_chars, axis=-1)
  return joined_text

In [None]:
# Encode the whole corpus as one tensor of character IDs.

all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
all_ids

In [None]:
# Wrap the ID tensor in a tf.data.Dataset that yields one character ID per element.
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)

In [None]:
# Decode the first ten IDs back to characters.
# The loop variable has its own name so the `ids` tensor built in an earlier
# cell is not overwritten (re-running that cell's consumers stays valid).

for char_id in ids_dataset.take(10):
    print(chars_from_ids(char_id).numpy().decode('utf-8'))

In [None]:
# Number of input characters per training example. Sequences are stored with
# seq_length+1 IDs so input and target can be offset by one position.
seq_length = 100

In [None]:
# Group the ID stream into fixed-length chunks of seq_length+1 IDs.
# drop_remainder=True discards a final chunk that is shorter than that.

sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)

# Show the first chunk as individual characters.
for seq in sequences.take(1):
  print(chars_from_ids(seq))

In [None]:
# Print the first five chunks joined back into text.

for seq in sequences.take(5):
  print(text_from_ids(seq).numpy())

In [None]:
# Function for splitting into input and target

def split_input_target(sequence):
    """Return an (input, target) pair where the target is shifted by one step.

    Args:
        sequence: any sliceable sequence (list, string, tensor, ...).

    Returns:
        A tuple `(sequence[:-1], sequence[1:])`: the input drops the last
        element and the target drops the first.
    """
    all_but_last, all_but_first = sequence[:-1], sequence[1:]
    return all_but_last, all_but_first

In [None]:
# Build NumPy train/test arrays from the fixed-length sequences.
# `input_text` / `target_text` only exist as locals inside split_input_target,
# so they cannot be used here; instead, hold out the final 10% of the
# sequences for testing and split both parts into inputs and targets.
all_sequences = np.stack(list(sequences.as_numpy_iterator()))
num_train_sequences = int(0.9 * len(all_sequences))
train_sequences = all_sequences[:num_train_sequences]
test_sequences = all_sequences[num_train_sequences:]

# split_input_target slices along the first axis, so put the time axis first
# (transpose), split, and transpose each part back to (num_sequences, seq_length).
X_train_input, X_train_target = (part.T for part in split_input_target(train_sequences.T))
X_test_input, X_test_target = (part.T for part in split_input_target(test_sequences.T))

In [None]:
# Turn every seq_length+1 chunk into an (input, target) pair.
dataset = sequences.map(split_input_target)


In [None]:
# Print the input and target text of the first pair in the dataset to confirm
# that the target is the input shifted by one character.

for input_example, target_example in dataset.take(1):
    print("Input :", text_from_ids(input_example).numpy())
    print("Target:", text_from_ids(target_example).numpy())

In [None]:
#Training

In [None]:
# Batch size
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000

# Build the pipeline from `sequences` rather than from `dataset` itself, so
# re-running this cell does not batch an already-batched dataset.
dataset = (
    sequences
    .map(split_input_target)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE))

dataset

In [None]:
#Modeling

In [None]:
# Size of the StringLookup vocabulary; used for the embedding input size and
# the width of the output layer.
vocab_size = len(ids_from_chars.get_vocabulary())  # 80 according to the original author's note

# Width of each character embedding vector.
embedding_dim = 290

# Number of recurrent units.
# This value should not be picked at random: choose it by weighing data
# complexity, model capacity, and the risk of overfitting.
rnn_units = 384

**RNN with rnn_layer = tf.keras.layers.GRU**

In [None]:
# Define a GRU model for text generation.

class MyModel(tf.keras.Model):
  """Character-level language model: Embedding -> GRU -> Dense.

  Args:
    vocab_size: number of distinct token IDs; sizes the embedding table and
      the output layer.
    embedding_dim: width of each embedding vector.
    rnn_units: number of GRU units.
  """

  def __init__(self, vocab_size, embedding_dim, rnn_units):
    # super() already binds the instance; passing `self` again hands the Keras
    # base class an unexpected positional argument.
    super().__init__()
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(rnn_units,
                                   return_sequences=True,
                                   return_state=True)
    self.dense = tf.keras.layers.Dense(vocab_size)

  def call(self, inputs, states=None, return_state=False, training=False):
    """Run the model on a batch of token IDs.

    Args:
      inputs: batch of token-ID sequences.
      states: GRU state to start from; when None the GRU's initial state is used.
      return_state: when True, also return the final GRU state.
      training: forwarded to every sub-layer.

    Returns:
      The dense-layer output for every time step, plus the final GRU state
      when `return_state` is True.
    """
    x = inputs
    x = self.embedding(x, training=training)
    if states is None:
      states = self.gru.get_initial_state(x)
    x, states = self.gru(x, initial_state=states, training=training)
    x = self.dense(x, training=training)

    if return_state:
      return x, states
    else:
      return x

# Instantiate the GRU model with the hyperparameters defined above.
model = MyModel(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

In [None]:
# Run the model on one batch of inputs and print the shape of its predictions.
# The names bound here (input_example_batch, target_example_batch,
# example_batch_predictions) are reused by the following cells.

for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

In [None]:
# Print the layer-by-layer summary of the GRU model.
model.summary()


In [None]:
# Sample one ID per position from the predictions for the first example in
# the batch, drop the trailing sample axis, and convert to a NumPy array.

sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()

In [None]:
# Display the sampled IDs.
sampled_indices

In [None]:
# Compare the first input of the batch with the text decoded from the IDs
# sampled above.
print("Input:\n", text_from_ids(input_example_batch[0]).numpy())
print()
print("Next Char Predictions:\n", text_from_ids(sampled_indices).numpy())

In [None]:
# Loss for integer targets; from_logits=True because the model's final Dense
# layer has no activation.

loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

In [None]:
# Evaluate the loss on the example batch before any training.
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("Mean loss:        ", example_batch_mean_loss)

In [None]:
# Exponential of the mean cross-entropy loss (the perplexity).
tf.exp(example_batch_mean_loss).numpy()

In [None]:
# Compile the GRU model with the Adam optimizer and the loss defined above.

model.compile(optimizer='adam', loss=loss)


In [None]:
# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files; "{epoch}" is a placeholder in the file name
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Callback that saves only the model weights (save_weights_only=True).
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

In [None]:
EPOCHS = 20 # The loss decreases very slowly, so more epochs would help,
# but training is capped at 20 instead of 40 because of GPU limitations.

In [None]:
# Train the GRU model, saving a weights checkpoint through the callback.

history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])


Wrapper around RNN

In [None]:
class OneStep(tf.keras.Model):
  """Samples one next character at a time from a trained character model.

  Args:
    model: model called as `model(inputs=..., states=..., return_state=True)`.
    chars_from_ids: lookup layer mapping IDs to characters.
    ids_from_chars: lookup layer mapping characters to IDs.
    temperature: divisor applied to the logits before sampling.
  """

  def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
    super().__init__()
    self.temperature = temperature
    self.model = model
    self.chars_from_ids = chars_from_ids
    self.ids_from_chars = ids_from_chars

    # IDs that must never be sampled: only the "[UNK]" token.
    blocked_ids = self.ids_from_chars(['[UNK]'])[:, None]
    vocabulary_length = len(ids_from_chars.get_vocabulary())
    # Sparse vector over the vocabulary holding -inf at every blocked ID.
    blocked_logits = tf.SparseTensor(
        values=[-float('inf')]*len(blocked_ids),
        indices=blocked_ids,
        dense_shape=[vocabulary_length])
    self.prediction_mask = tf.sparse.to_dense(blocked_logits)

  @tf.function
  def generate_one_step(self, inputs, states=None):
    """Sample the next character for every string in `inputs`.

    Returns:
      A `(predicted_chars, states)` pair: the sampled characters and the
      state returned by the wrapped model.
    """
    # Strings -> characters -> dense tensor of token IDs.
    input_ids = self.ids_from_chars(
        tf.strings.unicode_split(inputs, 'UTF-8')).to_tensor()

    # all_logits.shape is [batch, char, next_char_logits]
    all_logits, states = self.model(inputs=input_ids, states=states,
                                    return_state=True)

    # Keep the last position, apply the temperature, then add the mask so
    # "[UNK]" cannot be generated.
    last_logits = all_logits[:, -1, :]
    last_logits = last_logits/self.temperature + self.prediction_mask

    # Draw one token ID per batch row and drop the sample axis.
    sampled_ids = tf.squeeze(
        tf.random.categorical(last_logits, num_samples=1), axis=-1)

    # Token IDs -> characters, returned together with the model state.
    return self.chars_from_ids(sampled_ids), states

In [None]:
# Wrap the trained GRU model in a OneStep sampler (default temperature 1.0).

one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

In [None]:
# Generate 1000 characters with the GRU model, seeded with "Ron", and time it.

start = time.time()
states = None  # no state yet: the first step starts from the model's initial state
next_char = tf.constant(['Ron'])
result = [next_char]

# Feed each sampled character and the returned state back into the next step.
for n in range(1000):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)

# Concatenate the seed and every sampled character into one string tensor.
result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)
print('\nRun time:', end - start)

**RNN with LSTM**

In [None]:

# Define the MyModel class for LSTM
class MyLSTMModel(tf.keras.Model):
    """Character-level language model: Embedding -> LSTM -> Dense.

    Args:
        vocab_size: number of distinct token IDs; sizes the embedding table
            and the output layer.
        embedding_dim: width of each embedding vector.
        rnn_units: number of LSTM units.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        # super() already binds the instance; passing `self` again hands the
        # Keras base class an unexpected positional argument.
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.lstm = tf.keras.layers.LSTM(rnn_units,
                                         return_sequences=True,
                                         return_state=True)
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        """Run the model on a batch of token IDs.

        Args:
            inputs: batch of token-ID sequences.
            states: LSTM state to start from; when None the LSTM's initial
                state is used.
            return_state: when True, also return the final state as
                `[states_h, states_c]`.
            training: forwarded to every sub-layer.

        Returns:
            The dense-layer output for every time step, plus the final LSTM
            state pair when `return_state` is True.
        """
        x = inputs
        x = self.embedding(x, training=training)
        if states is None:
            states = self.lstm.get_initial_state(x)
        # With return_state=True the LSTM returns the sequence output followed
        # by two state tensors.
        x, states_h, states_c = self.lstm(x, initial_state=states, training=training)
        x = self.dense(x, training=training)

        if return_state:
            return x, [states_h, states_c]
        else:
            return x

# Instantiate the LSTM model with the same hyperparameters as the GRU model.
lstm_model = MyLSTMModel(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

# Compile with the same optimizer and loss as the GRU model.
lstm_model.compile(optimizer='adam', loss=loss)

# Checkpoints for the LSTM model go to their own directory so they do not
# overwrite the GRU checkpoints.
checkpoint_dir_lstm = './training_checkpoints_lstm'
checkpoint_prefix_lstm = os.path.join(checkpoint_dir_lstm, "ckpt_{epoch}")
checkpoint_callback_lstm = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix_lstm,
    save_weights_only=True)

# Train the LSTM model on the same dataset for the same number of epochs.
history_lstm = lstm_model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback_lstm])

# Define the OneStep class for LSTM
class OneStepLSTM(OneStep):
    """Single-step sampler for the LSTM model.

    The constructor and `generate_one_step` are inherited from `OneStep`:
    the sampler treats the model state as an opaque value that it passes
    through, so the same code serves the LSTM's `[h, c]` state pair. Keeping
    a single implementation avoids two copies drifting apart.

    Args:
        model: model called as `model(inputs=..., states=..., return_state=True)`.
        chars_from_ids: lookup layer mapping IDs to characters.
        ids_from_chars: lookup layer mapping characters to IDs.
        temperature: divisor applied to the logits before sampling.
    """



In [None]:
# Print the layer-by-layer summary of the LSTM model.
lstm_model.summary()

In [None]:
# Wrap the trained LSTM model in a OneStepLSTM sampler.
one_step_model_lstm = OneStepLSTM(lstm_model, chars_from_ids, ids_from_chars)

# Generate 1000 characters with the LSTM model, seeded with "Ron", and time it.
start = time.time()
states = None  # no state yet: the first step starts from the model's initial state
next_char = tf.constant(['Ron'])
result = [next_char]

# Feed each sampled character and the returned state back into the next step.
for n in range(1000):
    next_char, states = one_step_model_lstm.generate_one_step(next_char, states=states)
    result.append(next_char)

# Concatenate the seed and every sampled character into one string tensor.
result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode('utf-8'), '\n\n' + '_' * 80)
print('\nRun time:', end - start)

In [None]:
# model.compile(optimizer='adam', loss=loss)


In [None]:
# Export the trained LSTM model in SavedModel format.
tf.saved_model.save(lstm_model, 'my_lstm_model')

# Export a OneStepLSTM sampler as well.
# NOTE(review): this builds a second sampler around the same lstm_model rather
# than reusing `one_step_model_lstm` from the generation cell.
one_step_lstm = OneStepLSTM(model=lstm_model, chars_from_ids=chars_from_ids, ids_from_chars=ids_from_chars)
tf.saved_model.save(one_step_lstm, 'one_step_lstm_model')


Adding Attention ***layer***

In [None]:
# class MyModelWithAttention(tf.keras.Model):
#     def __init__(self, vocab_size, embedding_dim, rnn_units):
#         super().__init__(self)
#         self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
#         self.gru = tf.keras.layers.GRU(rnn_units,
#                                        return_sequences=True,
#                                        return_state=True)
#         self.attention = AttentionLayer()  # Add attention layer
#         self.dense = tf.keras.layers.Dense(vocab_size)

#     def call(self, inputs, states=None, return_state=False, training=False):
#         x = inputs
#         x = self.embedding(x, training=training)
#         if states is None:
#             states = self.gru.get_initial_state(x)
#         x, states = self.gru(x, initial_state=states, training=training)

#         # Apply attention layer
#         attention_output = attention_3d_block(x)
#         x = self.dense(attention_output, training=training)

#         if return_state:
#             return x, states
#         else:
#             return x


In [None]:
# class MyLSTMModelWithAttention(tf.keras.Model):
#     def __init__(self, vocab_size, embedding_dim, rnn_units):
#         super().__init__(self)
#         self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
#         self.lstm = tf.keras.layers.LSTM(rnn_units,
#                                          return_sequences=True,
#                                          return_state=True)
#         self.attention = AttentionLayer()  # Add attention layer
#         self.dense = tf.keras.layers.Dense(vocab_size)

#     def call(self, inputs, states=None, return_state=False, training=False):
#         x = inputs
#         x = self.embedding(x, training=training)
#         if states is None:
#             states = self.lstm.get_initial_state(x)
#         x, states_h, states_c = self.lstm(x, initial_state=states, training=training)

#         # Apply attention layer
#         attention_output = attention_3d_block(x)
#         x = self.dense(attention_output, training=training)

#         if return_state:
#             return x, [states_h, states_c]
#         else:
#             return x


In [None]:
# Hyperparameters for the functional (Input -> Embedding -> LSTM -> attention) models below.

max_features   = 50000  # input dimension of the Embedding layer
maxlen         = 200    # length of each input sequence (Input shape)
embedding_size = 128    # output dimension of the Embedding layer
num_lstm_units = 256    # number of LSTM units

In [None]:
def attention_3d_block(hidden_states):
  """Reduce a sequence of hidden states to a single attention vector.

  The last hidden state is scored against a linear projection of every
  hidden state; the softmax of those scores weights the hidden states into a
  context vector, which is concatenated with the last hidden state and fed
  through a 128-unit tanh layer.

  Args:
    hidden_states: tensor of shape (batch_size, seqlen, LSTM size).

  Returns:
    Tensor of shape (batch_size, 128).
  """
  lstm_width = int(hidden_states.shape[2])

  # (batch_size, seqlen, LSTM size): bias-free projection of every hidden state.
  projected_states = Dense(lstm_width, use_bias=False, name='attention_score_vec')(hidden_states)
  # (batch_size, LSTM size): hidden state of the final time step.
  last_state = Lambda(lambda x: x[:, -1, :], output_shape=(lstm_width,), name='last_hidden_state')(hidden_states)

  # (batch_size, seqlen): one score per time step, normalized by softmax.
  raw_scores = dot([projected_states, last_state], [2, 1], name='attention_score')
  weights = Activation('softmax', name='attention_weight')(raw_scores)

  # (batch_size, LSTM size): attention-weighted sum of the hidden states.
  context = dot([hidden_states, weights], [1, 1], name='context_vector')

  # (batch_size, LSTM size * 2): context vector joined with the last state.
  merged = concatenate([context, last_state], name='attention_output')

  # (batch_size, 128): final tanh projection.
  return Dense(128, use_bias=False, activation='tanh', name='attention_vector')(merged)

In [None]:
# lstm_model.summary()

In [None]:
# Define input shape
input_seq = Input(shape=(maxlen,))

# Define the embedding layer
input_emb = Embedding(max_features, embedding_size)(input_seq)

# LSTM over the embedded sequence; return_sequences=True because the attention
# block needs the hidden state of every time step.
lstm_model_emb = LSTM(num_lstm_units, dropout=0.2, recurrent_dropout=0.2, return_sequences=True)(input_emb)

# Feed the LSTM output tensor to the attention block. The trained `lstm_model`
# instance from the earlier cell is a model, not a tensor, so it is neither
# passed here nor rebound; the model is also built and compiled only once.
lstm_model_attention = attention_3d_block(lstm_model_emb)

lstm_model_with_attention = Model(inputs=input_seq, outputs=lstm_model_attention)

# Compile the LSTM model with attention
# NOTE(review): the model output is the 128-unit tanh attention vector, not a
# probability distribution over classes; confirm whether an output layer is
# missing before training with sparse_categorical_crossentropy.
lstm_model_with_attention.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])



In [None]:
# Print the layer-by-layer summary of the functional attention model.
lstm_model_with_attention.summary()

In [None]:
# Number of epochs for this run (a separate lower-case name from EPOCHS above).
epochs = 20
# Train the LSTM model with attention
# NOTE(review): the model's Input layer is declared with shape (maxlen,);
# confirm that the X_train_* / X_test_* arrays passed here match that length
# and that the targets fit the compiled loss.
history_lstm_with_attention = lstm_model_with_attention.fit(X_train_input, X_train_target, epochs=epochs, validation_data=(X_test_input, X_test_target))


In [None]:
def attention_3d_block(hidden_states):
    """Reduce a sequence of hidden states to a single 128-unit attention vector.

    NOTE(review): this re-declares `attention_3d_block` with the same body as
    the definition in an earlier cell; once this cell runs, this definition is
    the one in effect.

    Args:
        hidden_states: tensor of shape (batch_size, seqlen, LSTM size).

    Returns:
        The output of the final 128-unit tanh Dense layer.
    """
    # Shape of hidden_states is (batch_size, seqlen, LSTM size)
    hidden_size = int(hidden_states.shape[2])  # LSTM size

    # Create a dense layer for the attention score and fetch out the last hidden state
    score_first_part = Dense(hidden_size, use_bias=False, name='attention_score_vec')(hidden_states)
    h_t = Lambda(lambda x: x[:, -1, :], output_shape=(hidden_size,), name='last_hidden_state')(hidden_states)

    # Take the dot product of both, to get the final attention scores and push them through a softmax layer
    score = dot([score_first_part, h_t], [2, 1], name='attention_score')
    attention_weights = Activation('softmax', name='attention_weight')(score)

    # Take a dot product again to create a context vector
    context_vector = dot([hidden_states, attention_weights], [1, 1], name='context_vector')

    # Concatenate this context vector with h_t
    pre_activation = concatenate([context_vector, h_t], name='attention_output')

    # And create a final dense layer
    attention_vector = Dense(128, use_bias=False, activation='tanh', name='attention_vector')(pre_activation)

    return attention_vector


In [None]:
# Create model with attention layer

class MyModelWithAttention(tf.keras.Model):
    """GRU character model whose logits are computed from an attention summary.

    The attention step scores the last GRU hidden state against a projection
    of every hidden state, builds a softmax-weighted context vector,
    concatenates it with the last hidden state and applies a tanh layer.

    The attention sub-layers are created once in `__init__`. Layers created
    inside `call()` would be rebuilt with fresh, untracked weights on every
    forward pass and would never be trained.

    NOTE(review): the attention step reduces over the sequence axis, so the
    model returns one prediction per sequence, shape (batch, vocab_size),
    rather than one per time step. Confirm this matches the targets it is
    trained against.

    Args:
        vocab_size: number of distinct token IDs.
        embedding_dim: width of each embedding vector.
        rnn_units: number of GRU units.
        attention_units: width of the final attention vector (default 128).
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units, attention_units=128):
        # super() already binds the instance; do not pass `self` again.
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(rnn_units,
                                       return_sequences=True,
                                       return_state=True)
        # Attention sub-layers, owned by the model so their weights are tracked.
        self.attention_score_vec = tf.keras.layers.Dense(rnn_units, use_bias=False)
        self.attention_vector = tf.keras.layers.Dense(
            attention_units, use_bias=False, activation='tanh')
        self.dense = tf.keras.layers.Dense(vocab_size)

    def attention(self, hidden_states):
        """Return the attention vector for `hidden_states` (batch, seq, units)."""
        # (batch, seq, units): bias-free projection of every hidden state.
        projected_states = self.attention_score_vec(hidden_states)
        # (batch, units): hidden state of the final time step.
        last_state = hidden_states[:, -1, :]
        # (batch, seq): dot product of each projected state with the last state.
        scores = tf.einsum('bsu,bu->bs', projected_states, last_state)
        weights = tf.nn.softmax(scores, axis=-1)
        # (batch, units): attention-weighted sum of the hidden states.
        context = tf.einsum('bsu,bs->bu', hidden_states, weights)
        # (batch, attention_units)
        return self.attention_vector(tf.concat([context, last_state], axis=-1))

    def call(self, inputs, states=None, return_state=False, training=False):
        """Run the model; returns logits and, if requested, the final GRU state."""
        x = inputs
        x = self.embedding(x, training=training)
        if states is None:
            states = self.gru.get_initial_state(x)
        x, states = self.gru(x, initial_state=states, training=training)

        # Apply attention layer
        attention_output = self.attention(x)
        x = self.dense(attention_output, training=training)

        if return_state:
            return x, states
        else:
            return x


In [None]:
# `shape` must be a tuple: (maxlen) is just the integer maxlen, (maxlen,) is a 1-tuple.
input_seq = Input(shape=(maxlen,))
input_emb = Embedding(max_features, embedding_size)(input_seq)

# Important: the attention layer needs every hidden state, so return_sequences=True.
# return_sequences returns the hidden state output for each input step.
# return_state (not used here) would also return the hidden and cell state of the last input step.

lstm = LSTM(num_lstm_units, dropout=0.2, recurrent_dropout=0.2, return_sequences=True)(input_emb)
attention = attention_3d_block(lstm)

# We force all weights here to be positive to make visualization of the attention layer easier
dense = Dense(1, activation='sigmoid', use_bias=False, kernel_constraint=NonNeg())(Flatten()(attention))

# NOTE(review): this rebinds `model`, which earlier cells use for the GRU text model.
model = Model(inputs=input_seq, outputs=dense)

# Single sigmoid output, so binary cross-entropy is the matching loss.
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])



In [None]:
# Create an instance of the MyLSTMModelWithAttention class
# NOTE(review): this requires MyLSTMModelWithAttention to be defined by an
# earlier cell; confirm that a live (not commented-out) definition has been run.
lstm_model_with_attention = MyLSTMModelWithAttention(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

# Compile the LSTM model with attention.
# The model is trained on `dataset`, whose targets are integer character IDs,
# so it uses the same sparse categorical cross-entropy (from logits) as the
# other character models. Binary cross-entropy does not fit multi-class targets.
lstm_model_with_attention.compile(optimizer='adam', loss=loss, metrics=['accuracy'])



In [None]:
# Checkpoints for the attention model get their own directory; reusing
# checkpoint_callback_lstm would overwrite the plain LSTM model's checkpoints.
checkpoint_dir_lstm_attention = './training_checkpoints_lstm_attention'
checkpoint_callback_lstm_attention = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join(checkpoint_dir_lstm_attention, "ckpt_{epoch}"),
    save_weights_only=True)

# Train the LSTM model with attention
history_lstm_with_attention = lstm_model_with_attention.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback_lstm_attention])

# Evaluate the LSTM model with attention
# NOTE(review): `dataset_test` must be created by an earlier cell; confirm that
# a held-out dataset with the same (input, target) structure as `dataset` exists.
test_loss_with_attention, test_accuracy_with_attention = lstm_model_with_attention.evaluate(dataset_test)
print('Test loss with attention:', test_loss_with_attention)
print('Test accuracy with attention:', test_accuracy_with_attention)

In [None]:
# Sub-model that maps the model input to the output of the softmax layer named
# 'attention_weight'. It is built with the already-imported `Model` class, so
# no Keras backend alias (`K`) is needed.
# NOTE(review): `.input` and `get_layer(name='attention_weight')` assume that
# lstm_model_with_attention is a functional model built with
# attention_3d_block; confirm which model is bound to that name at this point.
attention_weight_model = Model(
    inputs=lstm_model_with_attention.input,
    outputs=lstm_model_with_attention.get_layer(name='attention_weight').output)


def get_attention_weights(batch):
    """Return the attention weights for a batch of inputs as a NumPy array."""
    return attention_weight_model.predict(batch, verbose=0)


# Example usage of attention weights
# NOTE(review): `X_test` must be loaded by an earlier cell; confirm its source.
test_instance_idx = 7
attention_weights = get_attention_weights(np.expand_dims(X_test[test_instance_idx], axis=0))



In [None]:
# Visualization of attention weights
# plt.get_cmap is used because matplotlib.cm.get_cmap is deprecated and has
# been removed from recent Matplotlib releases.
cmap = plt.get_cmap('Reds')
# Scale the absolute weights of the single test instance to [0, 1] and add a
# leading axis so imshow draws them as a one-row image.
attention_normalized = np.expand_dims(minmax_scale(np.abs(attention_weights[0])), axis=0)

plt.figure(figsize=(10,10))
plt.imshow(attention_normalized, cmap=cmap)



In [None]:
# Example of text highlighting with attention weights

# Build the index -> word table once. Loading the IMDB word index and scanning
# it for every token repeats the same file read and linear search per word.
# setdefault keeps the first word seen for an index, like the original scan.
index_to_word = {}
for known_word, known_index in imdb.get_word_index().items():
    index_to_word.setdefault(known_index, known_word)

highlighted_spans = []
for i, w in enumerate(X_test[test_instance_idx]):
    # Index 0 is rendered as a dash placeholder instead of a word.
    word = index_to_word[w] if w != 0 else '-----'
    highlighted_spans.append('<span style="background-color: {}">{}</span> '.format(rgb2hex(cmap(attention_normalized[0, i])[:3]), word))

# NOTE(review): this rebinds `text`, which earlier cells use for the corpus string.
text = ''.join(highlighted_spans)

HTML(text)
