In [1]:
import glob as gb
import pandas as pd
import numpy as np
import onnxruntime as ort
import math
import time
from scipy.stats import skew 


# Path to trained model
onnx_path = "./../neural_network/saved_models/NN_model_for_uq_analysis.onnx"

# Create session
session = ort.InferenceSession(
    onnx_path,
    providers=["CPUExecutionProvider"]
)

input_name  = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name

print("Input shape:", session.get_inputs()[0].shape)
print("Output shape:", session.get_outputs()[0].shape)

Input shape: ['N', 2]
Output shape: ['N', 7]


In [2]:
# Read the data from the FlameMaster files
fname = './../neural_network/data/chemtable_FVV_2D_Enthalpy/*.kg'
files = gb.glob(fname)
nfiles = len(files)


# Read column names from the second line of the first file
with open(files[0], 'r') as f:
    lines = f.readlines()
    column_names = lines[1].strip().split('\t')  # Read second line (index 1) and split by whitespace

# Create empty lists to store the data
data_flameMaster = []

# Load and concatenate the data into DataFrames
for f in files:

    df_flameMaster_temp = pd.DataFrame(np.loadtxt(f, skiprows=2, dtype=np.float64),columns=column_names)
    data_flameMaster.append(df_flameMaster_temp)

# Concatenate all data into final DataFrames
df_flameMaster_all = pd.concat(data_flameMaster, ignore_index=True)

#Computing diffusivity 
df_flameMaster_all['Diff [kg/ms]'] = df_flameMaster_all['lambda [W/mK]'] / df_flameMaster_all['cp [J/kgK]']

# Select the relevant columns for input and output
input_data = ['ProgVar', 'TotalEnthalpy [J/kg]']
referenceEnthalpy = 276240

# Shift the enthalpy values to be relative to the reference enthalpy
df_flameMaster_all['TotalEnthalpy [J/kg]'] = df_flameMaster_all['TotalEnthalpy [J/kg]'] - referenceEnthalpy

# Select the relevant columns for input and output
output_data = ['ProdRateProgVar [kg/m^3s]', 'temperature [K]', 'Y-CO', 'density', 'mu [kg/ms]', 'cp [J/kgK]', 'Diff [kg/ms]']

# Create DataFrames for input and output data
X_all = df_flameMaster_all[input_data].to_numpy()
Z_all = df_flameMaster_all[output_data].to_numpy()

# Load mask from trained model
mask = np.load("./../neural_network/train_test_mask.npy")

X_train = X_all[mask].astype(np.float32)
Z_train = Z_all[mask].astype(np.float32)
X_test  = X_all[~mask].astype(np.float32)
Z_test  = Z_all[~mask].astype(np.float32)


In [3]:
sigma_rel = 0.05    # 5% relative uncertainty

# Bounds for clipping (same as MC)
c_min, c_max = float(X_all[:,0].min()), float(X_all[:,0].max())
h_min, h_max = float(X_all[:,1].min()), float(X_all[:,1].max())

def noisy_inputs_from_xi(x0, xi, sigma_rel, c_min, c_max, h_min, h_max):
    """
    x0: (2,) base point [c0,h0]
    xi: (N,2) standard normal noise samples
    """
    x0 = np.asarray(x0, dtype=np.float64)
    X_noisy = x0[None, :] + sigma_rel * x0[None, :] * xi
    X_noisy[:,0] = np.clip(X_noisy[:,0], c_min, c_max)
    X_noisy[:,1] = np.clip(X_noisy[:,1], h_min, h_max)
    return X_noisy


In [4]:
def hermite_polynomial(n, x):
    if n == 0:
        return np.ones_like(x)
    elif n == 1:
        return x
    return x * hermite_polynomial(n-1, x) - (n-1) * hermite_polynomial(n-2, x)

def orthonormal_hermite(n, x):
    return hermite_polynomial(n, x) / np.sqrt(math.factorial(n))

def pce2d_setup_noise(order):
    q = order + 1
    x, w = np.polynomial.hermite.hermgauss(q)  # exp(-x^2)
    z = np.sqrt(2) * x                         # z ~ N(0,1)

    basis = [(i, j) for i in range(order+1) for j in range(order+1) if i + j <= order]

    psi = np.zeros((order+1, q))
    for n in range(order+1):
        psi[n, :] = orthonormal_hermite(n, z)

    Z1, Z2 = np.meshgrid(z, z, indexing="ij")
    Wq = np.outer(w, w)
    return Z1, Z2, Wq, psi, basis

In [5]:
def pce_noise_mean_var_one_point(x0, order):
    Z1, Z2, Wq, psi, basis = pce2d_setup_noise(order)
    q = order + 1

    c0, h0 = float(x0[0]), float(x0[1])

    C = c0 + sigma_rel * c0 * Z1
    H = h0 + sigma_rel * h0 * Z2

    C = np.clip(C, c_min, c_max)
    H = np.clip(H, h_min, h_max)

    X_grid = np.stack([C.ravel(), H.ravel()], axis=1).astype(np.float32)

    Y = session.run([output_name], {input_name: X_grid})[0]  # (q*q,7)
    Y = Y.reshape(q, q, -1)

    coeffs = np.zeros((len(basis), Y.shape[-1]), dtype=np.float64)
    norm = math.pi

    for k, (i, j) in enumerate(basis):
        Phi = np.outer(psi[i, :], psi[j, :])
        coeffs[k, :] = np.sum((Wq * Phi)[:, :, None] * Y, axis=(0, 1)) / norm

    mean = coeffs[0, :]
    var  = np.sum(coeffs[1:, :]**2, axis=0)
    return mean, var


In [6]:
output_data = ['ProdRateProgVar [kg/m^3s]', 'temperature [K]', 'Y-CO', 'density',
               'mu [kg/ms]', 'cp [J/kgK]', 'Diff [kg/ms]']

def pce_noise_stats_over_points(X_sel, order):
    means = np.zeros((len(X_sel), 7))
    vars_ = np.zeros((len(X_sel), 7))

    for i, x0 in enumerate(X_sel):
        m, v = pce_noise_mean_var_one_point(x0, order)
        means[i, :] = m
        vars_[i, :] = v

    mean_phys = means.mean(axis=0)
    var_phys  = vars_.mean(axis=0)   # variance due to noise, averaged over points
    std_phys  = np.sqrt(var_phys)
    return mean_phys, std_phys, var_phys

rng = np.random.default_rng(0)
N_sel = 1000  # start small
idx = rng.choice(len(X_test), size=N_sel, replace=False)
X_sel = X_test[idx].astype(np.float64)

order_noise = 20
t0 = time.time()
mean_phys, std_phys, var_phys = pce_noise_stats_over_points(X_sel, order_noise)
t1 = time.time()

df_uq_pce = pd.DataFrame({
    "Output": output_data,
    "Mean": mean_phys,
    "Variance": var_phys,
})
print(f"Done. N_sel={N_sel}, order={order_noise}, time={t1-t0:.2f}s")
df_uq_pce


KeyboardInterrupt: 