# Creating a dataset

This tutorial provides a short example of how to create a SeisBench dataset. Datasets can be created from any event catalog and waveform collection. For this example, we download an event catalog for Switzerland through FDSN and then download the associated waveforms through FDSN as well. We use built-in SeisBench functions to write out the dataset in SeisBench format. This notebook aims for a simple example that outlines the principles of dataset creation. Further considerations, in particular for converting larger datasets, are outlined at the end.

**Note:** Some familiarity with obspy and its FDSN client is helpful for this tutorial, but not required.

In [1]:
import seisbench.data as sbd
import seisbench.util as sbu

from pathlib import Path
from obspy import read_events
from obspy import read
import pandas as pd
from config import load_config

In [2]:
class get_data:
    def __init__(self, root, pattern_path):
        self.root = root
        self.pattern_path = pattern_path
        self.stream = None
        self.stats  = None

    def read(self, time):
        pattern = self.pattern_path.format(time=time)
        path = f'{self.root}/{pattern}'
        print('Reading Data:', path)
        self.stream = read(path)
        self.preprocessing_data()
        self.stations = list({tr.stats.station for tr in self.stream})

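    def get_data_related_to_pick(self, pick):
        # (Re)load the data if nothing is loaded yet, if the station is not
        # part of the loaded stream, or if the pick falls on a different day.
        if self.stream is None:
            self.read(time=pick.time)
        if pick.waveform_id.station_code not in self.stations:
            self.read(time=pick.time)
        if pick.time.julday != self.stream[0].stats.starttime.julday:
            self.read(time=pick.time)
        # Return only the traces of the station the pick belongs to
        target_stream = self.stream.select(station=pick.waveform_id.station_code)
        return target_stream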
    
    def preprocessing_data(self):
        self.sps_check()
        self.stream.merge(-1)
        self.stream.detrend('constant')
        self.stream.merge(method=1, fill_value=0)
    
    def sps_check(self):
        sps = self.stream[0].stats.sampling_rate
        assert all(tr.stats.sampling_rate == sps for tr in self.stream)
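
The class above builds the file path with `pattern_path.format(time=time)`. The actual pattern is stored in the configuration file and is not shown here, so the following is only a sketch of how such a pattern might work. The root directory, the pattern, and the helper `build_path` are assumptions, and a standard-library `datetime` stands in for ObsPy's `UTCDateTime`.

```python
from datetime import datetime

# Hypothetical values; the real ones come from the configuration file.
root = "/data/waveforms"
pattern_path = "{time:%Y}/{time:%j}/*.mseed"  # year / day of year / all files


def build_path(root, pattern_path, time):
    """Fill the time placeholders of the pattern and prepend the root."""
    return f"{root}/{pattern_path.format(time=time)}"


pick_time = datetime(2015, 1, 7, 12, 30)
path = build_path(root, pattern_path, pick_time)
print(path)
```

With a day-based layout like this, all picks of the same day map to the same path, which is why the class only re-reads data when the day or the station changes.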

In [3]:
import warnings
warnings.simplefilter('ignore', DeprecationWarning)

#### Loading configuration file

In [None]:
cfg = load_config('Kaki-cfg.yml')
print(cfg)
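
`load_config` comes from a local `config` module that is not part of this notebook. The sketch below is not that implementation. It only illustrates, under the assumption that the configuration is a nested mapping with attribute access, which fields the rest of the notebook reads. The field names are taken from the code cells; every value and the helper `to_namespace` are made up.

```python
from types import SimpleNamespace


def to_namespace(obj):
    """Recursively convert nested dicts into attribute-accessible namespaces."""
    if isinstance(obj, dict):
        return SimpleNamespace(**{k: to_namespace(v) for k, v in obj.items()})
    return obj


# Field names mirror the attributes used in this notebook; values are invented.
raw = {
    "path": {
        "catalog": "catalog.xml",
        "network_details": "network_details.csv",
        "stream_root": "/data/waveforms",
        "stream_pattern": "{time:%Y}/{time:%j}/*.mseed",
        "dataset": "dataset",
    },
    "cut_time": {"before": 30, "after": 60},
    "noisepad": {"before": 5, "after": 5, "std_start": 2, "std_end": 2},
}

cfg_example = to_namespace(raw)
print(cfg_example.path.catalog, cfg_example.cut_time.before)
```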

#### The event catalog

As a first step, we need an event catalog. Here, we are going to use the catalog provided by ETHZ over FDSN. For demonstration purposes, we only use a short time window.

In [5]:
network_details = pd.read_csv(cfg.path.network_details, dtype=str)
network_details.fillna(value='', inplace=True)

In [None]:
catalog = read_events(cfg.path.catalog)
catalog = [ev for ev in catalog if ev.picks]  # keep only events that have picks


In [None]:
print(len(catalog), catalog, sep='\n') # print(catalog.__str__(print_all=True))

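from collections import Counter

# Count how often each phase hint occurs across all picks
phase_counts = Counter(pick.phase_hint for ev in catalog for pick in ev.picks)
for phase, count in phase_counts.items():
    print(phase, count)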

#### Extracting the event parameters

From the catalog, we extract the event parameters and store them in a dictionary. Here, we only extract a few basic parameters of the source and, if available, its magnitude. In addition, we define the split of the dataset into training/development/test partitions. We visualize one example.

In [8]:
def get_event_params(event):
    origin = event.preferred_origin()
    mag = event.preferred_magnitude()

    source_id = str(event.resource_id)

    event_params = {
        "source_id": source_id,
        "source_origin_time": str(origin.time),
        "source_origin_uncertainty_sec": origin.time_errors["uncertainty"],
        "source_latitude_deg": origin.latitude,
        "source_latitude_uncertainty_km": origin.latitude_errors["uncertainty"],
        "source_longitude_deg": origin.longitude,
        "source_longitude_uncertainty_km": origin.longitude_errors["uncertainty"],
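        # Compare against None so that a depth of exactly 0 m is kept
        "source_depth_km": origin.depth / 1e3 if origin.depth is not None else None,
        "source_depth_uncertainty_km": (
            origin.depth_errors["uncertainty"] / 1e3
            if origin.depth_errors["uncertainty"] is not None
            else None
        ),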
    }

    if mag is not None:
        event_params["source_magnitude"] = mag.mag
        event_params["source_magnitude_uncertainty"] = mag.mag_errors["uncertainty"]
        event_params["source_magnitude_type"] = mag.magnitude_type
        event_params["source_magnitude_author"] = mag.creation_info.agency_id

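    # Assign the split for every event, whether or not it has a magnitude.
    # ISO 8601 time strings can be compared lexicographically.
    if str(origin.time) < "2015-01-07":
        split = "train"
    elif str(origin.time) < "2015-01-08":
        split = "dev"
    else:
        split = "test"
    event_params["split"] = split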
    return event_params
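
The split assignment above relies on the fact that ISO 8601 time strings sort in chronological order when compared as plain strings. A minimal standalone sketch of that logic follows. The function name `assign_split` and its keyword arguments are hypothetical; the date boundaries are the ones used above.

```python
def assign_split(origin_time, train_end="2015-01-07", dev_end="2015-01-08"):
    """Assign a partition by comparing ISO 8601 time strings lexicographically."""
    if origin_time < train_end:
        return "train"
    if origin_time < dev_end:
        return "dev"
    return "test"


examples = [
    "2015-01-06T23:59:59.000000Z",
    "2015-01-07T00:00:00.000000Z",
    "2015-01-09T10:00:00.000000Z",
]
for t in examples:
    print(t, assign_split(t))
```

Splitting by time rather than at random keeps all traces of one event in the same partition.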

#### Extracting the trace parameters

From each pick, we extract parameters about the trace and store them in a dictionary. Again, we only extract very basic parameters. We visualize one example.

In [9]:
def get_trace_params(pick):
    net = pick.waveform_id.network_code
    sta = pick.waveform_id.station_code

    trace_params = {
        "station_network_code": net,
        "station_code": sta,
        "trace_channel": pick.waveform_id.channel_code[:2],
        "station_location_code": pick.waveform_id.location_code,
        "evaluation_mode": pick.evaluation_mode
    }

    return trace_params

In [10]:
def select_arrival_of_pick(pick, arrivals):
    '''
    Return the arrival that references the given pick.

    Returns False if none of the arrivals refers to the pick.
    '''
    for arrival in arrivals:
        if arrival.pick_id == pick.resource_id:
            return arrival
    return False

In [11]:
# i = 0
# j = 0
# for ev in catalog:
#     origin = ev.preferred_origin()
#     for pick in ev.picks:
#         arrival = select_arrival_of_pick(pick=pick, arrivals=origin.arrivals)
#         if arrival==False:
#             i += 1
#             break
#         else:
#             j += 1
#         # print(arrival, pick, '\n\n', sep='\n')
# print(i, j)

In [12]:
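def get_phase_params(pick, event):
    origin = event.preferred_origin()
    arrival = select_arrival_of_pick(pick=pick, arrivals=origin.arrivals)
    if not arrival:
        # No arrival is associated with this pick, so there is nothing to add
        return {}
    # Copy the arrival attributes and drop the ones that are not needed
    phase_params = arrival.__dict__.copy()
    for key in ['resource_id', 'pick_id', 'phase']:
        phase_params.pop(key)
    # Suffix each key with the phase hint, e.g. time_residual_P
    phase_params = {f'{key}_{pick.phase_hint}': val for key, val in phase_params.items()}
    return phase_params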
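
Suffixing every key with the phase hint lets the parameters of several phases live side by side in one flat metadata row. The sketch below shows this with plain dictionaries. The helper `suffix_keys` and the numeric values are made up for illustration.

```python
def suffix_keys(params, phase_hint):
    """Append the phase hint to every key, e.g. time_residual -> time_residual_P."""
    return {f"{key}_{phase_hint}": value for key, value in params.items()}


# Made-up arrival attributes for a P and an S phase at the same station
p_arrival = {"time_residual": 0.12, "distance": 0.8}
s_arrival = {"time_residual": -0.30, "distance": 0.8}

row = {}
row.update(suffix_keys(p_arrival, "P"))
row.update(suffix_keys(s_arrival, "S"))
print(row)
```

If two picks at a station share the same phase hint, the later one overwrites the earlier one in such a merged row.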

In [13]:
# ev = catalog[0]
# pick = ev.picks[1]
# get_phase_params(pick, ev)

## Noise Padding

In [None]:
from obspy import Stream, Trace
import numpy as np
###
def tr_noise_padding(tr, stime, etime, std_windows=(2, 2)):
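    # Numeric values are interpreted as padding lengths in seconds
    # relative to the start and end of the trace.
    if isinstance(stime, (float, int)):
        stime = tr.stats.starttime - stime
    if isinstance(etime, (float, int)):
        etime = tr.stats.endtime + etime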
    ###
    lst_tr = [tr]
    sps = tr.stats.sampling_rate
    ###
    sduration = (tr.stats.starttime - stime) * sps
    sduration = int(sduration)
    if sduration > 0:
        tr_std_s = tr.slice(endtime=tr.stats.starttime+std_windows[0])
        std_s = tr_std_s.std()
        snoise = np.random.normal(loc=0.0, scale=std_s, size=sduration)
        strn = Trace(snoise)
        strn.id = tr.id
        strn.stats.sampling_rate = sps
        strn.stats.starttime = tr.stats.starttime
        strn.stats.starttime -= (strn.stats.npts/sps)
        lst_tr.append(strn)
    ###
    eduration = (etime - tr.stats.endtime) * sps
    eduration = int(eduration)
    if eduration > 0:
        tr_std_e = tr.slice(starttime=tr.stats.endtime-std_windows[1])
        std_e = tr_std_e.std()
        enoise = np.random.normal(loc=0.0, scale=std_e, size=eduration)
        etrn = Trace(enoise)
        etrn.id = tr.id
        etrn.stats.sampling_rate = sps
        etrn.stats.starttime = tr.stats.endtime + 1/sps
        lst_tr.append(etrn)
    ###
    st = Stream(lst_tr)
    st.merge(-1)
    if st.get_gaps() == []:
        return st[0]
    else:
        print('There was a problem in noise-padding!')
        print(st)
        st.print_gaps()
        return None

def st_noise_padding(st, stime, etime, std_windows=(2, 2)):
    st.merge(-1)
    st.detrend('constant')
    st.merge(fill_value=0)
    st_new = Stream()
    for tr in st:
        padded = tr_noise_padding(
            tr=tr, stime=stime, etime=etime, std_windows=std_windows
        )
        # tr_noise_padding returns None if padding failed; skip such traces
        if padded is not None:
            st_new += padded
    return st_new
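
The idea behind the padding functions above is to extend a trace on both sides with zero-mean Gaussian noise whose scale matches the standard deviation of the data near each edge. The following dependency-free sketch shows the same idea on a plain list of samples. It is a simplification, not a replacement for the ObsPy-based functions: the name `pad_with_noise` is hypothetical, and windows are given in samples rather than seconds.

```python
import random
import statistics


def pad_with_noise(samples, n_before, n_after, std_window, seed=None):
    """Pad a list of samples with zero-mean Gaussian noise on both sides.

    The noise scale on each side is the population standard deviation of
    the first (or last) `std_window` samples of the original data.
    """
    rng = random.Random(seed)
    std_start = statistics.pstdev(samples[:std_window])
    std_end = statistics.pstdev(samples[-std_window:])
    head = [rng.gauss(0.0, std_start) for _ in range(n_before)]
    tail = [rng.gauss(0.0, std_end) for _ in range(n_after)]
    return head + list(samples) + tail


data = [0.0, 1.0, -1.0, 2.0, -2.0, 0.5, -0.5, 1.5]
padded = pad_with_noise(data, n_before=3, n_after=2, std_window=4, seed=0)
print(len(padded))
```

The original samples are left untouched in the middle of the padded list; only the added head and tail are random.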

### Method 1

In [14]:
def reversing_dictionary(dictionary):
    return {v:k for k, v in dictionary.items()}

In [15]:
def slice_data(st, pick, before, after):
    st_slice = st.slice(
        starttime=pick.time-before,
        endtime=pick.time+after,
        nearest_sample=True)
    st_slice.trim(
        starttime=pick.time-before,
        endtime=pick.time+after,
        pad=True,
        nearest_sample=True,
        fill_value=0)
    return st_slice

def trim_data(st, pick, before, after):
    st.trim(
        starttime=pick.time-before,
        endtime=pick.time+after,
        pad=True,
        nearest_sample=True,
        fill_value=0)

In [16]:
def preprocessing_data(st):
    st.merge(-1)
    st.detrend('constant')
    st.merge(fill_value=0)


#### Writing to SeisBench format

Now, we can combine all the functions above to write a dataset in SeisBench format. For this, we first need to define the path. For this example, the output directory is taken from the configuration (`cfg.path.dataset`). A dataset consists of two components:
 - a metadata file, always called `metadata.csv`, which contains all the associated properties of the waveform examples (e.g. trace parameters, source parameters etc.).
 - a waveforms file, always called `waveforms.hdf5`, containing the raw waveforms.

In [None]:
base_path = Path(cfg.path.dataset)
metadata_path = base_path / "metadata.csv"
waveforms_path = base_path / "waveforms.hdf5"
print(metadata_path, waveforms_path, sep='\n')

To write the dataset, we use the `WaveformDataWriter` provided by SeisBench. The writer should always be used as a context manager, i.e., using the `with` statement, as shown below. This ensures that files are properly closed after writing and that teardown and cleanup operations are always called when exiting the context manager.
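
To illustrate why the `with` statement matters, here is a toy writer that buffers metadata rows and only writes them out when the context exits. It is a hypothetical stand-in built on the standard library, not the SeisBench implementation; the class name `ToyWriter` and its `add_row` method are invented for this sketch.

```python
import csv
import io


class ToyWriter:
    """Toy stand-in (not the SeisBench class): buffers rows, writes them on exit."""

    def __init__(self, sink):
        self.sink = sink
        self.rows = []

    def __enter__(self):
        return self

    def add_row(self, row):
        self.rows.append(row)

    def __exit__(self, exc_type, exc, tb):
        # Runs even if the body of the `with` block raised an exception,
        # so buffered rows are not lost.
        if self.rows:
            writer = csv.DictWriter(self.sink, fieldnames=list(self.rows[0]))
            writer.writeheader()
            writer.writerows(self.rows)
        return False  # do not swallow exceptions


sink = io.StringIO()
with ToyWriter(sink) as toy:
    toy.add_row({"station_code": "ABC", "split": "train"})
    toy.add_row({"station_code": "XYZ", "split": "test"})

print(sink.getvalue())
```

Without the context manager, a caller would have to remember to trigger the final write manually, and an exception in the loop would skip it.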

First, we need to set the data format for our dataset. We do this by assigning a dictionary to the `writer.data_format` attribute.

Next, we iterate over all events and all picks in the events. Using the functions above, we generate the event and trace metadata and load the waveforms. We then convert the waveforms to a NumPy array using the function `stream_to_array` provided in `seisbench.util`.

As a last step, we hand the metadata and the waveforms, as a NumPy array, over to the writer using `add_trace`. The writer then automatically takes care of writing out the data in the correct format. It also takes care of performance optimisations that we outline in the further considerations below.

In [18]:
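def select_picks(picks, station_name, without_amplitude=True):
    # Keep only the picks of the given station, sorted by time
    picks = [pick for pick in picks
             if pick.waveform_id.station_code == station_name]
    picks = sorted(picks, key=lambda p: p.time)
    if without_amplitude:
        # Drop amplitude picks, i.e. phase hints starting with 'AM';
        # a missing phase hint is treated as an empty string
        picks = [pick for pick in picks
                 if not (pick.phase_hint or '').startswith('AM')]
    return picks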
    

In [19]:
def get_picks_time_difference(picks):
    picks_time = [pick.time for pick in picks]
    picks_time = sorted(picks_time)
    picks_difftime = [time-picks_time[0] for time in picks_time]
    return picks_difftime

In [20]:
def checking_equal_sps(stream):
    sps = stream[0].stats.sampling_rate
    assert all(tr.stats.sampling_rate == sps for tr in stream)

In [21]:
get_waveforms = get_data(cfg.path.stream_root, cfg.path.stream_pattern)

In [None]:
# Iterate over events and picks, write to SeisBench format
with sbd.WaveformDataWriter(metadata_path, waveforms_path) as writer:

    # Define data format
    writer.data_format = {
        "dimension_order": "CW",
        "component_order": "ZNE",
        "measurement": "velocity",
        "unit": "counts",
        "instrument_response": "not restituted",
    }
    n_all = len(catalog)
    for index, event in enumerate(catalog):
        # if index < 1050:
        #     continue
        if index % 500 == 0:
            print(f'{index} of {n_all} ({index/n_all*100:.2f}%)')
        # if index == 200:
        #     break

        event_params = get_event_params(event)
        stations_in_event = {pick.waveform_id.station_code for pick in event.picks}
        for station_name in stations_in_event:
            picks = select_picks(picks=event.picks,
                                 station_name=station_name)
            if picks == []:
                continue
            ###
            phase_params = {}
            for pick in picks:
                param = get_phase_params(pick, event)
                phase_params.update(param)
            ###
            time_diff = get_picks_time_difference(picks)
            if max(time_diff) >= 60:
                print(f'losing pick, maximum is: {max(time_diff)}')
            ###
            pick = picks[0]
            trace_params = get_trace_params(pick)
            waveforms = get_waveforms.get_data_related_to_pick(pick=pick)
            waveforms = slice_data(waveforms, pick,
                                   before=cfg.cut_time.before, 
                                   after=cfg.cut_time.after)
            waveforms = st_noise_padding(
                st=waveforms,
                stime=cfg.noisepad.before,
                etime=cfg.noisepad.after,
                std_windows=(cfg.noisepad.std_start, cfg.noisepad.std_end)
                )
            ### Check remaining data
            if len(waveforms) == 0:
                # No waveform data available
                print('No waveforms left after slicing!')
                continue
            ###
            sampling_rate = waveforms[0].stats.sampling_rate
            # Check that the traces have the same sampling rate
            checking_equal_sps(stream=waveforms)

            actual_t_start, data, _ = sbu.stream_to_array(
                waveforms,
                component_order=writer.data_format["component_order"],
            )
            trace_params["trace_sampling_rate_hz"] = sampling_rate
            trace_params["trace_start_time"] = str(actual_t_start)

            for pick in picks:
                sample = (pick.time - actual_t_start) * sampling_rate
                trace_params[f"trace_{pick.phase_hint}_arrival_sample"] = int(sample)
                trace_params[f"trace_{pick.phase_hint}_status"] = pick.evaluation_mode

            writer.add_trace({**event_params, **trace_params, **phase_params}, data)
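
The arrival sample stored for each pick is the time offset between the pick and the start of the trace, multiplied by the sampling rate and truncated to an integer. A minimal sketch of that arithmetic follows, using standard-library `datetime` objects as a stand-in for ObsPy's `UTCDateTime`. The function name `arrival_sample` and the example times are assumptions.

```python
from datetime import datetime


def arrival_sample(pick_time, trace_start, sampling_rate):
    """Sample index of the pick, truncated with int() as in the loop above."""
    offset_seconds = (pick_time - trace_start).total_seconds()
    return int(offset_seconds * sampling_rate)


trace_start = datetime(2015, 1, 7, 12, 0, 0)
p_pick = datetime(2015, 1, 7, 12, 0, 30)
s_pick = datetime(2015, 1, 7, 12, 0, 37, 500000)

p_sample = arrival_sample(p_pick, trace_start, 100.0)
s_sample = arrival_sample(s_pick, trace_start, 100.0)
print(p_sample, s_sample)
```

Because the offset is measured from the actual start of the returned array, the index stays valid even if the trace does not begin exactly at the requested time.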

## Considerations for converting datasets

As outlined above, this tutorial provides a minimal example of converting a dataset. Here we outline additional considerations that should be taken into account when preparing a dataset.

- **Grouping picks**: In this example, we created one trace for each pick. Naturally, traces will overlap if multiple picks, e.g., P and S phases, are available for an event at a station. For an example implementation of this grouping operation, have a look [here](https://github.com/seisbench/seisbench/blob/df94dcd86ce66d6a2ee2bd00da3857259fe579bd/seisbench/data/ethz.py#L109) and in the subsequent lines.
- **Adding station information**: In this example, we added no station information except its name. In practice, it will often be helpful for users to incorporate, for example, the location of the station. We skipped this step here, because it requires loading station inventories through FDSN. For an example implementation, have a look [here](https://github.com/seisbench/seisbench/blob/df94dcd86ce66d6a2ee2bd00da3857259fe579bd/seisbench/data/ethz.py#L315).
- **Memory requirements**: Internally, the `WaveformDataWriter` writes out the waveforms continuously in blocks (see point below), but keeps all metadata in memory until the dataset is complete. For very large datasets (or very detailed metadata) this can result in several gigabytes of memory consumption. If you are writing such datasets, make sure the available memory on your machine is sufficient.
- **Waveform blocks**: Instead of writing each waveform separately, waveforms are written out in blocks. This massively improves IO performance. Have a look at [the documentation](https://seisbench.readthedocs.io/en/stable/pages/data_format.html#traces-blocks) for details on the strategy. We expect that in nearly all cases using the default setting will be a good choice.
- **FDSN considerations**: When converting very large datasets, the performance might be limited by the performance of the FDSN webservice. Unfortunately, downloading lots of short waveforms (as required for many machine learning applications) does not seem to be the most favorable use case for FDSN. This leads to rather slow performance when naively downloading the waveforms as outlined above. Instead, it is often helpful to issue [bulk requests](https://docs.obspy.org/master/packages/autogen/obspy.clients.fdsn.client.Client.get_waveforms_bulk.html). In addition, it might be a good choice to first download the waveforms and cache them locally, for example, in .mseed format, and then convert them to SeisBench.
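
The bulk-request idea from the FDSN point above can be sketched as follows. The helper `build_bulk`, the station codes, and the time windows are made up. The sketch only assembles the request list with standard-library types; the actual call to ObsPy's `get_waveforms_bulk` is left as a comment, because it needs network access and `UTCDateTime` objects instead of `datetime`.

```python
from datetime import datetime, timedelta


def build_bulk(picks, before, after):
    """Build one (network, station, location, channel, start, end) tuple per pick."""
    bulk = []
    for net, sta, loc, cha, pick_time in picks:
        start = pick_time - timedelta(seconds=before)
        end = pick_time + timedelta(seconds=after)
        bulk.append((net, sta, loc, cha, start, end))
    return bulk


# Hypothetical picks: network, station, location, channel pattern, pick time
picks_example = [
    ("XX", "STA1", "*", "HH?", datetime(2015, 1, 7, 12, 0, 30)),
    ("XX", "STA2", "*", "HH?", datetime(2015, 1, 7, 12, 0, 34)),
]
bulk = build_bulk(picks_example, before=30, after=60)
print(len(bulk))

# With ObsPy installed and the times given as UTCDateTime objects, the whole
# list could be sent in one request through an FDSN client instance:
# stream = client.get_waveforms_bulk(bulk)
```

The returned stream could then be written to local files once and reused for every later conversion run.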

For further details on the data format, check out [the data format specification in the SeisBench documentation](https://seisbench.readthedocs.io/en/stable/pages/data_format.html#traces-blocks).