In [16]:
import tensorflow as tf

In [17]:
class Time2Vector(tf.keras.layers.Layer):
  '''Learned time embedding producing one linear and one periodic feature per time step.

  The input is averaged over its feature axis, then each time step is passed
  through a learned affine map (linear feature) and through the sine of a
  second learned affine map (periodic feature).

  Args:
      seq_len (int): Number of time steps in every input sequence. One weight and
          one bias are learned per time step for each of the two features.
      **kwargs: Standard Keras layer arguments (name, dtype, trainable, ...).
  '''
  def __init__(self, seq_len, **kwargs):
    # Forward the base-layer arguments; get_config() emits them, so dropping them
    # here would lose the layer name/dtype when the layer is rebuilt from its config.
    super(Time2Vector, self).__init__(**kwargs)
    self.seq_len = seq_len

  def build(self, input_shape):
    '''Create the four trainable vectors, each of shape (seq_len,).'''
    per_step_shape = (int(self.seq_len),)

    # Weight names and attribute names are kept unchanged so that existing
    # checkpoints continue to match.
    self.weights_linear = self.add_weight(name='weight_linear',
                                shape=per_step_shape,
                                initializer='uniform',
                                trainable=True)

    self.bias_linear = self.add_weight(name='bias_linear',
                                shape=per_step_shape,
                                initializer='uniform',
                                trainable=True)

    self.weights_periodic = self.add_weight(name='weight_periodic',
                                shape=per_step_shape,
                                initializer='uniform',
                                trainable=True)

    self.bias_periodic = self.add_weight(name='bias_periodic',
                                shape=per_step_shape,
                                initializer='uniform',
                                trainable=True)

  def call(self, x):
    '''Calculate linear and periodic time features.

    Args:
        x (tensor): Input tensor of shape (batch_size, seq_len, features).

    Returns:
        tensor: Concatenated linear and periodic time features of shape (batch_size, seq_len, 2).
    '''
    # Average across ALL features (no column is excluded), giving (batch_size, seq_len).
    x = tf.math.reduce_mean(x, axis=-1)

    # Linear feature: the (seq_len,) weights broadcast against the last axis of x.
    time_linear = self.weights_linear * x + self.bias_linear
    time_linear = tf.expand_dims(time_linear, axis=-1)  # (batch_size, seq_len, 1)

    # Periodic feature: same affine form wrapped in a sine.
    time_periodic = tf.math.sin(tf.multiply(x, self.weights_periodic) + self.bias_periodic)
    time_periodic = tf.expand_dims(time_periodic, axis=-1)  # (batch_size, seq_len, 1)

    # Stack both features on a new last axis: (batch_size, seq_len, 2).
    return tf.concat([time_linear, time_periodic], axis=-1)

  def get_config(self):
    '''Get configuration for saving and loading model with custom layer.

    Returns:
        dict: Base layer configuration extended with 'seq_len'.
    '''
    config = super().get_config().copy()
    config.update({'seq_len': self.seq_len})
    return config

In [18]:
class SingleAttention(tf.keras.layers.Layer):
    '''Single scaled dot-product attention head.

    Args:
        d_k (int): Output width of the query and key projections.
        d_v (int): Output width of the value projection (and of this layer's output).
        **kwargs: Standard Keras layer arguments (name, dtype, trainable, ...).
    '''
    def __init__(self, d_k, d_v, **kwargs):
        super(SingleAttention, self).__init__(**kwargs)
        self.d_k = d_k
        self.d_v = d_v

    def build(self, input_shape):
        '''Create the query, key and value projection layers.'''
        # The Dense layers infer their own input width on first call. The shape
        # handed to build() describes the whole (q, k, v) triple, so it is not a
        # meaningful `input_shape` for any single projection and is not forwarded.
        self.query = tf.keras.layers.Dense(self.d_k,
                                           kernel_initializer='glorot_uniform',
                                           bias_initializer='glorot_uniform')

        self.key = tf.keras.layers.Dense(self.d_k,
                                         kernel_initializer='glorot_uniform',
                                         bias_initializer='glorot_uniform')

        self.value = tf.keras.layers.Dense(self.d_v,
                                           kernel_initializer='glorot_uniform',
                                           bias_initializer='glorot_uniform')

    def call(self, inputs): # inputs = (query_source, key_source, value_source)
        '''Apply attention.

        Args:
            inputs: Sequence of three tensors; element 0 feeds the query projection,
                element 1 the key projection and element 2 the value projection.

        Returns:
            tensor: softmax(Q K^T / sqrt(d_k)) V, with last dimension d_v.
        '''
        q = self.query(inputs[0])
        k = self.key(inputs[1])
        v = self.value(inputs[2])

        # Scale with a TensorFlow op: this removes the dependency on `np` (which the
        # notebook never imports) and replaces the per-batch tf.map_fn loop with a
        # single broadcasted division.
        scale = tf.math.sqrt(tf.cast(self.d_k, q.dtype))
        attn_weights = tf.matmul(q, k, transpose_b=True) / scale
        attn_weights = tf.nn.softmax(attn_weights, axis=-1)

        return tf.matmul(attn_weights, v)

    def get_config(self):
        '''Gets configuration for saving and loading'''
        config = super().get_config().copy()
        config.update({'d_k': self.d_k,
                       'd_v': self.d_v})
        return config

class MultiAttention(tf.keras.layers.Layer):
    '''Multi-head attention: several SingleAttention heads followed by a linear projection.

    Args:
        d_k (int): Query/key projection width used by every head.
        d_v (int): Value projection width used by every head.
        n_heads (int): Number of attention heads.
        **kwargs: Standard Keras layer arguments (name, dtype, trainable, ...).
    '''
    def __init__(self, d_k, d_v, n_heads, **kwargs):
        super(MultiAttention, self).__init__(**kwargs)
        self.d_k = d_k
        self.d_v = d_v
        self.n_heads = n_heads
        self.attn_heads = list()

    def build(self, input_shape):
        '''Create the attention heads and the output projection.'''
        # Assign a fresh list instead of appending to the one from __init__, so a
        # repeated build cannot leave more than n_heads heads behind.
        self.attn_heads = [SingleAttention(self.d_k, self.d_v) for _ in range(self.n_heads)]

        # input_shape[0] is the shape of the first element of the input triple;
        # its last entry is used as the output width of the projection.
        # No `input_shape` is forwarded: the Dense layer actually receives the
        # concatenated head outputs, not a tensor of the triple's shape.
        self.linear = tf.keras.layers.Dense(input_shape[0][-1],
                                            kernel_initializer='glorot_uniform',
                                            bias_initializer='glorot_uniform')

    def call(self, inputs):
        '''Run every head on the same inputs, concatenate on the last axis and project.

        Args:
            inputs: Sequence of three tensors, passed unchanged to each head.

        Returns:
            tensor: Projected attention output whose last dimension equals the last
            dimension of inputs[0].
        '''
        head_outputs = [head(inputs) for head in self.attn_heads]
        concat_attn = tf.concat(head_outputs, axis=-1)
        return self.linear(concat_attn)

    def get_config(self):
        '''Gets configuration for saving and loading'''
        config = super().get_config().copy()
        config.update({'d_k': self.d_k,
                       'd_v': self.d_v,
                       'n_heads': self.n_heads})
        return config

In [19]:
class TransformerEncoder(tf.keras.layers.Layer):
    '''Transformer encoder block: multi-head attention and a feed-forward network,
    each followed by dropout, a residual connection and layer normalization.

    Args:
        d_k (int): Query/key projection width per attention head.
        d_v (int): Value projection width per attention head.
        n_heads (int): Number of attention heads.
        ff_dim (int): Hidden width of the feed-forward network.
        dropout (float): Dropout rate used after attention and after the feed-forward network.
        **kwargs: Standard Keras layer arguments (name, dtype, trainable, ...).
    '''
    def __init__(self, d_k, d_v, n_heads, ff_dim, dropout=0.1, **kwargs):
        # Configs produced by the previous get_config() stored the rate under
        # 'dropout_rate' and carried a stray 'attn_heads' entry. Accept both here
        # so such configs still load, and so they never reach the base class.
        legacy_rate = kwargs.pop('dropout_rate', None)
        kwargs.pop('attn_heads', None)
        if legacy_rate is not None:
            dropout = legacy_rate

        super(TransformerEncoder, self).__init__(**kwargs)
        self.d_k = d_k
        self.d_v = d_v
        self.n_heads = n_heads
        self.ff_dim = ff_dim
        self.attn_heads = list()  # Not used by this layer; kept so the attribute still exists.
        self.dropout_rate = dropout

    def build(self, input_shape):
        '''Create the attention, feed-forward, dropout and normalization sub-layers.'''
        self.attn_multi = MultiAttention(self.d_k, self.d_v, self.n_heads)
        self.attn_dropout = tf.keras.layers.Dropout(self.dropout_rate)
        self.attn_normalize = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.ff_dense_1 = tf.keras.layers.Dense(units=self.ff_dim, activation='relu')
        # Project back to the width of inputs[0] so the residual addition is well-formed.
        self.ff_dense_2 = tf.keras.layers.Dense(units=input_shape[0][-1])
        self.ff_dropout = tf.keras.layers.Dropout(self.dropout_rate)
        self.ff_normalize = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs): # inputs = (in_seq, in_seq, in_seq)
        '''Run the encoder block.

        Args:
            inputs: Sequence of three tensors forwarded to the attention layer;
                inputs[0] is also the residual input of the attention sub-layer.

        Returns:
            tensor: Encoder output with the same last dimension as inputs[0].
        '''
        # Multi-head attention followed by dropout.
        attention_output = self.attn_multi(inputs)
        attention_output = self.attn_dropout(attention_output)

        # First residual connection + layer normalization.
        norm_attention_output = self.attn_normalize(inputs[0] + attention_output)

        # Position-wise feed-forward network followed by dropout.
        ff_output = self.ff_dense_1(norm_attention_output)
        ff_output = self.ff_dense_2(ff_output)
        ff_output = self.ff_dropout(ff_output)

        # Second residual connection wraps the feed-forward sub-layer, so it adds
        # the feed-forward INPUT (the normalized attention output) rather than the
        # raw block input. This matches TransformerDecoder in this notebook.
        # NOTE: this changes the numerical output relative to the earlier wiring
        # (inputs[0] + ff_output); weights trained with that wiring should be retrained.
        encoder_output = self.ff_normalize(norm_attention_output + ff_output)

        return encoder_output

    def get_config(self): # Needed for saving and loading model with custom layer
        '''Gets configuration for saving and loading.

        Keys mirror the constructor arguments so the layer can be rebuilt from the
        returned dict.
        '''
        config = super().get_config().copy()
        config.update({'d_k': self.d_k,
                       'd_v': self.d_v,
                       'n_heads': self.n_heads,
                       'ff_dim': self.ff_dim,
                       'dropout': self.dropout_rate})
        return config

In [20]:
class TransformerDecoder(tf.keras.layers.Layer):
    '''Transformer decoder block: self-attention, encoder-decoder attention and a
    feed-forward network, each followed by dropout, a residual connection and
    layer normalization.

    Args:
        d_k (int): Query/key projection width per attention head.
        d_v (int): Value projection width per attention head.
        n_heads (int): Number of attention heads in each attention sub-layer.
        ff_dim (int): Hidden width of the feed-forward network.
        dropout (float): Dropout rate used after every sub-layer.
        **kwargs: Standard Keras layer arguments (name, dtype, trainable, ...).
    '''
    def __init__(self, d_k, d_v, n_heads, ff_dim, dropout=0.1, **kwargs):
        # Configs produced by the previous get_config() stored the rate under
        # 'dropout_rate'; accept that key so such configs keep their rate and the
        # unknown key never reaches the base class.
        legacy_rate = kwargs.pop('dropout_rate', None)
        if legacy_rate is not None:
            dropout = legacy_rate

        super(TransformerDecoder, self).__init__(**kwargs)
        self.d_k = d_k
        self.d_v = d_v
        self.n_heads = n_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout

    def build(self, input_shape):
        '''Create the attention, feed-forward, dropout and normalization sub-layers.'''
        self.dec_attn_multi = MultiAttention(self.d_k, self.d_v, self.n_heads)
        self.dec_attn_dropout = tf.keras.layers.Dropout(self.dropout_rate)
        self.dec_attn_normalize = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.enc_dec_attn_multi = MultiAttention(self.d_k, self.d_v, self.n_heads)
        self.enc_dec_attn_dropout = tf.keras.layers.Dropout(self.dropout_rate)
        self.enc_dec_attn_normalize = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.ff_dense_1 = tf.keras.layers.Dense(units=self.ff_dim, activation='relu')
        # input_shape[0] is the shape of the decoder input; project back to its width.
        self.ff_dense_2 = tf.keras.layers.Dense(units=input_shape[0][-1])
        self.ff_dropout = tf.keras.layers.Dropout(self.dropout_rate)
        self.ff_normalize = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        '''Run the decoder block.

        Args:
            inputs: Pair (dec_inputs, enc_outputs).

        Returns:
            tensor: Decoder output with the same last dimension as dec_inputs.
        '''
        dec_inputs, enc_outputs = inputs

        # Self-attention over the decoder inputs, then dropout.
        # NOTE: no causal/look-ahead mask is applied here; every position attends
        # to every other position.
        masked_attention_output = self.dec_attn_multi((dec_inputs, dec_inputs, dec_inputs))
        masked_attention_output = self.dec_attn_dropout(masked_attention_output)

        # Residual connection + layer normalization around self-attention.
        norm_masked_attention_output = self.dec_attn_normalize(dec_inputs + masked_attention_output)

        # Encoder-decoder attention: queries come from the decoder state, keys and
        # values from the encoder output. Followed by dropout.
        attention_output = self.enc_dec_attn_multi((norm_masked_attention_output, enc_outputs, enc_outputs))
        attention_output = self.enc_dec_attn_dropout(attention_output)

        # Residual connection + layer normalization around encoder-decoder attention.
        attention_output = self.enc_dec_attn_normalize(norm_masked_attention_output + attention_output)

        # Position-wise feed-forward network followed by dropout.
        ff_output = self.ff_dense_1(attention_output)
        ff_output = self.ff_dense_2(ff_output)
        ff_output = self.ff_dropout(ff_output)

        # Residual connection + layer normalization around the feed-forward network.
        decoder_output = self.ff_normalize(attention_output + ff_output)

        return decoder_output

    def get_config(self):
        '''Gets configuration for saving and loading.

        Keys mirror the constructor arguments so the layer can be rebuilt from the
        returned dict.
        '''
        config = super().get_config().copy()
        config.update({'d_k': self.d_k,
                       'd_v': self.d_v,
                       'n_heads': self.n_heads,
                       'ff_dim': self.ff_dim,
                       'dropout': self.dropout_rate})
        return config

In [21]:
def Time2VecTranformer(d_k, d_v, n_heads, ff_dim, num_layers, sequence_length, num_features):
    '''Build the Time2Vec + encoder/decoder Keras model.

    Args:
        d_k: Query/key projection width handed to every encoder and decoder block.
        d_v: Value projection width handed to every encoder and decoder block.
        n_heads: Number of attention heads per attention layer.
        ff_dim: Hidden width of each block's feed-forward network.
        num_layers: Number of encoder blocks, and also the number of decoder blocks.
        sequence_length: Number of time steps per input sequence.
        num_features: Number of features per time step.

    Returns:
        tf.keras.Model: Maps a (sequence_length, num_features) input to a single
        linear output unit.
    '''
    # --- Instantiate layers (embedding first, then encoders, then decoders) ---
    t2v = Time2Vector(sequence_length)

    encoders = []
    for _ in range(num_layers):
        encoders.append(TransformerEncoder(d_k, d_v, n_heads, ff_dim))

    decoders = []
    for _ in range(num_layers):
        decoders.append(TransformerDecoder(d_k, d_v, n_heads, ff_dim))

    # --- Wire the graph ---
    model_input = tf.keras.layers.Input(shape=(sequence_length, num_features))
    time_features = t2v(model_input)
    # Append the time features to the raw features along the last axis.
    merged = tf.keras.layers.Concatenate(axis=-1)([model_input, time_features])

    # Encoder stack: each block attends over the previous block's output.
    encoded = merged
    for enc_block in encoders:
        encoded = enc_block((encoded, encoded, encoded))

    # Decoder stack: the first block starts from the final encoder output, and
    # every block also receives that same final encoder output.
    decoded = encoded
    for dec_block in decoders:
        decoded = dec_block((decoded, encoded))

    # Regression head. With data_format='channels_first' the pooling layer treats
    # the last axis as the one to average over (see the Keras docs for this option).
    pooled = tf.keras.layers.GlobalAveragePooling1D(data_format='channels_first')(decoded)
    regularized = tf.keras.layers.Dropout(0.1)(pooled)
    prediction = tf.keras.layers.Dense(1, activation='linear')(regularized)

    return tf.keras.Model(inputs=model_input, outputs=prediction)