# Import library

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy, copy
import functools
import os
import cProfile
import pstats
from multiprocessing import Pool

## Random seed

In [2]:
RANDOM_SEED = 8
np.random.seed(RANDOM_SEED)

# Function to transform the data

In [3]:
# Function to transfer from dBW to W (power)
def db2pow(db: float) -> float:
    return 10**(db/10)

# Function to transfer from W to dBW (power)
def pow2db(pow: float) -> float:
    return 10*np.log10(pow)

# Hermitian transpose of a matrix
def HermTranspose(x: np.ndarray) -> np.ndarray:
    return x.conj().T

def chanGen(zeta: float, d: float, dim1: int, dim2: int) -> np.ndarray:
    """Function to generate Rayleigh fading channel coefficients

    Args:
        zeta: ξ is the path loss exponent
        d: the distance between the transmitter and the receiver
        dim1: the number of rows in the channel matrix
        dim2: the number of columns in the channel matrix
    """
    pl_ref: float = -30                                    # pathloss (dBW) at reference distance
    pl: float = db2pow(pl_ref - 10*zeta*np.log10(d))       # pathloss model at distance d
    y: np.ndarray = np.sqrt(0.5*pl)*(np.random.randn(dim1,dim2)\
        + 1j*np.random.randn(dim1,dim2))            # Rayleigh distribution
    return y

## Read input data & Parameters

In [4]:
sigma = db2pow(-75)                                                                 # noise power
N = 10                                                                              # number of transmit antennas
gamma = 0.8 # Lower bound for user's link quality
pmax = 1000  # Maximum transmit power of Alice

# Channel generation
def read_generate_channel() -> tuple:
    filepath = './data/general/'
    Hai = np.load(filepath + 'Hai.npy')
    hib = np.load(filepath + 'hib.npy')
    hab = np.load(filepath + 'hab.npy')
    hie = np.load(filepath + 'hie.npy')
    hae = np.load(filepath + 'hae.npy')
    return Hai, hib, hie, hab, hae

def read_fixed_eaves(num_of_users, num_of_eavesdroppers) -> tuple:
    filepath = f'./data/fixed_eaves/data_{num_of_users}users_{num_of_eavesdroppers}eaves/'
    Hai = np.load(filepath + 'Hai.npy')
    hib = np.load(filepath + 'hib.npy')
    hab = np.load(filepath + 'hab.npy')
    hie = np.load(filepath + 'hie.npy')
    hae = np.load(filepath + 'hae.npy')
    return Hai, hib, hie, hab, hae

def read_fixed_users(num_of_users, num_of_eavesdroppers) -> tuple:
    filepath = f'./data/fixed_users/data_{num_of_users}users_{num_of_eavesdroppers}eaves/'
    Hai = np.load(filepath + 'Hai.npy')
    hib = np.load(filepath + 'hib.npy')
    hab = np.load(filepath + 'hab.npy')
    hie = np.load(filepath + 'hie.npy')
    hae = np.load(filepath + 'hae.npy')
    return Hai, hib, hie, hab, hae

#Hai: Channel between Alice and RIS: Nris x N  
#hib: Channel between RIS and users: List of length number_of_users, elements are 1 x Nris
#hab: Channel between Alice and users: List of length number_of_users, elements are 1 x N
#hie: Channel between RIS and eavesdroppers: List of length number_of_eavesdroppers, elements are 1 x Nris
#hae: Channel between Alice and eavesdroppers: List of length number_of_eavesdroppers, elements are 1 x N

Hai, hib, hie, hab, hae = read_generate_channel()

Nris = Hai.shape[0]                                                                       
number_of_users = len(hib)
number_of_eavesdroppers = len(hie)




# Beamforming vectors and RIS functions

In [5]:
def normalise_beamforming_vectors(w: np.ndarray) -> np.ndarray:
    """Function to normalise the beamforming vectors

    Args:
        w: the beamforming vectors
    """
    total_norm_squared = 0
    for i in range(number_of_users):
        total_norm_squared += (np.linalg.norm(w[i]) ** 2)
    if total_norm_squared <= pmax:
        return w
    for i in range(number_of_users):
        w[i] = w[i] / (total_norm_squared ** 0.5) * np.sqrt(pmax)
    return w

def generate_random_beamforming_vector():
  '''
  Generate one random beamforming vector
  '''
  return np.random.uniform(-1, 1, (N, 1)) + 1j * np.random.uniform(-1, 1, (N, 1))

def generate_random_beamforming_vectors():
    # Generate random complex numbers for each element of the beamforming vector
    beamforming_vectors = [generate_random_beamforming_vector() for _ in range (number_of_users)]
    
    # Normalize the vectors
    beamforming_vectors = normalise_beamforming_vectors(beamforming_vectors)
    return beamforming_vectors
    #w: list of beamforming vectors, length = number of users, elements are N x 1

def generate_random_theta():
    theta = np.random.uniform(-np.pi, np.pi, (1, Nris))
    theta = np.exp(1j * theta)
    return theta
    #theta: phase shift of RIS, size 1 x Nris

def generate_random_theta_angles(size: int):
  """
    Generate a random vector of angles from -pi to pi
  """
  return np.random.uniform(-np.pi, np.pi, size=(1, size))

def theta_angles_to_theta_vector(angles: np.ndarray[np.float64]) -> np.ndarray[np.complex128]:
  """
    Convert a vector of angles to a vector of complex numbers on the unit circle
  """
  return np.exp(1j * angles)

def theta_vector_to_theta_angles(theta: np.ndarray[np.complex128]) -> np.ndarray[np.float64]:
  """
    Convert a vector of complex numbers on the unit circle to a vector of angles
  """
  return np.angle(theta)



    

# Objective Function

In [6]:
def secrecy_rate_objective_function(theta, w) -> float:
    secrecy_rate: float = 0
    for k in range(number_of_users):
        R_bk = []
        # Legitimate user k
        Z_bk = hib[k] @ np.diag(theta.flatten()) @ Hai + hab[k]
        numGamma_bk = np.abs(Z_bk @ w[k])**2
        denGamma_bk = 1 + np.sum([np.abs(Z_bk @ w[i])**2 for i in range(number_of_users) if i != k])
        gamma_bk = numGamma_bk/denGamma_bk
        C_bk = np.log2(1 + gamma_bk)
        
        for m in range(number_of_eavesdroppers):
            # Eavesdropper i
            Z_em = hie[m] @ np.diag(theta.flatten()) @ Hai + hae[m]
            numGamma_em = np.abs(Z_em @ w[k])**2
            denGamma_em = 1 + np.sum([np.abs(Z_em @ w[j])**2 for j in range(number_of_users) if j != k])
            gamma_em = numGamma_em/denGamma_em
            C_em = np.log2(1 + gamma_em)
            R_bk.append(C_bk - C_em)
        
        secrecy_rate += max(min(R_bk),float(0))

    # Return the only element in the matrix as it is currently a 1x1 np array
    return float(secrecy_rate)

# Check if the current set up is valid (every users' C_bk >= gamma)
# Returns the index of the invalid user whose C_bk is highest among the invalid or -1 if all users are valid
def check_validity(theta, w) -> int:
    user = -1
    maxx = 0
    for k in range(number_of_users):
        # Legitimate user k
        Z_bk = hib[k] @ np.diag(theta.flatten()) @ Hai + hab[k]
        numGamma_bk = np.abs(Z_bk @ w[k])**2
        denGamma_bk = 1 + np.sum([np.abs(Z_bk @ w[i])**2 for i in range(number_of_users) if i != k])
        gamma_bk = numGamma_bk/denGamma_bk
        C_bk = np.log2(1 + gamma_bk)
        
        if (C_bk < gamma):
            if C_bk > maxx:
                maxx = C_bk
                user = k
    return user

# Print the C_bk of each user
def print_users_Cbk(theta, w) -> None:
    for k in range(number_of_users):
        # Legitimate user k
        Z_bk = hib[k] @ np.diag(theta.flatten()) @ Hai + hab[k]
        numGamma_bk = np.abs(Z_bk @ w[k])**2
        denGamma_bk = 1 + np.sum([np.abs(Z_bk @ w[i])**2 for i in range(number_of_users) if i != k])
        gamma_bk = numGamma_bk/denGamma_bk
        C_bk = np.log2(1 + gamma_bk)
        print(float(C_bk), end = " ")
    print()

# Calculate the C_bk of a user
def calculate_user_Cbk(theta, w, user) -> float:
    Z_bk = hib[user] @ np.diag(theta.flatten()) @ Hai + hab[user]
    numGamma_bk = np.abs(Z_bk @ w[user])**2
    denGamma_bk = 1 + np.sum([np.abs(Z_bk @ w[i])**2 for i in range(number_of_users) if i != user])
    gamma_bk = numGamma_bk/denGamma_bk
    C_bk = np.log2(1 + gamma_bk)
    return C_bk

# Using gradient descent to repair the beamforming vector of a user
# Try to make that user's C_bk >= gamma
def repair_beamforming_vectors(theta, w, user, learning_rate = 0.01, max_iter = 500):
    #print("Initial norm of user beamforming vector: ", np.linalg.norm(w[user]))
    #print("Before update beamforming of user ", user, ": ")
    #print_users_Cbk(theta, w)
    while (calculate_user_Cbk(theta, w, user) < gamma + 0.2 and max_iter > 0):
        Z_bk = hib[user] @ np.diag(theta.flatten()) @ Hai + hab[user]
        numGamma_bk = np.abs(Z_bk @ w[user])**2
        denGamma_bk = 1 + np.sum([np.abs(Z_bk @ w[i])**2 for i in range(number_of_users) if i != user])
        gamma_bk = numGamma_bk/denGamma_bk
        
        num_grad_C_bk_to_w_k = 2 * (HermTranspose(Z_bk) @ Z_bk @ w[user])
        den_grad_C_bk_to_w_k = (1 + gamma_bk) * np.log(2) * (1 + sum([abs(Z_bk @ w[j]) for j in range (number_of_users) if j != user]))
        grad_C_bk_to_w_k = num_grad_C_bk_to_w_k / den_grad_C_bk_to_w_k
        w[user] = w[user] + learning_rate * grad_C_bk_to_w_k
        #w[user] *= 2
        max_iter -= 1

        w = normalise_beamforming_vectors(w)
    #print("New norm of user beamforming vector: ", np.linalg.norm(w[user]))
    #print("After update beamforming of user ", user, ": ")
    #print_users_Cbk(theta, w)
    return w

# Using gradient descent to repair the whole (theta, w) set up
def repair(theta, w, iter = 100):
    invalid_user = check_validity(theta, w)
    while (invalid_user != -1 and iter > 0):
        w = repair_beamforming_vectors(theta, w, invalid_user)
        invalid_user = check_validity(theta, w)
        iter -= 1
    if (invalid_user != -1):
        theta, w = generate_random_theta(), generate_random_beamforming_vectors()
        return repair(theta, w)
    return theta, w

    


# Method

## Gradient Descent for Maximization (GD)

In [7]:
def compute_gradient_w(theta, w):
    grad_w = []
    Z_e_max = [] #Z_e_max[k] = Z_e_max for user k
    gamma_e_max = [] #gamma_e_max[k] = gamma_e_max for user k
    Z_b = [] #Z_b[k] = Z_bk
    gamma_b = [] #gamma_b[k] = gamma_bk
    counted = [] #counted[k] = true if gamma_k > gamma_e_max[k]
    
    #Precalculation 
    for k in range(number_of_users):
        gamma_e = []
        for m in range (number_of_eavesdroppers):
            Z_em = hie[m] @ np.diag(theta.flatten()) @ Hai + hae[m]
            numGamma_em = np.abs(Z_em @ w[k])**2
            denGamma_em = 1 + np.sum([np.abs(Z_em @ w[j])**2 for j in range(number_of_users) if j != k])
            gamma_em = numGamma_em/denGamma_em
            gamma_e.append(gamma_em)
        
        gamma_e_max.append(max(gamma_e))
        index_e_max = gamma_e.index(max(gamma_e))
        Z_e = hie[index_e_max] @ np.diag(theta.flatten()) @ Hai + hae[index_e_max]
        Z_e_max.append(Z_e)

        Z_bk = hib[k] @ np.diag(theta.flatten()) @ Hai + hab[k]
        numGamma_bk = np.abs(Z_bk @ w[k])**2
        denGamma_bk = 1 + np.sum([np.abs(Z_bk @ w[i])**2 for i in range(number_of_users) if i != k])
        gamma_bk = numGamma_bk/denGamma_bk

        Z_b.append(Z_bk)
        gamma_b.append(gamma_bk)

        counted.append(gamma_bk > gamma_e[index_e_max])


    #Calculating grad for i-th beamforming vector
    for i in range(number_of_users):
        grad = np.zeros((N, 1))
        for k in range (number_of_users):
            if (counted[k] == False):
                continue
            if (k == i):
                num_grad_C_bk_to_w_k = 2 * (HermTranspose(Z_b[k]) @ Z_b[k] @ w[k])
                den_grad_C_bk_to_w_k = (1 + gamma_b[k]) * np.log(2) * (1 + sum([abs(Z_b[k] @ w[j]) for j in range (number_of_users) if j != k]))
                grad_C_bk_to_w_k = num_grad_C_bk_to_w_k / den_grad_C_bk_to_w_k

                num_grad_C_e_max_to_w_k = 2 * (HermTranspose(Z_e_max[k]) @ Z_e_max[k] @ w[k])
                den_grad_C_e_max_to_w_k = (1 + gamma_e_max[k]) * np.log(2) * (1 + sum([abs(Z_e_max[k] @ w[j]) for j in range (number_of_users) if j != k]))
                grad_C_e_max_to_w_k = num_grad_C_e_max_to_w_k / den_grad_C_e_max_to_w_k
                
                grad = grad - (grad_C_bk_to_w_k - grad_C_e_max_to_w_k)
            else:
                num_grad_C_bk_to_w_i = -2 * abs(Z_b[k] @ w[k]) * (HermTranspose(Z_b[k]) @ Z_b[k] @ w[i])
                den_grad_C_bk_to_w_i = (1 + gamma_b[k]) * np.log(2) * (1 + sum([abs(Z_b[k] @ w[j]) for j in range (number_of_users) if j != k])) ** 2
                grad_C_bk_to_w_i = num_grad_C_bk_to_w_i / den_grad_C_bk_to_w_i

                num_grad_C_e_max_to_w_i = -2 * abs(Z_e_max[k] @ w[k]) * (HermTranspose(Z_e_max[k]) @ Z_e_max[k] @ w[i])
                den_grad_C_e_max_to_w_i = (1 + gamma_e_max[k]) * np.log(2) * (1 + sum([abs(Z_e_max[k] @ w[j]) for j in range (number_of_users) if j != k])) ** 2
                grad_C_e_max_to_w_i = num_grad_C_e_max_to_w_i / den_grad_C_e_max_to_w_i

                grad = grad - (grad_C_bk_to_w_i - grad_C_e_max_to_w_i)
            
        grad_w.append(grad)
    return grad_w

def compute_gradient_theta_central(theta, w, epsilon=1e-3):
    perturbation = epsilon #+ epsilon * 1j
    grad_theta = []
    for i in range(Nris):
        theta_plus = copy(theta)
        theta_plus[0, i] += perturbation
        theta_minus = copy(theta)
        theta_minus[0, i] -= perturbation
        grad_theta_i = (secrecy_rate_objective_function(theta_plus, w) - secrecy_rate_objective_function(theta_minus, w)) / (2*epsilon)
        grad_theta.append(grad_theta_i)
            
    return np.array(grad_theta)

def compute_gradient_theta_forward(theta, w, original_secrecy_rate=None, epsilon=1e-3):
    """
    Faster implementation of the gradient calculation

    Improvements:
    - Use forward difference instead of central difference
    """
    perturbation = epsilon + epsilon * 1j
    grad_theta = []
    for i in range(Nris):
        theta_plus = copy(theta)
        theta_plus[0, i] += perturbation
        grad_theta_i = (secrecy_rate_objective_function(theta_plus, w) - original_secrecy_rate) / epsilon
        grad_theta.append(grad_theta_i)

    return np.array(grad_theta)

def compute_gradient_theta(theta, w):
    Z_e_max = [] #Z_e_max[k] = Z_e_max for user k
    gamma_e_max = [] #gamma_e_max[k] = gamma_e_max for user k
    Z_b = [] #Z_b[k] = Z_bk
    gamma_b = [] #gamma_b[k] = gamma_bk
    counted = [] #counted[k] = true if gamma_k > gamma_e_max[k]
    index_e_max_list = []
    #Precalculation
    for k in range(number_of_users):
        gamma_e = []
        for m in range (number_of_eavesdroppers):
            Z_em = hie[m] @ np.diag(theta.flatten()) @ Hai + hae[m]
            numGamma_em = np.abs(Z_em @ w[k])**2
            denGamma_em = 1 + np.sum([np.abs(Z_em @ w[j])**2 for j in range(number_of_users) if j != k])
            gamma_em = numGamma_em/denGamma_em
            gamma_e.append(gamma_em)
        
        gamma_e_max.append(max(gamma_e))
        index_e_max = gamma_e.index(max(gamma_e))
        index_e_max_list.append(index_e_max)
        Z_e = hie[index_e_max] @ np.diag(theta.flatten()) @ Hai + hae[index_e_max]
        Z_e_max.append(Z_e)

        Z_bk = hib[k] @ np.diag(theta.flatten()) @ Hai + hab[k]
        numGamma_bk = np.abs(Z_bk @ w[k])**2
        denGamma_bk = 1 + np.sum([np.abs(Z_bk @ w[i])**2 for i in range(number_of_users) if i != k])
        gamma_bk = numGamma_bk/denGamma_bk

        Z_b.append(Z_bk)
        gamma_b.append(gamma_bk)

        counted.append(gamma_bk > gamma_e[index_e_max])


    grad_theta = 0


    for k in range(number_of_users):
        if (counted[k] == False):
            continue

        grad_Z_bk_wk_to_theta = (1/sigma) * np.diag(hib[k].flatten()) @ Hai @ w[k]
        grad_gamma_bk_to_theta_first_term = (2 * (Z_b[k] @ w[k]) * grad_Z_bk_wk_to_theta) * (1 + sum([np.linalg.norm(Z_b[k] @ w[j])**2 for j in range (number_of_users) if j != k]))
        grad_gamma_bk_to_theta_second_term = sum([2 * (Z_b[k] @ w[j]) * grad_Z_bk_wk_to_theta for j in range(number_of_users) if j != k]) * (np.linalg.norm(Z_b[k] @ w[k]) ** 2)
        grad_gamma_bk_to_theta_third_term = (1 + sum([np.linalg.norm(Z_b[k] @ w[j]) for j in range (number_of_users) if j != k])**2) ** 2
        grad_gamma_bk_to_theta = (grad_gamma_bk_to_theta_first_term - grad_gamma_bk_to_theta_second_term) / grad_gamma_bk_to_theta_third_term

        grad_C_bk_to_theta = 1 / ((1 + gamma_b[k]) * np.log(2)) * grad_gamma_bk_to_theta

        grad_Z_emax_wk_to_theta = (1/sigma) * np.diag(hie[index_e_max_list[k]].flatten()) @ Hai @ w[k]

        grad_gamma_emax_to_theta_first_term = (2 * (Z_e_max[k] @ w[k]) * grad_Z_emax_wk_to_theta) * (1 + sum([np.linalg.norm(Z_e_max[k] @ w[j])**2 for j in range (number_of_users) if j != k]))
        grad_gamma_emax_to_theta_second_term = sum([2 * (Z_e_max[k] @ w[j]) * grad_Z_emax_wk_to_theta for j in range(number_of_users) if j != k]) * (np.linalg.norm(Z_e_max[k] @ w[k]) ** 2)
        grad_gamma_emax_to_theta_third_term = (1 + sum([np.linalg.norm(Z_e_max[k] @ w[j]) for j in range (number_of_users) if j != k])**2) ** 2
        grad_gamma_emax_to_theta = (grad_gamma_emax_to_theta_first_term - grad_gamma_emax_to_theta_second_term) / grad_gamma_emax_to_theta_third_term

        grad_C_emax_to_theta = 1 / ((1 + gamma_e_max[k]) * np.log(2)) * grad_gamma_emax_to_theta

        grad_theta -= (grad_C_bk_to_theta - grad_C_emax_to_theta)
    
    return np.array(grad_theta).reshape((1, Nris))
    


def gradient_descent_update(w, theta, learning_rate, original_secrecy_rate=None):
    grad_w = compute_gradient_w(theta, w)
    grad_theta = compute_gradient_theta(theta, w)
    w_new = [w[i] - learning_rate * grad_w[i] for i in range (number_of_users)]
    w_new = normalise_beamforming_vectors(w_new)
        
    theta_new = theta - learning_rate * grad_theta
    theta_new = np.exp(1j * np.angle(theta_new))

    return w_new, theta_new

In [8]:
# Reseed first
np.random.seed(RANDOM_SEED)

# Gradient Descent Algorithm
num_cycles = 500
learning_rate = 0.01
theta_GD = generate_random_theta()
w_GD = generate_random_beamforming_vectors()
theta_GD, w_GD = repair(theta_GD, w_GD)
print("Initial Secrecy Rate GD:", secrecy_rate_objective_function(theta_GD, w_GD))
current_secrecy_rate = secrecy_rate_objective_function(theta_GD, w_GD)

GD_results = []
GD_results.append(current_secrecy_rate)

for i in range(num_cycles):
    print("Iteration", i)
    print("Secrecy Rate:", current_secrecy_rate)
    w_new, theta_new = gradient_descent_update(w_GD, theta_GD, learning_rate, original_secrecy_rate=current_secrecy_rate)
    #if check_validity(theta_new, w_new) == False:
    #    print("Stop by invalidity")
    #    break
    new_secrecy_rate = secrecy_rate_objective_function(theta_new, w_new)
    if (new_secrecy_rate - current_secrecy_rate) < 1e-9:
        print("Converged")
        break
    w_GD = w_new
    theta_GD = theta_new
    GD_results.append(new_secrecy_rate)
    current_secrecy_rate = new_secrecy_rate

theta_GD, w_GD = repair(theta_GD, w_GD)
#print(GD_results)
print("Final Secrecy Rate GD:", secrecy_rate_objective_function(theta_GD, w_GD))
print_users_Cbk(theta_GD, w_GD)

  return float(secrecy_rate)


Initial Secrecy Rate GD: 8.654589608479382
Iteration 0
Secrecy Rate: 8.654589608479382
Iteration 1
Secrecy Rate: 8.764973004493099
Iteration 2
Secrecy Rate: 8.885368614289826
Iteration 3
Secrecy Rate: 9.007925422391958
Iteration 4
Secrecy Rate: 9.127891031569039
Iteration 5
Secrecy Rate: 9.245022252476616
Iteration 6
Secrecy Rate: 9.358955430307061
Iteration 7
Secrecy Rate: 9.469468513448808
Iteration 8
Secrecy Rate: 9.576434969509254
Iteration 9
Secrecy Rate: 9.679875541261058
Iteration 10
Secrecy Rate: 9.779900426449617
Iteration 11
Secrecy Rate: 9.87662336814829
Iteration 12
Secrecy Rate: 9.97013585420969
Iteration 13
Secrecy Rate: 10.060517410364334
Iteration 14
Secrecy Rate: 10.14784753904322
Iteration 15
Secrecy Rate: 10.232210757422855
Iteration 16
Secrecy Rate: 10.313696826481726
Iteration 17
Secrecy Rate: 10.392398976691673
Iteration 18
Secrecy Rate: 10.468411684850388
Iteration 19
Secrecy Rate: 10.541828549016856
Iteration 20
Secrecy Rate: 10.61274024581694
Iteration 21
Secre

  print(float(C_bk), end = " ")


## Particle Swarm Optimization (PSO)

In [13]:
class PSOParticle:
  def __init__(self) -> None:
    self.theta = generate_random_theta_angles(Nris)
    self.w = generate_random_beamforming_vectors()
    self.theta, self.w = repair(self.theta, self.w)
    self.best_theta = deepcopy(self.theta)
    self.best_w = deepcopy(self.w)
    self.best_secrecy_rate = secrecy_rate_objective_function(theta_angles_to_theta_vector(self.best_theta), self.best_w)
    self.current_secrecy_rate = secrecy_rate_objective_function(theta_angles_to_theta_vector(self.theta), self.w)
    self.velocity_theta = np.zeros((1, Nris))
    self.velocity_w = [np.zeros((N, 1)) for i in range(number_of_users)]

  
  def update_velocity(self, inertia, c1, c2, global_best_theta, global_best_w):
    r1 = np.random.rand()
    r2 = np.random.rand()
    self.velocity_theta = inertia * self.velocity_theta + c1 * r1 * (self.best_theta - self.theta) + c2 * r2 * (global_best_theta - self.theta)
    self.velocity_w = [inertia * self.velocity_w[i] + c1 * r1 * (self.best_w[i] - self.w[i]) + c2 * r2 * (global_best_w[i] - self.w[i]) for i in range(number_of_users)]

  def update_position(self):
    self.theta = self.theta + self.velocity_theta
    self.w = [self.w[i] + self.velocity_w[i] for i in range(number_of_users)]
    self.w = normalise_beamforming_vectors(self.w)
    self.theta, self.w = repair(self.theta, self.w)

  def update_velocity_theta(self, inertia, c1, c2, global_best_theta): #Used for PSO_GD
    self.velocity_theta = inertia * self.velocity_theta + c1 * np.random.rand() * (self.best_theta - self.theta) + c2 * np.random.rand() * (global_best_theta - self.theta)
  
  def update_position_theta(self): #Used for PSO_GD
    self.theta = self.theta + self.velocity_theta

  def update_best(self):
    self.theta, self.w = repair(self.theta, self.w)
    self.current_secrecy_rate = secrecy_rate_objective_function(theta_angles_to_theta_vector(self.theta), self.w)
    if self.current_secrecy_rate > self.best_secrecy_rate:
      self.best_secrecy_rate = self.current_secrecy_rate
      self.best_theta = deepcopy(self.theta)
      self.best_w = deepcopy(self.w)

In [14]:
def dynamic_inertia(i: int, max_iter: int, inertia_max: float, inertia_min: float) -> float:
  E_t = float((max_iter - i - 1)/max_iter)
  inertia = inertia_min + (inertia_max - inertia_min) * (2 /(1 + (np.e ** (-5 * E_t))) - 1)
  return inertia

In [15]:
def PSO_optimize_w_theta(max_iter: int, num_particles: int, w_min: float, w_max: float, c1: float, c2: float):
  particles = [PSOParticle() for _ in range(num_particles)]
  global_best_secrecy_rate = -np.inf
  global_best_theta = np.zeros((1, Nris))
  global_best_w = [np.zeros((N, 1)) for i in range(number_of_users)]

  results_secrecy_rate = []

  for particle in particles:
    if particle.best_secrecy_rate > global_best_secrecy_rate:
      global_best_secrecy_rate = particle.best_secrecy_rate
      global_best_theta = deepcopy(particle.best_theta)
      global_best_w = deepcopy(particle.best_w)

  results_secrecy_rate.append(global_best_secrecy_rate)

  for iteration in range(max_iter):
    print("iteration =", iteration, global_best_secrecy_rate)
    inertia = dynamic_inertia(iteration, max_iter, w_max, w_min)

    for particle in particles:
      particle.update_velocity(inertia, c1, c2, global_best_theta, global_best_w)
      particle.update_position()
      particle.update_best()

      if particle.best_secrecy_rate > global_best_secrecy_rate:
        global_best_secrecy_rate = particle.best_secrecy_rate
        global_best_theta = deepcopy(particle.best_theta)
        global_best_w = deepcopy(particle.best_w)

    results_secrecy_rate.append(global_best_secrecy_rate)
   

  return results_secrecy_rate

In [12]:
# Reseed
np.random.seed(RANDOM_SEED)

# PSO Algorithm
max_iter = 500
num_particles = 100
w_min = 0.5
w_max = 0.9
c1 = 1.5
c2 = 1.5

PSO_results = PSO_optimize_w_theta(max_iter, num_particles, w_min, w_max, c1, c2)
print("Initial Secrecy Rate PSO:", PSO_results[0])
print(PSO_results)
print("Final Secrecy Rate PSO:", PSO_results[-1])

  return float(secrecy_rate)


KeyboardInterrupt: 

## Genetic Algorithm (GA)

In [38]:
class GAIndividual:
  def __init__(self, theta: np.ndarray[np.float64] = None, w: np.ndarray[np.complex128] = None) -> None:
    if theta is None or w is None:
      self.theta = generate_random_theta_angles(Nris)
      self.w = generate_random_beamforming_vectors()
    else:
      self.theta = theta
      self.w = w
    self.theta, self.w = repair(self.theta, self.w)
    self.update_fitness()
  
  def update_fitness(self):
    self.fitness = secrecy_rate_objective_function(theta_angles_to_theta_vector(self.theta), self.w)


class GAPopulation:
  def __init__(self, population_size: int, crossover_rate: float = 0.85, mutation_rate: float = 0.3) -> None:
    self.population_size = population_size
    self.individuals = [GAIndividual() for _ in range(population_size)]
    self.crossover_rate = crossover_rate
    self.mutation_rate = mutation_rate

  def sort_population(self):
    self.individuals.sort(key=lambda x: x.fitness, reverse=True)

  def filter_population(self):
    self.sort_population()
    self.individuals = self.individuals[:self.population_size]

  def add_individual(self, individual: GAIndividual):
    self.individuals.append(individual)

  def select_parents(self) -> tuple[GAIndividual, GAIndividual]:
    parents = np.random.choice(self.individuals, 2, replace=False)
    return parents[0], parents[1]
  
  def crossover(self, parent1: GAIndividual, parent2: GAIndividual) -> GAIndividual:
    theta1, w1 = parent1.theta, parent1.w
    theta2, w2 = parent2.theta, parent2.w
    theta_child = (theta1 + theta2) / 2
    w_child = [(w1[i] + w2[i]) / 2 for i in range(number_of_users)]
    w_child = normalise_beamforming_vectors(w_child)
    theta_child, w_child = repair(theta_child, w_child)
    return GAIndividual(theta_child, w_child)
  
  def mutate(self, individual: GAIndividual) -> GAIndividual:
    if np.random.rand() < self.mutation_rate:
      mutation_index = np.random.randint(0, Nris)
      individual.theta[0, mutation_index] = np.random.uniform(-np.pi, np.pi)
      mutation_index = np.random.randint(len(individual.w))
      individual.w[mutation_index] = generate_random_beamforming_vector()
      individual.w = normalise_beamforming_vectors(individual.w)
      individual.theta, individual.w = repair(individual.theta, individual.w)
      individual.update_fitness()

    return individual
  
def GA_optimize_w_theta(population_size: int, max_iter: int, crossover_rate: float = 0.85, mutation_rate: float = 0.85):
  population = GAPopulation(population_size, crossover_rate, mutation_rate)
  population.sort_population()
  results_secrecy_rate = []
  results_secrecy_rate.append(population.individuals[0].fitness)

  for iteration in range(max_iter):
    print("iteration =", iteration, population.individuals[0].fitness)
    # Crossover and mutate
    for _ in range(population_size):
      if np.random.rand() < population.crossover_rate:
        parent1, parent2 = population.select_parents()
        child = population.crossover(parent1, parent2)
        population.mutate(child)
        population.add_individual(child)
    # Filter
    population.filter_population()
    
    results_secrecy_rate.append(population.individuals[0].fitness)
    print_users_Cbk(theta_angles_to_theta_vector(population.individuals[0].theta), population.individuals[0].w)
  return results_secrecy_rate

In [39]:
population_size = 100
num_generations = 500
crossover_rate = 0.9
mutation_rate = 0.3

# Reseed
np.random.seed(RANDOM_SEED)

# GA Algorithm
GA_results = GA_optimize_w_theta(population_size, num_generations, crossover_rate, mutation_rate)
print("Initial Secrecy Rate GA:", GA_results[0])
print(GA_results)
print("Final Secrecy Rate GA:", GA_results[-1])

  return float(secrecy_rate)


iteration = 0 8.684540212989983


  print(float(C_bk), end = " ")


0.915287996568156 1.0518887873062597 1.3636844478288732 0.9813535005809269 0.8815618598250937 1.0208359025651432 0.9257406922168826 1.057009266713857 1.351372971707073 0.9588922911378149 
iteration = 1 10.06294105308756
0.9440249564296034 0.8019567497357399 1.8304473400161527 0.9053456644482549 1.4991531171831545 0.990837237665529 1.0002796264122658 1.466264695676498 0.9382598674652616 1.269742606552447 
iteration = 2 11.369480157699714
0.9440249564296034 0.8019567497357399 1.8304473400161527 0.9053456644482549 1.4991531171831545 0.990837237665529 1.0002796264122658 1.466264695676498 0.9382598674652616 1.269742606552447 
iteration = 3 11.369480157699714
0.9440249564296034 0.8019567497357399 1.8304473400161527 0.9053456644482549 1.4991531171831545 0.990837237665529 1.0002796264122658 1.466264695676498 0.9382598674652616 1.269742606552447 
iteration = 4 11.369480157699714
0.9440249564296034 0.8019567497357399 1.8304473400161527 0.9053456644482549 1.4991531171831545 0.990837237665529 1.00

## Combination of PSO and GD

In [16]:
def PSO_GD(max_pso_iter, max_gd_iter, number_of_particles, learning_rate, w_max, w_min, c1, c2):
    particles = [PSOParticle() for _ in range(number_of_particles)]
    global_best_secrecy_rate = -np.inf
    global_best_theta = np.zeros((1, Nris))
    global_best_w = [np.zeros((N, 1)) for i in range(number_of_users)]

    results_secrecy_rate = []

    for particle in particles:
        if particle.best_secrecy_rate > global_best_secrecy_rate:
            global_best_secrecy_rate = particle.best_secrecy_rate
            global_best_theta = deepcopy(particle.best_theta)
            global_best_w = deepcopy(particle.best_w)

    results_secrecy_rate.append(global_best_secrecy_rate)
    
    for iteration in range(max_pso_iter):
        print("iteration =", iteration, "Global Best Secrecy Rate:", global_best_secrecy_rate)
        inertia = dynamic_inertia(iteration, max_pso_iter, w_max, w_min)

        """
            Update particles theta by velocity
        """
        for particle in particles:
            particle.update_velocity(inertia, c1, c2, global_best_theta, global_best_w)
            particle.update_position()
            particle.update_best()
            
            if particle.best_secrecy_rate > global_best_secrecy_rate:
                global_best_secrecy_rate = particle.best_secrecy_rate
                global_best_theta = deepcopy(particle.best_theta)
                global_best_w = deepcopy(particle.best_w)
        
        # GD
        for particle in particles:
            for _ in range (max_gd_iter):
                new_w, new_theta = gradient_descent_update(particle.w, theta_angles_to_theta_vector(particle.theta), learning_rate)
                new_secrecy_rate = secrecy_rate_objective_function(new_theta, new_w)
                if (new_secrecy_rate - particle.current_secrecy_rate) < 1e-9:
                    break
                
                particle.w = new_w
                particle.theta = theta_vector_to_theta_angles(new_theta)
                particle.current_secrecy_rate = new_secrecy_rate

            particle.update_best()
            
            if particle.best_secrecy_rate > global_best_secrecy_rate: 
                # No need to check validity as we already checked it in the inner loop
                global_best_secrecy_rate = particle.best_secrecy_rate
                global_best_theta = deepcopy(particle.best_theta)
                global_best_w = deepcopy(particle.best_w)
       
        results_secrecy_rate.append(global_best_secrecy_rate)
    return results_secrecy_rate

In [17]:
# Reseed
np.random.seed(RANDOM_SEED)

max_pso_iter = 500
max_gd_iter = 50
number_of_particles = 50
w_max = 0.9
w_min = 0.5
c1 = 1.5
c2 = 1.5

PSO_GD_results = PSO_GD(max_pso_iter, max_gd_iter, number_of_particles, learning_rate, w_max, w_min, c1, c2)
print("Initial Secrecy Rate PSO-GD:", PSO_GD_results[0])
print("Final Secrecy Rate PSO-GD:", PSO_GD_results[-1])

  return float(secrecy_rate)


iteration = 0 Global Best Secrecy Rate: 8.674423952533834
iteration = 1 Global Best Secrecy Rate: 12.07098280320081
iteration = 2 Global Best Secrecy Rate: 12.436881408414061
iteration = 3 Global Best Secrecy Rate: 12.592337028929794
iteration = 4 Global Best Secrecy Rate: 12.63652408107219
iteration = 5 Global Best Secrecy Rate: 13.397644828621033
iteration = 6 Global Best Secrecy Rate: 14.674890728603062
iteration = 7 Global Best Secrecy Rate: 15.474018714878824
iteration = 8 Global Best Secrecy Rate: 15.916089211719427
iteration = 9 Global Best Secrecy Rate: 17.15177319877177
iteration = 10 Global Best Secrecy Rate: 17.411168529561742
iteration = 11 Global Best Secrecy Rate: 17.659245221315743
iteration = 12 Global Best Secrecy Rate: 18.088974119369706
iteration = 13 Global Best Secrecy Rate: 18.24600983796281
iteration = 14 Global Best Secrecy Rate: 18.24600983796281
iteration = 15 Global Best Secrecy Rate: 18.24600983796281
iteration = 16 Global Best Secrecy Rate: 18.4299161241468

In [42]:
# Reseed
np.random.seed(RANDOM_SEED)

# GA-GD
population_size = 50
num_generations = 500
num_iter_gd = 50
crossover_rate = 0.9
mutation_rate = 0.3
learning_rate = 0.01


GA_GD_results = GA_GD_optimize_w_theta(population_size, num_generations, num_iter_gd, crossover_rate, mutation_rate)
print("Initial Secrecy Rate GA-GD:", GA_GD_results[0])
print(GA_GD_results)
print("Final Secrecy Rate GA-GD:", GA_GD_results[-1])


iteration = 0
Best secrecy rate: 8.77760303784856
Best secrecy rate: 12.05935334878857
iteration = 1
Best secrecy rate: 12.05935334878857
Best secrecy rate: 12.900200237234587
iteration = 2
Best secrecy rate: 12.900200237234587
Best secrecy rate: 13.035712120398289
iteration = 3
Best secrecy rate: 13.035712120398289
Best secrecy rate: 13.200768510368054
iteration = 4
Best secrecy rate: 13.200768510368054
Best secrecy rate: 13.170726361546052
iteration = 5
Best secrecy rate: 13.170726361546052
Best secrecy rate: 13.668487086288339
iteration = 6
Best secrecy rate: 13.668487086288339
Best secrecy rate: 13.668487086288339
iteration = 7
Best secrecy rate: 13.668487086288339
Best secrecy rate: 13.668487086288339
iteration = 8
Best secrecy rate: 13.668487086288339
Best secrecy rate: 13.884777470947189
iteration = 9
Best secrecy rate: 13.884777470947189
Best secrecy rate: 14.015671446895436
iteration = 10
Best secrecy rate: 14.015671446895436
Best secrecy rate: 13.955302788261184
iteration = 1

# Plot diagram

In [2]:
# iterations = range(0, num_cycles+10, 10)


iterations = range(0, 500)
# Extend the results to the same length
#GD_results_draw = GD_results[:len(iterations)]
PSO_results_draw = PSO_results[:len(iterations)]
GA_results_draw = GA_results[:len(iterations)]
PSO_GD_results_draw = PSO_GD_results[:len(iterations)]
#GA_GD_results_draw = GA_GD_results[:200]


#plt.plot(iterations, GD_results_draw, label='Gradient Descent')
plt.plot(iterations, PSO_results_draw, label='PSO')
plt.plot(iterations, GA_results_draw, label='Genetic Algorithm')
plt.plot(iterations, PSO_GD_results_draw, label='PSO-GD')
#plt.plot(iterations, GA_GD_results_draw, label='GA-GD')

plt.xlabel('Iteration number')
plt.ylabel('Secrecy rate (bps/Hz)')
plt.title('gamma =' + str(gamma))
plt.grid(True)
plt.legend()
plt.show()


NameError: name 'PSO_results' is not defined

In [18]:
# Best results of each methods
print("Best Secrecy Rate GD:", max(GD_results))
print("Best Secrecy Rate PSO:", max(PSO_results))
print("Best Secrecy Rate GA:", max(GA_results))
print("Best Secrecy Rate PSO-GD:", max(PSO_GD_results))
#print("Best Secrecy Rate GA-GD:", max(GA_GD_results))

Best Secrecy Rate GD: 12.60605974012883


NameError: name 'PSO_results' is not defined

# Save results

In [42]:
if os.path.exists('results') == False:
    os.mkdir('results')
os.chdir('results')
if os.path.exists('general') == False:
    os.mkdir('general')
os.chdir('general')
times = RANDOM_SEED
if os.path.exists(f'{times}_run') == False:
    os.mkdir(f'{times}_run')
    os.chdir(f'{times}_run')
    np.save('GD_results', GD_results)
    np.save('PSO_results', PSO_results)
    np.save('GA_results', GA_results)
    np.save('PSO_GD_results', PSO_GD_results)
    #np.save('GA_GD_results', GA_GD_results)
    os.chdir('../')
os.chdir('../../')



