In [None]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = "retina"
# print(plt.style.available)
plt.style.use("ggplot")
# plt.style.use("fivethirtyeight")
plt.style.use("seaborn-talk")
from tqdm import tnrange, tqdm_notebook

In [None]:
import numpy as np
import scipy as sp
from sklearn.linear_model import orthogonal_mp_gram

def compute_steering_vector_ULA(u, microphone_array):
    return np.exp(1j*2*np.pi*microphone_array.geometry*u).reshape((microphone_array.n_mics, 1))

def compute_MVDR_weight(source_steering_vector, signals):
    snapshot = signals.shape[1]
    sample_covariance_matrix = signals.dot(signals.transpose().conjugate()) / snapshot
    inverse_sample_covariance_matrix = np.linalg.inv(sample_covariance_matrix)
    normalization_factor = (source_steering_vector.transpose().conjugate().dot(inverse_sample_covariance_matrix).dot(source_steering_vector))
    weight = inverse_sample_covariance_matrix.dot(source_steering_vector) / normalization_factor
    return weight

def check_distortless_constraint(weight, source_steering_vector):
    assert(np.abs(weight.transpose().conjugate().dot(source_steering_vector)) - 1 < 1e-9)

def uniform_linear_array(n_mics, spacing):
    return spacing*np.arange(-(n_mics-1)/2, (n_mics-1)/2+1).reshape(1, n_mics)

def generate_gaussian_samples(power, shape):
    return np.sqrt(power/2)*np.random.randn(shape[0], shape[1]) + 1j*np.sqrt(power/2)*np.random.randn(shape[0], shape[1]); # signal samples

class MicrophoneArray():
    def __init__(self, array_geometry):
        self.dim = array_geometry.shape[0]
        self.n_mics = array_geometry.shape[1]
        self.geometry = array_geometry

class BaseDLBeamformer(object):
    def __init__(self, vs):
        """
        Parameters
        ----------
        vs: Source manifold array vector
        """
        self.vs = vs
        self.weights_ = None
        
    def _compute_weights(self, training_data):
        n_training_samples = len(training_data)
        n_mics, snapshot = training_data[0].shape
        D = np.zeros((n_mics, n_training_samples), dtype=complex)
        for i_training_sample in range(n_training_samples):
            nv = training_data[i_training_sample]
            Rnhat = nv.dot(nv.transpose().conjugate()) / snapshot
            Rnhatinv = np.linalg.inv(Rnhat)
            w = Rnhatinv.dot(self.vs) / (self.vs.transpose().conjugate().dot(Rnhatinv).dot(self.vs))
            D[:, i_training_sample] = w.reshape(n_mics,)
        return D

    def _initialize(self, X):
        pass

    def _choose_weights(self, x):
        n_dictionary_atoms = self.weights_.shape[1]
        R = x.dot(x.transpose().conjugate())
        proxy = np.diagonal(self.weights_.transpose().conjugate().dot(R).dot(self.weights_))
        optimal_weight_index = np.argmin(proxy)

#         min_energy = np.inf
#         optimal_weight_index = None
#         for i_dictionary_atom in range(n_dictionary_atoms):
#             w = self.weights_[:, i_dictionary_atom]
#             energy = np.real(w.transpose().conjugate().dot(R).dot(w))
#             if min_energy > energy:
#                 min_energy = energy
#                 optimal_weight_index = i_dictionary_atom
        return self.weights_[:, optimal_weight_index]
    
    def fit(self, training_data):
        """
        Parameters
        ----------
        X: shape = [n_samples, n_features]
        """
        D = self._compute_weights(training_data)
        self.weights_ = D
        return self

    def choose_weights(self, x):
        return self._choose_weights(x)

#### Setup

In [None]:
d = 0.5
n_mics = 10
array_geometry = uniform_linear_array(n_mics=n_mics, spacing=d)
microphone_array = MicrophoneArray(array_geometry)
us = 0
vs = compute_steering_vector_ULA(us, microphone_array)
SNRs = np.arange(0, 31, 10)
n_SNRs = len(SNRs)
sigma_n = 1

#### Training data

In [None]:
# u_list = [0.29, 0.45]
# n_training_samples = 500
# training_snapshot = 100
# sigma = 10**(20/10)
# training_noise_interference_data = []
# for i_training_sample in range(n_training_samples):
# #     u = np.random.uniform(0, 1)
#     u = np.random.choice(u_list)
# #     vi = np.exp(1j*2*np.pi*d_array*u)
#     vi = compute_steering_vector_ULA(u, microphone_array)
#     ii = np.sqrt(sigma/2)*np.random.randn(1, training_snapshot) + 1j*np.sqrt(sigma/2)*np.random.randn(1, training_snapshot) # interference samples
#     noise = np.sqrt(sigma_n/2)*np.random.randn(n_mics, training_snapshot) + 1j*np.sqrt(sigma_n/2)*np.random.randn(n_mics, training_snapshot) # Gaussian noise samples
#     nv = vi*ii + noise
#     training_noise_interference_data.append(nv)


# training_snapshots = [100, 200, 500]
# u_list = [0.29, 0.45]
# interference_powers = [10]   
# n_interference_list = [1, 2]

training_snapshots = [10, 50, 100]
interference_powers = [10, 20, 30]
n_interference_list = [1, 2, 3]
u_step = 0.5
u_list = np.arange(-1, 1+1e-6, u_step)

import itertools
training_noise_interference_data_various_snapshots = []
for training_snapshot in training_snapshots:
    training_noise_interference_data = []
    for n_interferences in n_interference_list:
        interferences_params = []
        for i_interference in range(n_interferences):
            interference_params = list(itertools.product(*[u_list, interference_powers]))
            interferences_params.append(interference_params)
        interferences_param_sets = list(itertools.product(*interferences_params))        

        # for param_set in interferences_param_sets:
        for param_set in interferences_param_sets:
            n_training_samples = 10
            for i_training_sample in range(n_training_samples):
                nv = np.zeros((microphone_array.n_mics, training_snapshot), dtype=complex)
                for i_interference in range(len(param_set)):
                    u, interference_power = param_set[i_interference]
                    vi = compute_steering_vector_ULA(u, microphone_array)
                    sigma = 10**(interference_power/10)
                    ii = generate_gaussian_samples(power=sigma, shape=(1, training_snapshot))
                    nv += vi.dot(ii)
                noise = generate_gaussian_samples(power=sigma_n, shape=(microphone_array.n_mics, training_snapshot))
                nv += noise
                training_noise_interference_data.append(nv)
    training_noise_interference_data_various_snapshots.append(training_noise_interference_data)

#### Train baseline dictionary

In [None]:
dictionaries = []
for i_training_snapshot in range(len(training_snapshots)):
    training_noise_interference_data = training_noise_interference_data_various_snapshots[i_training_snapshot]
    dictionary = BaseDLBeamformer(vs)
    dictionary.fit(training_noise_interference_data);
    dictionaries.append(dictionary)

#### Testing

In [None]:
n_trials = 200
snapshots = np.array([10, 20, 30, 40, 60, 100, 200, 500, 1000])
n_snapshots = len(snapshots)
# ui1 = np.random.uniform(0, 1)
# ui2 = np.random.uniform(0, 1)

u_list = [0.29, 100]
n_interferences_list = [len(u_list)]

# Wo = Rninv.dot(vs) / (vs.transpose().conjugate().dot(Rninv).dot(vs))

sinr_snr_mvdr = np.zeros((n_SNRs, n_snapshots))
sinr_snr_mpdr = np.zeros((n_SNRs, n_snapshots))
sinr_snr_baseline_mpdr = np.zeros((len(training_snapshots), n_SNRs, n_snapshots))
sinr_snr_baseline_mvdr = np.zeros((len(training_snapshots), n_SNRs, n_snapshots))

for i_SNR in tqdm_notebook(range(n_SNRs), desc="SNRs"):
    sigma_s = 10**(SNRs[i_SNR] / 10)
    Rs = sigma_s * vs.dot(vs.transpose().conjugate())
    
#     SINRopt = ( np.real(Wo.transpose().conjugate().dot(Rs).dot(Wo)) / np.real(Wo.transpose().conjugate().dot(Rn).dot(Wo)) )[0][0]
    
    for i_snapshot in tqdm_notebook(range(n_snapshots), desc="Snapshots", leave=False):
        snapshot = snapshots[i_snapshot]
        sinr_mvdr = np.zeros(n_trials)
        sinr_mpdr = np.zeros(n_trials)
        sinr_baseline_mpdr = np.zeros((len(training_snapshots), n_trials))
        sinr_baseline_mvdr = np.zeros((len(training_snapshots), n_trials))
        
        for i_trial in range(n_trials):
            ss = np.sqrt(sigma_s/2)*np.random.randn(1, snapshot) + 1j*np.sqrt(sigma_s/2)*np.random.randn(1, snapshot) # signal samples            
            nn = np.sqrt(sigma_n/2)*np.random.randn(microphone_array.n_mics, snapshot) + 1j*np.sqrt(sigma_n/2)*np.random.randn(microphone_array.n_mics, snapshot) # Gaussian noise samples
            
            n_interferences = 2
            nv = np.zeros((microphone_array.n_mics, snapshot), dtype=complex)
            Rn = np.zeros((microphone_array.n_mics, microphone_array.n_mics), dtype=complex)
            for i_interference in range(n_interferences):
                u = u_list[i_interference]
                sigma = 10**(20/10)     
                ii = generate_gaussian_samples(power=sigma, shape=(1, snapshot))
                interference_steering_vector = compute_steering_vector_ULA(u, microphone_array)
                nv += interference_steering_vector*ii
                Rn += sigma*interference_steering_vector.dot(interference_steering_vector.transpose().conjugate())
            nv += nn
            Rn += sigma_n*np.identity(microphone_array.n_mics)
            Rninv = np.linalg.inv(Rn)

            sv = vs*ss
            xx = sv + nv
            
            for i_dictionary in range(len(dictionaries)):
                dictionary = dictionaries[i_dictionary]
                w_baseline_p = dictionary.choose_weights(xx)
                sinr_baseline_mpdr[i_dictionary, i_trial] = np.real(w_baseline_p.transpose().conjugate().dot(Rs).dot(w_baseline_p)) / np.real(w_baseline_p.transpose().conjugate().dot(Rn).dot(w_baseline_p))
                w_baseline_v = dictionary.choose_weights(nv)
                sinr_baseline_mvdr[i_dictionary, i_trial] = np.real(w_baseline_v.transpose().conjugate().dot(Rs).dot(w_baseline_v)) / np.real(w_baseline_v.transpose().conjugate().dot(Rn).dot(w_baseline_v))                            
                check_distortless_constraint(w_baseline_p, vs)
                check_distortless_constraint(w_baseline_v, vs)
            wv = compute_MVDR_weight(vs, nv)
            wp = compute_MVDR_weight(vs, xx)
            
            check_distortless_constraint(wv, vs)
            check_distortless_constraint(wp, vs)
            
            
            sinr_mvdr[i_trial] = np.real(wv.transpose().conjugate().dot(Rs).dot(wv)) / np.real(wv.transpose().conjugate().dot(Rn).dot(wv))
            sinr_mpdr[i_trial] = np.real(wp.transpose().conjugate().dot(Rs).dot(wp)) / np.real(wp.transpose().conjugate().dot(Rn).dot(wp))
#             sinr_baseline_mpdr[i_trial] = np.real(w_baseline_p.transpose().conjugate().dot(Rs).dot(w_baseline_p)) / np.real(w_baseline_p.transpose().conjugate().dot(Rn).dot(w_baseline_p))
#             sinr_baseline_mvdr[i_trial] = np.real(w_baseline_v.transpose().conjugate().dot(Rs).dot(w_baseline_v)) / np.real(w_baseline_v.transpose().conjugate().dot(Rn).dot(w_baseline_v))
        sinr_snr_mvdr[i_SNR, i_snapshot] = np.sum(sinr_mvdr) / n_trials
        sinr_snr_mpdr[i_SNR, i_snapshot] = np.sum(sinr_mpdr) / n_trials
#         sinr_snr_baseline_mpdr[i_SNR, i_snapshot] = np.sum(sinr_baseline_mpdr) / n_trials    
#         sinr_snr_baseline_mvdr[i_SNR, i_snapshot] = np.sum(sinr_baseline_mvdr) / n_trials    
        for i_dictionary in range(len(dictionaries)):
            sinr_snr_baseline_mpdr[i_dictionary, i_SNR, i_snapshot] = np.sum(sinr_baseline_mpdr[i_dictionary, :]) / n_trials
            sinr_snr_baseline_mvdr[i_dictionary, i_SNR, i_snapshot] = np.sum(sinr_baseline_mvdr[i_dictionary, :]) / n_trials

In [None]:
fig = plt.figure(figsize=(9, 6*n_SNRs)); 
for i_SNR in range(n_SNRs):
    sigma_s = 10**(SNRs[i_SNR] / 10)
    Rs = sigma_s * vs.dot(vs.transpose().conjugate())
    ax = fig.add_subplot(n_SNRs, 1, i_SNR+1)
    ax.semilogx(snapshots, 10*np.log10(sinr_snr_mpdr[i_SNR, :]), marker="o", alpha=0.3, label="MPDR")
    ax.semilogx(snapshots, 10*np.log10(sinr_snr_mvdr[i_SNR, :]), marker="*", alpha=0.3, label="MVDR")
#     ax.semilogx(snapshots, 10*np.log10(sinr_snr_baseline_mpdr[i_SNR, :]))
#     ax.semilogx(snapshots, 10*np.log10(sinr_snr_baseline_mvdr[i_SNR, :]))
    for i_training_snapshot in range(len(training_snapshots)):
        ax.semilogx(snapshots, 10*np.log10(sinr_snr_baseline_mpdr[i_training_snapshot, i_SNR, :]), 
                    label="Baseline MPDR - {} training snapshots".format(training_snapshots[i_training_snapshot]))
        ax.semilogx(snapshots, 10*np.log10(sinr_snr_baseline_mvdr[i_training_snapshot, i_SNR, :]), 
                    label="Baseline MVDR - {} training snapshots".format(training_snapshots[i_training_snapshot]))
    ax.legend()
    ax.set_xlim(10, 1000); ax.set_ylim(-10, 45);
    ax.set_xlabel("Number of snapshots");
    ax.set_ylabel(r"$SINR_0$ [dB]");