In [None]:
import numpy as np
import tensorflow.keras as keras
import tensorflow as tf
import matplotlib.pyplot as plt
import random

# Equations

Iterative Soft Thresholding can be defined by the following update equation:

\begin{equation}
    \mathbf{x^{l+1}} = \eta((\mathbf{I} - \frac{1}{L}\mathbf{A^T}\mathbf{A})\mathbf{x^l} + \frac{1}{L}\mathbf{A^T}\mathbf{y})
\end{equation}

The Eta function: \begin{equation}    \eta(u;T) = 
    \left\{
    \begin{array}{lr}
        u - T &\text{if } u \ge T \\
        u + T &\text{if } u \le -T \\
        0 &\text{else}&
    \end{array}
    \right. \end{equation}

For Approximate Message Passing The residual error is calculated by:

\begin{equation}
    \mathbf{z}^t = \mathbf{y}-\mathbf{Ax}^t + \frac{1}{2\delta}z^{t-1}(\langle\frac{\partial\eta^{R}}{\partial x_R}(x^{t-1}+A^Hz^{t-1};\lambda^{t-1})\rangle+\langle\frac{\partial\eta^I}{\partial x_I}(x^{t-1}+A^Hz^{t-1};\lambda^{t-1})\rangle)
\end{equation}



The denoising/thresholding function differs from the one used in ISTA and is shown below.

\begin{equation}
    \eta(u+iv;\lambda) = (u+iv-\frac{\lambda(u+iv)}{\sqrt{u^2+v^2}})\mathbb{I}_{\{u^2+v^2\geq \lambda\}}
\end{equation}

and

\begin{equation}
    \langle\eta'(\alpha;\lambda)\rangle = \frac{\sum_{i=1}^N \eta'(\alpha_i;\lambda)}{N}
\end{equation}

The estimate of $\mathbf{x}$ can then be computed as follows

\begin{equation}
    \mathbf{x}^{t+1} = \eta(\mathbf{x}^t+\mathbf{A}^H\mathbf{z}^t;\lambda^t).
\end{equation}

# Shared Functions

In [None]:
# the thresholding function used in ISTA
# also used for real valued AMP
def eta(u, T, cmplx):
  # keeps the angle the same and reduces the magnitude
  if cmplx:
    return np.exp(1j*np.angle(u)) * (np.maximum(np.abs(u)-T, 0))
  else:
      return (u - T)*(u >= T) + (u + T)*(u <= -T)

In [None]:
# used to generate data for ISTA and AMP
def make_data(sys_param, tfs):
  '''sys_param contains: n,N,k,sigma
  tfs (true/false) contains: noise, fading, dual_bs, cmplx
    noise is boolean, true if noise
    fading is boolean, true if fading; fading is real or complex depending on the value of cmplx
    sigma is the variance of the noise given
    dual_bs is a boolean, true for 2 BS case
    cmplx is boolean, true if it wants complex values
    returns A_1, A_2, x_uL, y_uL1, y_uL2, L1, L2
    if it is a single base station then the unneeded values will be returned as zero
    '''

    # unwrap the arguments
    n, N, k, sigma = sys_param
    noise, fading, dual_bs, cmplx = tfs
    
    # creates the x-vector with k-nonzero entries
    x_uL = np.zeros((N, 1), dtype=np.complex_)
    idx_nonzero_entries = np.random.permutation(N)[0:k]
    x_uL[idx_nonzero_entries] = 1

    # if there's two base stations, these values will be overridden
    A_2 = 0
    L2 = 0
    y_uL2 = 0

    if cmplx:  # complex single BS
        # sensing matrices
        A_1 = np.sqrt(1 / (2 * n)) * np.random.randn(n, N) + 1j * np.sqrt(1 / (2 * n)) * np.random.randn(n, N)
        # finding thresholding parameter
        _, Lambda1, _ = np.linalg.svd(A_1)
        L1 = np.max(Lambda1) + 1
        # creating complex noise
        w1 = np.sqrt((sigma ** 2) / 2) * np.random.randn(n).reshape(-1, 1) + 1j * np.sqrt(
            (sigma ** 2) / 2) * np.random.randn(n).reshape(-1, 1)

        if dual_bs:  # create second pair of matrices
            A_1 = np.sqrt(1 / (4 * n)) * np.random.randn(n, N) + 1j * np.sqrt(1 / (4 * n)) * np.random.randn(n, N)
            A_2 = A_1.copy()
            _, Lambda1, _ = np.linalg.svd(A_1)
            L1 = np.max(Lambda1) + 1
            L2 = L1
            w2 = np.sqrt((sigma ** 2) / 2) * np.random.randn(n).reshape(-1, 1) + 1j * np.sqrt(
                (sigma ** 2) / 2) * np.random.randn(n).reshape(-1, 1)

        if fading:  # complex fading
            # creating fading effects -> CN(0,1)
            h_uL1 = np.sqrt(1 / 2) * np.random.random(N) + np.sqrt(1 / 2) * 1j * np.random.random(N)
            h_uL1 = np.diag(h_uL1)
            # sensing matrix with fading applied
            A_1 = A_1 @ h_uL1
            # finding thresholding parameter
            _, Lambda1, _ = np.linalg.svd(A_1)
            L1 = np.max(Lambda1) + 1

            if noise:  # single BS, complex fading and noise
                y_uL1 = A_1 @ x_uL + w1
            else:
                y_uL1 = A_1 @ x_uL

            if dual_bs:  # complex fading, 2 BS
                # create second set of fading
                # creating fading effects -> CN(0,1)
                h_uL2 = np.sqrt(1 / 2) * np.random.random(N) + np.sqrt(1 / 2) * 1j * np.random.random(N)
                h_uL2 = np.diag(h_uL2)
                # sensing matrix with fading applied
                A_2 = A_2 @ h_uL2
                # finding thresholding parameter
                _, Lambda1, _ = np.linalg.svd(A_2)
                L2 = np.max(Lambda1) + 1

                if noise: 
                    y_uL2 = A_2 @ x_uL + w2
                else:
                    y_uL2 = A_2 @ x_uL

        else:  # complex, no fading
            if noise:
                y_uL1 = A_1 @ x_uL + w1
            else:
                y_uL1 = A_1 @ x_uL

            if dual_bs: 
                if noise:
                    y_uL2 = A_2 @ x_uL + w2
                else:
                    y_uL2 = A_2 @ x_uL

    else:  # real valued
        # sensing matrices
        A_1 = np.sqrt(1 / n) * np.random.randn(n, N)
        # finding thresholding parameter
        _, Lambda1, _ = np.linalg.svd(A_1)
        L1 = np.max(Lambda1) + 1
        # creating noise
        w1 = sigma * np.random.randn(n).reshape(-1, 1)

        if dual_bs: # create second set of matrices
            A_1 = np.sqrt(1 / (2*n)) * np.random.randn(n, N)
            A_2 = A_1.copy()
            _, Lambda1, _ = np.linalg.svd(A_1)
            L1 = np.max(Lambda1) + 1
            L2 = L1
            w2 = sigma * np.random.randn(n).reshape(-1, 1)

        if fading:  # real valued, 1 BS fading
            # creating fading effects -> N(0,1)
            h_uL1 = np.random.rayleigh(size=N)
            h_uL1 = np.diag(h_uL1)
            # sensing matrix with fading applied
            A_1 = A_1 @ h_uL1
            # finding thresholding parameter
            _, Lambda1, _ = np.linalg.svd(A_1)
            L1 = np.max(Lambda1) + 1

            if noise: # real, 1 BS, noise, fading
                y_uL1 = A_1 @ x_uL + w1
            else:
                y_uL1 = A_1 @ x_uL

            if dual_bs:
                # creating fading effects -> CN(0,1)
                h_uL2 = np.random.rayleigh(size=N)
                h_uL2 = np.diag(h_uL2)
                # sensing matrix with fading applied
                A_2 = A_2 @ h_uL2
                # finding thresholding parameter
                _, Lambda1, _ = np.linalg.svd(A_2)
                L2 = np.max(Lambda1) + 1
                if noise:
                    y_uL2 = A_2 @ x_uL + w1
                else:
                    y_uL2 = A_2 @ x_uL

        else:  # no fading, real values
            if noise:
                y_uL1 = A_1 @ x_uL + w1
            else:
                y_uL1 = A_1 @ x_uL

            if dual_bs:
                if noise:
                    y_uL2 = A_2 @ x_uL + w2
                else:
                    y_uL2 = A_2 @ x_uL

    return A_1, A_2, x_uL, y_uL1, y_uL2, L1, L2

# ISTA Functions

In [None]:
def ist_mse_iter(sys_param, tfs, num_iterations, avg):
'''sys_param holds n,N,k vals and sigma
  tfs (true-falses) holds system information
  avg: the number of trials to average over
  num_iterations: the number of iterations of the algorithm to run through
  returns the MSE per iterations
'''

    alpha = .25  # predetermined for the 1 BS case
    # unwrap the arguments
    n,N,k,sigma = sys_param
    noise, fading, dual_bs, cmplx = tfs

    # create the matrix to hold the results
    mse_per_iteration = np.zeros(num_iterations)
    
    for j in range(avg):  # average over a given number of trials
      A1, A2, x_uL, y_uL1, y_uL2, L1, L2 = make_data(sys_param, tfs)  # make the data needed
      
      if dual_bs: # concatenate the necessary vectors then proceed
          A1 = np.vstack((A1,A2))
          y_uL1 = np.vstack((y_uL1,y_uL2))
          _, Lambda2, _ = np.linalg.svd(A1)
          L1 = np.max(Lambda2) + 1
          alpha = alpha / 2 # half the single BS case

      if cmplx:
          xHt = np.zeros(x_uL.shape, dtype=np.complex_)
          # create the weights for ISTA
          x_weight = np.eye(N) - (1/L1)*(A1.conj().T @ A1)
          y_weight = (1/L1)*A1.conj().T
              
      else:
          xHt = np.zeros(x_uL.shape)
          # creating weights
          x_weight = np.eye(N) - (1/L1)*(A1.T @ A1)
          y_weight = (1/L1)*A1.T

      # this part is same for complex or real
      for idx in range(num_iterations): # iterate through ISTA
          update = (x_weight @ xHt) + (y_weight @ y_uL1)
          xHt = eta(update, alpha/L1, cmplx)
          
          # update the result after each iteration
          mse_per_iteration[idx] += (1/N) * np.sum(np.abs(x_uL - xHt)**2)

        # return the average  
    return mse_per_iteration / avg

In [None]:
def ist_mse_snr(sys_param, tfs, num_iterations, avg):
'''sys_param holds n,N,k vals and sigma
  tfs (true-falses) holds system information
  avg: the number of trials to average over
  num_iterations: the number of iterations of the algorithm to run through
  returns the MSE for vs some SNRs'''


    alpha = .25  # predetermined for the 1 BS case
    # unwrap the arguments
    n,N,k,sigma = sys_param
    noise, fading, dual_bs, cmplx = tfs

    if dual_bs: # halve the alpha value
      alpha = alpha/2

    # creating noise matrix
    SNRdB = np.array([-12,-10, -5, 1, 5, 10, 20])  # in dB

    # convert out of dB and calculate the variance
    if dual_bs:
      SNRs = (10**(SNRdB/10))*2*n
    else:
      SNRs = (10**(SNRdB/10))*n
    sigmas = 1/(np.sqrt(SNRs))

    # create matrix to hold the results
    mse_per_iteration = np.zeros(len(sigmas))

    cnt = 0
    for sig in sigmas:  # loop through all the variance values
        mse = 0
        for i in range(avg):  # average the results over a given number of trials
          sys_param = (n,N,k,sig) # update sys_param with new variance for given SNR
          A1, A2, x_uL, y_uL1, y_uL2, L1, L2 = make_data(sys_param, tfs)  # make the data needed

          if dual_bs: # concatenate the necessary vectors then proceed
            A1 = np.vstack((A1,A2))
            y_uL1 = np.vstack((y_uL1,y_uL2))

          if cmplx:
            # create the estimate of x and its weights
            xHt = np.zeros(x_uL.shape, dtype=np.complex_)
            x_weight = np.eye(N) - (1/L1)*(A1.conj().T @ A1)
            y_weight = (1/L1)*A1.conj().T
              
          else:
            # create the estimate of x and its weights
            xHt = np.zeros(x_uL.shape)
            x_weight = np.eye(N) - (1/L1)*(A1.T @ A1)
            y_weight = (1/L1)*A1.T


          # ISTA
          for idx_iter in range(num_iterations):  # cycle through a set number of iterations
            update = (x_weight @ xHt) + (y_weight @ y_uL1)
            xHt = eta(update, alpha/L1, cmplx)
          # calculate mse after a complete cycle of ISTA iterations
          mse_per_iteration[cnt] += (1/N) * np.sum(np.abs(x_uL - xHt)**2)
        cnt += 1 # update cnt after all the avg trials have finished and move on to the next variance

            # average the result
    return mse_per_iteration / avg

In [None]:
def ist_md_fa(sys_param, tfs, num_iterations, avg, threshold):
  '''sys_param holds n,N,k vals and sigma
  tfs (true-falses) holds system information
  avg: the number of trials to average over
  num_iterations: the number of iterations of the algorithm to run through
  threshold: the value above which a user is considered active
  returns the number of MD/FA vs iterations'''

   # unwrap the input tuples
    n,N,k,sigma = sys_param
    noise, fading, dual_bs, cmplx = tfs
    
    # create the results matrices
    ista_fa = np.zeros(num_iterations)
    ista_md = np.zeros(num_iterations)
      

    for j in range(avg):
      # create new data
      A1, A2, x_uL, y_uL1, y_uL2, L1, L2 = make_data(sys_param, tfs)

      if dual_bs: # concatenate the necessary vectors then proceed
          A1 = np.vstack((A1,A2))
          y_uL1 = np.vstack((y_uL1,y_uL2))

      # find active users
      real_users = np.where(x_uL == 1)[0]

      # create initial estimate of x and weights
      if cmplx:
        x_weight = np.eye(N) - (1/L1)*(A1.conj().T @ A1)
        y_weight = (1/L1)*A1.conj().T
        xHt = np.zeros(x_uL.shape, dtype=np.complex_)
      else:
        x_weight = np.eye(N) - (1/L1)*(A1.T @ A1)
        y_weight = (1/L1)*A1.T
        xHt = np.zeros(x_uL.shape)

      for idx in range(num_iterations):  # iterate through ISTA
          update = (x_weight @ xHt) + (y_weight @ y_uL1)
          xHt = eta(update, alpha/L1, cmplx)

          # find detection errors at each iteration
          xHt_user_guess = np.where(abs(xHt) > threshold)[0]
          fa = np.setdiff1d(xHt_user_guess, real_users)  # returns the guessed values that aren't in real ones. aka False Alarm
          md = np.setdiff1d(real_users, xHt_user_guess)  # real values that aren't in the guessed ones. aka Misdetection

          ista_fa[idx] += len(fa)
          ista_md[idx] += len(md)


    # divide it out by the number of averages
    ista_md = ista_md / avg
    ista_fa = ista_fa / avg

    return ista_md, ista_fa  # return the results

In [None]:
def ist_md_fa_snr(sys_param, tfs, num_iterations, avg, threshold):
   '''sys_param holds n,N,k vals and sigma
  tfs (true-falses) holds system information
  avg: the number of trials to average over
  num_iterations: the number of iterations of the algorithm to run through
  threshold: the value above which a user is considered active
  returns the number of MD/FA vs SNR'''

  # unwrap parameters
  n,N,k,sigma = sys_param
  noise, fading, dual_bs, cmplx = tfs

   # creating noise matrix
  SNRdB = np.array([-12,-10, -5, 1, 5, 10, 20]) # in dB

  # convert out of dB and find variance
  if dual_bs:
    SNRs = (10**(SNRdB/10))*2*n
  else:
    SNRs = (10**(SNRdB/10))*n
  sigmas = 1/(np.sqrt(SNRs))

  # create result matrices
  ista_snr_fa = np.zeros(len(sigmas))
  ista_snr_md = np.zeros(len(sigmas))

  cnt = 0 
  for sig in sigmas: # loop through all the variance values
    for j in range(avg):
      # create new data
      sys_param = (n,N,k,sig)
      A1, A2, x_uL, y_uL1, y_uL2, L1, L2 = make_data(sys_param, tfs)
      
      if dual_bs: # concatenate the necessary vectors then proceed
          A1 = np.vstack((A1,A2))
          y_uL1 = np.vstack((y_uL1,y_uL2))

      # find the active users
      real_users = np.where(x_uL == 1)[0]

      # create initial estimate of x and weights
      if cmplx:
        x_weight = np.eye(N) - (1/L1)*(A1.conj().T @ A1)
        y_weight = (1/L1)*A1.conj().T
        xHt = np.zeros(x_uL.shape, dtype=np.complex_)
      else:
        x_weight = np.eye(N) - (1/L1)*(A1.T @ A1)
        y_weight = (1/L1)*A1.T
        xHt = np.zeros(x_uL.shape)

      # iterate through ISTA for the given number of iterations
      for idx in range(num_iterations):
          update = (x_weight @ xHt) + (y_weight @ y_uL1)
          xHt = eta(update, alpha/L1, cmplx)

      # find detection errors
      xHt_user_guess = np.where(abs(xHt) > threshold)[0]
      fa = np.setdiff1d(xHt_user_guess, real_users)  # returns the guessed values that aren't in real ones. aka False Alarm
      md = np.setdiff1d(real_users, xHt_user_guess)  # real values that aren't in the guessed ones. aka Misdetection
      ista_snr_fa[cnt] += len(fa)
      ista_snr_md[cnt] += len(md)

    cnt += 1

  # divide it out by the number of averages
  ista_snr_md = ista_snr_md / avg
  ista_snr_fa = ista_snr_fa / avg

  return ista_snr_md, ista_snr_fa

# AMP Functions

In [None]:
# used for real valued AMP
def onsager(z, r, tau):
# calculates and returns the onsager correction term
  n = len(z)
  return (z/n) * np.sum(eta(r, tau,cmplx) != 0)

In [None]:
# used for complex valued AMP
# defined in the equations section
def complex_soft_thresholding(x, T):
  eps = 1e-9
  return (np.abs(x) > T)*(np.abs(x) - T)*(x/np.abs(x+eps))

def complex_soft_thresholding_derivative(xHt, T):
  eps = 1e-9
  xr = np.real(xHt).flatten()
  xi = np.imag(xHt).flatten()
  den = (xr**2 + xi**2)**(3/2) + eps
  indicator = ((xr**2 + xi**2) > T**2).astype(float)
  
  dRdr = indicator*(1-T*xi**2/den)
  dIdi = indicator*(1-T*xr**2/den)

  return dRdr, dIdi

In [None]:
def amp_mse_iter(sys_param, tfs, num_iterations, avg): 
'''sys_param holds n,N,k vals and sigma
  tfs (true-falses) holds system information
  avg: the number of trials to average over
  num_iterations: the number of iterations of the algorithm to run through
  returns the MSE per iterations'''

    # unwrap the inputs
    n,N,k,sigma = sys_param
    noise, fading, dual_bs, cmplx = tfs

    # create result vector
    mse_per_iteration = np.zeros(num_iterations)

    for j in range(avg):  # average over a given number of trials
      A1, A2, x_uL, y_uL1, y_uL2, L1, L2 = make_data(sys_param, tfs)  # make the data needed

      if dual_bs: # concatenate the necessary vectors then proceed
          A1 = np.vstack((A1,A2))
          y_uL1 = np.vstack((y_uL1,y_uL2))
      
      if cmplx: # run complex AMP
          xHt = np.zeros(x_uL.shape, dtype=np.complex_)
          z = y_uL1 - (A1@xHt)

          for idxit in range(num_iterations):
            # update estimate of x
            r = A1.conj().T @ z + xHt
            sigma_hat = 1/np.sqrt(np.log(2))*np.median(np.abs(r))
            xHt = complex_soft_thresholding(r, sigma_hat)
            # calculate residual error
            etaderR, etaderI = complex_soft_thresholding_derivative(r, sigma_hat);
            z = y_uL1 - A1@xHt + z*(np.sum(etaderR)+np.sum(etaderI))/(2*n);
            mse_per_iteration[idxit] += np.linalg.norm(xHt.flatten() - x_uL.flatten())**2 / N
          
      
      else: # run real valued AMP
          z = np.zeros(y_uL1.shape)
          xHt = np.zeros(x_uL.shape)
          r = 0
          tau = 0
          ons = 0

          for idx_iter in range(num_iterations):
              ons = onsager(z, r, tau)
              z = y_uL1 - A1 @ xHt + ons
              if dual_bs:  # tau is varied in two BS case by factor of 1/sqrt(2)
                tau = np.sqrt(1/(2*n)) * np.linalg.norm(z)
              else:
                tau = np.sqrt(1/n) * np.linalg.norm(z)              
              r = xHt + A1.T @ z
              xHt = eta(r, tau, cmplx)

              # find the current error
              mse_per_iteration[idx_iter] += (1/N) * np.sum(abs(x_uL - xHt)**2)
    
    # average out the result
    return mse_per_iteration / avg

In [None]:
def amp_mse_snr(sys_param, tfs, num_iterations, avg):
'''sys_param holds n,N,k vals and sigma
tfs (true-falses) holds system information
avg: the number of trials to average over
num_iterations: the number of iterations of the algorithm to run through
returns the MSE for some SNRs'''

    alpha = .25  # predetermined for the 1 BS case
    # unwrap the inputs
    n,N,k,sigma = sys_param
    noise, fading, dual_bs, cmplx = tfs

    # creating noise matrix
    SNRdB = np.array([-12,-10, -5, 1, 5, 10, 20]) # in dB

    # get out of dB and calculate the variance
    if dual_bs:
      SNRs = (10**(SNRdB/10))*2*n
    else:
      SNRs = (10**(SNRdB/10))*n
    sigmas = 1/(np.sqrt(SNRs))
    
    # create the result matrix
    mse_snr = np.zeros(len(sigmas))

    cnt = 0  # keeps track of where we are in ur results matrix
    for sig in sigmas:  # loop through all the variance values
      mse = 0  # reset the mse for each new variance value
      for i in range(avg):  # average over a given number of trials
        # update sys_param with new variance value
        sys_param = (n,N,k,sig)
        A1, A2, x_uL, y_uL1, y_uL2, L1, L2 = make_data(sys_param, tfs)  # make the data needed

        if dual_bs: # concatenate the necessary vectors then proceed
          A1 = np.vstack((A1,A2))
          y_uL1 = np.vstack((y_uL1,y_uL2))

        if cmplx: # run complex AMP
            xHt = np.zeros(x_uL.shape, dtype=np.complex_)
            z = y_uL1 - (A1@xHt)

            for idxit in range(num_iterations):
              # update the estimate of x
              r = A1.conj().T @ z + xHt
              sigma_hat = 1/np.sqrt(np.log(2))*np.median(np.abs(r))
              xHt = complex_soft_thresholding(r, sigma_hat)
              # calculate the residual error
              etaderR, etaderI = complex_soft_thresholding_derivative(r, sigma_hat);
              z = y_uL1 - A1@xHt + z*(np.sum(etaderR)+np.sum(etaderI))/(2*n);
            # find the MSE
            mse_snr[cnt] += np.linalg.norm(xHt.flatten() - x_uL.flatten())**2 / N
            
        
        else: # run real valued AMP
            z = np.zeros(y_uL1.shape)
            xHt = np.zeros(x_uL.shape)
            r = 0
            tau = 0
            ons = 0

            for idx_iter in range(num_iterations):
                ons = onsager(z, r, tau)
                z = y_uL1 - A1 @ xHt + ons
                if dual_bs:
                  tau = np.sqrt(1/(2*n)) * np.linalg.norm(z)
                else:
                  tau = np.sqrt(1/n) * np.linalg.norm(z) 
                r = xHt + A1.T @ z
                xHt = eta(r, tau, cmplx)
            # find the current error
            mse_snr[cnt] += (1/N) * np.sum((x_uL - xHt)**2)
  
      cnt+=1 # increase the count as you move to the next variance value
    return mse_snr / avg

In [None]:
def amp_md_fa_iter(sys_param, tfs, num_iterations, avg, threshold):
'''sys_param holds n,N,k vals and sigma
tfs (true-falses) holds system information
avg: the number of trials to average over
num_iterations: the number of iterations of the algorithm to run through
returns the MD/FA vs iterations'''

    # unwrap the input tuples
    n,N,k,sigma = sys_param
    noise, fading, dual_bs, cmplx = tfs
    
    # create results matrices
    fa_iter = np.zeros(num_iterations)
    md_iter = np.zeros(num_iterations)

    for j in range(avg):
      # create new data
      A1, A2, x_uL, y_uL1, y_uL2, L1, L2 = make_data(sys_param, tfs)

      # find active users
      real_users = np.where(x_uL == 1)[0]

      if dual_bs: # concatenate the necessary vectors then proceed
          A1 = np.vstack((A1,A2))
          y_uL1 = np.vstack((y_uL1,y_uL2))
      
      if cmplx: # run complex AMP
          xHt = np.zeros(x_uL.shape, dtype=np.complex_)
          z = y_uL1 - (A1@xHt)

          for idxit in range(num_iterations):
            # update estimate of x
            r = A1.conj().T @ z + xHt
            sigma_hat = 1/np.sqrt(np.log(2))*np.median(np.abs(r))
            xHt = complex_soft_thresholding(r, sigma_hat)
            # find the residual error
            etaderR, etaderI = complex_soft_thresholding_derivative(r, sigma_hat);
            z = y_uL1 - A1@xHt + z*(np.sum(etaderR)+np.sum(etaderI))/(2*n);

            # find detection errors at each iteration
            xHt_user_guess = np.where(abs(xHt) > threshold)[0]
            fa = np.setdiff1d(xHt_user_guess, real_users)  # returns the guessed values that aren't in real ones. aka False Alarm
            md = np.setdiff1d(real_users, xHt_user_guess)  # real values that aren't in the guessed ones. aka Misdetection
            # add results to the matrices
            fa_iter[idxit] += len(fa)
            md_iter[idxit] += len(md)
      
      else: # run real valued AMP
          z = np.zeros(y_uL1.shape)
          xHt = np.zeros(x_uL.shape)
          r = 0
          tau = 0
          ons = 0

          for idx_iter in range(num_iterations):
            ons = onsager(z, r, tau)
            z = y_uL1 - A1 @ xHt + ons
            if dual_bs:
              tau = np.sqrt(1/(2*n)) * np.linalg.norm(z)
            else:
              tau = np.sqrt(1/n) * np.linalg.norm(z) 
            r = xHt + A1.T @ z
            xHt = eta(r, tau, cmplx)

            # find detection errors
            xHt_user_guess = np.where(abs(xHt) > threshold)[0]
            fa = np.setdiff1d(xHt_user_guess, real_users)  # returns the guessed values that aren't in real ones. aka False Alarm
            md = np.setdiff1d(real_users, xHt_user_guess)  # real values that aren't in the guessed ones. aka Misdetection

            fa_iter[idx_iter] += len(fa)
            md_iter[idx_iter] += len(md)
            
    # average out the result
    return md_iter / avg, fa_iter / avg

In [None]:
def amp_md_fa_snr(sys_param, tfs, num_iterations, avg, threshold):
'''sys_param holds n,N,k vals and sigma
tfs (true-falses) holds system information
avg: the number of trials to average over
num_iterations: the number of iterations of the algorithm to run through
returns the MD/FA vs SNRs'''

  # unwrap parameters
  n,N,k,sigma = sys_param
  noise, fading, dual_bs, cmplx = tfs

 # creating noise matrix
  SNRdB = np.array([-12,-10, -5, 1, 5, 10, 20]) # in dB
  # convert out of dB and find the variance
  if dual_bs:
    SNRs = (10**(SNRdB/10))*2*n
  else:
    SNRs = (10**(SNRdB/10))*n
  sigmas = 1/(np.sqrt(SNRs))

  # create the result vectors
  fa_snr = np.zeros(len(sigmas))
  md_snr = np.zeros(len(sigmas))

  cnt = 0 
  for sig in sigmas:  # loop through all the variance values
    for j in range(avg):  # average over a given number of trials
      # create new data
      sys_param = (n,N,k,sig)  # update the sys_param with new variance value
      A1, A2, x_uL, y_uL1, y_uL2, L1, L2 = make_data(sys_param, tfs)

      # find the active users
      real_users = np.where(x_uL == 1)[0]

      if dual_bs: # concatenate the necessary vectors then proceed
          A1 = np.vstack((A1,A2))
          y_uL1 = np.vstack((y_uL1,y_uL2))

      if cmplx: # run complex AMP
          xHt = np.zeros(x_uL.shape, dtype=np.complex_)
          z = y_uL1 - (A1@xHt)

          for idxit in range(num_iterations):
            # update the estimate of x
            r = A1.conj().T @ z + xHt
            sigma_hat = 1/np.sqrt(np.log(2))*np.median(np.abs(r))
            xHt = complex_soft_thresholding(r, sigma_hat)
            # calculate the residual error
            etaderR, etaderI = complex_soft_thresholding_derivative(r, sigma_hat);
            z = y_uL1 - A1@xHt + z*(np.sum(etaderR)+np.sum(etaderI))/(2*n);
          # find detection errors
          xHt_user_guess = np.where(abs(xHt) > threshold)[0]
          fa = np.setdiff1d(xHt_user_guess, real_users)  # returns the guessed values that aren't in real ones. aka False Alarm
          md = np.setdiff1d(real_users, xHt_user_guess)  # real values that aren't in the guessed ones. aka Misdetection

          # add the results to the matrices
          fa_snr[cnt] += len(fa)
          md_snr[cnt] += len(md)
      
      else: # run real valued AMP
          z = np.zeros(y_uL1.shape)
          xHt = np.zeros(x_uL.shape)
          r = 0
          tau = 0
          ons = 0

          for idx_iter in range(num_iterations):
              ons = onsager(z, r, tau)
              z = y_uL1 - A1 @ xHt + ons
              if dual_bs:
                tau = np.sqrt(1/(2*n)) * np.linalg.norm(z)
              else:
                tau = np.sqrt(1/n) * np.linalg.norm(z) 
              r = xHt + A1.T @ z
              xHt = eta(r, tau, cmplx)

          # find detection errors
          xHt_user_guess = np.where(abs(xHt) > threshold)[0]
          fa = np.setdiff1d(xHt_user_guess, real_users)  # returns the guessed values that aren't in real ones. aka False Alarm
          md = np.setdiff1d(real_users, xHt_user_guess)  # real values that aren't in the guessed ones. aka Misdetection
          fa_snr[cnt] += len(fa)
          md_snr[cnt] += len(md)
    cnt+=1 

    # return the averaged results
  return md_snr / avg, fa_snr / avg

# LISTA Functions

In [None]:
# thresholding function for complex LISTA
def tf_cmplx_eta(u,T):
  return tf.math.exp(tf.complex(0.0,tf.math.angle(u))) * tf.complex(tf.math.maximum(tf.math.abs(u)-T, 0),0.0)

In [None]:
class CRF_LISTALayer(keras.layers.Layer):
  # class used to create our network layers
  
  # defining variables for the layer
  def __init__(self, A, L, init_alpha, Q, W):
    # initialize the weights of the system
    super().__init__()
    self.A = A
    self.L = L
    self.n, self.N = self.A.shape
    # init values for the following weights are calculated later
    self.alpha = tf.Variable(initial_value=init_alpha, dtype='float32', trainable=True)
    self.Q = tf.Variable(initial_value=init_Q, dtype='complex64', trainable=True)
    self.W = tf.Variable(initial_value=init_W, dtype='complex64', trainable=True)

  # creating inputs and updating things on each call
  def call(self, inputs):
    # separating the x and y
    xHt = inputs[:, 0:self.N]
    y = inputs[:, self.N:]
    update = xHt @ self.Q + y @ self.W  # weighting and combining the vectors
    new_x = tf_cmplx_eta(update, self.alpha/self.L)  # thresholding and updating x
    return tf.concat([new_x, y], axis=1)  # reconcatenate x and y

class CustomMSELoss(keras.losses.Loss):
# custom loss function b/c we have x and y concatenated
  def __init__(self, N):
    super().__init__()
    self.N = N
  
  def call(self, true, pred):
    # separate x and y and then compute MSE
    x_true = true[:, 0:self.N]
    x_pred = pred[:, 0:self.N]
    return tf.reduce_mean(tf.math.squared_difference(tf.complex(x_true,0.0), x_pred))
  
class CustomMSEMetric(keras.metrics.Mean):
# custom metric function b/c we have x and y concatenated
  def __init__(self, N):
    super().__init__()
    self.N = N

  def update_state(self, true, pred, sample_weight=None):
    # separate x and y and then compute the MSE
    x_true = true[:, 0:self.N]
    x_pred = pred[:, 0:self.N]
    val = tf.reduce_mean(tf.math.squared_difference(tf.complex(x_true,0.0), x_pred))
    return super().update_state(val, sample_weight=None)

# LISTA

In [None]:
# define problem parameters
n = 270
N = 1024
k = 400

# for MD, FA
threshold = .4
num_layers = 15

# define number of training/validation/testing samples
num_train_samples = 75000
num_valid_samples = 25000
num_test_samples = 25000
total_num_samples = num_train_samples + num_valid_samples + num_test_samples

# use default rng
rng = np.random.default_rng()

# change these values as you wish to create your system model
cmplx = True
dual_bs = False
fading = True
noise = True
snr_dB = 5 

**Make the data set**

In [None]:
# specify the original alpha value
init_alpha = 0.3

# create sensing matrix
if cmplx:
  if dual_bs:
    # create sensing matrix A - will stack so energy of col of A = 1/2
    A = np.sqrt(1/(4*n))*np.random.randn(n, N) + 1j*np.sqrt(1/(4*n))*np.random.randn(n, N)  
  else:
    A = np.sqrt(1/(2*n))*np.random.randn(n, N) + 1j*np.sqrt(1/(2*n))*np.random.randn(n, N)
else:
  if dual_bs:
    A = np.sqrt(1 / (2*n)) * np.random.randn(n, N) 
  else:
    A = np.sqrt(1 / n) * np.random.randn(n, N)

# creates the appropiate sigma
if dual_bs:
  snr = (10**(snr_dB/10))*2*n  # correct SNR def for 2 BS
else:
  snr = (10**(snr_dB/10))*n 
sigma = 1/(np.sqrt(snr))

# create noise vector(s)
w1 = []
for i in range(total_num_samples):
  if cmplx:
    w1.append(np.sqrt((sigma**2)/2) * np.random.randn(n) + 1j*np.sqrt((sigma**2)/2) * np.random.randn(n))
  else:
    w1.append(sigma*np.random.randn(n))
w1 = np.array(w1)

if dual_bs:
  w2 = []
  for i in range(total_num_samples):
    if cmplx:
      w2.append(np.sqrt((sigma**2)/2) * np.random.randn(n) + 1j*np.sqrt((sigma**2)/2) * np.random.randn(n))
    else:
      w2.append(sigma*np.random.randn(n))
  w2 = np.array(w2)
  w1 = np.hstack((w1,w2))  # stack to make one noise vector if it is dual BS

# create fading effects
if fading:
  # creating fading effects
  h = np.sqrt(1/2)*np.random.randn(N) + 1j*np.sqrt(1/2)*np.random.randn(N) 
  h = np.diag(h)  # creates a diagonal matrix of all the h coefficients

  # matrix with fading added
  A1 = A @ h

  if dual_bs: # create second set of fading coefficents
    h2 = np.sqrt(1/2)*np.random.randn(N) + 1j*np.sqrt(1/2)*np.random.randn(N) 
    h2 = np.diag(h2) 

    A2 = A @ h2
    # now stack and make one A
    A = np.vstack((A1, A2))

  else:
    A = A1  # assign it with the one BS fading case

if dual_bs and not fading:
  # A is just stacked with itself if no fading
  A = np.vstack((A,A))

# creating initial values for trainable parameters
_, Lambda, _ = np.linalg.svd(A)
L = np.max(Lambda) + 1

init_Q = np.eye(N) - (A.T @ A) / L
init_W = A / L

# create the x vectors
x = np.zeros((total_num_samples, N))
idx_nonzero = rng.permuted(np.tile(np.arange(N), (total_num_samples, 1)), axis=1)
for i in range(k): # k-sparse
  x[np.arange(total_num_samples), idx_nonzero[:, i].flatten()] = 1

# create the received y vectors
if noise:
  y = x @ A.T + w  # adding noise here
else:
  y = x @ A.T
y = np.hstack((np.zeros((total_num_samples, N)), y))

# Divide data into training, validation, testing sets
train_data = y[0:num_train_samples, :]
train_labels = x[0:num_train_samples, :]
valid_data = y[num_train_samples:num_train_samples+num_valid_samples, :]
valid_labels = x[num_train_samples:num_train_samples+num_valid_samples, :]
test_data = y[num_train_samples+num_valid_samples:, :]
test_labels = x[num_train_samples+num_valid_samples:, :]

**Create the network with 'num_layers' layers**

In [None]:
# this allows you to find number of MD/FA for a 'num_layers' layer network
# you are also able to see the MSE after the given number of layers

learn_rate = .0001  # custom learning rate
op = keras.optimizers.Adam(learning_rate=learn_rate)

# creating the input
input = keras.layers.Input(shape=(n+N,), name='y_inputs',dtype='complex64')
for idx_layer in range(num_layers):  # creates a given number of layers
  # calls the layer differently depending on what layer it is 
  if num_layers == 1:
    output = CRF_LISTALayer(A, L, init_alpha, init_Q, init_W) (input)
  elif num_layers > 1:
    if idx_layer == 0:
      x_hidden = CRF_LISTALayer(A, L, init_alpha, init_Q, init_W) (input)
    elif idx_layer == num_layers - 1:  # output is populated on the last layer
      output = CRF_LISTALayer(A, L, init_alpha, init_Q, init_W) (x_hidden)
    else:
      x_hidden = CRF_LISTALayer(A, L, init_alpha, init_Q, init_W) (x_hidden)
   
model = keras.Model(inputs=input, outputs=output, name='CRF_LISTAModel')
# compile keras model
custom_mse_loss = CustomMSELoss(N)
custom_mse_metric = CustomMSEMetric(N)
model.compile(optimizer=op, loss=custom_mse_loss, metrics=[custom_mse_metric])

# train model
history = model.fit(train_data,
                  train_labels,
                  batch_size=128,
                  epochs=50,
                  validation_data=(valid_data, valid_labels))

# evaluate the model
results = model.evaluate(test_data, test_labels)
print(results) # prints the MSE after a fixed number of layers

To find MD/FA vs. SNR you can run the above cell with a different SNR then run the following cell and keep track of the SNRs you use as well as the results. These results can then be plotted.

Similar process can be done to find MSE vs. SNR for a fixed number of layers

In [None]:
# Find the number of MD/FA
xHt = model.predict(test_data)  # guesses based on our network
xHt = xHt.T[:N,:]
xHt = xHt.T  # maniulated xHt to be useable
x = test_labels   # actual values

# create arrys to hold data
fa_ar = np.zeros(num_test_samples)
md_ar = np.zeros(num_test_samples)

for i in range(num_test_samples):
  # user is active if value above a certain threshold
  user_guess = np.where(abs(xHt[i]) > threshold)
  real_users = np.where(abs(x[i]) == 1)
  # FA if the user is said as active, but not in real users
  fa_ar[i] = len(np.setdiff1d(user_guess, real_users))
  # MD if a user is in the real_users array, but was not guessed to be active
  md_ar[i] = len(np.setdiff1d(real_users, user_guess))

# average the results
avg_fa = sum(fa_ar) / num_test_samples
avg_fa_ar.append(avg_fa)
avg_md = sum(md_ar) / num_test_samples
avg_md_ar.append(avg_md)

print(f'MD: {avg_md_ar}')
print(f'FA: {avg_fa_ar}')

**The following cell allows you to find MSE or MD/FA vs. Number of layers**

In [None]:
cnt = 1  # initial number of layers used

# create the results lists
lista_mse_cmplx = []
avg_fa_ar = []
avg_md_ar = []

# custom learning rate
learn_rate = .0001
op = keras.optimizers.Adam(learning_rate=learn_rate)

for i in range(num_layers):
  # create initial input
  input = keras.layers.Input(shape=(n+N,), name='y_inputs',dtype='complex64')
  for idx_layer in range(cnt):
    # calls the layer differently depending on what layer it is 
    if cnt == 1:
      output = CRF_LISTALayer(A, L, init_alpha, init_Q, init_W) (input)
    elif cnt > 1:
      if idx_layer == 0:
        x_hidden = CRF_LISTALayer(A, L, init_alpha, init_Q, init_W) (input)
      elif idx_layer == cnt - 1:  # output is populated on the last layer
        output = CRF_LISTALayer(A, L, init_alpha, init_Q, init_W) (x_hidden)
      else:
        x_hidden = CRF_LISTALayer(A, L, init_alpha, init_Q, init_W) (x_hidden)
    
 # increase count so it goes through more iterations next time
  cnt += 1

  model = keras.Model(inputs=input, outputs=output, name='CRF_LISTAModel')
  # compile keras model
  custom_mse_loss = CustomMSELoss(N)
  custom_mse_metric = CustomMSEMetric(N)
  model.compile(optimizer=op, loss=custom_mse_loss, metrics=[custom_mse_metric])

  # train model
  history = model.fit(train_data,
                    train_labels,
                    batch_size=128,
                    epochs=50,
                    validation_data=(valid_data, valid_labels))

  # evaluate the model and display the results
  results = model.evaluate(test_data, test_labels)
  lista_mse_cmplx.append(results[0])  # results represents MSE

  ## find MD, FA ##
  xHt = model.predict(test_data)
  xHt = xHt.T[:N,:]
  xHt = xHt.T  # rearrange xHt to be useable
  x = test_labels # contains the actual values of x

  # create necessary results arrays
  fa_ar = np.zeros(num_test_samples)
  md_ar = np.zeros(num_test_samples)
  for i in range(num_test_samples): 
    # user is actvie if the value at their index is above a certain threshold
    user_guess = np.where(abs(xHt[i]) > threshold)
    real_users = np.where(abs(x[i]) == 1)
    # FA if user is guessed as acitvce but not in real_users
    fa_ar[i] = len(np.setdiff1d(user_guess, real_users))
    # MD if user is in real_users but not guessed as active
    md_ar[i] = len(np.setdiff1d(real_users, user_guess))

  # average the results
  avg_fa = sum(fa_ar) / num_test_samples
  avg_fa_ar.append(avg_fa)
  avg_md = sum(md_ar) / num_test_samples
  avg_md_ar.append(avg_md)

# display the MD/FA vs. iterations results
print(f'MD: {avg_md_ar}')
print(f'FA: {avg_fa_ar}')

# Pre-run data

In [None]:
### include any pre run data to make graphs you want to save time ###

# Results

**System Parameters**

In [None]:
# k: number of active users
# N: number of total users
# n: number of measurements
k = 40
N = 1024
n = 270

num_iterations = 15
num_avg = 10
alpha = .25
threshold = .4 # used for MD, FA

dual_bs = False  # one or two base stations
cmplx = True     # real or complexed valued data
fading = True    # fading or no fading
noise = True     # noise or no noise

# calculate noise parameters
snr_dB = 5   # enter an SNR in dB
if dual_bs:
  SNR = (10**(snr_dB/10))*n*2
else:
  SNR = (10**(snr_dB/10))*n
sigma = 1/(np.sqrt(SNR)) # calculate the variance

# create tuples needed for functions
sys_param = (n,N,k,sigma)
tfs = (noise,fading,dual_bs,cmplx)

In [None]:
# ISTA: MSE vs. Iterations
print(f'Dual BS: {dual_bs} | Complex: {cmplx} | Fading: {fading} | Noise: {noise}')
if noise:
  print(f'SNR: {snr_dB}')
  
ista_mse_iter = ist_mse_iter(sys_param, tfs, num_iterations, num_avg)
print(np.array2string(ista_mse_iter, separator=','))

In [None]:
# AMP: MSE vs. Iterations
print(f'Dual BS: {dual_bs} | Complex: {cmplx} | Fading: {fading} | Noise: {noise}')
if noise:
  print(f'SNR: {snr_dB}')
  
amp_mse_iter = amp_mse_iter(sys_param, tfs, num_iterations, num_avg)
print(np.array2string(amp_mse_iter, separator=','))

In [None]:
# ISTA: MSE vs. SNR
# array of SNRs in dB to be tested over
SNRdB = np.array([-12,-10, -5, 1, 5, 10, 20])
SNRs = (10**(SNRdB/10))*n
sigmas = 1/(np.sqrt(SNRs))

print(f'Dual BS: {dual_bs} | Complex: {cmplx} | Fading: {fading}')

ista_mse_snr_ar = ist_mse_snr(sys_param, tfs, num_iterations, num_avg)
print(np.array2string(ista_mse_snr_ar, separator=','))

In [None]:
# AMP: MSE vs. SNR
# array of SNRs in dB to be tested over
SNRdB = np.array([-12,-10, -5, 1, 5, 10, 20])
SNRs = (10**(SNRdB/10))*n
sigmas = 1/(np.sqrt(SNRs))

print(f'Dual BS: {dual_bs} | Complex: {cmplx} | Fading: {fading}')

amp_mse_snr_ar = amp_mse_snr(sys_param, tfs, num_iterations, num_avg)
print(np.array2string(amp_mse_snr_ar, separator=','))

In [None]:
# ISTA MD/FA vs. Iterations
SNRs = (10**(snr_dB/10))*n
sigmas = 1/(np.sqrt(SNRs))
sys_param = (n,N,k,sigmas)
tfs = (noise,fading,dual_bs,cmplx)

print(f'Dual BS: {dual_bs} | Complex: {cmplx} | Fading: {fading} | Noise: {noise}')
if noise:
  print(f'SNR: {snr_dB}')

ista_md_iter, ista_fa_iter = ist_md_fa(sys_param, tfs, num_iterations, num_avg, threshold)
print(f'MD: {np.array2string(ista_md_iter, separator=',')}')
print(f'FA: {np.array2string(ista_fa_iter, separator=',')}')

In [None]:
# AMP MD/FA vs. Iterations
SNRs = (10**(snr_dB/10))*n
sigmas = 1/(np.sqrt(SNRs))
sys_param = (n,N,k,sigmas)
tfs = (noise,fading,dual_bs,cmplx)

print(f'Dual BS: {dual_bs} | Complex: {cmplx} | Fading: {fading} | Noise: {noise}')
if noise:
  print(f'SNR: {snr_dB}')

amp_md_iter, amp_fa_iter = amp_md_fa_iter(sys_param, tfs, num_iterations, num_avg, threshold)
print(f'MD: {np.array2string(amp_md_iter, separator=',')}')
print(f'FA: {np.array2string(amp_fa_iter, separator=',')}')

In [None]:
# ISTA MD/FA vs. SNR
tfs = (noise, fading, dual_bs, cmplx)

print(f'Dual BS: {dual_bs} | Complex: {cmplx} | Fading: {fading}')

ista_md_snr, ista_fa_snr = ist_md_fa_snr(sys_param, tfs, num_iterations, num_avg, threshold)
print(f'MD: {np.array2string(ista_md_snr, separator=',')}')
print(f'FA: {np.array2string(ista_fa_snr, separator=',')}')

In [None]:
# AMP MD/FA vs. SNR
tfs = (noise, fading, dual_bs, cmplx)

print(f'Dual BS: {dual_bs} | Complex: {cmplx} | Fading: {fading}')

amp_md_snr, amp_fa_snr = amp_md_fa_snr(sys_param, tfs, num_iterations, num_avg, threshold)
print(f'MD: {np.array2string(amp_md_snr, separator=',')}')
print(f'FA: {np.array2string(amp_fa_snr, separator=',')}')