**2. Codes for analyzing the PCNet/PCNet+ model**

**2.1 PCNet Model: For training with a given background noise power**

In [None]:
## Reference
# F. Liang, C. Shen, W. Yu, and F. Wu, “Towards optimal power control via
# ensembling deep neural networks,” IEEE Transactions on Communications, vol. 68,
# no. 3, pp. 1760–1776, 2020, doi: https://doi.org/10.1109/TCOMM.2019.2957482.

In [None]:
import numpy as np

## Number of transmitter-receiver pairs
K = 5

## Variances for noise signals
sigma_sqr_noise = np.array([1e-0, 1e-0, 1e-0, 1e-0, 1e-0], dtype = float)

## Minimum rate for the achievable SINR of multiple concurrent transmissions
SINR_P_min = np.array([0.5, 0.5, 0.5, 0.5, 0.5], dtype = float)

## Maximum transmit power
p_max = 1.0

In [None]:
## Loading a CSV file (F_H_2D.csv) for feasible H matrices that was uploaded to
## Google Collab's session storage.
from numpy import loadtxt

## Reading an array from the file
F_H_2D_L = np.loadtxt('F_H_2D.csv', delimiter = ',', dtype = str)

## Reshaping the array from 2D to 3D
F_H_3D = F_H_2D_L.reshape(F_H_2D_L.shape[0], F_H_2D_L.shape[1] // K, K)
F_H_3D_size = F_H_3D.shape[0]

In [None]:
## Converting string data to complex data and removing the initial whitespace
F_H_list = []
for k in range(F_H_3D_size):
  for i in range(K):  # Total rows
    for j in range(K):  # Total columns
      F_H_temp = complex(F_H_3D[k][i][j].strip())
      F_H_list.append(F_H_temp)
F_H_array = np.array(F_H_list)
F_H = F_H_array.reshape((F_H_3D_size, K, K)) # H_size X row X column_count
print(F_H.shape)
F_H_size = F_H.shape[0]
# print(F_H)

(250005, 5, 5)


In [None]:
import numba as nb

## Function to compute the square of the absolute value of an array of complex numbers
@nb.vectorize([nb.float64(nb.complex128),nb.float32(nb.complex64)])
def cmplx_abs_sqr(cmplx_var):
  return cmplx_var.real**2 + cmplx_var.imag**2

In [None]:
## Function to generate the matrix A (K x K)
def generate_A(F_H_size, K, SINR_P_min, F_H):
  Aij_list = []
  F_H_abs_sqr = cmplx_abs_sqr(F_H)

  for k in range(F_H_size):
    for i in range(K):  # Total rows
      Aj_list =[]
      for j in range(K): # Total columns
        if i==j:
          A = F_H_abs_sqr[k,i,j]
        else:
          A = np.multiply(-SINR_P_min[i], F_H_abs_sqr[k,i,j])
        Aj_list.append(A)
      Aij_list.append(Aj_list)
  Aij_array = np.array(Aij_list)
  Aij = Aij_array.reshape((F_H_size, K, K)) # H_size X row X column
  return Aij

In [None]:
## Create matrix A
A = generate_A(F_H_size, K, SINR_P_min, F_H)
print(A.shape)
# print(A)

(250005, 5, 5)


In [None]:
## Function to generate the vector b (K x 1)
def generate_b(F_H_size, K, SINR_P_min, sigma_sqr_noise, F_H):
  bi_list = []
  for k in range(F_H_size):
    for i in range(K):  # Total rows, i.e., total transmitters
      b = np.multiply(SINR_P_min[i], sigma_sqr_noise[i])
      bi_list.append(b)
  bi_array = np.array(bi_list)
  bi = bi_array.reshape((F_H_size, K, 1)) # H_size X row X column
  return bi

In [None]:
## Create vector b
b = generate_b(F_H_size, K, SINR_P_min, sigma_sqr_noise, F_H)
print(b.shape)
# print(b)

(250005, 5, 1)


In [None]:
## Create matrix A_inv, i.e., the pseudo inverse of matrix A
A_inv = np.linalg.pinv(A)
print(A_inv.shape)
# print(A_inv)

(250005, 5, 5)


In [None]:
## Create a vector p_hat = (A_inv x b)
p_hat = np.matmul(A_inv, b)
print(p_hat.shape)
# print(p_hat)

(250005, 5, 1)


In [None]:
# ## Function to compute the square of the absolute value of an array of complex numbers
# import numba as nb

# @nb.vectorize([nb.float64(nb.complex128),nb.float32(nb.complex64)])
# def cmplx_abs_sqr(cmplx_var):
#   return cmplx_var.real**2 + cmplx_var.imag**2

In [None]:
## Function to split datasets for training, validation, and testing.

def split(np_array):
  # data_size = np_array.shape[0]
  # train_data_size = int(data_size * 0.8)
  # valid_data_size = int(data_size * 0.1)
  # test_data_size = int(data_size * 0.1)

  train_data_size = int(200000)
  valid_data_size = int(25000)
  test_data_size = int(25000)

  train_e_indx = train_data_size
  valid_e_indx = train_e_indx + valid_data_size
  # test_e_indx = valid_e_indx + test_data_size - 2
  test_e_indx = valid_e_indx + test_data_size
  test_data_size_n = test_e_indx - valid_e_indx

  row_count = np_array.shape[1]
  column_count = np_array.shape[2]

  train_data = np.empty((train_data_size, row_count, column_count), dtype = complex, order = 'C')
  valid_data = np.empty((valid_data_size, row_count, column_count), dtype = complex, order = 'C')
  test_data = np.empty((test_data_size_n, row_count, column_count), dtype = complex, order = 'C')

  for i in range(train_e_indx):
    train_data[i] = np_array[i]

  xv = 0
  for j in range(train_e_indx, valid_e_indx):
    valid_data[xv] = np_array[j]
    xv = xv + 1

  xt = 0
  for k in range(valid_e_indx, test_e_indx):
    test_data[xt] = np_array[k]
    xt = xt + 1

  # print(train_data.shape, valid_data.shape, test_data.shape)

  ## Training input will be the absolute value
  train_input = np.absolute(train_data)
  valid_input = np.absolute(valid_data)
  test_input = np.absolute(test_data)

  print(train_input.shape, valid_input.shape, test_input.shape)

  return [train_input, valid_input, test_input, test_data]

In [None]:
## Split F_H Matrix
F_H_S = split(F_H)
train_input_F_H = F_H_S[0]
valid_input_F_H = F_H_S[1]
test_input_F_H = F_H_S[2]
test_data_F_H = F_H_S[3]

(200000, 5, 5) (25000, 5, 5) (25000, 5, 5)


In [None]:
## Split p_hat Matrix
p_hat_S = split(p_hat)
train_input_p_hat = p_hat_S[0]
valid_input_p_hat = p_hat_S[1]
test_input_p_hat = p_hat_S[2]
test_data_p_hat = p_hat_S[3]

(200000, 5, 1) (25000, 5, 1) (25000, 5, 1)


In [None]:
## Define the DNN model - The Sequential model
import tensorflow as tf
from tensorflow import keras
## from tensorflow.keras import layers # shows warning
from keras.api._v2.keras import layers

model = keras.Sequential(name = "sequential_model")

model.add(keras.Input(shape = (K,K), name = "hij_inputs"))
model.add(layers.Flatten(name = "flatten_layer_hij"))

model.add(layers.Dense(units = 2*K*K, activation = 'relu', input_shape = (K*K,), name = "dense_layer_1"))
model.add(layers.BatchNormalization())

model.add(layers.Dense(units = K*K, activation = 'relu', input_shape = (2*K*K,), name = "dense_layer_2"))
model.add(layers.BatchNormalization())

model.add(layers.Dense(units = K, activation = 'sigmoid', input_shape = (K*K,), name = "P_hat"))

model.summary()

Model: "sequential_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 flatten_layer_hij (Flatten)  (None, 25)               0         
                                                                 
 dense_layer_1 (Dense)       (None, 50)                1300      
                                                                 
 batch_normalization (BatchN  (None, 50)               200       
 ormalization)                                                   
                                                                 
 dense_layer_2 (Dense)       (None, 25)                1275      
                                                                 
 batch_normalization_1 (Batc  (None, 25)               100       
 hNormalization)                                                 
                                                                 
 P_hat (Dense)               (None, 5)            

In [None]:
## Plot the model as a graph
# keras.utils.plot_model(model, "Sequential_Model.png")

In [None]:
## Display the input and output shapes of each layer
# keras.utils.plot_model(model, "Sequential_Model_with_shape_info.png", show_shapes=True)

In [None]:
## Convert sigma_sqr_noise from numpy array to tensor
sigma_sqr_noise_t = tf.convert_to_tensor(sigma_sqr_noise, dtype = float)
tf.print(sigma_sqr_noise_t)

[1 1 1 1 1]


In [None]:
## Convert SINR_P_min from numpy array to tensor
SINR_P_min_t = tf.convert_to_tensor(SINR_P_min, dtype = float)
tf.print(SINR_P_min_t)

[0.5 0.5 0.5 0.5 0.5]


In [None]:
## The customized loss function that penalizes the constraint violation
def custom_loss(y_true, y_pred):
  p = tf.math.multiply(p_max, y_pred)
  hij = tf.reshape(y_true[:,0:K*K], (-1,K,K))
  hij_abs_sqr = tf.math.square(tf.math.abs(hij))

  lambda_l = 5.0
  R_P = 0.0
  pnlty_f_CV = 0.0

  for i in range(K):  # Total rows
    ph = 0.0
    for j in range(K):  # Total columns
      ph_j = tf.math.multiply(p[:,j], hij_abs_sqr[:,i,j])
      ph = tf.math.add(ph, ph_j)

    numr = tf.math.multiply(p[:,i], hij_abs_sqr[:,i,i])
    dnumr = tf.math.add(sigma_sqr_noise_t[i], tf.math.subtract(ph, numr))
    SINR_i = tf.math.divide(numr, dnumr)
    R_P = tf.math.add(R_P, (tf.math.log(1 + SINR_i)/tf.math.log(2.0)))
    pnlty_f_CV = tf.math.add(pnlty_f_CV,
                             tf.nn.relu((tf.math.log(1 + SINR_P_min_t[i])/tf.math.log(2.0))
                                      - (tf.math.log(1 + SINR_i)/tf.math.log(2.0))))

  loss = tf.math.add(-R_P, tf.math.multiply(lambda_l, pnlty_f_CV))
  loss = tf.reduce_mean(loss) # batch mean
  return loss

In [None]:
## Build and compile the DNN model
## Training and Testing
import matplotlib.pyplot as plt

optA = tf.keras.optimizers.Adam(learning_rate = 0.0001)
model.compile(optimizer = optA, loss = custom_loss)

history = model.fit(train_input_F_H, train_input_F_H, epochs = 50, validation_data = (valid_input_F_H, valid_input_F_H), batch_size = 1000)

plt.plot(history.epoch, history.history['loss'], color = "blue", label = "Training")
plt.plot(history.epoch, history.history['val_loss'], color="black", label = "Validation")
plt.xlabel("epochs")
plt.ylabel("loss")
plt.legend()
plt.show()

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [None]:
## Constraint violation probability and
## finding indexes of test_input_F_H matrix with the hij set that do not satisfy
## constraint on the minimum SINR_P_min rate but satisfy the maximum transmit
## power p_max

output_P_hat_temp = p_max * model.predict(test_input_F_H)
output_P_hat = output_P_hat_temp.reshape((output_P_hat_temp.shape[0], output_P_hat_temp.shape[1], 1)) # test_input_F_H_size X row X column
output_P_hat_size = output_P_hat.shape[0]
test_data_F_H_abs_sqr = cmplx_abs_sqr(test_data_F_H)

indx_n = []
count_v = 0

for k in range(output_P_hat_size):
  for i in range(K):  # Total rows
    ph = 0
    for j in range(K):  # Total columns
      ph_j = np.multiply(output_P_hat[k,j], test_data_F_H_abs_sqr[k,i,j])
      ph = ph + ph_j

    numr = np.multiply(output_P_hat[k,i], test_data_F_H_abs_sqr[k,i,i])
    dnumr = sigma_sqr_noise[i] + ph - numr
    SINR_out = np.divide(numr, dnumr)
    if np.round(SINR_out, decimals = 3) < SINR_P_min[i]:
      indx_n.append(k)
      count_v = count_v + 1
      # print(SINR_out)
      break

violation_prb = (count_v / output_P_hat_size) * 100
print("Constraints Violation Probability: {:.2f}%".format(violation_prb))
# print(len(indx_n))
# print(indx_n)

Constraints Violation Probability: 80.76%


In [None]:
## Function to calculate the average sum rate
# Here, p_model is the output of DNN, and it is a 2D array.
import math

def average_sum_rate(hij, p_model, sigma_sqr_noise, K):
  R = 0
  hij_size = hij.shape[0]
  hij_abs_sqr = cmplx_abs_sqr(hij)

  for k in range(hij_size):
    for i in range(K):  # Total rows
      phn = 0
      for j in range(K):  # Total columns
        phn_j = np.multiply(p_model[k,j], hij_abs_sqr[k,i,j])
        phn = phn + phn_j

      numr_s = np.multiply(p_model[k,i], hij_abs_sqr[k,i,i])
      dnumr_s = sigma_sqr_noise[i] + phn - numr_s
      R_temp = math.log2(1 + np.divide(numr_s, dnumr_s))
      R = R + R_temp

  return (R/hij_size)

In [None]:
# Calculating the curated power vector p_tilda
# p_tilda = test_input_p_hat when SINR_P_min is not met
# p_tilda = output_P_hat when SINR_P_min is met

p_tilda = np.empty((output_P_hat_size, K, 1), dtype = float, order = 'C')

i = 0
for j in range(output_P_hat_size):
  if (i < len(indx_n)) and (j == indx_n[i]):
    p_tilda[j] = (test_input_p_hat[j] * p_max) / np.amax(test_input_p_hat[j])
    i = i + 1
  else:
    p_tilda[j] = output_P_hat[j]

print(p_tilda.shape)
# print(p_tilda)

(25000, 5, 1)


In [None]:
## Checking p_tilda, i.e., the power for test_data_F_H for negative values
## and Hit Rate i.e. percentage for 0 <= p_tilda <= p_max
count_p_t = 0
count_n_t = 0

for n in range(output_P_hat_size):
  P_max = np.amax(p_tilda[n])
  if np.round(P_max, decimals = 3) <= 1:
    count_p_t = count_p_t + 1

  if np.any(p_tilda[n] < 0):
    count_n_t = count_n_t + 1
    print(n,'\n')
    print(p_tilda)

p_tilda_hit_rate = (count_p_t / output_P_hat_size) * 100
print("Hit Rate for Power p_tilda: {:.2f}%".format(p_tilda_hit_rate))
print("Negative power count: ", count_n_t)

Hit Rate for Power p_tilda: 100.00%
Negative power count:  0


In [None]:
## Constraint violation probability for p_tilda on the SINR_P_min
# indx_t = []
count_v_t = 0

for k in range(output_P_hat_size):
  for i in range(K):  # Total rows
    ph = 0
    for j in range(K):  # Total columns
      ph_j = np.multiply(p_tilda[k,j], test_data_F_H_abs_sqr[k,i,j])
      ph = ph + ph_j

    numr = np.multiply(p_tilda[k,i], test_data_F_H_abs_sqr[k,i,i])
    dnumr = sigma_sqr_noise[i] + ph - numr
    SINR_out_t = np.divide(numr, dnumr)
    # if k == 24463:
    #   print(SINR_out_t)

    if np.round(SINR_out_t, decimals = 2) < SINR_P_min[i]:
      # indx_t.append(k)
      count_v_t = count_v_t + 1
      break

violation_prb_t = (count_v_t / output_P_hat_size) * 100
print("SINR_P_min Constraints Violation Probability for p_tilda: {:.2f}%".format(violation_prb_t))

SINR_P_min Constraints Violation Probability for p_tilda: 0.00%


In [None]:
## DNN Sum Rate for test_data_F_H
sumrate_s_F_H = average_sum_rate(test_data_F_H, p_tilda, sigma_sqr_noise, K)
print("Total Average Sum Rate for all H matrices: {:.3f} Bit/Second/Hertz".format(sumrate_s_F_H))

Total Average Sum Rate for all H matrices: 3.273 Bit/Second/Hertz
