

# Fourier Detection Statistics

This step is for calculating the fourier detection statstics, and the lower dimensional data \& filter matrix for the further p-value calculation

In [7]:
import numpy as np
import scipy.linalg as sl

class FourierDetectionStatistic:
    """
    Fourier-domain Detection Statistic calculation. providing detection statistics , y as data ,Q as filter matrix and sigma_y for 
    the covariance matrix of y (for p value calculation).

    Parameters
    ----------
    pta_h0, pta_h1 : PTA objects
        h0 supplies φ_N(ρ) (per pulsar, last K coefficients), pta_h0.get_phi(par_dict);
        h1 supplies φ_C(ρ) including cross-correlations (big matrix), given by pta_h1.get_phi(par_dict).
    pta_model : dict
        Your dict with per-pulsar items:
          pta_model[name]['phiinv'], ['Sigma'], ['Sigma_inv'], ['a_hat'].
    fourier_num : int
        K: number of Fourier coefficients kept per pulsar (take the last K). In radio pulsars, Rutger used 60 for radio pulsars; 20 for the gamma pulsars u gived me 
    proj_method : {"mask","svd"}
        How to build the low-rank projector G from Δφ:
        - "mask": take standard-basis columns on the nonzero support of Δφ. (I recommend you always use this one ,it's the one i described in pdf)
        - "svd" : thin SVD of Δφ, take U[:, keep].  (chatgpt give me this as an extra option, so why not?)
    tol : float
        Threshold for zero-detection in "mask" mode (|entry|<=tol → zero). Default 1e-30. (tollerance value for selection projector matrix, it could be even lower)
    rel_eps : float or None
        Relative SVD cutoff in "svd" mode. None ⇒ use (machine_eps * n * smax). (tollerance value for selection projector matrix)
    order : list[str] or None
        Pulsar order. If None, use pta_h0.pulsars. (an extra option given by chatgpt. I've been always using "None" anyway.)
    verbose : bool
        Print shapes and scalar diagnostics. (an extra option given by chatgpt, I always used verbose=False for clean output)

    Main API
    --------
    compute_os(params) -> float
        Return the deflection-type OS value.
    get_deflection_coordinates(params, normalize_Q=True)
        Return (OS_value, y, Q,Sigma_y), where os_value is detection statistics, y = G_A^T â, 
        Q = Φ_N ΔΦ Φ_N^T (optionally normalized by the denominator), sigma_y is covariance matrix of y for p value calculation
    """

    # ----------------------------------------------------------------------------------
    # High-level math map (for readers):
    #
    #  - Σ^{-1} = Σ0^{-1} + φ_N^{-1} - Φ0^{-1};  â = Σ Σ0^{-1} â0         (transfer between parameters)
    #  - Δφ = φ_C - φ_N                                                    (contrast get_phi between H1 and H0)
    #  - Low-rank projection: build G ∈ R^{n×r} from Δφ, then reduce:
    #        y       = G_A^T â,                          G_A spans range(A), A = φ_N^{-1} G
    #        Φ_N     = G_A^T φ_N^{-1} G                  (r_A×r)
    #        ΔΦ      = G^T Δφ G                          (r×r)
    #        Q_num   = Φ_N ΔΦ Φ_N^T                      (r_A×r_A)
    #        Numerator = y^T Q_num y
    #  - Normalization (real basis): Nor = sqrt( 2 Tr( [ (Σ^{-1} - φ_N^{-1}) Δφ φ_N^{-1} Σ ]^2 ) )
    #    Then the normalized Q = Q_num / Nor, and OS = (y^T Q_num y) / Nor.
    #
    # Shapes:
    #   - n = total number of kept Fourier modes across pulsars (sum over pulsars of K)
    #   - r = rank of Δφ support (columns/rows retained by G)
    #   - r_A = rank of A = φ_N^{-1} G  (≤ r), basis given by thin SVD (U columns kept)
    # ----------------------------------------------------------------------------------

    def __init__(
        self, pta_h0, pta_h1, pta_model,
        fourier_num=60, proj_method="mask",
        tol=0, rel_eps=None, order=None, verbose=False
    ):
        # Store external objects and options
        self.pta_h0 = pta_h0
        self.pta_h1 = pta_h1
        self.pta_model = pta_model
        self.fourier_num = int(fourier_num)

        self.proj_method = str(proj_method)
        self.tol = float(tol)
        self.rel_eps = rel_eps
        self.verbose = bool(verbose)

        # Pulsar order used to stack block-diagonals and a_hat consistently
        self.order = list(order) if order is not None else list(pta_h0.pulsars)

        # (1) Build big block-diagonal matrices from the per-pulsar model:
        #     BigPhi0Inv = Φ0^{-1} (prior inverse at ρ0), Sigma0 = Σ0, Sigma0Inv = Σ0^{-1}
        #     These are used for the ρ-transfer formulas Σ^{-1} = Σ0^{-1} + φ_N^{-1} - Φ0^{-1}
        self.BigPhi0Inv, self._slc = self._build_block_diag_from_model('phiinv')
        self.Sigma0,    _          = self._build_block_diag_from_model('Sigma')
        self.Sigma0Inv, _          = self._build_block_diag_from_model('Sigma_inv')

        # (2) Stack â0 across pulsars (each K-vector) → (#psr, K) → ravel to (n,)
        #     We will later compute â(ρ) = Σ Σ0^{-1} â0.
        self.ahat0 = self._stack_a_hat()

        # (3) Indices bookkeeping for the "last-K" coefficients per pulsar
        self._per_psr_param_len = None
        self._idx_lastK = None

    def compute_os(self, params):
        """Return detection statistic.
        (if you need this only)
        """
        os_val, _, _ = self.get_deflection_coordinates(params)
        return os_val

    def get_deflection_coordinates(self, params):
        """
        Return (OS_value, y, Q).

        y      = G_A^T â
        Q_num  = Φ_N ΔΦ Φ_N^T  with  Φ_N = G_A^T φ_N^{-1} G,  ΔΦ = G^T Δφ G
        Denom  = sqrt( 2 Tr( [ (Σ^{-1} - φ_N^{-1}) Δφ φ_N^{-1} Σ ]^2 ) )
        Sigma_y = Cov(y|H0).
        """
        # ---------- 1) ρ-dependent pieces: φ_N, φ_N^{-1}, φ_C, Δφ ----------
        # From H0 (diagonal blocks per pulsar, last K per pulsar) and H1 (full cross-correlation),
        # compute φ_N (diag), φ_C (dense), and Δφ = φ_C - φ_N (difference).
        phiN, phiNinv, phiC, dphi = self._phiN_phiC_delta(params)

        # ---------- 2) Σ^{-1}, Σ, and â(ρ) ----------
        # Use Σ^{-1} = Σ0^{-1} + φ_N^{-1} - Φ0^{-1}, then recover Σ by Cholesky solve.
        # Compute â(ρ) = Σ Σ0^{-1} â0 (ρ-transfer identity).
        SigmaInv, Sigma = self._Sigma_and_inv(phiNinv)
        ahat = Sigma @ (self.Sigma0Inv @ self.ahat0)

        # ---------- 3) Dimension reduction: G from Δφ; G_A from A = φ_N^{-1} G ----------
        # Build G capturing the low-rank support of Δφ (either by mask or SVD).
        # Then build G_A as an orthonormal basis for range(A) with thin SVD on A = φ_N^{-1} G.
        G  = self._project_G(dphi)             # n×r
        GA = self._project_GA(phiNinv @ G)     # n×rA

        # Reduced coordinates and reduced operators:
        #   y        = G_A^T â
        #   Φ_N_red  = G_A^T φ_N^{-1} G
        #   ΔΦ_red   = G^T Δφ G
        y         = GA.T @ ahat                               # rA
        PhiN_red  = GA.T @ phiNinv @ G                        # rA×r
        dphi_red  = G.T  @ dphi     @ G                       # r×r

        # ---------- 4) Numerator: y^T Φ_N ΔΦ Φ_N^T y ----------
        # This is exactly the unnormalized quadratic form in the reduced subspace.
        Q_num = PhiN_red @ dphi_red @ PhiN_red.T              # rA×rA
        numerator = float(y.T @ (Q_num @ y))

        # ---------- 5) Denominator (full-dim) ----------
        # Nor = sqrt( 2 Tr( [ (Σ^{-1} - φ_N^{-1}) Δφ φ_N^{-1} Σ ]^2 ) ).
        # Compute the inner matrix M first, then Tr(MM) is the squared Frobenius norm.
        M = (SigmaInv - phiNinv) @ dphi @ (phiNinv @ Sigma)
        den = np.sqrt(2.0 * np.trace(M @ M))

        # Normalize Q if requested; OS = numerator / den
        Q = Q_num / den 
        os_val = numerator / den if den > 0 else np.nan

        # Sigma_y = Cov(y | H0) = G_A^T (φ_N - Σ) G_A   (see your PDF derivation)
        Sigma_y= GA.T @ (phiN- Sigma) @ GA

        if self.verbose:
            print(f"[FourierDS] y:{y.shape}, Q:{Q.shape}, num={numerator:.6e}, den={den:.6e}, OS={os_val:.6e}")

        return os_val, y, Q ,  Sigma_y

    # ====================== the exact steps from your notebook ======================
    def _build_block_diag_from_model(self, field):
        """
        Build a big block-diagonal from pta_model[name][field].
        - If a value is 1D (diagonal entries), convert to a diagonal matrix.
        - If 2D square, use as-is.
        Also returns per-pulsar slice ranges in the big matrix.

        Math intent: build Σ0, Σ0^{-1}, Φ0^{-1} in block-diagonal form across pulsars.
        Implementation notes:
          * Preserves per-pulsar block sizes and records index slices for later use.
          * Ensures dtype=float for LAPACK compatibility.
        """
        blocks, sizes = [], []
        for name in self.order:
            M = np.asarray(self.pta_model[name][field])
            if M.ndim == 1:
                M = np.diag(M)  # turn a diagonal vector into a square diag matrix
            elif M.ndim != 2 or M.shape[0] != M.shape[1]:
                raise ValueError(f"{name}: {field} must be vector or square matrix, got {M.shape}")
            blocks.append(M.astype(float))
            sizes.append(M.shape[0])

        Big = sl.block_diag(*blocks)  # big block-diagonal matrix
        slc, start = {}, 0
        for name, m in zip(self.order, sizes):
            slc[name] = slice(start, start + m)
            start += m
        return Big, slc

    def _stack_a_hat(self):
        """
        Stack each pulsar's a_hat (K,) into (#psr, K) and ravel to (n,).

        Math: â0 is concatenation of last-K Fourier coefficients per pulsar.
        Implementation: validate each 'a_hat' matches requested K.
        """
        rows = []
        for name in self.order:
            a = np.asarray(self.pta_model[name]['a_hat'], dtype=float)
            if a.shape != (self.fourier_num,):
                raise ValueError(f"{name}: a_hat must be ({self.fourier_num},), got {a.shape}")
            rows.append(a)
        return np.vstack(rows).ravel()

    def _ensure_lastK_indices(self, params):
        """
        Cache global indices for the last K coefficients per pulsar.

        Math: we operate only on the highest-frequency K modes per pulsar (as per your setup).
        Implementation:
          - per pulsar, get full Fourier length m; take the last K indices.
          - Build a global index vector for slicing dense φ_C big matrix.
        """
        if self._per_psr_param_len is None:
            per = [len(v) for v in self.pta_h0.get_phi(params)]
            self._per_psr_param_len = np.array(per, dtype=int)

            K = self.fourier_num
            idx, start = [], 0
            for m in self._per_psr_param_len:
                idx.extend(range(start + m - K, start + m))  # pick last K of each pulsar
                start += m
            self._idx_lastK = np.array(idx, dtype=int)

            
    def _phiN_phiC_delta(self, params):
        """
        φ_N: from H0, take the last K diagonal entries per pulsar → big diagonal.
        φ_C: from H1, take the last K×K block per pulsar pair from the big matrix.
        Δφ = φ_C - φ_N.

        Math:
          - φ_N is block-diagonal (per pulsar); here represented as a diagonal matrix in full n×n.
          - φ_C is dense across pulsars (Hellings–Downs etc.); we restrict to last-K indices.
          - Δφ is the contrast driving the statistic.

        Implementation: uses cached _idx_lastK to slice the big φ_C efficiently.
        """
        self._ensure_lastK_indices(params)
        K = self.fourier_num

        # H0: list of 1D arrays. Keep the last K per pulsar → big diagonal φ_N.
        diag = []
        for v in self.pta_h0.get_phi(params):
            diag.extend(v[-K:])
        diag = np.asarray(diag, dtype=float)
        phiN    = np.diag(diag)
        phiNinv = np.diag(1.0 / diag)

        # H1: big matrix with cross-correlations, pick the last-K indices
        big  = self.pta_h1.get_phi(params)
        I    = self._idx_lastK
        phiC = big[np.ix_(I, I)].astype(float)

        dphi = phiC - phiN
        return phiN, phiNinv, phiC, dphi

    def _Sigma_and_inv(self, phiNinv):
        """
        Σ^{-1} = Σ0^{-1} + φ_N^{-1} - Φ0^{-1}; obtain Σ by Cholesky.

        Math: This is the ρ-transfer identity to move from (Σ0, Φ0) at ρ0 to (Σ, φ_N) at current ρ.
        Implementation: build Σ^{-1} then invert via Cholesky for numerical stability (SPD).
        """
        SigmaInv = (self.Sigma0Inv + phiNinv - self.BigPhi0Inv).astype(float)
        cf = sl.cho_factor(SigmaInv, lower=True, check_finite=False)
        Sigma = sl.cho_solve(cf, np.eye(SigmaInv.shape[0]))
        return SigmaInv, Sigma

    # ----------------------- low-rank projectors: G and G_A -----------------------
    def _project_G(self, dphi):
        """
        Build G (n×r) from Δφ:
          - "mask": take standard basis on nonzero support (rows or cols > tol).
          - "svd" : thin SVD, take U[:, keep].

        Math:
          - If Δφ has zero-only rows/cols for some modes, those modes do not contribute;
            "mask" mode selects the active coordinate subspace.
          - "svd" mode picks the dominant left singular vectors of Δφ (range(Δφ)).

        Implementation:
          - Mask: boolean OR over rows/cols identifies active indices; build a selection matrix.
          - SVD : LAPACK thin-SVD; threshold s > thr keeps columns.
        """
        M = np.asarray(dphi, dtype=float)

        if self.proj_method == "mask":
            nonzero_row = np.any(np.abs(M) > self.tol, axis=1)
            nonzero_col = np.any(np.abs(M) > self.tol, axis=0)
            mask = nonzero_row | nonzero_col
            idx = np.flatnonzero(mask)
            r = idx.size
            G = np.zeros((M.shape[0], r), dtype=float)
            if r > 0:
                G[idx, np.arange(r)] = 1.0  # standard-basis selector
            return G

        elif self.proj_method == "svd":
            U, s, _ = self._safe_svd(M)
            thr = self._svd_threshold(s, M.shape[0])
            keep = np.flatnonzero(s > thr)
            if keep.size == 0:
                keep = np.array([0], dtype=int)  # degenerate case: keep one column to avoid empty basis
            return U[:, keep]

        else:
            raise ValueError("proj_method must be 'mask' or 'svd'")

    def _project_GA(self, A):
        """
        Build G_A as the column-space basis of A = φ_N^{-1} G via thin SVD.

        Math: G_A spans range(A) with orthonormal columns (G_A^T G_A = I).
              This is the QR/SVD step in your dimension reduction derivation.
        Implementation: thin SVD (full_matrices=False), keep singular vectors above threshold.
        """
        U, s, _ = self._safe_svd(np.asarray(A, dtype=float))
        thr = self._svd_threshold(s, A.shape[0])
        keep = np.flatnonzero(s > thr)
        if keep.size == 0:
            keep = np.array([0], dtype=int)
        return U[:, keep]

    @staticmethod
    def _safe_svd(M):
        """
        Robust thin SVD wrapper:
          - try default backend; if it fails, fall back to 'gesvd'.
        Returns U, s, VT with full_matrices=False.
        """
        try:
            return sl.svd(M, full_matrices=False, check_finite=False)
        except sl.LinAlgError:
            return sl.svd(M, full_matrices=False, lapack_driver='gesvd', check_finite=False)

    def _svd_threshold(self, s, n):
        """
        SVD cutoff:
          - rel_eps is None: thr = max(n,1) * eps * smax   (LAPACK-style)
          - else:             thr = rel_eps * smax

        Math/implementation: avoids keeping numerically null directions; scales with matrix size.
        """
        smax = float(s[0]) if s.size else 0.0
        if self.rel_eps is None:
            eps = np.finfo(float).eps
            return max(n, 1) * eps * smax
        else:
            return float(self.rel_eps) * smax

In [8]:
par_dict=big_array[-1]
os_fourier = FourierDetectionStatistic(
    pta_h0, pta_h1, pta_model,
    fourier_num=20,
    proj_method="mask",  # or "svd"
    tol=1e-30,
    rel_eps=None,
    order=None,
    verbose=False
)

os_val, y, Q,Sigma_y = os_fourier.get_deflection_coordinates(par_dict)