In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
import pandas as pd
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score
from sklearn.model_selection import train_test_split
from pysindy import SINDy
from sklearn.preprocessing import StandardScaler
from pysindy.optimizers import STLSQ
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix 
from sklearn.model_selection import StratifiedKFold 
from pysindy.feature_library import PolynomialLibrary, FourierLibrary, GeneralizedLibrary

# Model agnostic 
from typing import Optional, List, Callable, Dict, Any, List
from pathlib import Path
from itertools import islice
from  utils import ChiefBldr  # custom model for data handling/model trianing

# Model specific 
from typing import Optional, List 

This version of Feyn and the QLattice is available for academic, personal, and non-commercial use. By using the community version of this software you agree to the terms and conditions which can be found at https://abzu.ai/eula.

In [2]:
# Get the directory this file lives in
nb_dir = Path.cwd() # notebook directory
project_root = nb_dir.parents[1] # project directory
data_path = project_root / "datasets" / "processed_well_data.csv"

includ_cols = ['Dia', 'Dev(deg)','Area (m2)', 'z','GasDens','LiquidDens', 'P/T','friction_factor', 'critical_film_thickness']
D = ChiefBldr(path=data_path, includ_cols=includ_cols, test_size=0.20)

In [3]:
def sindy(
        hparams: Dict[str,Any]
):      
        # partition dict by method
        hparams_opt = dict(list(hparams.items())[:2])
        hparams_poly = dict(list(hparams.items())[-2:-1])
        hparams_fourier = dict(list(hparams.items())[-1:])
        
        # Define optimizer for SINDy
        hparams_opt = dict(islice(hparams.items(), 2))
        optimizer = STLSQ(
        max_iter=10000,
        normalize_columns=True,
        **hparams_opt,
        )
        # specify feature lib
        poly_library = PolynomialLibrary(**hparams_poly)
        fourier_library = FourierLibrary(**hparams_fourier)
        lib = GeneralizedLibrary([poly_library, fourier_library])
        model = SINDy(optimizer=optimizer, feature_library=lib)

        return model 

hparam_grid = {
    'alpha': np.logspace(-4, 0.25, 10),      
    'threshold': np.logspace(-4, -1, 10),  
    'degree': [1, 2, 3, 5],
    'n_frequencies': [1, 2, 3, 4, 5]
}

# train model and optimize hyperparameters via grid search 
trained_model = D.evolv_model(build_model=sindy, hparam_grid=hparam_grid, k_folds=5)


Training model and optimizing hyperparameters via k-fold CV...
Done. Best score = 0.7463458110516934
Best hyperparameters: {'alpha': 1.7782794100389228, 'threshold': 0.1, 'degree': 2, 'n_frequencies': 2}
Retraining optimized model on full training set
Training set score: 0.7650602409638554
Test set score: 0.6666666666666666


# Data preparation 

In [None]:
# K-fold cross validation

def evaluate_sindy(params, X, y, gsflow, loading_class, cv_splits=5):
    alpha, threshold, interval, n, f = params
    
    # Use StratifiedKFold to preserve class balance in splits
    kf = StratifiedKFold(n_splits=cv_splits, shuffle=True, random_state=42)

    acc_scores = []
    
    for train_idx, val_idx in kf.split(X, loading_class):  # Using validation split from training data
        # divide into trianing/validation sets
        X_train_cv, X_val_cv = X[train_idx], X[val_idx]
        y_train_cv, y_val_cv = y[train_idx], y[val_idx]
        gsflow_val_cv = gsflow[val_idx]
        loading_val_cv = loading_class[val_idx]
        
        scaler_X = StandardScaler()
        X_train_cv_scaled = scaler_X.fit_transform(X_train_cv)
        X_val_cv_scaled = scaler_X.transform(X_val_cv)
        
        scaler_y = StandardScaler()
        y_train_cv_scaled = scaler_y.fit_transform(y_train_cv.reshape(-1, 1))
        
        # Define optimizer for SINDy
        optimizer = STLSQ(
            alpha=alpha,
            threshold=threshold,
            max_iter=10000,
            normalize_columns=True
        )
        
        model.fit(X_train_cv_scaled, t=np.arange(len(y_train_cv_scaled)), 
                 x_dot=y_train_cv_scaled)
        
        # Compute performance on validation set 
        y_val_pred_scaled = model.predict(X_val_cv_scaled)
        y_val_pred_cv = scaler_y.inverse_transform(y_val_pred_scaled).flatten()
        
        acc, cm = calculate_accuracy(y_val_pred_cv, gsflow_val_cv, loading_val_cv, interval)
        acc_scores.append(acc)
    
    return np.mean(acc_scores)


In [None]:

def optimize_sindy_hyperparameters(X_train, y_train, gsflow_train, loading_train, param_grid):
    best_score = -1
    best_params = None
    
    X_train = np.array(X_train)
    y_train = np.array(y_train).flatten()
    gsflow_train = np.array(gsflow_train)
    loading_train = np.array(loading_train)
    
    print("Begin training and hyperparameter optimization...")
    for alpha in param_grid['alpha']:
        for threshold in param_grid['threshold']:
            for interval in param_grid['interval']:
                for n in param_grid['n']:
                    for f in param_grid['f']:
                        score = evaluate_sindy((alpha, threshold, interval, n, f), 
                                            X_train, y_train, gsflow_train, loading_train)
                    
                    if score > best_score:
                        best_score = score
                        best_params = {'alpha': alpha, 'threshold': threshold, 'interval': interval, 'n': n}
    
    return best_params, best_score

In [None]:
# Grid search for optimal hyperparameters 
param_grid = {
    'alpha': np.logspace(-4, 0.25, 10),      
    'threshold': np.logspace(-4, -1, 10),  
    'interval': np.linspace(0, 10, 10) , 
    'n': np.array([1, 2, 3, 5]),
    'f':np.array([1, 2, 3, 4, 5])
}

#  Train model and optimize hyperparameters 
best_params, best_score = optimize_sindy_hyperparameters(
    X_train, y_train, gsflow_train, loading_train, param_grid
)

print("\nBest parameters found:")
print(best_params)
print(f"Best CV accuracy: {best_score*100:.2f}%")

# Train final model on FULL training set with best params and evaluate on the final test set
final_optimizer = STLSQ(
    alpha=best_params['alpha'],
    threshold=best_params['threshold'],
    max_iter=10000,
    normalize_columns=True
)

# Evaluate model performance 
fourier_library = FourierLibrary(n_frequencies=int(best_params['n']))
poly_library = PolynomialLibrary(degree=int(best_params['n']))
lib = GeneralizedLibrary([poly_library, fourier_library])

final_model = SINDy(optimizer=final_optimizer, feature_library=lib)
final_model.fit(X_train_scaled, t=t_train, x_dot=y_train_scaled)

# Evaluate on trainin set
y_pred_train_scaled = final_model.predict(X_train_scaled)
y_pred_train = scaler_y.inverse_transform(y_pred_train_scaled).flatten()

# Evaluate on TEST set (previously unseen data)
y_pred_test_scaled = final_model.predict(X_test_scaled)
y_pred_test = scaler_y.inverse_transform(y_pred_test_scaled).flatten()

# Calculate metrics on TEST set
train_acc, train_cm = calculate_accuracy(y_pred_train, gsflow_train, loading_train, best_params['interval'])
test_acc, test_cm = calculate_accuracy(y_pred_test, gsflow_test, loading_test, best_params['interval'])

print("\n=== Final Model Performance ===")
final_model.print()
print(f"\nTraining Set Classification Accuracy: {train_acc*100:.2f}%")
print(f"Test Set Classification Accuracy: {test_acc*100:.2f}%")

print("Confusion Matrix for Train Set Classification")
print(train_cm)
print("Confusion Matrix for Test Set Classification")
print(test_cm)

# Classification graph for well status 

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Set publication-friendly plotting settings
plt.rcParams['text.usetex'] = False
plt.rcParams['font.family'] = 'Arial'  # Publication-friendly font
plt.rcParams['axes.linewidth'] = 0.8
plt.rcParams['xtick.direction'] = 'in'
plt.rcParams['ytick.direction'] = 'in'

# Define a light, publication-friendly color palette without green
color_map = {
    'Loaded': '#FF6363',  # Light red
    'Unloaded': '#3674B5',  # Light purple
    'Questionable': '#D3D3D3',  # Light orange
    'Near L.U': '#FFCC99'  # Light blue
}
status_col = 'Test status'

df = pd.read_csv(data_path)

df.rename(columns={
    'Dev(deg)': 'Dev_deg',
    'Area (m2)': 'Area_m2',
    'g (m/s2)': 'g_m_s2',
    'P/T': 'P_T'
}, inplace=True)

# Map Test status to numerical classes
loading_class_map = {'Unloaded': -1, 'Near L.U': 0, 'Loaded': 1, 'Questionable': 1}
df['loading_class'] = df[status_col].map(loading_class_map)

# Define features and targets
features = ['Dia', 'Dev_deg', 'Area_m2', 'z', 'GasDens', 'LiquidDens', 'g_m_s2', 'P_T', 'friction_factor', 'critical_film_thickness']
output = 'Qcr'
gasflow = 'Gasflowrate'
status_col = 'Test status'

X = df[features]
y = df[output]
gsflow = df[gasflow]
loading_class = df['loading_class']
X_train, X_test, y_train, y_test, gsflow_train, gsflow_test, loading_train, loading_test = train_test_split(
    X, y, gsflow, loading_class, test_size=0.2, random_state=42, stratify=loading_class
)

scaler_X = StandardScaler()
X_train_scaled = scaler_X.fit_transform(X_train)
X_test_scaled = scaler_X.transform(X_test)
X_scaled = scaler_X.transform(X)

scaler_y = StandardScaler()
y_train_scaled = scaler_y.fit_transform(y_train.values.reshape(-1, 1)).flatten()
y_test_scaled = scaler_y.transform(y_test.values.reshape(-1, 1)).flatten()
y_scaled = scaler_y.transform(y.values.reshape(-1, 1)).flatten()

# Slice the first 40 entries for plotting.
gasflow_subset = gsflow
y_pred_subset = trained_model.predict(X_scaled)
colors = df[status_col].map(color_map).fillna('#D3D3D3')  # Light gray for missing values

plt.figure(figsize=(8, 6), dpi=300)
sns.set_theme(style="whitegrid", context="paper", font_scale=1.3, font='Arial')
plt.scatter(gasflow_subset, y_pred_subset, c=colors, alpha=1, s=100, edgecolors="gray", linewidth=0.5)
plt.plot([0, 350000], [0, 350000], '--', color='#FF6666', linewidth=1.5)  # Light red dashed line
plt.title("PySINDy Model", fontsize=16, fontweight='bold', pad=15)
plt.xlabel("Measured Well Flow Rate (m³/day)", fontsize=14, fontweight='bold')
plt.ylabel("Predicted Critical Rate (m³/day)", fontsize=14, fontweight='bold')
plt.grid(True, linestyle='--', alpha=0.7)
plt.xlim(0, 350000)
plt.ylim(0, 350000)
legend_patches = [mpatches.Patch(color=color, label=status) for status, color in color_map.items()]
plt.legend(handles=legend_patches, title='Actual Label', fontsize=12, title_fontsize=14,
           loc='best', frameon=True, edgecolor='gray')
plt.tight_layout()
plt.savefig("pysindy_scatter_new.pdf", format="pdf", dpi=300, bbox_inches='tight')
plt.show()
plt.close()

# Fit model for literature comparsion

In [None]:
# Split features
X = df[['Dia', 'Dev(deg)','Area (m2)', 'z','GasDens','LiquidDens', 'P/T','friction_factor', 'critical_film_thickness']]
y = df['Qcr']
gsflow = df['Gasflowrate']  # This is your additional target for classification metrics

# load class labels: loaded/unloaded/near loaded
loading_class = df['Test status'].apply(
    lambda x: -1 if x == 'Unloaded' else (0 if x == 'Near L.U' else 1)).to_numpy()

# Perform the train-test split, making sure to split all targets simultaneously
X_train, X_test, y_train, y_test, gsflow_train, gsflow_test, loading_train, loading_test = train_test_split(
    X, y, gsflow, loading_class, test_size=0.20, random_state=42, stratify=loading_class
)

"""
For final evaluation of the model, we need designate test data and normalize to all training data
"""
# Scale your features and continuous target (Qcr)
scaler_X = StandardScaler()
scaler_y = StandardScaler()

X_train_scaled = scaler_X.fit_transform(X_train)
X_test_scaled = scaler_X.transform(X_test)
X_scaled = scaler_X.transform(X)

y_train_scaled = scaler_y.fit_transform(y_train.values.reshape(-1, 1))
y_test_scaled = scaler_y.transform(y_test.values.reshape(-1, 1))
y_scaled = scaler_y.transform(y.values.reshape(-1, 1))

# t_train is just an index array for plotting
t_train = np.arange(len(y_train_scaled))
t = np.arange(len(y_scaled))

#convert to a numpy array and store test data 
loading_train = np.array(loading_train)
loading_test = np.array(loading_test)
loading = np.array(loading_class)
gsflow_test = np.array(gsflow_test)
gsflow = np.array(gsflow)
y_test = np.array(y_test)
y = np.array(y)