In [205]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.linalg import svd
import uuid
import cvxpy as cp
import torch.optim as optim
import torch.nn as nn
from scipy.io import loadmat
import h5py

# Set random seed for reproducibility
np.random.seed(42)

In [206]:
# Step 1: Define System and Simulation Parameters
N = 64  # Number of BS antennas
K = 4   # Number of users
M = 4   # Number of RF chains
omega = 0.3  # Tradeoff weight
I_max = 120  # Maximum outer iterations
J = 10  # Can be 1, 10, or 20

SNR_dB = 12  # SNR in dB
sigma_n2 = 1  # Noise variance
P_BS = sigma_n2 * 10**(SNR_dB / 10)  # Transmit power
mu = 0.01  # Step size for analog precoder
lambda_ = 0.01  # Step size for digital precoder
L = 20  # Number of paths for channel
num_realizations = 100  # Number of channel realizations


# Dataset parameters
num_channels = 100
num_epochs = 100 if J == 100 else 30
snr_min, snr_max = 0, 12  # dB

In [207]:
import torch
def to_tensor(x, dtype=None, device=None):
    """Convert numpy or tensor input to a torch tensor on the right device."""
    if isinstance(x, torch.Tensor):
        return x.to(device=device or 'cpu', dtype=dtype)
    elif isinstance(x, np.ndarray):
        return torch.as_tensor(x, dtype=dtype, device=device)
    else:
        raise TypeError(f"Unsupported type: {type(x)}")

def to_numpy(x):
    """Convert tensor to numpy array (CPU)"""
    if isinstance(x, torch.Tensor):
        return x.detach().cpu().numpy()
    elif isinstance(x, np.ndarray):
        return x
    else:
        raise TypeError(f"Unsupported type: {type(x)}")


In [208]:
# Step 2: Define Sensing Parameters
P = 3  # Number of desired sensing angles
theta_d = np.array([-60, 0, 60]) * np.pi / 180  # Desired angles in radians
delta_theta = 5 * np.pi / 180  # Half beamwidth
theta_grid = np.linspace(-np.pi / 2, np.pi / 2, 181)  # Angular grid [-90, 90] degrees
B_d = np.zeros(len(theta_grid))  # Desired beampattern
for t, theta_t in enumerate(theta_grid):
    for theta_p in theta_d:
        if abs(theta_t - theta_p) <= delta_theta:
            B_d[t] = 1

# Wavenumber and antenna spacing
lambda_wave = 1  # Wavelength (normalized)
k = 2 * np.pi / lambda_wave
d = lambda_wave / 2  # Antenna spacing

In [209]:
import torch

# Step 3: Channel Matrix Generation (Saleh-Valenzuela Model)
def generate_channel(N, M, L):
    H = np.zeros((M, N), dtype=complex)
    for _ in range(L):
        alpha = np.random.normal(0, 1, 2).view(complex)[0] / np.sqrt(2)  # Complex gain
        phi_r = np.random.uniform(0, 2 * np.pi)  # Angle of arrival
        phi_t = np.random.uniform(0, 2 * np.pi)  # Angle of departure
        a_r = np.exp(1j * k * d * np.arange(M) * np.sin(phi_r)) / np.sqrt(M)
        a_t = np.exp(1j * k * d * np.arange(N) * np.sin(phi_t)) / np.sqrt(N)
        H += np.sqrt(N * M / L) * alpha * np.outer(a_r, a_t.conj())
    return H

# Steering vector function
def steering_vector(theta, N):
    return np.exp(1j * k * d * np.arange(N) * np.sin(theta)) / np.sqrt(N)


# Compute communication rate R
def compute_rate(H, A, D, sigma_n2):
    H_A = H @ A  # Effective channel
    R = 0
    for k in range(K):
        h_k = H_A[:, k]
        
        signal = torch.abs(h_k.conj().T @ D[:, k])**2
        interference = sum(torch.abs(h_k.conj().T @ D[:, j])**2 for j in range(K) if j != k)
        SINR = signal / (interference + sigma_n2)
        R += torch.log2(1 + SINR)
    return R

# Compute sensing error tau
def compute_tau(A, D, Psi, theta_grid):
    V = A @ D
    tau = 0
    for theta in theta_grid:
        a_theta = steering_vector(theta, N)
        # convert a_theta to torch tensor
        a_theta = torch.tensor(a_theta, dtype=torch.cfloat)
        tau += torch.abs(a_theta.conj().T @ V @ V.conj().T @ a_theta - a_theta.conj().T @ Psi @ a_theta)**2
    return tau / len(theta_grid)

def gradient_R_A(H, A, D, sigma_n2):
    xi = 1 / torch.log(torch.tensor(2.0, dtype=A.dtype, device=A.device))
    grad_A = torch.zeros_like(A, dtype=torch.cfloat)

    # Effective covariance of the digital precoder
    V = D @ D.conj().transpose(-2, -1)

    for k in range(K):
        # User-k effective channel outer product
        h_k = H[k, :].reshape(-1, 1)  # (M x 1)
        H_tilde_k = h_k @ h_k.conj().transpose(-2, -1)

        # D_bar_k = D with user k's column set to zero
        D_bar_k = D.clone()
        D_bar_k[:, k] = 0.0
        V_bar_k = D_bar_k @ D_bar_k.conj().transpose(-2, -1)

        # Denominator terms (trace parts)
        denom1 = torch.trace(A @ V @ A.conj().transpose(-2, -1) @ H_tilde_k) + sigma_n2
        denom2 = torch.trace(A @ V_bar_k @ A.conj().transpose(-2, -1) @ H_tilde_k) + sigma_n2

        # Gradient contribution
        term1 = H_tilde_k @ A @ V / denom1
        term2 = H_tilde_k @ A @ V_bar_k / denom2

        grad_A += xi * (term1 - term2)

    return grad_A

def gradient_R_D(H, A, D, sigma_n2):
    xi = 1 / torch.log(torch.tensor(2.0, dtype=A.dtype, device=A.device))
    grad_D = torch.zeros_like(D, dtype=torch.cfloat)

    for k in range(K):
        # Channel vector for user k
        h_k = H[k, :].reshape(-1, 1)  # (N x 1)
        H_tilde_k = h_k @ h_k.conj().transpose(-2, -1)

        # Effective digital-domain channel including analog precoder
        H_bar_k = A.conj().transpose(-2, -1) @ H_tilde_k @ A

        # D_bar_k = D with k-th column set to zero
        D_bar_k = D.clone()
        D_bar_k[:, k] = 0.0

        # Compute denominator terms (trace parts)
        denom1 = torch.trace(D @ D.conj().transpose(-2, -1) @ H_bar_k) + sigma_n2
        denom2 = torch.trace(D_bar_k @ D_bar_k.conj().transpose(-2, -1) @ H_bar_k) + sigma_n2

        # Compute gradient contributions
        term1 = (H_bar_k @ D) / denom1
        term2 = (H_bar_k @ D_bar_k) / denom2

        grad_D += xi * (term1 - term2)

    return grad_D

def gradient_tau_A(A, D, Psi):
    U = A @ D @ D.conj().transpose(-2, -1) @ A.conj().transpose(-2, -1)  # A D D^H A^H
    grad_A = 2 * (U - Psi) @ A @ D @ D.conj().transpose(-2, -1)
    return grad_A

def gradient_tau_D(A, D, Psi):
    U = A @ D @ D.conj().transpose(-2, -1) @ A.conj().transpose(-2, -1)  # A D D^H A^H
    grad_D = 2 * A.conj().transpose(-2, -1) @ (U - Psi) @ A @ D
    return grad_D



In [210]:

with h5py.File('Psi_all.mat', 'r') as f:
    Psi_all = np.array(f['Psi_all'], dtype=np.complex128)  # force numeric type
    SNR_dB = np.array(f['SNR_dB']).flatten()

    print("Raw Psi_all shape:", Psi_all.shape)

    # Optional: squeeze if necessary (depends on dimensions)
    if Psi_all.ndim == 4:
        Psi_all = np.squeeze(Psi_all, axis=2)
    
    print("Psi_all final shape:", Psi_all.shape)



def compute_psi(snr_db):
    idx = np.argmin(np.abs(SNR_dB - snr_db))
    Psi = Psi_all[idx, :, :]
    return Psi




Raw Psi_all shape: (121, 64, 64)
Psi_all final shape: (121, 64, 64)


In [211]:
def project_unit_modulus(A):
    return torch.exp(1j * torch.angle(A))

def project_power_constraint(A, D, P_BS):
    norm_factor = torch.norm(A @ D, 'fro')
    D = D * (torch.sqrt(P_BS) / norm_factor)
    return D

In [212]:
import torch

def proposed_initialization(H, theta_d, N, M, K, P_BS):
    G = np.array([H[k, :] for k in range(K)]).T  # N x K
    A0 = np.exp(1j * np.angle(G))
    X_ZF = np.linalg.pinv(H)
    D0 = np.linalg.pinv(A0) @ X_ZF
    D0 = np.sqrt(P_BS) * D0 / np.linalg.norm(A0 @ D0, 'fro')
    return A0, D0

In [213]:
import torch
import torch.nn as nn

class UPGANetLayer(nn.Module):
    def __init__(self, N, M, K, omega, J=10, eta=None):
        super(UPGANetLayer, self).__init__()
        self.J = J
        self.N, self.M, self.K = N, M, K
        self.omega = omega
        self.eta = eta if eta is not None else 1/N

        # Learnable step sizes (initialized to μ(0,0)=λ(0)=0.01)
        self.mu = nn.Parameter(torch.full((J,), 0.01, dtype=torch.float32))
        self.lambda_ = nn.Parameter(torch.tensor(0.01, dtype=torch.float32))

    def forward(self, H, A, D, Psi, sigma_n2, P_BS):
        # --- J inner updates for analog precoder ---
        for j in range(self.J):
            grad_RA = gradient_R_A(H, A, D, sigma_n2)
            grad_tauA = gradient_tau_A(A, D, Psi)
            A = A + self.mu[j] * (grad_RA - self.omega * grad_tauA)
            A = project_unit_modulus(A)

        # --- Digital precoder update ---
        grad_RD = gradient_R_D(H, A, D, sigma_n2)
        grad_tauD = gradient_tau_D(A, D, Psi)
        D = D + self.lambda_ * (grad_RD - self.omega * self.eta * grad_tauD)
        D = project_power_constraint(A, D, P_BS)

        return A, D

In [214]:
class UPGANet(nn.Module):
    def __init__(self, N, M, K, omega, I_max=120, J=10):
        super(UPGANet, self).__init__()
        self.layers = nn.ModuleList([
            UPGANetLayer(N, M, K, omega, J=J) for _ in range(I_max)
        ])
        self.I_max = I_max
        self.omega = omega

    def forward(self, H, A0, D0, Psi, sigma_n2, P_BS):
        A, D = A0, D0
        for i in range(self.I_max):
            A, D = self.layers[i](H, A, D, Psi, sigma_n2, P_BS)
        return A, D

In [215]:
def upganet_loss(H, A, D, Psi, sigma_n2, omega, theta_grid):
    R = compute_rate(H, A, D, sigma_n2)
    tau = compute_tau(A, D, Psi, theta_grid)
    return -(R - omega * tau)

In [216]:
J_values = [1, 10, 20]
for J in J_values:
    print(f"=== Training with J = {J} ===")

    # Instantiate model
    model = UPGANet(N, M, K, omega, I_max=I_max, J=J)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    for epoch in range(num_epochs):
        total_loss = 0.0
        for _ in range(num_channels):
            # 1. Generate random channel and Psi
            H = generate_channel(N, M, L=3)
            snr_values = np.arange(snr_min, snr_max, 0.1)
            snr_db = np.random.choice(snr_values)
            # P_BS = 10 ** (snr_db / 10)
            
            # FIX: unpack Psi and alpha_opt
            Psi = compute_psi(snr_db)

            A0, D0 = proposed_initialization(H, theta_d, N, M, K, P_BS)

            # Convert to tensors
            H_t = to_tensor(H, dtype=torch.cfloat)
            Psi_t = to_tensor(Psi, dtype=torch.cfloat)
            A0_t = to_tensor(A0, dtype=torch.cfloat)
            D0_t = to_tensor(D0, dtype=torch.cfloat)

            # sigma_n2 = to_tensor(sigma_n2, dtype=torch.float32)
            # P_BS = to_tensor(P_BS, dtype=torch.float32)
            P_BS_t = torch.tensor(P_BS, dtype=torch.cfloat) 

            # Forward pass
            A_final, D_final = model(H_t, A0_t, D0_t, Psi_t, sigma_n2, P_BS_t)

            # Compute loss (negative objective)
            loss = upganet_loss(H_t, A_final, D_final, Psi_t, sigma_n2, omega, theta_grid)

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            print(f"Channlel number {_+1}/{num_channels} processed.")

        print(f"[Epoch {epoch+1}/{num_epochs}] Loss: {total_loss/num_channels:.6f}")


=== Training with J = 1 ===
Channlel number 1/100 processed.
Channlel number 2/100 processed.
Channlel number 3/100 processed.
Channlel number 4/100 processed.
Channlel number 5/100 processed.
Channlel number 6/100 processed.
Channlel number 7/100 processed.
Channlel number 8/100 processed.
Channlel number 9/100 processed.
Channlel number 10/100 processed.
Channlel number 11/100 processed.
Channlel number 12/100 processed.


KeyboardInterrupt: 