In [None]:
from keras.layers import Embedding, Bidirectional
from keras.layers.core import *
from keras.layers.recurrent import LSTM
from keras.models import *
from keras.layers.merge import Multiply
from keras.utils import to_categorical
from keras.layers import TimeDistributed

import keras.backend as K
import numpy as np

## Machine Translation (kind of)

The machine translation is old and well-known field in natural language processing. From the 1950s scientists tried to create a model to automatically translate from say French to English. Nowadays it became possible and the attention mechanism takes great part in that. Here the example image with attention map for the neural machine translation of sample phrase:
<p align="center">
  <img src="http://d3kbpzbmcynnmx.cloudfront.net/wp-content/uploads/2015/12/Screen-Shot-2015-12-30-at-1.23.48-PM.png" width="400">
</p>

In our lab we will concentrate on much simplier task: we will translate from human readable date to machine readable one.

To do this we need to get one more concept - Sequence-to-Sequence language modeling.
The idea of such architecture is here:

<p aling="center">
<img src="https://talbaumel.github.io/blog/attention/img/encdec.jpg" width="400">
</p>

There is an Embeding layer at the bottom, the bidirectional RNN in the middle and softmax as an output.

In [None]:
ENCODER_UNITS = 32
DECODER_UNITS = 32

Here we use more complex idea that simple seq2seq: we're adding two explicit parts of our network - encoder and decoder (which is applied attention on). The explanatory picture for this idea is below:
<p aling="center"><img src="https://i.stack.imgur.com/Zwsmz.png"></p>

The lower part of the network is encoding the input to some hidden intermediate representation and the upper part is decoing the hid–≤en represenataion into some readable output.

Let's implement the attention block:

In [None]:
def attention_3d_block(inputs):
    <your code here>
    return output_attention

Fianlly lets create a machine traslation model:

In [None]:
def model_simple_nmt(in_chars, out_chars):
    inputs = Input(shape=(TIME_STEPS,))
    
    input_embed = Embedding(in_chars, ENCODER_UNITS * 2, input_length=TIME_STEPS, trainable=True,
                            name='embedding')(inputs)
    
    enc_out = Bidirectional(LSTM(ENCODER_UNITS, return_sequences=True))(input_embed)
    dec_out = LSTM(DECODER_UNITS, return_sequences=True)(enc_out)
    attention_mul = attention_3d_block(dec_out)
    
    output = TimeDistributed(Dense(out_chars, activation='softmax'))(attention_mul)
   
    model = Model(input=[inputs], output=output)
    return model

Now we need to generate data. It will be dates in different text formats and in fixed output format.

In [None]:
from faker import Faker
import random
from tqdm import tqdm
from babel.dates import format_date
import numpy as np

In [None]:
fake = Faker()
fake.seed(12345)
random.seed(12345)

FORMATS = ['short',
           'medium',
           'long',
           'full',
           'd MMM YYY', 
           'd MMMM YYY',
           'dd MMM YYY',
           'd MMM, YYY',
           'd MMMM, YYY',
           'dd, MMM YYY',
           'd MM YY',
           'd MMMM YYY',
           'MMMM d YYY',
           'MMMM d, YYY',
           'dd.MM.YY']

# change this if you want it to work with another language
LOCALES = ['en_US']

In [None]:
def create_date():
    """
        Creates some fake dates 
        :returns: tuple containing human readable string, machine readable string, and date object
    """
    dt = fake.date_object()

    try:
        human_readable = format_date(dt, format=random.choice(FORMATS), locale=random.choice(LOCALES))

        case_change = random.choice([0,1,2])
        if case_change == 1:
            human_readable = human_readable.upper()
        elif case_change == 2:
            human_readable = human_readable.lower()
        # if case_change == 0, do nothing

        machine_readable = dt.isoformat()
    except AttributeError as e:
        return None, None, None

    return human_readable, machine_readable, dt

In [None]:
def create_dataset(n_examples):
    """
        Creates a dataset with n_examples and vocabularies
        :n_examples: the number of examples to generate
    """
    human_vocab = set()
    machine_vocab = set()
    dataset = []

    for i in tqdm(range(n_examples)):
        h, m, _ = create_date()
        if h is not None:
            dataset.append((h, m))
            human_vocab.update(tuple(h))
            machine_vocab.update(tuple(m))

    human = dict(zip(list(human_vocab) + ['<unk>', '<pad>'], 
                     list(range(len(human_vocab) + 2))))
    inv_machine = dict(enumerate(list(machine_vocab) + ['<unk>', '<pad>']))
    machine = {v:k for k,v in inv_machine.items()}
 
    return dataset, human, machine, inv_machine

In [None]:
def string_to_int(string, lenght, vocab):
    if len(string) > lenght:
        string = string[:lenght]
        
    rep = list(map(lambda x: vocab.get(x, '<unk>'), string))
    
    if len(string) < lenght:
        rep += [vocab['<pad>']] * (lenght - len(string))
    
    return rep

In [None]:
def int_to_string(ints, inv_vocab):
    return [inv_vocab[i] for i in ints]

Actually generating data:

In [None]:
N = 300000
dataset, human_vocab, machine_vocab, inv_machine_vocab = create_dataset(N)

Compiling and training model:

In [None]:
m = model_simple_nmt(len(human_vocab), len(machine_vocab))

m.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
print(m.summary())

In [None]:
inputs, targets = zip(*dataset)
inputs = np.array([string_to_int(i, TIME_STEPS, human_vocab) for i in inputs])
targets = [string_to_int(t, TIME_STEPS, machine_vocab) for t in targets]
targets = np.array(list(map(lambda x: to_categorical(x, num_classes=len(machine_vocab)), targets)))

In [None]:
m.fit([inputs], targets, epochs=1, batch_size=64, validation_split=0.1)

Lets check our model:

In [None]:
EXAMPLES = ['3 May 1979', '6 Jun 09', '20th February 2016', 'Thu 11 Jul 2007']

def run_example(model, input_vocabulary, inv_output_vocabulary, text):
    encoded = string_to_int(text, TIME_STEPS, input_vocabulary)
    prediction = model.predict(np.array([encoded]))
    prediction = np.argmax(prediction[0], axis=-1)
    return int_to_string(prediction, inv_output_vocabulary)

def run_examples(model, input_vocabulary, inv_output_vocabulary, examples=EXAMPLES):
    predicted = []
    for example in examples:
        predicted.append(''.join(run_example(model, input_vocabulary, inv_output_vocabulary, example)))
        print('input:', example)
        print('output:', predicted[-1])
    return predicted

In [None]:
run_examples(m, human_vocab, inv_machine_vocab)

And visualize the actual map of attention using the function __get_activations__:

In [None]:
def get_activations(model, model_inputs, print_shape_only=False, layer_name=None):
    """
        Return activation vectors
    """
    activations = []
    inp = model.input

    model_multi_inputs_cond = True
    if not isinstance(inp, list):
        # only one input! let's wrap it in a list.
        inp = [inp]
        model_multi_inputs_cond = False

    outputs =  # All layer outputs check layer name!

    funcs = [K.function(inp + [K.learning_phase()], [out]) for out in outputs]  # evaluation functions

    # Create list with model inputs

    # Learning phase. 0 = Test mode (no dropout or batch normalization)
    # layer_outputs = [func([model_inputs, 0.])[0] for func in funcs]
    
    return activations

In [None]:
def attention_map(model, input_vocabulary, inv_output_vocabulary, text):
    """
        Visualization of attention map
    """
    # encode the string
    encoded = string_to_int(text, TIME_STEPS, input_vocabulary)

    # get the output sequence
    prediction = model.predict(np.array([encoded]))
    predicted_text = np.argmax(prediction[0], axis=-1)
    predicted_text = int_to_string(predicted_text, inv_output_vocabulary)

    text_ = list(text)
    # get the lengths of the string
    input_length = len(text)
    output_length = predicted_text.index('<pad>')
    # get the activation map
    attention_vector = get_activations(model, [encoded], layer_name='attention_vec')[0].squeeze()
    activation_map = attention_vector[0:output_length, 0:input_length]
    
    plt.clf()
    f = plt.figure(figsize=(8, 8.5))
    ax = f.add_subplot(1, 1, 1)

    # add image
    i = ax.imshow(activation_map, interpolation='nearest', cmap='gray')

    # add colorbar
    cbaxes = f.add_axes([0.2, 0, 0.6, 0.03])
    cbar = f.colorbar(i, cax=cbaxes, orientation='horizontal')
    cbar.ax.set_xlabel('Probability', labelpad=2)

    # add labels
    ax.set_yticks(range(output_length))
    ax.set_yticklabels(predicted_text[:output_length])

    ax.set_xticks(range(input_length))
    ax.set_xticklabels(text_[:input_length], rotation=45)

    ax.set_xlabel('Input Sequence')
    ax.set_ylabel('Output Sequence')

    # add grid and legend
    ax.grid()

    f.show()

In [None]:
attention_map(m, human_vocab, inv_machine_vocab, EXAMPLES[0])

As you probably see, the default model for this lab is not that good. But you could try to improve it by yourself. You could get better results, like this:

<p align="center"><img src="https://user-images.githubusercontent.com/6295292/26899949-bbac0c7c-4b9e-11e7-84d6-c2f31166af07.png" width="800"></p>

__Acknowledgements__: code based on keras-attention of Philippe Remy

URL: https://github.com/philipperemy/keras-visualize-activations

The idea of date translation is borrowed from https://github.com/datalogue/keras-attention.