In [4]:
import torch as th
import pytest
import pandas as pd
import numpy as np
from torch.utils.data import DataLoader, Dataset
import os
import glob
import scipy
from numpy import ndarray

In [3]:
class SeismicDataset(Dataset):

    def __init__(self, signal_folder_path: str, noise_folder_path: str, randomized = False):

        """
        Args:
            signal_folder_path (str): Path to earthquake signal folder containing .npz files.
            noise_folder_path (str): Path to noise folder containing .npz files.
        """

        self.signal_folder_path = signal_folder_path
        self.noise_folder_path = noise_folder_path
        self.eq_signal_files = glob.glob(f'{signal_folder_path}/**/*.npz', recursive=True)
        self.noise_files = glob.glob(f'{noise_folder_path}/**/*.npz', recursive=True)
        self.randomized = randomized


    def __len__(self) -> int:
        return len(self.eq_signal_files)

    def __getitem__(self, idx) -> tuple[th.Tensor, int, str]:

        eq_path = self.eq_signal_files[idx]
        eq = np.load(eq_path, allow_pickle=True)
        eq_name = (os.path.splitext(os.path.basename(eq_path)))[0]

        noise_idx = np.random.randint(0, len(self.noise_files))
        noise_path = self.noise_files[noise_idx]
        noise = np.load(noise_path, allow_pickle=True)
        noise_name = (os.path.splitext(os.path.basename(noise_path)))[0]

        eq_start = 0
        noise_start = 0
        if self.randomized:
            eq_start = np.random.randint(low = 0, high = 6000)
            noise_start = np.random.randint(low = 0, high = 12000)

        Z_eq = eq['earthquake_waveform_Z'][eq_start:eq_start+6000]
        N_eq = eq['earthquake_waveform_N'][eq_start:eq_start+6000]
        E_eq = eq['earthquake_waveform_E'][eq_start:eq_start+6000]
        event = np.stack([Z_eq, N_eq, E_eq], axis=0)
        eq_tensor = th.from_numpy(event)

        Z_noise = noise['noise_waveform_Z'][noise_start:noise_start+6000]
        N_noise = noise['noise_waveform_N'][noise_start:noise_start+6000]
        E_noise = noise['noise_waveform_E'][noise_start:noise_start+6000]
        noise_stacked = np.stack([Z_noise, N_noise, E_noise], axis=0)
        noise_tensor = th.from_numpy(noise_stacked)

        # tensor_normalized = eq_tensor / eqtensor.abs().max()

        p_wave_start = 6000 - eq_start
        # , p_wave_start, eq_name, noise_name

        print(eq_tensor.shape)
        print(noise_tensor.shape)

        noisy_eq = eq_tensor + noise_tensor

        return noisy_eq, eq_tensor


In [5]:
import ipytest
ipytest.autoconfig()

def test_dataset_length():
    signal_folder = "C:/Users/cleme/ETH/Master/DataLab/dsl-as24-challenge-3/data/signal/train"
    noise_folder = "C:/Users/cleme/ETH/Master/DataLab/dsl-as24-challenge-3/data/noise/train"
    dataset = SeismicDataset(signal_folder, noise_folder)
    assert len(dataset) == 20230 

def test_output_shape():
    signal_folder = "C:/Users/cleme/ETH/Master/DataLab/dsl-as24-challenge-3/data/signal/train"
    noise_folder = "C:/Users/cleme/ETH/Master/DataLab/dsl-as24-challenge-3/data/noise/train"
    dataset = SeismicDataset(signal_folder, noise_folder)
    dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
    next_tensor, noise_tensor = next(iter(dataloader))
    print(next_tensor.shape)
    print(noise_tensor.shape)
    assert next_tensor.shape == (1,3,6000)


ipytest.run('-vv')

platform win32 -- Python 3.12.3, pytest-8.3.3, pluggy-1.5.0 -- c:\Users\cleme\miniconda3\envs\dsl\python.exe
cachedir: .pytest_cache
rootdir: c:\Users\cleme\ETH\Master\DataLab\dsl-as24-challenge-3
plugins: anyio-4.2.0, typeguard-4.3.0
[1mcollecting ... [0mcollected 2 items

t_e216c054b0144360a2e83cb3a6ddf972.py::test_dataset_length [32mPASSED[0m[32m                            [ 50%][0m
t_e216c054b0144360a2e83cb3a6ddf972.py::test_output_shape [32mPASSED[0m[32m                              [100%][0m



<ExitCode.OK: 0>

In [8]:
class DeepDenoiserDataset(Dataset):

    def __init__(self, signal_folder_path: str, noise_folder_path: str, randomized = True):

        """
        Args:
            signal_folder_path (str): Path to earthquake signal folder containing .npz files.
            noise_folder_path (str): Path to noise folder containing .npz files.
        """

        self.signal_folder_path = signal_folder_path
        self.noise_folder_path = noise_folder_path
        self.eq_signal_files = glob.glob(f'{signal_folder_path}/**/*.npz', recursive=True)
        self.noise_files = glob.glob(f'{noise_folder_path}/**/*.npz', recursive=True)
        self.signal_length = 3000

        self.randomized = randomized

        #scipy hyperparameters
        self.fs = 100
        self.nperseg = 30
        self.nfft = 60

        # sample hyperparameters
        self.noise_mean = 2
        self.noise_std = 1


    def __len__(self) -> int:
        return len(self.eq_signal_files)

    def __getitem__(self, idx) -> th.Tensor:

        eq_path = self.eq_signal_files[idx]
        eq = np.load(eq_path, allow_pickle=True)
        eq_name = (os.path.splitext(os.path.basename(eq_path)))[0]

        noise_to_small = True
        while noise_to_small:
            noise_idx = np.random.randint(0, len(self.noise_files))
            noise_path = self.noise_files[noise_idx]
            noise = np.load(noise_path, allow_pickle=True)
            if len(noise['noise_waveform_Z']) >= self.signal_length:
                noise_to_small = False
        
        noise_name = (os.path.splitext(os.path.basename(noise_path)))[0]

        noise_seq_len = len(noise['noise_waveform_Z'])
        assert noise_seq_len >= self.signal_length

        eq_start = 0
        noise_start = 0
        if self.randomized:
            eq_start = np.random.randint(low = 0, high = 6000)
            noise_start = np.random.randint(low = 0, high = max(noise_seq_len - self.signal_length, 1))

        Z_eq = eq['earthquake_waveform_Z'][eq_start:eq_start+self.signal_length]
        N_eq = eq['earthquake_waveform_N'][eq_start:eq_start+self.signal_length]
        E_eq = eq['earthquake_waveform_E'][eq_start:eq_start+self.signal_length]
        eq_stacked = np.stack([Z_eq, N_eq, E_eq], axis=0)
        eq_tensor = th.from_numpy(eq_stacked)

        Z_noise = noise['noise_waveform_Z'][noise_start:noise_start+self.signal_length]
        N_noise = noise['noise_waveform_N'][noise_start:noise_start+self.signal_length]
        E_noise = noise['noise_waveform_E'][noise_start:noise_start+self.signal_length]
        noise_stacked = np.stack([Z_noise, N_noise, E_noise], axis=0)
        noise_tensor = th.from_numpy(noise_stacked)

        # sample random channel
        j = np.random.choice([0, 1, 2])

        eq = eq_stacked[j]
        noise = noise_stacked[j]

        def compute_stft(signal: ndarray) -> ndarray:

            f, t, transform = scipy.signal.stft(
                signal,
                fs=self.fs,
                nperseg=self.nperseg,
                nfft=self.nfft,
                boundary='zeros',
            )
            
            return transform
        
        stft_eq = compute_stft(eq)
        stft_noise = compute_stft(noise)

        assert not np.isinf(stft_eq).any() and not np.isnan(stft_eq).any(), "stft_eq nan or inf"
        assert not np.isinf(stft_noise).any() and not np.isnan(stft_noise).any(), "stft_noise nan or inf"

        if np.random.random() < 0.9:

            stft_eq = stft_eq / np.std(stft_eq)
            
            if np.random.random() < 0.2:
                stft_eq = np.fliplr(stft_eq)
        

        ratio = 0
        while ratio <= 0:
            ratio = self.noise_mean + np.random.randn() * self.noise_std
        
        noisy = stft_eq + ratio * stft_noise
        noisy = np.stack([noisy.real, noisy.imag], axis=-1)

        assert not np.isnan(noisy).any() and not np.isinf(noisy).any(),  "noisy nan or inf"

        noisy = noisy / np.std(noisy)
        tmp_mask = np.abs(stft_eq) / (np.abs(stft_eq) + np.abs(ratio * stft_noise) + 1e-4)
        tmp_mask[tmp_mask >= 1] = 1
        tmp_mask[tmp_mask <= 0] = 0
        mask = np.zeros([tmp_mask.shape[0], tmp_mask.shape[1], 2])
        mask[:, :, 0] = tmp_mask
        mask[:, :, 1] = 1 - tmp_mask


        p_wave_start = 6000 - eq_start
        # , p_wave_start, eq_name, noise_name

        return th.from_numpy(mask)

In [11]:
import ipytest
ipytest.autoconfig()

def test_dataset_length():
    signal_folder = "C:/Users/cleme/ETH/Master/DataLab/dsl-as24-challenge-3/data/signal/train"
    noise_folder = "C:/Users/cleme/ETH/Master/DataLab/dsl-as24-challenge-3/data/noise/train"
    dataset = DeepDenoiserDataset(signal_folder, noise_folder)
    assert len(dataset) == 20230 

def test_output_shape():
    signal_folder = "C:/Users/cleme/ETH/Master/DataLab/dsl-as24-challenge-3/data/signal/train"
    noise_folder = "C:/Users/cleme/ETH/Master/DataLab/dsl-as24-challenge-3/data/noise/train"
    dataset = DeepDenoiserDataset(signal_folder, noise_folder)
    dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
    next_tensor = next(iter(dataloader))
    assert next_tensor.shape == (1,31,201,2)


ipytest.run('-vv')

platform win32 -- Python 3.12.3, pytest-8.3.3, pluggy-1.5.0 -- c:\Users\cleme\miniconda3\envs\dsl\python.exe
cachedir: .pytest_cache
rootdir: c:\Users\cleme\ETH\Master\DataLab\dsl-as24-challenge-3
plugins: anyio-4.2.0, typeguard-4.3.0
[1mcollecting ... [0mcollected 2 items

t_4cda14160a14447493dc032bf2a5759f.py::test_dataset_length [32mPASSED[0m[32m                            [ 50%][0m
t_4cda14160a14447493dc032bf2a5759f.py::test_output_shape [31mFAILED[0m[31m                              [100%][0m

[31m[1m________________________________________ test_output_shape ________________________________________[0m

    [0m[94mdef[39;49;00m [92mtest_output_shape[39;49;00m():[90m[39;49;00m
        signal_folder = [33m"[39;49;00m[33mC:/Users/cleme/ETH/Master/DataLab/dsl-as24-challenge-3/data/signal/train[39;49;00m[33m"[39;49;00m[90m[39;49;00m
        noise_folder = [33m"[39;49;00m[33mC:/Users/cleme/ETH/Master/DataLab/dsl-as24-challenge-3/data/noise/train[39;49;00m

<ExitCode.TESTS_FAILED: 1>