In [None]:
from pycbc import distributions
from pycbc.waveform import get_td_waveform
from pycbc.filter import sigma
from pycbc.psd import aLIGOZeroDetHighPower
import pycbc.noise
import numpy as np
from gwpy.timeseries import TimeSeries
from tqdm import tqdm
import os

# === Parameters ===
# Every training sample is a `duration`-second segment sampled at `fs` Hz.
n_samples = 5000
fs = 4096
duration = 4.0
delta_t = 1.0 / fs
tsamples = int(duration * fs)
flow = 30.0                 # low-frequency cutoff handed to the analytic PSD
target_snr_range = (5, 8)   # bounds unpacked into the target-SNR draw

# === Output directories ===
# One sub-folder per saved component; all of them are created up front.
base_dir = "./dataset/raw-data-files-enhanced-data-generation/"
dirs = {
    label: os.path.join(base_dir, folder)
    for label, folder in (
        ("signal", "BBH/"),
        ("noise", "noise_template/"),
        ("injected", "noise-injected-signal/"),
    )
}
for d in dirs.values():
    os.makedirs(d, exist_ok=True)

# === PSD for colored noise generation (fixed) ===
# The frequency resolution is tied to the segment length so the same PSD can be
# reused for every noise realisation.
delta_f_noise = 1.0 / duration  # 0.25 Hz
flen_noise = int(2048 / delta_f_noise) + 1
psd_noise = aLIGOZeroDetHighPower(flen_noise, delta_f_noise, flow)
# Bins where the analytic PSD is exactly zero are replaced by a small floor.
# NOTE(review): presumably these are the bins below `flow`; confirm that a
# 1e-40 floor is the intended level for the generated noise in those bins.
empty_bins = psd_noise == 0
psd_noise[empty_bins] = 1e-40
psd_noise = pycbc.types.FrequencySeries(psd_noise, delta_f=delta_f_noise)

# === Mass distribution ===
# Component masses are drawn independently and uniformly from [10, 100].
mass_dist = distributions.Uniform(mass1=(10, 100), mass2=(10, 100))

# === Injection loop ===
# Each attempt draws random binary parameters, generates the plus polarisation,
# rescales it to a randomly drawn target SNR, adds it to a fresh realisation of
# coloured noise and writes the signal, the noise and their sum to disk.
# An attempt that raises, or whose sigma is zero / non-finite, is skipped; the
# loop stops after `n_samples` successes or `max_attempts` attempts.
valid_count = 0
max_attempts = 10000
i = 0

# Lower frequency used both to start the waveform and to compute its sigma.
waveform_flow = 40.0

with tqdm(total=n_samples, desc="Generating training samples") as progress:
    while valid_count < n_samples and i < max_attempts:
        try:
            # Sample random BBH masses, orientation and aligned spins
            m1, m2 = mass_dist.rvs(size=1)[0]
            distance = 1
            inclination = np.random.uniform(0, np.pi)
            spin1z = np.random.uniform(-0.99, 0.99)
            spin2z = np.random.uniform(-0.99, 0.99)

            # Generate waveform (only the plus polarisation is kept)
            hp, _ = get_td_waveform(approximant="SEOBNRv2",
                                    mass1=m1,
                                    mass2=m2,
                                    delta_t=delta_t,
                                    f_lower=waveform_flow,
                                    distance=distance,
                                    inclination=inclination,
                                    spin1z=spin1z,
                                    spin2z=spin2z)

            # Frequency domain waveform
            ht = hp.to_frequencyseries()

            # PSD on exactly the waveform's frequency grid. The length is taken
            # from `ht` itself rather than recomputed with floating-point
            # arithmetic, so the two series cannot differ by a rounding error.
            delta_f_sigma = ht.delta_f
            flen_sigma = len(ht)
            psd_sigma = aLIGOZeroDetHighPower(flen_sigma, delta_f_sigma, flow)
            psd_sigma[psd_sigma == 0] = 1e-40
            psd_sigma = pycbc.types.FrequencySeries(psd_sigma, delta_f=delta_f_sigma)

            # sigma of the unscaled waveform, used as its original SNR
            snr_orig = sigma(ht, psd=psd_sigma, low_frequency_cutoff=waveform_flow)

            if not np.isfinite(snr_orig) or snr_orig == 0:
                # Unusable waveform; the `finally` clause still counts the attempt.
                continue

            # Random integer target SNR. numpy's randint excludes its upper
            # bound, so add 1 to make `target_snr_range` inclusive on both ends.
            snr_low, snr_high = target_snr_range
            target_snr = np.random.randint(snr_low, snr_high + 1)
            hp *= target_snr / snr_orig

            # Generate colored noise using the fixed PSD
            noise = pycbc.noise.noise_from_psd(tsamples, delta_t, psd_noise)
            noise_ts = TimeSeries.from_pycbc(noise)

            # Convert waveform to TimeSeries and move its start to the
            # randomly chosen injection time
            signal_ts = TimeSeries.from_pycbc(hp)
            injection_time = np.random.choice([1, 1.5, 2, 2.5, 3])
            signal_ts.t0 = injection_time

            # Inject signal into noise
            injected_ts = noise_ts.inject(signal_ts)

            # Save all components under the index of the current valid sample
            signal_ts.write(os.path.join(dirs["signal"], f"bbh_4k_{valid_count}.txt"))
            noise_ts.write(os.path.join(dirs["noise"], f"noise_4k_{valid_count}.txt"))
            injected_ts.write(os.path.join(dirs["injected"], f"merged_noise_signal_{valid_count}.txt"))

            valid_count += 1
            progress.update(1)

        except Exception as e:
            # Best effort: report the failure and move on to the next attempt.
            # tqdm.write keeps the message from corrupting the progress bar.
            tqdm.write(f"Skipping sample {i}: {e}")

        finally:
            # Single place where the attempt counter advances (success,
            # rejection via `continue`, or exception).
            i += 1

if valid_count < n_samples:
    print(f"⚠️ Only generated {valid_count} samples after {max_attempts} attempts.")


In [3]:
from pycbc import distributions
from pycbc.waveform import get_td_waveform
from pycbc.filter import sigma
from pycbc.psd import aLIGOZeroDetHighPower
import pycbc.noise
import numpy as np
from gwpy.timeseries import TimeSeries
from tqdm import tqdm
import os

# === Parameters ===
# Validation segments share the sampling setup of the training data.
n_val_samples = 1000
fs = 4096
duration = 4.0
delta_t = 1.0 / fs
tsamples = int(duration * fs)
flow = 30.0           # low-frequency cutoff handed to the analytic PSD
snr_range = (4, 10)   # bounds unpacked into the target-SNR draw

# === Directories ===
# Only the noise and the injected series are stored for validation.
val_dir = "./dataset/raw-val-data-files-enhanced-data-generation/"
dirs = {label: os.path.join(val_dir, label) for label in ("noise", "injected")}
for d in dirs.values():
    os.makedirs(d, exist_ok=True)

# === Shared PSD for noise generation ===
# Built once on the frequency grid of a full segment and reused for every sample.
delta_f_noise = 1.0 / duration
flen_noise = int(2048 / delta_f_noise) + 1
shared_psd = aLIGOZeroDetHighPower(flen_noise, delta_f_noise, flow)
# Bins where the analytic PSD is exactly zero are replaced by a small floor.
empty_bins = shared_psd == 0
shared_psd[empty_bins] = 1e-40
shared_psd = pycbc.types.FrequencySeries(shared_psd, delta_f=delta_f_noise)

# === Mass distribution ===
# NOTE(review): the generation loop in this cell draws its masses with
# np.random.uniform and does not read `mass_dist` / `mass_samples`; they are
# kept in case another cell relies on them -- confirm before removing.
mass_dist = distributions.Uniform(mass1=(10, 50), mass2=(10, 50))
mass_samples = mass_dist.rvs(size=n_val_samples)

# === Generation loop ===
# Each attempt draws random binary parameters and a target SNR, generates the
# plus polarisation, rescales it to that SNR, adds it to a fresh realisation of
# coloured noise and writes the noise and the injected series to disk.
# An attempt that raises, or whose sigma is zero / non-finite, is skipped; the
# loop stops after `n_val_samples` successes or `max_attempts` attempts.
generated = 0
attempts = 0
max_attempts = 3000  # Safety cap

# Lower frequency used both to start the waveform and to compute its sigma.
waveform_flow = 40.0

with tqdm(total=n_val_samples, desc="Generating validation samples") as progress:
    while generated < n_val_samples and attempts < max_attempts:
        attempts += 1
        try:
            # === Sample masses and SNR dynamically ===
            m1, m2 = np.random.uniform(10, 50, size=2)
            # numpy's randint excludes its upper bound, so add 1 to make
            # `snr_range` inclusive on both ends.
            snr_low, snr_high = snr_range
            target_snr = np.random.randint(snr_low, snr_high + 1)
            distance = 1
            inclination = np.random.uniform(0, np.pi)
            spin1z = np.random.uniform(-0.99, 0.99)
            spin2z = np.random.uniform(-0.99, 0.99)

            # Generate waveform (only the plus polarisation is kept)
            hp, _ = get_td_waveform(approximant="SEOBNRv2",
                                    mass1=m1,
                                    mass2=m2,
                                    delta_t=delta_t,
                                    f_lower=waveform_flow,
                                    distance=distance,
                                    inclination=inclination,
                                    spin1z=spin1z,
                                    spin2z=spin2z)

            ht = hp.to_frequencyseries()

            # PSD for scaling, built on exactly the waveform's frequency grid.
            # Length and spacing are read from `ht` instead of being recomputed
            # with floating-point arithmetic from the waveform duration.
            delta_f_wave = ht.delta_f
            flen_wave = len(ht)
            psd_wave = aLIGOZeroDetHighPower(flen_wave, delta_f_wave, flow)
            psd_wave[psd_wave == 0] = 1e-40
            psd_wave = pycbc.types.FrequencySeries(psd_wave, delta_f=delta_f_wave)

            # sigma of the unscaled waveform, used as its original SNR
            snr_orig = sigma(ht, psd=psd_wave, low_frequency_cutoff=waveform_flow)

            if not np.isfinite(snr_orig) or snr_orig == 0:
                # Unusable waveform; the attempt was already counted above.
                continue

            hp *= target_snr / snr_orig

            # Fresh coloured-noise realisation from the shared PSD
            noise = pycbc.noise.noise_from_psd(tsamples, delta_t, shared_psd)
            noise_ts = TimeSeries.from_pycbc(noise)

            # Move the start of the waveform to the randomly chosen injection time
            signal_ts = TimeSeries.from_pycbc(hp)
            injection_time = np.random.choice([1, 1.5, 2, 2.5, 3])
            signal_ts.t0 = injection_time

            injected_ts = noise_ts.inject(signal_ts)

            # Save under the index of the current valid sample
            noise_ts.write(f"{dirs['noise']}/val_noise_{generated}.txt")
            injected_ts.write(f"{dirs['injected']}/val_injected_{generated}.txt")

            generated += 1
            progress.update(1)

        except Exception as e:
            # Best effort: report the failure and move on to the next attempt.
            # tqdm.write keeps the message from corrupting the progress bar.
            tqdm.write(f"⚠️ Skipping sample {generated} (attempt {attempts}): {e}")

if generated < n_val_samples:
    print(f"⚠️ Only generated {generated} samples after {attempts} attempts.")
else:
    print("✅ Validation dataset generation complete.")


✅ Validation dataset generation complete.


In [4]:
import csv
import pandas as pd

In [8]:
# Merge the injected signals into a single CSV file (one row per sample file).
path = "./dataset/raw-data-files-enhanced-data-generation/noise-injected-signal/"
# os.listdir() returns entries in arbitrary order; sorting makes the row order
# reproducible. Only the .txt sample files are merged, so stray directory
# entries (e.g. checkpoint folders) cannot break the read.
files = sorted(name for name in os.listdir(path) if name.endswith(".txt"))

# newline='' is what the csv module expects for files it writes to, and the
# context manager closes the file even if one of the samples fails to parse.
with open("./dataset/raw-data-files-enhanced-data-generation/Final_BBH_Merged_Noise_Signal_Reduced_No_ABS.csv", 'w', newline='') as f:
    cw = csv.writer(f)
    for i in tqdm(files):
        df = pd.read_csv(os.path.join(path, i), sep=' ', header=None)
        # Column 1 is written out as the row; presumably column 0 holds the
        # sample times produced by TimeSeries.write -- confirm.
        c = df[1]
        cw.writerow(c)

100%|██████████| 5000/5000 [03:58<00:00, 20.96it/s]


In [9]:
# Merge the noise realisations into a single CSV file (one row per sample file).
path = "./dataset/raw-data-files-enhanced-data-generation/noise_template/"
# os.listdir() returns entries in arbitrary order; sorting makes the row order
# reproducible. Only the .txt sample files are merged, so stray directory
# entries (e.g. checkpoint folders) cannot break the read.
files = sorted(name for name in os.listdir(path) if name.endswith(".txt"))

# newline='' is what the csv module expects for files it writes to, and the
# context manager closes the file even if one of the samples fails to parse.
with open("./dataset/raw-data-files-enhanced-data-generation/Final_Merged_Noise_Reduced_No_Abs.csv", 'w', newline='') as f:
    cw = csv.writer(f)
    for i in tqdm(files):
        df = pd.read_csv(os.path.join(path, i), sep=' ', header=None)
        # Use column 1 when the file parsed into at least two columns,
        # otherwise fall back to the only column available.
        c = df[1] if 1 in df.columns else df[0]
        cw.writerow(c)

100%|██████████| 5000/5000 [03:18<00:00, 25.19it/s]


In [10]:
# Merge the BBH signals into a single CSV file (one row per sample file).
path = "./dataset/raw-data-files-enhanced-data-generation/BBH/"
# os.listdir() returns entries in arbitrary order; sorting makes the row order
# reproducible. Only the .txt sample files are merged, so stray directory
# entries (e.g. checkpoint folders) cannot break the read.
files = sorted(name for name in os.listdir(path) if name.endswith(".txt"))

# newline='' is what the csv module expects for files it writes to, and the
# context manager closes the file even if one of the samples fails to parse.
with open("./dataset/raw-data-files-enhanced-data-generation/Final_Merged_bbh_Signal_Reduced_No_Abs.csv", 'w', newline='') as f:
    cw = csv.writer(f)
    for i in tqdm(files):
        df = pd.read_csv(os.path.join(path, i), sep=' ', header=None)
        # Column 1 is written out as the row; presumably column 0 holds the
        # sample times produced by TimeSeries.write -- confirm.
        c = df[1]
        cw.writerow(c)

100%|██████████| 5000/5000 [00:24<00:00, 207.22it/s]


In [11]:
# Merge the validation injected signals into a single CSV file
# (one row per sample file).
path = val_dir+"injected/"
# os.listdir() returns entries in arbitrary order; sorting makes the row order
# reproducible. Only the .txt sample files are merged, so stray directory
# entries (e.g. checkpoint folders) cannot break the read.
files = sorted(name for name in os.listdir(path) if name.endswith(".txt"))

# newline='' is what the csv module expects for files it writes to, and the
# context manager closes the file even if one of the samples fails to parse.
with open(val_dir+'val_Final_BBH_Merged_Noise_Signal_Reduced_No_ABS.csv', 'w', newline='') as f:
    cw = csv.writer(f)
    for i in tqdm(files):
        df = pd.read_csv(os.path.join(path, i), sep=' ', header=None)
        # Column 1 is written out as the row; presumably column 0 holds the
        # sample times produced by TimeSeries.write -- confirm.
        c = df[1]
        cw.writerow(c)

100%|██████████| 1000/1000 [00:46<00:00, 21.42it/s]


In [12]:
# Merge the validation noise realisations into a single CSV file
# (one row per sample file).
path_1 = val_dir+"noise/"
# os.listdir() returns entries in arbitrary order; sorting makes the row order
# reproducible. Only the .txt sample files are merged, so stray directory
# entries (e.g. checkpoint folders) cannot break the read.
files_1 = sorted(name for name in os.listdir(path_1) if name.endswith(".txt"))

# newline='' is what the csv module expects for files it writes to, and the
# context manager closes the file even if one of the samples fails to parse.
with open(val_dir+'val_Final_Merged_Noise_Reduced_No_Abs.csv', 'w', newline='') as f1:
    cw_1 = csv.writer(f1)
    for i in tqdm(files_1):
        df = pd.read_csv(os.path.join(path_1, i), sep=' ', header=None)
        # Column 1 is written out as the row; presumably column 0 holds the
        # sample times produced by TimeSeries.write -- confirm.
        c = df[1]
        cw_1.writerow(c)

100%|██████████| 1000/1000 [00:49<00:00, 20.30it/s]
