# Robust grouped linear fits with IRLS + Huber

This notebook performs a robust linear regression for **each unique value** of a column
you specify as `line_specifier`. For every group, it fits `y` vs `x` using IRLS with
Huber weights, bootstraps the fit parameters, prints the results, and plots each
group with a different color.

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tqdm

# Robust Estimation of means:
# Huber weight function

def Huber(ei, sigma, s, a=1.345):
    w1 = 1 / sigma**2
    w2 = 1 / sigma**2 * (a / (np.abs(ei) / (sigma * s)))
    weights = np.where(np.abs(ei) <= a * sigma * s, w1, w2)
    return weights


def ZL_robust_mean(y_, dy_, max_iter=50, tol=1e-6):
    """Output:
    robust mean,
    s (a sqrt(reduced chi2-like scale factor), s^2 ~ reduced chi2)
    """
    y = y_
    dy = dy_
    y_bar = np.average(y, weights=1 / dy**2)
    s = 1.0
    for _ in range(max_iter):
        ei = y - y_bar
        w = Huber(ei, dy, s)
        y_bar_old = y_bar
        y_bar = np.sum(w * y) / np.sum(w)
        mad = np.median(np.abs(y - y_bar) / dy)
        s = 1.4826 * mad if mad > 0 else 1.0
        # tolerance check
        if np.abs(y_bar - y_bar_old) < tol * (np.abs(y_bar_old) + 1e-12):
            break
    return y_bar, s


def bootstrap_ZL_robust_mean(y, dy, n_boot=10000, random_state=None):
    """Output:
    mean of bootstrap samples,
    std of bootstrap samples,
    all bootstrap samples
    """
    rng = np.random.default_rng(random_state)
    boot_means = np.empty(n_boot)
    n = len(y)
    for i in tqdm.tqdm(range(n_boot)):
        indices = rng.integers(0, n, n)
        y_sample = y[indices]
        dy_sample = dy[indices]
        boot_means[i] = ZL_robust_mean(y_sample, dy_sample)[0]
    return boot_means.mean(), boot_means.std(), boot_means


# Robust linear regression using IRLS and Huber weights
def irls_robust_linear(x, y, dy, max_iter=50, tol=1e-6):
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    dy = np.asarray(dy, dtype=float)

    # Design matrix: y = a + b x
    A = np.vstack([np.ones_like(x), x]).T

    # Initial conditions
    n = len(y)
    s = 1.0
    w = np.ones(n) / dy**2

    def solve_wls(A, y, weights):
        W = np.sqrt(weights)
        Aw = A * W[:, None]
        yw = y * W
        theta, *_ = np.linalg.lstsq(Aw, yw, rcond=None)
        return theta

    # Initial least squares solution
    theta = solve_wls(A, y, w)

    for _ in range(max_iter):
        theta_old = theta.copy()
        s_old = s

        # 1. compute residuals
        y_fit = A @ theta
        r = y - y_fit

        # 2. update scale s using MAD of |r| / dy
        mad = np.median(np.abs(r) / dy)
        s = 1.4826 * mad if mad > 0 else 1.0

        # 3. update weights using Huber function
        w = Huber(r, dy, s)

        # 4. solve weighted least squares
        theta = solve_wls(A, y, w)

        # 5. convergence test
        if np.linalg.norm(theta - theta_old) < tol * (np.linalg.norm(theta_old) + 1e-12) and \
           abs(s - s_old) < tol * (s_old + 1e-12):
            break
    beta0, beta1 = theta
    return beta0, beta1, s, w


def bootstrap_irls(x, y, dy, n_boot=10000, random_state=None):
    """Bootstrap IRLS over (x, y, dy) triplets.

    Returns:
        beta0_samples : array of intercepts
        beta1_samples : array of slopes
        s_samples : array of scale estimates
    """
    if random_state is not None:
        np.random.seed(random_state)

    x = np.asarray(x)
    y = np.asarray(y)
    dy = np.asarray(dy)

    n = len(x)

    beta0_samples = np.empty(n_boot)
    beta1_samples = np.empty(n_boot)
    s_samples = np.empty(n_boot)

    for i in tqdm.tqdm(range(n_boot)):
        # 1. Resample indices with replacement
        idx = np.random.randint(0, n, size=n)

        # 2. Resample triplets together
        xb = x[idx]
        yb = y[idx]
        dyb = dy[idx]

        # 3. Run IRLS on bootstrap sample
        beta0, beta1, s, _ = irls_robust_linear(xb, yb, dyb)

        # 4. Store results
        beta0_samples[i] = beta0
        beta1_samples[i] = beta1
        s_samples[i] = s

    return beta0_samples, beta1_samples, s_samples

In [None]:
# --- User configuration ---

# Path to your CSV file
csv_path = r"C:\\ACME_analysis\\multiple_results\\sequencedf_result\\0016.0433_0016.0434_0016.0435_0016.0436_0016.0437_.csv"

# Column names in the dataframe
x_name = '1090_STIRAP_ellipticity_angle'
y_name = 'omega_E'
dy_name = 'uncertainty_omega_E'

# Column that specifies which line/group each point belongs to
line_specifier = 'line_specifier'  # <-- change this to your actual column name

# x-axis unit for printing/labeling
x_unit = 'deg'

# Load and clean data
df = pd.read_csv(csv_path)
df = df.dropna(subset=[x_name, y_name, dy_name, line_specifier])

print(f"Loaded {len(df)} rows from {csv_path}")
print(f"Unique {line_specifier} values: {df[line_specifier].unique()}")

In [None]:
# --- Robust grouped linear fits and plotting ---

unique_groups = df[line_specifier].unique()

# For consistent colors across groups
cmap = plt.get_cmap('tab10')

fig, ax = plt.subplots(figsize=(7, 4))

results = []  # store per-group fit results if you want to inspect later

for i, group in enumerate(unique_groups):
    sub = df[df[line_specifier] == group]
    x = sub[x_name].values
    y = sub[y_name].values
    dy = sub[dy_name].values

    if len(sub) < 2:
        print(f"Skipping group {group!r}: not enough points ({len(sub)}) for a linear fit.")
        continue

    # Robust IRLS fit
    beta0, beta1, s, w = irls_robust_linear(x, y, dy)

    # Bootstrap uncertainties
    beta0_samples, beta1_samples, s_samples = bootstrap_irls(x, y, dy, n_boot=10000, random_state=None)
    se_beta0 = np.std(beta0_samples)
    se_beta1 = np.std(beta1_samples)

    # Store numerical results
    results.append({
        'group': group,
        'beta0': beta0,
        'beta1': beta1,
        's': s,
        'se_beta0': se_beta0,
        'se_beta1': se_beta1
    })

    # Print results for this group
    print("\n" + "-" * 60)
    print(f"{line_specifier} = {group}")
    print(f"Intercept: {beta0 * 1e6:.4f} ± {se_beta0 * 1e6:.4f} μrad/s")
    print(f"Slope:     {beta1 * 1e6:.4f} ± {se_beta1 * 1e6:.4f} μrad/s/{x_unit}")
    print(f"Scale s:   {s:.4f}")

    # Normalize weights for plotting transparency
    w_norm = w / np.max(w)

    # Color for this group
    color = cmap(i % 10)

    # Scatter plot of data points
    ax.scatter(
        x,
        y * 1e6,
        s=80,
        marker='s',
        c=[color],
        alpha=0.2 + 0.5 * w_norm,  # weight-dependent transparency
        edgecolors='none',
        label=f"{line_specifier} = {group}"
    )

    # Best-fit line for this group
    x_fit = np.linspace(np.min(x), np.max(x), 100)
    y_fit = beta0 + beta1 * x_fit
    ax.plot(x_fit, y_fit * 1e6, '-', color=color)

ax.grid(True)
ax.set_xlabel(f"{x_name} ({x_unit})", fontsize=14)
ax.set_ylabel(f"{y_name} (μrad/s)", fontsize=14)
ax.set_title("Robust linear fit per group", fontsize=14)
ax.legend(fontsize=9, loc='best')
fig.tight_layout()

plt.show()