In [4]:
import os 
import pandas as pd
from datetime import datetime
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Conv1D, Flatten, LeakyReLU, Dropout, Input, BatchNormalization,Attention, Input, Multiply,Activation,Reshape
from sklearn.preprocessing import MinMaxScaler,StandardScaler
from sklearn.metrics import mean_squared_error, explained_variance_score
from keras_adabound import AdaBound
from keras.callbacks import ModelCheckpoint
from keras.models import load_model
import sklearn.preprocessing
import tensorflow.keras.backend as K
from keras.layers import LSTM, Bidirectional, Dense
import warnings
import keras
from keras.layers import LSTM, Bidirectional, Dense
import tensorflow as tf
from tensorflow.keras.layers import MultiHeadAttention, Dense, Dropout

class AttentionLayer(tf.keras.layers.Layer):
    """Dot-product attention applied to a ``[query, value]`` input pair.

    The query goes through a tanh-activated Dense projection (same width as
    the query's last dimension), is scored against ``value`` with a dot
    product over the last axis, softmax-normalised, and finally used to take
    a weighted combination of ``value``.
    """

    def __init__(self, **kwargs):
        """Forward every keyword argument to the base Keras ``Layer``."""
        super().__init__(**kwargs)

    def build(self, input_shape):
        """Create the query projection.

        ``input_shape`` is indexed as ``input_shape[0][-1]``, so it is expected
        to hold one shape per input with the query's shape first.
        """
        query_shape = input_shape[0]
        self.query_transform = Dense(query_shape[-1], activation='tanh')
        super().build(input_shape)

    def call(self, inputs):
        """Return the attention-weighted combination of ``value``.

        Args:
            inputs: A two-element sequence ``(query, value)``.
        """
        query, value = inputs
        projected_query = self.query_transform(query)

        # Similarity scores: contract the last (feature) axis of both tensors.
        scores = tf.keras.layers.Dot(axes=-1)([projected_query, value])
        weights = tf.keras.layers.Softmax()(scores)

        # Weighted sum: contract axis 1 of the weights with axis 1 of `value`.
        return tf.keras.layers.Dot(axes=1)([weights, value])


def transformer_encoder(inputs, num_layers, d_model, num_heads, dff, training=False):
    """Stack ``num_layers`` self-attention blocks on top of a Dense projection.

    Each block is: multi-head self-attention -> residual add ->
    Dense(dff, relu) -> Dense(d_model) -> Dropout(0.1).

    Args:
        inputs: Keras tensor to encode.
        num_layers: Number of attention blocks. With 0 only the initial Dense
            projection is applied.
        d_model: Width of the initial projection and of every block's output;
            also used as ``key_dim`` of each attention head.
        num_heads: Number of heads in each ``MultiHeadAttention`` layer.
        dff: Width of the hidden ReLU Dense layer inside each block.
        training: Forwarded verbatim to the attention and dropout layer calls.
            NOTE(review): Keras honours an explicit ``training`` value over the
            fit/predict phase, so the default ``False`` appears to leave the
            dropout layers inactive even during ``model.fit`` — confirm this
            is intended; pass ``None`` to let Keras follow the phase.

    Returns:
        The encoded tensor, whose last dimension is ``d_model``.
    """
    # Project the input features to d_model. The layer is applied to a tensor
    # immediately, so no `input_shape` hint is needed (newer Keras versions
    # warn when one is passed to a layer).
    x = Dense(d_model)(inputs)

    for _ in range(num_layers):
        # Self-attention: query, value and key are all the current sequence.
        attention_output = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)(
            x, x, x, training=training
        )
        x = tf.keras.layers.add([x, attention_output])  # residual connection

        # Position-wise feed-forward part followed by dropout.
        x = Dense(dff, activation='relu')(x)
        x = Dense(d_model)(x)
        x = Dropout(0.1)(x, training=training)

    return x

def MML(y_true, y_pred, Beta_level, beta_index=10):
    """Two-term loss: MSE on one target column plus a sum-consistency penalty.

    ``loss1`` is the mean squared error between ``y_true`` and ``y_pred`` on
    column ``Beta_level``.

    ``loss2`` is the mean squared error between ``y_true[:, beta_index]`` and
    the per-sample sum of the predicted columns other than ``beta_index``
    (the sum of all columns except ``Beta_level`` and ``beta_index``, plus
    ``y_pred[:, Beta_level]``).

    Args:
        y_true: Rank-2 tensor ``(batch, n_outputs)`` of targets.
        y_pred: Rank-2 tensor ``(batch, n_outputs)`` of predictions.
        Beta_level: Non-negative column index scored directly by ``loss1``.
        beta_index: Non-negative column index of ``y_true`` holding the value
            the predicted columns should add up to. Defaults to 10, the value
            that was previously hard-coded.

    Returns:
        Scalar tensor ``loss1 + loss2``.
    """
    loss1 = tf.reduce_mean(tf.square(y_true[:, Beta_level] - y_pred[:, Beta_level]))

    # Build a 0/1 mask that drops the two special columns from the row sum.
    # Zeroing a column twice (Beta_level == beta_index) is harmless.
    mask = tf.ones_like(y_pred)
    for dim in (Beta_level, beta_index):
        mask = tf.concat(
            [mask[:, :dim], tf.zeros_like(mask[:, dim:dim + 1]), mask[:, dim + 1:]],
            axis=1,
        )

    # Reduce to rank 1 (no keepdims). A (batch, 1) result would broadcast
    # against the rank-1 columns below into a (batch, batch) matrix and mix
    # residuals of different samples.
    remaining_sum = tf.reduce_sum(y_pred * mask, axis=-1)

    Beta = y_true[:, beta_index]
    loss2 = tf.reduce_mean(tf.square(Beta - remaining_sum - y_pred[:, Beta_level]))
    return loss1 + loss2

In [None]:
def MMLSTT(data, Beta_level):
    """Build, compile and fit the transformer + dilated causal Conv1D regressor.

    The following module-level names are read and must be defined before the
    call: ``X1`` / ``y1`` (selectors for the feature / target columns of
    ``data``), ``filepath`` (checkpoint destination), ``batch_size`` and
    ``epochs``.

    Args:
        data: Object indexable by ``X1`` and ``y1`` (e.g. a pandas DataFrame).
        Beta_level: Column index forwarded to the ``MML`` loss.

    Returns:
        The Keras ``Model`` as it stands after the last epoch. The weights
        with the lowest ``val_loss`` are written to ``filepath`` by the
        checkpoint callback.
    """
    X = data[X1]
    y = data[y1]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=101)

    # Fit the scaler on the training split only, then reuse it for the test split.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    # Append a trailing axis: (samples, features) -> (samples, features, 1).
    X_train = np.expand_dims(X_train, axis=2)
    X_test = np.expand_dims(X_test, axis=2)

    kernel_size = 3
    dropout = 0.2
    # One output unit per target column (a 1-D target yields a single unit).
    n_outputs = y_train.shape[1] if len(y_train.shape) > 1 else 1

    input_shape = X_train.shape[1:]
    input_layer = Input(shape=input_shape)

    x = transformer_encoder(inputs=input_layer, num_layers=2, d_model=128, num_heads=4, dff=128)
    x = Dense(128, activation="relu")(x)

    # Causal convolutions with dilation 1, 2, 4.
    # NOTE(review): each Conv1D already applies ReLU, so its output is
    # non-negative and the LeakyReLU that follows passes it through unchanged;
    # the layers are kept to preserve the original stack.
    x = Conv1D(32, kernel_size=kernel_size, activation="relu", padding="causal", dilation_rate=1)(x)
    x = LeakyReLU(alpha=0.1)(x)
    x = BatchNormalization()(x)
    x = Conv1D(64, kernel_size=kernel_size, padding="causal", activation="relu", dilation_rate=2)(x)
    x = LeakyReLU(alpha=0.01)(x)
    x = BatchNormalization()(x)
    x = Conv1D(128, kernel_size=kernel_size, padding="causal", activation="relu", dilation_rate=4)(x)
    x = BatchNormalization()(x)

    x = Flatten()(x)
    x = Dropout(dropout)(x)
    # Dense requires an explicit unit count; the bare `Dense()` raised TypeError.
    output_layer = Dense(n_outputs)(x)

    model = Model(inputs=input_layer, outputs=output_layer)

    checkpoint = ModelCheckpoint(filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
    callbacks_list = [checkpoint]

    # NOTE(review): the original referenced `GetMyloss`, which is not defined
    # in the visible notebook; `MML` has the matching
    # (y_true, y_pred, Beta_level) signature — confirm it is the intended loss.
    model.compile(
        loss=lambda y_true, y_pred: MML(y_true, y_pred, Beta_level=Beta_level),
        optimizer="adam",
    )

    # All arguments are passed by keyword: positional arguments after keyword
    # arguments are a SyntaxError. The held-out split doubles as validation data.
    model.fit(
        x=X_train,
        y=y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(X_test, y_test),
        verbose=1,
        callbacks=callbacks_list,
    )

    return model
