# Test using Dask to speed up binning spectral peaks

In [1]:
from os.path import join
from numpy import linspace
from pandas import read_csv, cut, date_range, Timestamp, Timedelta
# import pandas as pd
from dask.dataframe import from_pandas
from time import time


from utils_basic import SPECTROGRAM_DIR as indir, GEO_STATIONS as stations
from utils_spec import assemble_spec_filename, group_spectral_peaks_regular_bins, read_geo_spectrograms, find_geo_station_spectral_peaks, read_spectral_peaks, save_spectral_peak_bin_counts, bin_counts_to_df
from utils_plot import plot_array_spec_peak_bin_counts, save_figure

In [2]:
# Inputs
# Data
window_length = 1.0
overlap = 0.0
downsample = False
downsample_factor = 60
prom_threshold = 10
rbw_threshold = 0.2

# Grouping
starttime = "2020-01-10T00:00:00"
endtime = "2020-02-02T00:00:00"

time_bin_width = "1s"
freq_bin_width = 1.0 # in Hz
min_freq = 0.0
max_freq = 200.0

count_threshold = 4

num_partitions = 32

In [3]:
if downsample:
    suffix = f"window{window_length:.0f}s_overlap{overlap:.1f}_downsample{downsample_factor:d}_prom{prom_threshold:.0f}db_rbw{rbw_threshold:.1f}"
else:
    suffix = f"window{window_length:.0f}s_overlap{overlap:.1f}_prom{prom_threshold:.0f}db_rbw{rbw_threshold:.1f}"

In [4]:
# Read the spectral peaks
print("Reading the spectral peaks...")
filename = f"geo_spectral_peaks_A01_{suffix}.h5"
inpath = join(indir, filename)
peak_df = read_spectral_peaks(inpath)
#peak_ddf = from_pandas(peak_df, npartitions = num_partitions)
num_peaks = len(peak_df)
print(f"{num_peaks} peaks are read.")

Reading the spectral peaks...
9944009 peaks are read.


In [5]:
# Define the time bins
if isinstance(starttime, str):
    starttime = Timestamp(starttime)

if isinstance(endtime, str):
    endtime = Timestamp(endtime)

starttime = Timestamp(starttime)
endtime = Timestamp(endtime)
time_bin_edges = date_range(starttime, endtime, freq=time_bin_width)
time_delta = time_bin_edges[1] - time_bin_edges[0]
time_bin_centers = [time + time_delta / 2 for time in time_bin_edges]
time_bin_centers = time_bin_centers[:-1]

In [6]:
clock1 = time()
# peak_ddf = from_pandas(peak_df, npartitions = 16)
#peak_ddf['time_bin'] = peak_ddf.map_partitions(lambda df: pd.cut(df['time'], bins=time_bin_edges, include_lowest=True, right=False))
peak_df['time_bin'] = cut(peak_df['time'], bins=time_bin_edges, include_lowest=True, right=False)
#peak_df.groupby(['time_bin', 'frequency'], observed = False).size().unstack(fill_value=0)
dask_df = from_pandas(peak_df, npartitions=16)
dask_df.groupby(['time_bin', 'frequency'], observed = False).size()
dask_df.compute()
clock2 = time()

print(clock2 - clock1)
# peak_df['time_bin'] = cut(peak_df["time"], time_bin_edges, include_lowest=True, right=False)
# peak_df['freq_bin'] = cut(peak_df["frequency"], freq_bin_edges, include_lowest=True, right=False)

238.8524341583252
