# DOASimulation

Notebook to apply beamforming algorithms to simulated data (either anayltical or using pyroomacoustics).

In [None]:
import math
import sys
from copy import deepcopy

sys.path.append("../")

import IPython
import matplotlib.pylab as plt
import numpy as np

from constants import SPEED_OF_SOUND

%reload_ext autoreload
%autoreload 2

#%matplotlib inline
%matplotlib notebook

np.random.seed(1)

def save_signals(signals_all, signals_source, save_as, signal_type):
    from scipy.io.wavfile import write
    import os
    if not os.path.exists(os.path.dirname(save_as)):
        os.makedirs(os.path.dirname(save_as))
        print('created new directory', os.path.dirname(save_as))
    
    for name, arr in zip(['all', 'source'], [signals_all, signals_source]):
        for i in range(arr.shape[0]):
            fname = f"{save_as}/{signal_type}_{name}_mic{i+1}.wav"
            write(fname, Fs, arr[i])
            print('wrote as', fname)
    mic_fname = f"{save_as}/{signal_type}_mics.npy"
    np.save(mic_fname, mics_2d)
    print('wrote mics as', mic_fname)

# 1. General setup

In [None]:
from signals import *

#Fs = 32000  # Hz, sampling frequency
Fs = 44100  # Hz, sampling frequency

dimension = 2
uniform = False
gt_elevation = None
baseline = 0.11  # 15
distance = 1.925  # distance of source
duration = 1  # 0.1  # seconds

sources_2d = np.array([0, distance]).reshape((1, -1))
gt_azimuth = np.arctan2(sources_2d[0, 1], sources_2d[0, 0])

gt_degrees = 40
gt_azimuth = (90 - gt_degrees) * np.pi / 180.0
sources_2d = distance * np.array([np.cos(gt_azimuth), np.sin(gt_azimuth)]).reshape(
    (1, -1)
)
print('ground truth direction:', gt_azimuth * 180 / np.pi)

propellers = range(4) #[0]  # [0, 1, 2, 3]  # which propellers to consider
source_type = "mono"  # mono or random
signals_props = None

thrust = 42000
fname = f"../data/propellers/{thrust}.wav"

frequency = 200  # frequency for mono signal, Hz

# generate the signals alaytically or using pyroomacoustics
pyroomacoustics = False #True #False

# save simulated data
save_as = None
#save_as = "../data/simulated/"

In [None]:
from mic_array import get_square_array, get_uniform_array
from plotting_beamform import plot_setup

propellers_2d = get_square_array(baseline=baseline, delta=0.03)
propellers_2d = propellers_2d[propellers, :]

if uniform:
    mics_2d = get_uniform_array(mic_number=4, dimension=dimension, baseline=baseline)
else:
    mics_2d = get_square_array(baseline=baseline, delta=0)
    #mics_2d = 0.2 * np.c_[[0, 1], [1, 0], [2, 0]].T
    #mics_2d += np.random.normal(scale=1e-3, size=mics_2d.shape)

plot_setup(mics_2d, sources_2d, propellers_2d)

# 2. Signals

## 2.a Generate signals analytically

In [None]:
np.random.seed(1)

if pyroomacoustics:
    print('not using analytical generation')
else:
    from algos_basics import get_mic_delays, get_mic_delays_near
    from mic_array import ambiguity_test
    from signals import MonoSignal, generate_signal

    np.random.seed(1)
    added_noise_std = 1e-3

    source_signal = MonoSignal(f=frequency)

    times = np.arange(0, duration, step=1 / Fs)

    ambiguity_test(mics_2d, source_signal.params["f"])

    # compute exact delays (not far-field) for the source
    mic_ref = mics_2d[0]

    delays_near = get_mic_delays_near(mics_2d, sources_2d[0])
    delays = get_mic_delays(mics_2d, gt_azimuth)
    print("delays:", np.round(delays, 5))
    print("difference near vs. far", delays_near - delays)

    # generate delayed signals
    signals_source = np.array(
        [source_signal.evaluate(times - d, noise=0) for d in delays], dtype=np.float
    )
    if added_noise_std > 0:
        signals_source += np.random.normal(
            loc=0, scale=added_noise_std, size=signals_source.shape
        )
    signals_source = amplify_signal(signals_source, target_dB=20.0)

    signals_all = deepcopy(signals_source)
    if propellers:
        signals_props = np.empty(signals_source.shape)
        for mic in range(signals_props.shape[0]):
            for i in propellers:
                prop_signal = generate_signal(
                    Fs, signal_type="real", duration=duration, fname=fname
                )
                prop_signal = amplify_signal(prop_signal, target_dB=0.0)
                signals_props[mic, :] += prop_signal
        signals_all += signals_props 
    else:
        signals_props = None
    signals_sum = deepcopy(signals_all)
    
    if save_as is not None:
        save_signals(signals_all, signals_source, save_as, 'analytical')
        
    n_times = signals_source.shape[1]
    print('number of samples:', n_times)

## 2.b Generate signals with pyroomacoustics

In [None]:
if not pyroomacoustics: 
    print('not using pyroomacoustics')
    pass
else:
    import pyroomacoustics as pra

    ## source signals only
    room = pra.AnechoicRoom(fs=Fs, dim=dimension)
    for i, source in enumerate(sources_2d):

        if source_type == "random":
            drone_signal = generate_signal(Fs, signal_type="random", duration=duration)
        elif source_type == "mono":
            drone_signal = generate_signal(
                Fs, signal_type="mono", duration=duration, frequency=frequency
            )
        else:
            raise ValueError(source_type)
        drone_signal = amplify_signal(drone_signal, target_dB=0.0)
        room.add_source(source[:dimension], signal=drone_signal, name=f"drone{i}")
    beam_former = pra.Beamformer(mics_2d[:, :dimension].T, room.fs)
    room.add_microphone_array(beam_former)
    room.simulate()
    signals_source = deepcopy(room.mic_array.signals)
    n_times = signals_source.shape[1]
    print(n_times)
    signals_sum = deepcopy(signals_source)  # will add propellers if not empty

    ## propeller signals only
    room = pra.AnechoicRoom(fs=Fs, dim=dimension)
    for i, prop in enumerate(propellers_2d):
        if not len(prop):
            break
        # prop_signal = generate_signal(Fs, signal_type='random', duration=duration)
        prop_signal = generate_signal(
            Fs, signal_type="real", duration=duration, fname=fname
        )
        prop_signal = amplify_signal(prop_signal, target_dB=0.0)
        room.add_source(prop[:dimension], signal=prop_signal, name=f"prop{i}")
    beam_former = pra.Beamformer(mics_2d[:, :dimension].T, room.fs)
    room.add_microphone_array(beam_former)

    try:
        room.simulate()
        signals_props = deepcopy(room.mic_array.signals)

        max_length = min(signals_props.shape[1], signals_source.shape[1])
        signals_sum[:, :max_length] += signals_props[:, :max_length]
    except:
        signals_props = None
        print("Not adding any propellers signal.")

    ## all signals
    room = pra.AnechoicRoom(fs=Fs, dim=dimension)
    for i, prop in enumerate(propellers_2d):
        if not len(prop):
            break
        # prop_signal = generate_signal(Fs, signal_type='random', duration=duration)
        prop_signal = generate_signal(
            Fs, signal_type="real", duration=duration, fname=fname
        )

        prop_signal = amplify_signal(prop_signal, target_dB=0.0)
        room.add_source(prop[:dimension], signal=prop_signal, name=f"prop{i}")

    for i, source in enumerate(sources_2d):
        if source_type == "mono":
            drone_signal = generate_signal(
                Fs, signal_type="mono", duration=duration, frequency=frequency
            )
        elif source_type == "random":
            drone_signal = generate_signal(Fs, signal_type="random", duration=duration)
        else:
            raise ValueError(source_type)

        drone_signal = amplify_signal(drone_signal, target_dB=20.0)
        room.add_source(source[:dimension], signal=drone_signal, name=f"drone{i}")

    beam_former = pra.Beamformer(mics_2d[:, :dimension].T, room.fs)
    room.add_microphone_array(beam_former)
    room.simulate()

    signals_all = deepcopy(room.mic_array.signals)

    if save_as is not None:
        save_signals(signals_all, signals_source, save_as, 'analytical')
        
    n_times = signals_all.shape[1]
    print('number of samples:', n_times)

## DEBUGGING: Signal Check

In [None]:
from plotting_beamform import plot_sine_signals
plot_sine_signals(signals_source, normalize_signal=True, length=200)
plot_sine_signals(signals_all, normalize_signal=True, length=200)

# 3. Preprocess signals

In [None]:
from plotting_beamform import *
from signals import get_power

time_index = 0
n_buffer = 2**8 #800 #2 ** 13  # 256  # 1024
assert n_buffer <= n_times, (n_buffer, n_times)
times_buffer = np.arange(0, n_buffer / Fs, step=1 / Fs)
frequencies = np.fft.rfftfreq(n=n_buffer,d=1/Fs)

buffer_source = signals_source[:, time_index : time_index + n_buffer]
max_bins = get_max_bins(buffer_source)
max_frequencies = frequencies[max_bins]
#plot_signals(buffer_source, Fs=Fs, plot_frequencies=max_frequencies)
#plt.title('source only')

plt.figure()
plt.plot(buffer_source.T)
plt.title(f"{time_index}:{time_index+n_buffer}")
plt.show()

if signals_props is not None:
    buffer_props = signals_props[:, time_index : time_index + n_buffer]
    max_bins = get_max_bins(buffer_props)
    max_frequencies = frequencies[max_bins]
    plot_signals(buffer_props, Fs=Fs, plot_frequencies=max_frequencies)
    plt.title('propellers only')

buffer_all = signals_all[:, time_index : time_index + n_buffer]
max_bins = get_max_bins(buffer_all)
max_frequencies = frequencies[max_bins]
plot_signals(buffer_all, Fs=Fs, plot_frequencies=max_frequencies)
plt.title('propellers and source combined')

buffer_sum = signals_sum[:, time_index : time_index + n_buffer]
max_bins = get_max_bins(buffer_sum)
max_frequencies = frequencies[max_bins]
plot_signals(buffer_sum, Fs=Fs, plot_frequencies=max_frequencies)
plt.title('sum of propellers and source, should be same as above!')


In [None]:
print('SNR:', get_power(signals_source) - get_power(signals_props))

# 4. Algorithms

## 4.1 choose frequencies

In [None]:
buffer = buffer_source
#buffer = buffer_sum
#buffer = buffer_all # TODO: for some reason this does not work for pyroomcoustics.


frequencies = np.fft.rfftfreq(n=n_buffer, d=1 / Fs)

# frequency_bins = np.arange(1, num_frequencies)
# frequency_bins = np.linspace(1, len(frequencies)-1, num_frequencies).astype(np.int)
#frequency_bins = [round(frequency / Fs * n_buffer)]
#print(frequency_bins)
frequency_bins = range(3)

frequencies_hz = frequencies[frequency_bins]
print(n_buffer, Fs)
print(frequencies_hz)

plot_signals(buffer, Fs=Fs, plot_frequencies=frequencies_hz)

## 4.2 apply algos_beamforming

In [None]:
from algos_basics import get_autocorrelation
from algos_beamforming import get_lcmv_beamformer, get_das_beamformer
from algos_beamforming import get_beampattern, get_powers

R = get_autocorrelation(buffer, frequency_bins)

thetas_scan = np.linspace(0, 360, 361) * np.pi / 180
mvdr_scan = np.empty((R.shape[0], len(thetas_scan)))
das_scan = np.empty((R.shape[0], len(thetas_scan)))

for i, theta_scan in enumerate(thetas_scan):

    constraints = [(theta_scan, 1)]
    H_mvdr, conds, conds_big = get_lcmv_beamformer(
        R, frequencies_hz, mics_2d, constraints, lamda=1e-3
    )
    H_das = get_das_beamformer(theta_scan, frequencies_hz, mics_2d)

    # mvdr_scan[:, i] = get_beampattern(theta_scan, frequencies_hz, mics_2d, H)
    # das_scan[:, i] = get_beampattern(theta_scan, frequencies_hz, mics_2d, H_das)

    mvdr_scan[:, i] = get_powers(H_mvdr, R)
    das_scan[:, i] = get_powers(H_das, R)

    # TESTING
    # test = get_das_powers(theta_scan, frequencies, mics_2d, Rx)
    # np.testing.assert_allclose(test, das_scan[:, i])
#print(R)

## 4.3 analyze results

In [None]:
from plotting_beamform import plot_spectrum_linear

# SNR-dependent weighting?
# combination = np.product
combination = np.sum
# combination = None

fig, axs = plt.subplots(1, 2, sharex=True)
fig.set_size_inches(10, 5)
plot_spectrum_linear(
    mvdr_scan,
    thetas_scan,
    frequencies_hz,
    gt_azimuth=gt_azimuth,
    ax=axs[0],
    combination=combination,
)
plot_spectrum_linear(
    das_scan,
    thetas_scan,
    frequencies_hz,
    gt_azimuth=gt_azimuth,
    ax=axs[1],
    combination=combination,
)
axs[0].set_title("MVDR")
axs[1].set_title("DAS")