# Section 5.3 - Reproduction using SGLD

# Independent Component Analysis (ICA) with SGLD
This notebook demonstrates Bayesian independent component analysis (ICA) on artificial data, using stochastic gradient Langevin dynamics (SGLD) to draw posterior samples of the unmixing matrix.

In [None]:
#1. Generate Synthetic Sources and Mix Signals
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.special import expit
from sklearn.metrics import mean_squared_error

# Seed the global NumPy generator so the synthetic data set is reproducible.
np.random.seed(0)
N = 1000

# Two independent Laplace-distributed sources, stacked one per row -> (2, N).
source_rows = [np.random.laplace(size=N) for _ in range(2)]
S = np.vstack(source_rows)

# Linear mixing: each observed channel is a weighted sum of both sources.
A = np.array([[1, 2], [2, 1]])
X = np.matmul(A, S)  # observed (mixed) signals, shape (2, N)

# Side-by-side comparison: the left panel shows the original sources,
# the right panel shows the observed mixtures.
plt.figure(figsize=(12, 4))
panels = (
    (1, S, "Source", "Original Sources"),
    (2, X, "Mixed", "Mixed Signals"),
)
for position, signals, prefix, panel_title in panels:
    plt.subplot(1, 2, position)
    # Only the first two rows are drawn, one line per signal.
    for channel, series in enumerate(signals[:2], start=1):
        plt.plot(series, label=f"{prefix} {channel}")
    plt.title(panel_title)
    plt.legend()
plt.tight_layout()
plt.show()

In [None]:
#2. SGLD Optimization with Natural Gradient
# Draw samples of the unmixing matrix W with SGLD, using a single randomly
# chosen observation per step and a natural-gradient drift.
W = np.eye(2)
samples = []   # draws kept after burn-in (t > 4000)
W_trace = []   # snapshot of W every 100 steps, used for diagnostics
steps = 5000


def epsilon_schedule(t):
    """Return the polynomially decaying SGLD step size for iteration t."""
    return 0.1 * (1 + t) ** -0.55


identity = np.eye(2)
for t in range(steps):
    i = np.random.randint(0, N)
    x_i = X[:, i:i+1]  # shape (2,1)
    y = W @ x_i

    # Per-sample log-posterior gradient: W^{-T} - tanh(y/2) x^T - 0.01 W.
    # Right-multiplying by W^T W (natural gradient) cancels the inverse:
    #   W^{-T} W^T W        = W
    #   tanh(y/2) x^T W^T W = tanh(y/2) y^T W
    # so no matrix inversion is needed. Using the identity in place of
    # W^{-T} before preconditioning would give W^T W for the first term,
    # which only agrees with the correct value while W is orthogonal.
    likelihood_term = (identity - np.tanh(0.5 * y) @ y.T) @ W
    prior_term = 0.01 * (W @ W.T @ W)
    natural_grad = likelihood_term - prior_term
    # NOTE(review): the likelihood term is not rescaled by N for the
    # single-sample minibatch; adding that factor would also require a much
    # smaller step size, so it is left unchanged here - confirm intent.

    eps = epsilon_schedule(t)
    # Langevin noise with variance equal to the step size.
    noise = np.random.normal(scale=np.sqrt(eps), size=W.shape)
    W += 0.5 * eps * natural_grad + noise
    if t > 4000:
        samples.append(W.copy())
    if t % 100 == 0:
        W_trace.append(W.copy())

samples = np.array(samples)
W_trace = np.array(W_trace)

In [None]:
#3. Posterior Summary Statistics
# Entry-wise mean and standard deviation over the retained SGLD draws
# (axis 0 of `samples` indexes the draws).
W_mean = np.mean(samples, axis=0)
W_std = np.std(samples, axis=0)
for heading, matrix in (
    ("Estimated Unmixing Matrix (mean over posterior):\n", W_mean),
    ("Posterior Std Deviation of W:\n", W_std),
):
    print(heading, matrix)

In [None]:
#4. Recovered Source Signals
# Apply the posterior-mean unmixing matrix to the mixtures and draw each
# recovered component in its own stacked panel.
S_est = np.matmul(W_mean, X)
plt.figure(figsize=(10, 4))
for i, component in enumerate(S_est):
    plt.subplot(2, 1, i + 1)
    plt.plot(component, label=f'Component {i+1}')
    plt.legend()
plt.suptitle("Recovered Sources")
plt.tight_layout()
plt.show()

In [None]:
#5. Trace Evolution of W
# One panel per entry of W, showing the snapshots stored in W_trace.
fig, axes = plt.subplots(2, 2, figsize=(10, 6))
for (i, j), ax in np.ndenumerate(axes):
    ax.plot(W_trace[:, i, j])
    ax.set_title(f"$W_{{{i},{j}}}$")
plt.suptitle("Trace of W Matrix Entries During SGLD")
plt.tight_layout()
plt.show()

In [None]:
#6. 2D Posterior PDFs (KDE)
# Joint KDE plots of W[0,0] against two other entries of W, one figure each.
kde_pairs = (
    ((0, 1), "Posterior PDF: $W_{0,0}$ vs $W_{0,1}$"),
    ((1, 0), "Posterior PDF: $W_{0,0}$ vs $W_{1,0}$"),
)
w00_draws = samples[:, 0, 0]
for (row, col), figure_title in kde_pairs:
    sns.jointplot(x=w00_draws, y=samples[:, row, col], kind="kde")
    plt.suptitle(figure_title, y=1.02)
    plt.show()

In [None]:
#7. Instability Metric Calculation
# Instability index per component i: I_i = sum_j var(W_ij) * var(x_j).
x_var = X.var(axis=1)            # variance of each observed channel x_j
w_var = samples.var(axis=0)      # variance of each entry W_ij over the draws
# x_var must broadcast along the column index j (row vector), so that entry
# (i, j) is weighted by var(x_j); a column vector would weight by var(x_i).
instability = (w_var * x_var[None, :]).sum(axis=1)
sns.barplot(x=np.arange(len(instability)), y=instability)
# Raw string: "\s" is not a valid escape sequence in a normal string literal.
plt.title(r"Instability Index $I_i = \sum_j var(W_{ij}) var(x_j)$")
plt.xlabel("Component Index")
plt.ylabel("Instability")
plt.show()

In [None]:
#8. Amari Distance Tracking
def amari_distance(W, A):
    """Return the Amari distance between unmixing matrix W and mixing matrix A.

    The product P = W @ A is compared against a scaled permutation matrix:
    each column and each row of |P| is normalised by its largest entry, so a
    perfect separation (exactly one non-zero entry per row and column) gives
    a distance of 0, and any residual cross-talk gives a positive value.

    Normalising by the column/row *sums* instead would make every normalised
    column and row sum to one, and the result would be 0 for every input.

    Args:
        W: estimated unmixing matrix (array-like, 2-D).
        A: true mixing matrix (array-like, 2-D), such that W @ A is defined.

    Returns:
        The accumulated error divided by twice the number of rows of P.
        A row or column of P that is entirely zero leads to a division by
        zero (NaN/inf in the result).
    """
    P = np.abs(np.asarray(W) @ np.asarray(A))
    n_rows, n_cols = P.shape
    col_max = P.max(axis=0)
    row_max = P.max(axis=1)
    # Each normalised column (row) contributes at least 1 from its maximum;
    # subtracting the count leaves only the off-peak leakage.
    error = (np.sum(P / col_max[None, :]) - n_cols +
             np.sum(P / row_max[:, None]) - n_rows)
    return error / (2 * n_rows)

# Evaluate the Amari distance for every stored snapshot of W and plot the
# resulting curve.
amari_distances = []
for snapshot in W_trace:
    amari_distances.append(amari_distance(snapshot, A))

plt.plot(amari_distances)
plt.title("Amari Distance Between Estimated W and True Mixing A")
plt.xlabel("Iteration (every 100 steps)")
plt.ylabel("Amari Distance")
plt.grid(True)
plt.show()