In [None]:
#|default_exp preprocessing

# PREPROCESSING

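> Helpers for reading HDF5 SFT samples, writing generated samples to disk, and converting samples to PyTorch files.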

In [None]:
#| export
import numpy as np
import h5py
import pandas as pd
from typing import Dict
from secrets import token_hex
from pathlib import Path
from scipy import stats
import pyfstat
import matplotlib.pyplot as plt
import os
import random
from joblib import Parallel, delayed
from tqdm import tqdm
import torch

In [None]:
#| export
def good_luck():
    """Return the fixed placeholder string ``'pass'``."""
    placeholder = 'pass'
    return placeholder

In [None]:
# | export
def read_file(filename):
    """Load one HDF5 sample as a two-channel, mean-normalised power image.

    The file must contain a top-level group named after the file stem, with
    ``H1`` and ``L1`` sub-groups that each hold an ``SFTs`` dataset.

    For each detector the first 4096 columns of ``SFTs`` are scaled by 1e22,
    converted to power (real**2 + imag**2), divided by the mean power, and
    averaged over groups of 32 adjacent columns (4096 -> 128).

    Returns a float32 array of shape (2, 360, 128); index 0 is ``H1`` and
    index 1 is ``L1``. The reshape only succeeds when the sliced data holds
    exactly 360 x 4096 values.
    """
    sample_id = Path(filename).stem
    detectors = ("H1", "L1")
    image = np.empty((len(detectors), 360, 128), dtype=np.float32)

    with h5py.File(filename, "r") as h5_file:
        sample = h5_file[sample_id]
        for channel, detector in enumerate(detectors):
            # Scale the raw Fourier coefficients so the power is of order one.
            coeffs = sample[detector]["SFTs"][:, :4096] * 1e22
            power = coeffs.real**2 + coeffs.imag**2
            power /= np.mean(power)
            # Average every 32 neighbouring columns: 4096 -> 128.
            image[channel] = np.mean(power.reshape(360, 128, 32), axis=2)

    return image


def read_timestemp(filename):
    """Read the ``timestamps_GPS`` datasets of both detectors from a sample.

    The file must contain a top-level group named after the file stem.
    Returns a dict with keys ``"L1"`` and ``"H1"`` (in that insertion order),
    each holding an in-memory copy of the corresponding dataset.
    """
    sample_id = Path(filename).stem
    timestamps = {}
    with h5py.File(filename, "r") as h5_file:
        for detector in ("L1", "H1"):
            # Copy so the values stay available after the file is closed.
            timestamps[detector] = np.copy(h5_file[sample_id][detector]["timestamps_GPS"])
    return timestamps


def get_random_name(len_k=16):
    """Return a random hexadecimal string built from ``len_k`` random bytes.

    The result therefore has ``2 * len_k`` characters.
    """
    return token_hex(len_k)


def save_hdf(
    name: str,
    sft_h1: np.array,
    sft_l1: np.array,
    timestamps: Dict,
    frequency: np.array,
    meta_: dict,
):
    """Write one sample to ``{name}.h5`` and its metadata to ``{name}.csv``.

    HDF5 layout (a top-level group named after the stem of ``name``)::

        <stem>/H1/SFTs, <stem>/H1/timestamps_GPS
        <stem>/L1/SFTs, <stem>/L1/timestamps_GPS
        frequency_Hz

    Parameters
    ----------
    name : str or pathlib.Path
        Output path without extension; ``.h5`` and ``.csv`` are appended.
    sft_h1, sft_l1 : array-like
        Data stored as the ``SFTs`` dataset of the ``H1`` / ``L1`` group.
    timestamps : dict
        Must provide the keys ``"H1"`` and ``"L1"``.
    frequency : array-like
        Stored as the top-level ``frequency_Hz`` dataset.
    meta_ : dict
        Passed to ``pandas.DataFrame`` and written as CSV without the index.
    """
    # Path(...) lets plain strings work too; the original called `.stem`
    # directly on `name`, which fails for the annotated `str` type.
    group_name = Path(name).stem
    sfts = {"H1": sft_h1, "L1": sft_l1}

    # The context manager guarantees the file is flushed and closed, even if
    # one of the dataset writes raises.
    with h5py.File(f"{name}.h5", "w") as hf:
        sample = hf.create_group(group_name)
        for detector, sft in sfts.items():
            det_group = sample.create_group(detector)
            det_group.create_dataset("SFTs", data=sft)
            det_group.create_dataset("timestamps_GPS", data=timestamps[detector])
        hf.create_dataset("frequency_Hz", data=frequency)

    pd.DataFrame(meta_).to_csv(f"{name}.csv", index=False)


def constrained_sum_sample_pos(n, total):
    """Split ``total`` into ``n`` random positive integers.

    ``n - 1`` distinct cut points are drawn from ``1 .. total - 1``; the
    returned values are the gaps between consecutive cut points (with 0 and
    ``total`` as the outer bounds), so they are all >= 1 and sum to ``total``.
    Every valid composition is equally likely.
    """
    cut_points = random.sample(range(1, total), n - 1)
    cut_points.sort()
    lower_bounds = [0, *cut_points]
    upper_bounds = [*cut_points, total]
    return [upper - lower for lower, upper in zip(lower_bounds, upper_bounds)]

def get_random_sqrtx_noise():
    """Pick one of six predefined ``(low, high)`` pairs at random.

    Returns the two values of the chosen pair as a tuple.
    """
    bounds = (
        (2e-23, 2.5e-23),
        (1e-23, 1.5e-23),
        (3e-23, 3.5e-23),
        (4e-23, 4.5e-23),
        (5e-23, 5.5e-23),
        (6e-23, 7e-23),
    )
    low, high = random.choice(bounds)
    # Earlier alternative kept for reference: np.random.uniform(1e-23, 5e-23)
    return low, high

## DATA_V10

## DATA_V11

## DATA_V12

## DATA_V13

## DATA_V15

## DATA_V16


## DATA_V17

## DATA_V18

## DATA_V19

## DATA_V20

In [None]:
#def generate_data_v20(fn, save_folder, neg=False):
#    """
#    this function generates random data fragments using uniform values of sqrtSX for each segment
#    currently it uses narrow noise
#    """
#    try:
#        # Generate signals with parameters drawn from a specific population
#        sft_path = []
#        # These parameters describe background noise and data format
#        m = np.random.randint(20, 70)
#        segment_lengths = list(np.array(constrained_sum_sample_pos(m, 86 * 2)) * 43200)
#        ch_0, ch_1 = get_random_sqrtx_noise()
#        segment_sqrtSX = [np.random.uniform(ch_0, ch_1) for i in range(m)]
#
#        writer_kwargs = {
#            "tstart": 1238166018,
#            "detectors": "H1,L1",
#            "Tsft": 1800,
#            "SFTWindowType": "tukey",
#            "SFTWindowBeta": random.choice([0.01, 0.001]),
#            "Band": 0.2,
#        }
#
#        h_0 = lambda: random.choice(segment_sqrtSX) / stats.uniform(1, 30).rvs()
#        if neg:
#            h_0 = 0
#        signal_parameters_generator = pyfstat.AllSkyInjectionParametersGenerator(
#            priors={
#                "tref": writer_kwargs["tstart"],
#                "F0": {"uniform": {"low": 50, "high": 500}},
#                "F1": lambda: 10 ** stats.uniform(-12, 4).rvs(),
#                "F2": 0,
#                "h0": h_0,
#                **pyfstat.injection_parameters.isotropic_amplitude_priors,
#            },
#        )
#
#        # Draw signal parameters.
#        # Noise can be drawn by setting `params["h0"] = 0`
#        name = save_folder / f"hb_{fn}"
#        params = signal_parameters_generator.draw()
#        # same timestemps as in test data
#        # writer_kwargs['timestamps'] = get_random_timesteps(random.choice(t_fns))
#        for segment in range(len(segment_lengths)):
#            writer_kwargs["outdir"] = f"PyFstat_example_data_ensemble/Signal_{fn}"
#            writer_kwargs["label"] = f"Signal_{fn}"
#            writer_kwargs["duration"] = segment_lengths[segment]
#            writer_kwargs["sqrtSX"] = segment_sqrtSX[segment]
#
#            if segment > 0:
#                writer_kwargs["tstart"] += (
#                    writer_kwargs["Tsft"] + segment_lengths[segment - 1]
#                )
#
#            writer = pyfstat.Writer(**writer_kwargs, **params)
#            writer.make_data()
#            sft_path.append(writer.sftfilepath)
#
#        # SNR can be compute from a set of SFTs for a specific set
#        # of parameters as follows:
#        sft_path = ";".join(sft_path)
#        snr = pyfstat.SignalToNoiseRatio.from_sfts(F0=writer.F0, sftfilepath=sft_path)
#        squared_snr = snr.compute_snr2(
#            Alpha=writer.Alpha,
#            Delta=writer.Delta,
#            psi=writer.psi,
#            phi=writer.phi,
#            h0=writer.h0,
#            cosi=writer.cosi,
#        )
#
#        meta_ = {
#            "alpha": [writer.Alpha],
#            "daelta": [writer.Delta],
#            "cosi": [writer.cosi],
#            "psi": [writer.psi],
#            "phi": [writer.phi],
#            "h0": [writer.h0],
#            "f0": [writer.F0],
#            "f1": [writer.F1],
#            "snr": [np.sqrt(squared_snr)],
#        }
#        # Data can be read as a numpy array using PyFstat
#        frequency, timestamps, amplitudes = pyfstat.utils.get_sft_as_arrays(sft_path)
#
#        sft_h1 = amplitudes["H1"][1:, :]
#        sft_l1 = amplitudes["L1"][1:, :]
#        save_hdf(Path(name), sft_h1, sft_l1, timestamps, frequency, meta_)
#    except:
#        pass

In [None]:
#n_samples = 1000
#name = 'DATA_V20'
#save_pos_path =  Path(f'../data/custom_data/{name}/pos')
#os.makedirs(save_pos_path, exist_ok=True)
#Parallel(n_jobs=16)(
#    delayed(generate_data_v20)(get_random_name(),save_folder=save_pos_path, neg=False)
#    for i in tqdm(range(n_samples))
#)
#
#save_neg_path =  Path(f'../data/custom_data/{name}/neg')
#os.makedirs(save_neg_path, exist_ok=True)
#Parallel(n_jobs=16)(
#    delayed(generate_data_v20)(get_random_name(),save_folder=save_neg_path, neg=True)
#    for i in tqdm(range(n_samples))
#)
#
#df_pos = pd.DataFrame({"id": list(save_pos_path.glob('*.h5')), 
#                       "target": 1})
#df_neg = pd.DataFrame({"id": list(save_neg_path.glob('*.h5')), 
#                       "target": 0})
#df_comb = pd.concat([df_pos, df_neg], ignore_index=True).sample(frac=1.)
#df_comb.to_csv(save_pos_path.parent/'train.csv', index=False)

In [None]:
def read_file_for_save(filename):
    """Load the scaled power of both detectors from one HDF5 sample.

    The file must contain a top-level group named after the file stem, with
    ``H1`` and ``L1`` sub-groups that each hold an ``SFTs`` dataset.

    For each detector the first 4096 columns of ``SFTs`` are scaled by 1e22
    and converted to power (real**2 + imag**2). Unlike ``read_file``, nothing
    is normalised or down-sampled.

    Returns a dict with keys ``"H1"`` and ``"L1"`` holding the power arrays.
    """
    sample_id = Path(filename).stem
    powers = {}
    with h5py.File(filename, "r") as h5_file:
        sample = h5_file[sample_id]
        for detector in ("H1", "L1"):
            # Scale the raw Fourier coefficients before squaring.
            coeffs = sample[detector]["SFTs"][:, :4096] * 1e22
            powers[detector] = coeffs.real**2 + coeffs.imag**2
    return powers

def save_pytorch_dict(fn):
    """Convert one HDF5 sample into a ``torch.save``-d dict next to it.

    The sample is loaded with ``read_file_for_save`` and written to the same
    location with the file extension replaced by ``.pth``.

    Parameters
    ----------
    fn : str or pathlib.Path
        Path of the source HDF5 file.
    """
    out = read_file_for_save(fn)
    # Swap only the final suffix. A plain ``str.replace('.h5', '.pth')`` would
    # also rewrite ``.h5`` inside directory names, and would leave the path
    # unchanged (overwriting the source) when the file does not end in ``.h5``.
    save_path = Path(fn).with_suffix('.pth')
    torch.save(out, str(save_path))

In [None]:
# Collect every .h5 sample in the DATA_V11 `neg` folder and convert each one
# to a .pth file, using 16 parallel workers.
fns = list(Path('../data/custom_data/DATA_V11/neg').glob('*.h5'))
Parallel(n_jobs=16)(delayed(save_pytorch_dict)(h5_path) for h5_path in tqdm(fns))

In [None]:
# Visual sanity check: load one randomly chosen file from `fns` (defined in the
# previous cell) and show both channels followed by their mean.
img = read_file(random.choice(fns))
%matplotlib inline
# read_file fills channel 0 from "H1" and channel 1 from "L1".
plt.imshow(img[0])
plt.pause(0.1)  # presumably lets this image render before the next one is drawn — TODO confirm
plt.imshow(img[1])
plt.pause(0.1)
plt.imshow(img.mean(0))  # average over the two channels

In [None]:
#|hide
#|eval: false
# Presumably regenerates the Python module(s) from the `#| export` cells of the
# project's notebooks — confirm against the nbdev documentation.
from nbdev.doclinks import nbdev_export
nbdev_export()