In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
from joblib import Parallel, delayed

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
DATA_PATH = "Data.xlsx"  # Excel workbook that holds the inputs and targets
TARGET_COLUMNS = ['qS', 'mu', 'qP']  # one model is tuned/trained per column
INPUT_FEATURES = 7  # the first INPUT_FEATURES columns are the model inputs

# Grid explored for every target: all combinations of the three lists.
SEARCH_SPACE = dict(
    hidden_units=list(range(2, 21, 2)),  # 2, 4, ..., 20
    learning_rate=[0.001, 0.005, 0.01],
    batch_size=[16, 32, 64],
)

N_SPLITS = 5      # number of cross-validation folds
MAX_EPOCHS = 100  # upper bound on training epochs per fit
PATIENCE = 5      # early-stopping patience, in epochs

# load data
def load_and_preprocess(data_path):
    """Read the Excel data set, clean it and fit standardization scalers.

    Rows containing any missing value are dropped, and only rows whose
    target columns are all strictly greater than zero are kept (so zeros
    are removed along with negatives).

    Args:
        data_path: Path of the Excel file handed to ``pandas.read_excel``.

    Returns:
        A tuple ``(scaler_X, scaler_y, X, y)`` where ``X`` holds the first
        ``INPUT_FEATURES`` columns, ``y`` holds ``TARGET_COLUMNS``, and the
        two scalers are ``StandardScaler`` objects fitted on ``X`` and ``y``.
        ``X`` and ``y`` are returned unscaled.
    """
    frame = pd.read_excel(data_path).dropna()

    # Keep a row only if every target value in it is strictly positive.
    all_targets_positive = (frame[TARGET_COLUMNS] > 0).all(axis=1)
    frame = frame[all_targets_positive]

    features = frame.iloc[:, :INPUT_FEATURES].to_numpy()
    targets = frame[TARGET_COLUMNS].to_numpy()

    # The scalers are fitted here but not applied; callers transform the data.
    feature_scaler = StandardScaler()
    feature_scaler.fit(features)
    target_scaler = StandardScaler()
    target_scaler.fit(targets)

    return feature_scaler, target_scaler, features, targets

# Neural network constructor
def build_model(hidden_units, learning_rate):
    """Build and compile a single-hidden-layer regression network.

    Args:
        hidden_units: Number of ReLU units in the hidden layer.
        learning_rate: Learning rate passed to the Adam optimizer.

    Returns:
        A compiled Keras ``Sequential`` model taking ``INPUT_FEATURES``
        inputs and producing one linear output, optimized for mean
        squared error.
    """
    model = Sequential([
        Dense(hidden_units, activation='relu', input_shape=(INPUT_FEATURES,)),
        Dense(1, activation='linear'),
    ])
    # scikit-learn's r2_score operates on NumPy arrays and cannot be evaluated
    # on the tensors Keras passes to compiled metrics, so it must not be
    # registered here. R² is computed from model predictions after training.
    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='mse',
    )
    return model

# Single training validation function
def train_eval_fold(params, X_train, y_train, X_val, y_val):
    """Train one model on a single fold and score it on the validation part.

    Args:
        params: Mapping with the keys ``'hidden_units'``, ``'learning_rate'``
            and ``'batch_size'``.
        X_train: Training inputs passed to ``model.fit``.
        y_train: Training targets passed to ``model.fit``.
        X_val: Validation inputs; used for early stopping and for scoring.
        y_val: Validation targets; used for early stopping and for scoring.

    Returns:
        A dict with ``'mse'`` and ``'r2'`` measured on the validation data
        and ``'epochs'``, the 1-based epoch with the lowest validation loss.
    """
    # Imported locally so the block does not rely on a new file-level import.
    from tensorflow.keras import backend as keras_backend

    # This function is called once per fold per parameter combination and the
    # model is discarded afterwards. Resetting Keras' global state before each
    # build keeps memory from growing over the whole search.
    keras_backend.clear_session()

    model = build_model(params['hidden_units'], params['learning_rate'])

    early_stop = EarlyStopping(
        monitor='val_loss',
        patience=PATIENCE,
        restore_best_weights=True
    )

    history = model.fit(
        X_train, y_train,
        epochs=MAX_EPOCHS,
        batch_size=params['batch_size'],
        validation_data=(X_val, y_val),
        callbacks=[early_stop],
        verbose=0
    )

    # Index of the epoch with the lowest validation loss (0-based).
    best_epoch = int(np.argmin(history.history['val_loss']))

    # Score the model on the validation fold.
    y_pred = model.predict(X_val, verbose=0)
    return {
        'mse': mean_squared_error(y_val, y_pred),
        'r2': r2_score(y_val, y_pred),
        'epochs': best_epoch + 1
    }

# Parameter Search Main Function
def parameter_search(target_index):
    """Grid-search the network hyper-parameters for one target column.

    Every combination in ``SEARCH_SPACE`` is evaluated with ``N_SPLITS``-fold
    cross-validation (``KFold`` without shuffling) on standardized data.

    NOTE: both scalers are fitted on the complete data set before the folds
    are split, so validation folds contribute to the scaling statistics.

    Args:
        target_index: Column index of the target inside the scaled ``y``
            array (same order as ``TARGET_COLUMNS``).

    Returns:
        A tuple ``(results_df, scaler_y)``. ``results_df`` has one row per
        parameter combination with the mean/std of MSE and R² and the mean
        best epoch, sorted by ascending ``mean_mse`` then descending
        ``mean_r2``. ``scaler_y`` is the fitted target scaler.
    """
    scaler_X, scaler_y, raw_X, raw_y = load_and_preprocess(DATA_PATH)
    X = scaler_X.transform(raw_X)
    y = scaler_y.transform(raw_y)[:, target_index].reshape(-1, 1)

    # Full Cartesian grid; hidden_units varies slowest, batch_size fastest.
    candidates = [
        {'hidden_units': hu, 'learning_rate': lr, 'batch_size': bs}
        for hu in SEARCH_SPACE['hidden_units']
        for lr in SEARCH_SPACE['learning_rate']
        for bs in SEARCH_SPACE['batch_size']
    ]

    splitter = KFold(n_splits=N_SPLITS)
    summary_rows = []

    for params in candidates:
        # One metrics dict per fold, in fold order.
        per_fold = [
            train_eval_fold(params, X[fit_idx], y[fit_idx], X[held_idx], y[held_idx])
            for fit_idx, held_idx in splitter.split(X)
        ]
        mse_scores = [fold['mse'] for fold in per_fold]
        r2_scores = [fold['r2'] for fold in per_fold]
        epoch_counts = [fold['epochs'] for fold in per_fold]

        summary_rows.append({
            'params': params,
            'mean_mse': np.mean(mse_scores),
            'std_mse': np.std(mse_scores),
            'mean_r2': np.mean(r2_scores),
            'std_r2': np.std(r2_scores),
            'mean_epochs': np.mean(epoch_counts)
        })

    # Best combination first: lowest mean MSE, ties broken by highest mean R².
    ranked = pd.DataFrame(summary_rows).sort_values(
        by=['mean_mse', 'mean_r2'],
        ascending=[True, False],
    )

    return ranked, scaler_y

# Run the parameter search for each of the three targets, one after another
# (the searches are executed sequentially, not in parallel).
final_results = {}
for i, target in enumerate(TARGET_COLUMNS):
    print(f"\n=== Optimization in progress {target} model ===")
    target_results, scaler = parameter_search(i)
    final_results[target] = {
        'results': target_results,
        'scaler': scaler
    }

    # Save results to CSV
    target_results.to_csv(f"{target}_parameter_search_results.csv", index=False)

    # Visualization of optimal parameters
    plt.figure(figsize=(10, 6))
    top_params = target_results.head(10).copy()
    # The 'params' column holds dicts, which are unhashable and therefore
    # cannot serve as a categorical axis; plot a readable string label instead.
    top_params['label'] = [
        f"hu={p['hidden_units']}, lr={p['learning_rate']}, bs={p['batch_size']}"
        for p in top_params['params']
    ]
    sns.barplot(x='mean_r2', y='label', data=top_params, orient='h')
    plt.title(f'Top 10 Parameter Combinations for {target}')
    plt.xlabel('Mean R² Score')
    plt.ylabel('Parameter Combinations')
    plt.tight_layout()
    plt.savefig(f"{target}_parameter_ranking.png")
    plt.close()

# Report the top-ranked parameter combination for every target.
print("\n=== Recommended Optimal Parameters ===")
for target in TARGET_COLUMNS:
    # Results are already sorted, so the first row is the best combination.
    best_params = final_results[target]['results'].iloc[0]
    chosen = best_params['params']
    r2_mean, r2_std = best_params['mean_r2'], best_params['std_r2']
    mse_mean, mse_std = best_params['mean_mse'], best_params['std_mse']

    report_lines = (
        f"\n{target} model:",
        f"Hidden layer neuron: {chosen['hidden_units']}",
        f"Learning rate: {chosen['learning_rate']}",
        f"Batch size: {chosen['batch_size']}",
        f"Validation set average R²: {r2_mean:.4f} ± {r2_std:.4f}",
        f"Validation set average MSE: {mse_mean:.4f} ± {mse_std:.4f}",
    )
    for report_line in report_lines:
        print(report_line)

# Training the final model function
def train_final_model(target, params, scaler_X, scaler_y):
    """Train the final model for one target and evaluate it on a hold-out set.

    The data is reloaded from ``DATA_PATH``, standardized with the supplied
    scalers and split 80/20 into train and test parts (``random_state=42``).
    During fitting, 20% of the training part is used as validation data for
    early stopping. A predicted-vs-actual scatter plot is written to
    ``"{target}_final_model_performance.png"``.

    Args:
        target: Name of the target column; must be in ``TARGET_COLUMNS``.
        params: Mapping with ``'hidden_units'``, ``'learning_rate'`` and
            ``'batch_size'``.
        scaler_X: Fitted scaler applied to the inputs.
        scaler_y: Fitted scaler applied to all target columns.

    Returns:
        A tuple ``(model, test_mse, test_r2)``; both metrics are computed on
        the standardized target values of the test part.
    """
    # Only the raw arrays are needed; the freshly fitted scalers are ignored.
    _, _, raw_X, raw_y = load_and_preprocess(DATA_PATH)
    column = TARGET_COLUMNS.index(target)
    X = scaler_X.transform(raw_X)
    y = scaler_y.transform(raw_y)[:, column].reshape(-1, 1)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    model = build_model(params['hidden_units'], params['learning_rate'])

    stopper = EarlyStopping(
        monitor='val_loss',
        patience=PATIENCE,
        restore_best_weights=True,
    )
    model.fit(
        X_train,
        y_train,
        epochs=MAX_EPOCHS,
        batch_size=params['batch_size'],
        validation_split=0.2,
        callbacks=[stopper],
        verbose=1,
    )

    # Hold-out evaluation (values are in standardized units).
    y_pred = model.predict(X_test)
    test_mse = mean_squared_error(y_test, y_pred)
    test_r2 = r2_score(y_test, y_pred)

    # Scatter of predictions against actual values plus the identity line,
    # drawn across the range of the full (scaled) target column.
    low, high = y.min(), y.max()
    plt.figure(figsize=(8, 6))
    plt.scatter(y_test, y_pred, alpha=0.6)
    plt.plot([low, high], [low, high], 'r--')
    plt.title(f'{target} Model Prediction vs Actual')
    plt.xlabel('Actual Values')
    plt.ylabel('Predicted Values')
    plt.savefig(f"{target}_final_model_performance.png")
    plt.close()

    return model, test_mse, test_r2

# Train and save all final models
print("\n=== Training the final model ===")

# The input scaler is fitted on the same file for every target, so read the
# data and fit it once instead of repeating the Excel load on each iteration.
scaler_X = load_and_preprocess(DATA_PATH)[0]

for target in TARGET_COLUMNS:
    print(f"\n Training {target} final model...")
    # Best-ranked parameter combination and target scaler from the search.
    best_params = final_results[target]['results'].iloc[0]['params']
    scaler_y = final_results[target]['scaler']

    model, test_mse, test_r2 = train_final_model(
        target, best_params, scaler_X, scaler_y
    )

    # save model
    model.save(f"{target}_final_model.h5")

    print(f"Test Set Performance - MSE: {test_mse:.4f}, R²: {test_r2:.4f}")