# Power spectral density estimation methods

This notebook introduces and compares in-built methods within `Bilby` for the estimation of the power spectal density (PSD) and discusses some of the theory that goes into estimating PSDs and evaluating their performance.

## Standard imports

In [None]:
import os
os.environ["GWPY_RCPARAMS"] = "FALSE"

import warnings
warnings.filterwarnings("ignore", "Wswiglal-redir-stdio")

import bilby
from gwpy.timeseries import TimeSeries
import numpy as np
import matplotlib.pyplot as plt
import scipy.special
import scipy.interpolate
import scipy.signal

print(bilby.__version__)

%matplotlib inline

## Construction of helper functions

In [None]:
def empirical_cdf(data):
    """ Compute the empirical cumulative distribution function (ECDF). """
    sorted_data = np.sort(data)
    n = len(data)
    
    def ecdf(x):
        return np.searchsorted(sorted_data, x, side='right') / n
    
    return ecdf, sorted_data

def anderson_darling_statistic(data):
    """ Compute the Anderson-Darling test statistic for normality. """
    n = len(data)
    ecdf, sorted_data = empirical_cdf(data)
    
    # Transform data to standard normal quantiles
    mean = np.mean(data)
    std = np.std(data, ddof=1)  # Sample standard deviation
    standardized = (sorted_data - mean) / std
    
    # Compute theoretical normal CDF values
    normal_cdf = 0.5 * (1 + scipy.special.erf(standardized / np.sqrt(2)))  # Standard normal CDF
    
    # Compute Anderson-Darling test statistic
    s = np.sum((2 * np.arange(1, n + 1) - 1) * (np.log(normal_cdf) + np.log(1 - normal_cdf[::-1])))
    A2 = -n - s / n
    
    return A2

def anderson_p_value(data, freqs=None, fmin=0, fmax=np.inf):
    """ Approximate the p-value for the Anderson-Darling test for normality. """
   
    # If provided, cut the frequencies to a min/max value
    if freqs is not None:
        idxs = (freqs > fmin) & (freqs < fmax)
        data = data[idxs]

    # Concatenate the real and imaginary parts together
    data = np.concatenate([data.real, data.imag]) 

    if len(data) == 0:
        return np.nan
        
    A2 = anderson_darling_statistic(data)

    critical_values = [
        0.200, 0.300, 0.400, 0.500, 0.576, 0.656, 0.787, 0.918, 
        1.092, 1.250, 1.500, 1.750, 2.000, 2.500, 3.000, 3.500, 
        4.000, 4.500, 5.000, 6.000, 7.000, 8.000, 10.000
    ]
    
    significance_levels = [
        0.90, 0.85, 0.80, 0.75, 0.70, 0.60, 0.50, 0.40, 
        0.30, 0.25, 0.20, 0.15, 0.10, 0.05, 0.01, 0.005, 
        0.0025, 0.001, 0.0005, 0.0002, 0.0001, 0.00005, 0.00001
    ]

    # Approximate p-value using interpolation
    if A2 < critical_values[0]:
        pval = significance_levels[0]
    elif A2 > critical_values[-1]:
        pval = significance_levels[-1]
    else:
        pval = np.interp(A2, critical_values, significance_levels)

    return float(pval)

def fbins_anderson_p_value(freq, data, bin_width_Hz=8, fmin=0, fmax=np.inf):
    """
    Compute Anderson-Darling p-values for frequency bins.
    
    Parameters
    ----------
    freq : array-like
        Frequency values.
    data : array-like
        Data corresponding to frequencies.
    bin_width_Hz : float, optional
        Width of frequency bins in Hz. Default is 8.
    fmin : float, optional
        Minimum frequency for analysis. Default is 0.
    fmax : float, optional
        Maximum frequency for analysis. Default is np.inf.
    
    Returns
    -------
    tuple
        A tuple containing frequency bins and corresponding p-values.
    """
    bin_width = int(bin_width_Hz * duration)
    idxs = np.arange(0, len(data), bin_width)[:-1]
    pvals = [anderson_p_value(data[ii:ii+bin_width], freq[ii:ii+bin_width], fmin=fmin, fmax=fmax) for ii in idxs]
    fbins = [freq[ii + bin_width // 2] for ii in idxs]
    return fbins, pvals

def get_window_like(data, roll_off=0.1):
    """
    Generate a Tukey window with a specified roll-off.
    
    Parameters
    ----------
    data : array-like
        Input data to determine window size.
    roll_off : float, optional
        Roll-off factor for the Tukey window. Default is 0.1.
    
    Returns
    -------
    array
        A Tukey window of the same length as the input data.
    """
    psd_alpha = 2 * roll_off / duration
    return scipy.signal.get_window(("tukey", psd_alpha), len(data))

def whiten(timeseries, asd, asd_frequencies, roll_off=0.1):
    """
    Whiten a time series by dividing by the amplitude spectral density (ASD).
    
    Parameters
    ----------
    timeseries : array-like
        The input time series data.
    asd : array-like
        The amplitude spectral density.
    asd_frequencies : array-like
        Frequencies corresponding to ASD values.
    roll_off : float, optional
        Roll-off factor for windowing. Default is 0.1.
    
    Returns
    -------
    tuple
        The original frequency series and the whitened frequency series.
    """
    duration = timeseries.duration.value
    timeseries_windowed = timeseries * get_window_like(timeseries, roll_off)
    frequency_series = timeseries_windowed.fft()
    whitened_frequencyseries = frequency_series * np.sqrt(4 / duration) / asd
    return frequency_series, whitened_frequencyseries

def plot_whitening(frequencies, asd, h_f, wh_f, label=None, bin_width_Hz=8):
    """
    Plot whitening analysis, including the frequency domain signal, ASD, p-values, and histogram.
    
    Parameters
    ----------
    frequencies : array-like
        Frequency values.
    asd : array-like
        Amplitude spectral density.
    h_f : array-like
        Frequency-domain representation of the original signal.
    wh_f : array-like
        Frequency-domain representation of the whitened signal.
    label : str, optional
        Label for the ASD plot. Default is None.
    bin_width_Hz : float, optional
        Width of frequency bins for p-value computation. Default is 8.
    
    Returns
    -------
    None
    """
    fig, axes = plt.subplots(nrows=2, ncols=2, sharex=True, figsize=(8, 5))
    ax1, ax2 = axes[:, 0]
    gs = axes[1, 0].get_gridspec()
    axes[0, 1].remove()
    axes[1, 1].remove()
    ax3 = fig.add_subplot(gs[0:, 1])

    ax1.semilogy(frequencies, np.abs(h_f), label=r"$|\tilde{h}(f)|$")
    ax1.semilogy(frequencies, asd, label=label)
    ax1.set(ylabel="Strain data [Hz$^{-1/2}$]")
    ax1.legend()

    fbins, pvals = fbins_anderson_p_value(frequencies, wh_f, bin_width_Hz=bin_width_Hz)
    ax2.scatter(fbins, pvals, s=2)
    ax2.axhline(1e-2, color="r")
    ax2.set(xlabel="Frequency [Hz]", ylabel="$p$-value", yscale="log")

    bin_width = 0.025
    bins = np.arange(0, 1 + bin_width, bin_width)
    ax3.hist(pvals, bins=bins)
    ax3.set(xlabel="p-values", ylabel="Density", xlim=(0, 1))
    fig.tight_layout()
    plt.show()

Next, we define the `sampling_frequency` and `duration` of data that we will utilise throughout this notebook and construct a set of frequencies.

In [None]:
sampling_frequency = 4096
duration = 4
frequencies = np.fft.rfftfreq(sampling_frequency * duration, d=1/sampling_frequency)

Note that `frequencies` will have a length of `duration * sampling_frequency / 2 + 1` as it includes both the `0` Hz bin and the Nyquist frequency `sampling_frequency/2`.

## Getting started with a known PSD

We will begin with a case where we *know* the PSD $S(f)$ a priori by simulating the data (or equivalently that we known the amplitude spectral density (ASD) $\sqrt{S(f)}$). The idea here is to motivate the later sections where we are faced with real inteferometer data without a known PSD and must estimate its properties and evaluate its performance.

First, let's use `bilby` to simulate some coloured Gaussian noise from a known PSD; this method simulates the noise in the frequency domain directly.

We start by first constructing a known PSD using an estimate of O4 sensitivity curve. This is provided from a frequency of 10Hz, therefore we extrapolate down to 0Hz.

In [None]:
# Create a bilby PSD object: the argument points to a named ASD file packaged as part of bilby
# This file can be replaced by a path to a specific ASD on file
psd = bilby.gw.detector.PowerSpectralDensity.from_amplitude_spectral_density_file("aLIGO_O4_high_asd.txt")

psd_array = psd.power_spectral_density_interpolated(frequencies)
psd_array[np.isinf(psd_array)] = psd.psd_array[0]

fig, ax = plt.subplots()
ax.loglog(frequencies, psd_array)
ax.set(xlabel="Frequency [Hz]", ylabel="PSD [Hz$^{-1}$]")
plt.show()

Next, we construct another `bilby` `psd` object, but this time from the extrapolated PSD array. This ensures that when we simulate the data it will contain low-frequency noise.

In [None]:
psd = bilby.gw.detector.PowerSpectralDensity.from_power_spectral_density_array(frequencies, psd_array)

# We generate a random noise realization in the frequency domain h_f
h_f_sim, _ = psd.get_noise_realisation(sampling_frequency, duration)

# Inverse FFT to get the timeseries
h_t_sim = np.fft.irfft(h_f_sim) 
time = np.arange(0, duration, 1 / sampling_frequency)

fig, (ax1, ax2) = plt.subplots(ncols=2, figsize=(10, 4))
ax1.loglog(frequencies, np.abs(h_f_sim), label=r"$|\tilde{h}(f)|$")
ax1.loglog(frequencies, psd.asd_array, label=r"$\sqrt{S(f)}$")
ax1.set(xlabel="Frequency [Hz]", ylabel="Strain data [Hz$^{-1/2}$]")
ax1.legend()

ax2.plot(time, h_t_sim)
ax2.set(xlabel="Time [s]", ylabel="Strain []")
plt.show()

**Note**: In the frequency domain the strain data (and ASD) have units of $\textrm{Hz}^{-1/2}$ because we use a normalised Fourier transform:
$$ \tilde{h}(f) = \frac{1}{f_s} \textrm{FFT}(h(t))\, $$
see [Thrane & Talbot (2020)](https://arxiv.org/pdf/1809.02293) for a detailed discussion.

### Whitening 

Whitening refers to a transformation applied to a timeseieres to make it uncorrelated and have unit variance. The goal is to remove dependencies between variables (decorrelation) and scale them uniformly. Specifically, given a PSD $S(f)$, the whitening filter in the frequency domain is proportional to $1/\sqrt{S(f)}$, including the relevant normalisation factors the whitened data is given by
$$ \tilde{h}_w(f) = \tilde{h}(f) \sqrt{\frac{4}{T S(f)}}\,$$ 
where $T$ is the data duration.

The end result is that, if the data is properly whitened, the real and imaginary parts of $\tilde{h}_w(f)$ are standard normal variables. Let's test this out. Below we plot a histogram of the whitened data alongside the probability density function (PDF) of a standard normal distribution.

In [None]:
wh_f_sim = h_f_sim *  np.sqrt(4 / duration) / np.sqrt(psd.psd_array)

fig, ax = plt.subplots()

kwargs = dict(bins="auto", alpha=0.5, density=True)
ax.hist(wh_f_sim.real, label=r"$\mathrm{Re}(\tilde{h}_w(f))$", **kwargs)
ax.hist(wh_f_sim.imag, label=r"$\mathrm{Im}(\tilde{h}_w(f))$", **kwargs)

xs = np.linspace(-5, 5, 1000)
ax.plot(xs, np.exp(-xs**2 / 2) / np.sqrt(2 * np.pi), label="Standard normal PDF")
ax.set(xlabel="Whitened strain")
ax.legend()
plt.show()

### Testing normality

Visually, the two histograms agree nicely with the PDF, but to quantify the agreement, we need a test of normality. The standard approach used in the field is the [Anderson-Darling](https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test) test. There is a nice [scipy implementation](https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.anderson.html), however the number of critical values it provides is limited therefore in the helper function we have implemented a more involved test. Note that this is equivalent to the $\mathcal{N}(\mu, \sigma)$ test of [Gupta & Cornish (2023)](https://arxiv.org/html/2312.11808v1) in which the mean and variance are estimated from the data.

Below, we apply our helper function to the whitened frequency domain data generated above. Note that internally, it combined the real and imaginary components together.

In [None]:
print(anderson_p_value(wh_f_sim))

The resulting $p$-values indicate that the data is consistent with the null hypothesis (that the data is distributed normally). If the $p$-values are $\ll 1$ this indicates inconsistency and suggests we can reject the null hypothesis.

### Frequency-dependent tests of normality.

Finally, following [Gupta & Cornish (2023)](https://arxiv.org/html/2312.11808v1), we can also apply the Anderson-Darling test to separate frequency-domain bins, enabling a study of the whitening across the spectrum.


In [None]:
plot_whitening(psd.frequency_array, psd.asd_array, h_f_sim, wh_f_sim, label="True ASD")

From this figure, we see that across the spectrum, the $p$-value is always above 0.01 (we also mark the 0.01 threshold used by Gupta and Cornish for later reference.)

## Estimating the PSD
In practise, we do not know the PSD of the real interferometric strain data, but instead we must estimate it. To demonstrate the various methods to do this, we will consider estimating a PSD for the Hanford (H1) data during O3b (note, you may find [this GWOSC tool](https://gwosc.org/timeline/show/O3b_4KHZ_R1/H1_DATA/1256655618/12708000/) useful to discover data availability.)

In [None]:
t0 = 1256790000   # A time when the Hanford is online during O3b
ifo = "H1"  # The detector name as used by `gwpy`
duration = 4   # The duration of data used for analysis
post_trigger_duration = 2  # The end time of the analysis data relative to the trigger time

# We download a total of 132 seconds of data around the signal (128s before and then 4s of analysis data)
start = t0 + post_trigger_duration
end = t0 + post_trigger_duration - duration - 128
data = TimeSeries.fetch_open_data(ifo, start, end, cache=True)

# We then create two sets of data
data_psd = data.crop(end=t0 + post_trigger_duration - duration)
data_analysis = data.crop(start=t0 + post_trigger_duration - duration, end=t0 + post_trigger_duration)

### Off-source averaging

The first method we will introduce is [Welch's method](https://en.wikipedia.org/wiki/Welch%27s_method) which averages Fourier transforms of off-source data. Specifically, we will use the `median` method including the bias correction, as introduced in [Allen et al. (2012)](https://arxiv.org/pdf/gr-qc/0509116). This method is implemented in [scipy.signal.welch](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.welch.html#scipy.signal.welch) when using the `method='median'` argument and we can access it directly from `gwpy.timeseries.TimeSeries.psd` [see docs](https://gwpy.github.io/docs/stable/api/gwpy.timeseries.TimeSeries/#gwpy.timeseries.TimeSeries.psd).

Below we provide a function implementing this method. Of specific note, this implements the standard Tukey windowing used within Bilby - it is important that the same window is applied to the analysis data as when estimating the PSD.

In [None]:
def estimate_psd_offsource_averaging(
    psd_data,
    duration,
    psd_fractional_overlap,
    psd_method,
    roll_off,
):
    """ Estimate a Power Spectral Density (PSD) from averaging off-source strain data

    Note: this function utilises [gwpy.timeseries.TimeSeries.psd](https://gwpy.github.io/docs/stable/api/gwpy.timeseries.TimeSeries/#gwpy.timeseries.TimeSeries.psd). It is recommended you read the documentation for that function for a detailed understanding of the implementation and options.

    Parameters
    ----------
    psd_data: gwpy.timeseries.TimeSeries
        A timeseries of the strain data to use for off-source averaging. Note: this method assumes
        the data has been truncated and does not include the signal.
    duration: float
        The duration (in seconds) to use for the PSD estimation (assumed to be identical to the
        duration of the analysis data).
    psd_fractional_overlap: float [0, 1]
        The fractional amount of overlap between neigbourning FFT estimates.
    psd_method: str
        See gwpy documentation
    roll_off: float [0, 1]
        MATCH WITH BILBY_PIPE

    Returns
    -------
    frequencies, psd: array_like
        The frequencies and estimated PSD

    """

    psd_alpha = 2 * roll_off / duration
    overlap = psd_fractional_overlap * duration
    window = ("tukey", psd_alpha)

    psd = psd_data.psd(
        fftlength=duration,
        overlap=overlap,
        window=window,
        method=psd_method,
    )
    return psd.frequencies.value, psd.value

And now let's apply the method to our `psd_data`. We choose to estimate a PSD for the full 128s of data and then another estimate for a shorter 64s of data for comparison.

In [None]:
kwargs = dict(
    duration=4,
    psd_fractional_overlap=0.5,
    roll_off=0.1,
    psd_method="median"
)

_, median_psd = estimate_psd_offsource_averaging(
    data_psd, **kwargs
)
median_asd = np.sqrt(median_psd)

And finally create a plot comparing the estimates

In [None]:
fig, ax = plt.subplots()

kwargs = dict(lw=1, alpha=0.8)
ax.loglog(frequencies, median_asd, label="Median: 128s", **kwargs)
ax.legend()
ax.set(ylim=(1e-24, 1e-19), xlabel='Frequency [Hz]', ylabel="PSD")
plt.show()

Okay, so now lets test how well these ASD estimates whiten the data

In [None]:
kwargs = dict(bins="auto", alpha=0.5, density=True)
fig, ax = plt.subplots()

h_f_median, wh_f_median = whiten(data_analysis, median_asd, frequencies)


ax.hist(wh_f_median.real, label=r"$\mathrm{Re}(\tilde{h}_w(f))$", **kwargs)
ax.hist(wh_f_median.imag, label=r"$\mathrm{Im}(\tilde{h}_w(f))$", **kwargs)
ax.set_title(f"p-value={anderson_p_value(wh_f_median):0.5f}")
xs = np.linspace(-5, 5, 1000)
ax.plot(xs, np.exp(-xs**2 / 2) / np.sqrt(2 * np.pi))
ax.set(xlim=(-5, 5))
ax.legend()
plt.show()

In [None]:
plot_whitening(frequencies, median_asd, h_f_median, wh_f_median, bin_width_Hz=8)

## MESA

Next, we investigate the MESA approach as implemented in [memspectrum](https://pypi.org/project/memspectrum/) (see [Martini et al. (2024)](https://arxiv.org/pdf/2106.09499)).

In [None]:
! pip install memspectrum

In [None]:
def estimate_psd_memspectrum(
    psd_data,
    frequencies,
    mesa_solve_kwargs=None,
):
    """ Estimate a Power Spectral Density (PSD) use Maximum Entropy Spectal Estimation (MESA)

    Note: this function utilises [memspectrum](https://maximum-entropy-spectrum.readthedocs.io/en/latest/usage/overview.html).

    Parameters
    ----------
    psd_data: gwpy.timeseries.TimeSeries
        A timeseries of the strain data to use for MESA.
    frequencies: TBD
    mesa_solve_kwargs: dict (None)
        A dictionary of optional arguments to pass to [memspectrum.solve](https://maximum-entropy-spectrum.readthedocs.io/en/latest/package_reference/mesa.html#memspectrum.memspectrum.MESA.solve)

    Returns
    -------
    frequencies, psd: array_like
        The frequencies and estimated PSD

    """
    import memspectrum

    if mesa_solve_kwargs is None:
        mesa_solve_kwargs = dict()

    mesa = memspectrum.MESA()
    mesa.solve(psd_data.value, **mesa_solve_kwargs)
    psd = mesa.spectrum(psd_data.dt.value, frequencies=frequencies)
    psd[-1] = np.inf

    return psd

For MESA, we will use a stretch of data equal in length to the analysis data.

In [None]:
data_psd_MESA = data_psd.crop(start=data_analysis.t0 - 1 * data_analysis.duration)
print(f"Data duration is {data_psd_MESA.duration}")
mesa_psd = estimate_psd_memspectrum(data_psd_MESA, frequencies, mesa_solve_kwargs=dict(method="Standard", verbose=False))
mesa_asd = np.sqrt(mesa_psd)

fig, ax = plt.subplots()
kwargs = dict(lw=1, alpha=0.8)
ax.loglog(frequencies, mesa_asd, label="MESA: analysis data", **kwargs)
ax.legend()
ax.set(ylim=(1e-24, 1e-19), xlabel='Frequency [Hz]', ylabel="PSD")
plt.show()


In [None]:
kwargs = dict(bins=np.linspace(-5, 5, 100), alpha=0.5, density=True)
fig, ax = plt.subplots()


h_f_mesa, wh_f_mesa = whiten(data_analysis, mesa_asd, frequencies)

ax.hist(wh_f_mesa.real, label=r"$\mathrm{Re}(\tilde{h}_w(f))$", **kwargs)
ax.hist(wh_f_mesa.imag, label=r"$\mathrm{Im}(\tilde{h}_w(f))$", **kwargs)
ax.set_title(f"p-value={anderson_p_value(wh_f_mesa):0.5f}")

xs = np.linspace(-5, 5, 1000)
ax.plot(xs, np.exp(-xs**2 / 2) / np.sqrt(2 * np.pi))
ax.set(xlim=(-5, 5))
ax.legend()
plt.show()

In [None]:
plot_whitening(frequencies, mesa_asd, h_f_mesa, wh_f_mesa, bin_width_Hz=8)