In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import torch 
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

from utils import *
from models import *
from tqdm import tqdm

import matplotlib.pyplot as plt
plt.rcParams['figure.dpi'] = 500

dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
def generate_two_direction(gap: float):

    theta1 = torch.rand(1) * torch.pi - torch.pi/2
    theta2 = torch.fmod(theta1 + gap + torch.pi/2, torch.pi) - torch.pi/2
    theta = torch.cat((theta1, theta2), dim=0)
    return theta 



def generate_qpsk_symbols(nbSources: int, nbSymbols: int):
    
    bits = torch.randint(0, 2, (nbSources, 2 * nbSymbols))
    real_part = 2 * bits[:, 0::2] - 1  
    imag_part = 2 * bits[:, 1::2] - 1  
    qpsk_symbols = (real_part + 1j * imag_part) / sqrt(2)
    
    return qpsk_symbols



def generate_signal_two_sources(t: int, snr: float, array, qpsk: bool, coherent: bool, gap: float):
    """
    Generates an array signal with specified characteristics.

    Parameters:
    T (int): Length of the generated signals (number of time samples).
    D (int): Number of signal sources.
    SNR (float): Signal-to-noise ratio (in dB).
    array (ArrayModel): Predefined array model used to calculate steering vectors.
    coherent (bool): If True, the generated signals will be coherent. Otherwise, they will be independent (default is False).

    Returns:
    Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]: 
        - Signal matrix of size (M, T), where M is the number of array elements.
        - Vector of angles of size (D), representing the directions of arrival (DOAs) for each source.
        - perturbation of sensors in x directions
        - perturbation of sensors in y directions
    """

    sigma_noise = 0.1

    theta = generate_two_direction(gap)

    A = array.get_steering_vector(theta)
    
    if not coherent:
        if qpsk:
            x = generate_qpsk_symbols(2, t) * 10 ** (snr / 20) * sigma_noise
        else:
            x = (torch.randn(2, t) + 1j * torch.randn(2, t)) / sqrt(2) * 10 ** (snr / 20) * sigma_noise
    else:
        if qpsk:
            x = generate_qpsk_symbols(1, t) * 10 ** (snr / 20) * sigma_noise
        else:
            x = (torch.randn(1, t) + 1j * torch.randn(1, t)) / sqrt(2) * 10 ** (snr / 20) * sigma_noise
        x = torch.Tensor.repeat(x, (2, 1))

    n = (torch.randn(array.m, t) + 1j * torch.randn(array.m, t)) / sqrt(2) * sigma_noise

    return A @ x + n, theta



def generate_data_two_sources(n: int, t: int, snr_min: float, snr_max: float, array, qpsk: bool, coherent: bool, gap: float):
    """
    Generates a dataset of array signals and corresponding incident angles.

    Parameters:
    N (int): Number of signal samples to generate.
    T (int): Length of each signal (number of time samples).
    D (int): Number of sources (directions of arrival).
    SNR (float): Signal-to-noise ratio (in dB).
    array (ArrayModel): Predefined array model used to calculate steering vectors.
    coherent (bool): If True, the generated signals will be coherent across sources (default is False).

    Returns:
    Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        - `observations`: Tensor of size (N, T, M) representing the array signals, where M is the number of array elements.
        - `angles`: Tensor of size (N, D) containing the incident angles (DOAs) for each sample.
    """
    
    observations = []
    angles = []
    
    for _ in range(n):
        snr = random() * (snr_max - snr_min) + snr_min
        y, theta = generate_signal_two_sources(t, snr, array, qpsk, coherent, gap)
        observations.append(y.T) 
        angles.append(theta) 
    
    observations = torch.stack(observations, dim=0)
    angles = torch.stack(angles, dim=0)
    
    return observations, angles

In [4]:
m = 8
t = 200
n = 100000
lamda = 0.2
distance = 0.1
gap = 0.1
qpsk = False
coherent = False

array = ULA(m, lamda)
array.build_array(distance)
array.build_array_manifold()

if qpsk:
    path = 'saved_models_qpsk/'
else:
    path = 'saved_models/'

loss_func_cpu = RMSPE(2, 'cpu')

In [5]:
SNRs = [0, 5, 10, 15, 20]
methods = ["MUSIC", "Root-MUSIC", "DA-MUSIC", "DA-MUSIC v2", "RNN"]
mean_rmspe = {
    "MUSIC": [],
    "Root-MUSIC": [],
    "DA-MUSIC": [],
    "DA-MUSIC v2": [],
    "RNN": []
}
all_rmspe = {
    "MUSIC": [],
    "Root-MUSIC": [],
    "DA-MUSIC": [],
    "DA-MUSIC v2": [],
    "RNN": []
}

for snr in SNRs:

    observations, angles = generate_data_two_sources(n, t, snr, snr, array, qpsk, coherent, gap)

    results_music = []
    for i in range(observations.shape[0]):
        theta_est, _, _ = MUSIC(observations[i].T, 2, array)
        results_music.append(theta_est)
    results_music = torch.stack(results_music, dim=0)
    rmspe_music = loss_func_cpu.calculate(results_music, angles, average=False)
    all_rmspe['MUSIC'].append(rmspe_music)
    mean_rmspe['MUSIC'].append(torch.mean(rmspe_music))

    rmspe_root_music = loss_func_cpu.calculate(Root_MUSIC(observations, 2, array), angles, average=False)
    all_rmspe['Root-MUSIC'].append(rmspe_root_music)
    mean_rmspe['Root-MUSIC'].append(torch.mean(rmspe_root_music))

    test_set = DATASET(observations, angles)
    test_loader = DataLoader(test_set, batch_size=512, shuffle=False)

    da_music = DA_MUSIC(m, 2, array, dev)
    mean_rmspe_da_music, all_rmspe_da_music = test(da_music, test_loader, path+'da_music_'+str(snr)+'dB.pth', dev)
    mean_rmspe['DA-MUSIC'].append(mean_rmspe_da_music.cpu())
    all_rmspe['DA-MUSIC'].append(all_rmspe_da_music.cpu())

    da_music_v2 = DA_MUSIC_v2(m, 2, array, dev)
    mean_rmspe_da_music_v2, all_rmspe_da_music_v2 = test(da_music_v2, test_loader, path+'da_music_v2_'+str(snr)+'dB.pth', dev)
    mean_rmspe['DA-MUSIC v2'].append(mean_rmspe_da_music_v2.cpu())
    all_rmspe['DA-MUSIC v2'].append(all_rmspe_da_music_v2.cpu())

    rnn = RNN(m, 2, dev)
    mean_rmspe_rnn, all_rmspe_rnn = test(rnn, test_loader, path+'rnn_'+str(snr)+'dB.pth', dev)
    mean_rmspe['RNN'].append(mean_rmspe_rnn.cpu())
    all_rmspe['RNN'].append(all_rmspe_rnn.cpu())

In [None]:
plt.figure(figsize=(10, 6))
for method in methods:
    plt.plot(SNRs, mean_rmspe[method], marker='o', label=method)

plt.yscale("log")
plt.title("Performance Comparison Across SNR Levels", fontsize=14)
plt.xlabel("SNR (dB)", fontsize=12)
plt.ylabel("RMSPE (rad)", fontsize=12)
plt.xticks(SNRs)
plt.grid(True, which="both", linestyle="--", linewidth=0.5)
plt.legend(title="Methods")
plt.tight_layout()
plt.show()

In [None]:
fig, axes = plt.subplots(5, 5, figsize=(20, 20)) 

for i, snr in enumerate(SNRs):
    for j, name in enumerate(all_rmspe):
        axes[j, i].hist(all_rmspe[name][i], bins=100, color='blue', edgecolor='black', alpha=0.7, density=True, range=(0, torch.pi/2))
        axes[j, i].set_xlabel('RMSPE (rad)')
        axes[j, i].set_ylabel('Density')
        axes[j, i].set_title(name + ' at ' + str(snr) + 'dB')

fig.suptitle(str(n) + ' example, 2 sources, angle gap ' + str(gap))
plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.show()

In [None]:
fig, axes = plt.subplots(5, 5, figsize=(20, 20)) 

for i, snr in enumerate(SNRs):
    for j, name in enumerate(all_rmspe):
        axes[j, i].hist(all_rmspe[name][i], bins=100, color='blue', edgecolor='black', alpha=0.7, density=True, range=(0, torch.pi/2), log=True)
        axes[j, i].set_xlabel('RMSPE (rad)')
        axes[j, i].set_ylabel('Density')
        axes[j, i].set_title(name + ' at ' + str(snr) + 'dB')


fig.suptitle(str(n) + ' example, 2 sources, angle gap ' + str(gap))
plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.show()