In [106]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.nn.init as init
import scipy.linalg as sci
import scipy.io as sio
import numpy as np
import matplotlib.pyplot as plt

In [107]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Parameter 초기화

In [108]:
## System parameters
M = 64 #Number of BS antennas
P = 1 #Power
K =  2 #Number of users
L = 8 #Number of pilots
Lp = 2 #Number of paths
B = 30 #Number of feedback bits per user

## Limited scattering channel parameters
LSF_UE = np.array([0.0,0.0],dtype=np.float32) #Mean of path gains for K users
Mainlobe_UE= np.array([0,0],dtype=np.float32) #Center of the AoD range for K users
HalfBW_UE = np.array([30.0,30.0],dtype=np.float32) #Half of the AoD range for K users

# SNR
snr_dl = 10 #SNR in dB
noise_std_dl = np.float32(np.sqrt(1/2)*np.sqrt(P/10**(snr_dl/10))) #STD of the Gaussian noise (per real dim.)

In [109]:
## Learning parameters
initial_run = 1 #1: starts training from scratch; 0: resumes training 
n_epochs = 5000 #Number of training epochs, for observing the current performance set it to 0
learning_rate = 0.0001 #Learning rate

batch_size = 1024 #Mini-batch size
test_size = 10000 #Size of the validation/test set
batch_per_epoch = 20 #Numbers of mini-batches per epoch

anneal_param = 1.0 #Initial annealing parmeter
annealing_rate = 1.001 #Annealing rate

### Pilot Sequence 초기화 -- DFT Matrix 이용

In [110]:
DFT_Matrix = sci.dft(M) 
X_init = DFT_Matrix[0::int(np.ceil(M/L)),:] 
Xp_init = np.sqrt(P/M)*X_init
Xp_r_init = torch.Tensor(np.float32(np.real(Xp_init))).to(device)
Xp_i_init = torch.Tensor(np.float32(np.imag(Xp_init))).to(device)
## Pilot sequence cuda 지정 완료

### Matrix 연산 함수

In [111]:
def mult_mod(M, N, left_right):
    tensor_shape = M.shape
    dims = N.shape 

    if left_right == 'r':
        # M: (batch_size, n, m) N: (m, p)
        n = tensor_shape[1]
        m = dims[0]
        p = dims[1]

        # PyTorch의 행렬 곱
        y = torch.reshape(torch.matmul(M.view(-1, m), N), (-1, n, p))

    elif left_right == 'l':
        # M: (batch_size, n, m)
        # N: (p, n)
        m = tensor_shape[2]
        p = dims[0]
        n = dims[1]

        # PyTorch에서는 `permute()`로 전치 가능
        MT = torch.Tensor(M).permute(0, 2, 1)  # (batch_size, m, n)
        NT = N.T  # (n, p)

        MTNT = torch.reshape(torch.matmul(MT.view(-1, n), NT), (-1, m, p))
        y = MTNT.permute(0, 2, 1)  # (batch_size, n, p)로 변환

    return y.to(device)

def mult_mod_complex(Mr, Mi, Nr, Ni, left_right):
    yr = mult_mod(Mr, Nr, left_right) - mult_mod(Mi, Ni, left_right)
    yi = mult_mod(Mr, Ni, left_right) + mult_mod(Mi, Nr, left_right)
    return yr.to(device), yi.to(device)

### Batch data 생성 함수

In [112]:
## 리스트 인덱싱은 [층, 행, 열], [행, 열]임!
def generate_batch_data(batch_size,M,K,
                        Lp,#number of paths
                        LSF_UE #Mean of path gains for K users
                        ,Mainlobe_UE #Center of the AoD range for K users
                        ,HalfBW_UE #Half of the AoD range for K users
                        ):
    alphaR_input = np.zeros((batch_size,Lp,K))
    alphaI_input = np.zeros((batch_size,Lp,K))
    theta_input = np.zeros((batch_size,Lp,K))
    for kk in range(K): # for the number of users
        alphaR_input[:,:,kk] = np.random.normal(loc=LSF_UE[kk], scale=1.0/np.sqrt(2), size=[batch_size,Lp])
        alphaI_input[:,:,kk] = np.random.normal(loc=LSF_UE[kk], scale=1.0/np.sqrt(2), size=[batch_size,Lp])
        theta_input[:,:,kk] = np.random.uniform(low=Mainlobe_UE[kk]-HalfBW_UE[kk], high=Mainlobe_UE[kk]+HalfBW_UE[kk], size=[batch_size,Lp])
 
    #### Actual Channel
    from0toM = np.float32(np.arange(0, M, 1))
    alpha_act = alphaR_input + 1j*alphaI_input
    theta_act = (np.pi/180)*theta_input
    
    h_act = np.complex128(np.zeros((batch_size,M,K)))
    hR_act = np.float32(np.zeros((batch_size,M,K)))
    hI_act = np.float32(np.zeros((batch_size,M,K)))
    
    for kk in range(K):
        for ll in range(Lp):
            theta_act_expanded_temp = np.tile(np.reshape(theta_act[:,ll,kk],[-1,1]),(1,M))
            response_temp = np.exp(1j*np.pi*np.multiply(np.sin(theta_act_expanded_temp),from0toM))
            alpha_temp = np.reshape(alpha_act[:,ll,kk],[-1,1])
            h_act[:,:,kk] += (1/np.sqrt(Lp))*alpha_temp*response_temp
        hR_act[:,:,kk] = np.real(h_act[:,:,kk])
        hI_act[:,:,kk] = np.imag(h_act[:,:,kk])
        
    h_act = torch.tensor(h_act).to(device)
    hR_act = torch.tensor(hR_act).to(device)
    hI_act = torch.tensor(hI_act).to(device)
        
    return(h_act, hR_act, hI_act)

In [126]:
# 가중치 초기화 함수 정의 (He 초기화)
def he_init(tensor):
    init.kaiming_normal_(tensor, mode='fan_in', nonlinearity='relu')

In [114]:
# 입력 데이터 텐서
hR = torch.randn((batch_size, M, K), dtype=torch.float32)  # 실수 채널 행렬
hI = torch.randn((batch_size, M, K), dtype=torch.float32)  # 허수 채널 행렬

### Rate 계산 함수

In [115]:
def rate_func(hr, hi, vr, vi, noise_power, K, M, k_index): # 실제 사용 시 k번째 루프 값에 대한 값 입력
    ## 채널 입력은 각 user에 대한 것 : (batch_size, M)
    ## Precoding matrix 형태로 바꿔주기 (batch_size, M, K)
    vr = vr.reshape(-1, M, K) 
    vi = vi.reshape(-1, M, K)
    
    nom_denom = torch.zeros_like(torch.tensor(hr[:, :]).clone().detach()).to(device) + torch.tensor(noise_power).to(device)  # 초기화

    for kk in range(K): # k번째 user에 대한 power 계산
        hrvr = torch.bmm(torch.tensor(hr).clone().unsqueeze(-1).permute(0, 2, 1), vr[:, :, kk].clone().unsqueeze(-1)).squeeze(-1)
        hivi = torch.bmm(torch.tensor(hi).clone().unsqueeze(-1).permute(0, 2, 1), vi[:, :, kk].clone().unsqueeze(-1)).squeeze(-1)
        hrvi = torch.bmm(torch.tensor(hr).clone().unsqueeze(-1).permute(0, 2, 1), vi[:, :, kk].clone().unsqueeze(-1)).squeeze(-1)
        hivr = torch.bmm(torch.tensor(hi).clone().unsqueeze(-1).permute(0, 2, 1), vr[:, :, kk].clone().unsqueeze(-1)).squeeze(-1)

        real_part = hrvr - hivi 
        imag_part = hrvi + hivr
        norm2_hv = real_part.abs()**2 + imag_part.abs()**2 

        if kk == k_index:
            nom = norm2_hv  # Wanted signal power
        nom_denom += norm2_hv # Unwanted signal power

    denom = nom_denom - nom
    rate = torch.log2(1 + (nom / denom))
    
    return -rate.to(device)  # 손실 최소화를 위해 음수 처리

### Downlink Pilot Training -- Pilot Sequence as DNN parameters

In [127]:
## hR, hI 만 cuda로 들어가면 됨됨
class DLTrainingPhase(nn.Module):
    def __init__(self, P, noise_std_dl):
        super(DLTrainingPhase, self).__init__()
        
        # noise/annealing parameter
        self.noise_std = torch.tensor(noise_std_dl, dtype=torch.float32).to(device)
        self.aneal = torch.tensor(1.0, dtype=torch.float32).to(device)
        
        # Pilot sequence - Use pre-initialized values
        self.Xp_r = nn.Parameter(Xp_r_init.clone().to(device))
        self.Xp_i = nn.Parameter(Xp_i_init.clone().to(device))
        
        # Power normalizing
        self.P = P
        self.normalize_pilot()
        
    def normalize_pilot(self):
        # Function : Normalizing the pilot sequence vectors
        norm_X = torch.sqrt(torch.sum(self.Xp_r**2 + self.Xp_i**2, dim = 1, keepdim = True)) # (. , * , . ) *에 대해 sum 수행
        self.Xp_r.data = torch.sqrt(torch.tensor(self.P)) * (self.Xp_r / norm_X)   
        self.Xp_i.data = torch.sqrt(torch.tensor(self.P)) * (self.Xp_i / norm_X)
        
    def forward(self, hR, hI, K, M, L):
        y_nless = {}
        y_noisy = {}
        
        for kk in range(K):
            hR_temp = hR[:, :, kk].reshape(-1, M, 1) # 차원 (batch_size, M)
            hI_temp = hI[:, :, kk].reshape(-1, M, 1)

            # 복소수 행렬 곱 수행
            y_nless_r, y_nless_i = mult_mod_complex(hR_temp, hI_temp, self.Xp_r, self.Xp_i, 'l')

            # 실수 및 허수 결합 -> 복소수로 결합 아니고 real representation
            y_nless[kk] = torch.cat([y_nless_r.view(-1, L), y_nless_i.view(-1, L)], dim=1)

            # 가우시안 노이즈 추가
            noise = torch.randn_like(y_nless[kk]) * self.noise_std
            y_noisy[kk] = y_nless[kk] + noise
        
        return y_noisy

### UE side DNN - Quantizer for CSI feedback

In [117]:
class UE_DNN(nn.Module):
    def __init__(self, L, B, K, anneal = 1.0):
        super(UE_DNN, self).__init__()
        self.anneal = anneal
        self.input_dim = 2*L
        self.K=K
        
        self.model = nn.Sequential(
            nn.BatchNorm1d(self.input_dim),
            nn.Linear(self.input_dim, 1024),
            nn.ReLU(),
            
            nn.BatchNorm1d(1024),
            nn.Linear(1024, 512),
            nn.ReLU(),
            
            nn.BatchNorm1d(512),
            nn.Linear(512, 256),
            nn.ReLU(),
            
            nn.BatchNorm1d(256),
            nn.Linear(256, B)
        )
        self.model.to(device)
        
    def forward(self, x):
        InfoBits = {0:0}
        for kk in range(self.K):
            InfoBits_linear = self.model(x[kk])  # 신경망을 통과한 값

            # Straight-Through Estimator (STE) 적용
            InfoBits_tanh = torch.tanh(self.anneal * InfoBits_linear)
            InfoBits_sign = torch.sign(InfoBits_linear)

            # Forward: Sign 값을 사용, Backward: Tanh gradient 사용
            InfoBits[kk] = InfoBits_tanh + (InfoBits_sign - InfoBits_tanh).detach()
        
        return InfoBits

### BS side DNN - Precoder network of DSC structure

In [118]:
class BS_DNN(nn.Module):
    def __init__(self, M, K, B, P):
        super(BS_DNN, self).__init__()
        self.M = M
        self.K = K
        self.B = B
        self.P = P
        
        self.model = nn.Sequential(
            nn.Linear(K * B, 1024),
            nn.ReLU(),
            nn.BatchNorm1d(1024),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Linear(512, 2* M * K)  # Precoder 출력 (실수 및 허수 파트)
        )
        
        self.model.to(device)
    
    def forward(self, DNN_input_BS, hR, hI, noise_std):
        # DNN_input_BS : UE에서 생성한 정보 입력 - (batch_size, K * B)
        # hR, hI : channel matrix - (batch_size, M, K) - real/imag part
        
        #batch_size = DNN_input_BS[0].shape()
        
        # 0. Precoder input 생성
        for kk in range(self.K):
            if kk == 0:
                precoder_input = DNN_input_BS[0]
            else:
                precoder_input = torch.concat((precoder_input, DNN_input_BS[kk]), dim = 1)
        
        # 1. Generate the precoder
        precoder_output = self.model(precoder_input) # (batch_size, 2*M*K)
        
        # 2. seperate real/imag part
        V_r, V_i = torch.chunk(precoder_output, 2, dim=1)
        
        # 3. power normalization
        norm_V = torch.sqrt(torch.sum(V_r**2 + V_i**2, dim=1, keepdim=True))
        V_r = np.sqrt(self.P) * (V_r / norm_V)
        V_i = np.sqrt(self.P) * (V_i / norm_V)
        
        # 4. calculate sum rate
        rate = {}
        for kk in range(self.K):
            rate[kk] = self.compute_rate(hR[:, :, kk], hI[:, :, kk], V_r, V_i, 2*noise_std**2, K, M, kk)
            
        # 5. MRT (Maximum Ratio Transmission) 기반 Baseline 계산
        rate_UP = self.compute_mrt_baseline(hR, hI, noise_std)

        return rate, rate_UP
    
    def compute_rate(self, hR_kk, hI_kk, V_r, V_i, noise_power, K, M, kk):
        
        # 채널 용량(Rate) 계산을 위한 함수
        # hR_kk, hI_kk: 특정 사용자 kk에 대한 채널 정보
        # V_r, V_i: Precoder의 실수 및 허수 부분
        
        # 여기에 Rate_func을 PyTorch로 변환하여 사용 가능
        return rate_func(hR_kk, hI_kk, V_r, V_i, noise_power, K, M, kk)

    def compute_mrt_baseline(self, hR, hI, noise_std):
        """
        MRT (Maximum Ratio Transmission) 기반 Precoder 계산
        """
        #batch_size = hR.shape[0]

        # MRT Precoder 계산
        h_complex = torch.complex(torch.tensor(hR), torch.tensor(hI))  # 복소수 채널 행렬 생성
        V_MRT = h_complex.conj()  # 복소수 켤레 취함 (MRT 연산)

        # 실수 및 허수 부분 분리
        V_MRT_r = V_MRT.real
        V_MRT_i = V_MRT.imag

        # 전력 정규화 수행
        norm_V_MRT = torch.sqrt(torch.sum(V_MRT_r**2 + V_MRT_i**2, dim=1, keepdim=True))
        V_MRT_r = np.sqrt(self.P) * (V_MRT_r / norm_V_MRT)
        V_MRT_i = np.sqrt(self.P) * (V_MRT_i / norm_V_MRT)

        # 채널 용량(Rate) 계산
        rate_UP = {}
        for kk in range(self.K):
            rate_UP[kk] = self.compute_rate(hR[:, :, kk], hI[:, :, kk], V_MRT_r, V_MRT_i, 2 * noise_std**2, self.K, self.M, kk)

        return rate_UP

### 모델 불러오기

In [119]:
pilot_train = DLTrainingPhase(P, noise_std_dl)
ue_dnn = UE_DNN(L, B, K, annealing_rate)
bs_dnn = BS_DNN(M, K, B, P)

### 모델 학습하기

In [120]:
## Loss function 정의
class CustomLoss(nn.Module):
    def __init__(self, K):
        super(CustomLoss, self).__init__()
        self.K = K
    
    def forward(self, rate, rate_UP):
        rate_total = sum(rate[k] for k in range(self.K))
        rate_UP_total = sum(rate_UP[k] for k in range(self.K))
        
        loss = torch.mean(rate_total)
        loss_UP = torch.mean(rate_UP_total)
        
        return loss, loss_UP
    
Lossfunc = CustomLoss(K)

In [121]:
## Optimizer 설정
optimizer = optim.Adam(list(pilot_train.parameters())+list(ue_dnn.parameters())+list(bs_dnn.parameters())
                       , lr=learning_rate)

In [122]:
## Test set 생성
h_test, hR_test, hI_test = generate_batch_data(test_size, M, K, Lp, LSF_UE, Mainlobe_UE, HalfBW_UE)

# Tensor 변환 후 GPU 할당
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
hR_test = torch.tensor(hR_test, dtype=torch.float32).to(device)
hI_test = torch.tensor(hI_test, dtype=torch.float32).to(device)
noise_std_test = torch.tensor(noise_std_dl, dtype=torch.float32).to(device)

  hR_test = torch.tensor(hR_test, dtype=torch.float32).to(device)
  hI_test = torch.tensor(hI_test, dtype=torch.float32).to(device)


In [123]:
# Final dataset
AA = sio.loadmat('Data_test_K2M64Lp2L8_withParams.mat')
hR_test_Final = AA['hR_act_test_Final']
hI_test_Final = AA['hI_act_test_Final']

hR_test_Final = torch.tensor(hR_test_Final, dtype=torch.float32).to(device)
hI_test_Final = torch.tensor(hI_test_Final, dtype=torch.float32).to(device)

In [124]:
# 초기 모델 저장 경로 설정
save_path = './params.pth'

# 모델 불러오기 (처음 실행 여부 확인)
initial_run = 1
if initial_run == 0:
    checkpoint = torch.load(save_path)
    pilot_train.load_state_dict(checkpoint['pilot_train'])
    bs_dnn.load_state_dict(checkpoint['bs_dnn'])
    ue_dnn.load_state_dict(checkpoint['ue_dnn'])
    optimizer.load_state_dict(checkpoint['optimizer'])
    best_loss = checkpoint['best_loss']
    print("✅ 모델 파라미터 불러오기 완료!")
else:
    best_loss = float('inf')  # 초기 Best Loss 설정

In [125]:
## Loop
for epoch in range(n_epochs):
    optimizer.zero_grad()
    
    for _ in range(batch_per_epoch):
        # tarin dataset 생성
        h_batch, hR_batch, hI_batch = generate_batch_data(batch_size,M,K,Lp,LSF_UE,Mainlobe_UE,HalfBW_UE)
        
        y_pilot = pilot_train(hR_batch, hI_batch, K, M, L) # K user의 파일럿 수신 신호 리스트
        UE_Feedback = ue_dnn(y_pilot)
        rate, rate_UP = bs_dnn(UE_Feedback, hR_batch, hI_batch, noise_std_dl)
        loss, loss_UP = Lossfunc(rate, rate_UP)
        
        loss.backward()
        optimizer.step()
        
    if epoch%10==0:
        y_pilot = pilot_train(hR_test, hI_test, K, M, L)
        rate, rate_UP = bs_dnn(ue_dnn(y_pilot), hR_test, hI_test, noise_std_dl)
        loss_test, loss_UP_test = Lossfunc(rate, rate_UP)
        
        if loss_test < best_loss:
            best_loss = loss_test
            
            torch.save({
                'pilot_train' : pilot_train.state_dict(),
                'bs_dnn' : bs_dnn.state_dict(),
                'ue_dnn' : ue_dnn.state_dict(),
                'optimizer' : optimizer.state_dict(),
                'best_loss' : best_loss
            }, save_path)
            
            print(f"✅ Model saved at epoch {epoch}, Loss: {best_loss:.5f}")
        else:
            anneal_param = anneal_param*annealing_rate
        print('epoch',epoch,' anneal_param:%4.4f'%anneal_param)
        print('         loss_test:%2.5f'%-loss_test,'  best_test:%2.5f'%-best_loss,'  best_possible:%2.5f'%-loss_UP_test,\
                        ' Percentage::%1.3f'%(best_loss/loss_UP_test))
        
y_pilot = pilot_train(hR_test_Final, hI_test_Final, K, M, L)
rate, rate_UP = bs_dnn(ue_dnn(y_pilot), hR_test_Final, hI_test_Final, noise_std_dl)
loss_test_Final, loss_UP_test_Final = Lossfunc(rate, rate_UP)

print(loss_test_Final/loss_UP_test_Final)    
print(-loss_UP_test_Final)
sio.savemat('Data_K2M64Lp2L8_resultB30.mat',dict(loss_test_Final_B30=loss_test_Final.to('cpu'),\
                                        loss_UP_test_Final=loss_UP_test_Final.to('cpu')))  

  nom_denom = torch.zeros_like(torch.tensor(hr[:, :]).clone().detach()).to(device) + torch.tensor(noise_power).to(device)  # 초기화
  hrvr = torch.bmm(torch.tensor(hr).clone().unsqueeze(-1).permute(0, 2, 1), vr[:, :, kk].clone().unsqueeze(-1)).squeeze(-1)
  hivi = torch.bmm(torch.tensor(hi).clone().unsqueeze(-1).permute(0, 2, 1), vi[:, :, kk].clone().unsqueeze(-1)).squeeze(-1)
  hrvi = torch.bmm(torch.tensor(hr).clone().unsqueeze(-1).permute(0, 2, 1), vi[:, :, kk].clone().unsqueeze(-1)).squeeze(-1)
  hivr = torch.bmm(torch.tensor(hi).clone().unsqueeze(-1).permute(0, 2, 1), vr[:, :, kk].clone().unsqueeze(-1)).squeeze(-1)
  h_complex = torch.complex(torch.tensor(hR), torch.tensor(hI))  # 복소수 채널 행렬 생성


✅ Model saved at epoch 0, Loss: -2.09202
epoch 0  anneal_param:1.0000
         loss_test:2.09202   best_test:2.09202   best_possible:14.47989  Percentage::0.144
✅ Model saved at epoch 10, Loss: -5.28314
epoch 10  anneal_param:1.0000
         loss_test:5.28314   best_test:5.28314   best_possible:14.47989  Percentage::0.365
✅ Model saved at epoch 20, Loss: -6.62420
epoch 20  anneal_param:1.0000
         loss_test:6.62420   best_test:6.62420   best_possible:14.47989  Percentage::0.457
✅ Model saved at epoch 30, Loss: -7.37153
epoch 30  anneal_param:1.0000
         loss_test:7.37153   best_test:7.37153   best_possible:14.47989  Percentage::0.509
✅ Model saved at epoch 40, Loss: -8.01732
epoch 40  anneal_param:1.0000
         loss_test:8.01732   best_test:8.01732   best_possible:14.47989  Percentage::0.554
✅ Model saved at epoch 50, Loss: -8.67364
epoch 50  anneal_param:1.0000
         loss_test:8.67364   best_test:8.67364   best_possible:14.47989  Percentage::0.599
✅ Model saved at epoch 6

RuntimeError: Can't call numpy() on Tensor that requires grad. Use tensor.detach().numpy() instead.