In [1]:
%reload_ext autoreload
%autoreload 2
import IPython, IPython.display, os, datetime
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 


import tensorflow as tf
from tensorflow import keras
from keras.callbacks  import EarlyStopping
from keras.models import Sequential, Model
from keras.layers import Lambda, Dropout, Input
from keras import regularizers
from keras.utils import plot_model


mpl.rcParams['figure.figsize'] = (14, 4)
mpl.rcParams['axes.grid'] = True

print(f"Tensorflow Version {tf.__version__}, Keras Vesion: {keras.__version__}")

Tensorflow Version 2.10.0, Keras Vesion: 2.10.0


In [63]:
#%%writefile "transformermodel.py"

# DO NOT EDIT THIS FILE - GENERATED FROM 06_transformer.ipynb

import tensorflow as tf
import numpy as np

from keras.layers import Embedding, MultiHeadAttention, LayerNormalization
from keras_preprocessing.sequence import pad_sequences
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


"""
    Precomputes a matrix with all the positional encodings 

    Arguments:
        positions (int) -- Maximum number of positions to be encoded 
        d (int) -- Encoding size 

    Returns:
        pos_encoding -- (1, position, d_model) A matrix with the positional encodings
"""
def positional_encoding(positions, d):
    # initialize a matrix angle_rads of all the angles 
    pos = np.arange(positions)[:, np.newaxis] 
    d = np.float32(d)

    angle_rads = pos/ np.power(10000, (2 * (np.arange(d)[np.newaxis, :]//2)) / d)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    
    pos_encoding = angle_rads[np.newaxis, ...]
    
    return tf.cast(pos_encoding, dtype=tf.float32)



"""
    Creates a matrix mask for the padding cells

    Arguments:
        decoder_token_ids -- (n, m) matrix

    Returns:
        mask -- (n, 1, m) binary tensor
"""    
def create_padding_mask(decoder_token_ids):
    seq = 1 - tf.cast(tf.math.equal(decoder_token_ids, 0), tf.float32)
  
    # add extra dimensions to add the padding
    # to the attention logits.
    return seq[:, tf.newaxis, :]


"""
Returns a lower triangular matrix filled with ones

Arguments:
    sequence_length -- matrix size

Returns:
    mask -- (size, size) tensor
"""
def create_look_ahead_mask(sequence_length):
    mask = tf.linalg.band_part(tf.ones((1, sequence_length, sequence_length)), -1, 0)
    return mask 


def FullyConnected(embedding_dim, fully_connected_dim):
    return tf.keras.Sequential([
        tf.keras.layers.Dense(fully_connected_dim, activation='relu'),  # (batch_size, seq_len, dff)
        tf.keras.layers.Dense(embedding_dim)  # (batch_size, seq_len, d_model)
    ])


"""
    The encoder layer is composed by a multi-head self-attention mechanism,
    followed by a simple, positionwise fully connected feed-forward network. 
    This archirecture includes a residual connection around each of the two 
    sub-layers, followed by layer normalization.
"""
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, embedding_dim, num_heads, fully_connected_dim,
                 dropout_rate=0.1, layernorm_eps=1e-6):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(num_heads=num_heads,
                                      key_dim=embedding_dim,
                                      dropout=dropout_rate)

        self.ffn = FullyConnected(embedding_dim=embedding_dim,
                                  fully_connected_dim=fully_connected_dim)

        self.layernorm1 = LayerNormalization(epsilon=layernorm_eps)
        self.layernorm2 = LayerNormalization(epsilon=layernorm_eps)

        self.dropout_ffn = Dropout(dropout_rate)
    
    def call(self, x, training, mask):
        """
        Forward pass for the Encoder Layer
        
        Arguments:
            x -- Tensor of shape (batch_size, input_seq_len, fully_connected_dim)
            training -- Boolean, set to true to activate
                        the training mode for dropout layers
            mask -- Boolean mask to ensure that the padding is not 
                    treated as part of the input
        Returns:
            encoder_layer_out -- Tensor of shape (batch_size, input_seq_len, embedding_dim)
        """
        # calculate self-attention using mha(~1 line).
        # Dropout is added by Keras automatically if the dropout parameter is non-zero during training
        self_mha_output = self.mha(query=x,
                                   value=x,
                                   key=x,
                                   attention_mask=mask,training=training) 
        # Self attention (batch_size, input_seq_len, 
        
        # skip connection
        # apply layer normalization on sum of the input and the attention output to get the  
        # output of the multi-head attention layer (~1 line)
        skip_x_attention = self.layernorm1( x + self_mha_output)  # (batch_size, input_seq_len, fully_connected_dim)

        # pass the output of the multi-head attention layer through a ffn (~1 line)
        ffn_output = self.ffn(skip_x_attention)  # (batch_size, input_seq_len, fully_connected_dim)
        
        # apply dropout layer to ffn output during training (~1 line)
        # use `training=training` 
        ffn_output = self.dropout_ffn(ffn_output, training)
        
        # apply layer normalization on sum of the output from multi-head attention (skip connection) and ffn output to get the
        # output of the encoder layer (~1 line)
        encoder_layer_out = self.layernorm2(skip_x_attention + ffn_output)  # (batch_size, input_seq_len, embedding_dim)
        
        return encoder_layer_out



"""
The entire Encoder starts by passing the input to an embedding layer 
and using positional encoding to then pass the output through a stack of
encoder Layers
    
"""  
class Encoder(tf.keras.layers.Layer):
    '''
        embedding_dim = send in dimension of the input features for time series
        maximum_positions = in case of time series, it is the windows length

    '''
    def __init__(self,   embedding_dim,
                         maximum_position_encoding=1024,
                        num_layers = 6, num_heads=3, 
                        fully_connected_dim=64, input_vocab_size=None,
                        dropout_rate=0.1,  layernorm_eps=1e-6,
                        task="timseries|NLP"):

        super(Encoder, self).__init__()

        # Shape is same same (positions, features_len) for time series
        self.myinput_shape   = (maximum_position_encoding, embedding_dim)
        self.embedding_dim = embedding_dim
        self.num_layers = num_layers

        self.input_vocab_size = input_vocab_size
        self.maximum_position_encoding = maximum_position_encoding


        self.enc_layers = [EncoderLayer(embedding_dim=self.embedding_dim,
                                        num_heads=num_heads,
                                        fully_connected_dim=fully_connected_dim,
                                        dropout_rate=dropout_rate,
                                        layernorm_eps=layernorm_eps) 
                           for _ in range(self.num_layers)]

        self.dropout = Dropout(dropout_rate)
        self.task = task
        
    def embed_timeseries(self, x):
        assert self.task != "NLP", "Hmmm calling embed on non timeseries app"

        # For now it is noop
        return x

    def embed(self, x):
        if self.task != "NLP":
            return self.embed_timeseries(x)

        if ( not self.embedding):
            self.embedding = Embedding(self.input_vocab_size, self.embedding_dim)
        x = self.embedding(x)  # (batch_size, input_seq_len, embedding_dim)

        # Scale embedding by multiplying by square root embedding dimension
        x *= np.sqrt( self.embedding_dim )
        return x
                
    def positional_encode(self, x):
        if self.task != "NLP":
            return x
        if ( not self.pos_encoding):
            pe = positional_encoding(self.maximum_position_encoding, 
                                     self.embedding_dim)
            self.pos_encoding = pe

        seq_len = tf.shape(x)[1]
        x += self.pos_encoding[:, :seq_len, :]
        return x

    """
    Arguments:
        x -- Tensor of shape (batch_size, input_seq_len) for NLP
                            (batch_size, input_seq_len, features_len) for time series

        training -- Boolean, set to true to activate
                    the training mode for dropout layers
        mask -- Boolean mask to ensure that the padding is not 
                treated as part of the input
    Returns:
        out2 -- Tensor of shape (batch_size, input_seq_len, embedding_dim)
    """
    def call(self, x, training = True, mask=None):
        # Note followig two lines are noop for time series analysis
        x = self.embed(x)
        x = self.positional_encode (x)
        x = self.dropout(x, training)
        # Pass the output through the stack of encoding layers 
        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)
            #x = self.enc_layers[0](x, training, mask)

        return x  # (batch_size, input_seq_len, embedding_dim)

    def build(self):
        inputs = keras.Input(shape=self.myinput_shape)
        output = self.call(inputs)
        return inputs, output

In [None]:
class Decoder(tf.keras.layers.Layer):
    '''
        output features len 
    '''
    def __init__(self, output_feat_len, output_len = 1
                        num_layers = 6, num_heads=3, 
                        fully_connected_dim=64, input_vocab_size=None,
                        dropout_rate=0.1,  layernorm_eps=1e-6,
                        task="timseries|NLP"):

        super(Encoder, self).__init__()

    

In [64]:
encoder = Encoder(2)
i, o = encoder.build()
model = keras.Model(i, o)
model.summary()

Model: "model_22"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_24 (InputLayer)       [(None, 1024, 2)]         0         
                                                                 
 dropout_145 (Dropout)       (None, 1024, 2)           0         
                                                                 
 encoder_layer_122 (EncoderL  (None, 1024, 2)          398       
 ayer)                                                           
                                                                 
 encoder_layer_123 (EncoderL  (None, 1024, 2)          398       
 ayer)                                                           
                                                                 
 encoder_layer_124 (EncoderL  (None, 1024, 2)          398       
 ayer)                                                           
                                                          

In [54]:
o

In [59]:
encoder = EncoderLayer(4,2,2)
i = keras.Input(shape=(2,2))
encoder.build(i)
model = keras.Model(Sequential([encoder]))
model.build(i)
model.summary()

ValueError: Specified input shape is not one of the valid types. Please specify a batch input shape of type tuple or list of input shapes. User provided input type: <class 'keras.engine.keras_tensor.KerasTensor'>.

In [35]:
keras.

<KerasTensor: shape=(None, 1024, 2) dtype=float32 (created by layer 'encoder_layer_60')>