In [1]:
from __future__ import print_function
from scipy.io.wavfile import read as wavread
from scipy.signal import find_peaks
import librosa, os, numpy as np
from matplotlib import pyplot as plt
from IPython.display import display
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

In [4]:
data = []
srs = []
offsets = []
onset_times = []

def getwindow(center, windowsize):
    if type(windowsize) is int:
        return np.arange(center-windowsize, center+windowsize)
    else:
        return np.arange(center+windowsize[0], center+windowsize[1])
    
def tts(t, sr=44100):
        return [int(ts*sr) for ts in t]  if isinstance(t, (tuple, list)) else int(t*sr)

def stt(sample, sr=44100):
    return [s/float(sr) for s in sample] if isinstance(sample, (tuple, list)) else sample/float(sr) 

def loadfiles(path="./data/1_aligned", clip_intro=0):
    global data,srs, offsets, onset_times
    data = []
    srs = []
    offsets = []
    onset_times = []
    datadir = os.path.abspath(path)
    print("Using recordings from %s" % (datadir))
    
    data=[]
    srs=[]
    for file in os.listdir(datadir):
        if file.endswith('.wav'):
            (sr,dat) = wavread(os.path.join(datadir,file))
            if len(dat.shape) > 1:
                dat = dat[:,0]
            if clip_intro > 0:
                dat = dat[tts(clip_intro):]
            dat = dat.astype(np.float32)/np.max(dat)
            data.append(dat)
            srs.append(sr)
            onset_times.append(librosa.onset.onset_detect(dat,units="samples",sr=sr, backtrack=False))
            offsets.append(0)
            print("loaded %s" % file)

datafolder = os.path.abspath("./data")
options = {run:os.path.join(datafolder,run) for run in os.listdir(datafolder) if os.path.isdir(os.path.join(datafolder,run)) }
loader=interactive(loadfiles,path=options, clip_intro=(0,65,0.5)) 
display(loader)

interactive(children=(Dropdown(description='path', options={'0': 'E:\\Dev\\acoustic-localization\\data\\0', '1…

In [33]:
lines = []
ref = data[0]
    
def getwindow(center, windowsize):
    return np.arange(center-windowsize, center+windowsize)

def tts(t, sr=44100):
        return int(t*sr)

def stt(sample, sr=44100):
    return sample/float(sr)

def onset_ex(which=0,windowsize=1.0, center=5.0, back_track=False):
    global onset_times
    ww = getwindow(tts(center),tts(windowsize))
    clip = data[which][ww]
    onsets = librosa.onset.onset_detect(data[which],units="samples",sr=srs[which],
                                             backtrack=back_track)
    print("plot centered at %f (%d, %d to %d)" %
          (center,tts(center),tts(center-windowsize), tts(center+windowsize)))
    plt.plot(np.linspace(stt(ww[0]), stt(ww[-1]), len(clip)),clip)
    ost_in_window = onsets[np.logical_and(onsets >= ww[0], onsets <= ww[-1])]
    plt.vlines(stt(ost_in_window), -1,1, 'r')
    plt.ylim(-1,1)
    print("avg onset rate is %fHz" % (1/stt(np.mean(np.diff(ost_in_window)))))
    onset_times = ost_in_window

In [34]:
options = {b:a-1 for a,b in enumerate([str(a) for a in np.arange(1,len(data)+1)])}
onset_example = interactive(onset_ex, {'manual':False}, which=options,
                            windowsize=(.1,10,.05),
                            center=(0.250,stt(min([len(a) for a in data])), 0.050),
                            back_track=False)
sl1 = onset_example.children[1]
sl2 = onset_example.children[2]
sl2.continuous_update=False
def update_center_slider(*args):
    sl2.min = max(sl2.value-3*sl1.value, sl1.value)
    sl2.max = sl2.value+3*sl1.value
sl2.observe(update_center_slider, 'value')
sl1.observe(update_center_slider, 'value')
onset_example

interactive(children=(Dropdown(description='which', index=1, options={'1': -1, '2': 0}, value=0), FloatSlider(…

In [17]:
ref = 0
lines = []
center = 1
scale = 1
x_ax = []
y_ax = []

def do_settings(reference, lns, cs, sc, yr, oc, pr, **kwargs):
    global ref, lines, center, scale, x_ax, y_ax
    ref = reference
    lines = lns
    center = cs
    scale = sc
    x_ax = [cs-sc, cs+sc]
    y_ax = yr
    plot(plot_onsets=oc, plot_ref=pr)

def plot(plot_onsets=True, plot_ref=True, **kwargs):
    ww = np.arange(tts(x_ax[0]), tts(x_ax[1]))
    f = plt.figure(figsize = (40,40))
    rc = np.ceil(np.sqrt(len(data)))
    for ln in lines:
        plt.subplot(rc,rc,ln+1)
        ofs = tts(offsets[ln])
        clip = data[ln][ww+ofs]
        plt.plot(np.linspace(stt(ww[0]),stt(ww[-1]),len(clip)),clip)
        if plot_onsets:
            plt.vlines(stt(onset_times[ref]), -1, 1, 'r')
        if plot_ref:
            rr = data[ref][ww + tts(offsets[ref])] 
            plt.plot(np.linspace(stt(ww[0]),stt(ww[-1]),len(rr)),rr, 'k', alpha=0.7)
        plt.xlim(x_ax)
        plt.ylim(y_ax)

ref_select = widgets.Dropdown(
    options={b:a for a,b in enumerate([str(a) for a in np.arange(1,len(data)+1)])},
    value=0,
    description='Refernce wav:',
    disabled=False)
lines_select = widgets.SelectMultiple(
    options={b:a for a,b in enumerate([str(a) for a in np.arange(1,len(data)+1)])},
    value=[0],
    description='Wavs to plot',
    disabled=False)
center_slider = widgets.FloatSlider(
    value=1,
    min=0,
    max=10.0,
    step=0.1,
    description='Plot center:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.4f')
sc_slider = widgets.FloatLogSlider(
    value=1,
    base=2,
    min=-6, # max exponent of base
    max=7, # min exponent of base
    step=0.2, # exponent step
    description='Window size')
y_range = widgets.FloatRangeSlider(
    value=[-1, 1],
    min=-1,
    max=1,
    step=0.05,
    description='Window range:',
    disabled=False,
    continuous_update=True,
    orientation='vertical',
    readout=True,
    readout_format='.4f')
onset_cb = widgets.Checkbox(
    value=True,
    description='Plot Onsets',
    disabled=False,
    indent=False)
overlay_cb = widgets.Checkbox(
    value=True,
    description='Plot Ref',
    disabled=False,
    indent=False)
replot_button = widgets.ToggleButton(
    value=False,
    description='replot',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Description',
    icon='check' # (FontAwesome names without the `fa-` prefix)
)



def update_center_slider(*args):
    center_slider.min = max(center_slider.value - sc_slider.value*3, sc_slider.value)
    center_slider.max = min(center_slider.value + sc_slider.value*3, stt(len(data[ref_select.value]))-sc_slider.value)
#     center_slider.max = center_slider.value + sc_slider.value*3
    center_slider.step = sc_slider.value/4.0

lb = widgets.VBox([ref_select, lines_select])
rb = widgets.VBox([center_slider, sc_slider, widgets.HBox([widgets.VBox([onset_cb, overlay_cb]), replot_button])])
# rb.layout.width = '50%'
y_range.layout.width='100px'
ui = widgets.HBox([lb, rb, y_range])
ui.layout.width='100%'
out = widgets.interactive_output(do_settings, {'reference':ref_select,
                                               'lns':lines_select,
                                               'cs':center_slider,
                                               'sc':sc_slider,
                                               'yr':y_range,
                                               'oc':onset_cb,
                                               'pr':overlay_cb,
                                               'rp':replot_button
                                              })

sc_slider.observe(update_center_slider,'value')
center_slider.observe(update_center_slider,'value')

display(ui,out)

HBox(children=(VBox(children=(Dropdown(description='Refernce wav:', options={'1': 0, '2': 1, '3': 2, '4': 3}, …

Output()

In [18]:

offs = widgets.VBox([widgets.FloatSlider(value=offsets[k], min=-0.1, max=0.1, step=0.001, readout_format='.5f', description="offset %d"%(k+1), continuous_update=False) for k in range(len(data))])

def update_offsets(*args):
    global offsets
    for ix, slider in enumerate(offs.children):
        offsets[ix] = slider.value
        slider.min = slider.value-0.1
        slider.max = slider.value+0.1

d_map = {str(k):offs.children[k] for k in range(len(offs.children))}
d_map.update({'plot_onsets':fixed(True), 'plot_ref':fixed(True)})
out2 = widgets.interactive_output(plot, d_map)
for slider in offs.children:
    slider.observe(update_offsets, 'value')
display(offs, out2)

VBox(children=(FloatSlider(value=-0.051605118237771294, continuous_update=False, description='offset 1', max=0…

Output()

In [19]:
#cross correlate signals

#at each onset, take window (variable) around onset and correlate samples
temp_offsets = []
def corr_ex(window):
    global temp_offsets
    temp_offsets = [0 for k in offsets]
    print(tts(window), len(tts(window)))
    ref_clip = data[ref] 
    onsets = onset_times[ref]
    onsets = onsets[np.logical_and(onsets >= tts(x_ax[0]), onsets <= tts(x_ax[1]))]
    numonsets = len(onsets)
    f = plt.figure(figsize = (18,16))
    rc = np.ceil(np.sqrt(len(data)))
    for ix in range(len(data)):
#         corr = np.zeros((tts(window[1]-window[0]),))
        delay = 0
        prev_corr = 0
        for onset in onsets:
            ref_w = getwindow(onset, tts(0.25))
            ww = getwindow(onset, tts(window))
            ref_clip = data[ref][ref_w]
            corr=np.correlate(np.abs(ref_clip),np.abs(data[ix][ww+tts(offsets[ix])]), mode="valid")/numonsets + prev_corr
            prev_corr = corr
            delay+= stt(len(corr)/2 - np.argmax(corr))/numonsets
        plt.subplot(rc,rc,ix+1)
        plt.plot(corr)
        plt.vlines(np.argmax(corr),np.min(corr),np.max(corr))
        print("delay between %d and %d (ref) is %0.5f" % (ix,ref, delay))
        temp_offsets[ix]+=delay
    
window_range = widgets.FloatRangeSlider(
    value=[-0.0625, 0.0625],
    min=-0.25,
    max=0.25,
    step=0.05,
    description='Window range:',
    disabled=False,
    continuous_update=True,
    orientation='horizontal',
    readout=True,
    readout_format='.4f')

update_btn = widgets.Button(
    description='update offsets',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Click me',
    icon='check')

def update_button(*args):
    global offsets, temp_offsets, offs
    for ix, slider in enumerate(offs.children):
        offsets[ix] += temp_offsets[ix]
        slider.value = offsets[ix]
        slider.min = slider.value-0.1
        slider.max = slider.value+0.1
    display(plot(True,True))
        
out3 = widgets.interactive_output(corr_ex, {'window':window_range})
display(window_range,update_btn, out3)
update_btn.on_click(update_button)

FloatRangeSlider(value=(-0.0625, 0.0625), description='Window range:', max=0.25, min=-0.25, readout_format='.4…

Button(description='update offsets', icon='check', style=ButtonStyle(), tooltip='Click me')

Output()

In [27]:
print("Offsets (ms): %.2f, %.2f, %.2f, %.2f" % tuple([o*1e3 for o in offsets]))
print("Offsets (samples)", tts(offsets))
print("'Error' (ms): %.2f, %.2f, %.2f, %.2f" % tuple([to*1000 for to in temp_offsets]))

Offsets (ms): 0.02, 51.61, 54.08, 93.38
Offsets (samples) [0, 2276, 2384, 4117]
'Error' (ms): 0.01, -0.01, -0.41, -0.00
