In [26]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [27]:
#| default_exp core

# core

> core routines

In [28]:
#|hide
from nbdev.showdoc import *

In [29]:
#|export 
import torch
import torchaudio
from torchaudio import transforms as T
from torch.nn import functional as F
import numpy as np
import os
from einops import rearrange

## load_audio
We'll start with a basic utility to read an audio file. If it's not at the sample rate we want, we'll automatically resample it.

In [30]:
#|export
def load_audio(
    filename:str,     # name of file to load
    sr=48000,         # sample rate in Hz
    verbose=True,     # whether or not to print notices of resampling
    normalize=False,  # load to full dB range (note: currently unused in the body)
    )->torch.Tensor:
    "loads an audio file as a torch tensor, resampling to `sr` if needed"
    audio, in_sr = torchaudio.load(filename)
    if in_sr != sr:
        if verbose: print(f"Resampling {filename} from {in_sr} Hz to {sr} Hz",flush=True)
        resample_tf = T.Resample(in_sr, sr)
        audio = resample_tf(audio)
    return audio

Using the file in `examples/`, let's see how this works:

In [31]:
audio = load_audio('examples/example.wav')

Resampling examples/example.wav from 44100 Hz to 48000 Hz


In [32]:
audio = load_audio('examples/example.wav',verbose=False)
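
As a rough check on what resampling does to the clip length, here is a small sketch in plain Python (no torchaudio needed). The helper name `expected_resampled_length` is hypothetical, and the ceiling rounding is an assumption about how the resampler rounds; the length actually returned by `T.Resample` may differ by a sample.

```python
import math
from fractions import Fraction

def expected_resampled_length(n_samples: int, in_sr: int, sr: int) -> int:
    """Approximate number of samples after resampling from in_sr to sr.

    Hypothetical helper for illustration; the ceiling rounding is an assumption.
    """
    ratio = Fraction(sr, in_sr)      # e.g. 48000/44100 reduces to 160/147
    return math.ceil(n_samples * ratio)

ratio = Fraction(48000, 44100)
print(ratio)                                            # 160/147
print(expected_resampled_length(44100, 44100, 48000))   # one second of audio: 48000 samples
```

So one second of 44100 Hz audio becomes one second of 48000 Hz audio: the duration is unchanged, only the number of samples grows by a factor of 160/147.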

## is_silence

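Sometimes we'll want to know if a clip is "silent", i.e. if its peak level is quieter than some threshold. First, a small helper for converting float audio to int16 (handy for playback in notebooks), followed by one simple way to implement the silence check: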

In [33]:
#|export
def audio_float_to_int(waveform):
    "converts torch float to numpy int16 (for playback in notebooks)"
    return np.clip( waveform.cpu().numpy()*32768 , -32768, 32767).astype('int16')  # int16 max is 32767

In [34]:
print(audio.dtype)
print(audio_float_to_int(audio).dtype)

torch.float32
int16
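
To see what the conversion does sample by sample, here is a plain-Python sketch of the same idea. `float_sample_to_int16` is a hypothetical helper, not part of this library; it assumes float samples nominally in [-1, 1] and clips to the int16 range, which runs from -32768 to 32767.

```python
INT16_MIN, INT16_MAX = -32768, 32767  # representable range of a signed 16-bit integer

def float_sample_to_int16(x: float) -> int:
    """Scale one float sample (nominally in [-1, 1]) to int16, clipping out-of-range values."""
    scaled = int(x * 32768)           # int() truncates toward zero
    return max(INT16_MIN, min(INT16_MAX, scaled))

for x in (0.0, 0.5, -1.0, 1.0, 1.7):
    print(x, float_sample_to_int16(x))
```

Note that a full-scale +1.0 scales to 32768, one past the largest int16 value, so the upper clip bound matters even for well-behaved input.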


In [35]:
#|export
def is_silence(
    audio,       # torch tensor of (multichannel) audio
    thresh=-60,  # threshold in dB below which we declare to be silence
    ):
    "checks if entire clip is 'silence' below some dB threshold"
    dBmax = 20*torch.log10(torch.flatten(audio.abs()).max()).cpu().numpy()
    return dBmax < thresh

Let's check that with a few tests. If all goes well, the following `assert` statements will all pass silently.

In [36]:
x = torch.ones((2,10))
assert not is_silence(1e-3*x) # not silent
assert is_silence(1e-5*x) # silent
assert is_silence(1e-3*x, thresh=-50) # silent relative to a higher threshold
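
The numbers in these tests follow from the dB formula: a peak amplitude of 1e-3 corresponds to 20·log10(1e-3) = -60 dB, which sits right at the default threshold, while 1e-5 corresponds to -100 dB. Below is a plain-Python sketch of the same check. The names `peak_db` and `is_silence_py` are hypothetical; the explicit guard for all-zero input is needed here because `math.log10(0)` raises an error, whereas `torch.log10` returns `-inf`.

```python
import math

def peak_db(samples):
    """Peak level in dB relative to an amplitude of 1.0; -inf for all-zero input."""
    peak = max(abs(s) for s in samples)
    return 20 * math.log10(peak) if peak > 0 else float("-inf")

def is_silence_py(samples, thresh=-60):
    """Plain-Python analogue of the silence check (hypothetical name)."""
    return peak_db(samples) < thresh

print(peak_db([1e-3] * 10))        # about -60 dB
print(peak_db([1e-5] * 10))        # about -100 dB
print(is_silence_py([0.0] * 10))   # digital silence has a peak of -inf dB
```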

## batch_it_crazy
This is a basic utility for breaking up a long sequence into a batch of fixed-length windows, e.g. for model inference.

In [37]:
#|export
def batch_it_crazy(
    x,        # a time series as a PyTorch tensor, e.g. stereo or mono audio
    win_len,  # length of each "window", i.e. length of each element in new batch
    ):
    "(pun intended) Chop up long sequence into a batch of win_len windows"
    if len(x.shape) < 2: x = x.unsqueeze(0)  # guard against 1-d arrays
    x_len = x.shape[-1]
    n_windows = (x_len // win_len) + 1     # note: adds an extra all-zero window when x_len is an exact multiple of win_len
    pad_amt = win_len * n_windows - x_len  # pad end w. zeros to make lengths even when split
    xpad = F.pad(x, (0, pad_amt))
    return rearrange(xpad, 'd (b n) -> b d n', n=win_len)

Testing `batch_it_crazy()` for stereo input: 

In [38]:
x = torch.ones([2,1000])  # stereo
batch_it_crazy(x, 10).shape

torch.Size([101, 2, 10])

...and for mono: 

In [39]:
x = torch.ones([1000])   # mono
batch_it_crazy(x, 10).shape

torch.Size([101, 1, 10])

...and yes, for now that `1` (the channel dimension) stays, because other parts of the code assume "multichannel" audio.
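
The batch sizes above can be checked by hand. This sketch repeats only the window arithmetic from `batch_it_crazy` in plain Python; `window_counts` is a hypothetical helper name used for illustration.

```python
def window_counts(x_len: int, win_len: int):
    """Return (n_windows, pad_amt) using the same arithmetic as batch_it_crazy."""
    n_windows = (x_len // win_len) + 1
    pad_amt = win_len * n_windows - x_len
    return n_windows, pad_amt

print(window_counts(1000, 10))  # (101, 10): exact multiple, so the last window is all padding
print(window_counts(1005, 10))  # (101, 5): last window holds 5 samples plus 5 zeros
print(window_counts(7, 10))     # (1, 3): input shorter than one window
```

This is why 1000 samples with `win_len=10` give 101 windows rather than 100: the `+ 1` always adds a final window, which is entirely zero padding when the length divides evenly.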

## makedir 
The next routine creates a directory if it doesn't already exist. It also accepts a "nested" directory such as `a/b/c/d`, in which case it creates every directory in that path that is missing.

In [40]:
#|export
def makedir(
    path:str,              # directory or nested set of directories
    ):
    "creates directories where they don't exist"
    if os.path.isdir(path): return  # don't make it if it already exists
    #print(f"  Making directory {path}")
    try:
        os.makedirs(path, exist_ok=True)  # recursively make all dirs named in path
    except OSError:        # don't really care about errors (e.g. permission denied)
        pass
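
A quick way to try the nested-directory behavior without touching the working tree is to use a temporary directory. This is a minimal sketch built directly on `os.makedirs`; `makedir_sketch` is a hypothetical stand-in, and unlike the routine above it lets errors such as permission problems propagate.

```python
import os
import tempfile

def makedir_sketch(path: str) -> None:
    """Create `path` and any missing parents; do nothing if it already exists."""
    os.makedirs(path, exist_ok=True)

with tempfile.TemporaryDirectory() as tmp:
    nested = os.path.join(tmp, "a", "b", "c", "d")
    makedir_sketch(nested)   # creates a, a/b, a/b/c and a/b/c/d
    makedir_sketch(nested)   # second call is a no-op
    created = os.path.isdir(nested)
    print(created)
```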

## get_audio_filenames
Often we'll want to grab a long list of audio filenames by looking through a directory and all its subdirectories. We could use something like `glob`, but `glob` turns out to be extremely slow when large numbers of files (say, more than 100,000) are involved. Instead we will use the much faster `os.scandir()`, which was packaged nicely into a routine in [an answer to a StackOverflow question](https://stackoverflow.com/a/59803793/4259243), from which the following code is modified:

In [41]:
#|export
def fast_scandir(
    dir:str,  # top-level directory at which to begin scanning
    ext:list  # list of allowed file extensions
    ):
    "very fast `glob` alternative. from https://stackoverflow.com/a/59803793/4259243"
    subfolders, files = [], []
    ext = ['.'+x if x[0]!='.' else x for x in ext]  # add starting period to extensions if needed
    try: # hope to avoid 'permission denied' by this try
        for f in os.scandir(dir):
            try: # hope to avoid 'too many levels of symbolic links' error
                if f.is_dir():
                    subfolders.append(f.path)
                elif f.is_file():
                    if os.path.splitext(f.name)[1].lower() in ext:
                        files.append(f.path)
            except OSError:  # skip entries we can't inspect
                pass
    except OSError:          # skip directories we can't read
        pass

    for dir in list(subfolders):
        sf, f = fast_scandir(dir, ext)
        subfolders.extend(sf)
        files.extend(f)
    return subfolders, files

Quick test:

In [42]:
_, files = fast_scandir('.', ['wav','flac','ogg','aiff','aif','mp3'])
files

['.\\examples\\example.wav']
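
To sanity-check this kind of scan against `glob`, one option is to build a small throwaway tree and compare the two results. The sketch below is not the routine above: `scan_for_ext` is a hypothetical, iterative variant (a stack instead of recursion) that also skips symlinked directories, which is a design choice made here rather than something the original does. It makes no timing claim; it only checks that both approaches find the same files.

```python
import glob
import os
import tempfile

def scan_for_ext(top: str, exts: set) -> list:
    """Walk `top` with os.scandir and collect files whose extension is in `exts`."""
    found, stack = [], [top]
    while stack:
        current = stack.pop()
        with os.scandir(current) as entries:
            for entry in entries:
                if entry.is_dir(follow_symlinks=False):
                    stack.append(entry.path)   # visit this subdirectory later
                elif entry.is_file() and os.path.splitext(entry.name)[1].lower() in exts:
                    found.append(entry.path)   # extension match is case-insensitive
    return found

with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "sub", "deeper"))
    for rel in ("a.wav", "sub/b.WAV", "sub/deeper/c.flac", "sub/notes.txt"):
        open(os.path.join(tmp, *rel.split("/")), "w").close()

    wanted = {".wav", ".flac"}
    via_scandir = sorted(scan_for_ext(tmp, wanted))
    via_glob = sorted(p for p in glob.glob(os.path.join(tmp, "**", "*"), recursive=True)
                      if os.path.splitext(p)[1].lower() in wanted)
    names = [os.path.basename(p) for p in via_scandir]
    print(names)
```

Lower-casing the extension before the membership test is what lets `b.WAV` match `.wav`, mirroring the `.lower()` call in `fast_scandir`.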

Often, rather than being given a single parent directory, we may be given a list of directories in which to look for files. The following just calls `fast_scandir()` for each of those:

In [43]:
#|export
def get_audio_filenames(
    paths:list   # directories in which to search
    ):
    "recursively get a list of audio filenames"
    filenames = []
    if isinstance(paths, str): paths = [paths]  # allow a single path as well as a list
    for path in paths:               # get a list of relevant filenames
        subfolders, files = fast_scandir(path, ['.wav','.flac','.ogg','.aiff','.aif','.mp3'])
        filenames.extend(files)
    return filenames

Here's a fun way to show off how fast this is: run it on the `Downloads` folder in the user's home directory, if one exists:

In [44]:
path = os.path.join(os.path.expanduser("~"), 'Downloads')
if os.path.exists(path):
    files = get_audio_filenames(path)
    print(f"Found {len(files)} audio files.")
else:
    print("Ok it was just a thought.")

Found 0 audio files.


In [45]:
#| hide
from nbdev import nbdev_export
nbdev_export()