In [None]:
import sys, os; sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__) if '__file__' in globals() else os.getcwd(), '..')))
#import os; os.chdir(os.path.dirname(os.getcwd()))
from utils.model_loader import get_model_fits
import numpy as np
import pandas as pd
import re
from sklearn.metrics import mean_squared_error
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# Locations of the simulated datasets and of the stored tanh-network fits.
data_dir = "datasets/mutual_information"
results_dir_tanh = "results/mutual_information"
model_names_tanh = [
    "Gaussian tanh",
    "Regularized Horseshoe tanh",
    "Dirichlet Horseshoe tanh",
    "Dirichlet Student T tanh",
]


def _load_tanh_fits(config_name):
    """Load the fits of every model in ``model_names_tanh`` for one config name."""
    return get_model_fits(
        config=config_name,
        results_dir=results_dir_tanh,
        models=model_names_tanh,
        include_prior=False,
    )


# One collection of fits per noise level (the sigma suffix of the config name).
tanh_fit_sigma_00 = _load_tanh_fits("mutual_information_N80_p6_sigma_0.0")
tanh_fit_sigma_05 = _load_tanh_fits("mutual_information_N80_p6_sigma_0.5")
tanh_fit_sigma_1 = _load_tanh_fits("mutual_information_N80_p6_sigma_1.0")
tanh_fit_sigma_2 = _load_tanh_fits("mutual_information_N80_p6_sigma_2.0")

In [3]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
from scipy.special import digamma
import matplotlib.pyplot as plt

# ---------------------------
# 1) Data generator with σ
# ---------------------------
def generate_correlated_data_sigma(
    n, p=6, random_state=42, test_size=0.2,
    rho_strong=0.92, rho_weak=0.5, sigma=0.5,
    standardize_X=True, standardize_y=False
):
    """
    Simulate a nonlinear regression problem Y = g(X) + N(0, sigma^2) and split it.

    The six features come in two correlated blocks: X1..X4 share the latent
    factor h1 (loading ``rho_strong``) and X5..X6 share h2 (loading ``rho_weak``).

    Parameters
    ----------
    n : int
        Total number of samples generated (before the train/test split).
    p : int
        Number of features; must be 6.
    random_state : int
        Seed for both the data generation and the train/test split.
    test_size : float
        Passed to ``train_test_split``.
    rho_strong, rho_weak : float
        Loadings of the strong / weak feature block on its latent factor.
    sigma : float
        Standard deviation of the additive Gaussian noise on y.
    standardize_X : bool
        If True, z-score every column of X (statistics over all n samples).
    standardize_y : bool
        If True, z-score y (statistics over all n samples). Keep False when
        the mutual information should vary with ``sigma``.

    Returns
    -------
    The output of ``train_test_split(X, y, ...)``, unpacked by callers as
    ``X_train, X_test, y_train, y_test``.

    Raises
    ------
    ValueError
        If ``p`` is not 6.
    """
    if p != 6:
        raise ValueError("This generator is designed for p=6.")
    rng = np.random.default_rng(random_state)

    # latent factors -> correlated features
    # (the order of the rng draws below is kept fixed so seeds stay reproducible)
    h1 = rng.normal(size=n)
    h2 = rng.normal(size=n)

    def make_block(h, m, rho):
        """m features loading on latent factor h with weight rho, unit variance."""
        eps = rng.normal(size=(n, m))
        return rho * h[:, None] + np.sqrt(max(1e-12, 1 - rho**2)) * eps

    X_strong = make_block(h1, 4, rho_strong)  # X1..X4
    X_weak   = make_block(h2, 2, rho_weak)    # X5..X6
    X = np.concatenate([X_strong, X_weak], axis=1)

    if standardize_X:
        X = (X - X.mean(axis=0)) / (X.std(axis=0) + 1e-12)

    X1, X2, X3, X4, X5, X6 = [X[:, j] for j in range(6)]
    strong_lin = 0.6*X1 + 0.4*X2 - 0.2*X3 + 0.1*X4
    strong_int = 1.2*(X1*X2) + 0.8*(X3*X4) - 0.6*(X1*X4)
    weak_part  = 0.15*(0.7*X5 + 0.3*X6) + 0.12*(X5*X6)
    g = np.tanh(strong_lin + strong_int) + 0.3*np.sin(0.5*X2 - 0.25*X3) + weak_part

    y = g + rng.normal(scale=sigma, size=n)  # ADD noise with std = sigma

    # Honor the flag instead of silently ignoring it; the default (False)
    # leaves y on its natural scale so that MI depends on sigma.
    if standardize_y:
        y = (y - y.mean()) / (y.std() + 1e-12)

    return train_test_split(X, y, test_size=test_size, random_state=random_state)

# ------------------------------------
# 2) KSG (k-NN) MI estimator for X,Y
# ------------------------------------
def ksmi_xy(X, y, k=5):
    """
    Kraskov–Stögbauer–Grassberger MI estimator (type I).

    Parameters
    ----------
    X : array-like, shape (n, d) or (n,)
        Feature samples; a 1-D input is treated as a single feature.
    y : array-like, shape (n,)
        Scalar response samples.
    k : int
        Number of nearest neighbours in the joint space (1 <= k < n).

    Returns
    -------
    float
        Estimated mutual information I(X; y) in nats.

    Raises
    ------
    ValueError
        If X and y have different lengths or k is outside [1, n - 1].
    """
    # Local import keeps this cell self-contained (scipy is already a dependency).
    from scipy.spatial import cKDTree

    X = np.asarray(X, float)
    if X.ndim == 1:
        X = X.reshape(-1, 1)
    y = np.asarray(y, float).reshape(-1, 1)
    n = X.shape[0]
    if y.shape[0] != n:
        raise ValueError(f"X has {n} samples but y has {y.shape[0]}.")
    if not 1 <= k < n:
        raise ValueError(f"k must satisfy 1 <= k < n (got k={k}, n={n}).")

    # joint space with Chebyshev (L∞) norm
    Z = np.hstack([X, y])
    dists, _ = cKDTree(Z).query(Z, k=k + 1, p=np.inf)  # column 0 is the point itself
    # Shrink slightly to emulate the strict "<" of KSG-I; clamp at 0 so that
    # duplicated points (k-th distance 0) never produce a negative radius.
    eps = np.maximum(dists[:, k] - 1e-12, 0.0)

    # Number of neighbours (not the sum of their distances) inside the
    # per-point Chebyshev ball in each marginal space, excluding the point itself.
    cnt_x = cKDTree(X).query_ball_point(X, r=eps, p=np.inf, return_length=True)
    cnt_y = cKDTree(y).query_ball_point(y, r=eps, p=np.inf, return_length=True)
    nx = np.maximum(np.asarray(cnt_x) - 1, 0)
    ny = np.maximum(np.asarray(cnt_y) - 1, 0)

    mi = digamma(k) + digamma(n) - np.mean(digamma(nx + 1) + digamma(ny + 1))
    return float(mi)

# ------------------------------------------
# 3) Sweep over σ and compute KSG MI(X;Y)
# ------------------------------------------
def sweep_sigma_ksg(sigmas, n=1500, p=6, k=5, seed=7):
    """
    Estimate I(X;Y) with the KSG estimator for every noise level in ``sigmas``.

    For each sigma a dataset is generated with the same seed and the MI is
    estimated on its training split.

    Returns a DataFrame with columns: sigma, mi.
    """
    def _mi_for(noise_sd):
        # Only the training split is used for the estimate.
        X_fit, _, y_fit, _ = generate_correlated_data_sigma(
            n=n, p=p, random_state=seed, sigma=noise_sd,
            standardize_X=True, standardize_y=False
        )
        return ksmi_xy(X_fit, y_fit, k=k)

    records = []
    for noise_sd in sigmas:
        estimate = _mi_for(noise_sd)
        records.append({"sigma": float(noise_sd), "mi": estimate})
    return pd.DataFrame(records)

# -----------------------
# 4) Example + plotting
# -----------------------

In [None]:
# Sweep the noise level upward and estimate I(X;Y) at each value.
sigmas = np.linspace(0.0, 2.0, 15)
df = sweep_sigma_ksg(sigmas, n=2000, k=5, seed=11)
print(df)

# Plot the estimated MI (y-axis) against the noise level σ (x-axis).
# The axis labels must match the plotted columns: x = sigma, y = mi.
fig, ax = plt.subplots(figsize=(7, 6))
ax.plot(df["sigma"], df["mi"])
ax.set_xlabel("σ")
ax.set_ylabel("Estimated I(X;Y)")
ax.set_title("As σ increases, I(X;Y) decreases (KSG)")
plt.show()

In [4]:
import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors
from sklearn.neighbors import KDTree
from scipy.special import digamma

# ---------------------------
# KSG MI estimator (multivariate X, multivariate Y)
# ---------------------------

def ksmi_xy_multi(X, Y, k=5, jitter=0.0, random_state=0):
    """
    KSG MI estimator (type I) for multivariate X and Y.
    Uses KDTree with Minkowski p=∞ (Chebyshev).

    Parameters
    ----------
    X : array-like, shape (n, dx) or (n,)
    Y : array-like, shape (n, dy) or (n,)
        1-D inputs are treated as a single column.
    k : int
        Number of nearest neighbours in the joint space.
    jitter : float
        If > 0, Gaussian noise with this standard deviation is added to X and Y
        before estimation (breaks exact ties).
    random_state : int
        Seed for the jitter noise; unused when ``jitter`` is 0.

    Returns
    -------
    float
        Estimated MI in nats.
    """
    X = np.asarray(X, float)
    Y = np.asarray(Y, float)
    if X.ndim == 1:
        X = X.reshape(-1, 1)
    if Y.ndim == 1:
        Y = Y.reshape(-1, 1)
    assert X.shape[0] == Y.shape[0], "X and Y must have the same number of rows"
    n = X.shape[0]

    if jitter and jitter > 0:
        # The generator is only needed (and only created) when jitter is applied.
        rng = np.random.default_rng(random_state)
        X = X + jitter * rng.normal(size=X.shape)
        Y = Y + jitter * rng.normal(size=Y.shape)

    # Joint space distances to k-th neighbor
    Z = np.hstack([X, Y])
    tree_Z = KDTree(Z, metric='minkowski', p=np.inf)
    dists, _ = tree_Z.query(Z, k=k+1)         # includes self at position 0
    # Per-sample radius, shrunk slightly to emulate a strict "<"; clamped at 0
    # so duplicated points (k-th distance 0) cannot yield a negative radius.
    eps = np.maximum(dists[:, k] - 1e-12, 0.0)

    # Counts in marginals within same eps (Chebyshev balls)
    tree_X = KDTree(X, metric='minkowski', p=np.inf)
    tree_Y = KDTree(Y, metric='minkowski', p=np.inf)

    # query_radius supports a vector of radii; subtract 1 to drop the point
    # itself and clamp so digamma never sees a non-positive argument.
    nx = np.maximum(tree_X.query_radius(X, r=eps, count_only=True) - 1, 0)
    ny = np.maximum(tree_Y.query_radius(Y, r=eps, count_only=True) - 1, 0)

    # KSG type-I estimate
    mi = digamma(k) + digamma(n) - np.mean(digamma(nx + 1) + digamma(ny + 1))
    return float(mi)

# ---------------------------
# Helpers
# ---------------------------
def _choose_sample_indices(S, K):
    K = min(K, S)
    return np.linspace(0, S-1, K, dtype=int)

def _standardize_cols(A):
    A = np.asarray(A, float)
    return (A - A.mean(axis=0)) / (A.std(axis=0) + 1e-12)

# ---------------------------
# Compute T = tanh(X W1 + b1) for selected posterior draws
# ---------------------------
def hidden_T_from_posterior(posterior, X, K=50, sample_indices=None, standardize_X=True, standardize_T=True, seed=0):
    """
    Compute the hidden representation T = tanh(X W_1 + b_1) for selected draws.

    Parameters
    ----------
    posterior : object exposing ``stan_variable(name)``
        Must provide "W_1" with shape (S, p, H) and "hidden_bias" with shape
        (S, 1, H) or (S, H).
    X : array-like, shape (n, p)
        Design matrix.
    K : int
        Number of evenly spaced draws used when ``sample_indices`` is None.
    sample_indices : sequence of int, optional
        Explicit draw indices; overrides ``K``.
    standardize_X : bool
        Z-score the columns of X before applying the first layer.
    standardize_T : bool
        Z-score the columns of each T.
    seed : int
        Unused: draw selection is deterministic. Kept so existing callers
        that pass it keep working.

    Returns
    -------
    (T_list, indices)
      - T_list: list of arrays [ (n, H) per draw ]
      - indices: the draw indices used, as an int array

    Raises
    ------
    ValueError
        If the number of columns of X differs from p.
    """
    X = np.asarray(X, float)
    X_use = _standardize_cols(X) if standardize_X else X

    W1_all = np.asarray(posterior.stan_variable("W_1"))           # (S, p, H)
    b1_all = np.asarray(posterior.stan_variable("hidden_bias"))   # (S, 1, H) or (S, H)
    S, p, H = W1_all.shape
    assert b1_all.shape in ((S, 1, H), (S, H)), (
        f"hidden_bias has shape {b1_all.shape}, expected ({S}, 1, {H}) or ({S}, {H})"
    )
    if X_use.ndim != 2 or X_use.shape[1] != p:
        raise ValueError(
            f"X must have shape (n, {p}) to match W_1, got {X_use.shape}."
        )

    if sample_indices is None:
        # evenly spaced picks across S (stable & reproducible, no randomness)
        sample_indices = _choose_sample_indices(S, K)

    T_list = []
    for s in sample_indices:
        W1 = W1_all[s]                # (p, H)
        b1 = b1_all[s].reshape(1, H)  # (1, H), whichever bias layout was given
        T = np.tanh(X_use @ W1 + b1)  # (n, H)
        if standardize_T:
            T = _standardize_cols(T)
        T_list.append(T)

    return T_list, np.asarray(sample_indices, int)

# ---------------------------
# Main wrapper: I(X;T) across draws
# ---------------------------
def estimate_ix_t_ksg(
    tanh_fit, model_name, X,
    K=50, sample_indices=None, k=5,
    standardize_X=True, standardize_T=True, seed=0,
    jitter=1e-8  # small jitter helps when mappings are near-deterministic
):
    """
    KSG estimate of I(X;T) for a set of posterior draws of one model.

    ``tanh_fit[model_name]["posterior"]`` supplies the draws; the hidden
    representation T of each selected draw is compared against X.

    Returns a DataFrame with one row per draw and columns: model, draw, IXT.
    """
    draws = tanh_fit[model_name]["posterior"]

    # Hidden representations for the selected draws.
    hidden_reps, draw_ids = hidden_T_from_posterior(
        draws, X,
        K=K, sample_indices=sample_indices,
        standardize_X=standardize_X, standardize_T=standardize_T, seed=seed
    )

    # The X side of the MI uses the same scaling choice as the hidden layer input.
    if standardize_X:
        X_ref = _standardize_cols(X)
    else:
        X_ref = np.asarray(X, float)

    records = [
        {
            "model": model_name,
            "draw": int(draw_id),
            "IXT": float(ksmi_xy_multi(X_ref, T_draw, k=k, jitter=jitter, random_state=seed)),
        }
        for draw_id, T_draw in zip(draw_ids, hidden_reps)
    ]
    return pd.DataFrame(records)

def rmse_from_output_test(posterior, y_test, sample_indices=None):
    """
    Per-draw test RMSE from the posterior predictions stored in "output_test".

    Parameters
    ----------
    posterior : object exposing ``stan_variable(name)``
        "output_test" must have shape (S, n_test, 1) or (S, n_test).
    y_test : array-like, shape (n_test,) or (n_test, 1)
    sample_indices : array-like of int, optional
        Draws to evaluate; all S draws when None.

    Returns
    -------
    rmse : (num_draws,) array of RMSE values, one per selected draw.
    sample_indices : indices used (in case of subsetting)

    Raises
    ------
    ValueError
        If the predictions are not 2-D/3-D or their n_test differs from y_test.
    """
    y_test = np.asarray(y_test).reshape(-1, 1)  # ensure column
    y_pred = np.asarray(posterior.stan_variable("output_test"))
    if y_pred.ndim == 2:
        # (S, n_test) -> (S, n_test, 1) so the subtraction below lines up
        y_pred = y_pred[:, :, None]
    if y_pred.ndim != 3:
        raise ValueError(
            f"output_test must be 2-D or 3-D, got shape {y_pred.shape}."
        )
    if y_pred.shape[1] != y_test.shape[0]:
        # Without this check a length-1 y_test would broadcast silently.
        raise ValueError(
            f"output_test has {y_pred.shape[1]} test points but y_test has {y_test.shape[0]}."
        )
    S = y_pred.shape[0]

    # choose which draws to use
    if sample_indices is None:
        sample_indices = np.arange(S)
    else:
        sample_indices = np.asarray(sample_indices)

    # slice predictions for chosen draws
    y_pred_sel = y_pred[sample_indices]         # (K, n_test, 1)

    # compute RMSE draw-wise
    diff = y_pred_sel - y_test[None, :, :]      # broadcast over draws
    rmse = np.sqrt(np.mean(diff**2, axis=(1, 2)))

    return rmse, sample_indices



In [5]:
def build_df_for_sigma(tanh_fit_sigma, sigma_value, K=100, k=5, seed=123, n=100, random_state=42):
    """
    Combine I(X;T) estimates and test RMSE per posterior draw for every model.

    Parameters
    ----------
    tanh_fit_sigma : dict
        Like {"Gaussian tanh": {...}, "Regularized Horseshoe tanh": {...}};
        each value must hold a "posterior" entry.
    sigma_value : float
        Noise level used when generating data.
    K : int
        Number of posterior draws evaluated per model.
    k : int
        Number of neighbours for the KSG estimator.
    seed : int
        Seed forwarded to ``estimate_ix_t_ksg``.
    n : int
        Total number of samples to regenerate (train + test).
    random_state : int
        Seed of the data generator / split.
        NOTE(review): n and random_state presumably have to match the values
        used when the models were fitted — confirm against the fitting script.

    Returns
    -------
    Tidy DataFrame with columns: model, IXT, RMSE, draw, sigma
    (empty, with those columns, when ``tanh_fit_sigma`` has no models).
    """
    columns = ["model", "IXT", "RMSE", "draw", "sigma"]

    # 1) Regenerate the data; only the training inputs and test targets are used.
    X_train, _, _, y_test = generate_correlated_data_sigma(
        n=n, p=6, random_state=random_state, sigma=sigma_value
    )

    dfs = []  # collect one df per model

    for model_name, model_dict in tanh_fit_sigma.items():

        # 2) I(X;T) for K evenly spaced draws
        df_ixt = estimate_ix_t_ksg(
            tanh_fit_sigma,
            model_name=model_name,
            X=X_train,
            K=K,
            k=k,
            standardize_X=True,
            standardize_T=True,
            seed=seed
        )

        # 3) RMSE for exactly those draws (no need to evaluate every draw)
        rmse_matched, _ = rmse_from_output_test(
            model_dict["posterior"],
            y_test,
            sample_indices=df_ixt["draw"].to_numpy()
        )

        # 4) Build tidy df for this model
        df_model = pd.DataFrame({
            "model": df_ixt["model"],
            "IXT":   df_ixt["IXT"],
            "RMSE":  rmse_matched,
            "draw":  df_ixt["draw"],
            "sigma": sigma_value
        })
        dfs.append(df_model)

    # 5) Return tidy df for all models at this sigma
    if not dfs:
        return pd.DataFrame(columns=columns)
    return pd.concat(dfs, ignore_index=True)


In [9]:
# Each set of fits paired with the noise level its data were simulated with.
_fits_and_sigmas = [
    (tanh_fit_sigma_00, 0.0),
    (tanh_fit_sigma_05, 0.5),
    (tanh_fit_sigma_1, 1.0),
    (tanh_fit_sigma_2, 2.0),
]

df_sigma_00, df_sigma_05, df_sigma_1, df_sigma_2 = [
    build_df_for_sigma(fits, sigma_value=noise_sd)
    for fits, noise_sd in _fits_and_sigmas
]

# Stack the per-sigma frames into one tidy frame.
df_all = pd.concat(
    [df_sigma_00, df_sigma_05, df_sigma_1, df_sigma_2],
    ignore_index=True,
)


In [10]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse

def plot_multi_ellipse(df, n_std=2.0):
    """
    Plot the mean (I(X;T), RMSE) of every (model, sigma) group with an
    axis-aligned ellipse of ``n_std`` sample standard deviations around it.

    Parameters
    ----------
    df : DataFrame with columns "model", "sigma", "IXT", "RMSE".
    n_std : float
        Half-axes of each ellipse, in standard deviations of IXT / RMSE.

    Returns
    -------
    (fig, ax)
    """
    fig, ax = plt.subplots(figsize=(6,5))

    # Short display names; models missing from the map keep their full name.
    name_map = {
        "Gaussian tanh": "Gauss",
        "Regularized Horseshoe tanh": "RHS",
        "Dirichlet Horseshoe tanh": "DHS",
        "Dirichlet Student T tanh": "DST"
    }
    df = df.copy()
    df["model_short"] = df["model"].map(name_map).fillna(df["model"])

    models = df["model_short"].unique()
    sigmas = np.sort(df["sigma"].unique())

    # Colors per model / markers per sigma. The palettes are cycled so that
    # more than six models or sigmas no longer raise a KeyError.
    colors = ["red","blue","green","purple","orange","black"]
    markers = ["o","s","D","^","v","P"]
    color_map = {m: colors[i % len(colors)] for i, m in enumerate(models)}
    marker_map = {s: markers[i % len(markers)] for i, s in enumerate(sigmas)}

    def _spread(values):
        # Sample std is undefined for a single point; draw a degenerate
        # ellipse instead of propagating NaN.
        return np.std(values, ddof=1) if values.size > 1 else 0.0

    for (model_short, sigma), sub in df.groupby(["model_short","sigma"]):
        x = sub["IXT"].to_numpy()
        y = sub["RMSE"].to_numpy()

        xm, ym = np.mean(x), np.mean(y)
        xs, ys = _spread(x), _spread(y)

        color = color_map[model_short]
        marker = marker_map[sigma]

        ax.scatter([xm], [ym], c=color, s=80, marker=marker,
                   label=f"{model_short}, σ={sigma}")

        # width/height are full axis lengths, hence the factor 2
        ell = Ellipse((xm, ym),
                      width=2*n_std*xs,
                      height=2*n_std*ys,
                      edgecolor=color,
                      facecolor='none',
                      lw=2,
                      alpha=0.9)
        ax.add_patch(ell)

    ax.set_xlabel("I(X;T)")
    ax.set_ylabel("RMSE")
    ax.set_title(f"I(X;T) vs RMSE — {n_std}σ ellipses")
    ax.legend(frameon=False, fontsize=9)
    return fig, ax


In [None]:
# Draw one-standard-deviation ellipses for every (model, sigma) group in df_all.
fig, ax = plot_multi_ellipse(df_all, n_std=1.0)
plt.show()


In [33]:
def plot_multi_ellipse_like_Sigurd(df, n_std=2.0):
    """
    Like an ellipse plot per (model, sigma) group, with the group centers of
    each model additionally connected by a line in order of increasing sigma.

    Parameters
    ----------
    df : DataFrame with columns "model", "sigma", "IXT", "RMSE".
    n_std : float
        Half-axes of each ellipse, in standard deviations of IXT / RMSE.

    Returns
    -------
    (fig, ax)
    """
    fig, ax = plt.subplots(figsize=(6,5))

    # Short display names; models missing from the map keep their full name.
    name_map = {
        "Gaussian tanh": "Gauss",
        "Regularized Horseshoe tanh": "RHS",
        "Dirichlet Horseshoe tanh": "DHS",
        "Dirichlet Student T tanh": "DST"
    }
    df = df.copy()
    df["model_short"] = df["model"].map(name_map).fillna(df["model"])

    models = df["model_short"].unique()
    sigmas = np.sort(df["sigma"].unique())

    # Palettes are cycled so more than six models or sigmas do not raise KeyError.
    colors = ["red","blue","green","purple","orange","black"]
    markers = ["o","s","D","^","v","P"]
    color_map = {m: colors[i % len(colors)] for i, m in enumerate(models)}
    marker_map = {s: markers[i % len(markers)] for i, s in enumerate(sigmas)}

    def _spread(values):
        # Sample std is undefined for a single point; use a degenerate ellipse.
        return np.std(values, ddof=1) if values.size > 1 else 0.0

    for model_short in models:
        color = color_map[model_short]

        centers_x = []
        centers_y = []

        for sigma in sigmas:
            sub = df[(df["model_short"] == model_short) & (df["sigma"] == sigma)]
            if sub.empty:
                # This model has no draws at this sigma: skip it rather than
                # plotting a NaN center that would break the connecting line.
                continue

            x = sub["IXT"].to_numpy()
            y = sub["RMSE"].to_numpy()
            xm, ym = np.mean(x), np.mean(y)
            xs, ys = _spread(x), _spread(y)

            # store center points to connect later
            centers_x.append(xm)
            centers_y.append(ym)

            # draw ellipse (width/height are full axis lengths, hence the factor 2)
            ell = Ellipse((xm, ym), width=2*n_std*xs, height=2*n_std*ys,
                          edgecolor=color, facecolor='none', lw=2)
            ax.add_patch(ell)

            # draw center marker
            ax.scatter([xm], [ym], c=color, s=80, marker=marker_map[sigma])

        # draw line through centers in order of σ
        ax.plot(centers_x, centers_y, '-', color=color, lw=2,
                label=model_short)

    ax.set_xlabel("I(X;T)")
    ax.set_ylabel("RMSE")
    ax.set_title(f"Information–Performance Tradeoff ({n_std}σ ellipses)")
    ax.legend(frameon=False)
    return fig, ax


In [None]:
# Same groups as above, with each model's centers connected across sigma.
fig, ax = plot_multi_ellipse_like_Sigurd(df_all, n_std=1.0)
plt.show()


In [15]:
# Two distributions over three outcomes: one skewed, one uniform.
p1 = np.array([0.1, 0.1, 0.8])
p2 = np.array([1/3, 1/3, 1/3])

# Shannon entropy in nats; the 1e-12 offset guards log(0).
ent1 = np.sum(
    -(p1 * np.log(p1 + 1e-12))
)
ent2 = np.sum(
    -(p2 * np.log(p2 + 1e-12))
)

In [None]:
# Entropy (nats) of the skewed distribution p1.
ent1

In [None]:
# Entropy (nats) of the uniform distribution p2.
ent2

In [None]:
import numpy as np
# Regenerate the noise-free (sigma = 0) dataset and its train/test split.
(X_train, X_test,
 y_train, y_test) = generate_correlated_data_sigma(n=100, p=6, random_state=42, sigma=0.0)
def hidden_activations_from_fit(fit, X):
    """
    Hidden-layer tanh activations for every posterior draw at once.

    fit : tanh_fit_sigma_XX['Model']['posterior']
    X   : (N, P) numpy array
    Returns T_all : (draws, N, H)
    """
    weights = fit.stan_variable("W_1")          # (draws, P, H)
    biases = fit.stan_variable("hidden_bias")   # (draws, 1, H)

    # The matrix product broadcasts the (N, P) inputs over the leading draws
    # axis, giving (draws, N, H); the bias then broadcasts over N.
    pre_activation = X @ weights + biases
    return np.tanh(pre_activation)

# ---- Extract post-activations for Gaussian and DHS ----
X = X_train  # ensure standardized exactly as during training!

# Same sigma = 0.0 fits and same inputs for both models.
T_gauss, T_dhs = (
    hidden_activations_from_fit(tanh_fit_sigma_00[model_key]["posterior"], X)
    for model_key in ("Gaussian tanh", "Dirichlet Horseshoe tanh")
)

print("T_gauss shape:", T_gauss.shape)
print("T_dhs shape:",   T_dhs.shape)


In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Hidden unit whose activations are compared.
unit = 0

# Pool the activations of that unit over all draws and all samples.
u_gauss = T_gauss[..., unit].ravel()
u_dhs   = T_dhs[..., unit].ravel()

plt.figure(figsize=(6,4))
# Overlay the two normalised histograms.
for pooled, prior_label in ((u_gauss, "Gaussian"), (u_dhs, "DHS")):
    plt.hist(pooled, bins=30, alpha=0.6, label=prior_label, density=True)

plt.axvline(0, color='black', lw=1, alpha=0.4)  # reference line at zero activation
plt.xlabel("activation value (tanh)")
plt.ylabel("density")
plt.title(f"Hidden Unit {unit} Activation Distribution")
plt.legend(frameon=False)
plt.show()
