In [465]:
import tensorflow as tf
import numpy as np
import datetime

In [466]:
#-----------HYPERPARAMETERS----------------#
# Notebook-wide configuration. The cells below read these names as globals.

# Sequence lengths to experiment with; the training cells run once per entry.
SEQUENCE_LENGTH = [2, 4, 8, 16, 32]
# Size of the hidden and cell state of every LSTM cell.
HIDDEN_LENGTH = 20
# Number of features per time step (intended for the `n_features` argument of the dataset builders).
N_FEATURES = 1

# Learning rate handed to the Adam optimizers.
LEARNING_RATE = 1e-3
# Number of passes over the training data per experiment.
EPOCHS = 100
# Selects the cumulative-sum variant of the model (per-time-step outputs) instead of the single-sum variant.
CUMSUM = True
# Number of stacked recurrent layers in the model.
NUM_OF_LAYERS = 3


In [467]:
def create_dataset_for_sum_prediction(n_samples, sequence_length, n_features, validation):
    """
    Function to create a dataset for the sum prediction task. The function generates random integers
    between 0 and 10 (inclusive, stored as float32) and calculates the sum of each sequence over all
    time steps and features.

    :param n_samples:  number of samples in the dataset
    :param sequence_length: length of the sequence
    :param n_features: number of features in the sequence
    :param validation: if False, the samples are shuffled and batched with batch size 32 (training set);
        if True, they are kept in order and batched with batch size 512 (validation set)
    :return: a prefetched tf.data.Dataset of (x, y) batches, where x has shape
        (batch, sequence_length, n_features) and y has shape (batch, 1)
    """
    x = tf.cast(np.random.randint(low=0, high=11, size=(n_samples, sequence_length, n_features)), tf.float32)

    # Sum over the time and feature axes in one vectorized op instead of a Python loop over samples,
    # then restore a trailing axis so every target has shape (1,).
    y = tf.expand_dims(tf.reduce_sum(x, axis=[1, 2]), axis=1)

    dataset = tf.data.Dataset.from_tensor_slices((x, y))
    if not validation:
        # Training data: reshuffle over the whole dataset, small batches.
        dataset = dataset.shuffle(n_samples).batch(32)
    else:
        # Validation data: deterministic order, one large batch size.
        dataset = dataset.batch(512)
    return dataset.prefetch(tf.data.AUTOTUNE)

In [468]:
def create_dataset_for_cum_sum_prediction(n_samples, sequence_length, n_features, validation):
    """
    Function to create a dataset for the cumulative sum prediction task. The function generates random
    integers between 0 and 10 (inclusive, stored as float32) and calculates the cumulative sum of each
    sequence along the time axis.

    :param n_samples: number of samples in the dataset
    :param sequence_length: length of the sequence
    :param n_features: number of features in the sequence
    :param validation: if False, the samples are shuffled and batched with batch size 32 (training set);
        if True, they are kept in order and batched with batch size 512 (validation set)
    :return: a prefetched tf.data.Dataset of (x, y) batches, where x and y both have shape
        (batch, sequence_length, n_features)
    """
    x = tf.cast(np.random.randint(low=0, high=11, size=(n_samples, sequence_length, n_features)), tf.float32)

    # Axis 1 is the time axis of the (samples, time, features) tensor, so this yields the running
    # sum per sample and feature without looping over samples in Python.
    y = tf.cumsum(x, axis=1)

    dataset = tf.data.Dataset.from_tensor_slices((x, y))
    if not validation:
        # Training data is the split that gets shuffled (the branches were swapped before,
        # shuffling the validation set and leaving the training set in a fixed order).
        dataset = dataset.shuffle(n_samples).batch(32)
    else:
        # Validation data: deterministic order, one large batch size.
        dataset = dataset.batch(512)
    return dataset.prefetch(tf.data.AUTOTUNE)

In [469]:
class LSTMCell(tf.keras.layers.AbstractRNNCell):
    """
    Custom LSTM cell implementation

    Each of the four gates (forget, input, candidate memory, output) is computed from two Dense
    projections: one of the current input (with bias) and one of the previous hidden state
    (without bias). The cell carries two states, the hidden state h and the cell memory c.
    """

    def __init__(self, input_length, hidden_length, **kwargs):
        """
        Initializes the LSTM cell with input and hidden dimensions

        :param input_length: Length of the input vector. Only stored on the instance; none of the
            sub-layers are constructed from it.
        :param hidden_length: Length of the hidden state vector (and of the cell memory)
        :param kwargs: optional keyword arguments forwarded to the base layer (e.g. name)
        """
        super().__init__(**kwargs)
        self.input_length = input_length
        self.hidden_length = hidden_length

        # forget gate components
        self.linear_forget_w1 = tf.keras.layers.Dense(self.hidden_length, use_bias=True)
        self.linear_forget_r1 = tf.keras.layers.Dense(self.hidden_length, use_bias=False)

        # input gate components
        self.linear_gate_w2 = tf.keras.layers.Dense(self.hidden_length, use_bias=True)
        self.linear_gate_r2 = tf.keras.layers.Dense(self.hidden_length, use_bias=False)

        # cell memory components
        self.linear_gate_w3 = tf.keras.layers.Dense(self.hidden_length, use_bias=True)
        self.linear_gate_r3 = tf.keras.layers.Dense(self.hidden_length, use_bias=False)

        # out gate components
        self.linear_gate_w4 = tf.keras.layers.Dense(self.hidden_length, use_bias=True)
        self.linear_gate_r4 = tf.keras.layers.Dense(self.hidden_length, use_bias=False)

        self.sigmoid = tf.keras.layers.Activation('sigmoid')
        self.tanh = tf.keras.layers.Activation('tanh')

    @property
    def state_size(self):
        """
        Returns the size of the LSTM cell state: one entry for the hidden state, one for the cell memory
        """
        return self.hidden_length, self.hidden_length

    @property
    def output_size(self):
        """
        Returns the size of the per-step output, which is the hidden state

        The base class declares this property abstract; without an override, any code that reads
        it from the cell fails instead of getting the output width.
        """
        return self.hidden_length

    def call(self, inputs, states):
        """
        Performs the forward pass through the LSTM cell for a single time step

        :param inputs: Input tensor for the current time step
        :param states: Tuple containing the previous hidden state and cell state
        :return: the new hidden state as the step output, and the list [new hidden state, new cell state]
        """

        h, c = states

        # forget gate: how much of the previous cell memory is kept
        f = self.sigmoid(self.linear_forget_w1(inputs) + self.linear_forget_r1(h))

        # input gate: how much of the candidate memory is written
        i = self.sigmoid(self.linear_gate_w2(inputs) + self.linear_gate_r2(h))

        # cell memory: candidate values, then the gated update of the memory
        g = self.tanh(self.linear_gate_w3(inputs) + self.linear_gate_r3(h))
        c_next = f * c + i * g

        # output gate
        o = self.sigmoid(self.linear_gate_w4(inputs) + self.linear_gate_r4(h))

        # next hidden state
        h_next = o * self.tanh(c_next)

        return h_next, [h_next, c_next]


In [470]:
class RNNModel(tf.keras.Model):
    """
    Stack of custom LSTM layers followed by a Dense read-out, with its own optimizer, loss and
    loss-tracking metric so it can be trained either through a hand-written loop calling
    train_step/test_step or through compile/fit.
    """

    def __init__(self, num_layers, sequence_length, hidden_length, cumsum):
        """
        :param num_layers: number of stacked recurrent layers
        :param sequence_length: length of the input sequences; also the width of the read-out
            layer when cumsum is set
        :param hidden_length: size of the hidden state of every LSTM cell
        :param cumsum: if truthy, the last recurrent layer returns its output for every time step
            (cumulative-sum task); otherwise only the final step is returned and a single value
            is predicted (sum task)
        """
        super().__init__()

        self.rnn_cells = [LSTMCell(input_length=sequence_length, hidden_length=hidden_length) for _ in range(num_layers)]

        # Intermediate layers must always hand a full sequence to the next recurrent layer;
        # only the last layer's return_sequences depends on the task. (Passing `cumsum` to every
        # layer broke stacks of more than one layer when cumsum was False.)
        last_index = num_layers - 1
        self.rnn_layers = [
            tf.keras.layers.RNN(cell, return_sequences=True if index < last_index else cumsum, unroll=True)
            for index, cell in enumerate(self.rnn_cells)
        ]

        # Use the constructor argument, not the notebook-level CUMSUM flag, so the read-out
        # always matches the recurrent layers configured above.
        if cumsum:
            # NOTE(review): the last recurrent layer returns every time step here, so this Dense
            # appears to produce (batch, time, sequence_length) while the cumulative-sum labels
            # built in this notebook are (batch, time, n_features); the loss would then rely on
            # broadcasting. Dense(n_features) may have been intended - confirm before changing
            # the output shape.
            self.output_layer = tf.keras.layers.Dense(sequence_length)
        else:
            self.output_layer = tf.keras.layers.Dense(1)

        self.metrics_list = [tf.keras.metrics.Mean(name="loss")]

        self.optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=LEARNING_RATE)

        self.loss_function = tf.keras.losses.MeanAbsoluteError()

    @property
    def metrics(self):
        """Metrics tracked by this model: a single running mean of the loss."""
        return self.metrics_list

    def reset_metrics(self):
        """Reset every tracked metric."""
        for metric in self.metrics:
            metric.reset_state()

    @tf.function
    def call(self, sequence, training=False):
        """
        Forward pass: feed the sequence through the recurrent layers, then the read-out layer.

        :param sequence: input batch of sequences
        :param training: accepted for Keras compatibility; not forwarded to the sub-layers
        :return: output of the read-out layer
        """

        x = sequence
        for layer in self.rnn_layers:
            x = layer(x)

        return self.output_layer(x)

    def train_step(self, data):
        """
        Standard train_step method: one optimizer update on a single batch
        :param data: tuple (sequence, label) for one batch
        :return: dict mapping metric names to their current results
        """

        sequence, label = data
        with tf.GradientTape() as tape:
            # Go through __call__ rather than call() so Keras' own bookkeeping runs as well.
            output = self(sequence, training=True)
            # self.losses holds any losses registered by the layers; it sums to 0 when empty.
            loss = self.loss_function(label, output) + tf.reduce_sum(self.losses)
        gradients = tape.gradient(loss, self.trainable_variables)

        self.optimizer.apply_gradients(grads_and_vars=zip(gradients, self.trainable_variables))

        self.metrics[0].update_state(loss)

        return {m.name : m.result() for m in self.metrics}

    def test_step(self, data):
        """
        Standard test_step method: evaluate a single batch without updating the weights
        :param data: tuple (sequence, label) for one batch
        :return: dict mapping metric names to their current results
        """

        sequence, label = data
        output = self(sequence, training=False)
        loss = self.loss_function(label, output) + tf.reduce_sum(self.losses)

        self.metrics[0].update_state(loss)

        return {m.name : m.result() for m in self.metrics}

In [471]:
#----------Training------------#

import tqdm

def training_loop(model, train, test, train_summary_writer, test_summary_writer):
    """
    Train and validate a model for EPOCHS epochs, logging the model's metrics to TensorBoard.

    :param model: model exposing train_step, test_step, metrics and reset_metrics
    :param train: iterable of training batches
    :param test: iterable of validation batches
    :param train_summary_writer: summary writer receiving the training metrics
    :param test_summary_writer: summary writer receiving the validation metrics
    :return: tuple (train_loss, train_acc, val_loss, val_acc) of per-epoch lists; the two
        accuracy lists are always empty because no accuracy metric is tracked
    :raises ValueError: if the training or validation data yields no batches
    """
    # Lists to store training and validation metrics across epochs
    train_loss = []
    val_loss = []
    train_acc = []
    val_acc = []

    def log_metrics(writer, step):
        # One summary point per metric and epoch. Writing inside the batch loop produced
        # many points that all shared the same step.
        with writer.as_default():
            for metric in model.metrics:
                tf.summary.scalar(f"{metric.name}", metric.result(), step=step)

    # Loop through epochs
    for epoch in range(EPOCHS):

        # Training
        metrics = None
        for data in tqdm.tqdm(train, position=0, leave=False, desc=f"Epoch {epoch}"):
            # Perform a training step using the model
            metrics = model.train_step(data)
        if metrics is None:
            raise ValueError("training data yielded no batches")

        # Log the epoch-level training metrics to TensorBoard
        log_metrics(train_summary_writer, epoch)

        # Store training metrics for the epoch
        train_loss.append(metrics["loss"].numpy())

        # Print and reset training metrics
        if epoch % 20 == 0:
            print(f"EPOCH {epoch}")
            print([f"{key}: {value.numpy()}" for (key, value) in metrics.items()])
        model.reset_metrics()

        # Testing
        # Start from None so an empty validation set cannot silently reuse the training result.
        metrics = None
        for data in test:
            # Perform a testing step using the model
            metrics = model.test_step(data)
        if metrics is None:
            raise ValueError("validation data yielded no batches")

        # Log the epoch-level validation metrics to TensorBoard
        log_metrics(test_summary_writer, epoch)

        # Store validation metrics for the epoch
        val_loss.append(metrics["loss"].numpy())

        # Print validation metrics
        if epoch % 20 == 0:
            print([f"val_{key}: {value.numpy()}" for (key, value) in metrics.items()])

        # Reset validation metrics
        model.reset_metrics()

    # Return lists of training and validation metrics for analysis or plotting
    return train_loss, train_acc, val_loss, val_acc

In [None]:
#-----------Loop for different sequence lengths with custom training loop----------------#

config_name = "LSTM"

# Build the data with the generator that matches the CUMSUM flag, so the labels always
# belong to the same task the model is configured for.
create_dataset = create_dataset_for_cum_sum_prediction if CUMSUM else create_dataset_for_sum_prediction

# One independent experiment (fresh data, fresh model, own log directories) per sequence length.
for length in SEQUENCE_LENGTH:
    # 1024 training and 512 validation samples; the feature count comes from the config cell.
    train_dataset = create_dataset(1024, length, N_FEATURES, False)
    val_dataset = create_dataset(512, length, N_FEATURES, True)

    train_log_path = f"logs/{config_name}/{length}/train"
    test_log_path = f"logs/{config_name}/{length}/val"

    # log writer for training metrics
    train_summary_writer = tf.summary.create_file_writer(train_log_path)

    # log writer for validation metrics
    test_summary_writer = tf.summary.create_file_writer(test_log_path)

    model = RNNModel(num_layers=NUM_OF_LAYERS, sequence_length=length, hidden_length=HIDDEN_LENGTH, cumsum=CUMSUM)
    # The names are rebound on every iteration, so only the results of the last sequence
    # length remain available after the loop.
    train_losses, train_accuracies, val_losses, val_accuracies = training_loop(model, train_dataset, val_dataset, train_summary_writer, test_summary_writer)
    

In [None]:
#-----------Loop for different sequence lengths with compile and fit----------------#

# Loop-invariant: all runs of this experiment log below the same directory.
EXPERIMENT_NAME = "LSTM_sum_prediction"

# Build the data with the generator that matches the CUMSUM flag, so the labels always
# belong to the same task the model is configured for.
create_dataset = create_dataset_for_cum_sum_prediction if CUMSUM else create_dataset_for_sum_prediction

# One independent experiment (fresh data, fresh model, own log directory) per sequence length.
for length in SEQUENCE_LENGTH:
    # 1024 training and 512 validation samples; the feature count comes from the config cell.
    train_dataset = create_dataset(1024, length, N_FEATURES, False)
    val_dataset = create_dataset(512, length, N_FEATURES, True)

    model = RNNModel(num_layers=NUM_OF_LAYERS, sequence_length=length, hidden_length=HIDDEN_LENGTH, cumsum=CUMSUM)
    optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=LEARNING_RATE)
    loss = tf.keras.losses.MeanAbsoluteError()

    # compile the model
    # The model's own train_step/test_step compute the loss from model.loss_function,
    # so the loss passed here is not what those steps evaluate.
    model.compile(optimizer=optimizer, loss=loss)

    logging_callback = tf.keras.callbacks.TensorBoard(log_dir=f"./logs/{EXPERIMENT_NAME}/{length}")

    # `history` is rebound on every iteration; only the last run's history survives the loop.
    history = model.fit(train_dataset,
                        validation_data=val_dataset,
                        epochs=EPOCHS,
                        callbacks=[logging_callback],
                        verbose=0)

In [448]:
%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [421]:
%tensorboard --logdir="logs/LSTM_sum_prediction" --port=6007

In [475]:
%tensorboard --logdir="logs/LSTM" --port=6008

Reusing TensorBoard on port 6008 (pid 11088), started 0:50:28 ago. (Use '!kill 11088' to kill it.)