In [2]:
"""Reproduce fit error derivation for a damped sine wave using sympy.

This module symbolically derives the variances of the estimated amplitude,
phase and angular frequency for a damped sine wave measured with white
Gaussian noise.  It closely follows the derivation given in the provided
LaTeX document: the measurement model is

    m(t) = A*exp(-lambda_D*t) * sin(omega*t + phi) + n(t),

where ``n(t)`` is zero‑mean noise with variance ``sigma_m**2``.  The
estimates are obtained by a least‑squares fit, and the uncertainties are
found via a Fisher information analysis assuming that the sampling rate is
large compared to the signal frequency.  Under these assumptions the
trigonometric products average to zero and one half where appropriate.

The resulting expressions match the formulas given in the appendix of the
user‑supplied derivation.  Running this module will print the derived
variances along with a check that they agree with the target expressions.
"""

from __future__ import annotations

import sympy as sp
from typing import Dict, Tuple


def derive_variances() -> Dict[str, sp.Expr]:
    """Derive the variances of amplitude, phase and frequency estimates.

    Returns
    -------
    Dict[str, sp.Expr]
        Dictionary with keys ``var_A``, ``var_phi`` and ``var_omega`` giving
        the symbolic expressions for the variances of the amplitude,
        phase and angular frequency estimators respectively.

    Notes
    -----
    The derivation uses the Fisher information matrix (FIM) for the
    parameter vector ``(A, omega, phi)`` while treating the damping
    constant ``lambda_D`` as known.  Under the high‑sample‑rate
    approximation the trigonometric terms average over many cycles such
    that cross terms vanish.  The diagonal and off‑diagonal FIM elements
    are computed via continuous integrals which are weighted by the
    discrete sampling rate ``N/T``.  Inverting this matrix yields the
    Cramér–Rao bounds for the variances of the parameter estimates.
    """

    # Declare symbolic parameters (all positive to aid simplification)
    lambda_D: sp.Symbol = sp.symbols("lambda_D", positive=True)
    T: sp.Symbol = sp.symbols("T", positive=True)
    A: sp.Symbol = sp.symbols("A", positive=True)
    sigma_m: sp.Symbol = sp.symbols("sigma_m", positive=True)
    N: sp.Symbol = sp.symbols("N", positive=True)

    # Sampling rate (samples per unit time)
    R: sp.Expr = N / T

    # Integration variable
    t: sp.Symbol = sp.symbols("t", real=True)

    # Helpers for common integrals
    # ∫_0^T e^{-2 λ t} dt
    I0: sp.Expr = sp.integrate(sp.exp(-2 * lambda_D * t), (t, 0, T))
    # ∫_0^T t e^{-2 λ t} dt
    I1: sp.Expr = sp.integrate(t * sp.exp(-2 * lambda_D * t), (t, 0, T))
    # ∫_0^T t^2 e^{-2 λ t} dt
    I2: sp.Expr = sp.integrate(t**2 * sp.exp(-2 * lambda_D * t), (t, 0, T))

    # Under the rapidly oscillating approximation the averages satisfy:
    # ⟨sin^2⟩ = ⟨cos^2⟩ = 1/2, ⟨sin* cos⟩ = 0.
    # Fisher information elements (per derivation) for parameters (A, omega, phi)
    # F_AA
    F_AA: sp.Expr = R / (sigma_m**2) * sp.Rational(1, 2) * I0
    # F_phi_phi
    F_phi_phi: sp.Expr = R / (sigma_m**2) * A**2 * sp.Rational(1, 2) * I0
    # F_omega_omega
    F_omega_omega: sp.Expr = R / (sigma_m**2) * A**2 * sp.Rational(1, 2) * I2
    # F_omega_phi = F_phi_omega
    F_omega_phi: sp.Expr = R / (sigma_m**2) * A**2 * sp.Rational(1, 2) * I1

    # Determinant of the 2x2 submatrix for (omega, phi)
    det_sub: sp.Expr = sp.simplify(F_omega_omega * F_phi_phi - F_omega_phi**2)

    # Variances via inverse of FIM: var_A = 1/F_AA, var_omega = F_phi_phi/det, var_phi = F_omega_omega/det
    var_A: sp.Expr = sp.simplify(1 / F_AA)
    var_omega: sp.Expr = sp.simplify(F_phi_phi / det_sub)
    var_phi: sp.Expr = sp.simplify(F_omega_omega / det_sub)

    return {
        "var_A": sp.simplify(var_A),
        "var_phi": sp.simplify(var_phi),
        "var_omega": sp.simplify(var_omega),
    }


"""Run the derivation and display results.

This function computes the symbolic variances, prints them in a
human‑readable form and checks them against the expected expressions
reported in the derivation.  At the end it asserts that all
comparisons succeed.
"""

derived: Dict[str, sp.Expr] = derive_variances()
print("Derived variances:\n")
for name, expr in derived.items():
    print(f"{name}: {sp.simplify(expr)}\n")

Derived variances:

var_A: 4*T*lambda_D*sigma_m**2*exp(2*T*lambda_D)/(N*(exp(2*T*lambda_D) - 1))

var_phi: 8*T*lambda_D*sigma_m**2*(2*T**2*lambda_D**2 + 2*T*lambda_D - exp(2*T*lambda_D) + 1)*exp(2*T*lambda_D)/(A**2*N*(4*T**2*lambda_D**2*exp(2*T*lambda_D) - exp(4*T*lambda_D) + 2*exp(2*T*lambda_D) - 1))

var_omega: 16*T*lambda_D**3*sigma_m**2*(1 - exp(2*T*lambda_D))*exp(2*T*lambda_D)/(A**2*N*(4*T**2*lambda_D**2*exp(2*T*lambda_D) - exp(4*T*lambda_D) + 2*exp(2*T*lambda_D) - 1))



In [3]:
"""Reproduce fit error derivation for a damped sine wave using sympy.

This module symbolically derives the variances of the estimated amplitude,
phase and angular frequency for a damped sine wave measured with white
Gaussian noise.  It closely follows the derivation given in the provided
LaTeX document: the measurement model is

    m(t) = A*exp(-lambda_D*t) * sin(omega*t + phi) + n(t),

where ``n(t)`` is zero‑mean noise with variance ``sigma_m**2``.  The
estimates are obtained by a least‑squares fit, and the uncertainties are
found via a Fisher information analysis assuming that the sampling rate is
large compared to the signal frequency.  Under these assumptions the
trigonometric products average to zero and one half where appropriate.

The resulting expressions match the formulas given in the appendix of the
user‑supplied derivation.  Running this module will print the derived
variances along with a check that they agree with the target expressions.
"""

from __future__ import annotations

import sympy as sp
from typing import Dict, Tuple


def derive_variances() -> Dict[str, sp.Expr]:
    """Derive variances of amplitude, phase, frequency, and damping estimates."""
    # Declare symbols
    lambda_D, T = sp.symbols("lambda_D T", positive=True)
    A, sigma_m, N = sp.symbols("A sigma_m N", positive=True)
    omega, phi = sp.symbols("omega phi", real=True)
    t = sp.symbols("t", real=True)
    R = N / T

    # Model: f(t) = A * exp(-lambda_D * t) * sin(omega t + phi)
    f = A * sp.exp(-lambda_D * t) * sp.sin(omega * t + phi)

    # Derivatives
    df_dA = sp.diff(f, A)
    df_dphi = sp.diff(f, phi)
    df_domega = sp.diff(f, omega)
    df_dlambda = sp.diff(f, lambda_D)

    # Fisher elements
    def FIM_entry(df1, df2):
        return sp.integrate(df1 * df2, (t, 0, T)) * R / sigma_m**2

    F_AA = FIM_entry(df_dA, df_dA)
    F_pp = FIM_entry(df_dphi, df_dphi)
    F_oo = FIM_entry(df_domega, df_domega)
    F_ll = FIM_entry(df_dlambda, df_dlambda)
    F_po = FIM_entry(df_dphi, df_domega)
    F_Al = FIM_entry(df_dA, df_dlambda)

    # (phi, omega) block
    fim_phi_omega = sp.Matrix([[F_pp, F_po], [F_po, F_oo]])
    cov_phi_omega = fim_phi_omega.inv()
    var_phi = cov_phi_omega[0, 0]
    var_omega = cov_phi_omega[1, 1]

    # (A, lambda) block
    fim_A_lambda = sp.Matrix([[F_AA, F_Al], [F_Al, F_ll]])
    cov_A_lambda = fim_A_lambda.inv()
    var_A = cov_A_lambda[0, 0]
    var_lambda = cov_A_lambda[1, 1]

    return {
        "var_A": sp.simplify(var_A),
        "var_phi": sp.simplify(var_phi),
        "var_omega": sp.simplify(var_omega),
        "var_lambda": sp.simplify(var_lambda),
    }


"""Run the derivation and display results.

This function computes the symbolic variances, prints them in a
human‑readable form and checks them against the expected expressions
reported in the derivation.  At the end it asserts that all
comparisons succeed.
"""

derived: Dict[str, sp.Expr] = derive_variances()
print("Derived variances:\n")
for name, expr in derived.items():
    print(f"{name}: {sp.simplify(expr)}\n")

KeyboardInterrupt: 

In [4]:
from __future__ import annotations

import sympy as sp
from typing import Dict, Tuple


"""Derive variances of amplitude, phase, frequency, and damping estimates."""
# Declare symbols
lambda_D, T = sp.symbols("lambda_D T", positive=True)
A, sigma_m, N = sp.symbols("A sigma_m N", positive=True)
omega, phi = sp.symbols("omega phi", real=True)
t = sp.symbols("t", real=True)
R = N / T

# Model: f(t) = A * exp(-lambda_D * t) * sin(omega t + phi)
f = A * sp.exp(-lambda_D * t) * sp.sin(omega * t + phi)

# Derivatives
df_dA = sp.diff(f, A)
df_dphi = sp.diff(f, phi)
df_domega = sp.diff(f, omega)
df_dlambda = sp.diff(f, lambda_D)


# Fisher elements
def FIM_entry(df1, df2):
    return sp.integrate(df1 * df2, (t, 0, T)) * R / sigma_m**2


F_AA = FIM_entry(df_dA, df_dA)
F_pp = FIM_entry(df_dphi, df_dphi)
F_oo = FIM_entry(df_domega, df_domega)
F_ll = FIM_entry(df_dlambda, df_dlambda)
F_po = FIM_entry(df_dphi, df_domega)
F_Al = FIM_entry(df_dA, df_dlambda)

In [None]:
# (phi, omega) block
fim_phi_omega = sp.Matrix([[F_pp, F_po], [F_po, F_oo]])
fim_phi_omega

Matrix([
[                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              

In [7]:
cov_phi_omega = fim_phi_omega.inv()

KeyboardInterrupt: 

In [None]:
cov_phi_omega = fim_phi_omega.inv()
var_phi = cov_phi_omega[0, 0]
var_omega = cov_phi_omega[1, 1]

# (A, lambda) block
fim_A_lambda = sp.Matrix([[F_AA, F_Al], [F_Al, F_ll]])
cov_A_lambda = fim_A_lambda.inv()
var_A = cov_A_lambda[0, 0]
var_lambda = cov_A_lambda[1, 1]

return {
    "var_A": sp.simplify(var_A),
    "var_phi": sp.simplify(var_phi),
    "var_omega": sp.simplify(var_omega),
    "var_lambda": sp.simplify(var_lambda),
}


"""Run the derivation and display results.

This function computes the symbolic variances, prints them in a
human‑readable form and checks them against the expected expressions
reported in the derivation.  At the end it asserts that all
comparisons succeed.
"""

derived: Dict[str, sp.Expr] = derive_variances()
print("Derived variances:\n")
for name, expr in derived.items():
    print(f"{name}: {sp.simplify(expr)}\n")

In [None]:
10**4 * 10 * 2

200000

In [31]:
import sympy as sp
from sympy import Matrix, symbols, integrate, diff, simplify, eye
from typing import Dict, Tuple
from multiprocessing import Pool, cpu_count


# Update the integral computation function to include simplification inside the parallelized task
def compute_integral_pair_simplified(
    args: Tuple[sp.Expr, sp.Expr, sp.Symbol, sp.Symbol, sp.Symbol, sp.Symbol],
) -> sp.Expr:
    df1, df2, t, T, R, sigma_m = args
    product = sp.expand(df1 * df2)
    product = sp.expand_trig(product)
    integral = sp.integrate(product, (t, 0, T))
    scaled = integral * R / sigma_m**2
    expr = sp.expand(scaled)  # Expand basic algebra first
    expr = sp.expand_trig(expr)  # Expand trig products (e.g., sin^2)
    expr = sp.factor(expr)  # Pull out common exponentials
    expr = sp.cancel(expr)  # Clean up rational forms
    expr = sp.collect(expr, T)  # Group by powers of T (or lambda_D, etc.)
    # expr = sp.simplify(expr)
    return expr


def compute_fisher_matrix_parallel_simplified(
    model: sp.Expr,
    parameters: list[sp.Symbol],
    t: sp.Symbol,
    T: sp.Symbol,
    sigma_m: sp.Symbol,
    N: sp.Symbol,
) -> sp.Matrix:
    R = N / T
    tasks = [
        (sp.diff(model, p1), sp.diff(model, p2), t, T, R, sigma_m)
        for p1 in parameters
        for p2 in parameters
    ]

    with Pool(cpu_count()) as pool:
        simplified_entries = pool.map(compute_integral_pair_simplified, tasks)

    n = len(parameters)
    return sp.Matrix(n, n, simplified_entries)


def compute_variance_schur(args):
    i, FIM = args
    D = FIM[i, i]
    if i == 0:
        return i, sp.cancel(1 / D)
    A = FIM[:i, :i]
    B = FIM[:i, i]
    C = FIM[i, :i]
    try:
        A_inv = A.inv()
    except Exception:
        return i, sp.nan  # Or log and skip
    schur = D - C @ A_inv @ B
    return i, sp.cancel(1 / schur)


# Symbols
t = sp.symbols("t", positive=True, real=True)
T = sp.symbols("T", positive=True, real=True, nonzero=True)
A = sp.symbols("T", positive=True, real=True, nonzero=True)
N = sp.symbols("N", positive=True, real=True, nonzero=True)
sigma_m = sp.symbols("sigma_m", positive=True, real=True, nonzero=True)
lambda_D = sp.symbols("lambda_D", real=True, positive=True, nonzero=True)
omega = sp.symbols("omega", real=True, positive=True, nonzero=True)
phi = sp.symbols("phi", real=True)

# Model
f = A * sp.exp(-lambda_D * t) * sp.sin(omega * t + phi)

# Parameter list
params = [A, lambda_D, omega, phi]

In [20]:
# runtime ~ 10 min
print("parallel compute matrix")
FIM = compute_fisher_matrix_parallel_simplified(f, params, t, T, sigma_m, N)
# Parallel Schur complement variance estimation

parallel compute matrix


In [21]:
FIM

Matrix([
[                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              

In [22]:
FIM = sp.expand(FIM)  # Expand basic algebra first

In [None]:
# Invert and simplify
FIM_inv = FIM.inv(method="LU")  # Faster than default symbolic inverse

In [None]:
FIM_inv = FIM_inv.applyfunc(sp.cancel)  # Clean up each entry

In [35]:
#
print("parallel compute variances")
with Pool(cpu_count()) as pool:
    results = pool.map(compute_variance_schur, [(i, FIM) for i in range(FIM.shape[0])])

parallel compute variances


Process ForkPoolWorker-205:
Process ForkPoolWorker-210:


KeyboardInterrupt: 

In [None]:
def derive_covariance(
    model: sp.Expr,
    parameters: list[sp.Symbol],
    t: sp.Symbol,
    T: sp.Symbol,
    sigma_m: sp.Symbol,
    N: sp.Symbol,
) -> Tuple[Dict[str, sp.Expr], sp.Matrix]:
    """Compute estimator variances and full covariance matrix via Fisher analysis.

    Parameters
    ----------
    model : Expr
        The symbolic model function f(t).
    parameters : list of Symbol
        Parameter symbols to estimate.
    t : Symbol
        The integration variable (e.g., time).
    T : Symbol
        Total observation time.
    sigma_m : Symbol
        Standard deviation of measurement noise.
    N : Symbol
        Number of samples.

    Returns
    -------
    Tuple[Dict[str, Expr], Matrix]
        A dictionary mapping each parameter to its variance, and
        the full symbolic covariance matrix (inverse FIM).
    """
    R = N / T
    FIM = sp.Matrix(
        [
            [
                sp.integrate(sp.diff(model, p1) * sp.diff(model, p2), (t, 0, T))
                * R
                / sigma_m**2
                for p2 in parameters
            ]
            for p1 in parameters
        ]
    )
    covariance = FIM.inv()
    variances = {
        str(param): sp.simplify(covariance[i, i]) for i, param in enumerate(parameters)
    }
    return variances, covariance


# Symbols
t, T = sp.symbols("t T", positive=True, real=True)
A, sigma_m, N = sp.symbols("A sigma_m N", positive=True)
lambda_D, omega, phi = sp.symbols("lambda_D omega phi", real=True)

# Model
f = A * sp.exp(-lambda_D * t) * sp.sin(omega * t + phi)

# Parameter list
params = [A, lambda_D, omega, phi]

# Compute
variances, covariance_matrix = derive_covariance(f, params, t, T, sigma_m, N)

# Display
print("Variances:")
for name, var in variances.items():
    print(f"{name}: {var}\n")

In [None]:
def compare_with_target(expressions: Dict[str, sp.Expr]) -> Dict[str, bool]:
    """Compare derived variances with the target formulas from the derivation.

    Parameters
    ----------
    expressions : Dict[str, sp.Expr]
        Dictionary containing the derived variances ``var_A``, ``var_phi``
        and ``var_omega``.

    Returns
    -------
    Dict[str, bool]
        Dictionary indicating whether each derived expression is
        algebraically equivalent to the corresponding target expression.
    """

    lambda_D, T, A, sigma_m, N = sp.symbols("lambda_D T A sigma_m N", positive=True)
    # Target expressions from the supplied derivation (see Eqs. (A.36)–(A.38)).
    var_A_target: sp.Expr = sp.simplify(
        (2 * lambda_D * sigma_m**2 * T * (sp.coth(lambda_D * T) + 1)) / N
    )
    var_phi_target: sp.Expr = sp.simplify(
        (
            4
            * lambda_D
            * sigma_m**2
            * T
            * (-2 * lambda_D * T * (lambda_D * T + 1) + sp.exp(2 * lambda_D * T) - 1)
        )
        / (A**2 * N * (-2 * lambda_D**2 * T**2 + sp.cosh(2 * lambda_D * T) - 1))
    )
    var_omega_target: sp.Expr = sp.simplify(
        (8 * lambda_D**3 * sigma_m**2 * T * (sp.exp(2 * lambda_D * T) - 1))
        / (A**2 * N * (-2 * lambda_D**2 * T**2 + sp.cosh(2 * lambda_D * T) - 1))
    )

    # Substitution dictionary for matching symbols in derived expressions
    subs_dict = {
        sp.symbols("lambda_D", positive=True): lambda_D,
        sp.symbols("T", positive=True): T,
        sp.symbols("A", positive=True): A,
        sp.symbols("sigma_m", positive=True): sigma_m,
        sp.symbols("N", positive=True): N,
    }

    # Compare via ratio: two expressions are equivalent if their ratio simplifies to 1
    def match(expr1: sp.Expr, expr2: sp.Expr) -> bool:
        """Return True if two expressions are equivalent.

        The comparison first attempts symbolic simplification of the ratio
        ``expr1/expr2`` to 1.  If that fails (e.g. due to latent
        exponential–hyperbolic identities), a numerical check is
        performed at several positive values of ``lambda_D`` and ``T``.
        """
        ratio = sp.simplify(expr1 / expr2)
        # Direct symbolic check
        if sp.simplify(ratio - 1) == 0:
            return True
        # Numerical sampling fallback
        numeric_tests = [(0.1, 2.0), (0.5, 1.2), (0.2, 3.5)]
        lambda_D_sym, T_sym = sp.symbols("lambda_D T", positive=True)
        f_lambda = lambda_D_sym
        f_T = T_sym
        # Create callable function for ratio using lambdify to ensure numeric evaluation
        ratio_func = sp.lambdify((lambda_D_sym, T_sym), ratio, "numpy")
        import math

        for lam_val, T_val in numeric_tests:
            try:
                val = ratio_func(lam_val, T_val)
            except Exception:
                return False
            if not math.isfinite(val) or abs(val - 1.0) > 1e-10:
                return False
        return True

    var_A_match: bool = match(expressions["var_A"].subs(subs_dict), var_A_target)
    var_phi_match: bool = match(expressions["var_phi"].subs(subs_dict), var_phi_target)
    var_omega_match: bool = match(
        expressions["var_omega"].subs(subs_dict), var_omega_target
    )

    return {
        "var_A": var_A_match,
        "var_phi": var_phi_match,
        "var_omega": var_omega_match,
    }