In [1]:
from __future__ import absolute_import, division, print_function

import numpy as np
import random
import tensorflow as tf
from tensorflow.keras import Model, layers

In [2]:
# --- Toy dataset parameters ---
# Bounds (inclusive) on the length of each generated sequence; shorter
# sequences are right-padded up to seq_max_len by the data generator.
seq_min_len, seq_max_len = 5, 20
# Value written into the padded positions; the model's Masking layer is
# configured with the same value.
masking_val = -1
# Exclusive upper bound for the raw integers; sequences are divided by it.
max_value = 10000
# Size of the final Dense layer (one output per class).
num_classes = 2

# --- Training parameters ---
learning_rate = 1e-3
training_steps, display_step = 2000, 100
batch_size = 64

# --- Network parameters ---
# Number of units in the LSTM layer.
num_units = 32

In [3]:
def toy_sequence_data():
    """Endlessly yield (sequence, label) pairs for the toy classification task.

    Each sequence holds between seq_min_len and seq_max_len values scaled by
    max_value, right-padded with masking_val up to seq_max_len.
    Label 0 marks a run of consecutive integers, label 1 marks random integers.

    Yields:
        tuple: (float32 array of shape (seq_max_len,), float32 scalar array
        holding the label).
    """
    while True:
        # The order of these draws is deliberate: length, start offset, then
        # the class coin flip, so a seeded run produces the same stream.
        length = random.randint(seq_min_len, seq_max_len)
        first = random.randint(0, max_value - length)
        is_linear = random.random() < 0.5

        if is_linear:
            raw = np.arange(start = first, stop = first + length)
        else:
            raw = np.random.randint(max_value, size = length)

        # Both classes share the same scaling and right-padding.
        scaled = raw / max_value
        padded = np.pad(scaled, mode = 'constant', pad_width = (0, seq_max_len - length), constant_values = masking_val)
        label = 0 if is_linear else 1
        yield np.array(padded, dtype = np.float32), np.array(label, dtype = np.float32)

In [4]:
# Seed every random source the pipeline and model rely on, so that a fresh
# "Restart & Run All" regenerates the same samples and shuffle order. It should
# also fix weight initialisation, although some kernels may still be
# non-deterministic.
# Seeding here, right before the pipeline is built, also means re-running this
# cell restarts the same sample stream.
random_seed = 42
random.seed(random_seed)         # sequence length / start offset / class coin flip
np.random.seed(random_seed)      # random-integer sequences (class 1)
tf.random.set_seed(random_seed)  # global TensorFlow seed (shuffle, initializers)

# Wrap the Python generator as a dataset of (sequence, label) float32 pairs.
train_data = tf.data.Dataset.from_generator(toy_sequence_data, output_types = (tf.float32, tf.float32))
# The generator loops forever, so no repeat() is needed: shuffle with a
# 5000-element buffer, group into batches, and prefetch one batch ahead.
train_data = train_data.shuffle(5000).batch(batch_size).prefetch(1)

In [5]:
class LSTM(Model):
    """Sequence classifier: Masking -> LSTM -> Dense.

    Padded timesteps (equal to masking_val) are masked before the recurrent
    layer; the Dense layer produces num_classes outputs.
    """

    def __init__(self):
        super(LSTM, self).__init__()
        # Marks timesteps equal to masking_val as masked for downstream layers.
        self.masking = layers.Masking(mask_value = masking_val)
        self.lstm = layers.LSTM(units = num_units)
        self.out = layers.Dense(num_classes)

    def call(self, x, is_training = False):
        """Run the forward pass.

        Args:
            x: batch of padded sequences; reshaped to (-1, seq_max_len, 1).
            is_training: when True, return raw logits (the loss applies the
                softmax itself); otherwise return softmax probabilities.

        Returns:
            Tensor with num_classes values per example.
        """
        # Add a trailing feature axis: one scalar feature per timestep.
        steps = tf.reshape(x, shape = [-1, seq_max_len, 1])
        logits = self.out(self.lstm(self.masking(steps)))
        if is_training:
            return logits
        return tf.nn.softmax(logits)

In [6]:
# Instantiate the LSTM classifier used by run_optimization and the training loop.
lstm_net = LSTM()

In [7]:
def cross_entropy_loss(x, y):
    """Return the batch-mean sparse softmax cross-entropy.

    Args:
        x: raw logits (softmax is applied inside the TensorFlow op).
        y: class labels; cast to int64 before use.

    Returns:
        Scalar tensor: the mean of the per-example losses.
    """
    labels = tf.cast(y, tf.int64)
    per_example = tf.nn.sparse_softmax_cross_entropy_with_logits(labels = labels, logits = x)
    return tf.reduce_mean(per_example)

def accuracy(y_pred, y_true):
    """Return the fraction of examples whose arg-max prediction matches the label.

    Args:
        y_pred: per-class scores; the arg-max is taken along axis 1.
        y_true: class labels; cast to int64 for the comparison.

    Returns:
        Tensor holding the mean of the 0/1 match indicators.
    """
    predicted = tf.argmax(y_pred, 1)
    expected = tf.cast(y_true, tf.int64)
    hits = tf.cast(tf.equal(predicted, expected), tf.float32)
    return tf.reduce_mean(hits, axis = -1)

In [8]:
# Adam optimizer driven by the learning_rate set in the configuration cell.
optimizer = tf.optimizers.Adam(learning_rate = learning_rate)

In [9]:
def run_optimization(x, y):
    """Perform one gradient-descent step of lstm_net on a single batch.

    Args:
        x: batch of input sequences.
        y: labels for the batch.
    """
    # Record the forward pass so the loss can be differentiated.
    with tf.GradientTape() as tape:
        logits = lstm_net(x, is_training = True)
        batch_loss = cross_entropy_loss(logits, y)

    weights = lstm_net.trainable_variables
    grads = tape.gradient(batch_loss, weights)
    # Pair each gradient with its variable and let the optimizer update it.
    optimizer.apply_gradients(zip(grads, weights))

In [10]:
# Train for training_steps batches, reporting on the first step and on every
# display_step-th step.
for step, (batch_x, batch_y) in enumerate(train_data.take(training_steps), 1):
    run_optimization(batch_x, batch_y)

    # Skip the reporting work on non-display steps.
    if step % display_step != 0 and step != 1:
        continue

    # Re-evaluate the just-updated model on the same batch; logits are
    # requested because the loss applies softmax internally.
    pred = lstm_net(batch_x, is_training = True)
    loss = cross_entropy_loss(pred, batch_y)
    acc = accuracy(pred, batch_y)
    print("Step: %i, Loss: %f, Accuracy: %f" % (step, loss, acc))

Step: 1, Loss: 0.687976, Accuracy: 0.578125
Step: 100, Loss: 0.623067, Accuracy: 0.687500
Step: 200, Loss: 0.507279, Accuracy: 0.796875
Step: 300, Loss: 0.513295, Accuracy: 0.765625
Step: 400, Loss: 0.385737, Accuracy: 0.843750
Step: 500, Loss: 0.389303, Accuracy: 0.843750
Step: 600, Loss: 0.407296, Accuracy: 0.765625
Step: 700, Loss: 0.300585, Accuracy: 0.921875
Step: 800, Loss: 0.287422, Accuracy: 0.828125
Step: 900, Loss: 0.246140, Accuracy: 0.906250
Step: 1000, Loss: 0.119425, Accuracy: 0.937500
Step: 1100, Loss: 0.225433, Accuracy: 0.890625
Step: 1200, Loss: 0.168575, Accuracy: 0.906250
Step: 1300, Loss: 0.210215, Accuracy: 0.921875
Step: 1400, Loss: 0.081517, Accuracy: 0.968750
Step: 1500, Loss: 0.118762, Accuracy: 0.968750
Step: 1600, Loss: 0.071198, Accuracy: 0.984375
Step: 1700, Loss: 0.093566, Accuracy: 0.968750
Step: 1800, Loss: 0.068406, Accuracy: 0.984375
Step: 1900, Loss: 0.079908, Accuracy: 1.000000
Step: 2000, Loss: 0.073974, Accuracy: 1.000000
