In [None]:
!pip install git+https://github.com/PyFstat/PyFstat@python37
!pip3 install timm -q

# Library

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from glob import glob
import os
import sys
import h5py
from pathlib import Path

import pyfstat
from pyfstat.utils import get_sft_as_arrays



In [None]:
%matplotlib inline

In [None]:
"""
Utils
=====
Utility functions to simplify tutorials.
"""
from matplotlib import colors
from scipy import stats

plt.rcParams["font.family"] = "serif"
plt.rcParams["font.size"] = 20


def plot_real_imag_spectrograms(timestamps, frequency, fourier_data):
    fig, axs = plt.subplots(1, 2, figsize=(16, 10))

    for ax in axs:
        ax.set(xlabel="SFT index", ylabel="Frequency [Hz]")

    time_in_days = (timestamps - timestamps[0]) / 1800

    axs[0].set_title("SFT Real part")
    c = axs[0].pcolormesh(
        time_in_days,
        frequency,
        fourier_data.real,
        norm=colors.CenteredNorm(),
    )
    fig.colorbar(c, ax=axs[0], orientation="horizontal", label="Fourier Amplitude")

    axs[1].set_title("SFT Imaginary part")
    c = axs[1].pcolormesh(
        time_in_days,
        frequency,
        fourier_data.imag,
        norm=colors.CenteredNorm(),
    )

    fig.colorbar(c, ax=axs[1], orientation="horizontal", label="Fourier Amplitude")

    return fig, axs


def plot_real_imag_spectrograms_with_gaps(timestamps, frequency, fourier_data, Tsft):

    # Fill up gaps with Nans
    gap_length = timestamps[1:] - (timestamps[:-1] + Tsft)

    gap_data = [fourier_data[:, 0]]
    gap_timestamps = [timestamps[0]]

    for ind, gap in enumerate(gap_length):
        if gap > 0:
            gap_data.append(np.full_like(fourier_data[:, ind], np.nan + 1j * np.nan))
            gap_timestamps.append(timestamps[ind] + Tsft)

        gap_data.append(fourier_data[:, ind + 1])
        gap_timestamps.append(timestamps[ind + 1])

    return plot_real_imag_spectrograms(
        np.hstack(gap_timestamps), frequency, np.vstack(gap_data).T
    )

def size_of_spectrograms_with_gaps(timestamps,frequency,fourier_data,Tsft):
    gap_length=timestamps[1:]-(timestamps[:-1]+Tsft)
    gap_data=[fourier_data[:,0]]
    gap_timestamps=[timestamps[0]]
    for ind, gap in enumerate(gap_length):
        if gap > 0:
            gap_data.append(np.full_like(fourier_data[:, ind], np.nan + 1j * np.nan))
            gap_timestamps.append(timestamps[ind] + Tsft)

        gap_data.append(fourier_data[:, ind + 1])
        gap_timestamps.append(timestamps[ind + 1])
    np_array=np.array(gap_data)
    return np_array.shape
    
    
def plot_real_imag_histogram(fourier_data, theoretical_stdev=None):

    fig, ax = plt.subplots(figsize=(16, 10))
    ax.set(xlabel="SFT value", ylabel="PDF", yscale="log")

    ax.hist(
        fourier_data.real.ravel(),
        density=True,
        bins="auto",
        histtype="step",
        lw=2,
        label="Real part",
    )
    ax.hist(
        fourier_data.imag.ravel(),
        density=True,
        bins="auto",
        histtype="step",
        lw=2,
        label="Imaginary part",
    )

    if theoretical_stdev is not None:
        x = np.linspace(-4 * theoretical_stdev, 4 * theoretical_stdev, 1000)
        y = stats.norm(scale=theoretical_stdev).pdf(x)
        ax.plot(x, y, color="black", ls="--", label="Gaussian distribution")

    ax.legend()

    return fig, ax


def plot_amplitude_phase_spectrograms(timestamps, frequency, fourier_data):
    fig, axs = plt.subplots(1, 2, figsize=(16, 10))

    for ax in axs:
        ax.set(xlabel="SFT index", ylabel="Frequency [Hz]")

    time_in_days = (timestamps - timestamps[0]) / 1800

    axs[0].set_title("SFT absolute value")
    c = axs[0].pcolorfast(
        time_in_days, frequency, np.absolute(fourier_data), norm=colors.Normalize()
    )
    fig.colorbar(c, ax=axs[0], orientation="horizontal", label="Value")

    axs[1].set_title("SFT phase")
    c = axs[1].pcolorfast(
        time_in_days, frequency, np.angle(fourier_data), norm=colors.CenteredNorm()
    )

    fig.colorbar(c, ax=axs[1], orientation="horizontal", label="Value")

    return fig, axs

# Data
[このリンク](https://www.kaggle.com/code/ayuraj/g2net-understand-the-data)を参考にしている

In [None]:
DIR='../input/g2net-detecting-continuous-gravitational-waves'
df=pd.read_csv(DIR+'/train_labels.csv')
df.head(15)

In [None]:
print(len(df))
df.target.value_counts()


| target | 説明 |
| --- | --- |
| 1 | 重力波を観測 |
| 0 | 重力波が存在しない |
| -1 | 重力波があるかどうかがわからない |

In [None]:
train_files=glob(f"{DIR}/train/*.hdf5")
train_files[:10]

In [None]:
print(train_files[7])
file= Path(train_files[7])
with h5py.File(file,"r") as f:
    print("Keyの数: %d" % len(f.keys()))
    key=list(f.keys())[0]
    print(f"一番上のKey: {key}")
    print(f"Key{key}を用いたときのデータのタイプ : {type(f[key])}")
    group1=f[key]
    print(f"\nf[\"{key}\"]をgroup1に代入した")
    print(f"group1のKey: {list(group1.keys())}" )
    for key_i in list(group1.keys()):
        print(f"Key {key_i}のデータのタイプ: {type(group1[key_i])}")
    print()
    group11=group1[list(group1.keys())[0]]
    print(f"group1[\"{list(group1.keys())[0]}\"]をgroup11に代入した")
    print(f"group11のKey: {list(group11.keys())}")
    for key_i in list(group11.keys()):
        print(f"Key {key_i}のデータのタイプ: {type(group11[key_i])}")
    print()
    group12=group1[list(group1.keys())[1]]
    print(f"group1[\"{list(group1.keys())[1]}\"]をgroup12に代入した")
    print(f"group12のKey: {list(group12.keys())}")
    for key_i in list(group12.keys()):
        print(f"Key {key_i}のデータのタイプ: {type(group12[key_i])}")
    print()
    frequency_Hz=group1["frequency_Hz"][()]
    H1_SFTs=group11["SFTs"][()]
    H1_timestamps_GPS=group11["timestamps_GPS"][()]
    L1_SFTs=group12["SFTs"][()]
    L1_timestamps_GPS=group12["timestamps_GPS"][()]
    print(f"frequency_Hzのデータサイズ: {frequency_Hz.shape}")
    print(f"H1_SFTsのデータサイズ: {H1_SFTs.shape}")
    print(f"H1_timestamps_GPSのデータサイズ: {H1_timestamps_GPS.shape}")
    print(f"L1_SFTsのデータサイズ{L1_SFTs.shape}")
    print(f"L1_timestamps_GPSのデータサイズ: {L1_timestamps_GPS.shape}")
    

## hdf5の中身 
```
xxx.hdf5
|
└---xxx
     |   frequency_Hz  ... 多分360個のデータ
     |
     └---H1
     |    |   SFTs　　　... 360? $\times$ x個のデータ
     |    |   timestamps_GPS ... x個のデータ
     |    
     └---L1
     |    |   SFTs     ... 360? $\times$ y個のデータ
     |    |   timestamps_GPS ... y個のデータ
     
     

```

- timestamps_GPSは連続的に取られたものではない
- H1はLIGO Hanfordからのデータ
- L1はLIGO Livingstonからのデータを示す

In [None]:
def read_hdf5(file: Path):
    with h5py.File(file,"r") as f:
#         print("Keyの数: %d" % len(f.keys()))
        key=list(f.keys())[0]
#         print(f"一番上のKey: {key}")
#         print(f"Key{key}を用いたときのデータのタイプ : {type(f[key])}")
        group1=f[key]
#         print(f"\nf[\"{key}\"]をgroup1に代入した")
#         print(f"group1のKey: {list(group1.keys())}" )
#         for key_i in list(group1.keys()):
#             print(f"Key {key_i}のデータのタイプ: {type(group1[key_i])}")
#         print()
        group11=group1[list(group1.keys())[0]]
#         print(f"group1[\"{list(group1.keys())[0]}\"]をgroup11に代入した")
#         print(f"group11のKey: {list(group11.keys())}")
#         for key_i in list(group11.keys()):
#             print(f"Key {key_i}のデータのタイプ: {type(group11[key_i])}")
#         print()
        group12=group1[list(group1.keys())[1]]
#         print(f"group1[\"{list(group1.keys())[1]}\"]をgroup12に代入した")
#         print(f"group12のKey: {list(group12.keys())}")
#         for key_i in list(group12.keys()):
#             print(f"Key {key_i}のデータのタイプ: {type(group12[key_i])}")
#         print()
        frequency_Hz=group1["frequency_Hz"][()]
#         print(type(frequency_Hz))
        H1_SFTs=group11["SFTs"][()]
        H1_timestamps_GPS=group11["timestamps_GPS"][()]
        L1_SFTs=group12["SFTs"][()]
        L1_timestamps_GPS=group12["timestamps_GPS"][()]
#         print(f"frequency_Hzのデータサイズ: {frequency_Hz.shape}")
#         print(f"H1_SFTsのデータサイズ: {H1_SFTs.shape}")
#         print(f"H1_timestamps_GPSのデータサイズ: {H1_timestamps_GPS.shape}")
#         print(f"L1_SFTsのデータサイズ{L1_SFTs.shape}")
#         print(f"L1_timestamps_GPSのデータサイズ: {L1_timestamps_GPS.shape}")
        return {
            "H1": [H1_SFTs,H1_timestamps_GPS],
            "L1": [L1_SFTs,L1_timestamps_GPS],
            "freq_Hz": frequency_Hz
        }

In [None]:
file= Path(train_files[1])

data=read_hdf5(file)
h1_sft,h1_ts=data["H1"]
l1_sft,l1_ts=data["L1"]
freq_hz=data["freq_Hz"]
print(f"H1 shape:{h1_sft.shape}")
print(f"L1 shape:{l1_sft.shape}")

In [None]:
#df["Size_with_gaps_h1"]=8000
#df["Size_with_gaps_l1"]=8000
for index,row in df.iterrows():
    file_i='../input/g2net-detecting-continuous-gravitational-waves/train/'+row["id"]+'.hdf5'
    data_i=read_hdf5(file_i)
    h1_sft_i,h1_ts_i=data_i["H1"]
    l1_sft_i,l1_ts_i=data_i["L1"]
    freq_hz_i=data_i["freq_Hz"]
    size_h1_i=size_of_spectrograms_with_gaps(h1_ts_i, freq_hz_i, h1_sft_i, 1800)
    size_l1_i=size_of_spectrograms_with_gaps(l1_ts_i, freq_hz_i, l1_sft_i, 1800)
    df.loc[index,"Size_with_gaps_h1"]=size_h1_i[0]
    df.loc[index,"Size_with_gaps_l1"]=size_l1_i[0]
    


In [None]:
df=df[df["target"]!=-1]
df.sort_values("Size_with_gaps_l1").head(10)

In [None]:
df.sort_values("Size_with_gaps_h1").head(10)

このタイムスタンプは[unixtimestamp](https://ja.wikipedia.org/wiki/UNIX%E6%99%82%E9%96%93)

もしかしたら[gps epoch](https://gis.stackexchange.com/questions/281223/what-is-gps-epoch) かもしれない
わかりやすくすると以下の通り

In [None]:
from datetime import datetime
print(datetime.utcfromtimestamp(h1_ts[0]).strftime('%Y-%m-%d %H:%M:%S'))

In [None]:
gaps = []
a = h1_ts[0]
for b in h1_ts[1:]:
    gap = b-a
    
    #print(a,b,gap)
    a = b
    gaps.append(gap)
    
plt.figure(figsize=(20, 5))
plt.plot(h1_ts[1:], gaps, marker='.')
plt.hlines(1800,h1_ts[0],h1_ts[len(h1_ts)-1],color="red")

In [None]:
l_gaps = []
a = l1_ts[0]
for b in l1_ts[1:]:
    l_gap = b-a
    
    #print(a,b,l_gap)
    a = b
    l_gaps.append(l_gap)
    
plt.figure(figsize=(20, 5))
plt.plot(l1_ts[1:], l_gaps, marker='.')
plt.hlines(1800,l1_ts[0],l1_ts[len(l1_ts)-1],color="red")

## 短時間フーリエ変換
- 多分アルゴリズムの本で読んだ気がするから読みなおしたい
- 以下のプロットは時間が連続ではないし差が等しくないからそこには注意

In [None]:
fig, ax =plt.subplots(2,2,figsize=(24,15))
fig.suptitle(f"Filename ID: {file.stem}")

for d_ind, detector in enumerate(["H1","L1"]):
    ax[d_ind][0].set(xlabel="Timestamps [GPS]",
                     ylabel="Frequency [Hz]",
                     title=f"{detector} - Real part")
    ax[d_ind][1].set(xlabel="Timestamps [GPS]",
                     ylabel="Frequency [Hz]",
                     title=f"{detector} - Imaginary part")
    c0=ax[d_ind][0].pcolormesh(data[detector][1],data["freq_Hz"],data[detector][0].real)
    c1=ax[d_ind][1].pcolormesh(data[detector][1],data["freq_Hz"],data[detector][0].imag)
    fig.colorbar(c0,ax=ax[d_ind][0])
    fig.colorbar(c1,ax=ax[d_ind][1])
plt.show()

In [None]:
plot_real_imag_spectrograms(h1_ts,freq_hz,h1_sft)

- gapをある程度考慮すると以下の通りになる

In [None]:
plot_real_imag_spectrograms_with_gaps(h1_ts,freq_hz,h1_sft,1800)

In [None]:
# print(f"{h1_ts.shape} {freq_hz.shape} {h1_sft.shape}")
# fig,ax=plot_amplitude_phase_spectrograms(h1_ts,freq_hz,h1_sft[:-1,:-1])

# ノイズの生成
- ${h0}=0$によって生成されたSFTは0とラベル付けされる
- ${h0}=1$によって生成されたSFTは1とラベル付けされる
    - この時SNR(signal to noise ratio)>0

`
"""
        Parameters
        ----------
        label: string
            アウトプットするファイルの名前を決定する
        tstart: int
            データセットの開始GPSエポック
            Starting GPS epoch of the data set.
            NOTE: mutually exclusive with `timestamps`.
        duration: int
            Duration (in GPS seconds) of the total data set.
            NOTE: mutually exclusive with `timestamps`.
        tref: float or None
            Reference time for simulated signals.
            Default is `None`, which sets the reference time to `tstart`.
        F0: float or None
            Frequency of a signal to inject.
            Also used (if `Band` is not `None`) as center of frequency band.
            Also needed when noise-only (`h0=None` or `h0==0`)
            but no `noiseSFTs` given,
            in which case it is also used as center of frequency band.
        F1, F2, Alpha, Delta, h0, cosi, psi, phi: float or None
            Additional frequency evolution and amplitude parameters for a signal.
            If `h0=None` or `h0=0`, these are all ignored.
            If `h0>0`, then at least `[Alpha,Delta,cosi]` need to be set explicitly.
        Tsft: int
            The SFT duration in seconds.
            Will be ignored if `noiseSFTs` are given.
        outdir: str
            The directory where files are written to.
            Default: current working directory.
        sqrtSX: float or list or str or None
            Single-sided PSD values for generating fake Gaussian noise.
            Single float or str value: use same for all detectors.
            List or comma-separated string: must match len(detectors).
            Detectors will be paired to list elements following alphabetical order.
        noiseSFTs: str or None
            Existing SFT files on top of which signals will be injected.
            If not `None`, additional constraints can be applied
            using the arguments `tstart` and `duration`.
            NOTE: mutually exclusive with `timestamps`.
        SFTWindowType: str or None
            LAL name of the windowing function to apply to the data.
        SFTWindowBeta: float
            Optional parameter for some windowing functions.
        Band: float or None
            If float, and `F0` is also not `None`, then output SFTs cover
            `[F0-Band/2,F0+Band/2]`.
            If `None` and `noiseSFTs` given, use their bandwidth.
            If `None` and no `noiseSFTs` given,
            a minimal covering band for a perfectly-matched
            single-template ComputeFstat analysis is estimated.
        detectors: str or None
            Comma-separated list of detectors to generate data for.
        earth_ephem, sun_ephem: str or None
            Paths of the two files containing positions of Earth and Sun.
            If None, will check standard sources as per
            utils.get_ephemeris_files().
        transientWindowType: str
            If `none`, a fully persistent CW signal is simulated.
            If `rect` or `exp`, a transient signal with the corresponding
            amplitude evolution is simulated.
        transientStartTime: int or None
            Start time for a transient signal.
        transientTau: int or None
            Duration (`rect` case) or decay time (`exp` case) of a transient signal.
        randSeed: int or None
            Optionally fix the random seed of Gaussian noise generation
            for reproducibility.
        timestamps: str or dict
            Dictionary of timestamps (each key must refer to a detector),
            list of timestamps (`detectors` should be set),
            or comma-separated list of per-detector timestamps files
            (simple text files, lines with <seconds> <nanoseconds>,
            comments should use `%`, each time stamp gives the
            start time of one SFT).
            WARNING: In that last case, order must match that of `detectors`!
            NOTE: mutually exclusive with [`tstart`,`duration`]
            and with `noiseSFTs`.
        """
        `

In [None]:
writer_kwargs = {
    "label": "single_detector_gaussian_noise",
    "outdir": "PyFstat_example_data",
    "tstart": 1238166018,  # Starting time of the observation [GPS time]
    "duration": 5 * 86400,  # Duration [seconds]
    "detectors": "H1",  # Detector to simulate, in this case LIGO Hanford
    "F0": 100.0,  # Central frequency of the band to be generated [Hz]
    "Band": 1.0,  # Frequency band-width around F0 [Hz]
    "sqrtSX": 1e-23,  # Single-sided Amplitude Spectral Density of the noise
    "Tsft": 1800,  # Fourier transform time duration
    "SFTWindowType": "tukey",  # Window function to compute short Fourier transforms
    "SFTWindowBeta": 0.01,  # Parameter associated to the window function
}
writer = pyfstat.Writer(**writer_kwargs)

# Create SFTs
writer.make_data()

In [None]:
frequency,timestamps,fourier_data=get_sft_as_arrays(writer.sftfilepath)

In [None]:
h1_sft, h1_ts = fourier_data["H1"],timestamps["H1"]
print(f"The shape of H1 SFT is: {h1_sft.shape} while that of it's time stamp is: {h1_ts.shape}")
print(f"The shape of frequency Hz is: {frequency.shape}")

In [None]:
plot_real_imag_spectrograms(timestamps["H1"],frequency,fourier_data["H1"])

In [None]:
fig, ax = plot_amplitude_phase_spectrograms(
    timestamps["H1"], frequency, fourier_data["H1"]
);

# CW信号の生成
CW波は`amplitude`と`Doppler`パラメータで主に決定される

- doppler パラメータは波源と検出器との関係を表している
    - `f0` : 重力波の周波数
    - `f1` : spindown
    - `n` : sky position
    - $\alpha$ : right ascension 右上角?
    - $\delta$ : declination 赤緯

In [None]:
# このパラメータは上のと同じ
writer_kwargs = {
    "label": "single_detector_gaussian_noise",
    "outdir": "PyFstat_example_data",
    "tstart": 1238166018,  # Starting time of the observation [GPS time]
    "duration": 365 * 86400,  # Duration [seconds]
    "detectors": "H1",  # Detector to simulate, in this case LIGO Hanford
    #"F0": 100.0,  # Central frequency of the band to be generated [Hz]
    #"Band": 1.0,  # Frequency band-width around F0 [Hz]
    "sqrtSX": 1e-23,  # Single-sided Amplitude Spectral Density of the noise
    "Tsft": 1800,  # Fourier transform time duration
    "SFTWindowType": "tukey",  # Window function to compute short Fourier transforms
    "SFTWindowBeta": 0.01,  # Parameter associated to the window function
}
## このパラメータは検出するべき信号を決定する。
signal_parameters={
    "F0":100.0,
    "F1": -1e-9,
    "Alpha": 0.0,
    "Delta":0.0,
    "h0": 1e-22,#このパラメータは特に大きな特徴になる
    "cosi": 1,
    "psi": 0.0,
    "phi": 0.0,
    "tref": writer_kwargs["tstart"],
}
writer=pyfstat.Writer(**writer_kwargs,**signal_parameters)
writer.make_data()
frequency, timestamps, fourier_data=get_sft_as_arrays(writer.sftfilepath)
plot_real_imag_spectrograms(
    timestamps["H1"],frequency,fourier_data["H1"]
)

In [None]:
# このパラメータは上のと同じ
writer_kwargs = {
    "label": "single_detector_gaussian_noise",
    "outdir": "PyFstat_example_data",
    "tstart": 1238166018,  # Starting time of the observation [GPS time]
    "duration": 365 * 86400,  # Duration [seconds]
    "detectors": "H1",  # Detector to simulate, in this case LIGO Hanford
    #"F0": 100.0,  # Central frequency of the band to be generated [Hz]
    #"Band": 1.0,  # Frequency band-width around F0 [Hz]
    "sqrtSX": 1e-23,  # Single-sided Amplitude Spectral Density of the noise
    "Tsft": 1800,  # Fourier transform time duration
    "SFTWindowType": "tukey",  # Window function to compute short Fourier transforms
    "SFTWindowBeta": 0.01,  # Parameter associated to the window function
}
## このパラメータは検出するべき信号を決定する。
signal_parameters={
    "F0":100.0,
    "F1": -1e-9,
    "Alpha": 0.0,
    "Delta":0.0,
    "h0": 1e-22,#このパラメータは特に大きな特徴になる
    "cosi": 1,
    "psi": 0.0,
    "phi": 0.0,
    "tref": writer_kwargs["tstart"],
}
writer=pyfstat.Writer(**writer_kwargs,**signal_parameters)
writer.make_data()
frequency, timestamps, fourier_data=get_sft_as_arrays(writer.sftfilepath)
plot_real_imag_spectrograms(
    timestamps["H1"],frequency,fourier_data["H1"]
)

In [None]:
# このパラメータは上のと同じ
writer_kwargs = {
    "label": "single_detector_gaussian_noise",
    "outdir": "PyFstat_example_data",
    "tstart": 1238166018,  # Starting time of the observation [GPS time]
    "duration": 365 * 86400,  # Duration [seconds]
    "detectors": "H1",  # Detector to simulate, in this case LIGO Hanford
    #"F0": 100.0,  # Central frequency of the band to be generated [Hz]
    #"Band": 1.0,  # Frequency band-width around F0 [Hz]
    "sqrtSX": 1e-23,  # Single-sided Amplitude Spectral Density of the noise
    "Tsft": 1800,  # Fourier transform time duration
    "SFTWindowType": "tukey",  # Window function to compute short Fourier transforms
    "SFTWindowBeta": 0.01,  # Parameter associated to the window function
}
## このパラメータは検出するべき信号を決定する。
signal_parameters={
    "F0":100.0,
    "F1": -1e-9,
    "Alpha": 0.0,
    "Delta":0.0,
    "h0": 1e-22,#このパラメータは特に大きな特徴になる
    "cosi": 1,
    "psi": 0.0,
    "phi": 0.0,
    "tref": writer_kwargs["tstart"],
}
writer=pyfstat.Writer(**writer_kwargs,**signal_parameters)
writer.make_data()
frequency, timestamps, fourier_data=get_sft_as_arrays(writer.sftfilepath)
plot_real_imag_spectrograms(
    timestamps["H1"],frequency,fourier_data["H1"]
)

In [None]:
print(timestamps["H1"].shape)
print(f"{frequency.shape} { fourier_data['H1'].shape }")
fig, ax = plot_amplitude_phase_spectrograms(
    timestamps["H1"], frequency, fourier_data["H1"]
);

In [None]:
snr = pyfstat.SignalToNoiseRatio.from_sfts(F0=writer.F0,sftfilepath=writer.sftfilepath)
squared_snr=snr.compute_snr2(
    Alpha=writer.Alpha, Delta=writer.Delta, psi=writer.psi, phi=writer.phi, h0=writer.h0, cosi=writer.cosi
)
sqrt_snr=np.sqrt(squared_snr)
print(f"Signal  -  SNR: {sqrt_snr}")

In [None]:
signal_parameters["h0"]=writer_kwargs["sqrtSX"] / 20.0
writer = pyfstat.Writer(**writer_kwargs, **signal_parameters)

writer.make_data()

frequency, timestamps, fourier_data = get_sft_as_arrays(writer.sftfilepath)

plot_real_imag_spectrograms(timestamps["H1"],frequency,fourier_data["H1"])
plot_amplitude_phase_spectrograms(timestamps["H1"],frequency,fourier_data["H1"])


In [None]:
snr = pyfstat.SignalToNoiseRatio.from_sfts(F0=writer.F0,sftfilepath=writer.sftfilepath)
squared_snr=snr.compute_snr2(
    Alpha=writer.Alpha, Delta=writer.Delta, psi=writer.psi, phi=writer.phi, h0=writer.h0, cosi=writer.cosi
)
sqrt_snr=np.sqrt(squared_snr)
print(f"Signal  -  SNR: {sqrt_snr}")

In [None]:
timestamps = {"H1": data[detector][1]}

# Setup Writer
writer_kwargs = {
    "label": "single_detector_gaps",
    "outdir": "PyFstat_example_data",
    "timestamps": timestamps,
    "sqrtSX": 1e-23,  # Single-sided Amplitude Spectral Density of the noise
    "Tsft": 1800,  # Fourier transform time duration
    "SFTWindowType": "tukey",  # Window function to compute short Fourier transforms
    "SFTWindowBeta": 0.01,  # Parameter associated to the window function
}

# These parameters determine the CW signal that we have to detect.
signal_parameters = {
    "F0": 100.0,
    "F1": -1e-9,
    "Alpha": 0.0,
    "Delta": 0.0,
    "h0": 1e-22,
    "cosi": 1,
    "psi": 0.0,
    "phi": 0.0,
}

writer = pyfstat.Writer(**writer_kwargs, **signal_parameters)
# writer = pyfstat.Writer(**writer_kwargs)

# Create SFTs
writer.make_data()
frequency, timestamps, fourier_data = get_sft_as_arrays(writer.sftfilepath)

In [None]:
plot_real_imag_spectrograms_with_gaps(
    timestamps["H1"], frequency, fourier_data["H1"], writer_kwargs["Tsft"]
);

In [None]:
import torch
import torch.nn as nn
import torchaudio
import torchvision.transforms as TF
import timm

from tqdm.auto import tqdm
from sklearn.model_selection import KFold
from sklearn.metrics import roc_auc_score
from timm.scheduler import CosineLRScheduler

device=torch.device("cuda")
criterion=nn.BCEWithLogitsLoss()


In [None]:
df.head(11)

In [None]:
class Dataset(torch.utils.data.Dataset):
    """
    dataset=Dataset(data_type,df)
    
    img, y = dataset[i]
        img (np.float32):
        y (np.float32): label 0 or 1
    """
    def __init__(self,data_type,df,tfms=False):
        self.data_type=data_type
        self.df=df
        self.tfms=tfms
        self.Horizontal=128
    
    def __len__(self):
        return len(self.df)
    
    def read_hdf5_data(self,id):
        file='../input/g2net-detecting-continuous-gravitational-waves/'+self.data_type+'/'+id+'.hdf5'
        with h5py.File(file,"r") as f:
            key=list(f.keys())[0]
            group1=f[key]
            group11=group1[list(group1.keys())[0]]
            group12=group1[list(group1.keys())[1]]
            frequency_Hz=group1["frequency_Hz"][()]
            H1_SFTs=group11["SFTs"][()]
            H1_timestamps_GPS=group11["timestamps_GPS"][()]
            L1_SFTs=group12["SFTs"][()]
            L1_timestamps_GPS=group12["timestamps_GPS"][()]
            return {
                "H1": [H1_SFTs,H1_timestamps_GPS],
                "L1": [L1_SFTs,L1_timestamps_GPS],
                "freq_Hz": frequency_Hz
            }
    def get_spectrograms_with_gaps(self,timestamps,frequency,fourier_data,Tsft):
        #print(fourier_data.shape)
        fourier_data=fourier_data* 1e22
        fourier_data=np.sqrt(fourier_data.real**2+fourier_data.imag**2)
        fourier_data/=np.mean(fourier_data)
        
        gap_data=fourier_data.transpose()
        
#         gap_length=timestamps[1:]-(timestamps[:-1]+Tsft)
#         gap_data=[fourier_data[:,0]]
#         #gap_data=np.array(fourier_data[:,0])
#         #print(gap_data.shape)
#         gap_timestamps=[timestamps[0]]
#         for ind, gap in enumerate(gap_length):
#             if gap > 0:
#                 #print(ind)
#                 gap_data.append(np.full_like(fourier_data[:, ind], 0))
#                 #gap_data.append(np.full_like(fourier_data[:, ind], np.nan + 1j * np.nan))
#                 #gap_data.append(np.full_like(fourier_data[:, ind], np.nan ))
#                 gap_timestamps.append(timestamps[ind] + Tsft)

#             gap_data.append(fourier_data[:, ind + 1])
#             gap_timestamps.append(timestamps[ind + 1])
        
        
        
        
        np_array=np.array(gap_data).transpose()
        #print(np_array.shape)
        np_array=np_array[:,:4096]
        np_array=np.mean(np_array.reshape(360,self.Horizontal,int(4096/self.Horizontal)),axis=2)
        #print(np.hstack(np_array).shape)
        return np_array
    def __getitem__(self, i):
        row=self.df.iloc[i]
        y=np.float32(row.target)
        file_id=row.id
        data=self.read_hdf5_data(file_id)
        h1_sft,h1_ts=data["H1"]
        l1_sft,l1_ts=data["L1"]
        freq_hz=data["freq_Hz"]
        #plot_real_imag_spectrograms_with_gaps(h1_ts,freq_hz,h1_sft,1800)
        img=np.empty((2,360,self.Horizontal),dtype=np.float32)
        img[0]=self.get_spectrograms_with_gaps(h1_ts,freq_hz,h1_sft,1800)
        img[1]=self.get_spectrograms_with_gaps(l1_ts,freq_hz,l1_sft,1800)
        if self.tfms:
            print()
            #if np.random.rand() <=flip_rate
        else:
            img=torch.from_numpy(img)
        return img,y

    
    
    
dataset=Dataset("train",df)
img,y=dataset[10]
plt.figure(figsize=(1.5,3))
plt.title("Spectrograms")
plt.xlabel("time")
plt.ylabel("frequency")
#plt.imshow(img[0,:,:])
plt.imshow(img[0,0:360])
#plt.pcolormesh(img[0,:,:])
plt.colorbar()
plt.show()

dataset=Dataset("train",df)
img,y=dataset[10]
plt.figure(figsize=(1.5,3))
plt.title("Spectrograms")
plt.xlabel("time")
plt.ylabel("frequency")
plt.imshow(img[1,:,:])
#plt.pcolormesh(img[1,:,:])
plt.colorbar()
plt.show()

In [None]:
print(df.loc[10])

In [None]:
transforms_time_mask=nn.Sequential(
    torchaudio.transforms.TimeMasking(time_mask_param=10),
)
transforms_freq_mask=nn.Sequential(
    torchaudio.transforms.FrequencyMasking(freq_mask_param=10),
)
flip_rate=0.0 
fre_shift_rate= 0.0 
time_mask_num= 0 
freq_mask_num = 0
