In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
from matplotlib import pyplot as plt
from IPython.display import Audio, display, clear_output

import librosa
from birdnetlib import Recording
from birdnetlib.analyzer import Analyzer
from mosqito import sq_metrics
from noisereduce import reduce_noise
import pyloudnorm as pyln

pd.set_option('future.no_silent_downcasting', True)

raw_dir = Path('../audio/data/raw/').resolve()
examples = [path for path in raw_dir.glob('*.mp3')]

# Presence

In [None]:
def presence_score(x, window):
    # Ensure x is a list of floats of length `window_size`, padding with zero
    x = list(x)[:window]
    x = [float(xi) for xi in x]
    x = x + [0]*(window - len(x))
    
    if x[0] == 0:
        return 0.0
    else:
        return sum([xi / (i+1) for i, xi in enumerate(x)])

In [None]:
def presence_window(analyzer, filepath, scientific_name=None, window_size=4):
    seconds_per_segment = 3

    recording = Recording(analyzer, filepath, return_all_detections=True)
    recording.analyze()
    detections = pd.DataFrame(recording.detections)

    if scientific_name is None:
        counts = detections['scientific_name'].value_counts()
        target = counts.index[0]
    else:
        target = scientific_name

    df = detections[['start_time', 'scientific_name']].copy()
    df['segment'] = df['start_time'].div(seconds_per_segment).astype(int)
    df['target'] = df['scientific_name'] == target
    
    good_segments = df.groupby('segment')['target'].apply(lambda x: np.all(x))
    good_segments = good_segments.reindex(range(max(good_segments.index) + 1)).fillna(False)
    
    rolling_good_count = (
        good_segments
        .rolling(window=window_size, min_periods=1)
        .apply(lambda x: presence_score(x, window=window_size))
    )
    idx = rolling_good_count.idxmax()
    
    result = {
        "filepath": filepath,
        "start": idx,
        "end": idx + window_size*seconds_per_segment,
        "target": target,
        "presence": rolling_good_count.loc[idx],
    }

    return result


analyzer = Analyzer()

recordings = pd.DataFrame([presence_window(analyzer, ex) for ex in examples])  

clear_output(wait=True)

recordings

# Noise reduce

In [None]:
recordings['noise_rmsd'] = 999.9
for idx, row in recordings.iterrows():
    data, rate = librosa.load(row['filepath'], sr=None)   
    clip = data[row['start']*rate:row['end']*rate]
    nr_clip = reduce_noise(clip, rate, stationary=False)
    recordings.loc[idx, 'noise_rmsd'] = np.sqrt(np.mean((clip - nr_clip)**2))

# Mosquito feaures

In [None]:
recordings['zwst'] = 999.9
for idx, row in recordings.iterrows():
    data, rate = librosa.load(row['filepath'], sr=48000)  # Set sr to avoid warning about upsampling    
    start, end = row['start'] * rate, row['end'] * rate
    recordings.loc[idx, 'zwst'], _, _ = sq_metrics.loudness_zwst(data[start:end], rate)

# Librosa features

In [None]:
def extract_noise_features(data, rate):
    zcr = np.mean(librosa.feature.zero_crossing_rate(data))
    spectral_flatness = np.mean(librosa.feature.spectral_flatness(y=data))
    spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=data, sr=rate))
    
    S = np.abs(librosa.stft(data))
    freqs = librosa.fft_frequencies(sr=rate)
    low_freq_ratio = np.sum(S[freqs < 300]) / np.sum(S)  # Energy below 300 Hz
    
    rms = librosa.feature.rms(y=data).flatten()
    rms_sd = np.std(rms)
    rms_mean = np.mean(rms)
    
    rolling_mean = pd.Series(rms).rolling(window=10).mean().dropna()
    floor = rolling_mean.min()
    peak = rolling_mean.quantile(0.95)
    floor_to_peak = floor / peak if peak > 0 else 0
    
    return {
        "zcr": zcr,
        "spectral_flatness": spectral_flatness,
        "spectral_centroid": spectral_centroid,
        "low_freq_ratio": low_freq_ratio,
        "rms_mean": rms_mean,
        "rms_sd": rms_sd,
        "rms_cv": rms_sd / rms_mean,
        "floor_to_peak": floor_to_peak
    }


feature_names = ["zcr", "spectral_flatness", "spectral_centroid", "low_freq_ratio", "rms_mean", "rms_sd", "rms_cv", "floor_to_peak"]
for v in feature_names:
    recordings[v] = -9.9
    
for idx, row in recordings.iterrows():
    data, rate = librosa.load(row['filepath'])
    start, end = row['start']*rate, row['end']*rate
    features = extract_noise_features(data[start:end], rate)
    for v in feature_names:
        recordings.loc[idx, v] = features[v]

# Feature comparison

In [None]:
recordings.head(4)

Choose any feature to sample audio with low versus high values.

In [None]:
def audio_compare(df, col, n=3, quantile=.2, start_col='start', end_col='end'):
    df = df.copy().sort_values(col)
    
    base_idx = int(len(df) * quantile)
    ii = list(range(base_idx, base_idx + n))
    ii = ii + [len(df) - i - 1 for i in ii]
    
    for i in ii:
        row = df.iloc[i]
        print(col, row[col])
        data, rate = librosa.load(row['filepath'])
        start = row[start_col] * rate
        end = row[end_col] * rate
        display(Audio(data[start:end], rate=rate))


audio_compare(recordings, "floor_to_peak", quantile=.05)

# Onset detection

In [None]:
def find_onset(filepath, start_sec, delta=.25):
    data, rate = librosa.load(filepath)

    # "Offset" because times are relative to clip, not full audio
    onsets = librosa.onset.onset_detect(y=data, sr=rate, backtrack=True, units='time', delta=delta)
    
    onsets = np.array(onsets)
    onsets = onsets[onsets < start_sec + 3]
    onsets = onsets[onsets > start_sec - 1]
    
    if not len(onsets):
        return start_sec
    else:
        closest_idx = np.argmin(np.abs(start_sec - onsets))
        return onsets[closest_idx]


recordings['onset'] = -9.9
for idx, row in recordings.iterrows():
    recordings.loc[idx, 'onset'] = find_onset(row['filepath'], row['start'], delta=.20)

In [None]:
sum(recordings['onset'] == recordings['start'])

In [None]:
diff = recordings['onset'] - recordings['start']
diff.plot(kind='hist')
plt.show()

In [None]:
for idx in np.random.choice(recordings.index, 5):
    row = recordings.loc[idx]
    data, rate = librosa.load(row['filepath'])
    start = int(row['onset'] * rate)
    end = int((row['onset'] + 9) * rate)
    print(row['onset'] - row['start'])
    display(Audio(data[start:end], rate=rate))

# Volume

Find the quietest and the loudest recording to compare. This works poorly without filtering out noisy recordings.

In [None]:
def load_sec(filepath, start=None, end=None):
    data, rate = librosa.load(filepath)
    if start is not None:
        start = int(start * rate)
    if end is not None:
        end = int(end * rate)
    return data[start:end], rate
    

def get_loudness(filepath, start, end):
    clip, rate = load_sec(filepath, start, end)
    meter = pyln.Meter(rate)
    loudness = meter.integrated_loudness(clip)
    return loudness
    

loud = pd.Series([get_loudness(row['filepath'], row['onset'], row['onset'] + 9) for _, row in recordings.iterrows()])
mn = loud.argmin()
mx = loud.argmax()

row = recordings.iloc[mn]
data_quiet, rate_quiet = load_sec(row['filepath'], row['onset'], row['onset'] + 9)
print("db =", loud.iloc[mn])
display(Audio(data_quiet, rate=rate_quiet))

row = recordings.iloc[mx]
data_loud, rate_loud = load_sec(row['filepath'], row['onset'], row['onset'] + 9)
print("db =", loud.iloc[mx])
display(Audio(data_loud, rate=rate_loud))

Normalize the volumes and measuring the amount of clipping. Suggested to keep below .0001.

In [None]:
def normalize_loudness(data, rate, db=-20, threshold = 1.0):
    meter = pyln.Meter(rate)
    loudness = meter.integrated_loudness(data)
    
    new_data = pyln.normalize.loudness(data, loudness, db)
    new_loudness = meter.integrated_loudness(new_data)

    clipping_prop = np.mean(np.abs(new_data) > threshold)
    
    return new_data, rate, loudness, new_loudness, clipping_prop


data, rate, loudness, new_loudness, clipping = normalize_loudness(data_quiet, rate_quiet)
print("db", loudness, "to", new_loudness, "and clipping =", clipping)
display(Audio(data, rate=rate))

data, rate, loudness, new_loudness, clipping  = normalize_loudness(data_loud, rate_loud)
print("db", loudness, "to", new_loudness, "and clipping =", clipping)
display(Audio(data, rate=rate))