In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pickle
%matplotlib inline
%config InlineBackend.figure_format = "retina"

from scipy.signal import stft, istft, get_window
from scipy.fftpack import fft, fftshift, fftfreq
from IPython.display import Audio
from tqdm import tnrange, tqdm_notebook, tqdm
from dlbeamformer_utilities import *
from utilities import *
from IPython.display import Audio

random_seed = 0
# Make pretty figures
palette, cmap = config_figures()

VISUALIZE_BEAMPATTERNS = False

In [2]:
datapath = "CMU_ARCTIC/cmu_us_bdl_arctic/wav"
train_data, test_data = load_data(datapath)

sampling_frequency, stft_params, sound_speed = parse_parameters()
signal_max_frequency = sampling_frequency / 2

# Array geometry
pos_x = np.array([-35.0, -35.0, 0.0, 35.0, 35.0, 0.0, 0.0]) * 1e-3
pos_y = np.array([20.0, -20.0, -40.0, -20.0, 20.0, 40.0, 0.0]) * 1e-3
n_mics = len(pos_x)
pos_z = np.zeros(n_mics)
array_geometry = np.row_stack((pos_x, pos_y, pos_z))

# Fix elevation angle
elevation = -90 # [degree]

# Source/Target/Look angles
elevation_s = np.array([elevation]) # [degree]
azimuth_s = np.array([180])
source_steering_vectors = compute_steering_vectors(array_geometry, 
        sampling_frequency=sampling_frequency, n_fft_bins=stft_params["n_fft_bins"], 
        elevation_grid=elevation_s, 
        azimuth_grid=azimuth_s)

# Scanning angles
scanning_elevation_grid = np.array([elevation]) # [degree]
scanning_azimuth_grid = np.arange(0, 360, 0.1) # [degree]
scanning_steering_vectors = compute_steering_vectors(array_geometry, 
        sampling_frequency=sampling_frequency, n_fft_bins=stft_params["n_fft_bins"], 
        elevation_grid=scanning_elevation_grid, 
        azimuth_grid=scanning_azimuth_grid)

In [3]:
ds_tf_beamformers = 1./n_mics * source_steering_vectors

# Delay-sum beam pattern
ds_tf_beampattern = compute_tf_beampattern(ds_tf_beamformers, scanning_steering_vectors)
ds_tf_beampattern_db = to_db(ds_tf_beampattern)

if VISUALIZE_BEAMPATTERNS:
    frequency_bins = [7, 31, 63, 127]
    visualize_beampattern_1d(ds_tf_beampattern_db[:, 0, :], scanning_azimuth_grid, frequency_bins, 
        signal_max_frequency, source_azimuths= azimuth_s, title="Delay-sum TF beam patterns")

    visualize_beampattern_1d_average(np.abs(ds_tf_beampattern[:, 0, :]), scanning_azimuth_grid, 
        frequency_range=(0, 63), source_azimuths=azimuth_s, title="Delay-sum TF average beam pattern")

    visualize_beampattern_2d(np.abs(ds_tf_beampattern[:, 0, :]), 
        scanning_azimuth_grid, signal_max_frequency);

In [4]:
beamformers = {}
beamformer_list = ["delaysum", "mvdr", "mpdr"]
for beamformer_name in beamformer_list:
    beamformers[beamformer_name] = {
        "weights": None,
        "sinr_db": [],
        "average_sinr_db": 0,
        "out": None
    }
    
n_MC_iters = 100
for i_MC_iter in tqdm(range(n_MC_iters), desc="Monte Carlo iterations"):
    
    source = {
        "signal": test_data[np.random.choice(len(test_data))],
        "elevation": elevation_s,
        "azimuth": azimuth_s
    }
    interferences = []
    interference = {
        "signal": test_data[np.random.choice(len(test_data))],
        "elevation": np.array([elevation]),
        "azimuth": np.array([np.random.uniform(
            scanning_azimuth_grid[0], scanning_azimuth_grid[-1]
        )])
    }
    interferences.append(interference)

    received_stft_multichannel, source_stft_multichannel, interference_stft_multichannel \
        = simulate_multichannel_tf_mixtures(array_geometry, source,
            interferences, sampling_frequency, stft_params)

    for beamformer_name in beamformer_list:        
        if beamformer_name.lower() == "delaysum":
            tf_frames_multichannel = None
        elif beamformer_name.lower() == "mvdr":
            tf_frames_multichannel = interference_stft_multichannel
        elif beamformer_name.lower() == "mpdr":
            tf_frames_multichannel = received_stft_multichannel
            
        tf_beamformer = compute_tf_beamformers(source_steering_vectors[:, 0, 0, :], 
                beamformer_name=beamformer_name,
                tf_frames_multichannel=tf_frames_multichannel)
        
        out, tf_out, _ = compute_tf_beamformer_output(tf_beamformer, 
                            received_stft_multichannel, sampling_frequency, 
                            stft_params)
        
        sinr_db, sinr  = compute_sinr(source_stft_multichannel, 
            interference_stft_multichannel, tf_beamformer)
        
        beamformers[beamformer_name]["weights"] = tf_beamformer
        beamformers[beamformer_name]["sinr_db"].append(sinr_db[0][0])
        beamformers[beamformer_name]["out"] = out
        
for beamformer_name in beamformers.keys():
    beamformers[beamformer_name]["average_sinr_db"] = \
        to_db(np.mean(from_db(np.asarray(beamformers[beamformer_name]["sinr_db"]))))


Monte Carlo iteration: 100%|██████████| 100/100 [00:09<00:00,  8.58it/s]


In [5]:
print(beamformers["delaysum"]["average_sinr_db"])
print(beamformers["mpdr"]["average_sinr_db"])
print(beamformers["mvdr"]["average_sinr_db"])

1.5659911930561066
5.963009595870972
42.362847328186035


In [6]:
Audio(beamformers["mvdr"]["out"], rate=sampling_frequency, autoplay=True)