In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Dense, Dropout
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error

# Load the dataset
data = pd.read_excel('LDC data.xlsx')
# Convert raw data to dimensionless parameters
data['W/H'] = data['W'] / data['H']
data['U/U*'] = data['U'] / data['U*']
data['epsilon/HU*'] = data['epsilon'] / (data['H'] * data['U'])
X = data[['W/H', 'U/U*', '(C)Nitrate (µg/L)']]
y = data['epsilon/HU*']

# Split the data into training (70%) and testing (30%) sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# Standardize the data (mean=0, std=1)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# Define physical constraints for HHPS
def custom_loss(y_true, y_pred):
    # Extract input features (to be used for physical constraints)
    W_H = tf.convert_to_tensor(X_train[:, 0], dtype=tf.float32)
    U_U_star = tf.convert_to_tensor(X_train[:, 1], dtype=tf.float32)
    CNitrate = tf.convert_to_tensor(X_train[:, 2], dtype=tf.float32)
    y_pred_tensor = tf.convert_to_tensor(y_pred, dtype=tf.float32)
    # Physical Parameters
    rho = 1000  # Density of water in kg/m³
    v = 1e-6    # Kinematic viscosity in m²/s
    # Apply automatic differentiation to compute derivatives
    with tf.GradientTape(persistent=True) as tape:
        tape.watch(W_H)
        tape.watch(U_U_star)
        tape.watch(CNitrate)
        # Compute first order gradients
        dy_dt = tape.gradient(y_pred_tensor, W_H)
        dy_dx = tape.gradient(y_pred_tensor, U_U_star)
        dy_dC = tape.gradient(y_pred_tensor, CNitrate)
        # Compute second order gradients
        d2y_dx2 = tape.gradient(dy_dx, U_U_star)
        d2y_dtdt = tape.gradient(dy_dt, W_H)
    # Computing constraint terms
    # Mass Continuity: e_MC = |∂Q/∂x + ∂Q/∂t|^2
    mass_continuity = tf.reduce_sum(tf.square(dy_dx + dy_dt))
    # Momentum Conservation: e_MOM = |∂u/∂t + u ∂u/∂x - ν ∂²u/∂x²|^2
    momentum_conservation = tf.reduce_sum(tf.square(dy_dC + U_U_star * dy_dx - v * d2y_dx2))
    # Navier-Stokes: e_NS = |∂C/∂t + u ∂C/∂x - ε ∂²C/∂x²|^2
    navier_stokes = tf.reduce_sum(tf.square(dy_dt + U_U_star * dy_dC - epsilon * d2y_dx2))
    # Mean Squared Error Loss
    mse = tf.keras.backend.mean(tf.keras.backend.square(y_true - y_pred))
    # Combined Loss Function
    total_loss = mse + 0.1 * (mass_continuity + momentum_conservation + navier_stokes)
    return total_loss
# HHPS Model (neural network)
hhps_model = Sequential()
hhps_model.add(Dense(128, input_dim=X_train.shape[1], activation='relu'))
hhps_model.add(Dropout(0.5))
hhps_model.add(Dense(128, activation='relu'))
hhps_model.add(Dropout(0.5))
hhps_model.add(Dense(128, activation='relu'))
hhps_model.add(Dropout(0.5))
hhps_model.add(Dense(128, activation='relu'))
hhps_model.add(Dropout(0.5))
hhps_model.add(Dense(1))
# Compile the HHPS model with custom loss function including the physical constraints
hhps_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss=custom_loss)
# Train the HHPS model
hhps_model.fit(X_train, y_train, epochs=1000, batch_size=32, verbose=1)
# Predict using the HHPS model
hhps_predictions = hhps_model.predict(X_test)

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from keras.models import Sequential
from keras.layers import Dense, Dropout
import tensorflow as tf
import tensorflow.keras.backend as K
from sklearn.metrics import r2_score, mean_squared_error

# Load the dataset
data = pd.read_excel('LDC data.xlsx')
# Convert raw data to dimensionless parameters
data['W/H'] = data['W'] / data['H']
data['U/U*'] = data['U'] / data['U*']
data['Epsilon/HU*'] = data['epsilon'] / (data['H'] * data['U'])
data['C'] = data['(C)Nitrate (µg/L)']  # Ensure you have this column available
X = data[['W/H', 'U/U*', 'C']]
y = data['Epsilon/HU*']
# Split the data into training (70%) and testing (30%) sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# Standardize the data (mean=0, std=1)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# Define the physical constraints based on given equations
def physics_constraints(y_true, y_pred, X):
    W_H = X[:, 0]
    U_U_star = X[:, 1]
    concentration = X[:, 2]
    rho = 1000  # Density of water in kg/m³
    epsilon = 1e-3  # Example value for dispersion coefficient in m²/s (can be adjusted)
    v = 1e-6  # Kinematic viscosity in m²/s
    # Compute gradients using automatic differentiation
    with tf.GradientTape(persistent=True) as tape:
        tape.watch(W_H)
        tape.watch(U_U_star)
        tape.watch(concentration)
        dy_dt = tape.gradient(y_pred, W_H)
        dy_dx = tape.gradient(y_pred, U_U_star)
        dy_dc = tape.gradient(y_pred, concentration)
        dy_d2x = tape.gradient(dy_dc, U_U_star)
        dy_d2t = tape.gradient(dy_dt, W_H)
    # Mass Continuity: |∂Q/∂x + ∂Q/∂t|^2
    mass_continuity = K.sum(K.square(dy_dx + dy_dt)
    # Momentum Conservation: |∂u/∂t + u ∂u/∂x - ν ∂²u/∂x²|^2
    momentum_conservation = (
        K.sum(K.square(dy_dc + U_U_star * dy_dx))  # ∂u/∂t + u ∂u/∂x
        + K.sum(K.square(v * dy_d2x))  # v ∂²u/∂x²
    )
    # Navier-Stokes: |∂C/∂t + u ∂C/∂x - ε ∂²C/∂x²|^2
    navier_stokes_term = (
        K.sum(K.square(dy_dt))  # ∂C/∂t
        + U_U_star * K.sum(K.square(dy_dx))  # u ∂C/∂x
        - epsilon * K.sum(K.square(dy_d2x))  # ε ∂²C/∂x²
    )
    return K.sum(K.square(mass_continuity)) + K.sum(K.square(momentum_conservation)) + K.sum(K.square(navier_stokes_term))
# Define a custom loss function that includes physical constraints
def custom_loss(y_true, y_pred, X):
    mse = K.mean(K.square(y_true - y_pred))
    pc = physics_constraints(y_true, y_pred, X)
    return mse + 0.3 * pc  # weighting factor for physical constraints
def custom_loss_wrapper(X):
    def loss(y_true, y_pred):
        return custom_loss(y_true, y_pred, X)
    return loss
# PEML Model (Physics-Informed Neural Network)
pinn_model = Sequential()
pinn_model.add(Dense(128, input_dim=X_train.shape[1], activation='relu'))
pinn_model.add(Dropout(0.5))
pinn_model.add(Dense(128, activation='relu'))
pinn_model.add(Dropout(0.5))
pinn_model.add(Dense(128, activation='relu'))
pinn_model.add(Dropout(0.5))
pinn_model.add(Dense(1))
# Compile the PEML model with custom loss function including physical constraints
pinn_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                   loss=custom_loss_wrapper(X_train))
# Train the PEML model
pinn_model.fit(X_train, y_train, epochs=1000, batch_size=32, verbose=1)
# Predict using the PEML model
pinn_predictions = pinn_model.predict(X_test)

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.base import BaseEstimator, RegressorMixin
import tensorflow as tf
import tensorflow.keras.backend as K

# Load the dataset
data = pd.read_excel('LDC data.xlsx')
# Convert raw data to dimensionless parameters
data['W/H'] = data['W'] / data['H']
data['U/U*'] = data['U'] / data['U*']
data['Epsilon/HU*'] = data['epsilon'] / (data['H'] * data['U'])
data['C'] = data['(C)Nitrate (µg/L)']  # Ensure you have this column available
X = data[['W/H', 'U/U*', 'C']]
y = data['Epsilon/HU*']
# Split the data into training (70%) and testing (30%) sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# Standardize the data (mean=0, std=1)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# Define the physical constraints based on given equations
def physics_constraints(y_true, y_pred, X):
    W_H = X[:, 0]
    U_U_star = X[:, 1]
    concentration = X[:, 2]
    rho = 1000  # Density of water in kg/m³
    v = 1e-6  # Kinematic viscosity in m²/s
    # Compute gradients using automatic differentiation
    with tf.GradientTape(persistent=True) as tape:
        tape.watch(W_H)
        tape.watch(U_U_star)
        tape.watch(concentration)
        y_pred_tensor = tf.convert_to_tensor(y_pred, dtype=tf.float32)
        dy_dt = tape.gradient(y_pred_tensor, W_H)
        dy_dx = tape.gradient(y_pred_tensor, U_U_star)
        dy_dc = tape.gradient(y_pred_tensor, concentration)
        dy_d2x = tape.gradient(dy_dc, U_U_star)
        dy_d2t = tape.gradient(dy_dt, W_H)
    # Mass Continuity: |∂Q/∂x + ∂Q/∂t|^2
    mass_continuity = K.sum(K.square(dy_dx + dy_dt))
    # Momentum Conservation: |∂u/∂t + u ∂u/∂x - ν ∂²u/∂x²|^2
    momentum_conservation = (
        K.sum(K.square(dy_dc + U_U_star * dy_dx))  # ∂u/∂t + u ∂u/∂x
        + K.sum(K.square(v * dy_d2x))  # v ∂²u/∂x²
    )
    # Navier-Stokes: |∂C/∂t + u ∂C/∂x - ε ∂²C/∂x²|^2
    navier_stokes_term = (
        K.sum(K.square(dy_dt))  # ∂C/∂t
        + U_U_star * K.sum(K.square(dy_dx))  # u ∂C/∂x
        - epsilon * K.sum(K.square(dy_d2x))  # ε ∂²C/∂x²
    )
    return K.sum(K.square(mass_continuity)) + K.sum(K.square(momentum_conservation)) + K.sum(K.square(navier_stokes_term))
# Custom Loss Function for PRRT model
class PhysicsRegularizedLoss(BaseEstimator, RegressorMixin):
    def __init__(self, base_estimator, physics_regularization_weight=0.3):
        self.base_estimator = base_estimator
        self.physics_regularization_weight = physics_regularization_weight
    def fit(self, X, y):
        self.base_estimator.fit(X, y)
        return self
    def predict(self, X):
        return self.base_estimator.predict(X)
    def score(self, X, y):
        y_pred = self.predict(X)
        mse = mean_squared_error(y, y_pred)
        pc = physics_constraints(y, y_pred, X)
        total_loss = mse + self.physics_regularization_weight * pc
        return -total_loss
# Base Decision Tree Regressor model
dtr = DecisionTreeRegressor(max_depth=5, 
                            min_samples_split=10)
# PRRT model with custom loss
prrt_model = PhysicsRegularizedLoss(base_estimator=dtr, 
                                    physics_regularization_weight=0.3)
# Train the PRRT model
prrt_model.fit(X_train, y_train)
# Predict using the PRRT model
prrt_predictions = prrt_model.predict(X_test)