# Measurement of Loudspeaker to Sequential Spherical Microphone Array

This notebook sketches how to sequentially measure impulse responses of spherical microphone arrays (SMAs) with the VariSphere device.

* Two sweeps are played, one for the loudspeaker and one for the analog feedback. 
* Two channels are recorded, the SMA microphone and the analog feedback.
* After each measurement, the VariSphere rotates to the next microphone position on the SMA grid.

## Load requirements

In [None]:
# Automatically reload the acoustics_hardware package if anything is changed
%load_ext autoreload
%autoreload 2

from datetime import datetime, timedelta
from pathlib import Path
from sys import exit
from time import sleep, strftime

import matplotlib.pyplot as plt
import numpy as np
import sounddevice as sd
from IPython.display import Audio, display, Markdown
from ipywidgets import Output
from numpy.fft import rfft, rfftfreq
from scipy.io import savemat
from scipy.signal.waveforms import chirp

from acoustics_hardware.processors import deconvolve
from acoustics_hardware.serial import VariSphere

## Set up audio hardware

Adjust your settings for the individual audio hardware!

In [None]:
# Show list of available audio devices
print(sd.query_devices())

# Audio device configuration
device_name = 'Orion 32'
fs = 48000 # in Hz
# output channel mapping (first channel in list will be used as loopback)
device_output_ch = [1, 2] # IDs starting from 1
# input channel mapping (first channel in list will be used as loopback)
device_input_ch  = [1, 2] # IDs starting from 1

# Verify that the device supports the requested configuration before any
# measurement is started. The mappings address device channels by their
# 1-based ID, hence the device has to provide at least as many channels as the
# highest ID in the mapping (not merely as many as the mapping has entries).
print(f'\nCheck audio device "{device_name}" ... ', end='')
try:
    sd.check_output_settings(
        device=device_name,
        channels=max(device_output_ch),
        samplerate=fs,
    )
    sd.check_input_settings(
        device=device_name,
        channels=max(device_input_ch),
        samplerate=fs,
    )
except (ValueError, sd.PortAudioError) as e:
    # Abort the cell with the error description (also covers an empty mapping)
    exit(e)
print('Done.')

# Sweep configuration
sweep_hp, sweep_lp = 50, 22e3    # in Hz, start and stop frequency
sweep_amplitude = -20            # in dB_FS
sweep_duration = [0.5, 3.0, 1.0] # in s, duration as pre-silence, sweep, post-silence

print('\nGenerate sweep ... ', end='')
# Time vector covering the actual sweep part only
sweep_t = np.arange(np.ceil(sweep_duration[1] * fs)) / fs # in s
# Logarithmic sweep from `sweep_hp` to `sweep_lp` with a phase offset of 90 deg
data_out = chirp(
    sweep_t,
    sweep_hp,
    sweep_duration[1],
    sweep_lp,
    method='logarithmic',
    phi=90,
) # as [samples,]
# Scale to the desired level (dB_FS converted into a linear factor)
data_out = data_out * 10**(sweep_amplitude / 20)
# Surround the sweep by the configured pre- and post-silence
n_pre, n_post = (int(sweep_duration[idx] * fs) for idx in (0, 2)) # in samples
data_out = np.pad(data_out, (n_pre, n_post))
print('Done.')

## Prepare functions for plotting

In [None]:
# HTML template for the green, centered caption shown below each preview
widget_preview_str = '<h3><center><font color="green">{}</font></center></h3>'

# Output areas that are displayed once and then refreshed in place:
# one for the impulse response preview, one for the recorded signal preview
widget_preview_ir, widget_preview_rec = Output(), Output()
widget_preview_ir.layout.border = '3px solid green'
widget_preview_rec.layout.border = '1px solid green'

def print_time_estimation(duration):
    """Display the estimated measurement duration and finishing time.

    Parameters
    ----------
    duration : datetime.timedelta
        Estimated duration; it is added to the current local time to obtain
        the finishing time, which is shown as hours and minutes.
    """
    finish_time = datetime.now() + duration
    message = (f'Estimated measurement duration is {duration} '
               f'(finished at {finish_time:%H:%M})')
    display(Markdown(f'<h2><center>{message}</center></h2>'))

def generate_levels_string(data):
    """Summarize the peak and RMS levels of a signal as a string.

    Parameters
    ----------
    data : array_like
        Signal samples; the RMS is taken along the last axis, so data of
        shape [channels, samples] yields one RMS value per channel.

    Returns
    -------
    str
        Text of the form ``'PEAK <peak> dB, RMS [<rms>, ...] dB'`` with the
        overall peak and the individual RMS levels in dB, one decimal each.
    """
    samples = np.asarray(data)

    # Overall peak across all channels
    peak_db = 20 * np.log10(np.max(np.abs(samples)))                # in dB

    # Individual RMS along the last axis
    rms_lin = np.sqrt(np.mean(np.square(samples), axis=-1))         # linear
    rms_db = 20 * np.log10(np.abs(rms_lin))                         # in dB
    rms_str = np.array2string(
        rms_db,
        precision=1,
        sign='+',
        floatmode='fixed',
        separator=', ',
    )

    return f'PEAK {peak_db:+.1f} dB, RMS {rms_str} dB'

def plot_grid(coords_az_el_r, title=None, is_show_lines=False):
    """Plot the positions of a spherical sampling grid in a 3D axes.

    Parameters
    ----------
    coords_az_el_r : array_like
        Grid coordinates as rows [azimuth, elevation, radius], i.e. of shape
        [3, positions], with the angles in rad. The elevation is measured
        from the horizontal plane (z = r * sin(elevation)). A single position
        of shape [3,] is accepted as well.
    title : str, optional
        Identifier of the figure (passed to `plt.figure`). A new, unnamed
        figure is created if omitted.
    is_show_lines : bool, optional
        If True, the positions are connected by a line in the given order
        and annotated with their index, which visualizes the traversal path.
        Otherwise, the positions are shown as a scatter plot.
    """
    def sph2cart(az, el, r):
        rcos_theta = r * np.cos(el)
        x = rcos_theta * np.cos(az)
        y = rcos_theta * np.sin(az)
        z = r * np.sin(el)
        return x, y, z

    # as [azimuth, elevation, radius] in rad
    coords = np.asarray(coords_az_el_r)
    if coords.ndim == 1:
        # Treat a single position as a grid with one column; `np.atleast_2d`
        # would prepend the new axis and thereby break the row indexing below
        coords = coords[:, np.newaxis]
    # as (x, y, z) in m
    x, y, z = sph2cart(coords[0], coords[1], coords[2])

    fig = plt.figure(title, figsize=(10, 10))
    # Start from an empty figure in case one with the same identifier is reused
    fig.clear()
    # `plt.gca(projection='3d')` is no longer supported by recent Matplotlib
    # versions, hence the 3D axes is created explicitly
    ax = fig.add_subplot(projection='3d')

    if is_show_lines:
        ax.plot3D(x, y, z)
        for pos, (pos_x, pos_y, pos_z) in enumerate(zip(x, y, z)):
            ax.text(pos_x, pos_y, pos_z, s=str(pos), fontsize=14,
                    horizontalalignment='center', verticalalignment='center')
    else:
        ax.scatter3D(x, y, z, s=30, depthshade=True)

    ax.set_xlabel('X [m]')
    ax.set_ylabel('Y [m]')
    ax.set_zlabel('Z [m]')
    ax.set_box_aspect([1, 1, 1])
    ax.view_init(azim=-160, elev=25) # in deg

    plt.tight_layout()
    plt.show()

# Prevent 'divide by zero encountered in log10' warning, which would otherwise
# be raised whenever a level or spectrum in dB is computed from all-zero
# values (log10(0) yields -inf).
# NOTE: this changes the NumPy error handling globally, not only for plotting.
np.seterr(divide='ignore')

# noinspection PyShadowingNames
def plot_data(data, fs, ch_labels=None, is_stacked=False):
    """Plot audio signals in the time and frequency domain.

    Parameters
    ----------
    data : array_like
        Signals of shape [samples,], [channels, samples] or
        [iterations, channels, samples]. In the latter case, iterations and
        channels are merged so that every iteration of every channel is
        plotted as an individual signal.
    fs : int or float
        Sampling frequency in Hz.
    ch_labels : sequence, optional
        One label per channel, shown as ``in_<label>`` in the legend. The
        legend is omitted if no labels are provided.
    is_stacked : bool, optional
        If True, all signals are drawn on top of each other in a single row
        (time signal and magnitude spectrum). Otherwise, each signal gets
        its own row, which additionally contains a spectrogram.

    Raises
    ------
    SystemExit
        If the number of labels does not match the number of channels.
    """
    data = np.atleast_2d(data)
    if ch_labels is not None and len(ch_labels) != data.shape[-2]:
        exit(f'Mismatch in number of audio channels ({data.shape[-2]}) '
             f'and number of provided labels {len(ch_labels)}.')
    if data.ndim == 3:
        # combine first and second dimension
        if ch_labels is not None:
            # repeat each label once per iteration (also works for string
            # lists); skipped without labels, since repeating `None` would
            # result in bogus "in_None" legend entries
            ch_labels = np.repeat(ch_labels, data.shape[0])
        data = np.reshape(data, (data.shape[0] * data.shape[1], -1), order='F')
        # as [[channel0_iter0, samples], [channel0_iter1, samples], ...]

    n_ch, n_sample = data.shape
    # Sample instants are spaced by exactly 1 / fs (a `linspace` including the
    # end point `n_sample / fs` would slightly stretch the time axis)
    t_data = np.arange(n_sample) / fs                # in s
    f_data = rfftfreq(n_sample, 1 / fs)              # in Hz
    spec_data = 20 * np.log10(np.abs(rfft(data)))    # in dB

    # All signals share a single row in the stacked view
    n_rows = 1 if is_stacked else n_ch

    fig, axes = plt.subplots(
        nrows=n_rows,
        ncols=2 if is_stacked else 3,
        squeeze=False,
        sharex='col',
        figsize=(15, 5 if is_stacked else (3 * n_ch))
    )
    for row in range(n_rows): # individual channels
        # Select all signals at once (stacked) or only the current one
        ch_plot = slice(None) if is_stacked else row
        is_last_row = row >= n_rows - 1

        # Time domain
        ax = axes[row, 0]
        ax.plot(t_data, data[ch_plot, :].T)
        ax.set_xlim(0, n_sample / fs) # in s
        ax.grid()
        if ch_labels is not None:
            legend = ([f'in_{label}' for label in ch_labels] if is_stacked
                      else [f'in_{ch_labels[row]}'])
            ax.legend(legend, loc='upper right')
        ax.set_xlabel('Time (s)' if is_last_row else '')
        ax.set_ylabel('Amplitude')

        # Frequency domain
        spec_plot = spec_data[ch_plot, :]
        # Upper limit as the next multiple of 5 dB above the maximum; fall
        # back to 0 dB for signals without any finite magnitude (e.g. digital
        # silence yields -inf), since the axis limits have to be finite
        spec_finite = spec_plot[np.isfinite(spec_plot)]
        y_max = np.ceil(spec_finite.max() / 5) * 5 if spec_finite.size else 0.0
        ax = axes[row, 1]
        ax.semilogx(f_data, spec_plot.T)
        ax.set_xlim(20, fs/2) # in Hz
        ax.set_ylim(y_max - (120 if is_stacked else 80), y_max) # in dB
        ax.grid()
        ax.set_xlabel('Frequency (Hz)' if is_last_row else '')
        ax.set_ylabel('Magnitude (dB)')

        if not is_stacked:
            # Spectrogram
            ax = axes[row, 2]
            ax.specgram(
                x=data[row, :],
                Fs=fs,        # in Hz
                NFFT=1024,    # in samples
                noverlap=512, # in samples
                mode='magnitude',
                scale='dB',
            )
            ax.set_yscale('log')
            ax.set_ylim(100, fs/2) # in Hz
            ax.set_xlabel('Time (s)' if is_last_row else '')
            ax.set_ylabel('Frequency (Hz)')

    plt.show()

## Test audio

Execute this cell to test if the audio setup works.

In [None]:
%matplotlib inline

# Play the sweep and capture all configured input channels at the same time
print('\nRecord sweep ... ', end='')
data_rec = sd.playrec(
    data_out,                        # as [samples,]
    samplerate=fs,                   # in Hz
    device=device_name,
    input_mapping=device_input_ch,   # channel numbers start from 1
    output_mapping=device_output_ch, # channel numbers start from 1
    blocking=True,
)
data_rec = data_rec.T                # as [channels, samples]
print('Done.')

# Report the recorded peak and the RMS level of each channel
print(generate_levels_string(data_rec))

# Provide an audio player for each recorded channel
for ch_id, ch_data in zip(device_input_ch, data_rec):
    display(
        Markdown(f'<h3>Channel in_{ch_id}</h3>'),
        Audio(ch_data, rate=fs),
    )

# Visualize the recorded signals
plot_data(data_rec, fs, ch_labels=device_input_ch)

# Deconvolve all remaining channels by the loopback (first) channel
loopback, mic_signals = data_rec[0], data_rec[1:]
data_ir = deconvolve(
    input=loopback,
    output=mic_signals,
    fs=fs,           # in Hz
    f_low=20,        # in Hz
    f_high=sweep_lp, # in Hz
    T=1,             # in s
)                    # as [channels, samples]
if data_ir.shape[0] == 0:
    exit('No deconvolved data available (only loopback channel recorded).')

# Visualize the impulse responses, first individually and then stacked
display(Markdown('<h2><center>Deconvolved Impulse Responses</center></h2>'))
for stacked in (False, True):
    plot_data(data_ir, fs, ch_labels=device_input_ch[1:], is_stacked=stacked)

## Test VariSphere

Initialize the VariSphere and move the turntable to an arbitrary position.

In [None]:
# VariSphere configuration
vari_ip = '192.168.127.120' # default value

# Set up the VariSphere device with its azimuth and elevation motor ports
print(f'\nConnect to VariSphere at {vari_ip} ... ', end='')
vari = VariSphere(az_port='4001', el_port='4002', ip=vari_ip) # default ports
try:
    # Verify the communication with both motors, azimuth first
    for axis in (vari.az, vari.el):
        axis.check_pc_mc_communication()
except Exception as e:
    # Abort the cell with the error description
    exit(e)
print('Done.')

# Try different angles (in deg) here
az = 0 # in deg
el = 0 # in deg
print(f'\nMove VariSphere to {az=}°, {el=}° ... ', end='')
vari.move_blocking(az=az, el=el)
print('Done.')

## Prepare the SMA grid

Adjust your settings to the desired grid!

In [None]:
%matplotlib notebook

# Import requirement
from pyfar.spatial import samplings # from pip install git+https://github.com/pyfar/pyfar.git

# SMA grid configuration
array_grid_type = 'SMA_LE' # Lebedev grid
# array_grid_type = 'SMA_FM' # Fliege-Maier grid
array_sh_order = 8 # as spherical harmonics order
array_r = 0.085 # in m

# Determine spherical sampling grid
if 'LE' in array_grid_type.upper():
    array_grid = samplings.sph_lebedev(sh_order=array_sh_order, radius=array_r)
elif 'FM' in array_grid_type.upper():
    array_grid = samplings.sph_fliege(sh_order=array_sh_order, radius=array_r)
else:
    exit(f'Unknown spherical sampling grid type "{array_grid_type}".')

# Gather spherical coordinates and configuration name
# (the name consists of the grid type and the number of grid positions)
array_coords = array_grid.get_sph(convention='top_elev', unit='rad').T # as [azimuth, elevation, radius] in rad
array_name = f'{array_grid_type.upper()}{array_coords.shape[-1]}'

# # Sort coordinates by ascending azimuth
# # (this yields an inefficient path with respect to elevation movement)
# array_coords_sort_ids = np.argsort(array_coords[0])
# array_coords_sorted = array_coords[:, array_coords_sort_ids]

# Sort coordinates by ascending azimuth and elevation
# (this yields an okay path with respect to elevation movement)
# `np.lexsort` uses the last key as primary key, i.e. the positions are
# ordered by azimuth first and by elevation in case of identical azimuth
array_coords_sort_ids = np.lexsort((array_coords[1], array_coords[0]))
array_coords_sorted = array_coords[:, array_coords_sort_ids]

# Plot unsorted and sorted grid (the plot function expects angles in rad)
plot_grid(array_coords, title=f'{array_name} raw', is_show_lines=True)
plot_grid(array_coords_sorted, title=f'{array_name} sorted', is_show_lines=True)

# Transform angles into deg, whereas the radius remains in m
# (converting the entire array would also scale the radius row by 180 / pi)
array_coords = np.vstack(
    (np.rad2deg(array_coords[:2]), array_coords[2:])
) # as [azimuth, elevation, radius] in deg, deg, m
array_coords_sorted = np.vstack(
    (np.rad2deg(array_coords_sorted[:2]), array_coords_sorted[2:])
) # as [azimuth, elevation, radius] in deg, deg, m

## Perform the measurement series

Adjust your settings to the individual measurement conditions!

In [None]:
%matplotlib inline

# Measurement configuration
# (the placeholders of the file name are filled per position with the grid
# position index, the azimuth and the elevation)
out_file_name = f'data/{array_name}/{array_name}' + '_pos_{:04d}_az_{:03.0f}_el_{:03.0f}.mat'
vari_duration = [1.0, 1.0] # in s, duration as _ESTIMATED_ move, post-halt-time
meas_iterations = 2 # measurement iterations per position that will be averaged
deconv_ir_hp = 20          # in Hz
deconv_ir_len = fs         # in samples

# Create output data path if it does not exist
Path(out_file_name).parent.mkdir(parents=True, exist_ok=True)

# Print time estimation
n_positions = array_coords_sorted.shape[-1]
total_meas = n_positions * meas_iterations
# The VariSphere moves and settles only once per position, whereas the sweep
# (including its pre- and post-silence) is played once per iteration
est_duration = (n_positions * np.sum(vari_duration)
                + total_meas * data_out.shape[0] / fs) # in s
print_time_estimation(timedelta(seconds=float(est_duration)))

# Show the (empty) preview widgets, which are updated during the measurement
display(widget_preview_ir)
widget_preview_ir.clear_output()
display(widget_preview_rec)
widget_preview_rec.clear_output()

# Wait before starting the measurement
sleep(1) # in s

# Measure all grid positions in the sorted order. For each position, the
# VariSphere is moved, the sweep is recorded `meas_iterations` times, the
# impulse responses are deconvolved and averaged, and the results are stored
# in an individual MAT file. The series can be aborted by a keyboard interrupt.
try:
    for cur_id in range(array_coords_sorted.shape[-1]):
        # Target angles of the current position in deg
        az, el = array_coords_sorted[0, cur_id], array_coords_sorted[1, cur_id]

        # Move VariSphere
        print(f'\nMove VariSphere to {az=:.1f}°, {el=:.1f}° ... ', end='')
        vari.move_blocking(az=az, el=el)
        print('Done.')

        # Wait for vibrations to settle
        sleep(vari_duration[1]) # in s

        # Prepare data arrays
        # (allocated anew for every position, since `data_ir` is replaced by
        # the averaged result after the iterations)
        data_rec = np.zeros((meas_iterations, len(device_input_ch), data_out.shape[-1]))
        data_ir = np.zeros((meas_iterations, len(device_input_ch) - 1, deconv_ir_len))

        for i in range(meas_iterations):
            # Position description including the running measurement counter
            pos_str = f'{az=:.1f}°, {el=:.1f}° [{meas_iterations * cur_id + i + 1}/{total_meas}]'

            # Start playback and recording
            print(f'Record sweep for {pos_str} ... ', end='')
            data_rec[i] = sd.playrec(            # as [iterations, channels, samples]
                data=data_out,                   # as [samples,]
                samplerate=fs,                   # in Hz
                device=device_name,
                input_mapping=device_input_ch,   # channel numbers start from 1
                output_mapping=device_output_ch, # channel numbers start from 1
                blocking=True,
            ).T                                  # as [channels, samples]
            print('Done.')

            # Print recorded peak and individual RMS levels
            lvl_str = generate_levels_string(data_rec[i])
            print(lvl_str)

            # Deconvolve data
            data_ir[i] = deconvolve(  # as [iterations, channels-1, samples]
                input=data_rec[i, 0], # first channel as loopback
                output=data_rec[i, 1:],
                fs=fs,                # in Hz
                f_low=deconv_ir_hp,   # in Hz
                f_high=sweep_lp,      # in Hz
                T=deconv_ir_len / fs, # in s
            )                         # as [channels, samples]

            # Plot data preview (updated after each measurement)
            with widget_preview_rec:
                # Keep the previous preview until the new one is available
                widget_preview_rec.clear_output(wait=True)
                plot_data(data_rec[i], fs, ch_labels=device_input_ch)
                display(Markdown(f'<h4><center>{lvl_str}</center></h4>'))
                display(Markdown(widget_preview_str.format(f'Preview individual signals for {pos_str}')))
#                     plot_data(data_ir[i], fs, ch_labels=device_input_ch[1:])
#                     display(Markdown(widget_preview_str.format(f'Preview individual IRs for {pos_str}')))

        # Average deconvolved IRs
        data_ir = np.mean(data_ir, axis=0) # as [channels-1, samples]

        # Plot averaged IRs preview (updated after each measurement)
        # (`pos_str` still refers to the last iteration of this position)
        with widget_preview_ir:
            widget_preview_ir.clear_output(wait=True)
            plot_data(data_ir, fs, ch_labels=device_input_ch[1:], is_stacked=True)
            display(Markdown(widget_preview_str.format(f'Preview averaged IRs for {pos_str}')))

        # Store data
        # (the file is named after the index of the position in the unsorted
        # grid, not after the order in which the positions are measured)
        grid_id = array_coords_sort_ids[cur_id]
        file_name = out_file_name.format(grid_id, az, el)
        print(f'Save data to {file_name} ... ', end='')
        savemat(
            file_name,
            {
                'signal': data_rec.T,          # as [samples, channels, iterations]
                'ir': data_ir.T,               # as [samples, channels-1]
                'fs': fs,                      # in Hz
                'ch_output': device_output_ch, # as list
                'ch_input': device_input_ch,   # as list
                'sh_order': array_sh_order,    # as spherical harmonics order
                'receiver_grid_id': grid_id,   # as position index
                'receiver_meas_id': cur_id,    # as position index
                'receiver_az_deg': az,         # in deg
                'receiver_el_deg': el,         # in deg
                'receiver_r': array_r,         # in m
                'receiver_scatter_r': array_r, # in m
            },
        )
        print('Done.')

    display(Markdown('<h2><center>Measurement finished</center></h2>'))
except KeyboardInterrupt:
    # Positions stored so far remain available as individual files
    print('\n\nInterrupted by user.')