# Proposed Model

# **Biblioheque**

In [None]:
import random
import numpy as np
from scipy.stats import rice
# import pandas as pd
import torch
import torch.nn as nn
from torch.nn import functional as F
import sys
import timeit
import os

# torch.set_default_tensor_type(torch.cuda.DoubleTensor)
torch.set_default_dtype(torch.float64)

# class to save results in file

In [None]:
class Record:
    def __init__(self, TextName):
        self.out_file = open(TextName, 'a')
        self.old_stdout = sys.stdout
        sys.stdout = self

    def write(self, text):
        self.old_stdout.write(text)
        self.out_file.write(text)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout = self.old_stdout

# **slicer the data**

In [None]:
def slicer(data):
    dataI = data[slice(0, len(data), 2)]
    dataQ = data[slice(1, len(data), 2)]
    return(dataI, dataQ)

# **Modulation**

In [None]:
def mapper_16QAM(QAM16, data):
    map0 = 2*data[slice(0, len(data), 2)] + data[slice(1, len(data), 2)]
    map0 = list(map(int, map0))
    dataMapped = []
    for i in range(len(map0)):
        dataMapped.append(QAM16[map0[i]])
    return(dataMapped)

In [None]:
def calculate_bits(Modulation,NumSubcarriers,NumDataSymb):
    if Modulation=='QPSK':
        Nbpscs=2
    elif Modulation=='16QAM':
        Nbpscs=4
    return NumDataSymb*NumSubcarriers*Nbpscs


# **generate noise**

In [None]:
def AWGN(IFsig, SNR):
    dP = np.zeros(len(IFsig))
    P = 0

    for i in range(len(IFsig)):
        dP[i] = abs(IFsig[i])**2
        P = P + dP[i]

    P = P/len(IFsig)
    gamma = 10**(SNR/10)
    N0 = P/gamma
    n = ((N0/2)**(0.5))*np.random.standard_normal(len(IFsig))
    IF_n = np.zeros((len(IFsig),1))

    for i in range(len(IFsig)):
        IF_n[i,:] = IFsig[i] + n[i]

    return(IF_n)

# Generate channel model

In [None]:
def Generate_channel(Nr, Nt, type):
    if (type == 'gauss'):
        return (np.random.normal(size=(Nr,Nt))+1j*np.random.normal(size=(Nr,Nt)))/np.sqrt(2)
    if (type == 'rayleigh'):
        return (np.random.rayleigh(scale=(1/np.sqrt(2)), size=(Nr,Nt)) + 1j*np.random.rayleigh(scale=(1/np.sqrt(2)), size=(Nr,Nt)))/np.sqrt(2)
    if (type == 'rician'):
        b = 1/np.sqrt(2)
        return (rice.rvs(b, size=(Nr,Nt)) + 1j*rice.rvs(b, size=(Nr,Nt)))/np.sqrt(2)

# **Generate Dataset**



In [None]:
DataSet_x   = []  # x dataset after modulation
DataSet_y   = []  # y dataset
DataSet_HH  = []  # H dataset
DataSet_b   = []  # binary dataset
SNR_min_dB  = 0
SNR_max_dB  = 20
step_dB     = 5
num_dB      = int((SNR_max_dB - SNR_min_dB) / step_dB) + 1

SNR         = np.linspace(SNR_min_dB, SNR_max_dB, num=num_dB)


Nt = 8             # Tx: 8
Nr = 64            # Rx: 128
N_samp = 100


def Gen_dataset(mode, snr, imperfect, N_samp):    
    DataSet_x   = []  # x dataset after modulation
    DataSet_y   = []  # y dataset
    DataSet_H   = []  
    DataSet_HH  = []

    NumSubcarriers = 1
    Modulation = '16QAM'
    QAM16 = [-1, -0.333, 0.333, 1]
    NumDataSymb = 1
    N_type = 'gauss'

    if mode == 'train':
        for snr in SNR:
            for runIdx in range(0, N_samp):      # ! 20000 x Nt: samples
                H = Generate_channel(Nr, Nt, N_type)
                HH = np.concatenate((np.concatenate((H.real, -H.imag), axis=1),
                                    np.concatenate((H.imag, H.real), axis=1)), axis=0)
                x = np.zeros((2*Nt, NumSubcarriers))
                a = calculate_bits(Modulation, NumSubcarriers, NumDataSymb)
                DataRaw = np.zeros((Nt, a))
                for t in range(Nt):
                    #"data symbol generate"
                    NumBits = calculate_bits(Modulation, NumSubcarriers, NumDataSymb)
                    bit = np.random.randint(1, 3, NumBits)-1
                    DataRaw[t, :] = bit
                    for j in range(4):
                        DataSet_b.append(bit[j])
                    I = np.zeros((1, a))
                    I[0, :] = DataRaw[t, :]
                    (dataI, dataQ) = slicer(I[0])

                    # Mapper
                    mapI = mapper_16QAM(QAM16, dataI)
                    mapQ = mapper_16QAM(QAM16, dataQ)
                    x[t] = mapI[0]
                    x[t+Nt] = mapQ[0]

                # transpose
                H = H.transpose()
                HH = HH.transpose()
                x = x.transpose()

                y_wo_noise = np.matmul(x, HH)

                # noise
                noise = AWGN(y_wo_noise.transpose(), snr)

                y = y_wo_noise + noise.transpose()

                DataSet_x.append(x)    # ! I, Q sample distance by Nt.
                DataSet_y.append(y)                 # ! output sample
                
                # Imperfect channel: 5%
                # coef = (2*np.random.randint(0,2,size=HH.shape) - 1)
                # HH = HH + coef * HH * 0.05
                DataSet_HH.append(HH)
                DataSet_H.append(H)               # ! Generated channel
                
    else:
        for runIdx in range(0, N_samp):      # ! 20000 x Nt: samples
            H = Generate_channel(Nr, Nt, N_type)
            HH = np.concatenate((np.concatenate((H.real, -H.imag), axis=1),
                                np.concatenate((H.imag, H.real), axis=1)), axis=0)
            x = np.zeros((2*Nt, NumSubcarriers))
            a = calculate_bits(Modulation, NumSubcarriers, NumDataSymb)
            DataRaw = np.zeros((Nt, a))
            for t in range(Nt):
                #"data symbol generate"
                NumBits = calculate_bits(Modulation, NumSubcarriers, NumDataSymb)
                bit = np.random.randint(1, 3, NumBits)-1
                DataRaw[t, :] = bit
                for j in range(4):
                    DataSet_b.append(bit[j])
                I = np.zeros((1, a))
                I[0, :] = DataRaw[t, :]
                (dataI, dataQ) = slicer(I[0])

                # Mapper
                mapI = mapper_16QAM(QAM16, dataI)
                mapQ = mapper_16QAM(QAM16, dataQ)
                x[t] = mapI[0]
                x[t+Nt] = mapQ[0]

            # transpose
            H = H.transpose()
            HH = HH.transpose()
            x = x.transpose()

            y_wo_noise = np.matmul(x, HH)

            # noise
            noise = AWGN(y_wo_noise.transpose(), snr)

            y = y_wo_noise + noise.transpose()

            DataSet_x.append(x)    # ! I, Q sample distance by Nt.
            DataSet_y.append(y)                 # ! output sample
            
            # Imperfect channel: 5%

            DataSet_HH.append(HH)
            DataSet_H.append(H)               # ! Generated channel


    # Shuffle dataset
    random.seed(1)
    temp = list(zip(DataSet_x, DataSet_y, DataSet_H, DataSet_HH))
    random.shuffle(temp)
    DataSet_x, DataSet_y, DataSet_H, DataSet_HH = zip(*temp)

    return DataSet_x, DataSet_y, DataSet_H, DataSet_HH

In [None]:
def reconstruct_channel (H):
# H_raw = [R(H) I(H); -I(H) R(H)]
    H_est = []
    H_est_Re = H[0:Nt, 0:Nr]
    H_est_Im = H[Nt:2*Nt, 0:Nr]
    H_est = H_est_Re + 1j * H_est_Im
    return H_est

In [None]:
def NMSE(H_est, H_raw):
    H_est = reconstruct_channel(H_est)

    H_raw_vec = torch.reshape(H_raw, [Nt * Nr, 1])
    H_est_vec = torch.reshape(H_est, [Nt * Nr, 1])

    mse       = (torch.norm(H_raw_vec - H_est_vec)**2) / len(H_raw_vec)
    sigEner   = torch.norm(H_raw_vec)**2
    nmse      = mse / sigEner
    # E = H_raw - H_est
    
    # # Tính tổng các bình phương của sự khác biệt
    # sum_squares_E = torch.sum(E * torch.conj(E))
    
    # # Tính tổng các bình phương của các phần tử của ma trận H
    # sum_squares_H = torch.sum(H_raw * torch.conj(H_raw))
    
    # # Tính NMSE
    # nmse = sum_squares_E / sum_squares_H

    return torch.abs(nmse)

In [None]:
def Input_ISDNN(mode, DataSet_x, DataSet_y, DataSet_H, DataSet_HH, N_samp):
    H_in = []        # ! H_in    , np.diag(np.diag()) return a diag matrix instead of diag components.
    H_true = []   # ! generated s
    H_raw = []
    e = []        # ! vector errors
    xTx = []
    xTy = []
    Di = []
    # steering = [] # ! Steering vector: ZoA and AoA

    if mode == 'train':
        n_sample = N_samp * len(SNR)
    else:
        n_sample = N_samp
        
    for i in range (n_sample):
        H_true.append(torch.tensor(DataSet_HH[i]))
        H_raw.append(torch.tensor(DataSet_H[i]))
        Di.append(torch.tensor(np.linalg.pinv(np.diag(np.diag(np.dot(DataSet_x[i].transpose(), DataSet_x[i]))))))
        xTy.append(torch.tensor(np.dot(DataSet_x[i].transpose(), DataSet_y[i])))
        H_in.append(torch.matmul(Di[i], xTy[i]))
        e.append(torch.rand([2*Nt, 2*Nr]))
        xTx.append(torch.tensor(np.dot(DataSet_x[i].transpose(), DataSet_x[i])))
        # steering.append(torch.tensor(DataSet_Steering[i]))

    H_true = torch.stack(H_true, dim=0)
    H_raw = torch.stack(H_raw, dim=0)
    H_in = torch.stack(H_in, dim=0)
    e = torch.stack(e, dim=0)
    xTx = torch.stack(xTx, dim=0)
    xTy = torch.stack(xTy, dim=0)
    Di = torch.stack(Di, dim=0)
    # steering = torch.stack(steering, dim=0)

    return H_true, H_raw, H_in, e, xTx, xTy, Di

# Model

In [None]:
class xv(nn.Module):
    def __init__(self):
        super(xv, self).__init__()

        self.alpha1 = torch.nn.parameter.Parameter(torch.rand(1))
        self.alpha2 = torch.nn.parameter.Parameter(torch.tensor([0.5]))

    def forward(self, Di, H, e, xTx, xTy):

        xTxH = torch.bmm(xTx, H)

        z    = H + torch.bmm(Di, torch.sub(xTy, xTxH)) + self.alpha1 * e

        e    = torch.sub(xTy, xTxH)

        H    = torch.add((1 - self.alpha2) * z, self.alpha2 * H)

        return H, e

In [None]:
class model_driven(nn.Module):
    def __init__(self):
        super(model_driven, self).__init__()
        self.fc1 = torch.nn.Linear(2*Nr, 2*Nr)
        self.fc2 = torch.nn.Linear(2*Nr, 2*Nr)
        self.fc3 = torch.nn.Linear(2*Nr, 2*Nr)
        self.fc4 = torch.nn.Linear(2*Nr, 2*Nr)
        self.fc5 = torch.nn.Linear(2*Nr, 2*Nr)
        self.fc6 = torch.nn.Linear(2*Nr, 2*Nr)
        self.fc7 = torch.nn.Linear(2*Nr, 2*Nr)
        self.fc8 = torch.nn.Linear(2*Nr, 2*Nr)

        self.layer1=xv()
        self.layer2=xv()
        self.layer3=xv()
        self.layer4=xv()
    
    def forward(self, Di, H_in, e, xTx, xTy):
        e = self.fc1(e)
        e = self.fc2(e)

        H, e = self.layer1(Di, H_in, e, xTx, xTy)
        H = torch.tanh(H)

        e = self.fc3(e)
        e = self.fc4(e)
        H, e = self.layer2(Di, H, e, xTx, xTy)
        H = torch.tanh(H)

        e = self.fc5(e)
        e = self.fc6(e)
        H, e = self.layer3(Di, H, e, xTx, xTy)
        H = torch.tanh(H)

        e = self.fc7(e)
        e = self.fc8(e)
        H, e = self.layer4(Di, H, e, xTx, xTy)

        return H, e

# Define model, optimizer, and loss function

In [None]:
def def_model():
    model = model_driven()
    loss = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

    folder_model = './model/'
    
    if not os.path.isdir(folder_model):
        os.makedirs(folder_model)
    
    file_model = folder_model + 'H'
    # if os.path.isfile(file_model):
    #     generator = torch.load(file_model)

    record_file = 'H'
    return model, loss, optimizer, record_file, file_model

# Main program

In [None]:
epoch         = 0
expected_epoch = 10000
num_samp      = N_samp * len(SNR)
best_nmse     = 1e9
early_stop    = 0
best_model    = ''

DataSet_x, DataSet_y, DataSet_H, DataSet_HH = Gen_dataset('train', 0, 0, N_samp)
H_true, H_raw, H_in, e, xTx, xTy, Di = Input_ISDNN('train', DataSet_x, DataSet_y, DataSet_H, DataSet_HH, N_samp)

print("Begin training...") 

while(True):
        epoch = epoch + 1 

        init_loss = 1e9
        while( epoch == 1 and init_loss > 140):
                
                model, loss, optimizer, record_file, file_model = def_model()
                
                H_1, e_1 = model(Di, H_in, e, xTx, xTy)   # predict output from the model 
                init_loss = loss(H_1, H_true).item()
                print(init_loss)

        # optimizer.zero_grad()   # zero the parameter gradients
        H_o, e_o = model(Di, H_in, e, xTx, xTy)   # predict output from the model 
        train_loss = loss(H_o, H_true)   # calculate loss for the predicted output  
        train_loss.backward()   # backpropagate the loss 
        optimizer.step()        # adjust parameters based on the calculated gradients 

        if (epoch % 100 == 0 or epoch == 1):
                nmse = 0
                for j in range (num_samp):
                        nmse += NMSE(H_o[j], H_raw[j])
                nmse = nmse / num_samp
                
                if (nmse <= best_nmse):
                        torch.save(model.state_dict(), file_model + '_' + str(epoch) + '.pth')
                        best_model = file_model + '_' + str(epoch) + '.pth'
                        best_nmse = nmse
                        early_stop = 0
                else:
                        early_stop += 1

                if (nmse > best_nmse and early_stop == 2):
                        with Record(record_file + '_log.txt'):
                                print(epoch, nmse.item(), train_loss.item()) 
                        break

                with Record(record_file + '_log.txt'):
                        print(epoch, nmse.item(), train_loss.item()) 

        if epoch  == expected_epoch:
                torch.save(model.state_dict(), file_model + '_' + str(epoch) + '.pth')
                best_model = file_model + '_' + str(epoch) + '.pth'
                with Record(record_file + '_log.txt'):
                        print("epoch:\n", epoch)
                        print("Latest NMSE:\n", nmse.item())
                        print("Latest Loss:\n", train_loss.item()) 

                break

# Test function

# Function to test the model

In [None]:
def test(H_raw, Di, H_in, e, xTx, xTy, N_test, log): 
    # Load the model that we saved at the end of the training loop 
    model = model_driven()
    model.load_state_dict(torch.load(best_model)) 
      
    with torch.no_grad(): 
        H_o, e_o = model(Di, H_in, e, xTx, xTy)

        nmse = 0
        for j in range (N_test):
                
                nmse += NMSE(H_o[j], H_raw[j])

        nmse = nmse / N_test
        with Record(log):
            print(format(nmse.item(), '.7f'))

In [None]:
## Generate dataset for test

In [None]:
SNR_min_dB  = 0
SNR_max_dB  = 20
step_dB     = 5
num_dB      = int((SNR_max_dB - SNR_min_dB) / step_dB) + 1

SNR         = np.linspace(SNR_min_dB, SNR_max_dB, num=num_dB)
log         = './model/log_test.txt'

N_test = int(N_samp)

for i in range (100):
    for snr in SNR:
        with Record(log):
            print(snr)
        DataSet_x, DataSet_y, DataSet_H, DataSet_HH = Gen_dataset('test', snr, 0, N_test)
        H_true, H_raw, H_in, e, xTx, xTy, Di = Input_ISDNN('test', DataSet_x, DataSet_y, DataSet_H, DataSet_HH, N_test)
        
        test(H_raw, Di, H_in, e, xTx, xTy, N_test, log)