In [None]:
# default_exp spectrogram_processor

# Spectrogram Processor

> Commandline program to generate spectrograms from audio data.

In [1]:
#hide
from nbdev.showdoc import *

Reference for loading a file path with argparse: [load filename ref](https://www.reddit.com/r/learnpython/comments/575rbb/confused_how_to_set_inputoutput_file_paths_with/)



In [18]:
# export
import argparse
from pathlib import Path
import json
import librosa
import numpy as np
from PIL import Image
import time
from fastprogress import progress_bar
from os import uname
from fastcore.parallel import parallel
from fastcore.parallel import num_cpus

In [3]:
# export
# ignore librosa pysoundfile load warning
import warnings
warnings.filterwarnings(
    action='ignore',
    category=UserWarning,
    module=r'librosa'
)

## Configuration

Get the path to the configuration file. *Note*: command-line programs appear to resolve relative path arguments against the shell's current working directory, not the location of the program itself.

In [4]:
# export
# Command-line interface: a single optional `--path` pointing at the JSON parameters file.
parser = argparse.ArgumentParser()
# The default keeps the one-element-list shape that `nargs=1` produces, so
# `args.path[0]` works whether or not `--path` was supplied.
parser.add_argument("--path", help="parameter filepath", nargs=1, default=["parameters.json"])
# Parse the real argv so the exported script honours `--path`. `parse_known_args`
# leaves unrecognised entries (such as the arguments a notebook kernel is started
# with) in a separate list instead of exiting with a usage error.
args, _unknown_args = parser.parse_known_args()

In [5]:
# export
# Path of the parameters file, taken from the single-element `--path` list.
fpath = Path(args.path[0])
# Validate with explicit raises rather than `assert`: assertions are removed when
# Python runs with -O, which would let a bad path through to the file open below.
if not fpath.exists():
    raise FileNotFoundError(f"Filepath '{fpath}' not found.")
if not fpath.is_file():
    raise FileNotFoundError(f"'{fpath}' is not a valid file.")

Edit and Save Parameters

In [148]:
# # hide
# # testing
# # p = str((Path()/'sample_data').absolute())

# parameters = {
#     'n_fft':2048,
#     'hop_length':512,
#     'n_mels':256,
#     'mel_n_fft_override':None,
#     'mel_hop_length_override':None,
#     'path':'/home/jupyter/data',
# #     'path':p,
#     'audio_folders':['train','test'],
#     'codecs':['.flac'],
#     'serial':False
# }
# # save
# with open('parameters.json', 'w') as file:
#     json.dump(parameters, file)

Load Parameters

In [16]:
# export
# Read the JSON parameters file in one go and decode it into a dict.
parameters = json.loads(fpath.read_text())
print(f"Parameters file \"{fpath}\" loaded.")

Parameters file "parameters.json" loaded.


In [19]:
# export
# Resolve the worker count once. The imported `num_cpus` helper is rebound to its
# integer result (later cells read `num_cpus` as an int), so only call it while it
# is still the function; re-running this cell would otherwise raise TypeError.
if callable(num_cpus):
    num_cpus = num_cpus() # func → int
# no parallelization support on MacOS or Windows: force serial off Linux,
# otherwise honour the 'serial' flag from the parameters file
serial = uname().sysname.lower() != 'linux' or parameters['serial']

# STFT settings shared by the frequency spectrogram (and the mel one unless overridden)
n_fft = parameters['n_fft']
hop_length = parameters['hop_length']
n_mels = parameters['n_mels']

# Optional mel-specific overrides; a null or absent value falls back to the STFT setting.
mel_n_fft = parameters.get('mel_n_fft_override')
if mel_n_fft is None: mel_n_fft = n_fft

mel_hop_length = parameters.get('mel_hop_length_override')
if mel_hop_length is None: mel_hop_length = hop_length

# Warn when the FFT size is less than 8x the number of mel bands.
if n_fft / n_mels < 2**3:
    print(f"Warning: N FFT ({n_fft}) is fewer than 3 powers of 2 greater than N Mels ({n_mels})."
          f" This may result in null values at lower mels in spectrograms.")

## Spectrograms

In [8]:
# export
def compute_spectrogram(wf, n_fft=1024, hop_length=512):
    """
    Return the dB-scaled power spectrogram of a waveform.

    wf: audio samples handed to `librosa.stft`.
    n_fft, hop_length: forwarded unchanged to `librosa.stft`.
    """
    stft_magnitude = np.abs(librosa.stft(wf, n_fft=n_fft, hop_length=hop_length))
    power = stft_magnitude**2
    return librosa.power_to_db(power)
    
def compute_mel_spectrogram(wf, sr=None, n_fft=1024, hop_length=512, n_mels=128):
    """
    Return the dB-scaled mel spectrogram of a waveform.

    wf: audio samples, passed to librosa as `y`.
    sr: sampling rate of `wf`; callers in this file pass the rate returned by
        `librosa.load(..., sr=None)`.
    n_fft, hop_length, n_mels: forwarded unchanged to
        `librosa.feature.melspectrogram`.
    """
    # Pass the waveform by keyword: newer librosa releases make every argument of
    # `melspectrogram` keyword-only, so a positional waveform raises TypeError there.
    mel_power = librosa.feature.melspectrogram(
        y=wf, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels
    )
    return librosa.power_to_db(mel_power)

def write_spectrogram(spgm, filepath):
    """
    Rescale a 2-D spectrogram array to 0-255 UInt8 and save it as an image.

    spgm: array exposing `.min()` / `.max()`; values are shifted so a negative
        minimum maps to 0, then scaled so the maximum maps to 255.
    filepath: destination handed to `PIL.Image.Image.save`.
    """
    lowest = spgm.min()
    # Shift only when there are negative values, so the minimum lands on 0.
    shift = abs(lowest) if lowest < 0 else 0.0
    span = shift + spgm.max()
    # A constant spectrogram (all zeros, or one negative value everywhere) gives a
    # span of 0. Dividing by it would put inf/NaN into the array before the uint8
    # cast, so write an all-zero image in that case instead.
    scale = 255/span if span != 0 else 0.0
    x = (spgm + shift) * scale
    img = Image.fromarray(x.round().astype(np.uint8)[::-1]) # vertical flip
    img.save(filepath)

In [9]:
# export
def load_compute_write(audio_file, diagnostic_suffix="", **kwargs):
    """
    Load one audio file, compute both spectrograms, and write them as PNGs.

    Written as a single-argument worker for parallelization.

    audio_file: path object of the audio file; its `.stem` names the outputs.
    diagnostic_suffix: text appended to the stem of both output file names.
    **kwargs: accepted but not read here; the FFT/mel settings and the output
        directories come from module-level names.
    """
    waveform, sample_rate = librosa.load(audio_file, sr=None)
    png_name = f"{audio_file.stem}{diagnostic_suffix}.png"

    # compute both spectrograms before writing either one
    freq_db = compute_spectrogram(waveform, n_fft=n_fft, hop_length=hop_length)
    mel_db = compute_mel_spectrogram(
        waveform, sr=sample_rate, n_fft=mel_n_fft, hop_length=mel_hop_length, n_mels=n_mels
    )

    write_spectrogram(freq_db, path_spgm_frq/png_name)
    write_spectrogram(mel_db, path_spgm_mel/png_name)

In [10]:
# hide
# times = {'compute':[],'write':[]}

In [11]:
# export
t = time.time()  # wall-clock start; the final "files processed in ..." print subtracts this

In [13]:
# spgms_para = []
# spgms_seri = []

In [30]:
# export
# Report whether files will be processed in parallel, and on how many cores.
if serial:
    state = 'OFF'
    extratext = ""
else:
    state = 'ON'
    plural = 's' if num_cpus > 1 else ''
    extratext = f" with {num_cpus} core{plural}"
print(f"Parallelization {state}{extratext}.")

Parallelization ON with 8 cores.


In [102]:
# export
# Output directories live next to the audio folders under the configured data path.
path = Path(parameters['path'])
path_spgm_frq = path/'spectrogram_frq'
path_spgm_frq.mkdir(parents=True, exist_ok=True)
path_spgm_mel = path/'spectrogram_mel'
path_spgm_mel.mkdir(parents=True, exist_ok=True)

# get list of audio files: every file in each audio folder whose suffix is a configured codec
# NOTE(review): outputs are named by file stem only, so equal stems in different
# audio folders would write to the same PNG; confirm stems are unique across folders.
audio_files = []
for audio_folder in parameters['audio_folders']:
    audio_path = path/audio_folder
    audio_files += [file for file in audio_path.iterdir() if file.suffix in parameters['codecs']]

# Both branches run the same per-file worker, so serial and parallel output cannot drift.
if serial:
    pb = progress_bar(audio_files)
    for audio_file in pb:
        pb.comment = f"processing: {audio_file.parent.name}/{audio_file.name}"
        # load waveform, compute freq & mel spectrograms, write both PNGs
        load_compute_write(audio_file)
else:
    _ = parallel(load_compute_write, audio_files, 
            **{'n_fft':n_fft,'hop_length':hop_length,'n_mels':n_mels}, threadpool=True, n_workers = num_cpus)
    

CPU times: user 2min 25s, sys: 1.7 s, total: 2min 26s
Wall time: 1min 29s


In [None]:
# export
# Report how many files were handled and the elapsed wall-clock time since `t`.
elapsed_hms = time.strftime('%H:%M:%S', time.gmtime(time.time() - t))
print(f"\n{len(audio_files)} files processed in {elapsed_hms}")

## development testing

---

TODO: incorporate into tests.

In [None]:
# %%time
# # export
# path = Path(parameters['path'])
# path_spgm_frq = path/'spectrogram_frq'
# path_spgm_frq.mkdir(parents=True, exist_ok=True)
# path_spgm_mel = path/'spectrogram_mel'
# path_spgm_mel.mkdir(parents=True, exist_ok=True)

# # get list of audio files
# audio_files = []
# for audio_folder in parameters['audio_folders']:
#     audio_path = path/audio_folder
#     audio_files += [file for file in audio_path.iterdir() if file.suffix in parameters['codecs']]
    
# # load waveform
# if serial:
#     pb = progress_bar(audio_files[:100])
#     for audio_file in pb:
#         pb.comment = f"processing: {audio_file.parent.name}/{audio_file.name}"
#         wf,sr = librosa.load(audio_file, sr=None)

#         # compute freq & mel spectrograms
#         spgm_frq = compute_spectrogram(wf, n_fft=n_fft, hop_length=hop_length)
#         spgm_mel = compute_mel_spectrogram(wf, sr=sr, n_fft=mel_n_fft, hop_length=mel_hop_length, n_mels=n_mels)

#         # write spectrograms
#         write_spectrogram(spgm_frq, path_spgm_frq/f"{audio_file.stem}.png")
#         write_spectrogram(spgm_mel, path_spgm_mel/f"{audio_file.stem}.png")

#         # verify same values parallel vs. serial (note: ensure same audios)
#     #     assert np.alltrue(spgm_frq == para_dict[audio_file.stem]['spgm_frq'])
#     #     assert np.alltrue(spgm_mel == para_dict[audio_file.stem]['spgm_mel'])
# else:
    

In [52]:
# %%time
# for verifying computes parallel vs. serial
# results = parallel(load_and_compute, audio_files[:100], **{'n_fft':n_fft,'hop_length':hop_length,'n_mels':n_mels}, threadpool=True)
# para_dict = {res['audio_file']:{'spgm_frq':res['spgm_frq'],'spgm_mel':res['spgm_mel']} for res in results}

In [104]:
# %%time
# parallel(load_compute_write, audio_files[:100], **{'n_fft':n_fft,'hop_length':hop_length,'n_mels':n_mels,'diagnostic_suffix':"para"}, threadpool=True);

CPU times: user 2min 45s, sys: 23 s, total: 3min 8s
Wall time: 26.9 s


(#100) [None,None,None,None,None,None,None,None,None,None...]

In [147]:
# # verify writes parallel vs. serial
# for path_spgm in (path_spgm_frq, path_spgm_mel):
#     files = list(path_spgm.iterdir())
#     files_s = sorted([f for f in files if 'para' not in f.stem])
#     files_p = sorted([f for f in files if 'para' in f.stem])
#     for f in files_p: assert f.stem[:-4] in map(lambda x: x.stem, files_s)
#     for fp in files_p:
#         fs = [f for f in files_s if f.stem == fp.stem[:-4]][0]
#         im_p = Image.open(fp)
#         im_s = Image.open(fs)
#         ar_s = np.asarray(im_s)
#         ar_p = np.asarray(im_p)
#         assert np.alltrue(ar_s == ar_p)

In [None]:
# spgm_frq = parallel(compute_spectrogram, [wf], **{'n_fft':n_fft,'hop_length':hop_length})
# spgm_mel = parallel(compute_mel_spectrogram, [wf], **{'sr':sr,'n_fft':mel_n_fft,'hop_length':mel_hop_length,'n_mels':n_mels})

In [45]:
# # check parallels equal serials
# assert np.alltrue([np.alltrue(np.array(para) == np.array(seri)) for para,seri in zip(spgms_para,spgms_seri)])

In [None]:
# # export
# path = Path(parameters['path'])
# path_spgm_frq = path/'spectrogram_frq'
# path_spgm_frq.mkdir(parents=True, exist_ok=True)
# path_spgm_mel = path/'spectrogram_mel'
# path_spgm_mel.mkdir(parents=True, exist_ok=True)

# # get list of audio files
# audio_files = []
# for audio_folder in parameters['audio_folders']:
#     audio_path = path/audio_folder
#     audio_files += [file for file in audio_path.iterdir() if file.suffix in parameters['codecs']]
    
# # load waveform
# pb = progress_bar(audio_files)
# for audio_file in pb:
#     pb.comment = f"processing: {audio_file.parent.name}/{audio_file.name}"
#     wf,sr = librosa.load(audio_file, sr=None)
#     time.sleep(0.1)

#     # compute freq & mel spectrograms
#     spgm_frq = compute_spectrogram(wf, n_fft=n_fft, hop_length=hop_length)
#     spgm_mel = compute_mel_spectrogram(wf, sr, n_fft=mel_n_fft, hop_length=mel_hop_length, n_mels=n_mels)

#     # write spectrograms
#     write_spectrogram(spgm_frq, path_spgm_frq/f"{audio_file.stem}.png")
#     write_spectrogram(spgm_mel, path_spgm_mel/f"{audio_file.stem}.png")

In [None]:
# hide
# # custom write
# t_compute = np.array(times['compute'])
# t_write = np.array(times['write'])
# estim = (t_compute.mean() + t_write.mean()) * (1992+4727)
# print(t_compute.sum(), t_compute.mean())
# print(t_write.sum(), t_write.mean())
# print(estim)
# print(spgm_frq.shape, spgm_mel.shape)
# time.strftime('%H:%M:%S', time.gmtime(estim))

## notebook export

In [31]:
# Run nbdev's notebook-to-script conversion; its output lists the notebooks converted.
from nbdev.export import notebook2script
notebook2script()

Converted 00_core.ipynb.
Converted 01_spectrogram_processor.ipynb.
Converted experiments - spectrogram compression and timing statistics.ipynb.
Converted experiments - waveform analysis.ipynb.
Converted index.ipynb.


## asides

An alternate way to show progress bars. `progress_bar` is subordinate to a `master_bar` instance, `mb`. `mb` is a single-run loop and is used for writing comments. The only issue is that this prints a default message "Epoch i/N" with each progress bar loop. If using only a `progress_bar`, you'll have to instantiate it first (assign it to a variable) in order to print comments/updates.

In [None]:
# mb = master_bar(range(1))
# for itr in mb:
#     for audio_file in progress_bar(audio_files, parent=mb):
# #         mb.main_bar.comment = ""
#         mb.child.comment = f"processing: {audio_file.name}"
# #         mb.write(f"")
# #         print(f"processing: {audio_file.name}")
#         time.sleep(0.1)