In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jul 25 12:29:26 2023

@author: pp423
"""
import numpy as np
from numpy.random import binomial
from ipynb.fs.full.se_qgt import mmse_new

'''=== Find FPR and FNR for signal estimate ==='''
def fpr_fnr(beta_est, beta_0):    
    fp = np.sum((beta_est == 1) & (beta_0 == 0))
    tp = np.sum((beta_est == 1) & (beta_0 == 1))
    fn = np.sum((beta_est == 0) & (beta_0 == 1))
    tn = np.sum((beta_est == 0) & (beta_0 == 0))
    fpr = fp / (fp + tn)
    fnr = fn / (fn + tp)
    err_r = (fp + fn)/len(beta_0)
    return fpr, fnr, err_r
    
'''=== Quantize signal entries to 0, 1 according to threshold ==='''
def quantize_new(beta_hat, thresh):
    beta_hat[beta_hat <= thresh] = 0
    beta_hat[beta_hat > thresh] = 1
    return beta_hat


'''=== Create iid design matrix ==='''
def create_X_iid(alpha, m, n):
    X = binomial(1, alpha, (m, n))
    return X

'''=== Create signal vector beta ==='''
def create_beta(nu, n):
    beta_0 = np.zeros(n)
    for i in range(len(beta_0)):
        rand = np.random.random_sample()
        if rand < nu:
            beta_0[i] = 1
    return beta_0

'''=== Transform iid design matrix for AMP ==='''
def Xiid_to_Xtilde(X_iid, alpha):
    m, n = np.shape(X_iid)
    X_tilde = (1/np.sqrt(m*alpha*(1-alpha)))*(X_iid - alpha)
    return X_tilde

'''=== Transform measurements vector y for AMP ==='''
def y_iid_to_y_iid_tilde(y, alpha, nu, m, n, defect_no='None'):
    if defect_no == 'None':
        #Estimate no. defective items - doesn't work well
        y_tilde = (1/np.sqrt(m*alpha*(1-alpha)))*(y - alpha*n*nu)
    else:
        y_tilde = (1/np.sqrt(m*alpha*(1-alpha)))*(y - alpha*defect_no)
    return y_tilde

'''=== Denoising function g_in (aka f_k) ==='''
def g_in_bayes(s, tau_2, nu):
    if tau_2 < 1e-3:
        output = g_in_bayes(s, 1e-3, nu)
    else:
        gamma = s/tau_2
        beta = 1/(2*tau_2)
        m_1 = np.maximum(gamma, 0)
        m_2 = np.maximum(gamma, beta)
        num = nu*np.exp(gamma-m_1)
        denomin = (1-nu)*np.exp(beta-m_2) + nu*np.exp(gamma-m_2)
        output = np.exp(m_1 - m_2)*(num/denomin)
    return output

'''=== Derivative of denoising function g_in (aka f_k) ==='''
def deriv_g_in_bayes(s, tau_2, nu):
    if tau_2 < 1e-3:
        output = deriv_g_in_bayes(s, 1e-3, nu)
    else:
        gamma = s/tau_2
        beta = 1/(2*tau_2)
        m_3 = np.maximum(beta+gamma, 0)
        m_2 = np.maximum(gamma, beta)
        num = np.exp(beta+gamma-m_3)
        denomin = ((1-nu)*np.exp(beta-m_2) + nu*np.exp(gamma-m_2))**2
        output = np.exp(m_3 - 2*m_2)*((nu*(1-nu))/tau_2)*(num/denomin)
    return output

'''=== Bayes-optimal AMP algorithm for QGT ==='''
def amp_bayes(X, X_T, y, t, nu, beta_0):
    m, n = len(X), len(X[0])
    delta_true = m/n
    tau_array = []
    error_norm_array = []
    nc_array = []
    
    tau_2 = 100
    #Initialise x
    beta = np.ones(n)*nu   
    for iter_no in range(t):
        if iter_no==0:
            #Initialise x, z and tau
            z = y - np.dot(X, beta)
            tau_prev = 0
        else:
            Onsager = (1/m) * z * np.sum(deriv_g_in_bayes(betaXz, tau_2, nu))
            z             = y - np.dot(X,beta) + Onsager
            tau_prev = np.copy(tau)

        #Estimate noise variance tau from residual z
        tau_2  = (np.linalg.norm(z, ord=2)**2)/ m
        tau_2 = ((1/delta_true)*mmse_new(1/np.sqrt(tau_2), nu)+1e-10)
        #betaxz = B_K
        betaXz = np.dot(X_T, z) + beta
        #beta^hat_k
        beta     = g_in_bayes(betaXz, tau_2, nu)

        beta = np.nan_to_num(beta) #Avoid nan
    
        tau = np.sqrt(tau_2)
        tau_array.append(tau)
        
        #Compute performance metrics
        mse = (1/n)*(np.linalg.norm(beta - beta_0)**2)
        norm_correl = (np.dot(beta, beta_0)/(np.linalg.norm(beta)*np.linalg.norm(beta_0)))**2

        error_norm_array.append(mse)
        nc_array.append(norm_correl)
        
        #Stopping criterion - Relative norm tolerance
        if (tau - tau_prev)**2/tau_2 < 1e-9:
            break
        
        tau_prev = tau
        
    mse_pred = delta*(tau_2)

    return beta, mse_pred, tau_array, error_norm_array, nc_array


def create_SC_matrix(W, N, alpha, delin):
    R, C = np.shape(W)
    M = int(delin*N)
    m, n = M*R, N*C
    X_sc = np.zeros((m,n))
    for c in range(C):
        for r in range(R):
            X_sc[M*r:(M*r+M), N*c:(N*c + N )] = np.random.binomial(1, alpha * W[r,c], (M, N))
    return X_sc

def Xsc_to_Xsctilde(X_sc, W, alpha):
    m, n = np.shape(X_sc)
    R, C = np.shape(W)
    M, N = int(m/R), int(n/C)
    lam = C
    omega = R + 1 - lam
    X_sc_tilde = np.zeros((m,n))
    for c in range(C):
        for r in range(c, c + omega):
            X_sc_tilde[M*r:(M*r+M), N*c:(N*c + N )] = (1/np.sqrt(m*alpha*(1-alpha)/R))*(X_sc[M*r:(M*r+M), N*c:(N*c + N )] - alpha*W[r,c])
    return X_sc_tilde        

'''=== Bayes-optimal sub-linear AMP algorithm for QGT ==='''
def sub_amp_bayes(X, X_T, y, t, nu, beta_0, theta, delta):
    m, n = len(X), len(X[0])
    tau_array = []
    error_norm_array = []
    nc_array = []
    
    tau_2 = 100
    #Initialise x
    beta = np.ones(n)*nu
    #beta = create_beta(nu, n)

    for iter_no in range(t):
        if iter_no==0:
            #Initialise x, z and tau
            z = y - np.dot(X, beta)
            tau_prev = 0
        else:
            Onsager = (1/m) * z * np.sum(deriv_g_in_bayes(betaXz, tau_2, nu))
            z             = y - np.dot(X,beta) + Onsager
            tau_prev = np.copy(tau)

        #Estimate noise variance tau from residual z
        tau_2  = (np.linalg.norm(z, ord=2)**2)/ m
        #tau_2 = (n**(1-theta)/delta*mmse_new(1/np.sqrt(tau_2), nu)+1e-10)
        betaXz = np.dot(X_T, z) + beta
        beta     = g_in_bayes(betaXz, tau_2, nu)

        beta = np.nan_to_num(beta) #Avoid nan
    
        tau = np.sqrt(tau_2)
        tau_array.append(tau)
        
        #Compute performance metrics
        mse = (1/n)*(np.linalg.norm(beta - beta_0)**2)
        norm_correl = (np.dot(beta, beta_0)/(np.linalg.norm(beta)*np.linalg.norm(beta_0)))**2

        error_norm_array.append(mse)
        nc_array.append(norm_correl)
        
        #Stopping criterion - Relative norm tolerance
        if (tau - tau_prev)**2/tau_2 < 1e-9:
            break
        
        tau_prev = tau
        
    mse_pred = (1/n)*(np.linalg.norm(beta - beta_0)**2)

    return beta, mse_pred, tau_array, error_norm_array, nc_array


def create_W(lam, omega, alpha):
  R = lam + omega - 1
  C = lam
  W = np.zeros((R, C))
  for c in range(C):
      for r in range(c, c + omega):
        if alpha >= 0.5:
          W[r, c] = (1 + np.sqrt(1-4*alpha*(1-alpha)/omega)) / (2*alpha)
        elif alpha < 0.5:
          W[r, c] = (1 - np.sqrt(1-4*alpha*(1-alpha)/omega)) / (2*alpha)
  return W

def W_to_Wtilde(W, alpha):
    W_tilde = (W*(1-alpha*W))/(1-alpha)
    return W_tilde

def sc_amp_bayes(W, X, y, beta_0, nu, delta_hat, t):
    m, n = len(X), len(X[0])
    R, C = len(W), len(W[0])
    M = int(m/R)#
    N = int(n/C)
    Q = np.zeros((m,n))
    Q_tilde = np.zeros((R,C))
    deriv = np.zeros(C)
    phi = np.ones(R)*1e6
    psi = np.ones(C)*1e3
    phi_prev = np.zeros(R)
    b = np.zeros(m)
    
    mse_list = []
    
    #Initialise x, z and tau
    beta = np.ones(n)*nu   #Initialise x
    for iter_no in range(t):
        if iter_no==0:
            z = y
            phi_prev = np.zeros(R)
        else:
            for r in range(R):
                b[M*r:(M*r+M)]= (1/delta_hat)*np.sum(W[r,:]*Q_tilde[r,:]*deriv)
            Onsager = b * z
            z = y - np.dot(X,beta) + Onsager
            phi_prev = np.zeros(R) + phi
        #Calculate phi from z
        for r in range(R):
            phi[r] = (1/M)*(np.linalg.norm(z[M*r:(M*r+M)],ord=2)**2) + 1e-50
        
        
        
        #Calculate Q from phi
        for c in range(C):
            denomin = np.dot((1/phi),W[:,c])
            for r in range(R):
                Q[M*r:(M*r+M), N*c:(N*c + N )]=((1/phi[r])/denomin)*np.ones((M,N))
                Q_tilde[r,c]=(1/phi[r])/denomin
        
        denoise_arg = beta + np.dot(np.transpose(Q*X),z)
        for c in range(C):
            denoise_argument = denoise_arg[N*c:(N*c + N )]
            s = np.dot((1/phi),W[:,c])
            beta[N*c:(N*c + N )] = g_in_bayes(denoise_argument, s**(-1), nu)
            deriv[c]= (1/N)*np.sum(deriv_g_in_bayes(denoise_argument, s**(-1), nu))

        """"#Exact Phi
        for r in range(R):
            phi[r]= (1/delta_hat)*np.dot(psi, W[r,:]) + 1e-50
        print("Phi: ", phi)
        for c in range(C):
            psi[c]=mmse_new(np.sqrt(np.dot(W[:,c],(1/phi))), nu)"""
        #beta = np.nan_to_num(beta)
        mse = (1/n)*(np.linalg.norm(beta - beta_0)**2)

        mse_list.append(mse)
        
        #Stopping criterion - Relative norm tolerance
        if (np.linalg.norm(phi - phi_prev, 2)/np.linalg.norm(phi, 2)) < 1e-6:
            break
        

    return phi, beta, mse_list
'''=== Create Spatial Coupling design matrix X ==='''
def CreateSpatialCouplingX(Gamma, w, n, p, alpha, WrcTrue):
    #if(Gamma < 2*w-1):
    #    raise Exception("Gamma is too small")
    if(p % Gamma):
        raise Exception("p not divisible by Gamma")
    if(alpha <= 0 or alpha >= 1):
        raise Exception("alpha value invalid")
    C = Gamma
    R = Gamma + w - 1
    if(n % R != 0):
        n = R * np.ceil(n/R)
    RowsPerBlock = int(n/R)
    ColumnsPerBlock = int(p/C)
    Wrc = 1/(2*alpha)*(1-np.sqrt(1-4*alpha*(1-alpha)/w))
    if(WrcTrue == True):
        alpha = alpha*Wrc
    for i in range(C):
        for j in range(R):
            if(i <= j and j <= i+w-1):
                X1 = binomial(1, alpha, (RowsPerBlock, ColumnsPerBlock))
            else:
                X1 = np.zeros((RowsPerBlock, ColumnsPerBlock))
            if(j == 0):
                X2 = X1
            else:
                X2 = np.concatenate((X2, X1), axis=0)
        if(i==0):
            X3 = X2
        else:
            X3 = np.concatenate((X3, X2), axis=1)
    return X3

def Xspatial_to_Xtilde(X, Gamma, w, n, p, alpha, WrcTrue):
    C = Gamma
    R = Gamma + w - 1
    if(n % R != 0):
        n = R * np.ceil(n/R)
    RowsPerBlock = int(n/R)
    ColumnsPerBlock = int(p/C)
    for i in range(C):
        for j in range(R):
            if(i <= j and j <= i+w-1):
                X1 = np.ones((RowsPerBlock, ColumnsPerBlock))
            else:
                X1 = np.zeros((RowsPerBlock, ColumnsPerBlock))
            if(j == 0):
                X2 = X1
            else:
                X2 = np.concatenate((X2, X1), axis=0)
        if(i==0):
            X3 = X2
        else:
            X3 = np.concatenate((X3, X2), axis=1)
    if(WrcTrue == True):
        Wrc = 1/(2*alpha)*(1-np.sqrt(1-4*alpha*(1-alpha)/w))
        X4 = (X - alpha*Wrc*X3)/(np.sqrt(n*alpha*(1-alpha)/R))
    else:
        X4 = (X - alpha*X3)/(np.sqrt(n*w*alpha*(1-alpha)/R))
    return X4

def DefectCalc(beta, Gamma, p):
    if(p % Gamma != 0):
        raise Exception("p not a multiple of Gamma")
    Defects = np.zeros(Gamma)
    for i in range(Gamma):
        for j in range(int(p/Gamma)):
            Defects[i] += beta[int(i*p/Gamma+j)]
    return Defects
            
def Yspatial_to_Ytilde(y, Gamma, w, n, p, alpha, WrcTrue, Defects):
    C = Gamma
    R = Gamma + w - 1
    if(n % R != 0):
        n = R * np.ceil(n/R)
    RowsPerBlock = int(n/R)
    ColumnsPerBlock = int(p/C)
    for i in range(R):
        for j in range(C):
            if(j <= i and i <= j+w-1):
                X1 = np.ones(RowsPerBlock)*Defects[j]
            else:
                X1 = np.zeros(RowsPerBlock)
            if(j == 0):
                X2 = X1
            else:
                X2 = X2+X1
        if(i==0):
            X3 = X2
        else:
            X3 = np.concatenate((X3, X2), axis=0)
    if WrcTrue == True:
        Wrc = 1/(2*alpha)*(1-np.sqrt(1-4*alpha*(1-alpha)/w))
        Ytilde = (y - alpha*Wrc*X3)/np.sqrt(n*alpha*(1-alpha)/R)
    else:
        Ytilde = (y - alpha*X3)/np.sqrt(n*w*alpha*(1-alpha)/R)
    return Ytilde

0.2063124896754875
