In [19]:
import numpy as np
from pathlib import Path
from scipy.signal import butter, filtfilt

def butter_lowpass_filter(data, cutoff, fs, order=5):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    y = filtfilt(b, a, data)
    return y

def preprocess_signals(signals, cutoff, fs):
    filtered_signals = [butter_lowpass_filter(signal, cutoff, fs) for signal in signals]
    normalized_signals = [(signal - np.mean(signal)) / np.std(signal) for signal in filtered_signals]
    return normalized_signals


def make_gravitational_waves(
    path_to_data: Path,
    n_signals: int = 30,
    downsample_factor: int = 2,
    r_min: float = 0.075,
    r_max: float = 0.65,
    n_snr_values: int = 10,
        ):
    def padrand(V, n, kr):
        cut = np.random.randint(n)
        rand1 = np.random.randn(cut)
        rand2 = np.random.randn(n - cut)
        out = np.concatenate((rand1 * kr, V, rand2 * kr))
        return out

    Rcoef = np.linspace(r_min, r_max, n_snr_values)
    Npad = 500  # number of padding points on either side of the vector
    gw = np.load(path_to_data / "gravitational_wave_signals.npy")
    Norig = len(gw["data"][0])
    Ndat = len(gw["signal_present"])
    N = int(Norig / downsample_factor)

    ncoeff = []
    Rcoeflist = []

    for j in range(n_signals):
        ncoeff.append(10 ** (-19) * (1 / Rcoef[j % n_snr_values]))
        Rcoeflist.append(Rcoef[j % n_snr_values])

    noisy_signals = []
    gw_signals = []
    k = 0
    labels = np.zeros(n_signals)

    for j in range(n_signals):
        signal = gw["data"][j % Ndat][range(0, Norig, downsample_factor)]
        sigp = int((np.random.randn() < 0))
        noise = ncoeff[j] * np.random.randn(N)
        labels[j] = sigp
        if sigp == 1:
            rawsig = padrand(signal + noise, Npad, ncoeff[j])
            if k == 0:
                k = 1
        else:
            rawsig = padrand(noise, Npad, ncoeff[j])
        noisy_signals.append(rawsig.copy())
        gw_signals.append(signal)

    # Preprocesar señales
    cutoff = 0.1  # Frecuencia de corte del filtro pasa bajo
    fs = 1.0  # Frecuencia de muestreo (debe ajustarse según tus datos)
    noisy_signals = preprocess_signals(noisy_signals, cutoff, fs)
    gw_signals = preprocess_signals(gw_signals, cutoff, fs)

    return noisy_signals, gw_signals, labels

In [20]:
R = 0.65
n_signals = 1500
DATA = Path(".")

noisy_signals, gw_signals, labels = make_gravitational_waves(
    path_to_data=DATA, n_signals=n_signals, r_min=R, r_max=R, n_snr_values=1
)

print(f"Número de señales con ruido: {len(noisy_signals)}")
print(f"Número de pasos en el tiempo por serie: {len(gw_signals[0])}","\n")

Número de señales con ruido: 1500
Número de pasos en el tiempo por serie: 8192 



In [21]:
import numpy as np
from plotly.subplots import make_subplots
import plotly.graph_objects as go

# get the index corresponding to the first pure noise time series
background_idx = np.argmin(labels)
# get the index corresponding to the first noise + gravitational wave time series
signal_idx = np.argmax(labels)

ts_noise = noisy_signals[background_idx]
ts_background = noisy_signals[signal_idx]
ts_signal = gw_signals[signal_idx]

fig = make_subplots(rows=1, cols=2)

fig.add_trace(
    go.Scatter(x=list(range(len(ts_noise))), y=ts_noise, mode="lines", name="noise"),
    row=1,
    col=1,
)

fig.add_trace(
    go.Scatter(
        x=list(range(len(ts_background))),
        y=ts_background,
        mode="lines",
        name="background",
    ),
    row=1,
    col=2,
)

fig.add_trace(
    go.Scatter(x=list(range(len(ts_signal))), y=ts_signal, mode="lines", name="signal"),
    row=1,
    col=2,
)
fig.show()

In [22]:
from gtda.time_series import SingleTakensEmbedding
embedding_dimension = 30
embedding_time_delay = 30
stride = 5

embedder = SingleTakensEmbedding(
    parameters_type="search", n_jobs=6, time_delay=embedding_time_delay, dimension=embedding_dimension, stride=stride
)

y_gw_embedded = embedder.fit_transform(gw_signals[0])

In [23]:
from sklearn.decomposition import PCA
from gtda.plotting import plot_point_cloud

pca = PCA(n_components=3)
y_gw_embedded_pca = pca.fit_transform(y_gw_embedded)

plot_point_cloud(y_gw_embedded_pca)

In [24]:
embedding_dimension = 30
embedding_time_delay = 30
stride = 5

embedder = SingleTakensEmbedding(
    parameters_type="search", n_jobs=6, time_delay=embedding_time_delay, dimension=embedding_dimension, stride=stride
)

y_noise_embedded = embedder.fit_transform(noisy_signals[background_idx])

pca = PCA(n_components=3)
y_noise_embedded_pca = pca.fit_transform(y_noise_embedded)

plot_point_cloud(y_noise_embedded_pca)

In [26]:
import numpy as np
import kmapper as km
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
import matplotlib.pyplot as plt

# Asegurarse de que noisy_signals esté en formato numpy array
noisy_signals = np.array(noisy_signals)

# Inicializar el objeto Mapper
mapper = km.KeplerMapper(verbose=1)

# Proyección con PCA
projected_data = mapper.fit_transform(noisy_signals, projection=PCA(n_components=3))

# Creación del grafo con el algoritmo Mapper
graph = mapper.map(projected_data,
                   cover=km.Cover(n_cubes=10, perc_overlap=0.5),
                   clusterer=DBSCAN(eps=0.5, min_samples=3))

# Visualización del grafo
mapper.visualize(graph, path_html="mapper_output.html")

print("Visualización del grafo guardada como 'mapper_output.html'")


KeplerMapper(verbose=1)
..Composing projection pipeline of length 1:
	Projections: PCA(n_components=3)
	Distance matrices: False
	Scalers: MinMaxScaler()
..Projecting on data shaped (1500, 8692)

..Projecting data using: 
	PCA(n_components=3)


..Scaling with: MinMaxScaler()

Mapping on data shaped (1500, 3) using lens shaped (1500, 3)

Creating 1000 hypercubes.

Created 1929 edges and 260 nodes in 0:00:01.367335.
Wrote visualization to: mapper_output.html
Visualización del grafo guardada como 'mapper_output.html'
