In [None]:
import random
import time
import tensorflow as tf
import numpy as np

# Create Training Data

In [None]:
# Vocabulary: the three special tokens come first (so <PAD> is id 0), then the
# ten digits, then the two operators. A token's id is its position in this order.
char_to_id = {
    token: index
    for index, token in enumerate(
        ['<PAD>', '<BOS>', '<EOS>', *'0123456789', '+', '-']
    )
}

# Inverse lookup, derived from char_to_id so the two tables cannot drift apart.
id_to_char = {index: token for token, index in char_to_id.items()}

In [None]:
def id_list_to_sequence(sequence):
    """Decode an iterable of token ids into one string.

    Every id is looked up in ``id_to_char`` and the resulting tokens are
    concatenated without a separator. An id that is missing from the table
    maps to ``None``, which ``str.join`` rejects with ``TypeError``.
    """
    return ''.join(map(id_to_char.get, sequence))

def sequence_to_id_list(sequence):
    """Encode a string character by character into a list of token ids.

    Characters that are not keys of ``char_to_id`` are encoded as ``None``.
    """
    ids = []
    for char in sequence:
        ids.append(char_to_id.get(char))
    return ids

def create_dataset(size, num_digit=5, ops=('+', '-')):
    """Generate random arithmetic problems and their answers as strings.

    Args:
        size: Number of (problem, answer) pairs to generate.
        num_digit: Maximum number of decimal digits per operand; operands are
            drawn uniformly from ``[0, 10**num_digit - 1]``.
        ops: Sequence of operator characters to sample from. Only ``'+'`` and
            ``'-'`` are supported.

    Returns:
        A tuple ``(source_sequences, target_sequences)`` of two equally long
        lists of strings, e.g. ``'12+7'`` and ``'19'``. A subtraction whose
        result is negative is rendered with a leading ``'-'``.

    Raises:
        ValueError: If the sampled operator is neither ``'+'`` nor ``'-'``.
    """
    # random.randint includes BOTH endpoints, so the upper bound has to be
    # 10**num_digit - 1 to keep every operand within num_digit digits.
    max_operand = 10 ** num_digit - 1

    source_sequences = []
    target_sequences = []

    for _ in range(size):
        a = random.randint(0, max_operand)
        b = random.randint(0, max_operand)
        op = random.choice(ops)

        if op == '+':
            result = a + b
        elif op == '-':
            result = a - b
        else:
            # Fail loudly instead of reusing stale values from a previous
            # iteration (or hitting an unbound local on the first one).
            raise ValueError('unsupported operator: {!r}'.format(op))

        source_sequences.append('{}{}{}'.format(a, op, b))
        target_sequences.append('{}'.format(result))

    return source_sequences, target_sequences

def tokenize(sequences, bos=False, eos=False):
    """Convert strings to a right-padded array of token ids.

    Args:
        sequences: Iterable of strings to encode with ``sequence_to_id_list``.
        bos: When true, prepend the ``<BOS>`` id to every sequence.
        eos: When true, append the ``<EOS>`` id to every sequence.

    Returns:
        The result of Keras ``pad_sequences`` with ``padding='post'``, i.e.
        shorter rows are filled at the end.
    """
    prefix = [char_to_id['<BOS>']] if bos else []
    suffix = [char_to_id['<EOS>']] if eos else []
    id_lists = [prefix + sequence_to_id_list(s) + suffix for s in sequences]
    return tf.keras.preprocessing.sequence.pad_sequences(id_lists, padding='post')

In [None]:
# Number of generated training examples and the mini-batch size.
train_data_num = 25600
batch_size = 256

train_source_sequences, train_target_sequences = create_dataset(train_data_num)
# Each dataset element is a triple built from the same problem; train_step
# unpacks it as (encoder_inputs, decoder_inputs, decoder_targets).
train_dataset = tf.data.Dataset.from_tensor_slices((
    # Source expression, no special tokens.
    tokenize(train_source_sequences),
    # Answer prefixed with <BOS>.
    tokenize(train_target_sequences, bos=True),
    # Answer suffixed with <EOS>.
    tokenize(train_target_sequences, eos=True)
)).shuffle(len(train_source_sequences))
# repeat() makes the stream endless, so the training loop pulls batches with
# next() instead of iterating the dataset once per epoch.
train_dataset = train_dataset.batch(batch_size, drop_remainder=True).repeat().prefetch(8)
train_data_iter = iter(train_dataset)

In [None]:
# Sanity check: show the first ten generated problems next to their answers.
for i in range(10):
    print(train_source_sequences[i], '=', train_target_sequences[i])

# Implementation of Transformer

In [None]:
# Model hyperparameters (passed to the Transformer constructor below).
vocab_size = len(char_to_id)  # one id per entry of the character table
num_blocks = 2                # blocks in the encoder stack and in the decoder stack
num_hidden_size = 128         # embedding / hidden width
num_heads = 8                 # attention heads per attention layer
dropout_rate = 0.1

# Training schedule: one "epoch" is num_batches steps over the repeated dataset.
num_epochs = 100
num_batches = train_data_num // batch_size

In [None]:
# Large negative value that the bias helpers below place at masked positions;
# it is added to attention logits before the softmax.
NEG_INF = -1e9


def gelu(x):
    """Tanh-based GELU activation: ``x * 0.5 * (1 + tanh(c * (x + 0.044715 * x**3)))``.

    The constant ``c`` is 0.7978845608.
    """
    inner = 0.7978845608 * (x + 0.044715 * tf.pow(x, 3))
    gate = 0.5 * (1.0 + tf.tanh(inner))
    return x * gate


def get_position_encoding(length, hidden_size, dtype=tf.float32):
    """Build sinusoidal position encodings reshaped to ``[length, hidden_size]``.

    For position ``p`` and frequency index ``i`` in ``[0, hidden_size // 2)``
    the angle is ``p / 10000 ** (2 * i / hidden_size)``. The sine and cosine of
    each angle are stacked on a trailing axis and then flattened, so they end
    up interleaved along the last dimension.

    Args:
        length: Number of positions to encode.
        hidden_size: Width of each encoding vector.
        dtype: Floating-point dtype used for the computation.
    """
    positions = tf.cast(tf.range(length), dtype)
    freq_index = tf.cast(tf.range(hidden_size // 2), dtype)

    exponent = (2 * freq_index) / tf.cast(hidden_size, dtype)
    inverse_freq = 1.0 / tf.pow(10000.0, exponent)
    # Outer product: one row per position, one column per frequency.
    angles = positions[:, tf.newaxis] * inverse_freq[tf.newaxis, :]

    sin_cos = tf.stack([tf.sin(angles), tf.cos(angles)], axis=2)
    return tf.reshape(sin_cos, [length, hidden_size])


def get_padding_bias(x):
    """Return an additive attention bias that is ``NEG_INF`` where ``x == 0``.

    Two singleton axes are inserted after the first axis of ``x`` so the
    result can be added to attention logits.
    """
    is_padding = tf.cast(tf.equal(x, 0), tf.float32)
    bias = is_padding * NEG_INF
    bias = tf.expand_dims(bias, axis=1)
    bias = tf.expand_dims(bias, axis=1)
    return bias


def get_decoder_self_attention_bias(length):
    """Return a ``[1, 1, length, length]`` causal bias for decoder self-attention.

    Entries on and below the diagonal are 0; entries above it are ``NEG_INF``,
    so a position cannot attend to later positions.
    """
    # band_part(..., -1, 0) keeps the whole lower triangle including the diagonal.
    lower_triangle = tf.linalg.band_part(tf.ones([length, length]), -1, 0)
    allowed = lower_triangle[tf.newaxis, tf.newaxis, :, :]
    return NEG_INF * (1.0 - allowed)

In [None]:
class EmbeddingSharedWeights(tf.keras.layers.Layer):
    """Token embedding whose weight matrix doubles as the output projection.

    A single ``[vocab_size, hidden_size]`` matrix is used in two ways:
    ``mode='embedding'`` looks rows up by token id, and ``mode='linear'``
    multiplies hidden states by the transposed matrix to produce one logit
    per vocabulary entry.
    """

    def __init__(self, vocab_size, hidden_size):
        """Create the shared weight matrix.

        Args:
            vocab_size: Number of rows (token ids) in the matrix.
            hidden_size: Width of each embedding vector.
        """
        super(EmbeddingSharedWeights, self).__init__()
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.embedding_weights = self.add_weight(
            name='embedding_weights',
            shape=[self.vocab_size, self.hidden_size],
            initializer=tf.random_normal_initializer(mean=0., stddev=self.hidden_size**-0.5)
        )

    def call(self, inputs, mode='embedding'):
        """Dispatch to the embedding lookup or to the output projection.

        Args:
            inputs: Token ids for ``'embedding'``; hidden states whose last
                dimension is ``hidden_size`` for ``'linear'``.
            mode: Either ``'embedding'`` or ``'linear'``.

        Raises:
            ValueError: If ``mode`` is not one of the two supported values.
        """
        if mode == 'embedding':
            return self._embedding(inputs)
        if mode == 'linear':
            return self._linear(inputs)
        # Previously an unknown mode fell through and returned None, which
        # only surfaced as a confusing error further downstream.
        raise ValueError(
            "mode must be 'embedding' or 'linear', got {!r}".format(mode))

    def _embedding(self, inputs):
        """Look up embeddings, zero out id 0 (padding) and scale by sqrt(hidden_size)."""
        embeddings = tf.gather(self.embedding_weights, inputs)
        # Positions holding id 0 contribute an all-zero vector.
        mask = tf.cast(tf.not_equal(inputs, 0), embeddings.dtype)
        embeddings *= tf.expand_dims(mask, -1)
        embeddings *= self.hidden_size ** 0.5
        return embeddings

    def _linear(self, inputs):
        """Project hidden states onto the vocabulary with the transposed embedding matrix."""
        outputs = tf.matmul(inputs, self.embedding_weights, transpose_b=True)
        return outputs

In [None]:
class FeedForwardNetwork(tf.keras.layers.Layer):
    """Two-layer feed-forward block: Dense(filter_size, gelu) -> dropout -> Dense(hidden_size)."""

    def __init__(self, hidden_size, filter_size, dropout_rate) -> None:
        """Create the two dense layers and the dropout applied between them.

        Args:
            hidden_size: Output width of the block.
            filter_size: Width of the inner, GELU-activated layer.
            dropout_rate: Dropout rate applied to the inner activations.
        """
        super(FeedForwardNetwork, self).__init__()
        self.filter_dense_layer = tf.keras.layers.Dense(
            filter_size,
            use_bias=True,
            activation=gelu,
            name='filter_layer',
        )
        self.output_dense_layer = tf.keras.layers.Dense(
            hidden_size,
            use_bias=True,
            name='output_layer',
        )
        self.dropout_layer = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x, training):
        """Apply the block to ``x``; ``training`` is forwarded to the dropout layer."""
        expanded = self.filter_dense_layer(x)
        expanded = self.dropout_layer(expanded, training=training)
        return self.output_dense_layer(expanded)

In [None]:
class Attention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product attention.

    Queries are projected from ``x``; keys and values are projected from
    ``y``. An additive ``bias`` is applied to the logits before the softmax.
    """

    def __init__(self, hidden_size, num_heads, dropout_rate):
        """Create the projection layers.

        Args:
            hidden_size: Width of the q/k/v/output projections.
            num_heads: Number of attention heads; must divide ``hidden_size``.
            dropout_rate: Dropout rate applied to the attention weights.

        Raises:
            ValueError: If ``hidden_size`` is not a multiple of ``num_heads``.
        """
        # Fail at construction time: with a non-divisible size the floor
        # division below would silently shrink the per-head depth and the
        # reshape in _split_heads would only fail during the forward pass.
        if hidden_size % num_heads != 0:
            raise ValueError(
                'hidden_size ({}) must be divisible by num_heads ({})'.format(
                    hidden_size, num_heads))

        super(Attention, self).__init__()
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.depth = hidden_size // num_heads

        self.q_dense_layer = tf.keras.layers.Dense(hidden_size, use_bias=False, name='q')
        self.k_dense_layer = tf.keras.layers.Dense(hidden_size, use_bias=False, name='k')
        self.v_dense_layer = tf.keras.layers.Dense(hidden_size, use_bias=False, name='v')
        self.output_dense_layer = tf.keras.layers.Dense(
            hidden_size, use_bias=False, name='output_transform')
        self.dropout_layer = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x, y, bias, training):
        """Attend from ``x`` (queries) over ``y`` (keys and values).

        Args:
            x: Tensor the queries are projected from.
            y: Tensor the keys and values are projected from.
            bias: Tensor added to the attention logits before the softmax.
            training: Forwarded to the dropout on the attention weights.

        Returns:
            The attended values after the heads are recombined and passed
            through the output projection.
        """
        q = self.q_dense_layer(x)
        k = self.k_dense_layer(y)
        v = self.v_dense_layer(y)

        q = self._split_heads(q)
        k = self._split_heads(k)
        v = self._split_heads(v)

        # Scale the queries so the dot products do not grow with the head depth.
        q = q * self.depth ** -0.5

        logits = tf.matmul(q, k, transpose_b=True)
        logits += bias
        weights = tf.nn.softmax(logits, name='attention_weights')
        weights = self.dropout_layer(weights, training=training)

        attention_output = tf.matmul(weights, v)
        attention_output = self._combine_heads(attention_output)
        attention_output = self.output_dense_layer(attention_output)

        return attention_output

    def _split_heads(self, x):
        """Reshape ``[batch, length, hidden]`` to ``[batch, num_heads, length, depth]``."""
        batch_size = tf.shape(x)[0]
        length = tf.shape(x)[1]
        x = tf.reshape(x, [batch_size, length, self.num_heads, self.depth])
        return tf.transpose(x, [0, 2, 1, 3])

    def _combine_heads(self, x):
        """Inverse of ``_split_heads``: back to ``[batch, length, hidden_size]``."""
        batch_size = tf.shape(x)[0]
        length = tf.shape(x)[2]
        x = tf.transpose(x, [0, 2, 1, 3])
        return tf.reshape(x, [batch_size, length, self.hidden_size])

In [None]:
class SelfAttention(Attention):
    """Attention in which queries, keys and values all come from the same tensor."""

    def call(self, x, bias, training):
        """Run the parent attention with ``x`` used as both query and key/value source."""
        return super().call(x, x, bias, training=training)

In [None]:
class PrePostProcessingWrapper(tf.keras.layers.Layer):
    """Wrap a sub-layer as ``x + dropout(layer(layer_norm(x), ...))``."""

    def __init__(self, layer, dropout_rate):
        """Store the wrapped layer and create its layer norm and dropout.

        Args:
            layer: The sub-layer to wrap.
            dropout_rate: Dropout rate applied to the sub-layer output.
        """
        super(PrePostProcessingWrapper, self).__init__()
        self.layer = layer
        self.layer_norm = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout_layer = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x, *args, **kwargs):
        """Normalize ``x``, run the wrapped layer, apply dropout, add the residual.

        Extra positional and keyword arguments are passed through to the
        wrapped layer. ``training`` must be supplied as a keyword argument
        because it is read from ``kwargs`` for the dropout layer.
        """
        normalized = self.layer_norm(x)
        sublayer_output = self.layer(normalized, *args, **kwargs)
        residual = self.dropout_layer(sublayer_output, training=kwargs['training'])
        return x + residual

In [None]:
class EncoderStack(tf.keras.layers.Layer):
    """Stack of encoder blocks followed by a final layer normalization.

    Each block is a wrapped self-attention layer followed by a wrapped
    feed-forward network.
    """

    def __init__(self, num_blocks, hidden_size, num_heads, dropout_rate):
        """Build ``num_blocks`` blocks.

        Args:
            num_blocks: Number of encoder blocks.
            hidden_size: Model width; the feed-forward inner width is 4x this.
            num_heads: Attention heads per self-attention layer.
            dropout_rate: Dropout rate used throughout the stack.
        """
        super(EncoderStack, self).__init__()
        self.layers = []
        for _ in range(num_blocks):
            attention = SelfAttention(hidden_size, num_heads, dropout_rate)
            ffn = FeedForwardNetwork(hidden_size, hidden_size * 4, dropout_rate)

            block = [
                PrePostProcessingWrapper(attention, dropout_rate),
                PrePostProcessingWrapper(ffn, dropout_rate),
            ]
            self.layers.append(block)

        self.output_normalization = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    def call(self, encoder_inputs, attention_bias, training):
        """Run the inputs through every block and normalize the result.

        Args:
            encoder_inputs: Input tensor to the first block.
            attention_bias: Bias passed to each self-attention layer.
            training: Forwarded to every sub-layer.
        """
        hidden = encoder_inputs
        for self_attention_layer, feed_forward_network in self.layers:
            hidden = self_attention_layer(hidden, attention_bias, training=training)
            hidden = feed_forward_network(hidden, training=training)

        return self.output_normalization(hidden)

In [None]:
class DecoderStack(tf.keras.layers.Layer):
    """Stack of decoder blocks followed by a final layer normalization.

    Each block is a wrapped self-attention layer, a wrapped encoder-decoder
    attention layer and a wrapped feed-forward network, applied in that order.
    """

    def __init__(self, num_blocks, hidden_size, num_heads, dropout_rate):
        """Build ``num_blocks`` blocks.

        Args:
            num_blocks: Number of decoder blocks.
            hidden_size: Model width; the feed-forward inner width is 4x this.
            num_heads: Attention heads per attention layer.
            dropout_rate: Dropout rate used throughout the stack.
        """
        super(DecoderStack, self).__init__()
        self.layers = []
        for _ in range(num_blocks):
            self_attention = SelfAttention(hidden_size, num_heads, dropout_rate)
            cross_attention = Attention(hidden_size, num_heads, dropout_rate)
            ffn = FeedForwardNetwork(hidden_size, hidden_size * 4, dropout_rate)

            block = [
                PrePostProcessingWrapper(self_attention, dropout_rate),
                PrePostProcessingWrapper(cross_attention, dropout_rate),
                PrePostProcessingWrapper(ffn, dropout_rate),
            ]
            self.layers.append(block)

        self.output_normalization = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    def call(
        self,
        decoder_inputs,
        encoder_outputs,
        decoder_self_attention_bias,
        attention_bias,
        training
    ):
        """Run the inputs through every block and normalize the result.

        Args:
            decoder_inputs: Input tensor to the first block.
            encoder_outputs: Encoder output attended to by each block.
            decoder_self_attention_bias: Bias for the decoder self-attention.
            attention_bias: Bias for the encoder-decoder attention.
            training: Forwarded to every sub-layer.
        """
        hidden = decoder_inputs
        for self_attention_layer, enc_dec_attention_layer, feed_forward_network in self.layers:
            hidden = self_attention_layer(
                hidden, decoder_self_attention_bias, training=training)
            hidden = enc_dec_attention_layer(
                hidden, encoder_outputs, attention_bias, training=training)
            hidden = feed_forward_network(hidden, training=training)

        return self.output_normalization(hidden)

In [None]:
class Transformer(tf.keras.Model):
    """Encoder-decoder Transformer with a shared embedding / output projection."""

    def __init__(
        self,
        vocab_size,
        num_blocks,
        hidden_size,
        num_heads,
        dropout_rate,
    ):
        """Create the shared embedding, both stacks and the input dropouts.

        Args:
            vocab_size: Number of token ids.
            num_blocks: Blocks in each of the encoder and decoder stacks.
            hidden_size: Model width.
            num_heads: Attention heads per attention layer.
            dropout_rate: Dropout rate used throughout the model.
        """
        super(Transformer, self).__init__()
        self.num_blocks = num_blocks
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.dropout_rate = dropout_rate

        self.embedding_softmax_layer = EmbeddingSharedWeights(vocab_size, hidden_size)
        self.encoder_stack = EncoderStack(num_blocks, hidden_size, num_heads, dropout_rate)
        self.decoder_stack = DecoderStack(num_blocks, hidden_size, num_heads, dropout_rate)

        self.encoder_dropout_layer = tf.keras.layers.Dropout(dropout_rate)
        self.decoder_dropout_layer = tf.keras.layers.Dropout(dropout_rate)

    def call(self, encoder_inputs, decoder_inputs, training):
        """Encode the source ids, decode against them and return the logits."""
        padding_bias = get_padding_bias(encoder_inputs)
        memory = self.encode(encoder_inputs, padding_bias, training=training)
        return self.decode(decoder_inputs, memory, padding_bias, training=training)

    def _embed_with_position(self, inputs, dropout_layer, training):
        """Embed ids, add position encodings and apply ``dropout_layer``.

        Returns the processed tensor together with the sequence length taken
        from the embedded tensor's second axis.
        """
        embedded = self.embedding_softmax_layer(inputs)

        with tf.name_scope('add_pos_encoding'):
            length = tf.shape(embedded)[1]
            positioned = embedded + get_position_encoding(length, self.hidden_size)
            positioned = dropout_layer(positioned, training=training)

        return positioned, length

    def encode(self, inputs, attention_bias, training):
        """Embed the source ids and run them through the encoder stack."""
        stack_inputs, _ = self._embed_with_position(
            inputs, self.encoder_dropout_layer, training)
        return self.encoder_stack(stack_inputs, attention_bias, training=training)

    def decode(self, inputs, encoder_outputs, attention_bias, training):
        """Embed the target ids, run the decoder stack and project to vocabulary logits."""
        stack_inputs, length = self._embed_with_position(
            inputs, self.decoder_dropout_layer, training)

        causal_bias = get_decoder_self_attention_bias(length)
        stack_outputs = self.decoder_stack(
            stack_inputs,
            encoder_outputs,
            causal_bias,
            attention_bias,
            training=training
        )
        # Reuse the embedding matrix as the output projection.
        return self.embedding_softmax_layer(stack_outputs, mode='linear')

In [None]:
# Build the model from the hyperparameters defined in the configuration cell.
transformer = Transformer(
    vocab_size,
    num_blocks,
    num_hidden_size,
    num_heads,
    dropout_rate,
)

# Adam with a fixed learning rate (no schedule or warm-up).
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002)

In [None]:
# Running means of the per-batch loss and accuracy; train_step updates them
# and the training loop reads and resets them when it prints progress.
train_loss_metric = tf.keras.metrics.Mean(name='train_loss')
train_accuracy_metric = tf.keras.metrics.Mean(name='train_accuracy')

def loss_function(real, pred):
    """Softmax cross-entropy averaged over the positions where ``real != 0``.

    Args:
        real: Integer target ids; id 0 is excluded from the average.
        pred: Logits with a trailing dimension of ``vocab_size``.
    """
    per_token_loss = tf.nn.softmax_cross_entropy_with_logits(
        labels=tf.one_hot(real, vocab_size),
        logits=pred,
    )

    non_padding = tf.cast(tf.math.not_equal(real, 0), dtype=per_token_loss.dtype)

    return tf.reduce_sum(per_token_loss * non_padding) / tf.reduce_sum(non_padding)


def accuracy_function(real, pred):
    """Fraction of positions with ``real != 0`` whose argmax prediction equals ``real``.

    Args:
        real: Integer target ids; id 0 is excluded from the average.
        pred: Logits; the prediction is the argmax over the last axis.
    """
    predicted_ids = tf.cast(tf.argmax(pred, axis=-1), tf.int32)
    hits = tf.cast(tf.equal(predicted_ids, real), tf.float32)

    non_padding = tf.cast(tf.math.not_equal(real, 0), dtype=tf.float32)

    return tf.reduce_sum(hits * non_padding) / tf.reduce_sum(non_padding)


@tf.function
def train_step(dataset_inputs):
    """Run one optimization step on a batch and update the training metrics.

    Args:
        dataset_inputs: A triple ``(encoder_inputs, decoder_inputs,
            decoder_targets)`` as produced by the training dataset.
    """
    source_ids, target_in_ids, target_out_ids = dataset_inputs

    # Record the forward pass so the loss can be differentiated.
    with tf.GradientTape() as tape:
        logits = transformer(source_ids, target_in_ids, training=True)
        loss = loss_function(target_out_ids, logits)

    trainable_vars = transformer.trainable_variables
    grads = tape.gradient(loss, trainable_vars)
    optimizer.apply_gradients(zip(grads, trainable_vars))

    train_loss_metric(loss)
    train_accuracy_metric(accuracy_function(target_out_ids, logits))

In [None]:
# Progress line: global step / total steps, epoch, running loss and accuracy,
# and the seconds elapsed since the previous report.
template = '{}/{} (epoch {}), Train Loss: {:.4f}, Train Accuracy: {:.4f}, Elapsed Time: {:.2f}'
start = time.time()
for e in range(num_epochs):
    for i in range(num_batches):
        # The dataset repeats forever, so batches are pulled explicitly.
        batch_train_data = next(train_data_iter)
        train_step(batch_train_data)

        # Report every 100 global steps (e * num_batches + i + 1 is the 1-based step).
        if (e * num_batches + i + 1) % 100 == 0:
            print(template.format(
                e * num_batches + i + 1,
                num_epochs * num_batches,
                e + 1,
                train_loss_metric.result().numpy(),
                train_accuracy_metric.result().numpy(),
                time.time() - start
            ))

            # Start a fresh averaging window and timer for the next report.
            # NOTE(review): newer Keras releases spell this reset_state();
            # confirm against the installed TensorFlow version.
            train_loss_metric.reset_states()
            train_accuracy_metric.reset_states()
            start = time.time()


# Validation

In [None]:
# Number of freshly generated validation examples and the evaluation batch size.
valid_data_num = 10240
valid_batch_size = 1024

valid_source_sequences, valid_target_sequences = create_dataset(valid_data_num)
# Same triple layout as the training dataset: source, <BOS>+target, target+<EOS>.
valid_dataset = tf.data.Dataset.from_tensor_slices((
    tokenize(valid_source_sequences),
    tokenize(valid_target_sequences, bos=True),
    tokenize(valid_target_sequences, eos=True)
))
# No shuffling here; repeat() makes the stream endless, so the evaluation loop
# decides how many batches to consume.
valid_dataset = valid_dataset.batch(valid_batch_size, drop_remainder=False).repeat().prefetch(8)
valid_data_iter = iter(valid_dataset)

In [None]:
# Running means of the per-batch validation loss and accuracy.
valid_loss_metric = tf.keras.metrics.Mean(name='valid_loss')
valid_accuracy_metric = tf.keras.metrics.Mean(name='valid_accuracy')

# Consume valid_data_num // valid_batch_size batches from the repeated dataset.
for _ in range(valid_data_num // valid_batch_size):
    encoder_inputs, decoder_inputs, decoder_targets = next(valid_data_iter)
    # training=False is forwarded to the model's dropout layers.
    logits = transformer(encoder_inputs, decoder_inputs, training=False)
    loss = loss_function(decoder_targets, logits)

    valid_loss_metric(loss)
    accuracy = accuracy_function(decoder_targets, logits)
    valid_accuracy_metric(accuracy)

# The reported numbers are means over batches of the per-batch values.
print('Valid Loss: {:.4f}, Valid Accuracy: {:.4f}'.format(
    valid_loss_metric.result().numpy(),
    valid_accuracy_metric.result().numpy()
))