# Refactor tdnoisefit

Determine the signal estimate $\hat{\boldsymbol{\mu}}$ by solving $\partial C/\partial\boldsymbol{\mu} = \mathbf{0}$, where $C$ is the negative log-likelihood cost function.

In [1]:
from __future__ import annotations

import numdifftools as nd
import numpy as np
import scipy.optimize as opt

from numpy.typing import ArrayLike
from scipy.optimize import minimize
from matplotlib import pyplot as plt
from matplotlib.figure import figaspect
from scipy.optimize import approx_fprime
from numpy.random import default_rng

import thztools as thz

## Simulate measurements
Simulate a set of `m` waveforms, each sampled at `n` time points, with noise parameters
`sigma_alpha`, `sigma_beta`, and `sigma_tau` (stored together in the array `sigma`), and
store the waveforms in an array `x`. Note that `x` stores the waveforms in row orientation,
with shape `(m, n)`, because NumPy broadcasting rules and FFT functions are simpler for
arrays that are row-oriented. The fitting functions below are called with the transposed
array `x.T`, in which each column is one waveform.

In [2]:
# Seeded generator so that the simulated noise is reproducible
rng = default_rng(0)

# n time points per waveform, m waveforms
n, m = 256, 64

# Sampling time and the corresponding time axis
ts = 0.05
t = ts * np.arange(n)

# Noise-free waveform, with t0 set to one third of the time window n * ts
mu, _ = thz.thzgen(n, ts=ts, t0=n * ts / 3)

# Noise parameters and the resulting noise amplitude for the waveform mu
sigma = np.array([1e-5, 1e-2, 1e-3])
noise_amplitude = thz.noiseamp(sigma, mu, ts)

# Scale standard-normal samples by the noise amplitude; one row per waveform
noise = noise_amplitude * rng.standard_normal((m, n))
x = np.array(mu + noise)

# Unit amplitudes and zero delays, one entry per waveform
a = np.ones(m)
eta = np.zeros(m)

## Refactor `tdnoisefit` using `scipy.optimize.root` to solve for `mu`

In [3]:
from thztools import tdnll

# Number of noise-model parameters; tdnoisefit requires v0 to have this many
# elements and uses it to size the default initial guess.
NUM_NOISE_PARAMETERS = 3
# Number of dimensions that tdnoisefit requires of the data array x.
NUM_NOISE_DATA_DIMENSIONS = 2


def tdnoisefit(
    x: ArrayLike,
    *,
    v0: ArrayLike | None = None,
    a0: ArrayLike | None = None,
    eta0: ArrayLike | None = None,
    ts: float = 1.0,
    fix_v: bool = False,
    fix_a: bool = True,
    fix_eta: bool = True,
) -> tuple[dict, float, dict]:
    r"""
    Compute time-domain noise model parameters.

    Computes the noise parameters sigma and the underlying signal vector ``mu``
    for the data matrix ``x``, where the columns of ``x`` are each noisy
    measurements of ``mu``.

    Parameters
    ----------
    x : ndarray
        Data array.
    v0 : ndarray, optional
        Initial guess, noise model parameters with size (3,), expressed as
        variance amplitudes.
    a0 : ndarray, optional
        Initial guess, amplitude vector with size (m,).
    eta0 : ndarray, optional
        Initial guess, delay vector with size (m,).
    ts : float, optional
        Sampling time
    fix_v : bool, optional
        Noise variance parameters.
    fix_a : bool, optional
        Amplitude vector.
    fix_eta : bool, optional
        Delay vector.

    Returns
    --------
    p : dict
        Output parameter dictionary containing:
            var : ndarray
                Noise parameters, expressed as variance amplitudes.
            mu : ndarray
                Signal vector.
            a : ndarray
                Amplitude vector.
            eta : ndarray
                Delay vector.
    fval : float
        Value of NLL cost function from FMINUNC
    Diagnostic : dict
        Dictionary containing diagnostic information
            err : dic
                Dictionary containing  error of the parameters.
            grad : ndarray
                Negative loglikelihood cost function gradient from
                scipy.optimize.minimize BFGS method.
            hessian : ndarray
                Negative loglikelihood cost function hessian from
                scipy.optimize.minimize BFGS method.
    """
    if fix_v and fix_a and fix_eta:
        msg = "All variables are fixed"
        raise ValueError(msg)
    # Parse and validate function inputs
    x = np.asarray(x)
    if x.ndim != NUM_NOISE_DATA_DIMENSIONS:
        msg = "Data array x must be 2D"
        raise ValueError(msg)
    n, m = x.shape

    if v0 is None:
        v0 = np.mean(np.var(x, 1)) * np.ones(NUM_NOISE_PARAMETERS)
    else:
        v0 = np.asarray(v0)
        if v0.size != NUM_NOISE_PARAMETERS:
            msg = (
                "Noise parameter array logv must have "
                f"{NUM_NOISE_PARAMETERS} elements."
            )
            raise ValueError(msg)

    if a0 is None:
        a0 = np.ones(m)
    else:
        a0 = np.asarray(a0)
        if a0.size != m:
            msg = "Size of a0 is incompatible with data array x."
            raise ValueError(msg)

    if eta0 is None:
        eta0 = np.zeros(m)
    else:
        eta0 = np.asarray(eta0)
        if eta0.size != m:
            msg = "Size of eta0 is incompatible with data array x."
            raise ValueError(msg)

    # Set initial guesses for all free parameters
    p0 = np.array([])
    if not fix_v:
        # Replace log(x) with -inf when x <= 0
        logv0 = np.ma.log(v0).filled(-np.inf)
        p0 = np.concatenate((p0, logv0))
    if not fix_a:
        p0 = np.concatenate((p0, a0[1:] / a0[0]))
    if not fix_eta:
        p0 = np.concatenate((p0, eta0[1:] - eta0[0]))

    # Bundle free parameters together into objective function
    def objective(_p):
        if fix_v:
            _logv = np.ma.log(v0).filled(-np.inf)
        else:
            _logv = _p[:3]
            _p = _p[3:]
        if fix_a:
            _a = a0
        else:
            _a = np.concatenate((np.array([1.0]), _p[: m - 1]))
            _p = _p[m - 1 :]
        if fix_eta:
            _eta = eta0
        else:
            _eta = np.concatenate((np.array([0.0]), _p[: m - 1]))

        def _grad_mu(_mu: ArrayLike) -> ArrayLike:
            _, grad = thz.tdnll(x.T, _mu, _logv, _a, _eta, ts, fix_logv=True, fix_mu=False, 
                        fix_a=True, fix_eta=True)
            return grad

        x_scale_shift = thz.scaleshift(x.T, a=1.0 / _a, eta=-_eta, ts=ts)
        
        mu0 = np.mean(x_scale_shift, axis=0)
        sol = opt.root(_grad_mu, mu0, tol=np.finfo(float).eps)
        mu_est = sol.x
        
        return tdnll(
            x.T,
            mu_est,
            _logv,
            _a,
            _eta,
            ts,
            fix_logv=fix_v,
            fix_mu=True,
            fix_a=fix_a,
            fix_eta=fix_eta,
        )

    # Minimize cost function with respect to free parameters
    out = minimize(objective, p0, method="BFGS", jac=True)

    # Parse output
    p = {}
    x_out = out.x
    if fix_v:
        p["var"] = v0
    else:
        p["var"] = np.exp(x_out[:3])
        x_out = x_out[3:]

    if fix_a:
        p["a"] = a0
    else:
        p["a"] = np.concatenate(([1], x_out[: m - 1]))
        x_out = x_out[m - 1 :]

    if fix_eta:
        p["eta"] = eta0
    else:
        p["eta"] = np.concatenate(([0], x_out[: m - 1]))

    p["ts"] = ts

    # def _grad_mu(_mu: ArrayLike) -> ArrayLike:
    #     _, grad = thz.tdnll(x.T, _mu, np.log(p["var"]), p["a"], p["eta"], ts, 
    #                         fix_logv=True, fix_mu=False, fix_a=True, fix_eta=True)
    #     return grad
    # 
    # x_scale_shift = thz.scaleshift(x.T, a=1.0 / p["a"], eta=-p["eta"], ts=ts)    
    # mu0 = np.mean(x_scale_shift, axis=0)
    # sol = opt.root(_grad_mu, mu0, tol=np.finfo(float).eps)
    # mu_est = sol.x
    # p["mu"] = mu_est
    # 
    diagnostic = {
        "grad": out.jac,
        "cov": out.hess_inv,
        "err": {
            "var": np.array([]),
            "a": np.array([]),
            "eta": np.array([]),
        },
        "success": out.success,
        "status": out.status,
        "message": out.message,
        "nfev": out.nfev,
        "njev": out.njev,
        "nit": out.nit,
    }
    err = np.sqrt(np.diag(diagnostic["cov"]))
    if not fix_v:
        # Propagate error from log(V) to V
        diagnostic["err"]["var"] = np.sqrt(
            np.diag(np.diag(p["var"]) @ diagnostic["cov"][0:3, 0:3])
            @ np.diag(p["var"])
        )
        err = err[3:]

    if not fix_a:
        diagnostic["err"]["a"] = np.concatenate(([0], err[: m - 1]))
        err = err[m - 1 :]

    if not fix_eta:
        diagnostic["err"]["eta"] = np.concatenate(([0], err[: m - 1]))

    return p, out.fun, diagnostic

In [4]:
# Reference fit with the packaged thz.tdnoisefit: only the noise parameters
# are free, starting from the variances sigma**2 used in the simulation.
p, out_fun, diagnostic = thz.tdnoisefit(x.T, v0=sigma**2, a0=None, eta0=None, fix_v=False, fix_a=True, fix_eta=True)

In [5]:
# Display the diagnostic dictionary returned by thz.tdnoisefit
diagnostic

{'grad': array([ 2.82614283e-07, -1.96575571e-08,  3.51556190e-09]),
 'cov': array([[ 1.68048025e-04, -1.06279276e-05,  1.87937999e-05],
        [-1.06279276e-05,  6.85055950e-04, -5.32350921e-04],
        [ 1.87937999e-05, -5.32350921e-04,  5.41555231e-03]]),
 'err': {'var': array([1.27746569e-12, 2.52587949e-06, 2.72935306e-05]),
  'mu': array([], dtype=float64),
  'a': array([], dtype=float64),
  'eta': array([], dtype=float64)},
 'success': True,
 'status': 0,
 'message': 'Optimization terminated successfully.',
 'nfev': 20,
 'njev': 20,
 'nit': 15}

In [6]:
# Same fit with the refactored tdnoisefit defined in this notebook, using
# identical arguments so the results can be compared with the reference fit.
p, out_fun, diagnostic = tdnoisefit(x.T, v0=sigma**2, a0=None, eta0=None, fix_v=False, fix_a=True, fix_eta=True)

In [7]:
# Display the diagnostic dictionary returned by the refactored tdnoisefit
diagnostic

{'grad': array([ 2.82614283e-07, -1.96575571e-08,  3.51556190e-09]),
 'cov': array([[ 1.68048025e-04, -1.06279276e-05,  1.87937999e-05],
        [-1.06279276e-05,  6.85055950e-04, -5.32350921e-04],
        [ 1.87937999e-05, -5.32350921e-04,  5.41555231e-03]]),
 'err': {'var': array([1.27746569e-12, 2.52587949e-06, 2.72935306e-05]),
  'a': array([], dtype=float64),
  'eta': array([], dtype=float64)},
 'success': True,
 'status': 0,
 'message': 'Optimization terminated successfully.',
 'nfev': 20,
 'njev': 20,
 'nit': 15}