# Importation des modules

In [9]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import matplotlib.ticker as ticker
import pickle
from pathlib import Path

# Matplotlib settings

from matplotlib import rcParams

# Scikit-learn modules
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# TensorFlow & Keras
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.optimizers import Adam
from keras.losses import Huber
from keras.callbacks import EarlyStopping, ReduceLROnPlateau

# Keras Tuner
import keras_tuner as kt

# Custom PCA module
import pcax


# Chargement des données

In [10]:
def load_data(file_path):
    data = np.load(file_path)
    spectres = np.log1p(data["simulations"])  # Log transform for stability
    theta = data["theta"]
    return spectres, theta

# Update with your own path
data_path = Path("/Users/xifumacbook/Documents/Codes/Spectres/diskbb/diskbb06-001-50.npz")
spectres, theta = load_data(data_path)

print(f"Theta shape: {theta.shape}")
print(f"Spectrum shape: {spectres.shape}")

# Extract temperature parameter and reshape
temperatures = theta[:, 0].reshape(-1, 1)

# =======================
# Apply Normalization using Spline
# =======================

# Load spline for normalization
spline_filename = data_path.with_suffix(".pkl")
with open(spline_filename, "rb") as f:
    spline_N = pickle.load(f)

# Function to get adjusted normalization
def get_norm_spline(T):
    return 10**spline_N(np.log10(T))

# =======================
# Split Data
# =======================

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    temperatures, spectres, test_size=0.1, random_state=42
)

# Scaling
spectre_scaler = StandardScaler()

y_train_scaled = spectre_scaler.fit_transform(y_train)
y_test_scaled = spectre_scaler.transform(y_test)

# =======================
# PCA Reduction
# =======================

n_PCA = 15
pca_state = pcax.fit(y_train_scaled, n_components=n_PCA)
y_train_pca = pcax.transform(pca_state, y_train_scaled)
y_test_pca = pcax.transform(pca_state, y_test_scaled)

explained_variance = np.sum(y_train_pca.var(axis=0)) / np.sum(y_train_scaled.var(axis=0))
print(f"Variance explained by first {n_PCA} components: {explained_variance:.4%}")

# # Save PCA state
# with open("pca_state.pkl", "wb") as f:
#     pickle.dump(pca_state, f)

Theta shape: (100000, 2)
Spectrum shape: (100000, 10000)
Variance explained by first 15 components: 100.0000%


In [None]:
# def build_model(hp):
#     model = Sequential()

#     # 📌 Première couche cachée (nombre de neurones variable)
#     model.add(Dense(
#         units=hp.Int("units_1", min_value=8, max_value=256, step=8),
#         activation=hp.Choice("activation_1", ["relu", "gelu", "tanh", "swish"]),
#         input_dim=X_train.shape[1]
#     ))

#     # 📌 Ajout possible d'une deuxième couche cachée
#     if hp.Boolean("use_second_layer"):
#         model.add(Dense(
#             units=hp.Int("units_2", min_value=8, max_value=256, step=8),
#             activation=hp.Choice("activation_2", ["relu", "gelu", "tanh", "swish"])
#         ))

#     # 📌 Ajout possible d'une troisième couche cachée
#     if hp.Boolean("use_third_layer"):
#         model.add(Dense(
#             units=hp.Int("units_3", min_value=8, max_value=256, step=8),
#             activation=hp.Choice("activation_3", ["relu", "gelu", "tanh", "swish"])
#         ))

#     # 📌 Couche de sortie
#     model.add(Dense(n_PCA, activation="linear"))

#     # 📌 Compilation du modèle
#     model.compile(
#         optimizer=Adam(learning_rate=hp.Float("learning_rate", min_value=1e-5, max_value=1e-2, sampling="log")),
#         loss=Huber(delta=0.001)
#     )

#     return model

# # 📌 Définition du tuner (Random Search)
# tuner = kt.RandomSearch(
#     build_model,
#     objective="val_loss",
#     max_trials=100,  # Nombre d’architectures testées
#     executions_per_trial=2,  # Moyenne sur plusieurs exécutions pour éviter la variance
#     directory="tuner_results",
#     project_name="spectre_tuning"
# )

# # 📌 Recherche des meilleurs hyperparamètres
# tuner.search(
#     X_train, y_train_pca,
#     validation_split=0.1,
#     epochs=100,
#     batch_size=128,
#     callbacks=[
#         EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True, verbose=1),
#         ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=5, min_lr=1e-6, verbose=1)
#     ],
#     verbose=1
# )

# # 📌 Récupération du meilleur modèle
# best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
# print(f"✅ Meilleurs hyperparamètres trouvés :\n{best_hps.values}")

Trial 63 Complete [00h 06m 38s]
val_loss: 0.001094851759262383

Best val_loss So Far: 0.0003629798593465239
Total elapsed time: 05h 21m 50s

Search: Running Trial #64

Value             |Best Value So Far |Hyperparameter
24                |112               |units_1
relu              |swish             |activation_1
False             |True              |use_second_layer
False             |True              |use_third_layer
0.0035372         |0.0064859         |learning_rate
104               |40                |units_3
tanh              |swish             |activation_3
40                |16                |units_2
relu              |swish             |activation_2

Epoch 1/100
[1m317/317[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 4ms/step - loss: 0.0120 - val_loss: 0.0098 - learning_rate: 0.0035
Epoch 2/100
[1m317/317[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - loss: 0.0093 - val_loss: 0.0088 - learning_rate: 0.0035
Epoch 3/100
[1m317/317[0m [32m━━━

KeyboardInterrupt: 

In [12]:
# =======================
# Hyperparameter Tuning with Keras Tuner
# =======================

def build_tuned_model(hp):
    model = Sequential()
    
    # Input layer
    model.add(Dense(
        units=hp.Int("units_1", min_value=8, max_value=256, step=8),
        activation=hp.Choice("activation_1", ["relu", "gelu", "tanh", "swish"]),
        input_dim=X_train.shape[1]
    ))

    # 📌 Optional second hidden layer
    if hp.Boolean("use_second_layer"):
        model.add(Dense(
            units=hp.Int("units_2", min_value=8, max_value=256, step=8),
            activation=hp.Choice("activation_2", ["relu", "gelu", "tanh", "swish"])
        ))

    # # 📌 Optional third hidden layer
    # if hp.Boolean("use_third_layer"):
    #     model.add(Dense(
    #         units=hp.Int("units_3", min_value=8, max_value=256, step=8),
    #         activation=hp.Choice("activation_3", ["relu", "gelu", "tanh", "swish"])
    #     ))

    # 📌 Output layer
    model.add(Dense(n_PCA, activation="linear"))

    # 📌 Compile model
    model.compile(
        optimizer=Adam(learning_rate=hp.Float("learning_rate", min_value=1e-5, max_value=1e-2, sampling="log")),
        loss=Huber(delta=1.35)
    )

    return model

# Initialize the tuner
tuner = kt.Hyperband(
    build_tuned_model,
    objective='val_loss',
    max_epochs=50,
    factor=3,
    directory='keras_tuner_dir2',
    project_name='spectra_tuning'
)

# Early stopping callback
early_stopping = EarlyStopping(monitor='val_loss', patience=5)

# Perform hyperparameter search
tuner.search(
    X_train, y_train_pca,
    epochs=50,
    validation_split=0.1,
    callbacks=[early_stopping],
    verbose=1
)

# Get the optimal hyperparameters


Trial 90 Complete [00h 13m 57s]
val_loss: 3.9085490703582764

Best val_loss So Far: 0.0002678835589904338
Total elapsed time: 01h 19m 51s


In [13]:
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print(f"✅ Meilleurs hyperparamètres trouvés :\n{best_hps.values}")

✅ Meilleurs hyperparamètres trouvés :
{'units_1': 16, 'activation_1': 'gelu', 'use_second_layer': True, 'learning_rate': 0.005058857837580419, 'units_2': 160, 'activation_2': 'gelu', 'tuner/epochs': 50, 'tuner/initial_epoch': 17, 'tuner/bracket': 3, 'tuner/round': 3, 'tuner/trial_id': '0046'}


In [14]:
import numpy as np
import pickle
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import tensorflow as tf
import keras_tuner as kt

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import Huber
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

# =======================
# Data Loading
# =======================

def load_data(file_path):
    data = np.load(file_path)
    spectres = np.log1p(data["simulations"])  # Log transform for stability
    theta = data["theta"]
    return spectres, theta

# Update with your own path
data_path = Path("/Users/xifumacbook/Documents/Codes/Spectres/diskbb/diskbb06-001-50.npz")
spectres, theta = load_data(data_path)

print(f"Theta shape: {theta.shape}")
print(f"Spectrum shape: {spectres.shape}")

# Extract temperature parameter and reshape
temperatures = theta[:, 0].reshape(-1, 1)

# =======================
# Apply Normalization using Spline
# =======================

# Load spline for normalization
spline_filename = data_path.with_suffix(".pkl")
with open(spline_filename, "rb") as f:
    spline_N = pickle.load(f)

# Function to get adjusted normalization
def get_norm_spline(T):
    return 10**spline_N(np.log10(T))

# =======================
# Split Data
# =======================

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    temperatures, spectres, test_size=0.1, random_state=42
)

# Scaling
spectre_scaler = StandardScaler()
y_train_scaled = spectre_scaler.fit_transform(y_train)
y_test_scaled = spectre_scaler.transform(y_test)

# =======================
# PCA Reduction
# =======================

from sklearn.decomposition import PCA

n_PCA = 15
pca = PCA(n_components=n_PCA)
y_train_pca = pca.fit_transform(y_train_scaled)
y_test_pca = pca.transform(y_test_scaled)

explained_variance = np.sum(pca.explained_variance_ratio_)
print(f"Variance explained by first {n_PCA} components: {explained_variance:.4%}")

# =======================
# Custom Callback to Stop on Global Error Threshold
# =======================

class StopOnGlobalError(tf.keras.callbacks.Callback):
    def __init__(self, X_test, y_test_scaled, pca, spectre_scaler, threshold=1.5):
        super().__init__()
        self.X_test = X_test
        self.y_test_scaled = y_test_scaled
        self.pca = pca
        self.spectre_scaler = spectre_scaler
        self.threshold = threshold
        self.eps = 1e-8

    def on_epoch_end(self, epoch, logs=None):
        y_pred_pca = self.model.predict(self.X_test)
        y_pred_scaled = self.pca.inverse_transform(y_pred_pca)
        y_pred = self.spectre_scaler.inverse_transform(y_pred_scaled)
        y_test_original = self.spectre_scaler.inverse_transform(self.y_test_scaled)

        global_errors = []
        for true_spectrum, pred_spectrum in zip(np.expm1(y_test_original), np.expm1(y_pred)):
            global_error = 100 * np.linalg.norm(true_spectrum - pred_spectrum) / (np.linalg.norm(true_spectrum) + self.eps)
            global_errors.append(global_error)

        mean_global_error = np.mean(global_errors)
        print(f"\nMean Global Error at epoch {epoch + 1}: {mean_global_error:.3f}%")

        if mean_global_error < self.threshold:
            print(f"\n✅ Stopping training as global error fell below {self.threshold}%")
            self.model.stop_training = True

# =======================
# Keras Tuner Integration
# =======================

def build_tuned_model(hp):
    model = Sequential()
    
    # Input layer
    model.add(Dense(
        units=hp.Int("units_1", min_value=8, max_value=256, step=8),
        activation=hp.Choice("activation_1", ["relu", "gelu", "tanh", "swish"]),
        input_dim=X_train.shape[1]
    ))

    # Optional hidden layer
    if hp.Boolean("use_second_layer"):
        model.add(Dense(
            units=hp.Int("units_2", min_value=8, max_value=256, step=8),
            activation=hp.Choice("activation_2", ["relu", "gelu", "tanh", "swish"])
        ))

    # Output layer
    model.add(Dense(n_PCA, activation="linear"))

    # Compile model
    model.compile(
        optimizer=Adam(learning_rate=hp.Float("learning_rate", min_value=1e-5, max_value=1e-2, sampling="log")),
        loss=Huber(delta=1.35)
    )

    return model

# Initialize the tuner
tuner = kt.Hyperband(
    build_tuned_model,
    objective='val_loss',
    max_epochs=50,
    factor=3,
    directory='keras_tuner_dir2',
    project_name='spectra_tuning'
)

# Early stopping on validation loss
early_stopping = EarlyStopping(monitor='val_loss', patience=5)

# Custom callback to stop on global error < 1.5%
stop_on_global_error = StopOnGlobalError(
    X_test=X_test,
    y_test_scaled=y_test_pca,
    pca=pca,
    spectre_scaler=spectre_scaler,
    threshold=1.5
)

# Perform hyperparameter search
tuner.search(
    X_train, y_train_pca,
    epochs=50,
    validation_split=0.1,
    callbacks=[early_stopping, stop_on_global_error],
    verbose=1
)

# Get the best hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print(f"✅ Meilleurs hyperparamètres : {best_hps.values}")

# =======================
# Plot Global Errors Function
# =======================

def plot_global_errors(model, X_test, y_test_scaled, pca, spectre_scaler):
    eps = 1e-8

    # Predict on test set
    y_pred_pca = model.predict(X_test)
    y_pred_scaled = pca.inverse_transform(y_pred_pca)
    y_pred = spectre_scaler.inverse_transform(y_pred_scaled)

    # De-normalize test data
    y_test_original = spectre_scaler.inverse_transform(y_test_scaled)

    # Initialize list for global errors
    global_errors = []
    temperatures_test = X_test.flatten()

    # Calculate global error for each spectrum
    for true_spectrum, pred_spectrum in zip(np.expm1(y_test_original), np.expm1(y_pred)):
        global_error = 100 * np.linalg.norm(true_spectrum - pred_spectrum) / (np.linalg.norm(true_spectrum) + eps)
        global_errors.append(global_error)

    # Plot global errors
    plt.figure(figsize=(10, 6))
    plt.scatter(temperatures_test, global_errors, color='dodgerblue', alpha=0.8, edgecolor='black', s=50)
    plt.xscale('log')
    plt.xlabel('Temperature (keV)')
    plt.ylabel('Global Error (%)')
    plt.title('Global Errors Across Test Samples')
    plt.show()

# Build and evaluate the best model
best_model = tuner.hypermodel.build(best_hps)
best_model.fit(
    X_train, y_train_pca,
    validation_split=0.1,
    epochs=100,
    callbacks=[early_stopping, stop_on_global_error],
    verbose=1
)

# Plot the global errors
plot_global_errors(best_model, X_test, y_test_pca, pca, spectre_scaler)


Theta shape: (100000, 2)
Spectrum shape: (100000, 10000)
Variance explained by first 15 components: 99.9999%
Reloading Tuner from keras_tuner_dir2/spectra_tuning/tuner0.json
✅ Meilleurs hyperparamètres : {'units_1': 16, 'activation_1': 'gelu', 'use_second_layer': True, 'learning_rate': 0.005058857837580419, 'units_2': 160, 'activation_2': 'gelu', 'tuner/epochs': 50, 'tuner/initial_epoch': 17, 'tuner/bracket': 3, 'tuner/round': 3, 'tuner/trial_id': '0046'}


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/100
[1m313/313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step


ValueError: operands could not be broadcast together with shapes (10000,15) (10000,) (10000,15) 