In [1]:
import pandas as pd
from datetime import datetime
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import json
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')
import os
import tensorflow as tf
from tensorflow import keras
from keras import layers, models, optimizers
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
import optuna
import random
import winsound
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras import Model, initializers
from tensorflow.keras.regularizers import L2
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten
from tensorflow.keras.layers import Dropout, BatchNormalization, Dense, LeakyReLU, Input
from IPA_architecture import IPA

# Seed every random number generator used by the notebook (Python, NumPy,
# TensorFlow) with the same value.
seed = 47
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)

# Folder holding the three semicolon-separated input tables. The default is the
# original location; set the PFE_DATA_DIR environment variable to run the
# notebook on another machine without editing this cell.
DATA_DIR = os.environ.get("PFE_DATA_DIR", "C:/Users/Administrator/Desktop/PFE/data/new")

# X: predictor table, Y: one column per target variable, M: one column per
# target variable holding the partition label of each row ('cal'/'val'/'test').
X = pd.read_csv(f"{DATA_DIR}/miscplants_Xp.csv", sep=';')
Y = pd.read_csv(f"{DATA_DIR}/miscplants_Y.csv", sep=';')
# The literal string 'missing' in M is parsed as NaN.
M = pd.read_csv(f"{DATA_DIR}/miscplants_M.csv", sep=';', na_values='missing')

In [2]:
def split_data(Var, features=None, targets=None, partitions=None):
    """Split the predictors and one target column into cal / val / test subsets.

    Rows are assigned according to the labels found in ``partitions[Var]``:
    rows labelled 'cal', 'val' and 'test' go to the respective subset; rows
    with any other value (including NaN) are left out of all three.

    Args:
        Var: Name of the target column. Must exist in both the target table
            and the partition table.
        features: Predictor DataFrame. Defaults to the module-level ``X``.
        targets: Target DataFrame. Defaults to the module-level ``Y``.
        partitions: Partition-label DataFrame. Defaults to the module-level ``M``.

    Returns:
        Three ``(X_subset, Y_subset)`` pairs, in the order cal, val, test.
        ``Y_subset`` is the ``Var`` column restricted to the index of ``X_subset``.

    Raises:
        ValueError: If ``Var`` is not a column of the target or partition table.
    """
    # Fall back to the notebook globals only when no table was passed in.
    features = X if features is None else features
    targets = Y if targets is None else targets
    partitions = M if partitions is None else partitions

    missing_in = [
        label
        for label, frame in (("Y", targets), ("M", partitions))
        if Var not in frame.columns
    ]
    if missing_in:
        raise ValueError(
            f"Unknown variable {Var!r}: column not found in {' and '.join(missing_in)}."
        )

    labels = partitions[Var]

    def _subset(tag):
        # Select predictor rows by label, then pick the matching targets by index.
        rows = features[labels == tag]
        return rows, targets.loc[rows.index, Var]

    return _subset('cal'), _subset('val'), _subset('test')

In [30]:
# Name of the target column to model; split_data() requires it to be a column of both Y and M.
Var = "ndf"

In [31]:
# Data splitting: the 'cal' and 'val' partitions are merged into a single
# training set; the 'test' partition is kept apart for the final evaluation.
(X_cal, Y_cal), (X_val, Y_val), (X_test, Y_test) = split_data(Var)
Y_train = pd.concat([Y_cal, Y_val])
X_train = pd.concat([X_cal, X_val])

# Convert the predictor frames to NumPy arrays (the targets stay pandas Series).
X_train = X_train.to_numpy()
X_test = X_test.to_numpy()

# Number of spectral features, read from the data rather than hard-coded
# (700 for the current dataset).
num_features = X_train.shape[1]

# Standardization: the scaler is fitted on the training set only, and the same
# statistics are then applied to both the training and the test set.
scaler = StandardScaler()
scaler.fit(X_train)
X_train_N = scaler.transform(X_train)
X_test_N = scaler.transform(X_test)

# Reshape for a 1D CNN: (samples, features) -> (samples, features, 1),
# i.e. (batch size, sequence length, channels).
X_train_f = X_train_N[..., np.newaxis]
X_test_f = X_test_N[..., np.newaxis]

print(f"Y: {Y_train.shape}, {Y_test.shape}")
print(f"X: {X_train_f.shape}, {X_test_f.shape}")

Y: (1147,), (382,)
X: (1147, 700, 1), (382, 700, 1)


In [5]:
# Report whether TensorFlow can see at least one GPU device.
gpus = tf.config.list_physical_devices('GPU')
if not gpus:
    print("No GPU detected; please configure CUDA.")
else:
    print("GPUs Available:", gpus)

GPUs Available: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


# The model

In [6]:
# Earlier standalone build/compile of the IPA model, kept commented out for reference.
# ipa_model = IPA ( seed_value = 47, regularization_factor = .0095 )

# ipa_model.compile(
#     optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
#     loss='mse',
#     metrics=[tf.keras.metrics.RootMeanSquaredError(name='rmse')]
# )
# Model label: written to the "Modèle" results column and embedded in the saved file names.
Modd = "IPA"

In [32]:
def custom_loss(lambda_l2, model):
    """Build a loss function: mean absolute error plus a weighted L2 penalty.

    Args:
        lambda_l2: Multiplier applied to the summed L2 penalty.
        model: Model whose trainable variables are penalised. Only variables
            whose name contains 'kernel' contribute to the penalty.

    Returns:
        A function ``loss_fn(y_true, y_pred)`` that returns
        ``mean(|y_true - y_pred|) + lambda_l2 * sum(l2_loss(kernel))``.

    NOTE(review): callers also pass a ``regularization_factor`` when building
    the model; if the model applies its own kernel regulariser, the L2 penalty
    may be counted twice. Confirm against the IPA implementation.
    """
    def loss_fn(y_true, y_pred):
        # The variable list is read from the captured model on each call, so
        # only that model's kernels are penalised.
        kernel_penalties = []
        for variable in model.trainable_variables:
            if 'kernel' in variable.name:
                kernel_penalties.append(tf.nn.l2_loss(variable))
        weight_penalty = tf.add_n(kernel_penalties)

        abs_error = tf.abs(y_true - y_pred)
        return tf.reduce_mean(abs_error) + lambda_l2 * weight_penalty
    return loss_fn

In [None]:
def objective(trial):
    """Optuna objective: mean validation RMSE of the IPA model over 5 folds.

    For the learning rate and L2 factor suggested by ``trial``, a fresh IPA
    model is trained on each of 5 shuffled folds of the training set
    (``X_train_f`` / ``Y_train``, read from the notebook globals) with early
    stopping on the fold's validation RMSE.

    Args:
        trial: The Optuna trial used to sample ``lr`` and ``l2``.

    Returns:
        The mean of the per-fold validation RMSE values (lower is better).

    Side effects:
        Stores ``avg_epochs``, ``epochs_list`` and ``fold_histories`` as user
        attributes on ``trial``.
    """
    import tensorflow as tf
    from sklearn.model_selection import KFold
    import numpy as np

    # 1. Hyperparameter suggestions, sampled on a log scale.
    #    suggest_float(..., log=True) replaces the deprecated suggest_loguniform.
    lr = trial.suggest_float("lr", 1e-5, 1e-2, log=True)
    l2 = trial.suggest_float("l2", 1e-5, 1e-2, log=True)

    # 2. Learning rate schedule
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=lr,
        decay_steps=10000,
        decay_rate=1e-3
    )

    # 3. Cross-validation setup
    kf = KFold(n_splits=5, shuffle=True, random_state=seed)
    rmse_scores = []
    epochs_list = []
    history_dicts = []

    # X_train_f is the merged cal + val training set.
    for train_idx, val_idx in kf.split(X_train_f):
        # Many models are created across folds and trials; clear the Keras
        # global state left by earlier ones so memory use does not keep growing.
        tf.keras.backend.clear_session()

        X_train_cv, X_val_cv = X_train_f[train_idx], X_train_f[val_idx]
        # Positional indexing: Y_train keeps the original (non-contiguous) row labels.
        Y_train_cv, Y_val_cv = Y_train.iloc[train_idx], Y_train.iloc[val_idx]

        # 4. Build a fresh model for each fold
        ipa_model = IPA(seed_value=seed, regularization_factor=l2)

        ipa_model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),
            loss=custom_loss(l2, ipa_model),
            metrics=[tf.keras.metrics.RootMeanSquaredError()]
        )

        # 5. Callbacks: stop once validation RMSE has not improved by at least
        #    0.05 for 16 epochs, and roll back to the best weights seen.
        es_callback = tf.keras.callbacks.EarlyStopping(
            monitor="val_root_mean_squared_error", min_delta=5e-2, patience=16, restore_best_weights=True)

        # 6. Train
        history = ipa_model.fit(
            X_train_cv, Y_train_cv,
            validation_data=(X_val_cv, Y_val_cv),
            epochs=100,
            batch_size=16,
            callbacks=[es_callback],
            verbose=0
        )

        # 7. Log metrics. evaluate() returns [loss, rmse] for the compiled model.
        val_loss, val_rmse = ipa_model.evaluate(X_val_cv, Y_val_cv, verbose=0)
        rmse_scores.append(val_rmse)
        # Number of epochs actually run for this fold.
        epochs_list.append(len(history.history["loss"]))
        history_dicts.append(history.history)

    # 8. Record info in trial
    avg_epochs = int(np.mean(epochs_list))
    trial.set_user_attr("avg_epochs", avg_epochs)
    trial.set_user_attr("epochs_list", epochs_list)
    trial.set_user_attr("fold_histories", history_dicts)

    return float(np.mean(rmse_scores))


In [None]:
# Hyperparameter search. The sampler is seeded with the notebook-wide seed so
# that the sequence of suggested hyperparameters is reproducible between runs
# (TPE is the sampler Optuna uses by default for a single-objective study).
study = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=seed),
)
study.optimize(objective, n_trials=30)

# Print the best trial
print("Best trial :")
print(f"  RMSE on Val: {study.best_value:.4f}")
print("  Best params:")
for k, v in study.best_trial.params.items():
    print(f"    {k}: {v}")

# Audible notification that the search has finished (five 500 Hz beeps of 500 ms).
for _beep in range(5):
    winsound.Beep(500, 500)

[I 2025-05-20 03:25:32,527] Trial 0 finished with value: 2.547660970687866 and parameters: {'lr': 0.0006866635737651326, 'l2': 2.5435124712539083e-05}. Best is trial 0 with value: 2.547660970687866.
[I 2025-05-20 03:28:06,796] Trial 1 finished with value: 2.6163887977600098 and parameters: {'lr': 0.002932053067956513, 'l2': 0.0013128096721692786}. Best is trial 0 with value: 2.547660970687866.
[I 2025-05-20 03:31:17,266] Trial 2 finished with value: 2.600533103942871 and parameters: {'lr': 0.009730456434055847, 'l2': 0.0011364368178092916}. Best is trial 0 with value: 2.547660970687866.
[I 2025-05-20 03:36:08,022] Trial 3 finished with value: 3.8494243144989015 and parameters: {'lr': 1.3559844958191087e-05, 'l2': 1.4275912760750544e-05}. Best is trial 0 with value: 2.547660970687866.
[I 2025-05-20 03:40:29,388] Trial 4 finished with value: 3.94004545211792 and parameters: {'lr': 1.0056181031227177e-05, 'l2': 0.00023403285010985112}. Best is trial 0 with value: 2.547660970687866.
[I 202

Best trial :
  RMSE on Val: 2.5149
  Best params:
    lr: 0.0003624540613247698
    l2: 3.9878331436182786e-05


In [33]:
# Train the final model on the whole training set and evaluate it on the test set.
# NOTE(review): these values are hard-coded, not read from study.best_params,
# yet they are later saved under the name "best_params". Confirm this is the
# intended "default" configuration.
best_params = {
    "lr": 0.001,
    "l2": 0.0001
}

best_lr = best_params["lr"]
best_l2 = best_params["l2"]
# Use the notebook-wide seed instead of repeating the literal value.
final_model = IPA(seed_value=seed, regularization_factor=best_l2)

lr_schedule = ExponentialDecay(
    initial_learning_rate=best_lr,
    decay_steps=10000,
    decay_rate=1e-3
)

# No validation data is passed to fit() below, so early stopping monitors the
# training loss.
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='loss', 
    patience=5,
    restore_best_weights=True
)

final_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),
    loss=custom_loss(best_l2, final_model),
    metrics=[tf.keras.metrics.RootMeanSquaredError()]
)

history = final_model.fit(
    X_train_f, Y_train,
    epochs=100,
    batch_size=16,
    callbacks=[early_stop],
    verbose=1
)


# Evaluate on test set; evaluate() returns [loss, rmse] for the compiled model.
test_loss, test_rmse = final_model.evaluate(X_test_f, Y_test, verbose=1)
print(f"RMSE on test set: {test_rmse:.4f}")

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
RMSE on test set: 3.7735


In [34]:
# Compute the evaluation metrics from the test-set RMSE.
rmse = test_rmse
# RPD: standard deviation of the reference values divided by the RMSE.
rpd = np.std(Y_test) / rmse
# RE: RMSE relative to the mean of the reference values.
relative_error = rmse / np.mean(Y_test)

# Build the new results row
new_row = pd.DataFrame({
    "Modèle": [Modd],
    "Variable": [Var],
    "RMSE": [rmse],
    "RE": [relative_error],
    "RPD": [rpd]
})

# Path to the results CSV file
csv_path = "C:/Users/Administrator/Desktop/PFE/otha/results/resultats_IPA_default.csv"

# If the file already exists, load it and append the new row;
# otherwise start a new table containing only this row.
if os.path.exists(csv_path):
    existing_results = pd.read_csv(csv_path)
    updated_results = pd.concat([existing_results, new_row], ignore_index=True)
else:
    updated_results = new_row

# Keep one row per (model, variable): re-running this cell replaces the earlier
# result instead of piling up duplicate rows.
updated_results = (
    updated_results
    .drop_duplicates(subset=["Modèle", "Variable"], keep="last")
    .reset_index(drop=True)
)

# Save
updated_results.to_csv(csv_path, index=False, sep=',')
updated_results

Unnamed: 0,Modèle,Variable,RMSE,RE,RPD
0,IPA,adf,3.564828,0.107272,2.800879
1,IPA,adl,2.905515,0.264555,2.673225
2,IPA,cf,2.936756,0.101943,3.11617
3,IPA,cp,1.112475,0.097648,4.18134
4,IPA,dmdcell,4.909425,0.092111,3.372931
5,IPA,ndf,3.77349,0.071644,3.59639


In [None]:
# csv_path = "C:/Users/Administrator/Desktop/PFE/otha/results/resultats_IPA.csv"
# dff = pd.read_csv(csv_path)
# # Remove duplicate rows (keeping only the last occurrence)
# df_unique = dff.drop_duplicates(subset=['Modèle', 'Variable'], keep='last')
# # Save the cleaned CSV
# df_unique.to_csv(csv_path, index=False, sep=',')
# df_unique

In [None]:
# Save the weights and append the model summary to a shared text file.
Models = "C:/Users/Administrator/Desktop/PFE/otha/models/Archs_.txt"
# NOTE(review): the weights file name contains Modd but not Var and is relative
# to the working directory, so runs for different variables write to the same
# file. Confirm whether Var should be part of the name.
final_model.save_weights(f"ipa_{Modd}.h5")

# Collect the summary lines first, so nothing is written if summary() fails.
model_summary = []
final_model.summary(print_fn=model_summary.append)

# Append mode keeps the entries of earlier runs; the encoding is explicit so
# the result does not depend on the platform default.
with open(Models, "a", encoding="utf-8") as file:
    # Write the header with the model name
    file.write(" ************************************************************\n")
    file.write(f" ******************  Model: {Modd}__{Var}  ******************\n")
    file.write(" ************************************************************\n")
    file.write("\n".join(model_summary))
    file.write("\n\n")  # Add some space between entries


In [None]:
# Save the hyperparameters used for the final model to a JSON file.
params_path = f"C:/Users/Administrator/Desktop/PFE/otha/models/best parameters/best_params_{Modd}__{Var}.json"

# Write mode, not append: appending a second JSON object on a re-run would
# leave a file that is no longer valid JSON.
with open(params_path, "w") as param_file:
    json.dump(best_params, param_file, indent=4)

# ________________________________