In [1]:
import matplotlib.pyplot as plt
from scipy.io import wavfile
import argparse
import os
from glob import glob
import numpy as np
import pandas as pd
from librosa.core import resample, to_mono
import librosa
from tqdm import tqdm
import wavio

## def envelope() func

In [2]:
def envelope(y, rate, threshold): #clean data to remove parts that are not relevant
    mask = []
    y = pd.Series(y).apply(np.abs)
    y_mean = y.rolling(window=int(rate/20),
                       min_periods=1,
                       center=True).max()
    for mean in y_mean:
        if mean > threshold:
            mask.append(True)
        else:
            mask.append(False)
    return mask, y_mean

In [3]:
signal, sr = librosa.load('D:/NeuralNetwork/UpdatedSiren/data/10577.wav', sr=16000)
mask, y_mean = envelope(signal, sr, 0.0005)

In [11]:
print(type(y_mean))

<class 'pandas.core.series.Series'>


## def downsample_mono () func

In [5]:
def downsample_mono(path, sr):
    obj = wavio.read(path) #read a wav file and returns an object with data, rate, and sampwidth attribs
    wave = obj.data.astype(np.float32, order='F') #changes the data type to np.float32, Fortran order
    rate = obj.rate #sets the sampling rate of the wac to variable rate
    try:
        channel = wave.shape[1] 
        if channel == 2: #check if channel is stereo
            wav = to_mono(wave.T)
        elif channel == 1: #check if channel is mono
            wav = to_mono(wave.reshape(-1)) #wave.reshape(-1) reshapes the wav to (wave.sr,)
    except IndexError:
        wav = to_mono(wave.reshape(-1)) #convert audio signal to mono
        pass
    except Exception as exc:
        raise exc
    wav = resample(y=wav, orig_sr=rate, target_sr=sr) #resample wav to the target sr of 16000
    wav = wav.astype(np.int16) #changes wav dtype to np.int16
    return sr, wave

In [6]:
rate, wav = downsample_mono('D:/NeuralNetwork/UpdatedSiren/data/10577.wav', 16000)

In [14]:
print(wav.dtype)

float32


## def save_sample() func

In [25]:
def save_sample(sample, rate, target_dir, fn, ix): #saves processed wav to target dir
    fn = fn.split('.wav')[0]
    dst_path = os.path.join(target_dir.split('.')[0], fn+'_{}.wav'.format(str(ix)))
    if os.path.exists(dst_path):
        return
    wavfile.write(dst_path, rate, sample)

## def check_dir() func

In [26]:
def check_dir(path): #check if directory exists, creates the dir if not
    if os.path.exists(path) is False:
        os.mkdir(path)

## def split_wavs() func

In [None]:
def split_wavs(args):
    src_root = args.src_root 
    dst_root = args.dst_root
    dt = args.delta_time

    wav_paths = glob('{}/**'.format(src_root), recursive=True) 
    wav_paths = [x for x in wav_paths if '.wav' in x]
    dirs = os.listdir(src_root)
    check_dir(dst_root)
    classes = os.listdir(src_root)
    for _cls in classes:
        target_dir = os.path.join(dst_root, _cls)
        check_dir(target_dir)
        src_dir = os.path.join(src_root, _cls)
        for fn in tqdm(os.listdir(src_dir)):
            src_fn = os.path.join(src_dir, fn)
            rate, wav = downsample_mono(src_fn, args.sr)
            mask, y_mean = envelope(wav, rate, threshold=args.threshold)
            wav = wav[mask]
            delta_sample = int(dt*rate)

            # cleaned audio is less than a single sample
            # pad with zeros to delta_sample size
            if wav.shape[0] < delta_sample:
                sample = np.zeros(shape=(delta_sample,), dtype=np.int16)
                sample[:wav.shape[0]] = wav
                save_sample(sample, rate, target_dir, fn, 0)
            # step through audio and save every delta_sample
            # discard the ending audio if it is too short
            else:
                trunc = wav.shape[0] % delta_sample
                for cnt, i in enumerate(np.arange(0, wav.shape[0]-trunc, delta_sample)):
                    start = int(i)
                    stop = int(i + delta_sample)
                    sample = wav[start:stop]
                    save_sample(sample, rate, target_dir, fn, cnt)

## def test_threshold() func

In [None]:
def test_threshold(args):
    src_root = args.src_root
    wav_paths = glob('{}/**'.format(src_root), recursive=True)
    wav_path = [x for x in wav_paths if args.fn in x]
    if len(wav_path) != 1:
        print('audio file not found for sub-string: {}'.format(args.fn))
        return
    rate, wav = downsample_mono(wav_path[0], args.sr)
    mask, env = envelope(wav, rate, threshold=args.threshold)
    plt.style.use('ggplot')
    plt.title('Signal Envelope, Threshold = {}'.format(str(args.threshold)))
    plt.plot(wav[np.logical_not(mask)], color='r', label='remove')
    plt.plot(wav[mask], color='c', label='keep')
    plt.plot(env, color='m', label='envelope')
    plt.grid(False)
    plt.legend(loc='best')
    plt.show()