# Data Preparation

In [1]:
import tensorflow as tf; print(tf.config.list_physical_devices('GPU'))

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [None]:
import os
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import numpy as np
import joblib
import pickle
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input,Conv1D, LayerNormalization,MultiHeadAttention, GlobalAveragePooling1D, Embedding,MaxPooling1D, LSTM, Dense, Dropout, Flatten, GRU
from keras_tuner.tuners import BayesianOptimization

random_seed = 42
np.random.seed(random_seed)

In [3]:
data_folder="../data/"
data_prepared="data_prepared/"
classes=2
num_trials=50
epochs=200
patience=15
window_length=50

In [None]:
if not os.path.exists(data_prepared):
    os.makedirs(data_prepared)

recreate_data = not os.path.exists(data_prepared+"x_train.pkl")
if recreate_data:
    # Step 1: Load the pickle files
    with open(data_folder+"x_lc_training"+str(window_length)+".pkl", "rb") as f:
        x_lc_training = pickle.load(f)
    with open(data_folder+"x_fr_training"+str(window_length)+".pkl", "rb") as f:
        x_fr_training = pickle.load(f)
    with open(data_folder+"y_lc_training"+str(window_length)+".pkl", "rb") as f:
        y_lc_training = pickle.load(f)
    with open(data_folder+"y_fr_training"+str(window_length)+".pkl", "rb") as f:
        y_fr_training = pickle.load(f)
        
    with open(data_folder+"x_lc_validation"+str(window_length)+".pkl", "rb") as f:
        x_lc_validation = pickle.load(f)
    with open(data_folder+"x_fr_validation"+str(window_length)+".pkl", "rb") as f:
        x_fr_validation = pickle.load(f)
    with open(data_folder+"y_lc_validation"+str(window_length)+".pkl", "rb") as f:
        y_lc_validation = pickle.load(f)
    with open(data_folder+"y_fr_validation"+str(window_length)+".pkl", "rb") as f:
        y_fr_validation = pickle.load(f)
    
    with open(data_folder+"x_lc_testing"+str(window_length)+".pkl", "rb") as f:
        x_lc_testing = pickle.load(f)
    with open(data_folder+"x_fr_testing"+str(window_length)+".pkl", "rb") as f:
        x_fr_testing = pickle.load(f)
    with open(data_folder+"y_lc_testing"+str(window_length)+".pkl", "rb") as f:
        y_lc_testing = pickle.load(f)
    with open(data_folder+"y_fr_testing"+str(window_length)+".pkl", "rb") as f:
        y_fr_testing = pickle.load(f)

    # Step 4: Combine `lc` and `fr` for training, validation, and testing
    x_train = np.vstack((x_lc_training, x_fr_training))
    y_train = np.hstack((y_lc_training, y_fr_training))

    x_val = np.vstack((x_lc_validation, x_fr_validation))
    y_val = np.hstack((y_lc_validation, y_fr_validation))

    x_test = np.vstack((x_lc_testing, x_fr_testing))
    y_test = np.hstack((y_lc_testing, y_fr_testing))
    
    # Step 1: Concatenate the windows for each dataset along the time axis
    x_train_combined = np.concatenate(x_train, axis=0)  # Combine all train windows into a single 2D array
    x_val_combined = np.concatenate(x_val, axis=0)      # Combine all val windows
    x_test_combined = np.concatenate(x_test, axis=0)    # Combine all test windows

    # Step 2: Apply scaling
    scaler = StandardScaler()
    x_train_scaled_combined = scaler.fit_transform(x_train_combined)  # Fit and scale training data
    x_val_scaled_combined = scaler.transform(x_val_combined)          # Scale validation data
    x_test_scaled_combined = scaler.transform(x_test_combined)        # Scale test data
    
    #create folder data_prepared
    if not os.path.exists(data_prepared):
        os.makedirs(data_prepared)
    #save scaler
    joblib.dump(scaler, data_prepared+"scaler.pkl")
    
    # Step 3: Split back into windows of 10x30
    x_train = x_train_scaled_combined.reshape(-1, window_length, 30)  # Reshape back to windows
    x_val = x_val_scaled_combined.reshape(-1, window_length, 30)      # Reshape back to windows
    x_test = x_test_scaled_combined.reshape(-1, window_length, 30)    # Reshape back to windows
    
    #zip the data and shuffle
    np.random.shuffle(list(zip(x_train, y_train)))
    np.random.shuffle(list(zip(x_val, y_val)))
    np.random.shuffle(list(zip(x_test, y_test)))
   
    x_train_reshaped = [window.reshape(-1) for window in x_train]
    x_val_reshaped = [window.reshape(-1) for window in x_val]
    x_test_reshaped = [window.reshape(-1) for window in x_test]
    
    y_train = np.array(y_train)
    y_val = np.array(y_val)
    y_test = np.array(y_test)
     
    # Step 6: Save the scaled datasets and corresponding labels
    with open(data_prepared+"x_train.pkl", "wb") as f:
        pickle.dump(x_train, f)
    with open(data_prepared+"x_train_reshaped.pkl", "wb") as f:
        pickle.dump(x_train_reshaped, f)
    with open(data_prepared+"y_train.pkl", "wb") as f:
        pickle.dump(y_train, f)

    with open(data_prepared+"x_val.pkl", "wb") as f:
        pickle.dump(x_val, f)
    with open(data_prepared+"x_val_reshaped.pkl", "wb") as f:
        pickle.dump(x_val_reshaped, f)
    with open(data_prepared+"y_val.pkl", "wb") as f:
        pickle.dump(y_val, f)

    with open(data_prepared+"x_test.pkl", "wb") as f:
        pickle.dump(x_test, f)
    with open(data_prepared+"x_test_reshaped.pkl", "wb") as f:
        pickle.dump(x_test_reshaped, f)
    with open(data_prepared+"y_test.pkl", "wb") as f:
        pickle.dump(y_test, f)

    print("Data successfully split, combined, scaled, and saved.")

else:
    x_train = pickle.load(open(data_prepared+"x_train.pkl", "rb"))
    x_train_reshaped = pickle.load(open(data_prepared+"x_train_reshaped.pkl", "rb"))
    y_train = pickle.load(open(data_prepared+"y_train.pkl", "rb"))
    x_val = pickle.load(open(data_prepared+"x_val.pkl", "rb"))
    x_val_reshaped = pickle.load(open(data_prepared+"x_val_reshaped.pkl", "rb"))
    y_val = pickle.load(open(data_prepared+"y_val.pkl", "rb"))
    x_test = pickle.load(open(data_prepared+"x_test.pkl", "rb"))
    x_test_reshaped = pickle.load(open(data_prepared+"x_test_reshaped.pkl", "rb"))
    y_test = pickle.load(open(data_prepared+"y_test.pkl", "rb"))

In [5]:
if(not os.path.exists("models")):
    os.makedirs("models")
model_folder_path = "./models/"

# Utility functions

In [None]:
def evaluate_best_models(tuner, model_name, num_models=2, retrain_on_full_data=False, early_stop=None):
    """
    Evaluates the top models found by the tuner on validation and test sets, and returns the best model based on test accuracy.
    
    Args:
        tuner: Keras Tuner object that performed the hyperparameter search.        
        num_models: Number of top models to evaluate (default is 5).
        retrain_on_full_data: If True, retrains each model on combined x_train and x_val data.
        early_stop: Optional callback for early stopping during retraining.
    
    Returns:
        The best model based on test accuracy.
    """
    # Retrieve the top models and their best hyperparameters
    best_models = tuner.get_best_models(num_models=num_models)
    best_hyperparameters = tuner.get_best_hyperparameters(num_trials=num_models)

    # Initialize a list to store results
    results = []
    if(not os.path.exists(model_folder_path+model_name)):
            os.makedirs(model_folder_path+model_name)

    for i, (model, hyperparams) in enumerate(zip(best_models, best_hyperparameters), start=1):
        print(f"\n--- Evaluation of Model {i} ---")
        
        # Print the best hyperparameters for this model
        print("Best Hyperparameters:")
        for param, value in hyperparams.values.items():
            print(f"{param}: {value}")
        #save best hyperparameters to file        
        with open(model_folder_path+model_name+"/best_hyperparameters_model_"+str(i)+".txt", 'w') as f:
            f.write("Best Hyperparameters:\n")
            for param, value in hyperparams.values.items():
                f.write(f"{param}: {value}\n")
            f.close()
        
        # Optionally, retrain the model on combined training and validation data
        if retrain_on_full_data and x_train is not None and y_train is not None and x_val is not None and y_val is not None:
            x_train_validation = np.concatenate((x_train, x_val), axis=0)
            y_train_validation = np.concatenate((y_train, y_val), axis=0)            
            if early_stop and early_stop.monitor == 'val_loss':
                retrain_early_stop = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=early_stop.patience, restore_best_weights=early_stop.restore_best_weights)
            else:
                retrain_early_stop = early_stop                
            model.fit(x_train_validation, y_train_validation, epochs=50, batch_size=64, callbacks=[retrain_early_stop] if retrain_early_stop else None)
        
        # Evaluate on validation set if provided        
        val_loss, val_mse, val_mae = model.evaluate(x_val, y_val, verbose=0)
        
        # Evaluate on test set
        test_loss, test_mse, test_mae= model.evaluate(x_test, y_test, verbose=0)
        
        # Generate classification report and confusion matrix for the test set
        y_pred = model.predict(x_test).flatten()

        rmse = np.sqrt(mean_squared_error(y_test,y_pred))
        r_squared= r2_score(y_test,y_pred)
        mae= mean_absolute_error(y_test,y_pred)
        # Save results
        results.append({
            "Model": f"Best_Model_{i}",
            "Validation Loss": val_loss,
            "Validation mse":val_mse,
            "Validation mae": val_mae,
            "Testing Loss": test_loss,
            "Testing mse":test_mse,
            "Testing mae": test_mae,
            "rmse":rmse,
            "r_squared":r_squared,
            "mae":mae
        })         
        
    # Create and display a DataFrame with the accuracy results
    results_df = pd.DataFrame(results)
    print("\nEvaluation Results:")
    display(results_df)  # If using Jupyter, this will show the table directly; otherwise, use `print(results_df)`
    
    #save results to file
    results_df.to_csv(model_folder_path+model_name+"/best_models_evaluation_results.csv", index=False)
    
    # Identify and return the best model based on test accuracy
    best_model_idx = results_df['rmse'].idxmin()
    best_model = best_models[best_model_idx]
    print(f"\nBest Model is Model {best_model_idx + 1} with rmse: {results_df['rmse'].min()}")

    return best_model

In [13]:
if classes==2:
    final_neurons=1
    activation = 'linear'
    loss = 'mean_squared_error'
else:
    final_neurons=3
    activation = 'softmax'
    loss = 'sparse_categorical_crossentropy'


In [14]:
project_name='1'

# LSTM

In [None]:
n_features = x_train.shape[2]
print(n_features)
print( x_train.shape)

30
(165182, 50, 30)


In [None]:
# Determine the number of features
n_features = x_train.shape[2]  # Ensure x_train has the shape (num_windows, window_size, n_features)

def build_model(hp):
    model = Sequential()
    
    # First LSTM layer with variable units and dropout rate
    model.add(LSTM(
        units=hp.Int('units_1', min_value=32, max_value=256, step=32),
        activation='tanh',
        input_shape=(x_train.shape[1], x_train.shape[2]),
        return_sequences=hp.Boolean('use_second_lstm')  # To add additional LSTM layers if needed
    ))
    model.add(Dropout(rate=hp.Float('dropout_1', min_value=0.2, max_value=0.5, step=0.1)))
    
    # Optional second LSTM layer
    if hp.Boolean('use_second_lstm'):
        model.add(LSTM(
            units=hp.Int('units_2', min_value=32, max_value=256, step=32),
            activation='tanh',
            return_sequences=hp.Boolean('use_third_lstm') 
        ))
        model.add(Dropout(rate=hp.Float('dropout_2', min_value=0.2, max_value=0.5, step=0.1)))
    
    # Optional third LSTM layer
    if hp.Boolean('use_second_lstm') and hp.Boolean('use_third_lstm'):
        model.add(LSTM(
            units=hp.Int('units_3', min_value=32, max_value=256, step=32),
            activation='tanh',
            return_sequences=False  # Last LSTM layer
        ))
        model.add(Dropout(rate=hp.Float('dropout_3', min_value=0.2, max_value=0.5, step=0.1)))

    # Output layer
    model.add(Dense(final_neurons, activation=activation)) 
    
    # Compile the model with variable hyperparameters
    model.compile(
        optimizer=tf.keras.optimizers.Adam(
            learning_rate=hp.Choice('learning_rate', values=[1e-2, 1e-3])
        ),
        loss=loss,
       metrics=["mean_squared_error","mean_absolute_error"]
    )
    
    return model

In [None]:
tuner = BayesianOptimization(
    hypermodel=build_model,
    objective='val_mean_absolute_error',
    max_trials=num_trials,  # Maximum number of configurations to test
    executions_per_trial=1,  # Repeat each configuration for stability
    directory=f'lstm_tuning_{classes}_classes',
    project_name=project_name,
    overwrite=False  # Set to True to overwrite previous results
)

Reloading Tuner from lstm_tuning_2_classes\1\tuner0.json


In [None]:
# Callback for Early Stopping to prevent overfitting
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_mean_absolute_error',mode='min', patience=patience, restore_best_weights=True)

# Start hyperparameter search
tuner.search(
    x_train, 
    y_train, 
    epochs=epochs, 
    batch_size=256, 
    validation_data=(x_val, y_val),
    callbacks=[early_stop]
)

In [None]:
best_model = evaluate_best_models(tuner,"LSTM")


--- Evaluation of Model 1 ---
Best Hyperparameters:
units_1: 192
use_second_lstm: False
dropout_1: 0.4
learning_rate: 0.001
units_2: 192
use_third_lstm: True
dropout_2: 0.2
units_3: 160
dropout_3: 0.30000000000000004

--- Evaluation of Model 2 ---
Best Hyperparameters:
units_1: 32
use_second_lstm: True
dropout_1: 0.4
learning_rate: 0.01
units_2: 32
use_third_lstm: False
dropout_2: 0.4
units_3: 96
dropout_3: 0.2

Evaluation Results:


Unnamed: 0,Model,Validation Loss,Validation mse,Validation mae,Testing Loss,Testing mse,Testing mae,rmse,r_squared,mae
0,Best_Model_1,0.214909,0.214909,0.262401,0.301612,0.301612,0.301859,0.549192,0.842535,0.301859
1,Best_Model_2,0.23006,0.23006,0.263704,0.341658,0.341658,0.316896,0.584516,0.821628,0.316895



Best Model is Model 1 with rmse: 0.5491921509380875


In [None]:
# Save best model
best_model.save(model_folder_path+"LSTM/LSTM_model.h5")

In [34]:
#load model lstm and do model summary
loaded_model = tf.keras.models.load_model(model_folder_path+"LSTM/LSTM_model.h5")
loaded_model.summary()





Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 192)               171264    
                                                                 
 dropout (Dropout)           (None, 192)               0         
                                                                 
 dense (Dense)               (None, 1)                 193       
                                                                 
Total params: 171,457
Trainable params: 171,457
Non-trainable params: 0
_________________________________________________________________


# GRU

In [None]:
n_features = x_train.shape[2]
print(n_features)
print( x_train.shape)

30
(165182, 50, 30)


In [None]:
# Determine the number of features
n_features = x_train.shape[2]  # Ensure x_train has the shape (num_windows, window_size, n_features)

def build_model(hp):
    model = Sequential()
    
    # First GRU layer with variable units and dropout rate
    model.add(GRU(
        units=hp.Int('units_1', min_value=32, max_value=256, step=32),
        activation='tanh',
        input_shape=(x_train.shape[1], x_train.shape[2]),
        return_sequences=hp.Boolean('use_second_gru')  # To add an additional GRU layer if needed
    ))
    model.add(Dropout(rate=hp.Float('dropout_1', min_value=0.2, max_value=0.5, step=0.1)))
    
    # Optional second GRU layer
    if hp.Boolean('use_second_gru'):
        model.add(GRU(
            units=hp.Int('units_2', min_value=32, max_value=256, step=32),
            activation='tanh',
            return_sequences=hp.Boolean('use_third_gru')
        ))
        model.add(Dropout(rate=hp.Float('dropout_2', min_value=0.2, max_value=0.5, step=0.1)))
    
    # Optional third GRU layer
    if hp.Boolean('use_second_gru') and hp.Boolean('use_third_gru'):
        model.add(GRU(
            units=hp.Int('units_2', min_value=32, max_value=256, step=32),
            activation='tanh',
            return_sequences=False  # Last GRU layer
        ))
        model.add(Dropout(rate=hp.Float('dropout_2', min_value=0.2, max_value=0.5, step=0.1)))
        
    # Output layer
    model.add(Dense(final_neurons, activation=activation))
    
    # Compile the model with variable hyperparameters
    model.compile(
        optimizer=tf.keras.optimizers.Adam(
            learning_rate=hp.Choice('learning_rate', values=[1e-2, 1e-3])
        ),
        loss=loss,
       metrics=["mean_squared_error","mean_absolute_error"]
    )
    
    return model

In [None]:
tuner = BayesianOptimization(
    hypermodel=build_model,
    objective='val_mean_absolute_error',
    max_trials=num_trials,  # Maximum number of configurations to test
    executions_per_trial=1,  # Repeat each configuration for stability
    directory=f'gru_tuning_{classes}_classes',
    project_name=project_name,
    overwrite=False  # Set to True to overwrite previous results
)

In [None]:
# Callback for Early Stopping to prevent overfitting
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=patience, restore_best_weights=True)

# Start hyperparameter search
tuner.search(
    x_train, 
    y_train, 
    epochs=epochs, 
    batch_size=256, 
    validation_data=(x_val, y_val),
    callbacks=[early_stop]
)

Trial 50 Complete [00h 04m 22s]
val_mean_absolute_error: 0.2663462460041046

Best val_mean_absolute_error So Far: 0.2569979429244995
Total elapsed time: 03h 03m 24s


In [None]:
best_model = evaluate_best_models(tuner,"GRU")


--- Evaluation of Model 1 ---
Best Hyperparameters:
units_1: 192
use_second_gru: True
dropout_1: 0.30000000000000004
learning_rate: 0.001
units_2: 32
use_third_gru: True
dropout_2: 0.2

--- Evaluation of Model 2 ---
Best Hyperparameters:
units_1: 192
use_second_gru: True
dropout_1: 0.2
learning_rate: 0.001
units_2: 32
use_third_gru: True
dropout_2: 0.2

Evaluation Results:


Unnamed: 0,Model,Validation Loss,Validation mse,Validation mae,Testing Loss,Testing mse,Testing mae,rmse,r_squared,mae
0,Best_Model_1,0.229731,0.229731,0.256998,0.352824,0.352824,0.311828,0.59399,0.815799,0.311828
1,Best_Model_2,0.226062,0.226062,0.263202,0.322791,0.322791,0.297518,0.568147,0.831478,0.297518



Best Model is Model 2 with rmse: 0.5681468760014271


In [None]:
# Save best model
best_model.save(model_folder_path+"GRU/GRU_model.h5")

In [35]:
#load model lstm and do model summary
loaded_model = tf.keras.models.load_model(model_folder_path+"GRU/GRU_model.h5")
loaded_model.summary()





Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 gru (GRU)                   (None, 50, 192)           129024    
                                                                 
 dropout (Dropout)           (None, 50, 192)           0         
                                                                 
 gru_1 (GRU)                 (None, 50, 32)            21696     
                                                                 
 dropout_1 (Dropout)         (None, 50, 32)            0         
                                                                 
 gru_2 (GRU)                 (None, 32)                6336      
                                                                 
 dropout_2 (Dropout)         (None, 32)                0         
                                                                 
 dense (Dense)               (None, 1)                 3

# 1D CNN

In [None]:
n_features = x_train.shape[2]
window_size = x_train.shape[1] 
def build_model(hp):
    model = Sequential()
    
    # Number of Conv1D layers
    num_conv = hp.Int('num_conv', 1, 2)  # 1 or 2 Conv1D layers
    
    for i in range(num_conv):
        # Number of filters for Conv1D
        filters = hp.Int(f'filters_{i+1}', min_value=32, max_value=256, step=32)
        
        # Kernel size
        kernel_size = hp.Choice(f'kernel_size_{i+1}', values=[3, 5, 7])
        
        # Add Conv1D layer with 'same' padding
        if i == 0:
            model.add(Conv1D(
                filters=filters,
                kernel_size=kernel_size,
                activation='relu',
                padding='same',
                input_shape=(window_size, n_features)
            ))
        else:
            # Add additional Conv1D layer
            model.add(Conv1D(
                filters=filters,
                kernel_size=kernel_size,
                activation='relu',
                padding='same'
            ))
        
        # MaxPooling1D layer
        model.add(MaxPooling1D(pool_size=2))
        
        # Dropout layer
        model.add(Dropout(rate=hp.Float(f'dropout_conv_{i+1}', min_value=0.2, max_value=0.5, step=0.1)))
    
    # Flatten layer to transition to fully connected part
    model.add(Flatten())
    
    #put an optional dense
    if hp.Boolean('use_dense'):
        model.add(Dense(units=hp.Int('units_dense', min_value=32, max_value=256, step=32), activation='relu'))
        model.add(Dropout(rate=hp.Float('dropout_dense', min_value=0.2, max_value=0.5, step=0.1)))
    
    # Output layer
    model.add(Dense(final_neurons, activation=activation)) 
    
    # Compile the model with variable hyperparameters
    model.compile(
        optimizer=tf.keras.optimizers.Adam(
            learning_rate=hp.Choice('learning_rate', values=[1e-2, 1e-3])
        ),
        loss=loss,
       metrics=["mean_squared_error","mean_absolute_error"]
    )
    
    return model


In [None]:
# Initialize the tuner with Bayesian Optimization
tuner = BayesianOptimization(
    hypermodel=build_model,
    objective='val_mean_absolute_error',
    max_trials=num_trials,  # Maximum number of configurations to test
    executions_per_trial=1,  # Repeat each configuration for stability
    directory=f'cnn1d_{classes}_cl',
    project_name=project_name,
    overwrite=False
)

Reloading Tuner from cnn1d_2_cl\1\tuner0.json


In [None]:
# Callback for Early Stopping
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=patience, restore_best_weights=True)

# Start hyperparameter search
tuner.search(
    x_train, 
    y_train, 
    epochs=epochs, 
    batch_size=256, 
    validation_data=(x_val, y_val),
    callbacks=[early_stop]
)

In [None]:
best_model = evaluate_best_models(tuner,"CNN1D")


--- Evaluation of Model 1 ---
Best Hyperparameters:
num_conv: 2
filters_1: 32
kernel_size_1: 5
dropout_conv_1: 0.4
use_dense: True
learning_rate: 0.001
filters_2: 32
kernel_size_2: 7
dropout_conv_2: 0.4
units_dense: 32
dropout_dense: 0.2

--- Evaluation of Model 2 ---
Best Hyperparameters:
num_conv: 2
filters_1: 64
kernel_size_1: 3
dropout_conv_1: 0.2
use_dense: True
learning_rate: 0.001
filters_2: 32
kernel_size_2: 3
dropout_conv_2: 0.4
units_dense: 32
dropout_dense: 0.2

Evaluation Results:


Unnamed: 0,Model,Validation Loss,Validation mse,Validation mae,Testing Loss,Testing mse,Testing mae,rmse,r_squared,mae
0,Best_Model_1,0.220119,0.220119,0.27851,0.295553,0.295553,0.3103,0.543647,0.845699,0.3103
1,Best_Model_2,0.214281,0.214281,0.28164,0.330212,0.330212,0.328941,0.57464,0.827604,0.328941



Best Model is Model 1 with rmse: 0.5436473828652598


In [None]:
# Save the best model
best_model.save(model_folder_path+"CNN1D/CNN1D_model.h5")

# Transformer (Bayesian Optimization)

In [15]:
class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, sequence_length, embed_dim):
        super(PositionalEncoding, self).__init__()
        self.position_embedding = Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )

    def call(self, x):
        positions = tf.range(start=0, limit=tf.shape(x)[1], delta=1)
        positions = self.position_embedding(positions)
        return x + positions

In [16]:
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    # Layer Normalization
    x = LayerNormalization(epsilon=1e-6)(inputs)
    # Multi-Head Attention
    x = MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)(x, x)
    x = Dropout(dropout)(x)
    # Residual Connection
    x = x + inputs

    # Feed-Forward Network
    x_skip = x
    x = LayerNormalization(epsilon=1e-6)(x)
    x = Dense(ff_dim, activation='relu')(x)
    x = Dropout(dropout)(x)
    x = Dense(inputs.shape[-1])(x)
    # Residual Connection
    return x + x_skip

In [17]:
input_shape = x_train.shape[1:]  # (window_size, num_features)
print(f"Input shape: {input_shape}")

Input shape: (50, 30)


In [18]:
# Definisci builder con iperspazio per la ricerca
def build_transformer_model(hp):
    inputs = Input(shape=input_shape)

    # iperparametri da ottimizzare
    head_size = hp.Int("head_size", min_value=32, max_value=128, step=32)
    num_heads = hp.Int("num_heads", 1, 8, step=1)
    ff_dim = hp.Int("ff_dim", 64, 256, step=64)
    num_blocks = hp.Int("num_transformer_blocks", 1, 4, step=1)
    dropout = hp.Float("dropout", 0.0, 0.5, step=0.1)
    mlp_units = hp.Int("mlp_units", min_value=32, max_value=256, step=32)
    mlp_dropout = hp.Float("mlp_dropout", 0.0, 0.5, step=0.1)
    lr = hp.Choice("learning_rate", values=[1e-2, 1e-3, 1e-4])

    x = PositionalEncoding(sequence_length=input_shape[0],
                           embed_dim=input_shape[1])(inputs)

    for _ in range(num_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    x = GlobalAveragePooling1D()(x)
    x = Dense(mlp_units, activation="relu")(x)
    x = Dropout(mlp_dropout)(x)
    outputs = Dense(final_neurons, activation=activation)(x)

    model = tf.keras.Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
        loss=loss,
        metrics=["mean_squared_error", "mean_absolute_error"]
    )
    return model

In [19]:
# Instanzia il tuner con Bayesian Optimization
tuner = BayesianOptimization(
    build_transformer_model,
    objective="val_mean_squared_error",
    max_trials=num_trials,
    seed=42,
    directory=f"transformer_{classes}_cl",
    project_name=project_name,
    overwrite=False
)

# Callback per early stopping
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=patience, restore_best_weights=True
)

In [20]:
# Avvia la ricerca degli iperparametri
tuner.search(
    x_train, y_train,
    validation_data=(x_val, y_val),
    epochs=epochs,
    batch_size=64,
    callbacks=[early_stop]
)

Trial 50 Complete [00h 16m 30s]
val_mean_squared_error: 0.21584130823612213

Best val_mean_squared_error So Far: 0.20337837934494019
Total elapsed time: 17h 14m 54s


In [22]:
# Se vuoi, valuta e salva
test_loss, test_mse, test_mae = best_model.evaluate(x_test, y_test, verbose=0)
print(f"Test MAE: {test_mae:.4f} | Test MSE: {test_mse:.4f} | Test Loss: {test_loss:.4f}")

Test MAE: 0.2978 | Test MSE: 0.2603 | Test Loss: 0.2603


In [30]:
best_model = evaluate_best_models(tuner,"Transformer")


--- Evaluation of Model 1 ---
Best Hyperparameters:
head_size: 128
num_heads: 1
ff_dim: 128
num_transformer_blocks: 2
dropout: 0.0
mlp_units: 160
mlp_dropout: 0.1
learning_rate: 0.001

--- Evaluation of Model 2 ---
Best Hyperparameters:
head_size: 32
num_heads: 5
ff_dim: 64
num_transformer_blocks: 1
dropout: 0.0
mlp_units: 64
mlp_dropout: 0.30000000000000004
learning_rate: 0.01

--- Evaluation of Model 3 ---
Best Hyperparameters:
head_size: 128
num_heads: 1
ff_dim: 64
num_transformer_blocks: 2
dropout: 0.2
mlp_units: 224
mlp_dropout: 0.30000000000000004
learning_rate: 0.001

Evaluation Results:


Unnamed: 0,Model,Validation Loss,Validation mse,Validation mae,Testing Loss,Testing mse,Testing mae,rmse,r_squared,mae
0,Best_Model_1,0.203378,0.203378,0.275383,0.260263,0.260263,0.29784,0.51016,0.864123,0.29784
1,Best_Model_2,0.207795,0.207795,0.291974,0.28081,0.28081,0.328871,0.529915,0.853395,0.328871
2,Best_Model_3,0.209153,0.209153,0.291513,0.295441,0.295441,0.328711,0.543545,0.845757,0.328711



Best Model is Model 1 with rmse: 0.5101597554583367


In [33]:
best_model.save(model_folder_path+"Transformer/model/")



INFO:tensorflow:Assets written to: ./models/Transformer/model/assets


INFO:tensorflow:Assets written to: ./models/Transformer/model/assets
