In [None]:
from bayesbeat.data import get_n_entries, get_frequency, get_data
from bayesbeat.result import get_fit
import h5py
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
import nessai
import numpy as np
import numpy.lib.recfunctions as rfn
import pathlib
import pandas as pd
from scipy.stats import norm

nessai.config.plotting.disable_style = True

from utils import (
    get_duration,
    model_colours,
    model_labels,
    compute_residuals,
)

plt.style.use("paper.mplstyle")

In [None]:
outdir = pathlib.Path("figures") / "simulated_data"
outdir.mkdir(exist_ok=True)

file_format = "pdf"

Path to the data file

In [None]:
data_file = "../data/simulated_data.hdf5"
n_ringdowns = get_n_entries(data_file)

In [None]:
injection_parameters = {}
with h5py.File(data_file, "r") as hdf_file:
    for key in hdf_file["parameters/"].keys():
        injection_parameters[key] = hdf_file[f"parameters/{key}"][()]

Path to the result files

In [None]:
results_path = pathlib.Path("../results/bayesbeat_inference_results/simulated_data/")
paths = {
    "model_1_constant_noise": results_path / "model_1_constant_noise/",
    "model_1": results_path / "model_1",
    "model_3": results_path / "model_3",
}

In [None]:
analysis_path = pathlib.Path("../analysis/simulated_data/")
config_files = {
    "model_1_constant_noise": analysis_path / "model_1_constant_noise.ini",
    "model_1": analysis_path / "model_1.ini",
    "model_3": analysis_path / "model_3.ini",
}

Load the posterior samples for each ringdown

In [None]:
parameters = ["tau_1", "tau_2", "domega", "dphi", "log10_a_1", "a_ratio", "log10_a_scale"]
posteriors = {}
for key, path in paths.items():
    posteriors[key] = []
    for index in range(n_ringdowns):
        result_file = path / f"result_ringdown_{index}.hdf5"
        if not result_file.exists():
            print(f"Missing file: {result_file}")
            posteriors[key].append(None)
            continue
        with h5py.File(result_file, "r") as res_file:
            post = res_file["posterior_samples"][()]
            params = [p for p in parameters if p in post.dtype.names]
            post = rfn.append_fields(post, "a_1", data=10 ** post["log10_a_1"])
            post = rfn.append_fields(post, "a_2", post["a_1"] * post["a_ratio"])
            params = params + ["a_1", "a_2"]
            posteriors[key].append(post[params])

Load the log-evidence.

**Note:** by default, this will be in base $e$.

In [None]:
log_z = {}
for key, path in paths.items():
    log_z[key] = np.empty(n_ringdowns)
    for index in range(n_ringdowns):
        result_file = path / "analysis" / f"index_{index}" / "result.hdf5"
        try:
            with h5py.File(result_file, "r") as res_file:
                log_z[key][index] = res_file["log_evidence"][()]
        except OSError:
            log_z[key][index] = np.nan

In [None]:
injections = [14, 31]
for inj in injections:
    print(f"log10 BF (1 vs 1 (stationary)) - {inj}: {(log_z['model_1'][inj] - log_z['model_1_constant_noise'][inj]) / np.log(10)}")
    print(f"log10 BF (3 vs 1 (same noise)) - {inj}: {(log_z['model_3'][inj] - log_z['model_1'][inj]) / np.log(10)}")
    print(f"log10 BF (3 vs 1 (stationary)) - {inj}: {(log_z['model_3'][inj] - log_z['model_1_constant_noise'][inj]) / np.log(10)}")

In [None]:
durations = np.empty(n_ringdowns)
for index in range(n_ringdowns):
    durations[index] = get_duration(data_file, index)

In [None]:
freqs = np.empty(n_ringdowns)
for index in range(n_ringdowns):
    freqs[index] = get_frequency(data_file, index)

## Figures 5, 7

In [None]:
indices = [14, 31]
combinations = [
    ["model_1_constant_noise", "model_1"],
    ["model_1", "model_3"],
]
for index in indices:
    for to_plot in combinations:
        x_data, y_data, frequency, _ = get_data(data_file, index=index)

        dt = (x_data[1] - x_data[0])
        sample_rate = 1 / dt
        window = 150
        window_duration = window * dt

        results = {}
        for key in to_plot:
            path = paths[key]
            d = dict()
            result_file = path / f"result_ringdown_{index}.hdf5"
            with h5py.File(result_file, "r") as res_file:
                posterior_samples = res_file["posterior_samples"][()]
                d["log_z"] = res_file["log_evidence"][()]
            # Get sample with max. log-likelihood
            max_logl_idx = np.argmax(posterior_samples["logL"])
            # Get noise sigmas

            d["sigma_constant"] = posterior_samples["sigma_constant_noise"][max_logl_idx]
            d["sigma_amp"] = (
                posterior_samples["sigma_amp_noise"][max_logl_idx]
                if "sigma_amp_noise" in posterior_samples.dtype.names else 0
            )
            model_kwargs = {}
            if key == "model_3":
                model_kwargs["compile"] = False
            d["y_fit"] = get_fit(
                config_file=config_files[key],
                result_file=result_file,
                datafile=data_file,
                index=index,
                method="max",
                **model_kwargs,
            )
            results[key] = d
        figsize = plt.rcParams["figure.figsize"].copy()
        figsize[0] = 1.8 * figsize[0]
        figsize[1] = 1.6 * figsize[1]
        fig, axs = plt.subplot_mosaic(
            [
                ["fit", "fit", "fit", "fit"],
                ["fit", "fit", "fit", "fit"],
                ["res", "res", "res", "res"],
                ["mean", "mean", "mean", "mean"],
                ["std", "std", "std", "std"],
            ],
            figsize=figsize,
        )
        axs["fit"].scatter(x_data, y_data, color="k", s=1, label="Data", rasterized=True)

        for i, (key, res) in enumerate(results.items()):
            y_fit = res["y_fit"]
            sigma_constant_noise = res["sigma_constant"]
            sigma_amp_noise = res["sigma_amp"]
            ln_evidence = res["log_z"]
            colour = model_colours[key]

            axs["fit"].plot(x_data, y_fit, c=colour, label=model_labels[key])
            axs["fit"].set_yscale("log")

            residuals = compute_residuals(
                y_data,
                y_fit,
                sigma_constant_noise=sigma_constant_noise,
                sigma_amp_noise=sigma_amp_noise,
            )
            
            axs["res"].scatter(x_data, residuals, s=1, c=colour, lw=0.0, label=model_labels[key], rasterized=True)

            res = pd.Series(residuals)
            axs["mean"].plot(x_data, res.rolling(window).mean().values, c=colour)
            axs["mean"].axhline(0.0, c="k")
            axs["std"].plot(x_data, res.rolling(window).std().values, c=colour)
            axs["std"].axhline(1.0, c="k")

        axs["std"].set_xlabel("Time [s]")
        # axs["res"].legend()
        axs["fit"].set_ylabel("Scaled amplitude")
        axs["res"].set_ylabel(r"$\mathcal{R}$")

        axs["mean"].set_ylabel(r"$\mu_\mathcal{R}$")
        axs["std"].set_ylabel(r"$\sigma_{\mathcal{R}}$")

        axs["fit"].tick_params(labelbottom=False)
        axs["res"].tick_params(labelbottom=False)
        axs["mean"].tick_params(labelbottom=False)

        axs["fit"].sharex(axs["res"])
        axs["res"].sharex(axs["mean"])
        axs["mean"].sharex(axs["std"])
        axs["std"].set_xlim(0, x_data[-1])

        # axs["res"].set_ylim(0.5, 2.5)

        log10_bf_fit = (results[to_plot[1]]["log_z"] - results[to_plot[0]]["log_z"]) / np.log(10)

        axs["fit"].set_title(r"$\log_{10} \mathcal{B}_{B/A} = " + f"{log10_bf_fit:.1f}" + "$")

        legend_handles = (
            [Line2D([0], [0], ls="", marker=".", label="Data", color="k")] 
            + [
                Line2D([0], [0], color=model_colours[key], label=model_labels[key])
                for key in to_plot
            ]
        )

        fig.legend(
            handles=legend_handles,
            loc="center",
            ncol=3,
            bbox_to_anchor=(0.5, -0.0)
        )

        fig.savefig(outdir / f"{'_'.join(to_plot)}_{index}.{file_format}", bbox_inches="tight")

        plt.show()
        plt.close(fig)

## Figure 6

In [None]:
model_a = "model_1"
model_b = "model_1_constant_noise"
log10_bf = (log_z[model_a] - log_z[model_b]) / np.log(10)

a_total = injection_parameters["a_1"] + injection_parameters["a_ratio"] * injection_parameters["a_1"]
tau_diff = injection_parameters["tau_1"] - injection_parameters["tau_2"]
colour = tau_diff

vmin, vmax = -max(abs(colour)), max(abs(colour))

fig = plt.figure()
plt.scatter(a_total, log10_bf, c=colour, cmap="managua")
plt.xlabel(r"$a_\text{T}$")
plt.yscale("log")
plt.ylabel(r"$\log_{10}\mathcal{B}$")
plt.colorbar(label=r"$\tau_1 - \tau_2$")
plt.title(f"{model_labels[model_a]} vs. {model_labels[model_b]}")
fig.savefig(outdir / f"total_amplitude_vs_log10_bf_noise_comparison_simulated.{file_format}")
plt.show()

## Figure 8

In [None]:
model_a = "model_3"
model_b = "model_1"
log10_bf_signal = (log_z[model_a] - log_z[model_b]) / np.log(10)

a_total = injection_parameters["a_1"] + injection_parameters["a_ratio"] * injection_parameters["a_1"]
tau_diff = injection_parameters["tau_1"] - injection_parameters["tau_2"]
colour = tau_diff

vmin, vmax = -max(abs(colour)), max(abs(colour))

fig = plt.figure()
plt.scatter(a_total, log10_bf_signal, c=colour, cmap="managua")
plt.xlabel(r"$a_\text{T}$")
plt.yscale("log")
plt.ylabel(r"$\log_{10}\mathcal{B}$")
plt.colorbar(label=r"$\tau_1 - \tau_2$")
plt.title(f"{model_labels[model_a]} vs. {model_labels[model_b]}")
fig.savefig(outdir / f"total_amplitude_vs_log10_bf_signal_comparison_simulated.{file_format}")
plt.show()

## Figure 9

In [None]:
ringdowns_to_plot = [31]
for index in ringdowns_to_plot:

    stats = {}
    parameters = ["tau_1", "tau_2", "domega"]

    parameter_labels = {
        "a_ratio": r"$\rho - \rho^\text{True}$",
        "tau_1": r"$\widehat{\tau_1} - \tau_1$",
        "tau_2": r"$\widehat{\tau_2} - \tau_2$",
        "domega": r"$\widehat{\Delta f} - \Delta f\;[\mathrm{\mu}\mathrm{Hz}]$",
        "dphi": r"$\Delta\phi - \Delta\phi^{\text{True}}$",
    }

    true_values = {k: injection_parameters[k][index] for k in parameters}

    posterior_samples = {}

    for key, path in paths.items():
        result_file = path / f"result_ringdown_{index}.hdf5"
        with h5py.File(result_file, "r") as res_file:
            ps = res_file["posterior_samples"][()]
        posterior_samples[key] = ps


    figsize = plt.rcParams["figure.figsize"].copy()
    figsize[0] = 1.2 * figsize[0]
    figsize[1] = 0.8 * figsize[1]
    fig, axs = plt.subplot_mosaic([parameters], figsize=figsize)

    hist_kwargs = dict(
        density=True,
        histtype="step",
    )

    factors = {
        "tau_1": 1,
        "tau_2": 1,
        "domega": 1e6 / (2 * np.pi),
    }

    for parameter in parameters:
        true_val = injection_parameters[parameter][index]
        for key, samples in posterior_samples.items():
            axs[parameter].hist((samples[parameter] - true_val) * factors[parameter], **hist_kwargs)
        axs[parameter].axvline(0, color="k")
        axs[parameter].set_xlabel(parameter_labels[parameter])
        axs[parameter].set_yticklabels([])

    handles = [
        Line2D([0], [0], color=model_colours[key], label=model_labels[key])
        for key in paths.keys()
    ]
    handles.append(Line2D([0], [0], color="k", label="True Value"))

    fig.legend(
        handles=handles,
        ncol=2,
        loc="center",
        bbox_to_anchor=(0.5, -0.05),
        frameon=False,
    )

    plt.tight_layout()
    subdir = outdir / "signal_model_comparison" / "simulated_data_posteriors"
    subdir.mkdir(parents=True, exist_ok=True)
    fig.savefig(subdir / f"parameter_comparison_{index}.{file_format}", bbox_inches="tight")
    plt.show()
    plt.close(fig)

## Figure 10 - Biases in decay parametrs

In [None]:
figsize = plt.rcParams["figure.figsize"].copy()
figsize[0] = 1.5 * figsize[0]
fig, axs = plt.subplots(1, 2, sharey=True, figsize=figsize)

parameters = ["tau_1", "tau_2"]
# Error region to check
sigma = 3
q_vals = norm.cdf([-sigma, 0, sigma], loc=0, scale=1)

injections_with_noise_floor = np.array([
    8, 14, 18, 32,
])
ring_size = 50

quantiles = {}
for j, model in enumerate(["model_1", "model_3"]):
    quantiles[model] = {}
    for parameter in parameters:
        quantiles[model][parameter] = np.nan * np.ones([50, 3])
        for i in range(n_ringdowns):
            if posteriors[model][i] is None:
                continue
            quantiles[model][parameter][i] = np.quantile(posteriors[model][i][parameter], q_vals)


colours = model_colours.copy()

for key, quant in quantiles.items():
    for i, (parameter, values) in enumerate(quant.items()):
        # print(values)
        correct = np.logical_and(
            values[:, 0] < injection_parameters[parameter],
            values[:, 2] > injection_parameters[parameter],
        )
        missed = np.where(~correct)[0]
        print(f"Missed injections for {key} tau_{i+1}: {missed} ({len(missed)} total)")

        tau = injection_parameters[parameter] 
        error = (values[:, 1] - injection_parameters[parameter])# / injection_parameters[parameter]
        axs[i].scatter(tau[correct], error[correct], color=colours[key], marker=".")
        axs[i].scatter(tau[~correct], error[~correct], color=colours[key], marker="x")

        if key == "model_3":
            axs[i].scatter(
                tau[injections_with_noise_floor],
                error[injections_with_noise_floor],
                marker="o",
                facecolor="none",
                edgecolor="k",
                s=ring_size,
            )


for ax in axs:
    ax.set_ylim(-0.5, 0.5)
    # ax.set_yticks([-2e-1, -1e-1, 0, 1e-1, 2e-1])
    # ax.set_yticks([-1.5e-1, -0.5e-1, 0.5e-1, 1.5e-1], minor=True)
    # enable minor ticks
    # ax.set

axs[0].set_xlabel(r"$\tau_1$")
axs[1].set_xlabel(r"$\tau_2$")

axs[0].set_ylabel(r"$\tau - \tau^\text{True}$")

legend = fig.legend(
    handles=[
        Line2D([0], [0], color=colours["model_1"], marker=".", linestyle="None", label=r"$M_1$"),
        Line2D([0], [0], color=colours["model_3"], marker=".", linestyle="None", label=r"$M_3$ - 7 terms"),
        Line2D([0], [0], color="black", marker="x", linestyle="None", label=r"Outside 3-$\sigma$ CI"),
        Line2D([0], [0], color="black", marker="o", linestyle="None", label=r"Signal below noise floor", markerfacecolor="none", markeredgecolor="k"),
    ],
    frameon=False,
    fancybox=False,
    edgecolor="black",
    loc="center",
    bbox_to_anchor=(0.5, -0.1),
    ncol=2,
    
)
# legend.get_frame().set_linewidth(0.5)

# plt.tight_layout()
fig.savefig(outdir / f"tau_error.{file_format}", bbox_inches="tight")

plt.show()
