## Transformer based sequence-to-sequence learning

In [1]:
import os
import re

import collections

In [2]:
def read_data_from_file(filename, data_dict):

    with open(filename) as fp:
        line = fp.readline()
        while line:
            bo, ch, ve, text = tuple(line.strip().split('\t'))
            words = text.split()
            for w in words:  
                # in the output data, composite placenames have a '_', which cannot be found in the input data
                words_split = w.split('_')               
                for word_split in words_split:
                    data_dict[bo].append(word_split)
        
            line = fp.readline()
            
    return data_dict

In [3]:
#input_file = '/content/gdrive/My Drive/data/t-in_voc'
input_file = 'data/t-in_voc'
input_data = collections.defaultdict(list)

#output_file = '/content/gdrive/My Drive/data/t-out'
output_file = 'data/t-out'
output_data = collections.defaultdict(list)

input_data = read_data_from_file(input_file, input_data)
output_data = read_data_from_file(output_file, output_data)

In [4]:
# The reduction consists of removing the left-most marker from all
# the doubly marked prefixes and the redundant colon of the vowel
# pattern mark.

prefixes = ['!', ']', '@']

def mc_reduce(s):
   for c in prefixes:
      s = re.sub(f'{c}([^{c}]*{c})', r'\1', s)
   return s.replace(':', '')

In [5]:
def make_in_sequences(data_dict, sequence_length):
    
    all_sequences = []
    for words_list in data_dict.values():

        for w in range(len(words_list) - sequence_length + 1):
    
            seq = ' '.join([words_list[ind] for ind in list(range(w, w + sequence_length))])
        
            all_sequences.append(seq)
        
    return all_sequences

In [11]:
def make_out_sequences(data_dict, sequence_length):
    
    all_sequences = []
    for words_list in data_dict.values():

        for w in range(len(words_list) - sequence_length + 1):
    
            seq = ' '.join([words_list[ind] for ind in list(range(w, w + sequence_length))])
        
            seq = mc_reduce(seq)
            all_sequences.append(seq)
        
    return all_sequences

In [12]:
sequence_length = 1

all_in_seqs = make_in_sequences(input_data, sequence_length)
all_out_seqs = make_out_sequences(output_data, sequence_length)

In [13]:
all_in_seqs[0:10]

['B.:R;>CIJT',
 'B.@R@>',
 '>:ELOHIJM',
 '>;T',
 'HAC.@MAJIM',
 'W:>;T',
 'H@>@REY',
 'W:H@>@REY',
 'H@J:T@H',
 'TOHW.']

In [14]:
all_out_seqs[0:10]

['B-R>CJT/',
 'BR>[',
 '>LH(J(M/JM',
 '>T',
 'H-CMJ(M/(JM',
 'W->T',
 'H->RY/a',
 'W-H->RY/a',
 'HJ(H[&TH',
 'THW/']

In [15]:
len(set(all_out_seqs))

49490

In [16]:
text_pairs = []
for inp, outp in zip(all_in_seqs, all_out_seqs):
    
    outp = "s " + outp + " e"
    text_pairs.append((inp, outp))

In [17]:
import random
print(random.choice(text_pairs))

('<@NIJ', 's <NJ/ e')


In [18]:
import random
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]

In [19]:
out_set = set()
max_len = 0

for inp, outp in train_pairs:
    
    if len(outp) > max_len:
        max_len = len(outp)
        
    for char in outp:
        out_set.add(char)
    
print(out_set)
print(len(out_set))
print(max_len)

{'D', '<', 'd', 'K', 's', '[', 'B', '(', ' ', 'c', 'F', 'M', '=', ']', 'n', 'u', '-', 'Z', '>', '!', 'V', '+', 'C', 'H', 'o', 'T', 'S', 'X', 'N', 'a', 'p', 'R', 'W', '~', 'e', 'Y', 'J', '&', '/', 'P', 'G', 'L', 'Q'}
43
35


In [20]:
in_set = set()


for inp, outp in train_pairs:
        
    for char in inp:
        in_set.add(char)
    
print(in_set)
print(len(in_set))


{'D', '<', 'K', 'U', ';', 'B', 'F', 'M', '@', 'O', ':', '.', '>', 'Z', 'V', 'C', 'A', 'H', 'T', 'S', 'X', 'N', 'R', 'W', 'Y', 'J', 'I', 'P', 'G', 'L', 'E', 'Q', '*'}
33


**Vectorizing the input and output text pairs**

In [21]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
import string
import re

vocab_size = 46
sequence_length = 20

def char_split(input_data):
  return tf.strings.unicode_split(input_data, 'UTF-8')

source_vectorization = TextVectorization(
    #max_tokens=vocab_size,
    max_tokens=vocab_size,
    split=char_split,
    output_mode="int",
    output_sequence_length=sequence_length,
    standardize=None,
)
target_vectorization = TextVectorization(
    #max_tokens=vocab_size,
    split=char_split,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=None, 
)
train_input_texts = [pair[0] for pair in train_pairs]
train_output_texts = [pair[1] for pair in train_pairs]
source_vectorization.adapt(train_input_texts)
target_vectorization.adapt(train_output_texts)

In [22]:
len(source_vectorization.get_vocabulary())

35

In [23]:
len(train_input_texts)

210474

**Preparing training and validation datasets for the translation task**

In [24]:
batch_size = 64

def format_dataset(inp, outp):
    inp = source_vectorization(inp)
    outp = target_vectorization(outp)
    return ({
        "input": inp,
        "output": outp[:, :-1],
    }, outp[:, 1:])

def make_dataset(pairs):
    in_texts, out_texts = zip(*pairs)
    in_texts = list(in_texts)
    out_texts = list(out_texts)
    dataset = tf.data.Dataset.from_tensor_slices((in_texts, out_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    return dataset.shuffle(2048).prefetch(16).cache()

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [25]:
len(train_ds)

3289

In [26]:
len(val_ds)

705

In [27]:
for inputs, targets in train_ds.take(1):
    print(f"inputs['input'].shape: {inputs['input'].shape}")
    print(f"inputs['output'].shape: {inputs['output'].shape}")
    print(f"targets.shape: {targets.shape}")

inputs['input'].shape: (64, 20)
inputs['output'].shape: (64, 20)
targets.shape: (64, 20)


In [28]:
import tensorflow
print(tensorflow.__version__)

2.5.0


In [29]:
from tensorflow.keras.layers import MultiHeadAttention

In [30]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

#### The Transformer decoder

In [31]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

**PositionalEmbedding layer**

In [32]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        config = super(PositionalEmbedding, self).get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

In [33]:
vocab_size

46

**End-to-end Transformer**

In [34]:
embed_dim = 30
dense_dim = 2048
num_heads = 8

encoder_inputs = tf.keras.Input(shape=(None,), dtype="int64", name="input")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

decoder_inputs = tensorflow.keras.Input(shape=(None,), dtype="int64", name="output")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

**Training the sequence-to-sequence Transformer**

In [35]:
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
transformer.fit(train_ds, epochs=12, validation_data=val_ds)

Epoch 1/12
Epoch 2/12
Epoch 3/12
Epoch 4/12
Epoch 5/12
Epoch 6/12
Epoch 7/12
Epoch 8/12
Epoch 9/12
Epoch 10/12
Epoch 11/12
Epoch 12/12


<tensorflow.python.keras.callbacks.History at 0x12bc14ba8e0>

**"Translating" new sentences with our Transformer model**

In [168]:
import numpy as np
out_vocab = target_vectorization.get_vocabulary()
out_index_lookup = dict(zip(range(len(out_vocab)), out_vocab))
max_decoded_sentence_length = 30

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "s"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        predictions = transformer(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = out_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "e":
            break
    return decoded_sentence

test_input_texts = [pair[0] for pair in test_pairs]
for _ in range(20):
    input_sentence = random.choice(test_input_texts)
    print("-")
    print(input_sentence)
    print(decode_sequence(input_sentence))

-
L:HIM.OL
s   L H H ] M L M / e
-
T.:HIJ
s   T T T = e
-
W@KOH
s   W W K H H H e
-
B.:KOWR
s   B B K K K W e
-
>ET
s   > T e
-
HAG.IT.IJ
s   H G G T G ( T J e
-
K.IJ
s   K K J J e
-
<IM.OW
s   < M M M M + e
-
Y:B@>OWT
s   Y Y Y > Y ( e
-
XAV.A>T
s   X V V V V > T / e
-
D.@WID
s   D D W W W = e
-
W.B:NIJTIJH@
s   W B B - N ( T J T J J ( e
-
L;K:
s   ! ! ! ( ! ( e
-
L@GW.R
s   L L ( G G W e
-
>:ACER
s   > > C C C C e
-
WAJ:DAB.;R
s   W J J ! J ! H R e
-
WAJ.A<AF
s   W J J J J ! e
-
L:B;JT
s   L L L - B J e
-
>:ACER
s   > > C C C C e
-
>@RW.R
s   > > > R R W e
