In [1]:
####################################################################################
# Training and testing the LcgNetV detector, with an LMMSE detector as the baseline
#######################################################################################

In [None]:
import tensorflow.compat.v1 as tf
import numpy as np

# Run TensorFlow in TF1-style graph mode: the code below builds a static graph
# from tf.placeholder inputs and evaluates it through tf.Session.
tf.disable_eager_execution()

def generate_data(B, K, N, snr_low, snr_high, H_org):
    """Simulate a batch of BPSK transmissions over the fixed channel ``H_org``.

    For every sample a symbol vector x in {-1, +1}^K is drawn, sent through
    ``H_org`` and corrupted by white Gaussian noise. The SNR of each sample is
    drawn uniformly from [snr_low, snr_high) and is treated as a *linear*
    ratio (not dB): the noise standard deviation is
    ``sqrt(tmp_snr) / sqrt(SNR)`` with ``tmp_snr = trace(H^T H) / K``.

    Parameters
    ----------
    B : int
        Number of samples in the batch. ``B == 0`` yields empty arrays.
    K : int
        Number of transmitted symbols (columns of ``H_org``).
    N : int
        Number of received values (rows of ``H_org``).
    snr_low, snr_high : float
        Bounds of the uniform distribution the per-sample linear SNR is
        drawn from.
    H_org : array_like, shape (N, K)
        Channel matrix shared by every sample of the batch.

    Returns
    -------
    tuple
        ``(y_, H_, Hy_, HH_, x_, SNR_, x_ind, tmp_snr)`` where

        * ``y_``      (B, N)    received vectors ``H x + noise``
        * ``H_``      (B, N, K) ``H_org`` repeated for every sample
        * ``Hy_``     (B, K)    matched-filter outputs ``H^T y``
        * ``HH_``     (B, K, K) Gram matrix ``H^T H`` repeated for every sample
        * ``x_``      (B, K)    transmitted symbols in {-1, +1}
        * ``SNR_``    (B,)      linear SNR drawn for each sample
        * ``x_ind``   (B, K, 2) one-hot labels: [..., 0] marks +1, [..., 1] marks -1
        * ``tmp_snr`` scalar    ``trace(H^T H) / K``
    """
    H = np.asarray(H_org)

    # Draw order (symbols, noise, SNR) is kept so a seeded global RNG is
    # consumed in the same sequence as before.
    # Thresholding instead of np.sign guarantees every symbol is exactly +/-1.
    x_ = np.where(np.random.rand(B, K) < 0.5, -1.0, 1.0)
    w = np.random.randn(B, N)
    SNR_ = np.random.uniform(low=snr_low, high=snr_high, size=B)

    # The channel is identical for all samples, so its Gram matrix and the
    # derived power normalisation are computed once, outside any loop. This
    # also keeps tmp_snr defined when B == 0.
    gram = H.T.dot(H)
    tmp_snr = gram.trace() / K

    # Row i of x_.dot(H.T) is H.dot(x_[i]); the noise of sample i is scaled by
    # sqrt(tmp_snr) / sqrt(SNR_[i]).
    y_ = x_.dot(H.T) + w * np.sqrt(tmp_snr) / np.sqrt(SNR_)[:, None]
    # Row i of y_.dot(H) is H.T.dot(y_[i]).
    Hy_ = y_.dot(H)

    # Broadcasting assignment both replicates the matrix per sample and
    # rejects an H_org whose shape is incompatible with (N, K).
    H_ = np.zeros([B, N, K])
    H_[:] = H
    HH_ = np.zeros([B, K, K])
    HH_[:] = gram

    # One-hot encoding of the symbol sign.
    x_ind = np.stack([x_ == 1, x_ == -1], axis=-1).astype(float)

    return y_, H_, Hy_, HH_, x_, SNR_, x_ind, tmp_snr


# Parameters (module-level globals read by the graph definition and by the
# training/testing functions further down)
B = 1000 # Batch size
K = 12   # Number of users
N = 12    # Number of antennas
T = 8 # Total layers (iterations)
# One fixed channel realisation is used for all training and test batches.
# NOTE: no random seed is set in this cell, so H_org differs from run to run.
H_org = np.random.randn(N, K)
frobenius_norm = np.linalg.norm(H_org, 'fro')  # Frobenius norm of the matrix
H_org = H_org / frobenius_norm  # normalise the channel to unit Frobenius norm

snr_test = [13,12,11,10,9,8]  # SNR values for testing, in dB (converted to linear in the main loop)

# Define Learning Rates
trinit = 0.001  # Initial learning rate
# Alternative schedule with one extra, smaller refinement step (disabled):
#refinements = [1.0, 0.5, 0.1, 0.01]
refinements = [1.0,0.5,0.1]  # Refinement factors for fine-tuning
# One fine-tuning learning rate per refinement factor: trinit * factor.
fine_tune_lrs = [trinit * refinement for refinement in refinements]

# Placeholders (leading dimension None = batch size chosen at feed time)
# NOTE: y and H are fed by the training/testing code but no op built in this
# cell consumes them; the network only uses Hy, HH, x_true and sigma2.
y = tf.placeholder(tf.float32, shape=(None, N), name='y')
H = tf.placeholder(tf.float32, shape=(None, N, K), name='H')
Hy = tf.placeholder(tf.float32, shape=(None, K), name='Hy')
HH = tf.placeholder(tf.float32, shape=(None, K, K), name='HH')
x_true = tf.placeholder(tf.float32, shape=(None, K), name='x_true')
sigma2 = tf.placeholder(tf.float32, shape=(), name='noise_variance')  # Noise variance placeholder (scalar, shared by the whole batch)

# Identity matrix
I_Nt = tf.eye(K, dtype=tf.float32)

# Calculate A_rm and b_rm: the linear system A_rm s = b_rm that the unrolled
# iterations below try to solve. The (K, K) identity broadcasts over the batch.
A_rm = HH + sigma2 * I_Nt  # A_rm = H^T H + σ^2 I
b_rm = Hy  # b_rm = H^T y

# Initialize trainable parameters: one length-K step-size vector (alpha) and
# one length-K direction-mixing vector (beta) per layer, all starting at zero.
alpha = [tf.Variable(tf.zeros([K]), name=f'alpha_{t}') for t in range(T)]
beta = [tf.Variable(tf.zeros([K]), name=f'beta_{t}') for t in range(T)]

# Initialize CG-based iterative refinement
s_hat = tf.zeros_like(x_true, name='s_hat_init')  # all-zero initial estimate, shaped like x_true
# expand_dims/squeeze turn the (batch, K) vector into (batch, K, 1) for the
# batched matmul with A_rm and back again.
r = b_rm - tf.squeeze(tf.matmul(A_rm, tf.expand_dims(s_hat, -1)), axis=-1)  # r^0_rm = b_rm (because s_hat starts at zero)
d = tf.identity(r)  # d^0_rm = r^0_rm
states = []  # estimate after each layer, in layer order

for t in range(T):
    # Update s_hat (estimate of x)
    # alpha[t] / beta[t] have shape (K,) and broadcast element-wise against
    # the (batch, K) tensors d and A_rm d.
    s_hat = s_hat + tf.multiply(alpha[t],d)# s^(i+1)_rm
    r = r - tf.multiply(alpha[t], tf.squeeze(tf.matmul(A_rm, tf.expand_dims(d, -1)), axis=-1))  # r^(i+1)_rm
    d = r +tf.multiply(beta[t],d) # d^(i+1)_rm
    states.append(s_hat)

# Output of the last layer. The r and d computed in the final iteration are
# not used afterwards, so alpha[T-1] affects the output but beta[T-1] does not.
s_hat_final = states[-1]

# Loss and NMSE (both computed on the last layer's estimate only)
loss = tf.reduce_mean(tf.square(s_hat_final - x_true))
nmse = tf.reduce_mean(tf.square(s_hat_final - x_true)) / tf.reduce_mean(tf.square(x_true))

# Training Operations for each layer
# train_ops[t] uses its own Adam optimizer at the initial learning rate and
# updates only alpha[t] and beta[t]. Every op minimises the same `loss`,
# i.e. the error of the final layer's output, not of layer t's output.
train_ops = [tf.train.AdamOptimizer(trinit).minimize(loss, var_list=[alpha[t], beta[t]]) for t in range(T)]

# Fine-Tuning Operations
# One Adam optimizer per fine-tuning learning rate, keyed by that rate (equal
# rates would collapse into a single dict entry). No var_list is given, so
# minimize() falls back to its default variable collection rather than a
# per-layer subset.
fine_tune_ops = {
    lr: tf.train.AdamOptimizer(lr).minimize(loss)
    for lr in fine_tune_lrs
}

# Validation
def validate_model(sess, y_val, H_val, Hy_val, HH_val, x_val, sigma2_val):
    feed_dict = {
        y: y_val,
        H: H_val,
        Hy: Hy_val,
        HH: HH_val,
        x_true: x_val,
        sigma2: sigma2_val
    }
    return sess.run(nmse, feed_dict=feed_dict)


# Training with Early Stopping
def train_for_snr(sess, snr, train_ops, fine_tune_ops, y, H, Hy, HH, x_true, sigma2, patience=3):
    print(f"Training and Fine-tuning for SNR = {snr[0]} dB...")

    for t in range(T):
        print(f"Training Layer {t+1}/{T} for SNR = {snr[0]} dB...")

        y_train, H_train, Hy_train, HH_train, x_train, _, _,tmp_snr = generate_data(B, K, N, *snr, H_org)
        y_val, H_val, Hy_val, HH_val, x_val, _, _ ,tmp_snr= generate_data(B, K, N, *snr, H_org)

        sigma2_val = tmp_snr/snr[0]

        best_val_nmse = float('inf')
        patience_counter = 0

        for epoch in range(100):  # Max epochs
            feed_dict = {
                y: y_train,
                H: H_train,
                Hy: Hy_train,
                HH: HH_train,
                x_true: x_train,
                sigma2: sigma2_val
            }
            sess.run(train_ops[t], feed_dict=feed_dict)

            val_nmse = validate_model(sess, y_val, H_val, Hy_val, HH_val, x_val, sigma2_val)
            print(f"Layer {t+1}, Epoch {epoch+1}: Validation NMSE = {val_nmse:.6f}")

            if val_nmse < best_val_nmse:
                best_val_nmse = val_nmse
                patience_counter = 0
            else:
                patience_counter += 1

            if patience_counter >= patience:
                print(f"Early stopping at Layer {t+1}, Epoch {epoch+1} (Best Validation NMSE = {best_val_nmse:.6f})")
                break

        print(f"Fine-tuning Layers 1 to {t+1} for SNR = {snr[0]} dB...")
        for lr, fine_tune_op in fine_tune_ops.items():
            for epoch in range(50):
                feed_dict = {
                    y: y_train,
                    H: H_train,
                    Hy: Hy_train,
                    HH: HH_train,
                    x_true: x_train,
                    sigma2: sigma2_val
                }
                sess.run(fine_tune_op, feed_dict=feed_dict)
                val_nmse = validate_model(sess, y_val, H_val, Hy_val, HH_val, x_val, sigma2_val)
                print(f"Fine-tuning LR={lr}, Epoch {epoch+1}: Validation NMSE = {val_nmse:.6f}")


# Add LMMSE Detector
def lmmse_detector(H, y, sigma2):
    """Batched LMMSE estimate ``x = (H^T H + sigma2 * I)^{-1} H^T y``.

    Parameters
    ----------
    H : array_like, shape (B, N, K)
        One channel matrix per sample.
    y : array_like, shape (B, N)
        One received vector per sample.
    sigma2 : float
        Noise variance used as the regularisation term.

    Returns
    -------
    numpy.ndarray, shape (B, K)
        Soft (real-valued) LMMSE estimates, float64.

    Raises
    ------
    numpy.linalg.LinAlgError
        If any ``H^T H + sigma2 * I`` is singular.
    """
    H = np.asarray(H, dtype=np.float64)
    y = np.asarray(y, dtype=np.float64)
    B, N, K = H.shape

    H_t = np.swapaxes(H, -1, -2)           # (B, K, N)
    A = H_t @ H + sigma2 * np.eye(K)       # (B, K, K); identity broadcasts over the batch
    # Give b an explicit trailing axis so it is always treated as a stack of
    # column vectors, independent of how solve() interprets a 2-D right-hand side.
    b = H_t @ y[:B, :, None]               # (B, K, 1)

    # A single stacked solve replaces B separate Python-level solves.
    return np.linalg.solve(A, b)[..., 0]

# Update Testing Function to Include LMMSE
def test_model_with_lmmse(sess, snr_value, y_placeholder, H_placeholder, Hy_placeholder, HH_placeholder, x_true_placeholder, s_hat_final_op):
    """Compare the BER of the trained network with that of the LMMSE detector.

    A fresh batch of 1000 samples is generated at the fixed linear SNR
    ``snr_value``. The network output and the LMMSE estimate are both
    hard-decided with ``np.sign`` and compared against the transmitted
    symbols. The noise variance is fed through the module-level ``sigma2``
    placeholder, which is not one of this function's parameters.

    Returns
    -------
    tuple
        ``(ber_iterative, ber_lmmse)``.
    """
    batch = generate_data(1000, K, N, snr_value, snr_value, H_org)
    y_test, H_test, Hy_test, HH_test, x_test = batch[:5]
    tmp_snr = batch[-1]
    # Same noise-variance convention as the one used to generate the batch.
    sigma2_val = tmp_snr/snr_value

    total_bits = np.prod(x_test.shape)

    def bit_error_rate(soft_estimate):
        # Hard decision by sign; a soft value of exactly 0 counts as an error.
        hard_decision = np.sign(soft_estimate)
        return np.sum(hard_decision != x_test) / total_bits

    # Unrolled network
    x_pred = sess.run(
        s_hat_final_op,
        feed_dict={
            y_placeholder: y_test,
            H_placeholder: H_test,
            Hy_placeholder: Hy_test,
            HH_placeholder: HH_test,
            x_true_placeholder: x_test,
            sigma2: sigma2_val,
        },
    )

    # LMMSE baseline on the same batch
    x_lmmse = lmmse_detector(H_test, y_test, sigma2_val)

    return bit_error_rate(x_pred), bit_error_rate(x_lmmse)

# SNR values for progressive training
snr_progressive = [30,15,14,11,10,9]  # dB


def _db_to_linear(snr_db):
    """Convert an SNR given in dB to a linear power ratio."""
    return 10**(snr_db / 10.0)


# Main Testing Loop with Progressive Training
# The announced range is taken from the schedule itself so the message cannot
# drift out of sync with snr_progressive.
print(f"\nTesting the model with progressive training ({snr_progressive[0]} dB to {snr_progressive[-1]} dB) and LMMSE detector...")
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    # Progressive training loop: the same variables keep being trained as the
    # SNR steps through the schedule, so each stage starts from the previous one.
    for snr_db in snr_progressive:
        snr = _db_to_linear(snr_db)
        print(f"\nTraining for SNR = {snr_db} dB...")
        # (snr, snr) makes the SNR range degenerate, i.e. a single fixed SNR.
        train_for_snr(sess, (snr,snr), train_ops, fine_tune_ops, y, H, Hy, HH, x_true, sigma2)

    # Testing after training, inside the same session so the trained
    # variables are still available.
    for snr_db in snr_test:
        snr = _db_to_linear(snr_db)
        ber_iterative, ber_lmmse = test_model_with_lmmse(sess, snr, y, H, Hy, HH, x_true, s_hat_final)
        print(f"SNR: {snr_db} dB, Iterative Model BER: {ber_iterative:.6f}, LMMSE BER: {ber_lmmse:.6f}")

print("Progressive training and testing complete!")