In [1]:

import numpy as np
from sklearn.linear_model import LassoCV, ElasticNetCV
from scipy.interpolate import UnivariateSpline
from scipy.signal import savgol_filter
import scipy.special as sp_special
from sklearn.model_selection import cross_val_score
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
import warnings

In [2]:

def create_polynomial_feature_descriptions(candidate_function_descriptions, degree):
    """Return the names of all polynomial terms built from the given base names.

    Parameters
    ----------
    candidate_function_descriptions : sequence of str
        One label per input feature, in column order.
    degree : int
        Maximum total degree of the polynomial expansion.

    Returns
    -------
    ndarray of str objects
        Term names in the column order PolynomialFeatures (without bias
        column) would produce for that many input features.
    """
    poly = PolynomialFeatures(degree, include_bias=False)
    # Only the number of input columns matters for the naming, so fit on a
    # single dummy row and skip computing the (unused) transformed matrix.
    poly.fit(np.zeros((1, len(candidate_function_descriptions))))
    return poly.get_feature_names_out(candidate_function_descriptions)
    
def robust_differential_equation_discovery(
    data,
    candidate_functions,
    candidate_function_descriptions,
    derivative_order=3,
    alpha_range=(1e-7, 1e-4),
    l1_ratio_range=(0.1, 0.9),
    cv_folds=10,
    n_bootstrap=100,
    poly_degree=3,
):
    """Run the full sparse-regression pipeline on ``data``.

    Stages: denoise -> interpolate -> estimate derivatives -> build candidate
    features -> polynomial expansion -> standardise -> ElasticNetCV fit ->
    cross-validated MSE report -> bootstrap LassoCV refits -> pick best refit.

    Parameters
    ----------
    data : dict with keys 'x' and 'y'
    candidate_functions : sequence of callables applied to each derivative
    candidate_function_descriptions : sequence of str
        Labels for ``candidate_functions``, in the same order.
    derivative_order : int
        Highest derivative order to estimate.
    alpha_range : (float, float)
        Lower/upper bound of the log-spaced alpha grid (100 values).
    l1_ratio_range : (float, float)
        Lower/upper bound of the linearly spaced l1_ratio grid (10 values).
    cv_folds : int
        Folds used by ElasticNetCV and by the outer cross_val_score.
    n_bootstrap : int
        Number of bootstrap refits.
    poly_degree : int, default 3
        Degree of the polynomial expansion of the candidate features.

    Returns
    -------
    final_model
        The bootstrap model chosen by ``select_best_model``.
    coefficients : ndarray
        ``coef_`` of the ElasticNetCV fit (not of ``final_model``), expressed
        for the standardised polynomial features.
    poly_feature_descriptions : ndarray of str objects
        One name per coefficient. ``label[f(x)]`` denotes candidate ``f``
        applied to the derivative stored under ``label``.
    """
    # Steps 1-3: denoise, interpolate, differentiate
    denoised_data = denoise_data(data)
    interpolated_data = interpolate_data(denoised_data)
    derivatives = estimate_derivatives(interpolated_data, order=derivative_order)
    target = interpolated_data['y']
    # NOTE(review): derivatives['0th'] is the target itself, so if
    # candidate_functions contains the identity the target is also a feature
    # and the regression can fit it trivially - confirm this is intended.

    # Step 4: candidate features, then polynomial expansion
    X = create_feature_matrix(interpolated_data, derivatives, candidate_functions)
    poly = PolynomialFeatures(degree=poly_degree, include_bias=False)
    X_poly = poly.fit_transform(X)

    # Name every expanded column. The feature matrix has one column per
    # (derivative, candidate function) pair, derivative-major, so the base
    # names must cover those pairs - not just the candidate functions -
    # otherwise coefficient indices point at the wrong descriptions.
    n_pairs = len(derivatives) * len(candidate_functions)
    if (
        len(candidate_function_descriptions) == len(candidate_functions)
        and X.shape[1] == n_pairs
    ):
        base_names = [
            f"{order_label}[{description}]"
            for order_label in derivatives
            for description in candidate_function_descriptions
        ]
    else:
        # Some candidates were dropped (or the labels do not match the
        # functions), so columns cannot be attributed reliably.
        warnings.warn(
            "Feature matrix columns cannot be matched to candidate function "
            "descriptions; falling back to generic feature names."
        )
        base_names = [f"feature_{j}" for j in range(X.shape[1])]
    poly_feature_descriptions = poly.get_feature_names_out(base_names)

    # Feature scaling
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_poly)

    # Step 5: sparse identification using ElasticNet
    elastic_net = ElasticNetCV(
        alphas=np.logspace(np.log10(alpha_range[0]), np.log10(alpha_range[1]), 100),
        l1_ratio=np.linspace(l1_ratio_range[0], l1_ratio_range[1], 10),
        cv=cv_folds,
        max_iter=10000,
        random_state=0
    )
    elastic_net.fit(X_scaled, target)

    # Step 6: model validation (refits the CV estimator inside each outer fold)
    scores = cross_val_score(
        elastic_net,
        X_scaled,
        target,
        cv=cv_folds,
        scoring='neg_mean_squared_error'
    )
    print("Cross-validation MSE:", -np.mean(scores))

    # Step 7: uncertainty quantification using bootstrapping
    bootstrapped_models = bootstrap_model_fitting(
        X_scaled,
        target,
        elastic_net,
        n_bootstrap=n_bootstrap
    )

    # Step 8: final model selection
    final_model = select_best_model(bootstrapped_models, X_scaled, target)

    return final_model, elastic_net.coef_, poly_feature_descriptions

def denoise_data(data, window_length=7, polyorder=3):
    """Smooth ``data['y']`` with a Savitzky-Golay filter.

    Parameters
    ----------
    data : dict with keys 'x' and 'y'
    window_length : int, default 7
        Filter window size passed to ``savgol_filter``.
    polyorder : int, default 3
        Order of the local polynomial; must be smaller than ``window_length``.

    Returns
    -------
    dict
        ``{'x': data['x'], 'y': smoothed_y}``; ``x`` is passed through as-is.

    Raises
    ------
    ValueError
        Propagated from ``savgol_filter`` when the window/order combination is
        invalid for the given signal.
    """
    denoised_y = savgol_filter(data['y'], window_length=window_length, polyorder=polyorder)
    return {'x': data['x'], 'y': denoised_y}

def interpolate_data(denoised_data, num_points=None):
    """Fit an interpolating spline to the data and evaluate it on a grid.

    The spline is fitted with ``s=0``, i.e. it passes through every sample.

    Parameters
    ----------
    denoised_data : dict with keys 'x' and 'y'
        Per SciPy's UnivariateSpline, ``x`` must be increasing.
    num_points : int or None, default None
        If None, the spline is evaluated at the original ``x`` values (which
        reproduces the input samples up to rounding). Otherwise it is
        evaluated on ``num_points`` evenly spaced values spanning the range
        of ``x``.

    Returns
    -------
    dict
        ``{'x': grid, 'y': spline(grid)}``.
    """
    spline = UnivariateSpline(denoised_data['x'], denoised_data['y'], s=0)
    if num_points is None:
        grid = denoised_data['x']
    else:
        grid = np.linspace(
            np.min(denoised_data['x']), np.max(denoised_data['x']), num_points
        )
    return {'x': grid, 'y': spline(grid)}

def estimate_derivatives(data, order=3):
    """Estimate successive derivatives of ``data['y']`` with respect to ``data['x']``.

    Each derivative is obtained by applying ``np.gradient`` to the previous
    one, using ``data['x']`` as the sample coordinates.

    Returns a dict keyed '0th', '1th', ..., f'{order}th' (in that insertion
    order), where '0th' is ``data['y']`` itself.
    """
    grid = data['x']
    current = data['y']
    derivatives = {'0th': current}
    for n in range(1, order + 1):
        # Differentiate the previous level once more on the same grid
        current = np.gradient(current, grid)
        derivatives[f'{n}th'] = current
    return derivatives

def create_feature_matrix(data, derivatives, candidate_functions, return_feature_names=False):
    """Build the design matrix by applying each candidate function to each derivative.

    Columns are ordered derivative-major: for every entry of ``derivatives``
    (in dict order) the candidate functions are tried in list order. A
    candidate is skipped (best-effort) for a given derivative when it raises,
    returns complex output, returns NaN or infinity anywhere, or returns
    something whose shape differs from its input. Kept features are clipped
    to [-1e6, 1e6]. Warnings raised while evaluating candidates are suppressed.

    Parameters
    ----------
    data : unused
        Kept so existing callers do not break.
    derivatives : dict
        Maps a label (e.g. '0th') to an array of samples.
    candidate_functions : sequence of callables
        Each takes an array and returns an array of the same shape.
    return_feature_names : bool, default False
        If True, also return the list of column names
        (``f'{label}_func_{index}'``) for the columns that were kept.

    Returns
    -------
    X : ndarray of shape (n_samples, n_kept)
    feature_names : list of str
        Only returned when ``return_feature_names`` is True.

    Raises
    ------
    ValueError
        If no candidate produced a usable feature.
    """
    columns = []
    feature_names = []
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for order_label, derivative in derivatives.items():
            expected_shape = np.shape(derivative)
            for func_index, func in enumerate(candidate_functions):
                try:
                    feature = np.asarray(func(derivative))
                    if np.iscomplexobj(feature):
                        continue
                    # A scalar or wrong-length result cannot form a column
                    if feature.shape != expected_shape:
                        continue
                    if not np.all(np.isfinite(feature)):
                        continue
                    # Clip extreme values to prevent overflow downstream
                    feature = np.clip(feature, -1e6, 1e6)
                except Exception:
                    # Deliberate best-effort: a failing candidate is dropped
                    continue
                columns.append(feature)
                feature_names.append(f'{order_label}_func_{func_index}')
    if not columns:
        raise ValueError("No valid features generated. Check candidate functions and data.")
    X = np.column_stack(columns)
    if return_feature_names:
        return X, feature_names
    return X

def bootstrap_model_fitting(X, y, base_model, n_bootstrap=100):
    """Refit a LassoCV on bootstrap resamples of (X, y).

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
    y : array-like of shape (n_samples,)
    base_model : fitted CV estimator
        Supplies ``alphas_``, ``cv`` and ``max_iter`` for the refits.
    n_bootstrap : int
        Number of resamples / models.

    Returns
    -------
    list
        The fitted LassoCV models, one per resample.

    Notes
    -----
    Resampling draws from NumPy's global RNG (``np.random.choice``), so
    results are reproducible only if the caller seeds it.
    """
    # Positional indexing below must not be label-based (e.g. pandas Series)
    X = np.asarray(X)
    y = np.asarray(y)
    n_samples = X.shape[0]

    # ElasticNetCV may store alphas_ as a 2-D (n_l1_ratio, n_alphas) array
    # when several l1_ratio values were searched; LassoCV takes a 1-D grid.
    alpha_grid = np.unique(np.ravel(base_model.alphas_))

    bootstrapped_models = []
    for i in range(n_bootstrap):
        sample_indices = np.random.choice(n_samples, size=n_samples, replace=True)
        model = LassoCV(
            alphas=alpha_grid,
            cv=base_model.cv,
            max_iter=base_model.max_iter,
            random_state=i
        )
        model.fit(X[sample_indices], y[sample_indices])
        bootstrapped_models.append(model)
    return bootstrapped_models

def select_best_model(models, X, y):
    """Return the model with the lowest mean squared error on (X, y).

    Parameters
    ----------
    models : sequence of fitted estimators exposing ``predict``
    X : array-like of features passed to each model's ``predict``
    y : array-like of true target values

    Returns
    -------
    The element of ``models`` with the smallest MSE (first one on ties).

    Raises
    ------
    ValueError
        If ``models`` is empty.
    """
    models = list(models)
    if not models:
        raise ValueError("No models to select from.")
    # mean_squared_error expects (y_true, y_pred)
    mse_scores = [mean_squared_error(y, m.predict(X)) for m in models]
    best_model_index = int(np.argmin(mse_scores))
    best_model = models[best_model_index]
    print(f"Best model MSE: {mse_scores[best_model_index]}")
    return best_model

# Candidate function library: every entry is applied element-wise to an array.
# The order must stay in sync with candidate_function_descriptions.
candidate_functions = [
    # Polynomial terms
    lambda v: v,                                 # identity (linear term)
    lambda v: v**2,                              # quadratic
    lambda v: v**3,                              # cubic
    lambda v: v**4,                              # quartic
    lambda v: v**5,                              # quintic

    # Trigonometric terms
    np.sin,                                      # sine
    np.cos,                                      # cosine
    np.tan,                                      # tangent (no domain restriction applied)
    lambda v: np.sin(2 * v),                     # second-harmonic sine
    lambda v: np.cos(2 * v),                     # second-harmonic cosine

    # Exponential and logarithmic terms
    lambda v: np.exp(np.clip(v, -100, 100)),     # exp with the argument clipped
    lambda v: np.exp(-np.clip(v, -100, 100)),    # decaying exp with the argument clipped
    lambda v: np.log(np.abs(v) + 1e-6),          # log of |x|, offset to avoid log(0)
    lambda v: np.exp(np.clip(v**2, -100, 100)),  # exp of x^2 with the argument clipped
    lambda v: np.log(v**2 + 1e-6),               # log of x^2, offset to avoid log(0)

    # Hyperbolic terms (none of these clip their argument)
    np.tanh,                                     # hyperbolic tangent
    np.sinh,                                     # hyperbolic sine
    np.cosh,                                     # hyperbolic cosine

    # Special functions; arguments are clipped where shown
    lambda v: sp_special.gamma(np.clip(v, 1e-6, 100)),   # gamma
    lambda v: sp_special.psi(np.clip(v, 1e-6, 100)),     # digamma
    sp_special.erf,                                       # error function
    sp_special.erfc,                                      # complementary error function
    lambda v: sp_special.jv(0, v),                        # Bessel, first kind, order 0
    lambda v: sp_special.yv(0, np.clip(v, 1e-6, 100)),   # Bessel, second kind, order 0
    lambda v: sp_special.beta(np.clip(v, 1e-6, 100), np.clip(v + 1, 1e-6, 100)),  # beta(x, x+1)
    lambda v: sp_special.lambertw(v).real,                # real part of Lambert W (default branch)
    lambda v: sp_special.zeta(np.clip(v, 1.1, 100)),     # Riemann zeta

    # Inverse and root terms
    lambda v: 1 / (np.abs(v) + 1e-6),            # reciprocal of |x|, offset to avoid 1/0
    lambda v: np.sqrt(np.abs(v) + 1e-6),         # square root of |x| plus offset
]

# Human-readable labels for the candidate functions, one per function and in
# the same order as the candidate_functions list.
candidate_function_descriptions = [
    # Polynomial terms
    "x", "x^2", "x^3", "x^4", "x^5",

    # Trigonometric terms
    "sin(x)", "cos(x)", "tan(x)", "sin(2x)", "cos(2x)",

    # Exponential and logarithmic terms
    "exp(clip(x, -100, 100))",
    "exp(-clip(x, -100, 100))",
    "log(abs(x) + 1e-6)",
    "exp(clip(x^2, -100, 100))",
    "log(x^2 + 1e-6)",

    # Hyperbolic terms
    "tanh(x)", "sinh(x)", "cosh(x)",

    # Special functions
    "gamma(clip(x, 1e-6, 100))",
    "psi(clip(x, 1e-6, 100))",
    "erf(x)",
    "erfc(x)",
    "jv(0, x)",
    "yv(0, clip(x, 1e-6, 100))",
    "beta(clip(x, 1e-6, 100), clip(x+1, 1e-6, 100))",
    "lambertw(x).real",
    "zeta(clip(x, 1.1, 100))",

    # Inverse and root terms
    "1 / (abs(x) + 1e-6)",
    "sqrt(abs(x) + 1e-6)",
]

In [3]:

# Simulate test data: y = 2x - 0.5x^2 + sin(x) sampled at 100 evenly spaced
# points on [0, 10].
# NOTE(review): no noise term is added, so this data is noise-free even though
# it was described as "noisy" - confirm whether a noise term was intended.
# The seed below fixes NumPy's global RNG; nothing in this cell draws from it,
# but bootstrap_model_fitting later resamples with np.random.choice.
np.random.seed(0)
x = np.linspace(0, 10, 100)
y = 2*x - 0.5*x**2 + np.sin(x)
data = {"x": x, "y": y}

In [None]:

# Discover the differential equation
model, coefficients, poly_feature_descriptions = robust_differential_equation_discovery(
    data,
    candidate_functions,
    candidate_function_descriptions,
    derivative_order=3
)

# Each coefficient is labelled by the description at the same index. If the
# two sequences differ in length the labels are not aligned with the model's
# features, so say so instead of silently dropping the unmatched terms.
if len(coefficients) != len(poly_feature_descriptions):
    warnings.warn(
        f"Got {len(coefficients)} coefficients but "
        f"{len(poly_feature_descriptions)} feature descriptions; the printed "
        "terms may be mislabelled and coefficients without a description are omitted."
    )

# Identify the non-zero coefficients
non_zero_indices = np.nonzero(coefficients)[0]

# Keep only indices that have a description (guards against IndexError)
valid_non_zero_indices = [idx for idx in non_zero_indices if idx < len(poly_feature_descriptions)]

# Print the non-zero coefficients and corresponding functions
print("Discovered model:")
terms = []
for idx in valid_non_zero_indices:
    description = poly_feature_descriptions[idx]  # Label of the feature at this index
    coefficient = coefficients[idx]
    terms.append(f"{coefficient:.6f} * {description}")
    print(f"Coefficient: {coefficient:.6f}, Function: {description}")

# Print the final equation; an all-zero model is reported as y = 0
equation = " + ".join(terms) if terms else "0"
print(f"Final Model: y = {equation}")