### Import required libraries and identify our lab USB card

Output is expected to identify the USB lab card as AudioStream or LabDevice

In [None]:
import sounddevice as sd
import liverecorder
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import ipywidgets as widgets
import time
from concurrent.futures import ThreadPoolExecutor
import math

# Acceptable API:s in increasing order of preference
accept_apis = ['Windows WDM-KS', 'ALSA', 'Core Audio']
all_apis = sd.query_hostapis()
useapi = -1
for pref in accept_apis:
    for idx, a in enumerate(sd.query_hostapis()):
        if a['name']==pref and len(a['devices'])>0:
            useapi = idx
if useapi<0:
    print('Could not find preferred sound API, sound capture may likely fail!!!')
else:
    print(f'Will use preferred sound API {all_apis[useapi]["name"]}')

lab_device = False
in_device  = sd.default.device['input']
out_device = sd.default.device['output']
devices    = sd.query_devices()
for idx, d in enumerate(devices):
    if (useapi>=0 and d['hostapi']==useapi) and ('AudioStream' in d['name'] or 'LabDevice' in d['name']) and d['max_input_channels']>0:
        in_device = idx
        lab_device = True
for idx, d in enumerate(devices):
    if (useapi>=0 and d['hostapi']==useapi) and ('AudioStream' in d['name'] or 'LabDevice' in d['name']) and d['max_output_channels']>0:
        out_device = idx
if lab_device:
    print(f'Will use lab USB card input device #{in_device} {devices[in_device]["name"]} over API {all_apis[devices[in_device]["hostapi"]]["name"]}')
    print(f'Will use lab USB card output device #{out_device} {devices[out_device]["name"]} over API {all_apis[devices[out_device]["hostapi"]]["name"]}')
else:
    print(f'Will use default input device #{in_device} {devices[in_device]["name"]} over API {all_apis[devices[in_device]["hostapi"]]["name"]}')
    print(f'Will use default output device #{out_device} {devices[out_device]["name"]} over API {all_apis[devices[out_device]["hostapi"]]["name"]}')   

### Define global state and plots

No output is expected from this cell

In [None]:
max_rate = 96000
# Sample rate in Hz set on ADC by a GUI widget
samplerate     = widgets.BoundedIntText(value = 48000, min = 1000, max = max_rate, 
                                        step = 100, description = "",
                                        layout = widgets.Layout(flex='1 1 auto', width='auto'))

# Limit in seconds for buffer size
max_seconds = 10
max_window  = 1.0 * max_seconds

current_data   = np.zeros(max_seconds * samplerate.value, dtype=np.int16)
current_length = current_data.size

def time_to_samples(t):
    return round(t * samplerate.value)

# Time window selected for analysis, initial values are recomputed below
window         = 1.0
offset         = 0.0

# ADC base parameters (will affect scaling of diagrams)
if lab_device:
    adc_fs         = 3.3
    adc_bits       = 12
else:
    adc_fs         = 3.3
    adc_bits       = 16

%matplotlib widget

ticks_milli = mpl.ticker.FuncFormatter(lambda x, pos: '{0:g}'.format(x * 1000))
ticks_micro = mpl.ticker.FuncFormatter(lambda x, pos: '{0:g}'.format(x * 1e6))

mpl.rcParams['path.simplify_threshold'] = 1.0

# Briefly turn interactive mode off to avoid making plots appear here, we will display them later
plt.ioff()
fig = plt.figure(figsize=(8, 8), layout='constrained'); # , subplotpars=mpl.figure.SubplotParams(left=0.5)
plt.ion()

fig.get_layout_engine().set(rect = (0.05, 0.0, 0.95, 1.0))
fig.canvas.header_visible = False

baseaxs, timeaxs, freqaxs = fig.subplots(3, 1, gridspec_kw = {'height_ratios': [1, 3,3]});

timetext = timeaxs.text(0.02, 0.98, 'Nothing yet...',
                        verticalalignment='top', horizontalalignment='left',
                        transform=timeaxs.transAxes,
                        color='orange', fontsize=12)

freqtext = freqaxs.text(0.02, 0.98, 'Nothing yet...',
                        verticalalignment='top', horizontalalignment='left',
                        transform=freqaxs.transAxes,
                        color='blue', fontsize=12)

baseaxs.set_title("Window",               fontdict = {'color': 'orange', 'fontsize': 14}, loc = 'center');
baseaxs.set_title("Now",                  fontdict = {'color': 'orange', 'fontsize': 10}, loc = 'right' );
baseaxs.set_title(f"{max_seconds} s ago", fontdict = {'color': 'orange', 'fontsize': 10}, loc = 'left'  );
timeaxs.set_title("Waveform in window",   fontdict = {'color': 'orange', 'fontsize': 14});
freqaxs.set_title("Spectrum in window",   fontdict = {'color': 'blue',   'fontsize': 14});

timeaxs.set_xlabel("Time [s]",       color='orange', fontsize=12);
timeaxs.set_ylabel("Sample value",   color='orange', fontsize=12);

freqaxs.set_xlabel("Frequency [Hz]", color='blue',   fontsize=12);
freqaxs.set_ylabel("dBV",            color='blue',   fontsize=12);

freqaxs.set_ylim(-100, 10)

def get_time_points(rate, window, offset):
    return np.linspace((-window-offset)/rate, -offset/rate, window)

def get_freq_points(rate, window):
    return np.fft.rfftfreq(window, d=1./rate)

def get_time_ampl(data, window, offset):
    if offset != 0:
        return data[-window-offset:-offset]
    else:
        return data[-window:]

# For dBV we relate voltages to 1 Volt rms, that is, an amplitude level of sqrt(2) Volts.
# With an amplitude scaling in the ADC of k0 = (2**N - 1)/V_fs as 0 dBV signal will, after
# fft.rfft, have an amplitude of sqrt(2) * k0 / 2 = k0 / sqrt(2) which we can pre-compute
# into a dB value to be subracted.
def get_freq_pwr(data, rate, window, offset):
    k0 = (2**adc_bits - 1)/adc_fs
    db_offset = 20 * math.log10(k0 / math.sqrt(2))
    return 20 * np.log10(np.abs(np.fft.rfft(get_time_ampl(data, window, offset)) / window) + 1e-9) - db_offset

baseplt = baseaxs.plot([-1, 0], [0, 0], drawstyle = 'steps-post', color='orange')
timeplt = timeaxs.plot([0], [0], drawstyle = 'steps-post', color='orange')
freqplt = freqaxs.plot([0], [0], color='blue')

def _update_spectrum(data, rate, window, offset):
    freq_pwr = get_freq_pwr(data, rate, window, offset)
    max_pwr_idx = np.argmax(freq_pwr)
    text = f'Peak {freq_pwr[max_pwr_idx]:.1f} dBV at {freqplt[0].get_xdata()[max_pwr_idx]:.1f} Hz'
    freqtext.set_text(text)
    freqplt[0].set_ydata(freq_pwr)
    
def _update_baseeplot(data, rate):
    amplitudes = get_time_ampl(data, min(max_seconds * rate, data.size), 0)
    baseplt[0].set_ydata(amplitudes)
    
def _update_timeplot(data, window, offset):
    amplitudes = get_time_ampl(data, window, offset)
    max_a = np.max(amplitudes)
    min_a = np.min(amplitudes)
    avg_a = np.mean(amplitudes)
    text = f'Max {max_a}, min {min_a}, avg {avg_a:.1f}'
    timetext.set_text(text)
    timeplt[0].set_ydata(amplitudes)
    
def _update_rate(data, rate, win, off):
    max_sample = 2**(adc_bits - 1) #2**(adc_bits.value - 1)
    baseaxs.set_ylim(-max_sample, max_sample)
    timeaxs.set_ylim(-max_sample, max_sample)
    fig.canvas.toolbar.home()
    bwin = min(max_seconds * rate, data.size)
    btimes = get_time_points(rate, bwin, 0)
    baseplt[0].set_xdata(btimes)
    baseaxs.set_xlim(btimes[0], btimes[-1])
    times = get_time_points(rate, win, off)
    timeplt[0].set_xdata(times)
    timeaxs.set_xlim(times[0], times[-1])
    freqs = get_freq_points(rate, win)
    freqplt[0].set_xdata(freqs)
    freqaxs.set_xlim(freqs[0], freqs[-1])
    fig.canvas.toolbar.update()
    
def update_data(data, rate, force = False):
    global current_data, current_length
    length = data.size
    win = time_to_samples(window)
    win = min(win, length)
    off = time_to_samples(offset)
    off = min(off, length - win)
    if force or length != current_length:
        current_length = length
        _update_rate(data, rate, win, off)
    current_data = data
    _update_baseeplot(data, samplerate.value)
    _update_timeplot(data, win, off)
    _update_spectrum(data, samplerate.value, win, off)
    fig.canvas.draw_idle()
    
def window_select(min_x, max_x):
    global window, offset
    window = min(round((max_x - min_x), 3), max_window)
    offset = round(-max_x, 3)
    if offset < 0:
        offset = 0
    if offset > max_window - window:
        offset = max_window - window
    win_selector.extents = (-window-offset, -offset)
    update_data(current_data, samplerate.value, force = True)

win_selector = mpl.widgets.SpanSelector(baseaxs, window_select, 'horizontal',
                                        interactive = True, drag_from_anywhere = True, ignore_event_outside = True)
    
window_select(-min(1.0, max_window), 0.0)       

### Define and display GUI to capture and analyze ADC data

In [None]:
btn_layout = widgets.Layout(flex='1 1 auto', width='auto')
run_button    = widgets.Button(description = "Capture", layout = btn_layout)
play_button_c = widgets.Button(description = "Play on computer", layout = btn_layout)
play_button_l = widgets.Button(description = "Play on lab card", layout = btn_layout)
clear_button  = widgets.Button(description = "Clear", layout = btn_layout)
fill_button   = widgets.Button(description = "Fill 440", layout = btn_layout)

msg_label = widgets.Label('No message yet...')

def is_playing():
    playing = False
    try:
        playing = sd.get_stream().active
    except RuntimeError:
        pass
    return playing
    
def play_data_comp(b):
    if is_playing():
        sd.stop()
    else:
        sound = get_time_ampl(current_data, time_to_samples(window), time_to_samples(offset)).copy()
        amplify = 2**(16-adc_bits)
        sd.play(amplify*sound, samplerate=samplerate.value, device=sd.default.device['output'])
    
def play_data_lab(b):
    if is_playing():
        sd.stop()
    else:
        sound = get_time_ampl(current_data, time_to_samples(window), time_to_samples(offset)).copy()
        amplify = 2**(16-adc_bits)
        sd.play(amplify*sound, samplerate=samplerate.value, device=out_device)

def clear_data(b):
    global current_data, current_length
    current_data   = np.zeros(max_seconds * samplerate.value, dtype=np.int16)
    current_length = current_data.size
    window_select(-min(1.0, max_window), 0.0)

def fill_data(b):
    global current_data, current_length
    freq          = 440
    time_points    = np.linspace(-max_seconds * freq * 2 * np.pi, 0, max_seconds * samplerate.value)
    current_data   = (2 ** adc_bits / 3.3 * np.sin(time_points)).astype(np.int16)
    current_length = current_data.size
    window_select(-max_window, 0.0)

def rate_window_change(change):
    update_data(current_data, samplerate.value, force = True)
    
try:
    pool.shutdown()
except NameError:
    pass

pool = ThreadPoolExecutor(3)
recorder = liverecorder.Liverecorder(max_seconds, max_rate)

def message_update():
    if not recorder.messages.empty():
        msg = recorder.messages.get(False)
        msg_label.value = msg
                

def live_update(in_device, rate):
    
    def _update(rate):
        run_button.description = "Hold"
        while recorder.running():
            recorder.get_update_event().wait(1)
            data, count = recorder.get_data()
            update_data(data, rate)
            message_update()
            time.sleep(0.2)
        run_button.description = "Capture"
        message_update()
            
    recorder.start(in_device, rate)
    return pool.submit(_update, rate)

def toggle_capture(b):
    if recorder.running():
        recorder.stop()
    else:
        live_update(in_device, samplerate.value)
    
samplerate.observe(rate_window_change, names='value')

run_button.on_click(toggle_capture)
play_button_c.on_click(play_data_comp)
play_button_l.on_click(play_data_lab)
clear_button.on_click(clear_data)
fill_button.on_click(fill_data)

widgets.VBox([ 
    widgets.HBox([
        widgets.VBox([widgets.Label('Sample rate [Hz]'),
                      widgets.Label('Capture message:'), ]),
        widgets.VBox([
            widgets.HBox([samplerate, run_button,
                          play_button_c, play_button_l, clear_button, fill_button]),
            msg_label])
    ]),
    fig.canvas
])