In [1]:
import numpy as np
import cvxpy as cp
import scipy.sparse as sp
from sklearn.cluster import KMeans

In [2]:
# =========================
# 0) 基本原子与信道生成
# =========================
def ula_steering(Nr, theta, d_over_lambda=0.5):
    m = np.arange(Nr)
    return np.exp(1j*2*np.pi*d_over_lambda * m * np.sin(theta))

def delay_atom(K, tau, delta_f):
    k = np.arange(K)
    return np.exp(-1j*2*np.pi * tau * delta_f * k)

def doppler_atom(S, nu, T_sym):
    s = np.arange(S)
    return np.exp(1j*2*np.pi * nu * T_sym * s)

def synth_channel_3d(
    Nr=16, Nt=1, K=12, S=14, L=4, N_RB=128,
    carrier_Hz=10e9, scs_Hz=120e3, cp_ratio=0.0,
    d_over_lambda=0.5, snr_dB=20.0, pilots_per_RB=6, seed=1
):
    rng = np.random.default_rng(seed)
    Delta_f = scs_Hz
    T_sym = 1.0/Delta_f * (1.0 + cp_ratio)

    # 大尺度参数（跨 RB 固定）
    thetas = rng.uniform(-0.6, 0.6, size=L)          # AOA [rad]
    taus   = rng.uniform(0.2e-6, 3.0e-6, size=L)     # delay [s]
    nus    = rng.uniform(-300.0, 300.0, size=L)      # Doppler [Hz]
    mags   = np.sort(rng.uniform(0.5, 1.0, size=L))[::-1]  # |g_l|

    A_r = np.stack([ula_steering(Nr, th, d_over_lambda) for th in thetas], axis=1)  # (Nr,L)
    A_k = np.stack([delay_atom(K, tau, Delta_f)        for tau in taus],   axis=1)  # (K,L)
    A_s = np.stack([doppler_atom(S, nu, T_sym)         for nu  in nus],    axis=1)  # (S,L)

    # 小尺度相位：每 RB 独立
    phases = rng.uniform(-np.pi, np.pi, size=(N_RB, L))
    G = (mags[None, :] * np.exp(1j*phases))  # (N_RB, L)

    H = np.zeros((Nr, 1, K, S, N_RB), dtype=complex)
    for r in range(N_RB):
        Hr = np.zeros((Nr, K, S), dtype=complex)
        for l in range(L):
            Hr += G[r, l] * np.einsum('m,k,s->mks',
                                      A_r[:, l],
                                      np.conj(A_k[:, l]),
                                      A_s[:, l])
        H[:, 0, :, :, r] = Hr

    # 导频掩码：每 RB 选 pilots_per_RB 个 (k,s)
    masks = []
    for r in range(N_RB):
        mask = np.zeros((K, S), dtype=bool)
        idxs = np.unravel_index(
            np.random.choice(K*S, size=pilots_per_RB, replace=False),
            (K, S)
        )
        mask[idxs] = True
        masks.append(mask)

    # 加噪仅在观测 RE
    avg_pow = np.mean(np.abs(H)**2)
    snr_lin = 10.0**(snr_dB/10.0)
    noise_var = avg_pow / snr_lin
    rngn = np.random.default_rng(seed+33)
    noise = np.sqrt(noise_var/2) * (rngn.standard_normal(H.shape) + 1j*rngn.standard_normal(H.shape))

    Y = np.zeros_like(H)
    for r in range(N_RB):
        M = masks[r]
        Y[:, 0, M, r] = H[:, 0, M, r] + noise[:, 0, M, r]

    gt = dict(thetas=thetas, taus=taus, nus=nus, magnitudes=mags,
              scs_Hz=Delta_f, T_sym=T_sym)
    return H, Y, masks, gt

In [3]:
# =========================
# 1) 3D-DANM（多 RB 联合）
# =========================
def hermitian_toeplitz_from_col(col):
    N = col.shape[0]
    T = cp.Variable((N, N), hermitian=True)
    cons = []
    for i in range(N):
        for j in range(N):
            if i >= j:
                cons += [ T[i, j] == col[i-j] ]
            else:
                cons += [ T[i, j] == cp.conj(col[j-i]) ]
    cons += [ cp.imag(col[0]) == 0 ]
    return T, cons

def build_permutation_mats(Nr, K, S):
    entries_r, entries_tau, entries_nu = [], [], []
    for s in range(S):
        for k in range(K):
            for m in range(Nr):
                idx_r   = m + Nr * (k + s*K)          # vec(X_r)
                idx_tau = k + K  * (m + s*Nr)         # vec(X_tau)
                idx_nu  = s + S  * (m + k*Nr)         # vec(X_nu)
                entries_r.append(idx_r)
                entries_tau.append(idx_tau)
                entries_nu.append(idx_nu)
    Ntot = Nr*K*S
    P_tau = sp.coo_matrix(
        (np.ones(Ntot), (np.array(entries_tau), np.array(entries_r))),
        shape=(Ntot, Ntot)
    ).tocsr()
    P_nu = sp.coo_matrix(
        (np.ones(Ntot), (np.array(entries_nu), np.array(entries_r))),
        shape=(Ntot, Ntot)
    ).tocsr()
    return P_tau, P_nu

def danm_3d_multi_rb(Y_list, mask_list, Nr, K, S, eps=0.0, solver="SCS", verbose=False):
    Q = len(Y_list)
    # 共享 Toeplitz
    ur = cp.Variable(Nr, complex=True)
    ut = cp.Variable(K,  complex=True)
    un = cp.Variable(S,  complex=True)
    Tr, cons_Tr = hermitian_toeplitz_from_col(ur)
    Tt, cons_Tt = hermitian_toeplitz_from_col(ut)
    Tn, cons_Tn = hermitian_toeplitz_from_col(un)

    P_tau, P_nu = build_permutation_mats(Nr, K, S)
    cons = []
    cons += cons_Tr + cons_Tt + cons_Tn
    obj = cp.trace(Tr)/(2*Nr) + cp.trace(Tt)/(2*K) + cp.trace(Tn)/(2*S)

    for q in range(Q):
        Xr_q = cp.Variable((Nr, K*S), complex=True)
        Zr_q   = cp.Variable((K*S, K*S), hermitian=True)
        Ztau_q = cp.Variable((Nr*S, Nr*S), hermitian=True)
        Znu_q  = cp.Variable((Nr*K, Nr*K), hermitian=True)

        vec_Xr   = cp.vec(Xr_q)
        vec_Xtau = P_tau @ vec_Xr
        vec_Xnu  = P_nu  @ vec_Xr
        Xtau_q = cp.reshape(vec_Xtau, (K,  Nr*S))
        Xnu_q  = cp.reshape(vec_Xnu,  (S,  Nr*K))

        cons += [ cp.bmat([[Tr,  Xr_q   ], [Xr_q.H,   Zr_q  ]]) >> 0 ]
        cons += [ cp.bmat([[Tt,  Xtau_q ], [Xtau_q.H, Ztau_q]]) >> 0 ]
        cons += [ cp.bmat([[Tn,  Xnu_q  ], [Xnu_q.H,  Znu_q ]]) >> 0 ]

        # 数据一致性
        Y_rb = Y_list[q]; mask_rb = mask_list[q]
        diffs = []
        ks = np.argwhere(mask_rb)
        for (k, s) in ks:
            col = k + s*K
            diffs.append(Xr_q[:, col] - Y_rb[:, k, s])
        if len(diffs) > 0:
            cons += [ cp.norm(cp.hstack(diffs), 'fro') <= eps ]

    prob = cp.Problem(cp.Minimize(obj), cons)
    prob.solve(solver=solver, verbose=verbose, eps=1e-5)
    return Tr.value, Tt.value, Tn.value

# =========================
# 2) ESPRIT 与单位转换
# =========================
def esprit_1d(T, n_paths):
    vals, vecs = np.linalg.eigh(T)
    Es = vecs[:, -n_paths:]
    E1, E2 = Es[:-1, :], Es[1:, :]
    Phi = E2 @ np.linalg.pinv(E1)
    lam, _ = np.linalg.eig(Phi)
    mu = np.angle(lam)/(2*np.pi)
    mu = (mu + 0.5) % 1.0 - 0.5
    return np.sort(mu.real)

def mu_to_aoa(mu_r, d_over_lambda=0.5):
    return np.arcsin(np.clip(mu_r / d_over_lambda, -1.0, 1.0))

def mu_to_delay(mu_tau, delta_f):
    return -mu_tau / delta_f

def mu_to_doppler(mu_nu, T_sym):
    return mu_nu / T_sym

# =========================
# 3) 每 RB 小尺度增益 g_{r,l} 的最小二乘闭式
# =========================
def estimate_small_scale_g_per_RB(Y_rb, mask_rb, thetas_hat, taus_hat, nus_hat,
                                  d_over_lambda, delta_f, T_sym):
    """
    输入：Y_rb (Nr,K,S), mask_rb (K,S)；已估计的三维大尺度参数
    输出：g_r (L, ) 的复数估计（闭式最小二乘）
    方法：
      对每个导频 (k,s) 堆叠 Nr 个接收通道：Y[:,k,s] = A_r diag(g_r) v_{k,s}
      其中 v_{k,s}[l] = a_k^*(k,l) a_s(s,l)
      令 Y_stack = [Y(:,k1,s1), ..., Y(:,kP,sP)]，V = [v_{k_i,s_i}] (L×P)
      Z = A_r^† Y_stack = diag(g_r) V  ==>  对每个 l：Z_l = g_r,l * V_l
      => g_r,l = <V_l, Z_l> / ||V_l||^2
    """
    Nr, K, S = Y_rb.shape
    L = len(thetas_hat)
    A_r = np.stack([ula_steering(Nr, th, d_over_lambda) for th in thetas_hat], axis=1)  # (Nr,L)
    # 构造观测列
    idxs = np.argwhere(mask_rb)
    P = idxs.shape[0]
    if P == 0:
        return np.zeros(L, dtype=complex)
    Y_stack = np.zeros((Nr, P), dtype=complex)
    V = np.zeros((L, P), dtype=complex)
    for p, (k, s) in enumerate(idxs):
        ak = np.exp(+1j*2*np.pi * taus_hat * delta_f * k)   # conj of delay atom -> +j
        aslow = np.exp(1j*2*np.pi * nus_hat * T_sym * s)
        V[:, p] = ak * aslow                              # elementwise per l
        Y_stack[:, p] = Y_rb[:, k, s]

    # 伪逆投影
    Ar_pinv = np.linalg.pinv(A_r)
    Z = Ar_pinv @ Y_stack   # (L,P) = diag(g_r) * V  (噪声下近似)

    # 行向量最小二乘解
    g_r = np.zeros(L, dtype=complex)
    for l in range(L):
        num = np.vdot(V[l, :], Z[l, :])     # conj(V)·Z
        den = np.vdot(V[l, :], V[l, :]) + 1e-12
        g_r[l] = num / den
    return g_r

In [4]:
Nr, Nt, K, S, L, N_RB = 8, 1, 6, 6, 4, 12
PILOTS_PER_RB = 6
RB_JOIN = 2           # 一次联合多少个 RB 做 3D-DANM（建议 8~32）
SNR_dB = 20.0
SOLVER = "SCS"          # 可改 "MOSEK"
carrier_Hz=10e9
# ---- 生成数据 ---- 
H, Y, masks, gt = synth_channel_3d(
    Nr=Nr, Nt=Nt, K=K, S=S, L=L, N_RB=N_RB,
    carrier_Hz=carrier_Hz, d_over_lambda=0.5, snr_dB=SNR_dB,
    pilots_per_RB=PILOTS_PER_RB, seed=7
)
Delta_f = gt['scs_Hz']; T_sym = gt['T_sym']

In [None]:
# ---- 选取若干 RB 进行联合 3D-DANM ----
rng = np.random.default_rng(0)
subset = rng.choice(N_RB, size=RB_JOIN, replace=False)
Y_list = [Y[:, 0, :, :, r] for r in subset]  # (Nr,K,S)
mask_list = [masks[r] for r in subset]

# 噪声容差（可按 SNR 设定；演示先用 0）
Tr, Tt, Tn = danm_3d_multi_rb(Y_list, mask_list, Nr=Nr, K=K, S=S,
                                eps=0.0, solver=SOLVER, verbose=False)

# ---- ESPRIT 提取 large-scale 三维参数 ----
mu_r   = esprit_1d(Tr, n_paths=L)
mu_tau = esprit_1d(Tt, n_paths=L)
mu_nu  = esprit_1d(Tn, n_paths=L)

theta_est = np.sort(mu_to_aoa(mu_r, d_over_lambda=0.5))
tau_est   = np.sort(mu_to_delay(mu_tau, delta_f=Delta_f))
nu_est    = np.sort(mu_to_doppler(mu_nu, T_sym=T_sym))

# 与真值对齐（按升序简单配对）
theta_gt = np.sort(gt['thetas'])
tau_gt   = np.sort(gt['taus'])
nu_gt    = np.sort(gt['nus'])

print("\n=== Large-scale parameters ===")
print("AOA (rad)  gt vs est:\n", np.vstack([theta_gt, theta_est]).T)
print("Delay (s)  gt vs est:\n", np.vstack([tau_gt,   tau_est]).T)
print("Dopp (Hz)  gt vs est:\n", np.vstack([nu_gt,    nu_est]).T)
print("AOA err(rad):", theta_est - theta_gt)
print("Tau err(s)  :", tau_est   - tau_gt)
print("Nu  err(Hz) :", nu_est    - nu_gt)

# ---- 基于 large-scale 参数，逐 RB 估小尺度 g_{r,l} ----
g_est_all = np.zeros((N_RB, L), dtype=complex)
nmse_rb = np.zeros(N_RB)
for r in range(N_RB):
    Y_rb = Y[:, 0, :, :, r]      # (Nr,K,S)
    mask_rb = masks[r]           # (K,S)
    g_hat = estimate_small_scale_g_per_RB(
        Y_rb, mask_rb,
        theta_est, tau_est, nu_est,
        d_over_lambda=0.5, delta_f=Delta_f, T_sym=T_sym
    )
    g_est_all[r, :] = g_hat

    # 用估计参数重构导频位置的 H 并算 NMSE（仅在观测 RE 上）
    # 组装 A_r, A_k, A_s（用估计参数）
    A_rh = np.stack([ula_steering(Nr, th, 0.5) for th in theta_est], axis=1)  # (Nr,L)
    A_kh = np.stack([delay_atom(K, tau, Delta_f) for tau in tau_est], axis=1) # (K,L)
    A_sh = np.stack([doppler_atom(S, nu, T_sym) for nu in nu_est], axis=1)    # (S,L)
    # 观测位置的重构
    Yhat = np.zeros_like(Y_rb)
    ks = np.argwhere(mask_rb)
    for (k, s) in ks:
        v = (np.conj(A_kh[k, :]) * A_sh[s, :])  # (L,)
        Yhat[:, k, s] = A_rh @ (g_hat * v)

    num = np.sum(np.abs(Y_rb[ :, mask_rb ] - Yhat[ :, mask_rb ])**2)
    den = np.sum(np.abs(Y_rb[ :, mask_rb ])**2) + 1e-12
    nmse_rb[r] = (num/den).real

print("\n=== Per-RB small-scale gain estimation ===")
print("NMSE median over RBs: {:.3f} dB".format(10*np.log10(np.median(nmse_rb))))
print("NMSE 25/75 pct (dB): {:.3f} / {:.3f}".format(
    10*np.log10(np.percentile(nmse_rb, 25)),
    10*np.log10(np.percentile(nmse_rb, 75))
))

# 也可输出部分 RB 的 g_{r,l} 与真值幅度对比（真值未知相位）
# 注意：真值小尺度 g_{r,l} 的相位是随机，本方法估到的是复数（幅相皆可变）

    You didn't specify the order of the vec expression. The default order
    used in CVXPY is Fortran ('F') order. This default will change to match NumPy's
    default order ('C') in a future version of CVXPY.
    
    You didn't specify the order of the reshape expression. The default order
    used in CVXPY is Fortran ('F') order. This default will change to match NumPy's
    default order ('C') in a future version of CVXPY.
    
