In [None]:
import numpy as np
import random
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Embedding, Flatten, Conv1D, BatchNormalization, ReLU, Dropout, Dense
from tensorflow.keras.layers import Concatenate, GlobalAveragePooling1D

In [None]:
# Shared sparse categorical cross-entropy used by Transformer.custom_loss and
# Transformer.custom_loss_np. reduction='none' keeps the loss unreduced so those
# methods can multiply it by a weight derived from ret_d before averaging.
scce = tf.keras.losses.SparseCategoricalCrossentropy(reduction='none')

In [None]:
class Transformer:
    """Transformer-encoder classifier with an auxiliary embedding input and a weighted loss.

    The Keras model is built in ``__init__`` and stored on ``self.model``. The loss
    is attached with ``model.add_loss`` and is computed from three of the model's
    own inputs/outputs (``target``, the softmax output and ``ret_d``), which is why
    the label tensor is passed to the model as an input and ``fit`` is called with
    ``y=None``.

    Every hyperparameter is supplied through ``**kwargs`` and copied onto the
    instance. The following attribute names are read by this class:

    * callbacks: ``plateau_patience``, ``train_patience``, ``retrain_patience``,
      ``verbose``
    * encoder blocks: ``head_size``, ``num_heads``, ``dropout``, ``layer_norm``,
      ``kernel_size``, ``padding``, ``activation``, ``num_transformer_blocks``
    * head: ``last_ff_dims``, ``dropout_rate``, ``mlp_units``, ``mlp_dropout``,
      ``output_dim``
    * embedding branch: ``num_of_tokens``, ``embedding_dim``
    * symbolic model inputs: ``target``, ``ret_d``, ``sic_input``
      (presumably ``keras.Input`` tensors created by the caller -- confirm)
    * optimisation: ``learning_rate``, ``batch_size``, ``epochs``,
      ``validation_split``
    * prediction: ``pred_verbose``

    A missing name surfaces as ``AttributeError`` at the point it is first read.
    """

    def __init__(self, input_shape, seed, **kwargs):
        """Store the configuration, create the callback lists and build the model.

        Args:
            input_shape: shape passed to ``keras.Input`` for the sequence input.
            seed: seed used for initializers, dropout layers and the RNG resets
                performed before each ``fit`` call.
            **kwargs: hyperparameters, copied verbatim onto the instance (see the
                class docstring for the names that are read).
        """
        # kwargs are applied first, so they cannot override input_shape / seed.
        self.__dict__.update(kwargs)

        self.input_shape = input_shape
        self.seed = seed
        self.train_callback = self._make_callbacks(self.train_patience)
        self.retrain_callback = self._make_callbacks(self.retrain_patience)

        self.model = self.build_model()

    def _make_callbacks(self, early_stopping_patience):
        """Return the [ReduceLROnPlateau, EarlyStopping] pair used during fitting."""
        # NOTE(review): min_lr=1e-3 is a floor on the learning rate; if
        # self.learning_rate is already <= 1e-3 the scheduler presumably never
        # lowers it -- confirm this value is intended.
        return [
            keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5,
                                              patience=self.plateau_patience, min_lr=1e-3),
            keras.callbacks.EarlyStopping(monitor="val_loss", patience=early_stopping_patience,
                                          restore_best_weights=False, verbose=self.verbose),
        ]

    def _attention_block(self, inputs):
        """Apply self-attention, dropout and layer normalization to ``inputs``.

        Returns:
            A pair ``(normed, residual)`` where ``normed`` is the normalized
            attention output and ``residual`` is ``normed + inputs``.
        """
        x = layers.MultiHeadAttention(key_dim=self.head_size, num_heads=self.num_heads, dropout=self.dropout,
                                      kernel_initializer='glorot_uniform', bias_initializer='zeros')(inputs, inputs)
        x = layers.Dropout(self.dropout, seed=self.seed)(x)
        x = layers.LayerNormalization(epsilon=self.layer_norm)(x)
        return x, x + inputs

    def transformer_encoder(self, inputs):
        """Build one encoder block on top of ``inputs`` and return its output tensor.

        The block is: self-attention sub-block with a residual connection,
        followed by a single Conv1D (with ``inputs.shape[-1]`` filters), layer
        normalization and a second residual connection.
        """
        init = tf.keras.initializers.GlorotUniform(self.seed)

        # Attention and Normalization
        x, res = self._attention_block(inputs)

        # Feed Forward Part
        # An earlier revision had an extra Conv1D(filters=self.ff_dim) + Dropout
        # applied to `res` ahead of the convolution below; it is disabled.
        # NOTE(review): with that layer disabled, the convolution consumes the
        # normalized attention output `x`, not the residual sum `res`. Kept
        # unchanged to preserve the architecture -- confirm this is intended.
        x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=self.kernel_size, padding=self.padding,
                          activation=self.activation, kernel_initializer=init, bias_initializer='zeros')(x)
        x = layers.LayerNormalization(epsilon=self.layer_norm)(x)
        return x + res

    def build_model(self):
        """Assemble the functional Keras model and attach the custom loss.

        Returns:
            A ``keras.models.Model`` whose inputs are
            ``[self.target, input_layer, self.ret_d, self.sic_input]`` (in that
            order) and whose output is the softmax layer.
        """
        init = tf.keras.initializers.GlorotUniform(self.seed)

        input_layer = keras.Input(shape=self.input_shape)

        # Embedding branch fed by the separate `sic_input` tensor.
        embedding_layer = keras.layers.Embedding(input_dim=self.num_of_tokens, output_dim=self.embedding_dim,
                                                 embeddings_initializer=init)(self.sic_input)
        embedding_layer = keras.layers.Flatten()(embedding_layer)

        x = input_layer
        for _ in range(self.num_transformer_blocks - 1):
            x = self.transformer_encoder(x)

        # Last transformer block: same attention sub-block, but followed by a
        # stack of Conv1D -> BatchNorm -> ReLU -> Dropout stages.
        _, x = self._attention_block(x)
        for last_ff_dim in self.last_ff_dims:
            x = layers.Conv1D(filters=last_ff_dim, kernel_size=self.kernel_size, padding=self.padding,
                              kernel_initializer=init, bias_initializer='zeros')(x)
            x = layers.BatchNormalization()(x)
            x = layers.ReLU()(x)
            # NOTE(review): this is the only place that reads `dropout_rate`;
            # every other dropout in the class uses `dropout` -- confirm both
            # hyperparameters are meant to exist.
            x = layers.Dropout(self.dropout_rate, seed=self.seed)(x)

        x = GlobalAveragePooling1D()(x)
        x = keras.layers.Concatenate()([x, embedding_layer])

        for dim in self.mlp_units:
            x = layers.Dense(dim, activation=self.activation, kernel_initializer=init, bias_initializer='zeros')(x)
            x = layers.Dropout(self.mlp_dropout, seed=self.seed)(x)
        output_layer = layers.Dense(self.output_dim, activation="softmax", kernel_initializer=init, bias_initializer='zeros')(x)

        model = keras.models.Model(inputs=[self.target, input_layer, self.ret_d, self.sic_input], outputs=output_layer)
        model.add_loss(self.custom_loss(self.target, output_layer, self.ret_d))
        return model

    def custom_loss(self, y_true, y_pred, ret_d):
        """Mean of the unreduced cross-entropy weighted by ``min(|ret_d|, 1)``."""
        y_true = tf.cast(y_true, dtype=tf.float32)
        per_sample = scce(y_true, y_pred)
        # Flatten the weights so that a column-shaped ret_d (N, 1) lines up
        # element-wise with the loss instead of broadcasting to (N, N).
        # This is a no-op when ret_d is already 1-D.
        weights = tf.reshape(tf.math.minimum(tf.abs(ret_d), 1), [-1])
        return tf.reduce_mean(per_sample * weights)

    # Numpy version of loss
    def custom_loss_np(self, y_true, y_pred, ret_d):
        """Same weighting as ``custom_loss`` for array inputs, averaged with ``np.mean``."""
        # Same flattening guard as in custom_loss.
        weights = np.asarray(np.minimum(abs(ret_d), 1)).reshape(-1)
        return np.mean(scce(y_true, y_pred) * weights)

    def compile_model(self):
        """Compile with Adam at ``self.learning_rate`` and print the model summary.

        ``loss=None`` because the loss is attached in ``build_model`` via
        ``add_loss``.
        """
        self.model.compile(loss=None, optimizer=keras.optimizers.Adam(self.learning_rate))
        self.model.summary()

    def _fit(self, x_train, y_train, ret_d_train, sic_train, callbacks):
        """Reset the TF / ``random`` / NumPy seeds, fit the model, return the result of ``fit``."""
        tf.random.set_seed(self.seed)
        random.seed(self.seed)
        np.random.seed(self.seed)
        # Input order must match the `inputs=` list given to the Model in build_model.
        return self.model.fit(
            x=[y_train, x_train, ret_d_train, sic_train],
            y=None,
            batch_size=self.batch_size,
            epochs=self.epochs,
            callbacks=callbacks,
            validation_split=self.validation_split,
            verbose=self.verbose
            )

    # Using the same code for training and retraining model
    def train_model(self, x_train, y_train, ret_d_train, sic_train):
        """Fit the model with ``self.train_callback`` and return what ``fit`` returns."""
        return self._fit(x_train, y_train, ret_d_train, sic_train, self.train_callback)

    def evaluate_model(self, x_train, y_train, ret_d_train, sic_train, x_test, y_test, ret_d_test, sic_test, batch_size):
        """Print and return the custom loss on the training and test sets.

        Also prints a warning when the standard deviation of the test
        predictions is below ``1e-5``.

        Returns:
            A ``(train_loss, test_loss)`` tuple.
        """
        y_pred = self.model.predict([y_train, x_train, ret_d_train, sic_train], batch_size=batch_size, verbose=self.pred_verbose)
        train_loss = self.custom_loss_np(y_train, y_pred, ret_d_train)
        print(f'Training loss {train_loss}')
        y_pred = self.model.predict([y_test, x_test, ret_d_test, sic_test], batch_size=batch_size, verbose=self.pred_verbose)
        if np.std(y_pred) < 1e-5:
            print('OVERREGULARIZED WARNING')
        test_loss = self.custom_loss_np(y_test, y_pred, ret_d_test)
        print(f'Test loss {test_loss}')
        return train_loss, test_loss

    def retrain_model(self, x_train, y_train, ret_d_train, sic_train):
        """Fit the model with ``self.retrain_callback`` and return what ``fit`` returns."""
        return self._fit(x_train, y_train, ret_d_train, sic_train, self.retrain_callback)