# This song detector checks to see if there are high frequency oscillations in a recording to figure out if it has song. If there are high frequency oscillations, then it will use an amplitude threshold to check for song near the region of high frequency oscillation.

### For generating the .json file of periodicity-detected songs:

In [1]:
# #!/usr/bin/env python
# # -*- coding: utf‑8 -*-

# import numpy as np
# from scipy.io import wavfile
# from scipy.signal import spectrogram, windows, ellip, filtfilt
# from scipy.ndimage import gaussian_filter1d
# import os
# import json


# # ── Core detector ────────────────────────────────────────────────────────────
# def detect_song_intervals(
#     file_path,
#     segment_duration=10,
#     low_cut=500,
#     high_cut=8000,
#     low_mod=10,              # ── NEW ──
#     high_mod=40,             # ── NEW ──
#     smoothing_sigma=3,
#     power_threshold=0.5,
# ):
#     """
#     Detects [start_sec, end_sec] intervals where the smoothed `low_mod–high_mod`
#     Hz modulation power of the log‑amplitude trace exceeds `power_threshold`.
#     """
#     try:
#         sr, data = wavfile.read(file_path)
#         if data.ndim > 1:                    # stereo → mono
#             data = data.mean(axis=1)

#         # Band‑pass filter raw audio
#         nyq = sr / 2
#         b, a = ellip(5, 0.2, 40, [low_cut / nyq, high_cut / nyq], btype="band")
#         data = filtfilt(b, a, data)

#         seg_len_samples = int(segment_duration * sr)
#         n_segments = int(np.ceil(len(data) / seg_len_samples))
#         detected_intervals = []

#         for i in range(n_segments):
#             start_samp = i * seg_len_samples
#             end_samp   = min(start_samp + seg_len_samples, len(data))
#             segment = np.zeros(seg_len_samples, dtype=data.dtype)
#             segment[: end_samp - start_samp] = data[start_samp:end_samp]

#             # ── Audio‑spectrogram of segment ──────────────────────────────
#             f, t, Sxx = spectrogram(
#                 segment,
#                 fs=sr,
#                 window=windows.gaussian(2048, std=2048 / 8),
#                 nperseg=2048,
#                 noverlap=2048 - 119,
#             )

#             # Log‑amplitude trace
#             mask_audio_band = (f >= low_cut) & (f <= high_cut)
#             amp_trace = np.sum(Sxx[mask_audio_band, :], axis=0)
#             log_amp = np.log10(amp_trace + np.finfo(float).eps)

#             # ── Spectrogram of amplitude trace (with full‑duration padding)
#             amp_fs   = len(t) / segment_duration
#             nper     = 128
#             pad      = nper // 2
#             exp_len  = int(segment_duration * amp_fs)

#             log_amp_padded = np.pad(
#                 log_amp,
#                 (pad, pad + exp_len - len(log_amp)),
#                 mode="constant",
#             )

#             f_amp, t_amp_raw, Sxx_amp = spectrogram(
#                 log_amp_padded, fs=amp_fs, window="hann", nperseg=nper, noverlap=96
#             )
#             t_amp = t_amp_raw - pad / amp_fs

#             # ── Ensure full‑width spectrogram ─────────────────────────────
#             if t_amp[-1] < segment_duration:
#                 extra_cols = int(np.round((segment_duration - t_amp[-1]) * amp_fs))
#                 if extra_cols > 0:
#                     Sxx_amp = np.pad(Sxx_amp, ((0, 0), (0, extra_cols)), mode="constant")
#                     t_extra = t_amp[-1] + np.arange(1, extra_cols + 1) / amp_fs
#                     t_amp   = np.concatenate((t_amp, t_extra))

#             # ── Modulation‑band power envelope ───────────────────────────
#             mask_mod_band = (f_amp >= low_mod) & (f_amp <= high_mod)  # NEW
#             band_power    = np.sum(Sxx_amp[mask_mod_band, :], axis=0)
#             smooth_pow    = gaussian_filter1d(band_power, sigma=smoothing_sigma)

#             # ── Threshold‑based interval extraction ───────────────────────
#             above = smooth_pow > power_threshold
#             in_span = False
#             for j, flg in enumerate(above):
#                 cur_time = i * segment_duration + t_amp[j]
#                 if flg and not in_span:
#                     in_span  = True
#                     span_start = cur_time
#                 elif not flg and in_span:
#                     detected_intervals.append([span_start, cur_time])
#                     in_span = False
#             if in_span:  # ran off end of panel still above threshold
#                 detected_intervals.append([span_start, (i + 1) * segment_duration])

#         return detected_intervals

#     except Exception as err:
#         print(f"⚠️  Error processing {file_path}: {err}")
#         return []


# # ── Folder‑level helper ───────────────────────────────────────────────────────
# def process_folder(folder_path, output_json_path, **kwargs):
#     results = {}
#     for fname in sorted(os.listdir(folder_path)):
#         if fname.lower().endswith(".wav"):
#             fpath = os.path.join(folder_path, fname)
#             print(f"Processing: {fname}")
#             results[fname] = detect_song_intervals(fpath, **kwargs)

#     with open(output_json_path, "w") as fp:
#         json.dump(results, fp, indent=4)

#     print(f"\nSaved song detection results to: {output_json_path}")


# # ── Example usage ─────────────────────────────────────────────────────────────
# if __name__ == "__main__":
#     FOLDER = (
#         "/Volumes/my_own_SSD/UO_stuff/nerve_transections/USA5207/42"
#     )
#     OUT_JSON = FOLDER + "_periodicity_only_detected_song_intervals.json"

#     process_folder(
#         FOLDER,
#         OUT_JSON,
#         segment_duration=10,
#         low_cut=700,
#         high_cut=7000,
#         low_mod=22,            # ← choose your modulation band here
#         high_mod=55,          # ←
#         smoothing_sigma=5,
#         power_threshold=0.05,
#     )

In [2]:
#!/usr/bin/env python
# -*- coding: utf‑8 -*-
"""
Batch long‑amplitude song detector (modulation‑band version)

❱  Detects song by looking at modulation‑band power of the
   log‑amplitude trace and writes a JSON of intervals.

•  Skips macOS “dot‑underscore” side‑car files automatically.
•  Performs a lightweight RIFF/RIFX header check so any other
   non‑WAV files are skipped instead of crashing.
•  Compatible with Python 3.7 – 3.11 (uses typing.Union).
"""
import os, json, numpy as np
from pathlib import Path
from typing  import List, Union, Dict, Optional
from scipy.io      import wavfile
from scipy.signal  import spectrogram, windows, ellip, filtfilt
from scipy.ndimage import gaussian_filter1d

try:
    from tqdm.auto import tqdm          # optional progress bar
except ModuleNotFoundError:
    tqdm = None


# ── Core detector ────────────────────────────────────────────────────────────
def detect_song_intervals(
    file_path: Union[str, Path],
    segment_duration: int   = 10,
    low_cut: int            = 500,
    high_cut: int           = 8000,
    low_mod: int            = 10,
    high_mod: int           = 40,
    smoothing_sigma: int    = 3,
    power_threshold: float  = 0.5,
) -> List[List[float]]:
    """
    Returns intervals [[start_sec, end_sec], …] where the smoothed
    modulation‑band power exceeds `power_threshold`.
    """
    try:
        # ── quick RIFF/RIFX sanity‑check ──────────────────────────────────
        with open(file_path, "rb") as fh:
            if fh.read(4) not in (b"RIFF", b"RIFX"):
                raise ValueError("not a RIFF/RIFX WAV")

        sr, data = wavfile.read(file_path)
        if data.ndim > 1:                         # stereo → mono
            data = data.mean(axis=1)

        # Band‑pass filter raw audio
        nyq = sr / 2
        b, a = ellip(5, 0.2, 40, [low_cut / nyq, high_cut / nyq], btype="band")
        data = filtfilt(b, a, data)

        seg_len_samples = int(segment_duration * sr)
        n_segments      = int(np.ceil(len(data) / seg_len_samples))
        detected: List[List[float]] = []

        for i in range(n_segments):
            start_samp = i * seg_len_samples
            end_samp   = min(start_samp + seg_len_samples, len(data))
            segment    = np.zeros(seg_len_samples, dtype=data.dtype)
            segment[: end_samp - start_samp] = data[start_samp:end_samp]

            # Audio spectrogram of segment
            f, t, Sxx = spectrogram(
                segment,
                fs=sr,
                window=windows.gaussian(2048, std=2048 / 8),
                nperseg=2048,
                noverlap=2048 - 119,
            )

            # Log‑amplitude trace
            mask_audio_band = (f >= low_cut) & (f <= high_cut)
            amp_trace = np.sum(Sxx[mask_audio_band, :], axis=0)
            log_amp   = np.log10(amp_trace + np.finfo(float).eps)

            # Spectrogram of amplitude trace (with full‑duration padding)
            amp_fs  = len(t) / segment_duration
            nper    = 128
            pad     = nper // 2
            exp_len = int(segment_duration * amp_fs)

            log_amp_padded = np.pad(
                log_amp,
                (pad, pad + exp_len - len(log_amp)),
                mode="constant",
            )

            f_amp, t_amp_raw, Sxx_amp = spectrogram(
                log_amp_padded, fs=amp_fs, window="hann", nperseg=nper, noverlap=96
            )
            t_amp = t_amp_raw - pad / amp_fs

            # Ensure full‑width spectrogram
            if t_amp[-1] < segment_duration:
                extra_cols = int(np.round((segment_duration - t_amp[-1]) * amp_fs))
                if extra_cols > 0:
                    Sxx_amp = np.pad(Sxx_amp, ((0, 0), (0, extra_cols)), mode="constant")
                    t_extra = t_amp[-1] + np.arange(1, extra_cols + 1) / amp_fs
                    t_amp   = np.concatenate((t_amp, t_extra))

            # Modulation‑band power envelope
            mask_mod_band = (f_amp >= low_mod) & (f_amp <= high_mod)
            band_power    = np.sum(Sxx_amp[mask_mod_band, :], axis=0)
            smooth_pow    = gaussian_filter1d(band_power, sigma=smoothing_sigma)

            # Threshold‑based interval extraction
            above, in_span = smooth_pow > power_threshold, False
            for j, flg in enumerate(above):
                cur_time = i * segment_duration + t_amp[j]
                if flg and not in_span:
                    in_span, span_start = True, cur_time
                elif not flg and in_span:
                    detected.append([span_start, cur_time])
                    in_span = False
            if in_span:  # ran off end of segment while above threshold
                detected.append([span_start, (i + 1) * segment_duration])

        return detected

    except Exception as err:
        print(f"⚠️  Skipping {Path(file_path).name}: {err}")
        return []


# ── Folder‑level helper ──────────────────────────────────────────────────────
def process_folder(
    folder_path     : Union[str, Path],
    output_json_path: Union[str, Path],
    **kwargs
) -> Dict[str, List[List[float]]]:
    """
    Runs `detect_song_intervals` on every WAV in *folder_path* and saves
    the results to *output_json_path*.

    • Skips dot‑underscore side‑cars.
    • Shows a tqdm progress bar if tqdm is available.
    """
    folder_path      = Path(folder_path).expanduser().resolve()
    output_json_path = Path(output_json_path).expanduser().resolve()

    wav_files = sorted(
        p for p in folder_path.glob("*.wav") if not p.name.startswith("._")
    )
    if not wav_files:
        print("No WAV files found in", folder_path)
        return {}

    iterator = (
        tqdm(wav_files, desc="Processing WAVs", unit="file")
        if tqdm is not None else wav_files
    )

    results: Dict[str, List[List[float]]] = {}
    for p in iterator:
        intervals = detect_song_intervals(p, **kwargs)
        if intervals:
            results[p.name] = intervals

    with open(output_json_path, "w", encoding="utf‑8") as fp:
        json.dump(results, fp, indent=4)

    print(f"\nSaved song detection results for {len(results)} files → {output_json_path}")
    return results


# ── Example usage ────────────────────────────────────────────────────────────
if __name__ == "__main__":
    FOLDER   = "/Volumes/my_own_SSD/UO_stuff/RC12_R10_Comp1/36"
    OUT_JSON = FOLDER + "_periodicity_only_detected_song_intervals.json"

    process_folder(
        FOLDER,
        OUT_JSON,
        segment_duration = 10,
        low_cut          = 700,
        high_cut         = 7000,
        low_mod          = 22,   # choose modulation band here
        high_mod         = 55,
        smoothing_sigma  = 5,
        power_threshold  = 0.05,
    )


  from .autonotebook import tqdm as notebook_tqdm
Processing WAVs: 100%|██████████| 553/553 [07:44<00:00,  1.19file/s]


Saved song detection results for 553 files → /Volumes/my_own_SSD/UO_stuff/RC12_R10_Comp1/36_periodicity_only_detected_song_intervals.json





## For generating the .json file of time segments of amplitude-detected song:

In [3]:
# #!/usr/bin/env python
# # -*- coding: utf‑8 -*-
# """
# Batch long‑amplitude song detector with progress bar
# ----------------------------------------------------
# ▶  Point `process_folder()` at any directory full of .wav files.
# ▶  A pretty‑printed JSON is saved alongside that directory,
#     not inside it, unless you override `out_json`.

# Progress feedback
# -----------------
# * Uses `tqdm` (if installed) for a live bar; otherwise prints "X / N".
# """
# from __future__ import annotations
# import json, sys
# from pathlib import Path
# from typing   import List, Tuple, Dict, Optional

# import numpy as np
# from scipy.io     import wavfile
# from scipy.signal import spectrogram, windows, ellip, filtfilt
# from scipy.ndimage import gaussian_filter1d
# from tqdm.auto import tqdm

# # ── optional tqdm import ───────────────────────────────────────
# try:
#     from tqdm.auto import tqdm
# except ModuleNotFoundError:
#     tqdm = None


# # ──────────────────────────────────────────────────────────────
# def mask_to_intervals(mask: np.ndarray, times: np.ndarray) -> List[Tuple[float, float]]:
#     intervals: List[Tuple[float, float]] = []
#     in_run, start = False, 0.0
#     for idx, v in enumerate(mask):
#         if v and not in_run:
#             in_run, start = True, times[idx]
#         elif not v and in_run:
#             intervals.append((start, times[idx])); in_run = False
#     if in_run:
#         intervals.append((start, times[-1]))
#     return intervals


# def detect_song_intervals(
#         file_path          : Path | str,
#         segment_duration   : float       = 10.0,
#         bandpass_low_cut   : float       = 500.0,
#         bandpass_high_cut  : float       = 8000.0,
#         sigma              : float       = 100.0,
#         threshold          : float       = 0.05,
#         sigma_log          : Optional[float] = None,
#         threshold_log      : Optional[float] = None,
#         log_base           : float       = 10.0,
#     ) -> List[Tuple[float, float]]:
#     if sigma_log is None:
#         sigma_log = sigma
#     eps = np.finfo(float).eps

#     sr, data = wavfile.read(str(file_path))
#     if data.ndim > 1:
#         data = data.mean(axis=1)
#     wp = [bandpass_low_cut / (sr / 2),
#           bandpass_high_cut / (sr / 2)]
#     b, a = ellip(5, 0.2, 40, wp, btype='band')
#     data = filtfilt(b, a, data)

#     seg_len  = int(segment_duration * sr)
#     n_seg    = int(np.ceil(len(data) / seg_len))
#     detected: List[Tuple[float, float]] = []

#     for seg_idx in range(n_seg):
#         seg = np.zeros(seg_len, dtype=float)
#         start = seg_idx * seg_len
#         seg[:max(0, min(seg_len, len(data) - start))] = data[start:start + seg_len]

#         f, t, Sxx = spectrogram(
#             seg, fs=sr,
#             window=windows.gaussian(2048, 2048/8),
#             nperseg=2048, noverlap=2048 - 119
#         )
#         rows = (f >= bandpass_low_cut) & (f <= bandpass_high_cut)

#         amp       = np.sum(Sxx[rows, :], axis=0)
#         amp_log   = (10.0 * np.log10(amp + eps)
#                      if log_base == 10 else np.log(amp + eps))
#         amp_log_s = gaussian_filter1d(amp_log, sigma=sigma_log)
#         if threshold_log is None and seg_idx == 0:
#             threshold_log = np.percentile(amp_log_s, 90)
#         mask_log  = amp_log_s > threshold_log

#         abs_times = t + seg_idx * segment_duration
#         detected.extend(mask_to_intervals(mask_log, abs_times))

#     return detected


# # ──────────────────────────────────────────────────────────────
# def process_folder(
#         folder_path   : Path | str,
#         out_json      : Optional[Path | str] = None,
#         **detector_kw
#     ) -> Dict[str, List[List[float]]]:
#     """
#     Analyse every .wav in `folder_path` and dump a JSON *beside*
#     the folder (i.e. in its parent directory) unless `out_json`
#     explicitly points elsewhere.
#     """
#     folder = Path(folder_path).expanduser().resolve()
#     if not folder.is_dir():
#         raise NotADirectoryError(f"{folder} is not a valid directory")

#     wav_files = sorted(folder.glob("*.wav"))
#     n_total   = len(wav_files)
#     if n_total == 0:
#         print("No .wav files found in", folder)
#         return {}

#     iterator = (
#         tqdm(wav_files, desc="Processing WAVs", unit="file", total=n_total)
#         if tqdm is not None else wav_files
#     )

#     results: Dict[str, List[List[float]]] = {}
#     for idx, wav_path in enumerate(iterator, 1):
#         intervals = detect_song_intervals(wav_path, **detector_kw)
#         results[wav_path.name] = [list(map(float, iv)) for iv in intervals]

#         if tqdm is None:
#             sys.stdout.write(f"\rProcessed {idx} / {n_total} files")
#             sys.stdout.flush()
#     if tqdm is None:
#         print()

#     # ▼▼▼  CHANGED BLOCK  ▼▼▼
#     if out_json is None:
#         # now saved in the *parent* directory of folder_path
#         out_json = folder.parent / f"{folder.name}_logamp_detected_song_intervals.json"
#     # ▲▲▲  CHANGED BLOCK  ▲▲▲

#     with open(out_json, "w", encoding="utf‑8") as fh:
#         json.dump(results, fh, indent=4)
#     print(f"Saved detections for {n_total} WAV files → {out_json}")
#     return results


# # ──────────────────────────────────────────────────────────────
# # Example CLI usage
# # ──────────────────────────────────────────────────────────────
# if __name__ == "__main__":
#     process_folder(
#         folder_path = "/Volumes/my_own_SSD/UO_stuff/nerve_transections/USA5207/41",
#         bandpass_low_cut  = 700,
#         bandpass_high_cut = 7000,
#         sigma             = 60,
#         threshold         = 30,
#         sigma_log         = 100,
#         threshold_log     = 6,      # dB
#         segment_duration  = 10.0,
#     )


In [4]:
# # ──────────────────────────────────────────────────────────────
# def process_folder(
#         folder_path   : Path | str,
#         out_json      : Optional[Path | str] = None,
#         **detector_kw
#     ) -> Dict[str, List[List[float]]]:
#     """
#     Analyse every .wav in `folder_path` and dump a JSON *beside*
#     the folder (i.e. in its parent directory) unless `out_json`
#     explicitly points elsewhere.

#     • Skips macOS “dot‑underscore” side‑car files (names start with '._').
#     • Performs a quick RIFF/RIFX header check so any other non‑WAV
#       files are skipped instead of crashing the batch.
#     """
#     folder = Path(folder_path).expanduser().resolve()
#     if not folder.is_dir():
#         raise NotADirectoryError(f"{folder} is not a valid directory")

#     # ── NEW: filter out dot‑underscore side‑cars right here ───────────────
#     wav_files = sorted(
#         p for p in folder.glob("*.wav")
#         if not p.name.startswith("._")
#     )
#     n_total = len(wav_files)
#     if n_total == 0:
#         print("No .wav files found in", folder)
#         return {}

#     iterator = (
#         tqdm(wav_files, desc="Processing WAVs", unit="file", total=n_total)
#         if tqdm is not None else wav_files
#     )

#     results: Dict[str, List[List[float]]] = {}
#     for idx, wav_path in enumerate(iterator, 1):

#         # ── NEW: lightweight header check so bad files don’t abort the run
#         try:
#             with open(wav_path, "rb") as fh:
#                 if fh.read(4) not in (b"RIFF", b"RIFX"):
#                     raise ValueError("not a RIFF/RIFX WAV")

#             intervals = detect_song_intervals(wav_path, **detector_kw)
#         except (ValueError, OSError) as err:
#             msg = f"⚠️  Skipping {wav_path.name}: {err}"
#             if tqdm is not None:
#                 tqdm.write(msg)
#             else:
#                 print(msg)
#             continue  # move on to the next file

#         results[wav_path.name] = [list(map(float, iv)) for iv in intervals]

#         if tqdm is None:
#             sys.stdout.write(f"\rProcessed {idx} / {n_total} files")
#             sys.stdout.flush()
#     if tqdm is None:
#         print()

#     # If not specified, save JSON beside the folder
#     if out_json is None:
#         out_json = folder.parent / f"{folder.name}_logamp_detected_song_intervals.json"

#     with open(out_json, "w", encoding="utf‑8") as fh:
#         json.dump(results, fh, indent=4)

#     print(f"Saved detections for {len(results)} WAV files → {out_json}")
#     return results

# # ──────────────────────────────────────────────────────────────
# # Example CLI usage
# # ──────────────────────────────────────────────────────────────
# if __name__ == "__main__":
#     process_folder(
#         folder_path = "/Volumes/my_own_SSD/UO_stuff/nerve_transections/USA5207/42",
#         bandpass_low_cut  = 700,
#         bandpass_high_cut = 7000,
#         sigma             = 60,
#         threshold         = 30,
#         sigma_log         = 100,
#         threshold_log     = 6,      # dB
#         segment_duration  = 10.0,
#     )

In [5]:
# import inspect
# from pathlib import Path
# from typing import List, Dict, Optional
# import json, sys

# # ──────────────────────────────────────────────────────────────
# def process_folder(
#         folder_path : Path | str,
#         out_json    : Optional[Path | str] = None,
#         **detector_kw
#     ) -> Dict[str, List[List[float]]]:
#     """
#     Analyse every .wav in *folder_path* with `detect_song_intervals` and
#     save a JSON beside the folder.

#     • Skips macOS “dot‑underscore” files.
#     • Skips anything whose first 4 bytes are not RIFF/RIFX.
#     • Auto‑maps kwargs so you can pass either
#         (bandpass_low_cut, bandpass_high_cut)
#       or (low_cut, high_cut) etc.; unknown kwargs are ignored.
#     """
#     folder = Path(folder_path).expanduser().resolve()
#     if not folder.is_dir():
#         raise NotADirectoryError(f"{folder} is not a valid directory")

#     # ── file list, minus side‑cars ────────────────────────────────────────
#     wav_files = sorted(p for p in folder.glob("*.wav") if not p.name.startswith("._"))
#     if not wav_files:
#         print("No .wav files found in", folder)
#         return {}

#     # ── find which argument names the detector really supports ────────────
#     detector_params = inspect.signature(detect_song_intervals).parameters

#     # map common synonyms once up‑front
#     translated_kw = detector_kw.copy()
#     if "bandpass_low_cut" in translated_kw and "low_cut" in detector_params:
#         translated_kw["low_cut"] = translated_kw.pop("bandpass_low_cut")
#     if "bandpass_high_cut" in translated_kw and "high_cut" in detector_params:
#         translated_kw["high_cut"] = translated_kw.pop("bandpass_high_cut")

#     # keep only those kwargs the detector can accept
#     translated_kw = {
#         k: v for k, v in translated_kw.items() if k in detector_params
#     }

#     iterator = (
#         tqdm(wav_files, desc="Processing WAVs", unit="file", total=len(wav_files))
#         if tqdm is not None else wav_files
#     )

#     results: Dict[str, List[List[float]]] = {}
#     for idx, wav_path in enumerate(iterator, 1):
#         # quick header sanity‑check
#         try:
#             with open(wav_path, "rb") as fh:
#                 if fh.read(4) not in (b"RIFF", b"RIFX"):
#                     raise ValueError("not a RIFF/RIFX WAV")

#             intervals = detect_song_intervals(wav_path, **translated_kw)
#         except (ValueError, OSError) as err:
#             msg = f"⚠️  Skipping {wav_path.name}: {err}"
#             tqdm.write(msg) if tqdm is not None else print(msg)
#             continue

#         results[wav_path.name] = [list(map(float, iv)) for iv in intervals]

#         if tqdm is None:
#             sys.stdout.write(f"\rProcessed {idx} / {len(wav_files)} files")
#             sys.stdout.flush()
#     if tqdm is None:
#         print()

#     # save JSON beside the folder unless user overrides
#     if out_json is None:
#         out_json = folder.parent / f"{folder.name}_logamp_detected_song_intervals.json"

#     with open(out_json, "w", encoding="utf‑8") as fh:
#         json.dump(results, fh, indent=4)

#     print(f"Saved detections for {len(results)} WAV files → {out_json}")
#     return results

# process_folder(
#     folder_path      = "/Volumes/my_own_SSD/UO_stuff/nerve_transections/USA5207/47",
#     bandpass_low_cut = 700,
#     bandpass_high_cut= 7000,
#     sigma            = 60,
#     threshold        = 30,
#     sigma_log        = 100,
#     threshold_log    = 6,
#     segment_duration = 10.0,
# )


In [6]:
import inspect, json, sys
from pathlib import Path
from typing import List, Dict, Optional, Union   # ← added Union for Py < 3.10

# optional progress bar
try:
    from tqdm.auto import tqdm
except ModuleNotFoundError:
    tqdm = None


# ──────────────────────────────────────────────────────────────
def process_folder(
    folder_path: Union[Path, str],                 # ← use Union[…]
    out_json   : Optional[Union[Path, str]] = None,
    **detector_kw
) -> Dict[str, List[List[float]]]:
    """
    Analyse every .wav in *folder_path* with `detect_song_intervals`
    and save a JSON beside the folder.

    • Skips macOS “dot‑underscore” files.
    • Skips anything whose first 4 bytes are not RIFF/RIFX.
    • Auto‑maps kwargs so you can pass either
        (bandpass_low_cut, bandpass_high_cut)
      or (low_cut, high_cut); unknown kwargs are ignored.
    """
    folder = Path(folder_path).expanduser().resolve()
    if not folder.is_dir():
        raise NotADirectoryError(f"{folder} is not a valid directory")

    # ── file list, minus side‑cars ────────────────────────────────────────
    wav_files = sorted(p for p in folder.glob("*.wav") if not p.name.startswith("._"))
    if not wav_files:
        print("No .wav files found in", folder)
        return {}

    # ── find which argument names the detector really supports ────────────
    detector_params = inspect.signature(detect_song_intervals).parameters

    # map common synonyms once up‑front
    translated_kw = detector_kw.copy()
    if "bandpass_low_cut" in translated_kw and "low_cut" in detector_params:
        translated_kw["low_cut"] = translated_kw.pop("bandpass_low_cut")
    if "bandpass_high_cut" in translated_kw and "high_cut" in detector_params:
        translated_kw["high_cut"] = translated_kw.pop("bandpass_high_cut")

    # keep only those kwargs the detector can accept
    translated_kw = {k: v for k, v in translated_kw.items() if k in detector_params}

    iterator = (
        tqdm(wav_files, desc="Processing WAVs", unit="file")
        if tqdm is not None else wav_files
    )

    results: Dict[str, List[List[float]]] = {}
    for idx, wav_path in enumerate(iterator, 1):
        # quick header sanity‑check
        try:
            with open(wav_path, "rb") as fh:
                if fh.read(4) not in (b"RIFF", b"RIFX"):
                    raise ValueError("not a RIFF/RIFX WAV")

            intervals = detect_song_intervals(wav_path, **translated_kw)
        except (ValueError, OSError) as err:
            msg = f"⚠️  Skipping {wav_path.name}: {err}"
            tqdm.write(msg) if tqdm is not None else print(msg)
            continue

        results[wav_path.name] = [list(map(float, iv)) for iv in intervals]

        if tqdm is None:                       # crude progress if no tqdm
            sys.stdout.write(f"\rProcessed {idx} / {len(wav_files)} files")
            sys.stdout.flush()
    if tqdm is None:
        print()

    # save JSON beside the folder unless user overrides
    if out_json is None:
        out_json = folder.parent / f"{folder.name}_logamp_detected_song_intervals.json"

    with open(out_json, "w", encoding="utf‑8") as fh:
        json.dump(results, fh, indent=4)

    print(f"Saved detections for {len(results)} WAV files → {out_json}")
    return results


# ── Example call ────────────────────────────────────────────────────────────
process_folder(
    folder_path      = "/Volumes/my_own_SSD/UO_stuff/RC12_R10_Comp1/36",
    bandpass_low_cut = 700,
    bandpass_high_cut= 7000,
    sigma            = 60,
    threshold        = 30,
    sigma_log        = 100,
    threshold_log    = 6,
    segment_duration = 10.0,
)


Processing WAVs: 100%|██████████| 553/553 [07:56<00:00,  1.16file/s]

Saved detections for 553 WAV files → /Volumes/my_own_SSD/UO_stuff/RC12_R10_Comp1/36_logamp_detected_song_intervals.json





{'R10_45800.10061220_5_23_2_47_41.wav': [],
 'R10_45800.10191866_5_23_2_49_51.wav': [],
 'R10_45800.10323442_5_23_2_52_3.wav': [],
 'R10_45800.10564497_5_23_2_56_4.wav': [],
 'R10_45800.10771092_5_23_2_59_31.wav': [],
 'R10_45800.10900874_5_23_3_1_40.wav': [],
 'R10_45800.11769306_5_23_3_16_9.wav': [],
 'R10_45800.11899084_5_23_3_18_19.wav': [],
 'R10_45800.1208158_5_23_0_20_8.wav': [],
 'R10_45800.12499052_5_23_3_28_19.wav': [],
 'R10_45800.12629914_5_23_3_30_29.wav': [],
 'R10_45800.12760440_5_23_3_32_40.wav': [],
 'R10_45800.12985806_5_23_3_36_25.wav': [],
 'R10_45800.13246159_5_23_3_40_46.wav': [],
 'R10_45800.13376416_5_23_3_42_56.wav': [],
 'R10_45800.13575485_5_23_3_46_15.wav': [],
 'R10_45800.13786215_5_23_3_49_46.wav': [],
 'R10_45800.1385715_5_23_0_23_5.wav': [],
 'R10_45800.14010520_5_23_3_53_30.wav': [],
 'R10_45800.14358386_5_23_3_59_18.wav': [],
 'R10_45800.14635717_5_23_4_3_55.wav': [],
 'R10_45800.14910730_5_23_4_8_30.wav': [],
 'R10_45800.15044340_5_23_4_10_44.wav': []

# Combine the outputs from the .json files:

In [7]:
import json
from pathlib import Path

# === User Configuration ===
PERIODICITY_JSON_PATH = Path("/Volumes/my_own_SSD/UO_stuff/RC12_R10_Comp1/36_periodicity_only_detected_song_intervals.json")
AMPLITUDE_JSON_PATH   = Path("/Volumes/my_own_SSD/UO_stuff/RC12_R10_Comp1/36_logamp_detected_song_intervals.json")
# Combined output will be saved alongside the periodicity file:
COMBINED_JSON_PATH    = PERIODICITY_JSON_PATH.parent / f"{PERIODICITY_JSON_PATH.stem}_combined_segments.json"

# === Parameters ===
MARGIN_SECONDS = 2.0  # include amplitude segments within ±2 s of each periodicity window

def load_json(path: Path) -> dict:
    if not path.is_file():
        raise FileNotFoundError(f"No file found at: {path}")
    with path.open('r') as f:
        return json.load(f)

def main():
    # Load data
    periodicity_segments = load_json(PERIODICITY_JSON_PATH)
    amplitude_segments   = load_json(AMPLITUDE_JSON_PATH)

    combined_segments = {}

    # Process each file in the periodicity results
    for fname, p_segs in periodicity_segments.items():
        a_segs = amplitude_segments.get(fname, [])
        file_combined = []

        print(f"File: {fname}")
        # 1) Periodicity detector segments
        print("  Periodicity segments:")
        for (p_start, p_end) in p_segs:
            print(f"    - [{p_start:.3f}, {p_end:.3f}]")

        # 2) Amplitude-detector segments within margin of each periodicity segment
        print(f"  Amplitude segments (within ±{MARGIN_SECONDS}s of periodicity windows):")
        # We'll collect matches per periodicity segment
        per_to_amp_matches = []
        for (p_start, p_end) in p_segs:
            matches = [
                (a_start, a_end)
                for (a_start, a_end) in a_segs
                if (a_start <= p_end + MARGIN_SECONDS) and (a_end >= p_start - MARGIN_SECONDS)
            ]
            per_to_amp_matches.append(matches)
            if matches:
                for (a_start, a_end) in matches:
                    print(f"    - [{a_start:.3f}, {a_end:.3f}]")
            else:
                print("    - None")

        # 3) Combined segments
        print("  Combined segments:")
        for (p_seg, matches) in zip(p_segs, per_to_amp_matches):
            p_start, p_end = p_seg
            if matches:
                starts = [p_start] + [m[0] for m in matches]
                ends   = [p_end]   + [m[1] for m in matches]
                combined = [min(starts), max(ends)]
            else:
                combined = [p_start, p_end]
            file_combined.append(combined)
            print(f"    - [{combined[0]:.3f}, {combined[1]:.3f}]")

        print()  # blank line between files
        if file_combined:
            combined_segments[fname] = file_combined

    # Save the merged results
    with COMBINED_JSON_PATH.open('w') as out_f:
        json.dump(combined_segments, out_f, indent=2)
    print(f"Combined segments written to: {COMBINED_JSON_PATH}")

if __name__ == "__main__":
    main()


File: R10_45800.10061220_5_23_2_47_41.wav
  Periodicity segments:
    - [130.000, 130.520]
    - [139.629, 139.989]
  Amplitude segments (within ±2.0s of periodicity windows):
    - None
    - None
  Combined segments:
    - [130.000, 130.520]
    - [139.629, 139.989]

File: R10_45800.10191866_5_23_2_49_51.wav
  Periodicity segments:
    - [130.000, 130.434]
    - [139.629, 139.989]
  Amplitude segments (within ±2.0s of periodicity windows):
    - None
    - None
  Combined segments:
    - [130.000, 130.434]
    - [139.629, 139.989]

File: R10_45800.10323442_5_23_2_52_3.wav
  Periodicity segments:
    - [139.629, 139.989]
  Amplitude segments (within ±2.0s of periodicity windows):
    - None
  Combined segments:
    - [139.629, 139.989]

File: R10_45800.10564497_5_23_2_56_4.wav
  Periodicity segments:
    - [139.629, 139.989]
  Amplitude segments (within ±2.0s of periodicity windows):
    - None
  Combined segments:
    - [139.629, 139.989]

File: R10_45800.10771092_5_23_2_59_31.wav
  