In [1]:
import numpy as np
import pandas as pd
from pathlib import Path


def make_point_clouds(n_samples_per_shape: int, n_points: int, noise: float):
    """Make point clouds for circles, spheres, and tori with random noise.
    """
    circle_point_clouds = [
        np.asarray(
            [
                [np.sin(t) + noise * (np.random.rand(1)[0] - 0.5), np.cos(t) + noise * (np.random.rand(1)[0] - 0.5), 0]
                for t in range((n_points ** 2))
            ]
        )
        for kk in range(n_samples_per_shape)
    ]
    # label circles with 0
    circle_labels = np.zeros(n_samples_per_shape)

    sphere_point_clouds = [
        np.asarray(
            [
                [
                    np.cos(s) * np.cos(t) + noise * (np.random.rand(1)[0] - 0.5),
                    np.cos(s) * np.sin(t) + noise * (np.random.rand(1)[0] - 0.5),
                    np.sin(s) + noise * (np.random.rand(1)[0] - 0.5),
                ]
                for t in range(n_points)
                for s in range(n_points)
            ]
        )
        for kk in range(n_samples_per_shape)
    ]
    # label spheres with 1
    sphere_labels = np.ones(n_samples_per_shape)

    torus_point_clouds = [
        np.asarray(
            [
                [
                    (2 + np.cos(s)) * np.cos(t) + noise * (np.random.rand(1)[0] - 0.5),
                    (2 + np.cos(s)) * np.sin(t) + noise * (np.random.rand(1)[0] - 0.5),
                    np.sin(s) + noise * (np.random.rand(1)[0] - 0.5),
                ]
                for t in range(n_points)
                for s in range(n_points)
            ]
        )
        for kk in range(n_samples_per_shape)
    ]
    # label tori with 2
    torus_labels = 2 * np.ones(n_samples_per_shape)

    point_clouds = np.concatenate((circle_point_clouds, sphere_point_clouds, torus_point_clouds))
    labels = np.concatenate((circle_labels, sphere_labels, torus_labels))

    return point_clouds, labels


def make_gravitational_waves(
    path_to_data: Path,
    n_signals: int = 30,
    downsample_factor: int = 2,
    r_min: float = 0.075,
    r_max: float = 0.65,
    n_snr_values: int = 10,
        ):
    def padrand(V, n, kr):
        cut = np.random.randint(n)
        rand1 = np.random.randn(cut)
        rand2 = np.random.randn(n - cut)
        out = np.concatenate((rand1 * kr, V, rand2 * kr))
        return out

    Rcoef = np.linspace(r_min, r_max, n_snr_values)
    Npad = 500  # number of padding points on either side of the vector
    gw = np.load(path_to_data / "gravitational_wave_signals.npy")
    Norig = len(gw["data"][0])
    Ndat = len(gw["signal_present"])
    N = int(Norig / downsample_factor)

    ncoeff = []
    Rcoeflist = []

    for j in range(n_signals):
        ncoeff.append(10 ** (-19) * (1 / Rcoef[j % n_snr_values]))
        Rcoeflist.append(Rcoef[j % n_snr_values])

    noisy_signals = []
    gw_signals = []
    k = 0
    labels = np.zeros(n_signals)

    for j in range(n_signals):
        signal = gw["data"][j % Ndat][range(0, Norig, downsample_factor)]
        sigp = int((np.random.randn() < 0))
        noise = ncoeff[j] * np.random.randn(N)
        labels[j] = sigp
        if sigp == 1:
            rawsig = padrand(signal + noise, Npad, ncoeff[j])
            if k == 0:
                k = 1
        else:
            rawsig = padrand(noise, Npad, ncoeff[j])
        noisy_signals.append(rawsig.copy())
        gw_signals.append(signal)

    return noisy_signals, gw_signals, labels

In [2]:
import matplotlib.pyplot as plt

def plot_rips_complex(data, R, label=" data" , col=1 , maxdim=2):
    tab10 = plt.get_cmap('tab10')
    
    fig, ax = plt.subplots(figsize =(6,6))
    ax.set_title(label)
    ax.scatter(
        data [:,0], data[:,1], label=label,
        s=8, alpha=0.9, c=np.array(tab10([col] * len(data)))
    )
    
    for i , xy in enumerate(data):
        if maxdim >= 1:
            for j in range(i + 1, len(data)):
                pq = data[j]
                if (xy != pq) . all() and (np.linalg.norm(xy - pq) <= R):
                    pts = np.array([xy ,pq])
                    ax.plot(pts[:,0], pts[:,1], color="pink", alpha=0.05, linewidth=1)
                if maxdim == 2:
                    for k in range (j + 1 ,len(data)):
                        ab = data[k]
                        if((ab != pq) . all () 
                            and (np.linalg.norm(xy - pq) <= R) 
                            and (np.linalg.norm(xy - ab) <= R)
                            and (np.linalg.norm(pq - ab) <= R)
                        ):
                            pts = np.array([xy, pq, ab])
                            ax.fill(pts[:,0], pts[:,1], facecolor="red", alpha=0.05)
                        pass
    plt.axis('equal')
    plt.tight_layout()
    plt.show()
    pass

In [3]:
#from data.generate_datasets import make_gravitational_waves
from pathlib import Path

Rmin = 0.075
Rmax = 0.65
n_signals = 10000
DATA = Path("./data")

noisy_signals, gw_signals, labels = make_gravitational_waves(
    path_to_data=DATA, n_signals=n_signals, r_min=Rmin, r_max=Rmax, n_snr_values=100
)

print(f"Number of noisy signals: {len(noisy_signals)}")
print(f"Number of timesteps per series: {len(noisy_signals[0])}")

Number of noisy signals: 10000
Number of timesteps per series: 8692


Next let's visualise the two different types of time series that we wish to classify: one that is pure noise vs. one that is composed of noise plus an embedded gravitational wave signal:

In [4]:
import numpy as np
from plotly.subplots import make_subplots
import plotly.graph_objects as go

# Standarize data
noisy_signals = np.array(noisy_signals)
noisy_signals = (noisy_signals - np.mean(noisy_signals)) / np.std(noisy_signals)

gw_signals = np.array(gw_signals)
gw_signals = (gw_signals - np.mean(gw_signals)) / np.std(gw_signals)


# get the index corresponding to the first pure noise time series
background_idx = np.argmin(labels)
# get the index corresponding to the first noise + gravitational wave time series
signal_idx = np.argmax(labels)

ts_noise = noisy_signals[background_idx]
ts_background = noisy_signals[signal_idx]
ts_signal = gw_signals[signal_idx]

fig = make_subplots(rows=1, cols=2)

fig.add_trace(
    go.Scatter(x=list(range(len(ts_noise))), y=ts_noise, mode="lines", name="noise"),
    row=1,
    col=1,
)

fig.add_trace(
    go.Scatter(
        x=list(range(len(ts_background))),
        y=ts_background,
        mode="lines",
        name="background",
    ),
    row=1,
    col=2,
)

fig.add_trace(
    go.Scatter(x=list(range(len(ts_signal))), y=ts_signal, mode="lines", name="signal"),
    row=1,
    col=2,
)
fig.show()

In [5]:
# Calculate SNR for each signal as 20^10 * log10(signal / noise)
snr = np.abs([20 * np.log10(np.linalg.norm(signal) / np.linalg.norm(noise)) for signal, noise in zip(ts_signal, ts_noise)])


# Plot the SNR of each signal, distinguishing between ranks 5-10 dB, 10-25 dB and above as insufficient, acceptable, and excellent respectively
fig = go.Figure()
fig.add_trace(
    go.Scatter(
        x=np.where(np.array(snr) < 10)[0],
        y=np.array(snr)[np.where(np.array(snr) < 10)],
        mode="markers",
        name="Insufficient",
        marker=dict(color="red"),
    )
)
fig.add_trace(
    go.Scatter(
        x=np.where((np.array(snr) >= 10) & (np.array(snr) < 25))[0],
        y=np.array(snr)[np.where((np.array(snr) >= 10) & (np.array(snr) < 25))],
        mode="markers",
        name="Acceptable",
        marker=dict(color="orange"),
    )
)
fig.add_trace(
    go.Scatter(
        x=np.where(np.array(snr) >= 25)[0],
        y=np.array(snr)[np.where(np.array(snr) >= 25)],
        mode="markers",
        name="Excellent",
        marker=dict(color="green"),
    )
)
fig.update_layout(title="Signal-to-Noise Ratio (SNR) of Gravitational Wave Signals")
fig.update_xaxes(title_text="Signal Index")
fig.update_yaxes(title_text="SNR")
fig.show()

In [6]:
sigxsnr = np.array([ts_signal, snr]).T

fig = go.Figure()
fig.add_trace(
    go.Scatter(
        x=sigxsnr[np.where(sigxsnr[:, 1] < 10)][:, 0],
        y=sigxsnr[np.where(sigxsnr[:, 1] < 10)][:, 1],
        mode="markers",
        name="Insufficient",
        marker=dict(color="red"),
    )
)
fig.add_trace(
    go.Scatter(
        x=sigxsnr[np.where((sigxsnr[:, 1] >= 10) & (sigxsnr[:, 1] < 25))][:, 0],
        y=sigxsnr[np.where((sigxsnr[:, 1] >= 10) & (sigxsnr[:, 1] < 25))][:, 1],
        mode="markers",
        name="Acceptable",
        marker=dict(color="orange"),
    )
)
fig.add_trace(
    go.Scatter(
        x=sigxsnr[np.where(sigxsnr[:, 1] >= 25)][:, 0],
        y=sigxsnr[np.where(sigxsnr[:, 1] >= 25)][:, 1],
        mode="markers",
        name="Excellent",
        marker=dict(color="green"),
    )
)
fig.update_layout(title="Signal-to-Noise Ratio (SNR) of Gravitational Wave Signals")
fig.update_xaxes(title_text="Signal value")
fig.update_yaxes(title_text="SNR")
fig.show()