In [None]:
import os
import sys
import random
import pickle
import scipy
import math
import numpy as np
import matplotlib.pyplot as plt
from numpy import linalg as la
from scipy.optimize import linear_sum_assignment
from scipy.spatial.distance import directed_hausdorff
from sklearn import metrics

sys.path.insert(0, '../../../methods')
from t_gane_lee2021 import *

In [None]:
def generate_dataset():
    snr_list = [-20,-15,-10,-5,0,5,10,15,20,25,30]
    T_list = [100,200,500,1000,2000,5000,10000]
    nn_gap_list = []
    ns_gap_list = []

    for snr in snr_list:
        # load data
        data = np.load(f'../../../data/experiment_3/scenario_1/data_test_snr{snr}_t1000.npy', allow_pickle=True)
        
        truths = [s['label'] for s in data]
        sensor_pos = [s['sensor_pos'] for s in data]
        data = [s['cm'] for s in data]
        
        for i in range(len(data)):
            R = data[i]
            M = R.shape[0]
            N = truths[i].shape[0]
            
            D, V = la.eigh(R)
            idx = abs(D).argsort()   
            D = D[idx]
            V = V[:,idx]
            evs = D
            evs = evs / sum(evs)
            
            for j in range(len(evs)-1):
                delta_ev = evs[j+1]-evs[j]

                if M-j-1 > N:
                    nn_gap_list.append(delta_ev)
                elif M-j-1 == N:
                    ns_gap_list.append(delta_ev)
                    break
                    
    for T in T_list:
        # load data
        data = np.load(f'../../../data/experiment_3/scenario_1/data_test_snr-10_t{T}.npy', allow_pickle=True)
        
        truths = [s['label'] for s in data]
        sensor_pos = [s['sensor_pos'] for s in data]
        data = [s['cm'] for s in data]
        
        for i in range(len(data)):
            R = data[i]
            M = R.shape[0]
            N = truths[i].shape[0]
            
            D, V = la.eigh(R)
            idx = abs(D).argsort()   
            D = D[idx]
            V = V[:,idx]
            evs = D
            evs = evs / sum(evs)
            
            for j in range(len(evs)-1):
                delta_ev = evs[j+1]-evs[j]

                if M-j-1 > N:
                    nn_gap_list.append(delta_ev)
                elif M-j-1 == N:
                    ns_gap_list.append(delta_ev)
                    break
       
    return nn_gap_list, ns_gap_list

In [None]:
nn_gap_list, ns_gap_list = generate_dataset()

In [None]:
plt.plot(nn_gap_list, '.')
plt.show()
plt.plot(ns_gap_list, '.')
plt.show()

In [None]:
x = nn_gap_list
Kmax_list = np.arange(1,11)
bic_list = []
omegas_list = []
mus_list = []
sigmas_list = []
max_iteration = 1000000

for Kmax in Kmax_list:
    print("processing K =", Kmax)
    
    # initialization
    omegas = [1/Kmax for _ in range(Kmax)]
    mus = [np.random.choice(x) for _ in range(Kmax)]
    sigmas = [np.std(x) for _ in range(Kmax)]

    l_prev = 0
    for i in range(Kmax):
        p = scipy.stats.norm(mus[i], sigmas[i]).pdf(x)
        l_prev += omegas[i]*p
    l_prev = sum(np.log(l_prev))/len(l_prev)

    for k in range(max_iteration):
        # expectation
        gammas = np.zeros((Kmax, len(x)))
        for i in range(Kmax):
            p = scipy.stats.norm(mus[i], sigmas[i]).pdf(x)
            gammas[i, :] = omegas[i]*p
        gammas = gammas / np.sum(gammas, axis=0) 

        # maximization
        omegas = np.sum(gammas, axis=1) / len(x)
        mus = np.sum(gammas * x, axis=1) / np.sum(gammas, axis=1)
        for i in range(Kmax):
            sigmas[i] = np.sqrt(np.sum(gammas[i] * (x - mus[i])**2) / np.sum(gammas[i]))

        # convergence check
        l = 0
        for i in range(Kmax):
            p = scipy.stats.norm(mus[i], sigmas[i]).pdf(x)
            l += omegas[i]*p

        l = sum(np.log(l))/len(l)
        if np.abs(l - l_prev) < 1e-4:
            break
        l_prev = l
    
    bic = 3*np.log(len(x)) - 2*l
    
    omegas_list.append(omegas)
    mus_list.append(mus)
    sigmas_list.append(sigmas)
    bic_list.append(bic)
    
print("omegas:", omegas_list)
print("mus:", mus_list)
print("sigmas:", sigmas_list)
print("bics:", bic_list)

sel_idx = np.argmin(bic_list)
omegas = omegas_list[sel_idx]
mus = mus_list[sel_idx]
sigmas = sigmas_list[sel_idx]

nn_omegas = omegas
nn_mus = mus
nn_sigmas = sigmas

print("selected omegas:", omegas)
print("selected mus:", mus)
print("selected sigmas:", sigmas)

In [None]:
x = ns_gap_list
Kmax_list = np.arange(1,11)
bic_list = []
omegas_list = []
mus_list = []
sigmas_list = []
max_iteration = 1000000

for Kmax in Kmax_list:
    print("processing K =", Kmax)
    
    # initialization
    omegas = [1/Kmax for _ in range(Kmax)]
    mus = [np.random.choice(x) for _ in range(Kmax)]
    sigmas = [np.std(x) for _ in range(Kmax)]

    l_prev = 0
    for i in range(Kmax):
        p = scipy.stats.norm(mus[i], sigmas[i]).pdf(x)
        l_prev += omegas[i]*p
    l_prev = sum(np.log(l_prev))/len(l_prev)

    for k in range(max_iteration):
        # expectation
        gammas = np.zeros((Kmax, len(x)))
        for i in range(Kmax):
            p = scipy.stats.norm(mus[i], sigmas[i]).pdf(x)
            gammas[i, :] = omegas[i]*p
        gammas = gammas / np.sum(gammas, axis=0) 

        # maximization
        omegas = np.sum(gammas, axis=1) / len(x)
        mus = np.sum(gammas * x, axis=1) / np.sum(gammas, axis=1)
        for i in range(Kmax):
            sigmas[i] = np.sqrt(np.sum(gammas[i] * (x - mus[i])**2) / np.sum(gammas[i]))

        # convergence check
        l = 0
        for i in range(Kmax):
            p = scipy.stats.norm(mus[i], sigmas[i]).pdf(x)
            l += omegas[i]*p

        l = sum(np.log(l))/len(l)
        if np.abs(l - l_prev) < 1e-4:
            break
        l_prev = l

    bic = 3*np.log(len(x)) - 2*l
    
    omegas_list.append(omegas)
    mus_list.append(mus)
    sigmas_list.append(sigmas)
    bic_list.append(bic)
    
print("omegas:", omegas_list)
print("mus:", mus_list)
print("sigmas:", sigmas_list)
print("bics:", bic_list)

sel_idx = np.argmin(bic_list)
omegas = omegas_list[sel_idx]
mus = mus_list[sel_idx]
sigmas = sigmas_list[sel_idx]

ns_omegas = omegas
ns_mus = mus
ns_sigmas = sigmas

print("selected omegas:", omegas)
print("selected mus:", mus)
print("selected sigmas:", sigmas)

In [None]:
from tqdm import tqdm

t_list = np.arange(0,1,0.0001)
p_fa_list = []
p_ms_list = []

for t in tqdm(t_list):
    p_fa = 0
    p_ms = 0

    for i in range(len(nn_omegas)):
        p_fa += nn_omegas[i] * (scipy.stats.norm(nn_mus[i], nn_sigmas[i]).cdf(1)-scipy.stats.norm(nn_mus[i], nn_sigmas[i]).cdf(t))

    for i in range(len(ns_omegas)):
        p_ms += ns_omegas[i] * scipy.stats.norm(ns_mus[i], ns_sigmas[i]).cdf(t)
        
    p_fa_list.append(p_fa)
    p_ms_list.append(p_ms)
    
p_error = np.array(p_fa_list) + np.array(p_ms_list)
sel_idx = np.argmin(p_error)
sel_t = t_list[sel_idx]

print("selected threshold:", sel_t)
print("error:", p_error[sel_idx])

In [None]:
plt.plot(t_list, p_fa_list, '.')
plt.plot(t_list, p_ms_list, '.')
plt.plot(t_list, p_error, '.')
plt.show()

In [None]:
snr_list = [-20,-15,-10,-5,0,5,10,15,20,25,30]
T_list = [100,200,500,1000,2000,5000,10000]
rmse_list1 = []
rmse_list2 = []
hausdorff_list1 = []
hausdorff_list2 = []
pdet_list1 = []
pdet_list2 = []
sn_acc_list1 = []
sn_acc_list2 = []

for snr in snr_list:
    # load data
    data = np.load(f'../../../data/experiment_3/scenario_1/data_test_snr{snr}_t1000.npy', allow_pickle=True)

    # get labels and data seperately
    truths = [s['label'] for s in data]
    sensor_pos = [s['sensor_pos'] for s in data]
    data = [s['cm'] for s in data]

    # apply music algorithm
    res = 1
    N_max = 4
    p_list = []
    h_list = []
    a_list = []
    n_list = []

    for i in range(len(data)):
        R = data[i]
        M = R.shape[0]
        N = truths[i].shape[0]
        truths[i] = truths[i].reshape(-1,)

        preds, spectrum = music(R,M,N_max,1000,sensor_pos[i],res,sel_t)
        preds = np.sort(preds)
        truths[i] = np.sort(truths[i])
        
        p_list.append(preds)

        hausdorff1 = directed_hausdorff(preds.reshape(-1,1), truths[i].reshape(-1,1))[0]
        hausdorff2 = directed_hausdorff(truths[i].reshape(-1,1), preds.reshape(-1,1))[0]
        h_list.append(max(hausdorff1, hausdorff2))
        
        # probability of detection
        temp = np.zeros(N)
        temp[:len(preds)] = 1
        a_list.extend(temp)
        
        # source number estimation accuracy
        n_list.append(N==len(preds))

        if snr==10 and i==0:
            np.save(f'../../../results/experiment_3/scenario_1/music_t-gane_spectrum_{truths[0]}deg.npy', spectrum)

    hausdorff = np.mean(h_list)
    pdet = np.sum(a_list) / len(a_list)
    sn_acc = np.sum(n_list) / len(n_list)
    hausdorff_list1.append(hausdorff)
    pdet_list1.append(pdet)
    sn_acc_list1.append(sn_acc)
    
    print(f"snr {snr}dB, T {1000}, test-hausdorff {hausdorff:.4f}, test-prob-detection {pdet:.4f}, test-sn-acc {sn_acc:.4f}")

    with open(f'../../../results/experiment_3/scenario_1/music_t-gane_preds_snr{snr}_t1000', "wb") as fp: 
        pickle.dump(p_list, fp)
    with open(f'../../../results/experiment_3/scenario_1/music_t-gane_truths_snr{snr}_t1000', "wb") as fp: 
        pickle.dump(truths, fp)
    
    
for T in T_list:
    # load data
    data = np.load(f'../../../data/experiment_3/scenario_1/data_test_snr-10_t{T}.npy', allow_pickle=True)

    # get labels and data seperately
    truths = [s['label'] for s in data]
    sensor_pos = [s['sensor_pos'] for s in data]
    data = [s['cm'] for s in data]

    # apply music algorithm
    res = 1
    N_max = 4
    p_list = []
    h_list = []
    a_list = []
    n_list = []

    for i in range(len(data)):
        R = data[i]
        M = R.shape[0]
        N = truths[i].shape[0]
        truths[i] = truths[i].reshape(-1,)

        preds, spectrum = music(R,M,N_max,T,sensor_pos[i],res,sel_t)
        preds = np.sort(preds)
        truths[i] = np.sort(truths[i])
        p_list.append(preds)

        hausdorff1 = directed_hausdorff(preds.reshape(-1,1), truths[i].reshape(-1,1))[0]
        hausdorff2 = directed_hausdorff(truths[i].reshape(-1,1), preds.reshape(-1,1))[0]
        h_list.append(max(hausdorff1, hausdorff2))
        
        # probability of detection
        temp = np.zeros(N)
        temp[:len(preds)] = 1
        a_list.extend(temp)
        
        # source number estimation accuracy
        n_list.append(N==len(preds))
    
    hausdorff = np.mean(h_list)
    pdet = np.sum(a_list) / len(a_list)
    sn_acc = np.sum(n_list) / len(n_list)
    hausdorff_list2.append(hausdorff)
    pdet_list2.append(pdet)
    sn_acc_list2.append(sn_acc)
    
    print(f"snr -10 dB, T {T}, test-hausdorff {hausdorff:.4f}, test-prob-detection {pdet:.4f}, test-sn-acc {sn_acc:.4f}")

    with open(f'../../../results/experiment_3/scenario_1/music_t-gane_preds_snr-10_t{T}', "wb") as fp: 
        pickle.dump(p_list, fp)
    with open(f'../../../results/experiment_3/scenario_1/music_t-gane_truths_snr-10_t{T}', "wb") as fp: 
        pickle.dump(truths, fp)
    
np.save('../../../results/experiment_3/scenario_1/music_t-gane_hausdorff1.npy', hausdorff_list1)
np.save('../../../results/experiment_3/scenario_1/music_t-gane_hausdorff2.npy', hausdorff_list2)
np.save('../../../results/experiment_3/scenario_1/music_t-gane_pdet1.npy', pdet_list1)
np.save('../../../results/experiment_3/scenario_1/music_t-gane_pdet2.npy', pdet_list2)
np.save('../../../results/experiment_3/scenario_1/music_t-gane_snacc1.npy', sn_acc_list1)
np.save('../../../results/experiment_3/scenario_1/music_t-gane_snacc2.npy', sn_acc_list2)

In [None]:
# plot rmse values
plt.figure()
plt.plot(snr_list, hausdorff_list1, '-o')
plt.title("rmse values for different snr levels")
plt.xlabel("snr (dB)")
plt.ylabel("hausdorff distance")
plt.legend(['music+t-gane (lee 2021)'])
plt.yscale("log")
plt.grid()

plt.figure()
plt.plot(T_list, hausdorff_list2, '-o')
plt.title("rmse values for different snapshot numbers")
plt.xlabel("T")
plt.ylabel("hausdorff distance")
plt.legend(['music+t-gane (lee 2021)'])
plt.yscale("log")
plt.grid()