In [1]:
#EXCEL_PATH = r"D:\FILIP\DOKTORSKE STUDIJE\III GODINA\AIC M21 CASOPIS\MATLAB CODE\1.PRIPREMLJENA BAZA PODATAKA\FUNDAMENTAL PERIOD PYTHON.xlsx"

In [2]:
# Great it works!

# Can you now provide directly in the same way to jupyter-lab updated script for: 
# Script 6 — Genetic Programming Symbolic Regression (GEP-style)
# Script 5 — LASSO Polynomial Regression (sparse closed-form)
# Script 4 — Additive Spline GAM (explicit equation)
# Script 3 — Model Tree (piecewise linear equations per region)
# Script 2 — MARS (py-earth) to get piecewise-linear equations
# Script 1 — Symbolic Regression (PySR) to discover equations

In [3]:
# JUPYTER CELL — Script 2: MARS-style (degree=2 with pairwise products), pure Python
# Dataset: ['NoSt','NoSp','LoSp','OP','MWS'] -> TFP (original units)
# Greedy forward selection + backward pruning using hinge and hinge*hinge bases.
# Displays pretty math, prints metrics, and saves small artifacts.

import os, json, math, warnings
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
import time
warnings.filterwarnings("ignore")
start = time.time()
# ---------- CONFIG ----------
EXCEL_PATH = r"D:\FILIP\DOKTORSKE STUDIJE\IIIII GODINA\8.CSP - NOVA KNJIGA SA VM\MOJE POGLAVLJE\CASE STUDIES\CFST columns Dataset.xlsx"
SHEET      = 0
FEATURES = ["B","t","L","fy","fc"]
TARGET = "Nexp"

TEST_SIZE   = 0.20
RANDOM_SEED = 42

# Hinge candidate generation on TRAIN ONLY (then reused for TEST)
# Make this smaller if runtime/memory is a concern; larger gives more flexibility.
N_KNOTS_PER_FEATURE = 4            # interior knots per feature
QUANTILE_RANGE      = (0.05, 0.95) # ignore tails

# Forward / backward settings
MAX_TERMS   = 60        # max number of basis terms (excluding intercept) to add in forward step
MIN_IMPROV  = 1e-6      # stop forward when best RSS improvement below this
PENALTY_GCV = 4.0       # GCV penalty (higher => simpler after pruning)

OUTDIR = "out_mars_style_cfst"
os.makedirs(OUTDIR, exist_ok=True)

# ---------- LOAD DATA ----------
df = pd.read_excel(EXCEL_PATH, sheet_name=SHEET)
missing = [c for c in FEATURES + [TARGET] if c not in df.columns]
if missing:
    raise ValueError(f"Missing columns: {missing}\nPresent: {list(df.columns)}")

X = df[FEATURES].apply(pd.to_numeric, errors="coerce").values
y = pd.to_numeric(df[TARGET], errors="coerce").values
mask = np.isfinite(X).all(axis=1) & np.isfinite(y)
X, y = X[mask], y[mask]

X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=TEST_SIZE, random_state=RANDOM_SEED)
n_tr, p = X_tr.shape

# ---------- Utilities ----------
def metrics(y_true, y_pred):
    return dict(
        R2=r2_score(y_true, y_pred),
        MAE=mean_absolute_error(y_true, y_pred),
        RMSE=math.sqrt(((y_true - y_pred)**2).mean())
    )

def ols_fit(Z, y):
    """OLS with intercept; returns beta (incl. intercept), yhat and RSS."""
    Z1 = np.column_stack([np.ones(len(Z)), Z]) if Z.size else np.ones((len(y), 1))
    beta, *_ = np.linalg.lstsq(Z1, y, rcond=None)
    yhat = Z1 @ beta
    rss = float(((y - yhat) ** 2).sum())
    return beta, yhat, rss

def gcv(rss, n, k, penalty=2.0):
    """MARS-like GCV: denom = (1 - (C/n))^2, with C = 1 + penalty*k (1 = intercept)."""
    C = 1.0 + penalty * k
    denom = (1.0 - C / n)
    denom = max(denom, 1e-12)
    return rss / (n * denom * denom)

# ---------- TRAIN knots (for single hinges), then reuse ----------
def compute_knots_train(Xtr, n_knots=10, qrange=(0.05, 0.95)):
    knots_by_feature = []
    q_lo, q_hi = qrange
    qs = np.linspace(q_lo, q_hi, n_knots + 2)[1:-1]  # interior only
    for j in range(Xtr.shape[1]):
        x = Xtr[:, j]
        ks = np.quantile(x, qs)
        ks = np.unique(ks.round(12))
        knots_by_feature.append(ks)
    return knots_by_feature

def build_single_hinges(Xmat, feature_names, knots_by_feature):
    """
    Return:
        Z_singles: (n x M1) matrix of single hinges
        names_s:  list of 'h(<expr>)' strings
        meta_s:   list of dicts with {'feat': j, 'knot': t, 'dir': +1/-1}
    """
    Z_cols, names, meta = [], [], []
    for j, name in enumerate(feature_names):
        x = Xmat[:, j]
        for t in knots_by_feature[j]:
            Z_cols.append(np.maximum(0.0, x - t))
            names.append(f"h({name}-{t:.6g})")
            meta.append({"feat": j, "knot": float(t), "dir": +1})
            Z_cols.append(np.maximum(0.0, t - x))
            names.append(f"h({t:.6g}-{name})")
            meta.append({"feat": j, "knot": float(t), "dir": -1})
    if not Z_cols:
        return np.empty((len(Xmat), 0)), [], []
    return np.column_stack(Z_cols), names, meta

def build_pair_hinges(Xmat, names_s, meta_s, idx_pairs):
    """
    Multiply selected pairs of single hinges to form degree-2 interactions.
    idx_pairs: list of (i, j) indices into the single-hinge basis.
    Returns (Z_pairs, names_pairs).
    """
    if not idx_pairs:
        return np.empty((Xmat.shape[0], 0)), []
    Z_cols, names = [], []
    # We will regenerate the single hinges for Xmat from the names_s only when needed.
    # But to be efficient, pass the already-built single hinges if available.
    # For simplicity here, we compute them by evaluating the name string.
    # However, we already have X_tr singles prebuilt; we'll use those for train,
    # and rebuild for test using stored knots/meta.
    raise_if = False  # placeholder to note that we shouldn't be here; we won't call this version.

# Build TRAIN single hinges and their metadata
knots_train = compute_knots_train(X_tr, N_KNOTS_PER_FEATURE, QUANTILE_RANGE)
Zs_tr, names_s, meta_s = build_single_hinges(X_tr, FEATURES, knots_train)

# Build TEST single hinges using the SAME knots
def build_single_hinges_from_meta(Xmat, feature_names, meta_list):
    Z_cols, names = [], []
    for m in meta_list:
        j, t, d = m["feat"], m["knot"], m["dir"]
        x = Xmat[:, j]
        if d == +1:
            Z_cols.append(np.maximum(0.0, x - t)); names.append(f"h({feature_names[j]}-{t:.6g})")
        else:
            Z_cols.append(np.maximum(0.0, t - x)); names.append(f"h({t:.6g}-{feature_names[j]})")
    return np.column_stack(Z_cols), names

Zs_te, _ = build_single_hinges_from_meta(X_te, FEATURES, meta_s)

# Build PAIR indices across different features (degree=2)
# To keep scale reasonable, we build all cross-feature pairs:
single_count = len(names_s)
feat_of = np.array([m["feat"] for m in meta_s], dtype=int)
pair_indices = []
for i in range(single_count):
    fi = feat_of[i]
    for j in range(i+1, single_count):
        fj = feat_of[j]
        if fi != fj:  # cross-feature only (MARS degree=2)
            pair_indices.append((i, j))

# Construct TRAIN pair matrix (hinge_i * hinge_j)
def multiply_pairs(Z_single, pair_idx_list):
    if not pair_idx_list:
        return np.empty((Z_single.shape[0], 0))
    n = Z_single.shape[0]
    Zp = np.empty((n, len(pair_idx_list)), dtype=float)
    for k, (i, j) in enumerate(pair_idx_list):
        Zp[:, k] = Z_single[:, i] * Z_single[:, j]
    return Zp

Zp_tr = multiply_pairs(Zs_tr, pair_indices)

# Pair names for reporting
names_p = [f"{names_s[i]}*{names_s[j]}" for (i, j) in pair_indices]

# Build TEST pair matrix using TEST singles (same pairing structure)
Zp_te = multiply_pairs(Zs_te, pair_indices)

# Combine SINGLE + PAIR into full design
Z_tr_full = np.column_stack([Zs_tr, Zp_tr])
Z_te_full = np.column_stack([Zs_te, Zp_te])
names_full = names_s + names_p

# ---------- Forward selection ----------
selected_idx = []
current_Z = np.empty((n_tr, 0))
beta, yhat, rss = ols_fit(current_Z, y_tr)
fw_trace = [(selected_idx.copy(), beta.copy(), rss)]

for step in range(MAX_TERMS):
    best_improve = 0.0
    best_j = None
    # Try adding each unused candidate
    for j in range(Z_tr_full.shape[1]):
        if j in selected_idx:
            continue
        Z_try = np.column_stack([current_Z, Z_tr_full[:, j]])
        _, _, rss_try = ols_fit(Z_try, y_tr)
        improve = rss - rss_try
        if improve > best_improve:
            best_improve = improve
            best_j = j
    if best_j is None or best_improve < MIN_IMPROV:
        break
    selected_idx.append(best_j)
    current_Z = np.column_stack([current_Z, Z_tr_full[:, best_j]])
    beta, yhat, rss = ols_fit(current_Z, y_tr)
    fw_trace.append((selected_idx.copy(), beta.copy(), rss))

# ---------- Backward pruning (GCV) ----------
def prune_backward(Z_full, y, selected):
    snapshots = []
    sel = selected.copy()
    Z_sel = Z_full[:, sel] if sel else np.empty((len(y),0))
    beta, _, rss = ols_fit(Z_sel, y)
    k = len(sel)
    snapshots.append((sel.copy(), beta.copy(), rss, gcv(rss, len(y), k, PENALTY_GCV)))

    while len(sel) > 0:
        best_gcv = float("inf")
        best_drop = None
        best_beta = None
        best_rss = None
        for idx_pos in range(len(sel)):
            trial = sel[:idx_pos] + sel[idx_pos+1:]
            Z_trial = Z_full[:, trial] if trial else np.empty((len(y),0))
            beta_t, _, rss_t = ols_fit(Z_trial, y)
            gcv_t = gcv(rss_t, len(y), len(trial), PENALTY_GCV)
            if gcv_t < best_gcv:
                best_gcv = gcv_t
                best_drop = idx_pos
                best_beta = beta_t
                best_rss = rss_t
        sel.pop(best_drop)
        snapshots.append((sel.copy(), best_beta.copy(), best_rss, best_gcv))
    # choose snapshot with min GCV
    return min(snapshots, key=lambda tup: tup[3])

sel_fw, beta_fw, rss_fw = fw_trace[-1]
sel_final, beta_final, rss_final, gcv_final = prune_backward(Z_tr_full, y_tr, sel_fw)

# ---------- Predictions & metrics ----------
def predict_from_indices(Z_full, beta, sel_idx):
    if len(sel_idx) == 0:
        Z1 = np.ones((Z_full.shape[0], 1))
    else:
        Z1 = np.column_stack([np.ones(Z_full.shape[0]), Z_full[:, sel_idx]])
    return Z1 @ beta

yhat_tr = predict_from_indices(Z_tr_full, beta_final, sel_final)
yhat_te = predict_from_indices(Z_te_full, beta_final, sel_final)

m_train = metrics(y_tr, yhat_tr)
m_test  = metrics(y_te, yhat_te)

# ---------- Pretty equation (SymPy) ----------
from sympy import symbols, Max, sympify, latex
from IPython.display import Math, display

sym_vars = {name: symbols(name, real=True) for name in FEATURES}

def hinge_name_to_sympy(nm: str):
    inside = nm[nm.find("(")+1: nm.rfind(")")]  # e.g., "NoSt-3.2" or "3.2-NoSt"
    return Max(sympify(inside, locals=sym_vars), 0)

# Decode whether a selected term is single or pair
single_len = len(names_s)
terms_report = [{"term": "1 (intercept)", "coefficient": float(beta_final[0])}]
sym_expr = sympify(float(beta_final[0]))

for pos, j in enumerate(sel_final, start=1):
    nm = names_full[j]
    c  = float(beta_final[pos])
    if j < single_len:  # single hinge
        term_expr = hinge_name_to_sympy(nm)
    else:               # pair: "h(..)*h(..)"
        left, right = nm.split("*", 1)
        term_expr = hinge_name_to_sympy(left) * hinge_name_to_sympy(right)
    sym_expr += c * term_expr
    terms_report.append({"term": nm, "coefficient": c})

def python_equation(beta, sel_idx, names):
    parts = [f"{float(beta[0]):+.16g}"]
    for pos, j in enumerate(sel_idx, start=1):
        nm = names[j]
        if "*" in nm:
            lft, rgt = nm.split("*", 1)
            in_l = lft[lft.find("(")+1: lft.rfind(")")]
            in_r = rgt[rgt.find("(")+1: rgt.rfind(")")]
            parts.append(f"{float(beta[pos]):+.16g}*max(({in_l}), 0)*max(({in_r}), 0)")
        else:
            inside = nm[nm.find("(")+1: nm.rfind(")")]
            parts.append(f"{float(beta[pos]):+.16g}*max(({inside}), 0)")
    return "y = " + " ".join(parts)

eq_python = python_equation(beta_final, sel_final, names_full)

# ---------- OUTPUT ----------
print("=== MARS-style — CFST ===")
print(f"KNOTS per feature (train): {N_KNOTS_PER_FEATURE}, quantile_range={QUANTILE_RANGE}")
print(f"Forward selected terms: {len(sel_fw)} | Final after pruning: {len(sel_final)} | GCV={gcv_final:.6g}")
print("Train:", {k: round(v, 6) for k, v in m_train.items()})
print("Test :", {k: round(v, 6) for k, v in m_test.items()})
print("\nClosed-form (Python):\n", eq_python)

# Pretty math in notebook
display(Math(r"y = " + latex(sym_expr)))

# ---------- Save artifacts ----------
with open(os.path.join(OUTDIR, "equation_python.txt"), "w", encoding="utf-8") as f:
    f.write(eq_python + "\n")
pd.DataFrame(terms_report).to_csv(os.path.join(OUTDIR, "terms_selected.csv"), index=False)
with open(os.path.join(OUTDIR, "metrics.json"), "w", encoding="utf-8") as f:
    json.dump({
        "file": EXCEL_PATH,
        "sheet": SHEET,
        "features": FEATURES,
        "target": TARGET,
        "degree": 2,
        "n_knots_per_feature": N_KNOTS_PER_FEATURE,
        "quantile_range": QUANTILE_RANGE,
        "forward_max_terms": MAX_TERMS,
        "penalty_gcv": PENALTY_GCV,
        "forward_selected": len(sel_fw),
        "final_selected": len(sel_final),
        "train": m_train,
        "test":  m_test,
        "gcv_final": gcv_final
    }, f, indent=2)

end = time.time()
running_time = (end - start)
print('Running Time: ', running_time, ' seconds')

=== MARS-style — CFST ===
KNOTS per feature (train): 4, quantile_range=(0.05, 0.95)
Forward selected terms: 60 | Final after pruning: 40 | GCV=118917
Train: {'R2': 0.985213, 'MAE': 181.587729, 'RMSE': 276.888014}
Test : {'R2': 0.969235, 'MAE': 256.477503, 'RMSE': 417.069568}

Closed-form (Python):
 y = +831.0187793733453 +19.45046240159981*max((B-101.436), 0) +0.3493143182739675*max((B-101.436), 0)*max((fc-28.8766), 0) -1.300784719132318*max((5.7592-t), 0)*max((74.4-fc), 0) -0.3323742087371549*max((B-101.436), 0)*max((28.8766-fc), 0) -129.0975447184525*max((B-199), 0) -12.05937495901545*max((101.436-B), 0) -0.01681779069342457*max((B-101.436), 0)*max((437.918-fy), 0) -0.7208005153277808*max((B-101.436), 0)*max((fc-74.4), 0) -0.3925299234864209*max((B-199), 0)*max((36.8-fc), 0) -0.006558642879666468*max((L-1200), 0)*max((fy-288.2), 0) -0.150239219923505*max((B-149.709), 0)*max((630-L), 0) -0.06207153772974447*max((149.709-B), 0)*max((fy-437.918), 0) +0.005039676793133698*max((L-630), 0)

<IPython.core.display.Math object>

Running Time:  52.43389081954956  seconds


In [4]:
# Complexity controls:

# N_KNOTS_PER_FEATURE & QUANTILE_RANGE → candidate richness.

# MAX_TERMS & MIN_IMPROV → forward stopping.

# PENALTY_GCV → how aggressively the pruning simplifies the model.

In [5]:
# Tune simplicity vs. accuracy:

# Make it simpler: increase --penalty (e.g., 3–5) and/or reduce --max_terms, set --max_degree 1 (additive only).

# Make it more accurate: lower --penalty, allow --max_degree 2, raise --max_terms