In [1]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

In [2]:
# Load saved data
from google.colab import drive
drive.mount('/content/drive')
data = np.load('/content/drive/My Drive/Github/mtc-device-activation/data/activity-models/e2-general-model.npz', allow_pickle=True)
all_DeviceLocations = data['all_DeviceLocations']
num_samples = data['num_samples']
device_count = data['device_count']
sparsity = data['sparsity']

data = np.load("/content/drive/My Drive/Github/mtc-device-activation/data/communication-models/e2-communication-simulation-theta.npz", allow_pickle=True)
all_theta = data["all_theta"]

data = np.load("/content/drive/My Drive/Github/mtc-device-activation/data/communication-models/e2-communication-simulation-gamma.npz", allow_pickle=True)
all_gamma = data["all_gamma"]

data = np.load("/content/drive/My Drive/Github/mtc-device-activation/data/communication-models/e2-communication-simulation-z.npz", allow_pickle=True)
all_z = data["all_z"]

data = np.load("/content/drive/My Drive/Github/mtc-device-activation/data/communication-models/e2-communication-simulation-noise.npz", allow_pickle=True)
all_noise = data["all_noise"]

data = np.load("/content/drive/My Drive/Github/mtc-device-activation/data/communication-models/e2-communication-simulation-received-signal.npz", allow_pickle=True)
all_received_signal = data["all_received_signal"]

data = np.load("/content/drive/My Drive/Github/mtc-device-activation/data/communication-models/e2-communication-simulation-general.npz", allow_pickle=True)
snr_db_array = data["snr_db_array"]
pilot_length_array = data["pilot_length_array"]
M = data["M"]

snr_db_index = 2
snr_db=snr_db_array[snr_db_index]

pilot_length_index = 2
pilot_length=pilot_length_array[pilot_length_index]

max_iter = 500

Mounted at /content/drive


In [3]:
num_samples = 1000
sample_range = range(0, 1000)

Compressive Sampling Matching Pursuit (CoSaMP)

In [4]:
def cosamp_convergence(z, gamma, theta, y, sparsity_level, max_iter=max_iter, stopping_criterion=1e-4, lambda_reg=1e-2):
    L, N = theta.shape
    _, M = y.shape

    nmse_list = []

    x_hat = np.zeros((N, M), dtype=np.complex128)
    residual = y.copy()
    support = set()

    for t in range(max_iter):
        # Step 1: Compute proxy
        proxy = theta.conj().T @ residual
        row_energy = np.sum(np.abs(proxy)**2, axis=1)
        omega = np.argpartition(row_energy, -2 * sparsity_level)[-2 * sparsity_level:]

        # Step 2: Merge with previous support
        merged_support = list(set(omega).union(support))
        theta_s = theta[:, merged_support]

        # Step 3: Regularized LS on merged support
        A = theta_s
        AHA = A.conj().T @ A + lambda_reg * np.eye(A.shape[1])
        AHY = A.conj().T @ y
        x_temp = np.linalg.solve(AHA, AHY)

        # Step 4: Prune to top-K rows
        row_energy_temp = np.sum(np.abs(x_temp)**2, axis=1)
        top_indices = np.argpartition(row_energy_temp, -sparsity_level)[-sparsity_level:]
        support = set([merged_support[i] for i in top_indices])

        # Step 5: Final LS on pruned support
        theta_final = theta[:, list(support)]
        A = theta_final
        AHA = A.conj().T @ A + lambda_reg * np.eye(A.shape[1])
        AHY = A.conj().T @ y
        x_support = np.linalg.solve(AHA, AHY)

        # Step 6: Construct full x_hat
        x_hat.fill(0)
        for i, idx in enumerate(support):
            x_hat[idx, :] = x_support[i, :]

        # Step 7: Update residual
        residual = y - theta @ x_hat

        # Step 8: Compute NMSE
        nmse = np.linalg.norm(z[gamma == 1] - x_hat[gamma == 1])**2 / np.linalg.norm(z[gamma == 1])**2
        nmse_list.append(nmse)

        # Step 9: Check stopping
        if np.linalg.norm(residual) < stopping_criterion:
            print(f"CoSaMP converged at iteration {t+1}")
            break

    return nmse_list

In [5]:
# Placeholder arrays for results
nmse_iter_res = np.zeros((num_samples, max_iter))

In [6]:
# Loop through each sample with an outer progress bar
for sample_index in tqdm(sample_range, desc="Processing Samples", position=0):
    gamma = all_gamma[sample_index]  # Ground truth gamma values (true support)
    loc = all_DeviceLocations[sample_index]
    theta = all_theta[sample_index, pilot_length_index, :pilot_length]
    received_signal = all_received_signal[sample_index, pilot_length_index, snr_db_index, :pilot_length]
    z = all_z[sample_index]
    snr = 10 ** (snr_db / 10)
    signal_power = np.mean(np.abs(np.matmul(theta, z))**2)
    noise_power = signal_power / snr

    # Apply SBL algorithm for the current sample and Pilot Length
    nmse_per_iter = cosamp_convergence(z, gamma, theta, received_signal, sparsity)

    # Save NMSEs (pad if early stopping)
    nmse_iter_res[sample_index, :len(nmse_per_iter)] = nmse_per_iter

Processing Samples: 100%|██████████| 1000/1000 [08:56<00:00,  1.86it/s]


In [7]:
np.savez_compressed(
    '/content/drive/My Drive/Github/mtc-device-activation/data/results/e2-results-1000-convergence-cosamp-snr-db-12-pilot-length-30.npz',
    nmse_iter_res=nmse_iter_res,
    pilot_length=pilot_length,
    snr_db=snr_db
)

print("All results have been saved")

All results have been saved
