# Compile a day of miniseed data

In [14]:
from obspy import read
import xarray as xr
import os
from scipy import signal
import numpy as np
from datetime import datetime
import dask.array as da
import multiprocessing

In [2]:
instrument_day_path = "../../ooi/san_data/RS03AXBS-LJ03A-09-HYDBBA302/2024/01/05"

## Find all the miniseed files for the selected instrument and day

First define a function that recursively searches for all files with a \*.mseed extension:

In [3]:
def find_mseed_files(directory):
    mseed_files = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(".mseed"):
                mseed_files.append(os.path.join(root, file))
    return mseed_files

In [21]:
def make_xarray(file):
    
    time_string = file.split("YDH-")[1].split('.mseed')[0]
    time_val = datetime.strptime(time_string, '%Y-%m-%dT%H:%M:%S.%fZ')
    
    stream = read(file)
    data = stream.detrend().split().decimate(16).merge(method=1, fill_value="interpolate").decimate(10)
    
    fs = data[0].stats["sampling_rate"]

    # Set spectrogram parameters
    segment_length_seconds = .5
    nperseg = int(segment_length_seconds * fs)
    noverlap_percent = 15
    noverlap = int(nperseg * noverlap_percent/100)

    f, t, Sxx = signal.spectrogram(data[0].data, fs, nperseg = nperseg, noverlap = noverlap)
    array_out = xr.DataArray(np.log10(Sxx), 
                           dims=('frequency', 'seconds'),
                           coords={'frequency':f, 'seconds':t})
    array_out.coords['time'] = time_val

    return array_out

In [37]:
def parallel_process(data, func, sub_segment_length, n_processes=None):
    if n_processes is None:
        n_processes = multiprocessing.cpu_count() 

    pool = multiprocessing.Pool(processes=n_processes)
    
    # Initialize a list to hold the xarray sub-arrays
    results_all = []
    
    # Break into a number of 5-minute segments at a time (30-mins total)
    segments = [data[i:i + sub_segment_length] for i in range(0, len(data), sub_segment_length)]
    for idx, segment in enumerate(segments):
        print( "Segment " + str(idx+1) + " of " + str(len(segments)) )
        results = pool.map(func, segment)
        # Use dask for efficiency
        dask_datasets = [da.chunk({'seconds': 50, 'frequency': 50}) for da in results]
        results_all.append(xr.concat(dask_datasets, dim='time'))
        del results
        
    pool.close() 
    pool.join()

    return results_all

In [40]:
mseed_files = find_mseed_files(instrument_day_path)

In [48]:
mseed_files_sub = mseed_files[0:100]
results = parallel_process(mseed_files_sub, make_xarray, 6)
results_full = xr.concat(results, dim='time').sortby('time')


Segment 1 of 17
Segment 2 of 17
Segment 3 of 17
Segment 4 of 17
Segment 5 of 17
Segment 6 of 17
Segment 7 of 17
Segment 8 of 17
Segment 9 of 17
Segment 10 of 17
Segment 11 of 17
Segment 12 of 17
Segment 13 of 17
Segment 14 of 17
Segment 15 of 17
Segment 16 of 17
Segment 17 of 17
