In [None]:
from tensorflow.keras.layers import Bidirectional, Concatenate, Permute, Dot, Input, LSTM, Multiply
from tensorflow.keras.layers import RepeatVector, Dense, Activation, Lambda
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import load_model, Model
import tensorflow.keras.backend as K
import tensorflow as tf
import numpy as np

from faker import Faker
import random
from tqdm import tqdm
from babel.dates import format_date
from nmt_utils import *
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
m = 10000
dataset, human_vocab, machine_vocab, inv_machine_vocab = load_dataset(m)

In [None]:
dataset[:10]

In [None]:
Tx = 30
Ty = 10
X, Y, Xoh, Yoh = preprocess_data(dataset, human_vocab, machine_vocab, Tx, Ty)

print("X.shape:", X.shape)
print("Y.shape:", Y.shape)
print("Xoh.shape:", Xoh.shape)
print("Yoh.shape:", Yoh.shape)Tx = 30
Ty = 10
X, Y, Xoh, Yoh = preprocess_data(dataset, human_vocab, machine_vocab, Tx, Ty)

print("X.shape:", X.shape)
print("Y.shape:", Y.shape)
print("Xoh.shape:", Xoh.shape)
print("Yoh.shape:", Yoh.shape)

In [None]:
index = 0
print("Source date:", dataset[index][0])
print("Target date:", dataset[index][1])
print()
print("Source after preprocessing (indices):", X[index])
print("Target after preprocessing (indices):", Y[index])
print()
print("Source after preprocessing (one-hot):", Xoh[index])
print("Target after preprocessing (one-hot):", Yoh[index]

In [None]:
# Defined shared layers as global variables
repeator = RepeatVector(Tx)
concatenator = Concatenate(axis=-1)
densor1 = Dense(10, activation = "tanh")
densor2 = Dense(1, activation = "relu")
activator = Activation(softmax, name='attention_weights') # We are using a custom softmax(axis = 1) loaded in this notebook
dotor = Dot(axes = 1)

In [None]:
def one_step_attention(a, s_prev):
    """
    Performs one step of attention: Outputs a context vector computed as a dot product of the attention weights
    "alphas" and the hidden states "a" of the Bi-LSTM.
    
    Arguments:
    a -- hidden state output of the Bi-LSTM, numpy-array of shape (m, Tx, 2*n_a)
    s_prev -- previous hidden state of the (post-attention) LSTM, numpy-array of shape (m, n_s)
    
    Returns:
    context -- context vector, input of the next (post-attention) LSTM cell
    """
    

    # Use repeator to repeat s_prev to be of shape (m, Tx, n_s) so that you can concatenate it with all hidden states "a"
    s_prev = repeator(s_prev)
    
    # Use concatenator to concatenate a and s_prev on the last axis. List 'a' first and 's_prev' second.
    concat = concatenator([a, s_prev])
    
    # Use densor1 to propagate concat through a small fully-connected neural network to compute "intermediate energies"
    e = densor1(concat)
    
    # Use densor2 to propagate e through another fully-connected neural network to compute "energies"
    energies = densor2(e)
    
    # Use "activator" (a custom softmax function) on "energies" to compute attention weights "alphas"
    alphas = activator(energies)
    
    # Use dotor to compute the context vector as the weighted average of "a" and the attention weights "alphas"
    context = dotor([alphas, a])

    
    return context

In [None]:
#UNIT TEST
def one_step_attention_test(target):
    np.random.seed(10)
    tf.random.set_seed(10)

    m = 10
    Tx = 30
    n_a = 32
    n_s = 64
    
    a = np.random.uniform(1, 0, (m, Tx, 2 * n_a)).astype(np.float32)
    s_prev =np.random.uniform(1, 0, (m, n_s)).astype(np.float32) * 1
    context = target(a, s_prev)
    
    expected_output = np.load('expected_output_ex1.npy')
    
    assert type(context) == tf.python.framework.ops.EagerTensor, "Unexpected type. It should be a Tensor"
    assert tuple(context.shape) == (m, 1, n_s), "Unexpected output shape"
    assert np.all(context.numpy() == expected_output), "Unexpected values in the result"
    
    print("\033[92mAll tests passed!")
    
one_step_attention_test(one_step_attention)

In [None]:
n_a = 32 # number of units for the pre-attention, bi-directional LSTM's hidden state 'a'
n_s = 64 # number of units for the post-attention LSTM's hidden state "s"

# Please note, this is the post attention LSTM cell.  
post_activation_LSTM_cell = LSTM(n_s, return_state = True) # Please do not modify this global variable.
output_layer = Dense(len(machine_vocab), activation=softmax)

In [None]:
def modelf(Tx, Ty, n_a, n_s, human_vocab_size, machine_vocab_size):
    """
    Builds a model that translates human-readable dates to machine-readable dates.
    
    Arguments:
    Tx -- length of the input sequence
    Ty -- length of the output sequence
    n_a -- hidden state size of the Bi-LSTM
    n_s -- hidden state size of the post-attention LSTM
    human_vocab_size -- size of the python dictionary "human_vocab"
    machine_vocab_size -- size of the python dictionary "machine_vocab"
    
    Returns:
    model -- Keras model instance
    """
    
    # Input layer for the sequence of one-hot vectors (shape Tx, human_vocab_size)
    X = Input(shape=(Tx, human_vocab_size))
    
    # Initial hidden and cell states for the decoder LSTM
    s0 = Input(shape=(n_s,), name='s0')  # initial hidden state
    c0 = Input(shape=(n_s,), name='c0')  # initial cell state
    
    # Initialize hidden and cell states to be used in the loop
    s = s0
    c = c0
    
    # Initialize an empty list to collect the outputs
    outputs = []
    
    # Step 1: Define pre-attention Bi-LSTM with return_sequences=True to keep the hidden state at each time step
    a = Bidirectional(LSTM(units=n_a, return_sequences=True))(X)
    
    # Step 2: Iterate over the output sequence length (Ty)
    for t in range(Ty):
    
        # Step 2.A: Compute context vector using the attention mechanism
        context = one_step_attention(a, s)  # Compute attention context vector for this time step
        
        # Step 2.B: Apply post-attention LSTM cell to context vector and update hidden/cell states
        s, _, c = post_activation_LSTM_cell(context, initial_state=[s, c])  # Update hidden and cell states
        
        # Step 2.C: Generate an output at the current time step (dense layer with softmax activation)
        out = output_layer(s)  # Dense layer to generate prediction
        
        # Step 2.D: Append the current output to the list of outputs
        outputs.append(out)  # Store each time-step prediction separately
    
    # Step 3: Create a Keras model instance with the input X, s0, c0, and the list of outputs
    model = Model(inputs=[X, s0, c0], outputs=outputs)  # List of outputs for all time steps
    
    return model

In [None]:
# UNIT TEST
from test_utils import *

def modelf_test(target):
    Tx = 30
    n_a = 32
    n_s = 64
    len_human_vocab = 37
    len_machine_vocab = 11
    
    
    model = target(Tx, Ty, n_a, n_s, len_human_vocab, len_machine_vocab)
    
    print(summary(model))

    
    expected_summary = [['InputLayer', [(None, 30, 37)], 0],
                         ['InputLayer', [(None, 64)], 0],
                         ['Bidirectional', (None, 30, 64), 17920],
                         ['RepeatVector', (None, 30, 64), 0, 30],
                         ['Concatenate', (None, 30, 128), 0],
                         ['Dense', (None, 30, 10), 1290, 'tanh'],
                         ['Dense', (None, 30, 1), 11, 'relu'],
                         ['Activation', (None, 30, 1), 0],
                         ['Dot', (None, 1, 64), 0],
                         ['InputLayer', [(None, 64)], 0],
                         ['LSTM',[(None, 64), (None, 64), (None, 64)], 33024,[(None, 1, 64), (None, 64), (None, 64)],'tanh'],
                         ['Dense', (None, 11), 715, 'softmax']]

    assert len(model.outputs) == 10, f"Wrong output shape. Expected 10 != {len(model.outputs)}"

    comparator(summary(model), expected_summary)
    

modelf_test(modelf)

In [None]:
model = modelf(Tx, Ty, n_a, n_s, len(human_vocab), len(machine_vocab))

In [None]:
model.summary()

In [None]:
opt = Adam(learning_rate=0.005,decay=0.01)
model.compile(loss=tf.keras.losses.categorical_crossentropy(), 
              optimizer=opt, 
              metrics=['accuracy'])

In [None]:
# UNIT TESTS
assert opt.lr == 0.005, "Set the lr parameter to 0.005"
assert opt.beta_1 == 0.9, "Set the beta_1 parameter to 0.9"
assert opt.beta_2 == 0.999, "Set the beta_2 parameter to 0.999"
assert opt.decay == 0.01, "Set the decay parameter to 0.01"
assert model.loss == "categorical_crossentropy", "Wrong loss. Use 'categorical_crossentropy'"
assert model.optimizer == opt, "Use the optimizer that you have instantiated"
assert model.compiled_metrics._user_metrics[0] == 'accuracy', "set metrics to ['accuracy']"

print("\033[92mAll tests passed!")

In [None]:
s0 = np.zeros((m, n_s))
c0 = np.zeros((m, n_s))
outputs = list(Yoh.swapaxes(0,1))

In [None]:
model.fit([Xoh, s0, c0], outputs, epochs=1, batch_size=100)

In [None]:
model.load_weights('models/model.h5')

In [None]:
EXAMPLES = ['3 May 1979', '5 April 09', '21th of August 2016', 'Tue 10 Jul 2007', 'Saturday May 9 2018', 'March 3 2001', 'March 3rd 2001', '1 March 2001']
s00 = np.zeros((1, n_s))
c00 = np.zeros((1, n_s))
for example in EXAMPLES:
    source = string_to_int(example, Tx, human_vocab)
    #print(source)
    source = np.array(list(map(lambda x: to_categorical(x, num_classes=len(human_vocab)), source))).swapaxes(0,1)
    source = np.swapaxes(source, 0, 1)
    source = np.expand_dims(source, axis=0)
    prediction = model.predict([source, s00, c00])
    prediction = np.argmax(prediction, axis = -1)
    output = [inv_machine_vocab[int(i)] for i in prediction]
    print("source:", example)
    print("output:", ''.join(output),"\n")

In [None]:
attention_map = plot_attention_map(model, human_vocab, inv_machine_vocab, "Tuesday 09 Oct 1993", num = 7, n_s = 64);