In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Block 1: Channel Generation
def generate_rician_channel(shape, K_factor=10.0, path_loss_dB=-30.0):
    """
    Generate a Rician fading channel with path loss.
    
    Parameters:
    shape : tuple, shape of the channel array
    K_factor : float, Rician K-factor (LOS to NLOS power ratio)
    path_loss_dB : float, path loss in dB
    
    Returns:
    ndarray, Rician fading channel scaled by path loss
    """
    path_loss = 10 ** (path_loss_dB / 10)
    los = np.exp(1j * np.random.uniform(0, 2 * np.pi, shape))
    nlos = (np.random.randn(*shape) + 1j * np.random.randn(*shape)) / np.sqrt(2)
    channel = np.sqrt(K_factor / (K_factor + 1)) * los + np.sqrt(1 / (K_factor + 1)) * nlos
    return channel * np.sqrt(path_loss)

def compute_cascaded_channel(H, h_1):
    """
    Compute the cascaded channel h_r = h_1.conj() * H.
    
    Parameters:
    H : ndarray, BS-to-RIS channel (1 x N)
    h_1 : ndarray, RIS-to-user channel (N,)
    
    Returns:
    ndarray, cascaded channel (N,)
    """
    return h_1.conj() * H.flatten()

# Block 2: Power and SNR Calculation
def compute_received_power(h_d, h_r, e, transmit_power):
    """
    Compute the received signal power.
    
    Parameters:
    h_d : complex, direct channel
    h_r : ndarray, cascaded channel (N,)
    e : ndarray, RIS coefficients (N,)
    transmit_power : float, transmit power (Watts)
    
    Returns:
    float, received signal power
    """
    combined_channel = h_d + np.dot(h_r, e)
    return transmit_power * np.abs(combined_channel) ** 2

def compute_snr(received_power, noise_power=1e-12):
    """
    Compute the SNR.
    
    Parameters:
    received_power : float, received signal power
    noise_power : float, noise power (Watts)
    
    Returns:
    float, SNR (linear scale)
    """
    return received_power / noise_power


# Block 3: Gradient Descent with Adam Optimizer
def compute_gd_gradient(h_d, h_r, e, transmit_power):
    """
    Compute the gradient of the negative received power with respect to phase shifts for GD.
    
    Parameters:
    h_d : complex, direct channel
    h_r : ndarray, cascaded channel (N,)
    e : ndarray, RIS coefficients (N,)
    transmit_power : float, transmit power (Watts)
    
    Returns:
    ndarray, gradient with respect to phases theta (N,)
    """
    N = len(h_r)
    combined = h_d + np.dot(h_r, e)
    grad = np.zeros(N)
    for m in range(N):
        term = h_r[m] * combined.conj()
        grad[m] = -2 * transmit_power * np.real(1j * e[m] * term)
    return grad

def gradient_descent_adam(h_d, H, h_1, N, transmit_power, learning_rate=0.01, max_iterations=1000, beta1=0.9, beta2=0.999, epsilon=1e-8):
    """
    Perform gradient descent with Adam optimizer to optimize RIS phase shifts.
    
    Parameters:
    h_d : complex, direct channel
    H : ndarray, BS-to-RIS channel (1 x N)
    h_1 : ndarray, RIS-to-user channel (N,)
    N : int, number of RIS elements
    transmit_power : float, transmit power (Watts)
    learning_rate : float, step size
    max_iterations : int, number of iterations
    beta1, beta2, epsilon : Adam parameters
    
    Returns:
    tuple, (optimized phase shifts, powers, SNRs, iterations)
    """
    h_r = compute_cascaded_channel(H, h_1)
    theta = np.random.uniform(0, 2 * np.pi, N)
    e = np.exp(1j * theta)
    powers = [compute_received_power(h_d, h_r, e, transmit_power)]
    snrs = [compute_snr(powers[0])]
    iterations = [0]
    
    # Adam variables
    m = np.zeros(N)  # First moment
    v = np.zeros(N)  # Second moment
    t = 0  # Time step
    
    for iteration in range(max_iterations):
        t += 1
        grad = compute_gd_gradient(h_d, h_r, e, transmit_power)
        
        # Adam update
        m = beta1 * m + (1 - beta1) * grad
        v = beta2 * v + (1 - beta2) * (grad ** 2)
        m_hat = m / (1 - beta1 ** t)
        v_hat = v / (1 - beta2 ** t)
        theta -= learning_rate * m_hat / (np.sqrt(v_hat) + epsilon)
        theta = np.mod(theta, 2 * np.pi)
        e = np.exp(1j * theta)
        
        power = compute_received_power(h_d, h_r, e, transmit_power)
        powers.append(power)
        snrs.append(compute_snr(power))
        iterations.append(iteration + 1)
    
    return theta, powers, snrs, iterations




In [None]:
import numpy as np
import json
from tqdm import tqdm

# Parameters for dataset generation
N = 16  # Number of RIS elements
K_factor = 10.0  # Rician K-factor
direct_path_loss_dB = -70.0  # Direct path loss
ris_path_loss_dB = -30.0  # RIS path loss
max_iterations = 1000  # Optimization iterations
num_realizations = 1000  # Number of channel realizations
P_t = 1
np.random.seed(42)  # For reproducibility

# File to store dataset
output_file = 'ris_dataset.txt'

# Generate dataset
dataset = []
for realization in tqdm(range(num_realizations),  desc="Generating dataset"):
    # Generate Rician fading channels
    h_direct = generate_rician_channel((), K_factor, direct_path_loss_dB)
    h_bs_ris = generate_rician_channel((1, N), K_factor, ris_path_loss_dB)
    h_ris_user = generate_rician_channel((N,), K_factor, ris_path_loss_dB)
    
    # Run Gradient Descent optimization with Adam
    optimized_phase_shifts, _, _, _ = gradient_descent_adam(
        h_direct, h_bs_ris, h_ris_user, N, P_t, learning_rate=0.01, max_iterations=max_iterations)
    
    # Prepare data entry with descriptive names
    entry = {
        'input': {
            'direct_channel_real': round(float(np.real(h_direct)), 4),
            'direct_channel_imag': round(float(np.imag(h_direct)), 4),
            'bs_ris_channel_real': [round(float(np.real(h)), 4) for h in h_bs_ris.flatten()],
            'bs_ris_channel_imag': [round(float(np.imag(h)), 4) for h in h_bs_ris.flatten()],
            'ris_user_channel_real': [round(float(np.real(h)), 4) for h in h_ris_user],
            'ris_user_channel_imag': [round(float(np.imag(h)), 4) for h in h_ris_user],
            'num_ris_elements': int(N)
        },
        'output': {
            'optimized_phase_shifts': [round(float(theta), 4) for theta in optimized_phase_shifts]
        }
    }
    
    # Append to dataset
    dataset.append(json.dumps(entry))

# Write dataset to file
with open(output_file, 'w') as f:
    for entry in dataset:
        f.write(entry + '\n')

print(f"Dataset generated and saved to {output_file} with {len(dataset)} entries.")

Generating dataset: 100%|██████████| 1000/1000 [00:49<00:00, 20.05it/s]

Dataset generated and saved to ris_dataset.txt with 1000 entries.



