In [1]:
from rockpool.devices.xylo.syns61201 import AFESim
from rockpool.timeseries import TSContinuous
import librosa
import numpy as np
import os
import logging
import matplotlib.pyplot as plt
import soundfile as sf

  from .autonotebook import tqdm as notebook_tqdm
  from .autonotebook import tqdm as notebook_tqdm



In [2]:
def audio_to_features(
    input_path: str,
    output_dir: str,
    target_sr: int = 16000
) -> str:
    fs = 110e3
    raster_period = 10e-3
    max_spike_per_raster_period = 15
    add_noise = False
    add_offset = False
    add_mismatch = False
    seed = None

    if not os.path.exists(input_path):
        logging.error(f"File does not exist: {input_path}")
        return ''

    y, sr = librosa.load(input_path, sr=target_sr, mono=True)

    dt = 1.0 / target_sr
    ts = TSContinuous.from_clocked(y, dt=dt, name='Audio input')

    afe = AFESim(
        fs=fs,
        raster_period=raster_period,
        max_spike_per_raster_period=max_spike_per_raster_period,
        add_noise=add_noise,
        add_offset=add_offset,
        add_mismatch=add_mismatch,
        seed=seed
    ).timed()

    features, stat, rec = afe(ts, record=True)
    raster = features.raster(dt=raster_period, add_events=True)

    fig = plt.figure(figsize=(16, 16))
    ax = fig.add_subplot(211)
    ts.plot()
    ax.set_xticks([])
    ax.set_xlabel("")

    ax = fig.add_subplot(212)
    plt.imshow(raster.T, aspect="auto", origin="lower")
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Output channel")
    ax.set_title("Spiking output")
    plt.tight_layout()
    plt.show()

    os.makedirs(output_dir, exist_ok=True)
    base = os.path.splitext(os.path.basename(input_path))[0]
    out_name = f"{base}.npy"
    out_path = os.path.join(output_dir, out_name)
    np.save(out_path, raster)
    logging.info(f"Saved AFE raster: {out_path}")
    return out_path


In [3]:
def time_stretch(y, rate):
    return librosa.effects.time_stretch(y, rate)

In [4]:
def pitch_shift(y, sr, n_steps):
    return librosa.effects.pitch_shift(y, sr=sr, n_steps=n_steps)

In [5]:
def reverse(y):
    return y[::-1]

In [6]:

def bandpass_filter(y: np.ndarray) -> np.ndarray:
    return librosa.effects.preemphasis(y, coef=0.97)

In [7]:
def estimate_noise(y: np.ndarray, sr: int, noise_duration = 0.5) -> float:
    n = int(noise_duration * sr)
    noise_rms = np.sqrt(np.mean(y[:n]**2))
    return noise_rms

In [8]:
def normalize_audio(y: np.ndarray) -> np.ndarray:
    if y.ndim > 1:
        y = np.mean(y, axis=1)
    peak = np.max(np.abs(y)) + 1e-9
    return y / peak

In [9]:
def augment_with_background(y, bg, snr_db=5):
    if len(bg) < len(y):
        bg = np.tile(bg, int(np.ceil(len(y)/len(bg))))
    bg = bg[:len(y)]
    rms_y = np.sqrt(np.mean(y**2))
    rms_bg = np.sqrt(np.mean(bg**2))
    alpha = rms_y / (10**(snr_db/20)) / (rms_bg + 1e-6)
    return y + alpha * bg

In [None]:
def segment_and_filter(
    input_path: str,
    output_dir: str,
    segment_length: float = 8.0
) -> list:

    y, sr = librosa.load(input_path, sr=16000, mono=True)
    y = normalize_audio(y)

    seg_samples = int(segment_length * sr)
    total = len(y)
    if seg_samples <= 0:
        return []

    metrics = []  # (start, rms, peak, centroid)
    for start in range(0, total - seg_samples + 1, seg_samples):
        seg = y[start:start + seg_samples]
        rms = np.sqrt(np.mean(seg**2))
        peak = np.max(np.abs(seg))
        cent = np.mean(librosa.feature.spectral_centroid(y=seg, sr=sr)[0])
        metrics.append((start, rms, peak, cent))

    if not metrics:
        return []

    rms_vals = np.array([m[1] for m in metrics], dtype=float)
    peak_vals = np.array([m[2] for m in metrics], dtype=float)
    cent_vals = np.array([m[3] for m in metrics], dtype=float)
    energy_th = float(np.median(rms_vals) * 1.2)
    peak_th = float(np.median(peak_vals) * 1.2)
    cent_th = float(np.median(cent_vals))

    os.makedirs(output_dir, exist_ok=True)
    base = os.path.splitext(os.path.basename(input_path))[0]
    valid = []

    for start, rms, peak, cent in metrics:
        if rms >= energy_th and peak >= peak_th:
            seg = y[start:start + seg_samples]
            if cent < cent_th * 0.9:
                label = 1
            elif cent > cent_th * 1.1:
                label = 0
            else:
                label = 2
            out_name = f"{base}_s{start}_lbl{label}.flac"
            out_path = os.path.join(output_dir, out_name)
            sf.write(out_path, seg, sr)
            logging.info(f"Saved segment: {out_path}")
            valid.append((out_path, label, start))
    return valid

In [11]:
def augment_cv(audio, sr, noise_path=None):
    augmented = []

    reversed_audio = audio[::-1]
    augmented.append(reversed_audio)

    for rate in [0.9, 1.1]:
        y_stretch = librosa.effects.time_stretch(audio, rate=rate)
        augmented.append(y_stretch)

    if noise_path:
        noise, _ = librosa.load(noise_path, sr=sr)
        min_len = min(len(audio), len(noise))
        mixed = audio[:min_len] + 0.2 * noise[:min_len]
        augmented.append(mixed)

    return augmented

In [None]:
if __name__ == '__main__':
    logging.basicConfig(
        filename='spike_test.log',
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(message)s'
    )
    base_dir = os.path.dirname(os.path.abspath('__file__'))
    project_root = os.path.abspath(os.path.join(base_dir, '..', '..'))

    audio_files = [
        os.path.join(r, f)
        for r, _, files in os.walk(project_root)
        for f in files if f.lower().endswith(('.flac'))
    ]

    seg_dir = os.path.join(base_dir, 'segments')
    spike_dir = os.path.join(base_dir, 'npy')
    os.makedirs(seg_dir, exist_ok=True)
    os.makedirs(spike_dir, exist_ok=True)

    csv_path = os.path.join(spike_dir, 'labels.csv')
    if os.path.exists(csv_path):
        os.remove(csv_path)

    for audio_path in audio_files[:20]:
        try:
            segs = segment_and_filter(audio_path, seg_dir)
            for out_path, label, start in segs:
                npy_path = audio_to_features(out_path, spike_dir)
                # record label
                header = not os.path.exists(csv_path)
                with open(csv_path, 'a') as cf:
                    if header:
                        cf.write('file,label,start\n')
                    cf.write(f"{os.path.basename(npy_path)},{label},{start}\n")
        except Exception as e:
            logging.error(f"Error processing {audio_path}: {e}")

ERROR:root:File does not exist: [-0.01571472 -0.0196434  -0.01631913 ... -0.0048353  -0.00120882
  0.00241765]
ERROR:root:File does not exist: [ 0.01873678  0.02115443  0.02115443 ... -0.09398609 -0.09277727
 -0.09398609]
ERROR:root:File does not exist: [-0.00423089 -0.00543971 -0.00211544 ... -0.04593533 -0.03807797
 -0.03686914]
ERROR:root:File does not exist: [0.18571427 0.2087912  0.21263735 ... 0.12417582 0.1076923  0.06208791]
ERROR:root:File does not exist: [ 0.02142857  0.01593406  0.00879121 ... -0.03296703 -0.02472527
 -0.01703297]
ERROR:root:File does not exist: [-0.05121396 -0.04817906 -0.06866464 ... -0.0284522  -0.03945372
 -0.04248862]
ERROR:root:File does not exist: [0.01062215 0.01858877 0.01669196 ... 0.06449164 0.06221547 0.07094081]
ERROR:root:File does not exist: [-0.07740384 -0.07836538 -0.10721152 ...  0.01346154  0.04999999
  0.08173076]
ERROR:root:File does not exist: [ 0.01013513  0.01539039  0.02477477 ...  0.00525525  0.003003
 -0.01764264]
ERROR:root:File d