In [46]:
# softmax_gp_composition_svgp.py
# Python >= 3.9; pip install numpy pandas scikit-learn tensorflow>=2.10 gpflow>=2.9
# This script:
#   1) loads sensory and ingredient CSVs (joined by 'sample_id'),
#   2) adds an 'Other' ingredient to preserve totals,
#   3) standardizes sensory inputs,
#   4) fits a Softmax-GP (SVGP multi-output) with a Monte-Carlo variational expectation,
#   5) predicts valid compositions (sum to 1, nonnegative),
#   6) (optional) evaluates Aitchison distance via CLR.

import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import List, Dict, Tuple

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold

import tensorflow as tf
import gpflow
from gpflow.kernels import RBF
from gpflow.inducing_variables import InducingPoints, SharedIndependentInducingVariables
from gpflow.kernels.multioutput import SeparateIndependent, SharedIndependent
gpflow.config.set_default_float(tf.float64)

In [None]:
# -----------------------------
# Utilities (simplex, CLR, metrics)
# -----------------------------
def to_simplex(rows: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    rows = np.clip(rows, eps, None)
    rows = rows / rows.sum(axis=1, keepdims=True)
    return rows

def clr(x: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    x = np.clip(x, eps, None)
    g = np.exp(np.mean(np.log(x), axis=1, keepdims=True))
    return np.log(x / g)

def aitchison_distance(y_true: np.ndarray, y_pred: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    C1 = clr(y_true, eps)
    C2 = clr(y_pred, eps)
    return np.sqrt(np.sum((C1 - C2) ** 2, axis=1))

def softmax_tf(z, axis=-1):
    zmax = tf.reduce_max(z, axis=axis, keepdims=True)
    ez = tf.exp(z - zmax)
    return ez / tf.reduce_sum(ez, axis=axis, keepdims=True)

# -----------------------------
# Data handling
# -----------------------------
@dataclass
class DataSpec:
    sensory_csv: str
    ingredients_csv: str
    id_col: str = "idx"
    add_other: bool = True
    min_presence: int = 1     # keep an ingredient if present >= this many samples
    scale_X: bool = True

def load_and_align(spec: DataSpec) -> Tuple[pd.DataFrame, pd.DataFrame]:
    Xdf = pd.read_csv(spec.sensory_csv)
    Ydf = pd.read_csv(spec.ingredients_csv)
    df = Xdf.merge(Ydf, on=spec.id_col, how="inner")
    sensory_cols = [c for c in Xdf.columns if c != spec.id_col]
    ingredient_cols = [c for c in Ydf.columns if c != spec.id_col]
    X = df[[spec.id_col] + sensory_cols].copy()
    Y = df[[spec.id_col] + ingredient_cols].copy()
    return X, Y

def build_composition(Y: pd.DataFrame, spec: DataSpec) -> Tuple[np.ndarray, List[str]]:
    id_col = spec.id_col
    cols = [c for c in Y.columns if c != id_col]
    Yvals = Y[cols].to_numpy(dtype=float)

    # If given in 0..100, normalize to 0..1 by row sum
    row_sums = Yvals.sum(axis=1, keepdims=True)
    row_sums = np.where(row_sums == 0.0, 1.0, row_sums)
    Yvals = Yvals / row_sums

    # Presence filter
    present_counts = (Yvals > 0).sum(axis=0)
    keep_mask = present_counts >= spec.min_presence
    kept_cols_raw = [c for c, m in zip(cols, keep_mask) if m]
    Y_keep = Yvals[:, keep_mask]

    if spec.add_other:
        leftover = 1.0 - Y_keep.sum(axis=1)
        other = np.clip(leftover, 0.0, 1.0)
        Y_aug = np.concatenate([Y_keep, other[:, None]], axis=1)
        kept_cols = kept_cols_raw + ["Other"]
    else:
        # Renormalize kept subset
        s = Y_keep.sum(axis=1, keepdims=True)
        s = np.where(s == 0.0, 1.0, s)
        Y_aug = Y_keep / s
        kept_cols = kept_cols_raw

    Y_aug = to_simplex(Y_aug)
    return Y_aug, kept_cols

# -----------------------------
# Softmax-GP custom "likelihood"
# -----------------------------
class SoftmaxCompositional(gpflow.likelihoods.Likelihood):
    """
    Softmax-based compositional likelihood:
      log p(y | f) = tau * sum_k y_k * log softmax(f)_k
    Vector-valued: latent_dim = observation_dim = input_dim = D.
    """

    def __init__(self, D: int, tau: float = 100.0, eps: float = 1e-8):
        super().__init__(input_dim=D, latent_dim=D, observation_dim=D)
        self.D = int(D)
        self.tau = tf.convert_to_tensor(tau, dtype=gpflow.default_float())
        self.eps = tf.convert_to_tensor(eps, dtype=gpflow.default_float())

    # -------- helpers --------
    @staticmethod
    def _softmax(z, axis=-1):
        zmax = tf.reduce_max(z, axis=axis, keepdims=True)
        ez = tf.exp(z - zmax)
        return ez / tf.reduce_sum(ez, axis=axis, keepdims=True)

    def _loglik_point(self, F, Y):
        P = tf.clip_by_value(self._softmax(F), self.eps, 1.0)
        return self.tau * tf.reduce_sum(Y * tf.math.log(P), axis=-1)  # (...,)

    def _mc_sample_f(self, Fmu, Fvar, num_mc: int):
        # Build epsilon from Fvar to guarantee broadcast compatibility
        eps_shape = tf.concat([[num_mc], tf.shape(Fvar)], axis=0)  # (S, N, D)
        eps = tf.random.normal(shape=eps_shape, dtype=Fvar.dtype)
        Fstd = tf.sqrt(tf.maximum(Fvar, 0.0))
        return Fmu[None, ...] + Fstd[None, ...] * eps  # (S, N, D)
    def _ensure_nd_and_cast(self, X, Fmu, Fvar, Y):
        # Cast dtypes
        Fmu = tf.cast(Fmu, gpflow.default_float())
        Fvar = tf.cast(Fvar, gpflow.default_float())
        Y   = tf.cast(Y,   gpflow.default_float())

        # Ranks must be 2: (N, D)
        tf.debugging.assert_rank(Fmu, 2, message="Fmu must be rank-2 [N,D]")
        tf.debugging.assert_rank(Fvar, 2, message="Fvar must be rank-2 [N,D]")
        tf.debugging.assert_rank(Y,   2, message="Y must be rank-2 [N,D]")

        # Match N and D
        n_fmu = tf.shape(Fmu)[0]; d_fmu = tf.shape(Fmu)[1]
        n_fvar= tf.shape(Fvar)[0]; d_fvar= tf.shape(Fvar)[1]
        n_y   = tf.shape(Y)[0];    d_y   = tf.shape(Y)[1]

        tf.debugging.assert_equal(n_fmu, n_fvar, message="N mismatch: Fmu vs Fvar")
        tf.debugging.assert_equal(d_fmu, d_fvar, message="D mismatch: Fmu vs Fvar")
        tf.debugging.assert_equal(n_fmu, n_y,    message="N mismatch: Fmu/Fvar vs Y")
        tf.debugging.assert_equal(d_fmu, d_y,    message="D mismatch: Fmu/Fvar vs Y")

        # One-time shape print (remove after debugging)
        tf.print("[LIK] shapes -> Fmu", tf.shape(Fmu), "Fvar", tf.shape(Fvar), "Y", tf.shape(Y), summarize=-1)
        return Fmu, Fvar, Y

    # -------- required by GPflow Likelihood (note the X arg) --------
    def _log_prob(self, X, F, Y):
        # X is unused, but required by the signature
        return self._loglik_point(F, Y)

    def _variational_expectations(self, X, Fmu, Fvar, Y):
        F_sample = self._mc_sample_f(Fmu, Fvar, num_mc=8)
        logp = self._loglik_point(F_sample, Y[None, ...])  # (S, N)
        return tf.reduce_mean(logp, axis=0)

    def _predict_mean_and_var(self, X, Fmu, Fvar):
        F_sample = self._mc_sample_f(Fmu, Fvar, num_mc=64)
        P = self._softmax(F_sample, axis=-1)              # (S, N, D)
        mean = tf.reduce_mean(P, axis=0)                  # (N, D)
        var  = tf.math.reduce_variance(P, axis=0)         # (N, D)
        mean = tf.clip_by_value(mean, self.eps, 1.0)
        mean = mean / tf.reduce_sum(mean, axis=-1, keepdims=True)
        return mean, var

    def _predict_log_density(self, X, Fmu, Fvar, Y):
        F_sample = self._mc_sample_f(Fmu, Fvar, num_mc=32)
        logp = self._loglik_point(F_sample, Y[None, ...])  # (S, N)
        return tf.reduce_mean(logp, axis=0)

    def predictive_mean_from_moments(self, Fmu, Fvar, mc: int = 64):
        F_sample = self._mc_sample_f(Fmu, Fvar, num_mc=mc)
        P = self._softmax(F_sample, axis=-1)
        mean = tf.reduce_mean(P, axis=0)
        mean = tf.clip_by_value(mean, self.eps, 1.0)
        mean = mean / tf.reduce_sum(mean, axis=-1, keepdims=True)
        return mean


# -----------------------------
# Build & train the SVGP
# -----------------------------
@dataclass
class ModelConfig:
    num_inducing: int = 20
    ard: bool = True
    tau: float = 100.0
    mc_pred: int = 64
    max_iters: int = 5000
    lr: float = 0.01
    seed: int = 0

def build_svgp_softmax(X: np.ndarray, Y: np.ndarray, cfg: ModelConfig):
    tf.random.set_seed(cfg.seed)
    N, P = X.shape
    D = Y.shape[1]

    M = min(cfg.num_inducing, N)
    perm = np.random.RandomState(cfg.seed).permutation(N)[:M]
    Z = X[perm, :].copy()

    base_kern = RBF(lengthscales=np.ones(P), variance=1.0)  # ARD via vector lengthscales
    kern = SharedIndependent(base_kern, output_dim=D)

    lik = SoftmaxCompositional(D=D, tau=cfg.tau)  # <-- pass D here

    inducing = SharedIndependentInducingVariables(InducingPoints(Z.astype(np.float64)))
    model = gpflow.models.SVGP(
        kernel=kern,
        likelihood=lik,
        inducing_variable=inducing,
        num_latent_gps=D,
        q_diag=True,
        whiten=True,
    )

    # small init for q_var improves stability
    # Zero mean
    model.q_mu.assign(tf.zeros_like(model.q_mu))

    # q_sqrt shape = (L, M) when q_diag=True
    model.q_sqrt.assign(1e-3 * tf.ones_like(model.q_sqrt))

    return model, lik, Z



def train_svgp(
    model,
    X,
    Y,
    cfg: ModelConfig,
    patience: int = 50,       # how many iterations to wait for improvement
    min_delta: float = 1e-3,  # minimal ELBO gain to count as improvement
    verbose: bool = True
):
    """
    Train SVGP with Adam, monitoring ELBO and stopping early if it stalls.

    Args:
        model: gpflow SVGP
        X, Y: training arrays
        cfg: ModelConfig
        patience: stop if no ELBO improvement for this many steps
        min_delta: threshold for "improvement"
        verbose: whether to print progress
    """
    Xtf = tf.convert_to_tensor(X, dtype=tf.float64)
    Ytf = tf.convert_to_tensor(Y, dtype=tf.float64)

    opt = tf.optimizers.Adam(learning_rate=cfg.lr)

    @tf.function(autograph=False)
    def step():
        with tf.GradientTape() as tape:
            elbo = model.elbo((Xtf, Ytf))
            loss = -elbo
        grads = tape.gradient(loss, model.trainable_variables)
        opt.apply_gradients(zip(grads, model.trainable_variables))
        return elbo

    best_elbo = -np.inf
    wait = 0

    for it in range(cfg.max_iters):
        elbo_val = step().numpy()

        # Check improvement
        if elbo_val > best_elbo + min_delta:
            best_elbo = elbo_val
            wait = 0
        else:
            wait += 1

        # Logging
        if verbose and (it + 1) % 50 == 0:
            print(f"[train] iter {it+1:5d}, ELBO={elbo_val:.4f}, best={best_elbo:.4f}, wait={wait}")

        # Early stop
        if wait >= patience:
            if verbose:
                print(f"[train] Early stopping at iter {it+1}, best ELBO={best_elbo:.4f}")
            break

    return best_elbo

# -----------------------------
# Predictions
# -----------------------------
def predict_composition(model, Xnew: np.ndarray, mc_samples: int = 64) -> np.ndarray:
    """
    Returns predictive mean composition (N*, D) by MC over q(f).
    """
    Xtf = tf.convert_to_tensor(Xnew, dtype=tf.float64)
    Fmu, Fvar = model.predict_f(Xtf, full_cov=False, full_output_cov=False)
    lik: SoftmaxCompositional = model.likelihood  # type: ignore
    Pmean = lik.predictive_mean_from_moments(Fmu, Fvar, mc=mc_samples).numpy()
    # Ensure valid simplex numerically
    Pmean = to_simplex(Pmean)
    return Pmean

# -----------------------------
# End-to-end pipeline
# -----------------------------
def run_pipeline(
    sensory_csv: str,
    ingredients_csv: str,
    id_col: str = "sample_id",
    min_presence: int = 1,
    add_other: bool = True,
    num_inducing: int = 20,
    tau: float = 100.0,
    max_iters: int = 5000,
    do_loocv: bool = True,
    seed: int = 0
) -> Dict:
    spec = DataSpec(
        sensory_csv=sensory_csv,
        ingredients_csv=ingredients_csv,
        id_col=id_col,
        add_other=add_other,
        min_presence=min_presence,
        scale_X=True
    )

    # Load
    Xdf, Ydf = load_and_align(spec)
    meds = Xdf.iloc[:,1:].median(axis=0)
    Xdf = Xdf.fillna(meds)
    Ydf = Ydf.fillna(0)
    X_cols = [c for c in Xdf.columns if c != id_col]
    X_raw = Xdf[X_cols].to_numpy(dtype=float)
    Y_mat, ing_names = build_composition(Ydf, spec)  # (N, D)

    # Standardize X
    xscaler = StandardScaler().fit(X_raw)
    X = xscaler.transform(X_raw)

    # Build + train model
    cfg = ModelConfig(
        num_inducing=num_inducing, tau=tau, max_iters=max_iters, seed=seed, lr=0.01, ard=True
    )
    model, lik, Z = build_svgp_softmax(X, Y_mat, cfg)

    train_svgp(model, X, Y_mat, cfg)

    artifacts = {
        "xscaler": xscaler,
        "x_imputer": meds,          # <-- store the training medians
        "model": model,
        "ingredient_names": ing_names,
        "X_cols": X_cols,
        "id_col": id_col,
        "config": cfg.__dict__,
    }

    n = X.shape[0]
    run_cv = bool(do_loocv)
    n_splits = None
    scheme = None

    if do_loocv is True:
        n_splits = n
        scheme = "LOOCV"
    elif isinstance(do_loocv, int) and do_loocv >= 2:
        n_splits = min(do_loocv, n)  # cap at N
        scheme = f"{n_splits}-Fold CV"

    if run_cv and n_splits is not None and n_splits >= 2:
        # We will CV on *raw* X (before full-data scaling) to avoid leakage.
        # Recreate raw X and fixed Y from the same columns used to train the full model.
        X_raw_full = Xdf[X_cols].to_numpy(dtype=float)  # raw (post-imputation only for final model)
        Y_full = Y_mat                                  # (N, D) fixed composition columns

        kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
        ad_all, l2_all = [], []

        for fold_idx, (tr_idx, te_idx) in enumerate(kf.split(X_raw_full), 1):
            # --- Train split: imputer + scaler fit only on TRAIN ---
            Xtr_raw = X_raw_full[tr_idx]
            Xte_raw = X_raw_full[te_idx]

            # Impute with training medians (column-wise)
            meds_tr = np.nanmedian(Xtr_raw, axis=0)
            # If a column is entirely NaN, replace nanmedian with 0
            meds_tr = np.where(np.isfinite(meds_tr), meds_tr, 0.0)
            Xtr_imp = np.where(np.isnan(Xtr_raw), meds_tr[None, :], Xtr_raw)
            Xte_imp = np.where(np.isnan(Xte_raw), meds_tr[None, :], Xte_raw)

            # Scale with training scaler
            scaler_tr = StandardScaler().fit(Xtr_imp)
            # Guard zero-variance columns
            if hasattr(scaler_tr, "scale_"):
                zero_scale = (scaler_tr.scale_ == 0)
                if np.any(zero_scale):
                    scaler_tr.scale_[zero_scale] = 1.0
                    if hasattr(scaler_tr, "var_"):
                        scaler_tr.var_[zero_scale] = 1.0

            Xtr = scaler_tr.transform(Xtr_imp)
            Xte = scaler_tr.transform(Xte_imp)

            # Targets
            Ytr, Yte = Y_full[tr_idx], Y_full[te_idx]

            # Build & train fold model
            cfg_cv = ModelConfig(
                num_inducing=min(cfg.num_inducing, Xtr.shape[0]),
                tau=cfg.tau,
                max_iters=max(1, int(max_iters / 2)),   # faster per fold
                seed=seed,
                lr=cfg.lr,
                ard=True
            )
            m_cv, _, _ = build_svgp_softmax(Xtr, Ytr, cfg_cv)
            train_svgp(m_cv, Xtr, Ytr, cfg_cv)

            # Predict on test split
            Yhat = predict_composition(m_cv, Xte, mc_samples=cfg.mc_pred)

            # Metrics
            ad = aitchison_distance(Yte, Yhat)
            l2 = np.sqrt(np.sum((Yte - Yhat) ** 2, axis=1))
            ad_all.extend(list(ad))
            l2_all.extend(list(l2))

        artifacts["cv"] = {
            "scheme": scheme,
            "folds": int(n_splits),
            "aitchison_mean": float(np.mean(ad_all)),
            "aitchison_std":  float(np.std(ad_all)),
            "l2_mean":        float(np.mean(l2_all)),
            "l2_std":         float(np.std(l2_all)),
            "n_samples":      int(n),
        }

    return artifacts

def predict_from_artifacts(artifacts: Dict, X_new: pd.DataFrame) -> pd.DataFrame:
    X_cols = artifacts["X_cols"]
    xscaler = artifacts["xscaler"]
    x_imputer = artifacts["x_imputer"]      # <-- use the stored medians
    model = artifacts["model"]
    ing_names = artifacts["ingredient_names"]

    # Make a copy, ensure order, impute
    Xarr_df = X_new[X_cols].astype(float).copy()
    Xarr_df = Xarr_df.fillna(x_imputer)

    # (Optional) if any constant columns at train time, skip second check here
    Xs = xscaler.transform(Xarr_df.values)

    # Simple finite check (see Patch C)
    assert np.isfinite(Xs).all(), "Xs has non-finite values after impute+scale."

    Yhat = predict_composition(model, Xs, mc_samples=artifacts["config"].get("mc_pred", 64))
    Yhat_pct = 100 * Yhat
    return pd.DataFrame(Yhat_pct, columns=ing_names, index=X_new.index)

In [60]:
# -----------------------------
# Example usage
# -----------------------------

SENSORY_CSV = "../../data/recipes/data_sens.csv"      # columns: sample_id, s1..s8
ING_CSV     = "../../data/recipes/data_recipe.csv"  # columns: sample_id, many ingredients (0..100 or 0..1)

artifacts = run_pipeline(
    sensory_csv=SENSORY_CSV,
    ingredients_csv=ING_CSV,
    id_col="idx",
    min_presence=1,     # keep all ingredients as separate cols (rare ones flow into 'Other')
    add_other=True,
    num_inducing=20,
    tau=150.0,          # try 50..300; larger -> sharper fits to observed compositions
    max_iters=10,
    do_loocv=True,
    seed=0
)

if "cv" in artifacts:
    print("LOOCV Aitchison mean±std:",
            artifacts["cv"]["aitchison_mean"], "±", artifacts["cv"]["aitchison_std"])
    print("LOOCV L2 mean±std:",
            artifacts["cv"]["l2_mean"], "±", artifacts["cv"]["l2_std"])

# Demo predictions on the same X (replace with your new sensory rows)
Xdf, _ = load_and_align(DataSpec(SENSORY_CSV, ING_CSV))
preds = predict_from_artifacts(artifacts, Xdf.drop(columns=["idx"]))
print(preds.head())


UnboundLocalError: cannot access local variable 'StandardScaler' where it is not associated with a value

## debugging

In [16]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

def load_and_align_simple(
    sensory_csv: str,
    ingredients_csv: str,
    id_col: str = "sample_id",
    add_other: bool = True,
    min_presence: int = 1,   # keep columns that appear in >= min_presence samples
):
    """
    Returns:
      X_df: imputed+z-scored sensory DataFrame [id_col + features]
      Y_df: row-normalized composition DataFrame [id_col + ingredients_kept (+ Other)]
      x_imputer: pd.Series of medians (for reuse at predict time)
      x_scaler: fitted StandardScaler (for reuse at predict time)
    """
    # --- Load & join ---
    Xraw = pd.read_csv(sensory_csv)
    Yraw = pd.read_csv(ingredients_csv)
    df = Xraw.merge(Yraw, on=id_col, how="inner")

    # --- Identify cols ---
    sensory_cols = [c for c in Xraw.columns if c != id_col]
    ing_cols     = [c for c in Yraw.columns if c != id_col]

    # --- X: impute (median) + z-score ---
    X_sens = df[sensory_cols].astype(float)
    x_imputer = X_sens.median(axis=0)                # robust to outliers / NaNs
    X_imputed = X_sens.fillna(x_imputer)

    x_scaler = StandardScaler()
    X_scaled = x_scaler.fit_transform(X_imputed.values)

    X_df = pd.DataFrame(X_scaled, columns=sensory_cols, index=df.index)
    X_df.insert(0, id_col, df[id_col].values)

    # --- Y: fillna->0, (optional) frequency filter, add 'Other', row-normalize to 1 ---
    Y_mat = df[ing_cols].astype(float).fillna(0.0).to_numpy()

    # frequency filter (keep ingredients appearing in >= min_presence samples)
    present_counts = (Y_mat > 0).sum(axis=0)
    keep_mask = present_counts >= int(min_presence)
    kept_cols = [c for c, keep in zip(ing_cols, keep_mask) if keep]
    Y_keep = Y_mat[:, keep_mask]

    if add_other:
        leftover = 1.0 - Y_keep.sum(axis=1, keepdims=True)
        other = np.clip(leftover, 0.0, None)  # anything not explained by kept ingredients
        Y_aug = np.concatenate([Y_keep, other], axis=1)
        out_cols = kept_cols + ["Other"]
    else:
        Y_aug = Y_keep
        out_cols = kept_cols

    # row normalize to the simplex
    row_sums = Y_aug.sum(axis=1, keepdims=True)
    row_sums[row_sums == 0.0] = 1.0
    Y_comp = Y_aug / row_sums

    Y_df = pd.DataFrame(Y_comp, columns=out_cols, index=df.index)
    Y_df.insert(0, id_col, df[id_col].values)

    # --- Sanity prints (once) ---
    N, P = X_df.shape[0], len(sensory_cols)
    D = len(out_cols)
    print(f"[load] N={N}, P={P} sensory; D={D} ingredients (incl. Other={add_other})")
    print("  • X: any NaN? ->", pd.isna(X_df[sensory_cols]).any().any())
    print("  • Y: any NaN? ->", pd.isna(Y_df[out_cols]).any().any())
    print("  • Y rows (first 3) sums:", Y_df[out_cols].sum(axis=1).head(3).round(6).to_list())

    return X_df, Y_df, x_imputer, x_scaler


In [22]:
SENSORY_CSV = "../../data/recipes/data_sens.csv"      # has: idx, s1..s8
ING_CSV     = "../../data/recipes/data_recipe.csv"    # has: idx, many ingredients

X_df, Y_df, x_imputer, x_scaler = load_and_align_simple(
    sensory_csv=SENSORY_CSV,
    ingredients_csv=ING_CSV,
    id_col="idx",
    add_other=False,
    min_presence=1,   # keep everything; rare stuff goes to Other implicitly
)

# Check a few shapes
id_col = "idx"
sens_cols = [c for c in X_df.columns if c != id_col]
ing_cols  = [c for c in Y_df.columns if c != id_col]
print("X_df shape:", X_df.shape)
print("Y_df shape:", Y_df.shape)
assert np.allclose(Y_df[ing_cols].sum(axis=1).values, 1.0, atol=1e-8)


[load] N=93, P=8 sensory; D=75 ingredients (incl. Other=False)
  • X: any NaN? -> False
  • Y: any NaN? -> False
  • Y rows (first 3) sums: [1.0, 1.0, 1.0]
X_df shape: (93, 9)
Y_df shape: (93, 76)


In [24]:
import numpy as np

def helmert_basis(d: int) -> np.ndarray:
    """
    Orthonormal Helmert sub-matrix (d x (d-1)).
    """
    H = np.zeros((d, d-1))
    for j in range(1, d):
        e = np.ones(j) / j
        v = np.concatenate([e, [-1.0], np.zeros(d - j - 1)])
        H[:, j-1] = v
    # scale columns to get orthonormal basis
    for j in range(d - 1):
        H[:, j] *= np.sqrt((j + 1) / (j + 2))
    return H

def to_simplex(A: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    A = np.clip(A, eps, None)
    return A / A.sum(axis=1, keepdims=True)

def clr(X: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    X = np.clip(X, eps, None)
    g = np.exp(np.mean(np.log(X), axis=1, keepdims=True))
    return np.log(X / g)

def clr_inv(C: np.ndarray) -> np.ndarray:
    Y = np.exp(C)
    return Y / Y.sum(axis=1, keepdims=True)

def ilr(X: np.ndarray, H: np.ndarray) -> np.ndarray:
    return clr(X) @ H

def ilr_inv(Z: np.ndarray, H: np.ndarray) -> np.ndarray:
    return clr_inv(Z @ H.T)

# ---- Round-trip test with your Y ----
id_col = "idx"
ing_cols = [c for c in Y_df.columns if c != id_col]
Y = Y_df[ing_cols].to_numpy(dtype=float)
D = Y.shape[1]
H = helmert_basis(D)

Z = ilr(Y, H)
Y_back = ilr_inv(Z, H)

print("ILR Z shape:", Z.shape)     # (N, D-1)
print("Round-trip max abs err:", np.max(np.abs(Y - Y_back)))
print("Round-trip row sums (first 3):", Y_back.sum(axis=1)[:3])


ILR Z shape: (93, 74)
Round-trip max abs err: 7.399902912652578e-11
Round-trip row sums (first 3): [1. 1. 1.]
