In [None]:
# ============================
# Imports & Setup
# ============================

import pymc as pm
import arviz as az
import numpy as np
import pandas as pd
import os
from sklearn.metrics import mean_absolute_percentage_error, mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from google.colab import drive

In [None]:
# ============================
# Mount Drive
# ============================
drive.mount('/content/drive')

# ============================
# Configuration
# ============================
DATA_PATH = '/content/drive/My Drive/Objective1/Bayesian_Inference/Objective1_bayesian_data.xlsx'
OUT_DIR = '/content/drive/My Drive/Objective1/Bayesian_Inference'
os.makedirs(OUT_DIR, exist_ok=True)

FORM_CONFIG = {
    "semi_log": {"SRC_mu": -1.7209, "iterations": 1},
    "power": {"SRC_mu": -2.41, "iterations": 1},
    "ihs": {"SRC_mu": -2.6886, "iterations": 100},
}

METRICS_XLSX = os.path.join(OUT_DIR, 'final_metrics_ALL.xlsx')
POSTERIOR_XLSX = os.path.join(OUT_DIR, 'final_posterior_percentiles_ALL.xlsx')
PREDICT_XLSX = os.path.join(OUT_DIR, 'predictions_actual_vs_predicted_ALL.xlsx')

In [None]:
# ============================
# Performance and Model Functions
# ============================
def calculate_metrics(y_true, y_pred):
    mape = mean_absolute_percentage_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    return mape, rmse, mae, r2

log10 = lambda x: pm.math.log(x) / np.log(10.0)
asinh = pm.math.asinh

def build_qs_estimated(formulation, m, a, rho, A, v, d, Cd, SRC):
    g = 9.81
    numerator = (m * g) - (m * a) - (0.5 * rho * Cd * A * (v ** 2))
    vr = v / (d * 0.55)
    if formulation == "semi_log":
        denom = A * (1 + log10(vr) * SRC)
    elif formulation == "power":
        denom = A * (vr ** SRC)
    elif formulation == "ihs":
        denom = A * (1 + asinh(vr) * SRC)
    else:
        raise ValueError(f"Unknown formulation: {formulation}")
    return pm.Deterministic('qs_estimated', (numerator / denom) / 1000.0)

In [None]:
# ============================
# Bayesian Inference
# ============================
def run_one_formulation(formulation_key, cfg, data, predictions_writer):
    SRC_mu = cfg["SRC_mu"]
    iters = cfg["iterations"]
    metrics_rows = []
    posterior_list = []
    for i in range(iters):
        rs = np.random.randint(0, 10_000)
        train_df, test_df = train_test_split(data, test_size=0.8, random_state=rs)
        m_train = train_df['Mass'].values
        a_train = train_df['Acceleration'].values
        rho_train = train_df['Density'].values
        A_train = train_df['Area'].values
        v_train = train_df['Velocity'].values
        d_train = train_df['Diameter'].values
        y_train = train_df['Static Tip Resistance'].values
        m_test = test_df['Mass'].values
        a_test = test_df['Acceleration'].values
        rho_test = test_df['Density'].values
        A_test = test_df['Area'].values
        v_test = test_df['Velocity'].values
        d_test = test_df['Diameter'].values
        y_test = test_df['Static Tip Resistance'].values

        with pm.Model() as model:
            m_d = pm.Data("m_train", m_train)
            a_d = pm.Data("a_train", a_train)
            rho_d = pm.Data("rho_train", rho_train)
            A_d = pm.Data("A_train", A_train)
            v_d = pm.Data("v_train", v_train)
            d_d = pm.Data("d_train", d_train)
            SRC = pm.LogNormal('SRC', mu=SRC_mu, sigma=0.472)
            Cd = pm.LogNormal('Cd', mu=-0.468, sigma=0.472)
            sigma = pm.Uniform('sigma', lower=0, upper=100)
            qs_estimated = build_qs_estimated(formulation_key, m_d, a_d, rho_d, A_d, v_d, d_d, Cd, SRC)
            pm.Normal('qs_likelihood', mu=qs_estimated, sigma=sigma, observed=y_train)
            trace = pm.sample(10000, tune=2000, step=pm.Metropolis(), chains=2, cores=1, return_inferencedata=True)

        summ = az.summary(trace, var_names=['Cd', 'SRC', 'sigma'], hdi_prob=0.95)
        summ.index.name = 'param'
        posterior_list.append(summ)
        sigma_mean = trace.posterior['sigma'].mean().item()

        with model:
            pm.set_data({"m_train": m_test, "a_train": a_test, "rho_train": rho_test, "A_train": A_test, "v_train": v_test, "d_train": d_test})
            pp_test = pm.sample_posterior_predictive(trace, var_names=['qs_estimated'])
        with model:
            pm.set_data({"m_train": m_train, "a_train": a_train, "rho_train": rho_train, "A_train": A_train, "v_train": v_train, "d_train": d_train})
            pp_train = pm.sample_posterior_predictive(trace, var_names=['qs_estimated'])

        pred_train_mean = pp_train.posterior_predictive['qs_estimated'].mean(dim=("chain", "draw")).values
        pred_train_std = np.sqrt(pp_train.posterior_predictive['qs_estimated'].var(dim=("chain", "draw")).values + sigma_mean**2)
        pred_test_mean = pp_test.posterior_predictive['qs_estimated'].mean(dim=("chain", "draw")).values
        pred_test_std = np.sqrt(pp_test.posterior_predictive['qs_estimated'].var(dim=("chain", "draw")).values + sigma_mean**2)

        mape_tr, rmse_tr, mae_tr, r2_tr = calculate_metrics(y_train, pred_train_mean)
        mape_te, rmse_te, mae_te, r2_te = calculate_metrics(y_test, pred_test_mean)
        metrics_rows.append({
            'Formulation': formulation_key,
            'Iteration': i + 1,
            'R_squared_train': r2_tr,
            'MAE_train': mae_tr,
            'MAPE_train': mape_tr,
            'RMSE_train': rmse_tr,
            'R_squared_test': r2_te,
            'MAE_test': mae_te,
            'MAPE_test': mape_te,
            'RMSE_test': rmse_te
        })

        df_train = pd.DataFrame({'Actual': y_train, 'Predicted_Mean': pred_train_mean, 'Predicted_Std': pred_train_std})
        df_test = pd.DataFrame({'Actual': y_test, 'Predicted_Mean': pred_test_mean, 'Predicted_Std': pred_test_std})
        df_train.to_excel(predictions_writer, sheet_name=f'{formulation_key.upper()}_Train_Iter_{i+1}', index=False)
        df_test.to_excel(predictions_writer, sheet_name=f'{formulation_key.upper()}_Test_Iter_{i+1}', index=False)
    return metrics_rows, posterior_list

In [None]:
# ============================
# Run All Formulations
# ============================
data = pd.read_excel(DATA_PATH)
all_metrics = []
posterior_blocks = []
with pd.ExcelWriter(PREDICT_XLSX, engine='openpyxl') as pred_writer:
    for form_key, cfg in FORM_CONFIG.items():
        m_rows, post_list = run_one_formulation(form_key, cfg, data, pred_writer)
        all_metrics.extend(m_rows)
        for j, dfp in enumerate(post_list, start=1):
            dfp = dfp.copy()
            dfp['Formulation'] = form_key
            dfp['Iteration'] = j
            posterior_blocks.append(dfp)

In [None]:
# ============================
# Save Results
# ============================
metrics_df = pd.DataFrame(all_metrics)
metrics_df.to_excel(METRICS_XLSX, index=False)
if posterior_blocks:
    posterior_cat = pd.concat(posterior_blocks, axis=0)
    cols = ['Formulation', 'Iteration'] + [c for c in posterior_cat.columns if c not in ('Formulation', 'Iteration')]
    posterior_cat = posterior_cat[cols]
    with pd.ExcelWriter(POSTERIOR_XLSX, engine='openpyxl') as w:
        posterior_cat.to_excel(w, sheet_name='ALL', index=True)
        for form_key in FORM_CONFIG.keys():
            sub = posterior_cat[posterior_cat['Formulation'] == form_key]
            sub.to_excel(w, sheet_name=form_key.upper(), index=True)

print("All done. Files created:")
print(" - Metrics:", METRICS_XLSX)
print(" - Posterior summaries:", POSTERIOR_XLSX)
print(" - Predictions:", PREDICT_XLSX)