# Gravitational wave detection



In [None]:
import numpy as np # Import the NumPy library for numerical operations
from pathlib import Path # Import the Path class from the pathlib module to handle file system paths
from scipy.stats import norm # Import the norm class from scipy.stats for statistical functions

# Poner comentarios en cada linea de que significa cada cosa
# En alguna parte se asegura que los datos esten balanceados.
# Tarea: Generar datos con imbalance (75 señal, 25 ruido) (25-75) (90-10) (10-90)

# Define the main function `make_gravitational_waves`
def make_gravitational_waves(
    path_to_data: Path, # Path to the directory containing the data
    n_signals: int = 30, # Number of signals to generate
    downsample_factor: int = 2, # Factor by which to downsample the signals
    Rcoef: float = 0.5, # Signal-to-noise ratio (SNR) coefficient
    perc_signal: float = 0.5, # Data partitioning factor
        ):
    def padrand(V, n, kr):
        cut = np.random.randint(n) # Generate a random integer to determine the split point for padding
        rand1 = np.random.randn(cut) # Create random noise for the first part of the padding
        rand2 = np.random.randn(n - cut) # Create random noise for the second part of the padding
        
        # Concatenate the first padding, the input signal `V`, and the second padding
        # Scale the padding by the factor `kr`
        out = np.concatenate((rand1 * kr, V, rand2 * kr))
        return out

    Npad = 50  # number of padding points on either side of the vector
    gw = np.load("../data/gravitational_wave_signals.npy") # Load data
    Norig = len(gw["data"][0]) # Get the original number of data points in each signal
    Ndat = len(gw["signal_present"]) # Get the total number of signals in the dataset todo es uno
    N = int(Norig / downsample_factor) # Calculate the number of data points after downsampling

    # Initialize lists to store noise coefficients and SNR coefficients
    ncoeff = []
    Rcoeflist = []

    # Loop through the number of signals to generate noise coefficients and SNR coefficients
    for j in range(n_signals):
        # Calculate the noise coefficient based on the R coefficient
        ncoeff.append(10 ** (-19) * (1 / Rcoef))
        # Append the corresponding R coefficient to the list
        Rcoeflist.append(Rcoef)

    # Initialize variables
    noisy_signals = []
    gw_signals = []
    k = 0
    labels = np.zeros(n_signals)

    # Loop through the number of signals to generate noisy signals and labels
    for j in range(n_signals):
        # Select a signal from the dataset and downsample it
        signal = gw["data"][j % Ndat][range(0, Norig, downsample_factor)]
        
        # Randomly decide if the signal is present (1) or absent (0)
        crit_val = norm.ppf(perc_signal) # Critical value for the normal distributio
        sigp = int((np.random.randn() < crit_val))
        
        # Generate random noise scaled by the noise coefficient
        noise = ncoeff[j] * np.random.randn(N)
        
        # Assign the label based on whether the signal is present
        labels[j] = sigp
        
        # If the signal is present, add it to the noise and pad the result
        if sigp == 1:
            rawsig = padrand(signal + noise, Npad, ncoeff[j])
            
            # Ensure at least one signal is present in the dataset
            if k == 0:
                k = 1
        else:
            # If the signal is absent, pad only the noise
            rawsig = padrand(noise, Npad, ncoeff[j])
        
        # Append the padded noisy signal to the list
        noisy_signals.append(rawsig.copy())
        
        # Append the original signal to the list
        gw_signals.append(signal)
    
    # Return the generated noisy signals, original signals, and labels
    return noisy_signals, gw_signals, labels

In [19]:
# Dataset 50-50
# generate and get data
R = 0.50 # Maximum signal-to-noise ratio (SNR) coefficient
n_signals = 100 # Number of signals to generate
DATA = Path(".") # Path to the directory containing the data
dataPartition = 0.5 # Data partitioning factor
noisy_signals_50_50, gw_signals_50_50, labels_50_50 = make_gravitational_waves(
    path_to_data=DATA, n_signals=n_signals, Rcoef=R, perc_signal=dataPartition
)
noisy_signals_50_50 = np.array(noisy_signals_50_50) # Convert the list of noisy signals to a NumPy array
gw_signals_50_50 = np.array(gw_signals_50_50) # Convert the list of original signals to a NumPy array
labels_50_50 = np.array(labels_50_50) # Convert the labels list to a NumPy array

# Dataset 25-75
dataPartition = 0.25 # Data partitioning factor
noisy_signals_25_75, gw_signals_25_75, labels_25_75 = make_gravitational_waves(
    path_to_data=DATA, n_signals=n_signals, Rcoef=R, perc_signal=dataPartition
)
noisy_signals_25_75 = np.array(noisy_signals_25_75) # Convert the list of noisy signals to a NumPy array
gw_signals_25_75 = np.array(gw_signals_25_75) # Convert the list of original signals to a NumPy array
labels_25_75 = np.array(labels_25_75) # Convert the labels list to a NumPy array

# Dataset 75-25
dataPartition = 0.75 # Data partitioning factor
noisy_signals_75_25, gw_signals_75_25, labels_75_25 = make_gravitational_waves(
    path_to_data=DATA, n_signals=n_signals, Rcoef=R, perc_signal=dataPartition
)
noisy_signals_75_25 = np.array(noisy_signals_75_25) # Convert the list of noisy signals to a NumPy array
gw_signals_75_25 = np.array(gw_signals_75_25) # Convert the list of original signals to a NumPy array
labels_75_25 = np.array(labels_75_25) # Convert the labels list to a NumPy array

# Dataset 10-90
dataPartition = 0.10 # Data partitioning factor
noisy_signals_10_90, gw_signals_10_90, labels_10_90 = make_gravitational_waves(
    path_to_data=DATA, n_signals=n_signals, Rcoef=R, perc_signal=dataPartition
)
noisy_signals_10_90 = np.array(noisy_signals_10_90) # Convert the list of noisy signals to a NumPy array
gw_signals_10_90 = np.array(gw_signals_10_90) # Convert the list of original signals to a NumPy array
labels_10_90 = np.array(labels_10_90) # Convert the labels list to a NumPy array

# Dataset 90-10
dataPartition = 0.90 # Data partitioning factor
noisy_signals_90_10, gw_signals_90_10, labels_90_10 = make_gravitational_waves(
    path_to_data=DATA, n_signals=n_signals, Rcoef=R, perc_signal=dataPartition
)
noisy_signals_90_10 = np.array(noisy_signals_90_10) # Convert the list of noisy signals to a NumPy array
gw_signals_90_10 = np.array(gw_signals_90_10) # Convert the list of original signals to a NumPy array
labels_90_10 = np.array(labels_90_10) # Convert the labels list to a NumPy array

In [20]:
print(labels_50_50.sum()) # Print the sum of labels to check the number of signals present
print(labels_25_75.sum()) # Print the sum of labels to check the number of signals present
print(labels_75_25.sum()) # Print the sum of labels to check the number of signals present
print(labels_10_90.sum()) # Print the sum of labels to check the number of signals present
print(labels_90_10.sum()) # Print the sum of labels to check the number of signals present

49.0
30.0
79.0
7.0
89.0


In [21]:
len(noisy_signals_50_50[0])
time_delay = 2
dimension = 50
from gtda.time_series import TakensEmbedding
TE = TakensEmbedding(time_delay=time_delay, dimension=dimension)
data = TE.fit_transform(noisy_signals_50_50)

print(data.shape)
print(len(data))
print(noisy_signals_50_50.shape)
print(len(noisy_signals_50_50))
data[0]

(100, 4048, 50)
100
(100, 4146)
100


array([[ 1.34027218e-19, -3.58957443e-19,  1.00876066e-19, ...,
        -2.68485533e-19,  1.47052326e-19,  2.45399686e-19],
       [ 1.69100204e-19,  2.96094893e-19, -4.29817537e-19, ...,
         2.28681812e-19, -2.45026725e-20, -4.19326339e-20],
       [-3.58957443e-19,  1.00876066e-19,  5.41672429e-20, ...,
         1.47052326e-19,  2.45399686e-19,  3.32039855e-19],
       ...,
       [-3.38283166e-19, -1.62368914e-19,  1.27224932e-19, ...,
        -2.12972656e-19,  1.22209308e-19,  1.25598404e-19],
       [-1.06045012e-19, -3.22780352e-19,  2.43646126e-20, ...,
         5.45488392e-20, -1.04393620e-19, -4.62985828e-19],
       [-1.62368914e-19,  1.27224932e-19, -3.05822748e-19, ...,
         1.22209308e-19,  1.25598404e-19,  1.35458808e-19]])

# Pipeline Creation

In [36]:
from gtda.time_series import SlidingWindow
from gtda.time_series import TakensEmbedding
from gtda.homology import VietorisRipsPersistence
from gtda.diagrams import Amplitude
from gtda.pipeline import make_pipeline
from sklearn import set_config
from sklearn.ensemble import RandomForestRegressor

time_delay = 30
dimension = 100

TE = TakensEmbedding(time_delay=time_delay, dimension=dimension, stride=stride)
VR = VietorisRipsPersistence() 
Ampl = Amplitude()
RFR = RandomForestRegressor()
pipe = make_pipeline(TE, VR, Ampl, RFR)
pipe

In [37]:
pipe_50_50 = make_pipeline(TE, VR, Ampl, RFR)
pipe_50_50.fit(noisy_signals_50_50, labels_50_50)
pipe_50_50.score(noisy_signals_50_50, labels_50_50)

-1.0244097638834049e-05

In [38]:
pipe_25_75 = make_pipeline(TE, VR, Ampl, RFR)
pipe_25_75.fit(noisy_signals_25_75, labels_25_75)
pipe_25_75.score(noisy_signals_25_75, labels_25_75)

-1.904761904802932e-05

In [39]:
pipe_75_25 = make_pipeline(TE, VR, Ampl, RFR)
pipe_75_25.fit(noisy_signals_75_25, labels_75_25)
pipe_75_25.score(noisy_signals_75_25, labels_75_25)

-5.424954792054848e-05

In [40]:
pipe_10_90 = make_pipeline(TE, VR, Ampl, RFR)
pipe_10_90.fit(noisy_signals_10_90, labels_10_90)
pipe_10_90.score(noisy_signals_10_90, labels_10_90)

-3.0107526881595348e-05

In [41]:
pipe_90_10 = make_pipeline(TE, VR, Ampl, RFR)
pipe_90_10.fit(noisy_signals_90_10, labels_90_10)
pipe_90_10.score(noisy_signals_90_10, labels_90_10)

-0.0001323799795711622