In [1]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt 
import scipy.io
import scipy.signal
import heapq
from sklearn import preprocessing 

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Initialize the compute device.
# CUDA is used when it is available; every other host runs on the CPU. That
# includes Apple-silicon hosts where MPS is available: this notebook already
# mapped MPS to "cpu", and that choice is kept.
# The previous if/elif left `device` undefined when neither CUDA nor MPS was
# present, so the print below raised NameError. The conditional expression
# always binds `device`, and no longer touches `torch.backends.mps`, which
# does not exist on older PyTorch builds.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f'Devices: {device}')

Devices: cpu


In [3]:
# Paths to the stored models and data.
# NOTE(review): these are absolute paths on one developer machine; they must
# be edited before the notebook can run anywhere else.
h5path = '/Users/mclinwong/GitHub/CodesReproduction/DCN-DOA/Data/h5/'
pthpath = '/Users/mclinwong/GitHub/CodesReproduction/DCN-DOA/Data/pth/'
# Directory holding the .mat files and directory for figures.
matpath = '/Users/mclinwong/GitHub/CodesReproduction/DCN-DOA/Data/matlib/'
figurepath = '/Users/mclinwong/GitHub/CodesReproduction/DCN-DOA/ReproducedCodes/Figures/'

# Read the .mat file once and pull out the individual variables.
read_temp = scipy.io.loadmat(matpath + 'data2_snr.mat')
T_SBC_R = read_temp['T_SBC_R']
T_SBC = read_temp['T_SBC']
SNR = read_temp['SNR']
S_est = read_temp['S_est']
# Swap axes 1 and 2 so that the array unpacks as (r2, K, I, S) below.
S_est = S_est.transpose(0, 2, 1, 3)
[r2, K, I, S] = np.shape(S_est)

# S_abs stacks channel 0 and channel 1 of S_est along axis 1:
#   S_abs[:, :I, :] <- S_est[:, 0, :, :]
#   S_abs[:, I:, :] <- S_est[:, 1, :, :]
# Two slice assignments replace the former triple Python loop over
# (r2, S, I), which produced exactly the same array element by element.
# Only channels 0 and 1 are used, as before.
S_abs = np.zeros((r2, I*2, S))
S_abs[:, :I, :] = S_est[:, 0, :, :]
S_abs[:, I:, :] = S_est[:, 1, :, :]

S_label = read_temp['S_label']
R_est = read_temp['R_est']
DOA_train = read_temp['DOA_train']
theta = read_temp['theta']
gamma = read_temp['gamma']
gamma_R = read_temp['gamma_R']
# Insert a singleton axis at position 2 of S_label.
S_label1 = np.expand_dims(S_label, 2)
# Fitted on the first slice of R_est along the last axis; not used again in this cell.
normalizer = preprocessing.Normalizer().fit(R_est[:, :, 0])

#r2 for the number of data
#K for the number of sources
#I for the number of DOAs
#S for the number of SNRs

# r2, I and S are re-read from R_est / S_label; c is the size of axis 1 of R_est.
[r2, c, S] = np.shape(R_est)
[r2, I, S] = np.shape(S_label)
print(f'r2: {r2}, I: {I}, c: {c}')
print(f'S_est: {S_est.shape}, S_abs: {S_abs.shape} \nS_label: {S_label.shape},\nS_label1: {S_label1.shape} ,\nR_est: {R_est.shape}')


# Grid index -> value axis: index i maps to i - 60.
# presumably degrees over [-60, 59] — TODO confirm against the data generator.
DOA = np.arange(I)-60
# L sets the layer widths of the DNN_* classes defined in later cells.
L = 120

r2: 1000, I: 120, c: 56
S_est: (1000, 2, 120, 31), S_abs: (1000, 240, 31) 
S_label: (1000, 120, 31),
S_label1: (1000, 120, 1, 31) ,
R_est: (1000, 56, 31)


In [4]:
class CNN_ReLu(nn.Module):
    """Four stacked 1-D convolutions, each followed by a ReLU.

    Channel widths are 2 -> 12 -> 6 -> 3 -> 1. Every kernel of size k uses
    padding (k - 1) // 2 with the default stride, so the length of the last
    axis is unchanged from input to output.
    """

    def __init__(self):
        super().__init__()
        # Attribute names double as state_dict keys, so they are kept exactly.
        self.cnn_1 = nn.Conv1d(2, 12, kernel_size=25, padding=12)
        self.cnn_2 = nn.Conv1d(12, 6, kernel_size=15, padding=7)
        self.cnn_3 = nn.Conv1d(6, 3, kernel_size=5, padding=2)
        self.cnn_4 = nn.Conv1d(3, 1, kernel_size=3, padding=1)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply conv -> ReLU four times and return the single-channel result."""
        for conv in (self.cnn_1, self.cnn_2, self.cnn_3, self.cnn_4):
            x = self.relu(conv(x))
        return x
# Load the saved ReLU CNN. map_location=device remaps tensors that were saved
# on another device (e.g. CUDA) onto the device selected in the setup cell, so
# the load also works on CPU-only hosts.
# NOTE(security): torch.load unpickles the file and can run arbitrary code;
# only load checkpoints that come from a trusted source.
# NOTE(review): the file presumably holds a fully pickled nn.Module rather than
# a state_dict; recent PyTorch releases that default to weights_only=True then
# need weights_only=False here — confirm against the installed version.
cnnrelu = torch.load(pthpath + 'cnnrelu.pth', map_location=device)

In [5]:
class CNN_tanh(nn.Module):
    """Four stacked 1-D convolutions, each followed by a tanh.

    Channel widths are 2 -> 12 -> 6 -> 3 -> 1. Every kernel of size k uses
    padding (k - 1) // 2 with the default stride, so the length of the last
    axis is unchanged from input to output.
    """

    def __init__(self):
        super().__init__()
        # Attribute names double as state_dict keys, so they are kept exactly.
        self.cnn_1 = nn.Conv1d(2, 12, kernel_size=25, padding=12)
        self.cnn_2 = nn.Conv1d(12, 6, kernel_size=15, padding=7)
        self.cnn_3 = nn.Conv1d(6, 3, kernel_size=5, padding=2)
        self.cnn_4 = nn.Conv1d(3, 1, kernel_size=3, padding=1)
        self.tanh = nn.Tanh()

    def forward(self, x):
        """Apply conv -> tanh four times and return the single-channel result."""
        for conv in (self.cnn_1, self.cnn_2, self.cnn_3, self.cnn_4):
            x = self.tanh(conv(x))
        return x
# Load the saved tanh CNN. map_location=device remaps tensors that were saved
# on another device (e.g. CUDA) onto the device selected in the setup cell, so
# the load also works on CPU-only hosts.
# NOTE(security): torch.load unpickles the file and can run arbitrary code;
# only load checkpoints that come from a trusted source.
# NOTE(review): the file presumably holds a fully pickled nn.Module rather than
# a state_dict; recent PyTorch releases that default to weights_only=True then
# need weights_only=False here — confirm against the installed version.
cnntanh = torch.load(pthpath + 'cnntanh.pth', map_location=device)

In [6]:
class CNN_sigmoid(nn.Module):
    """Four stacked 1-D convolutions, each followed by a sigmoid.

    Channel widths are 2 -> 12 -> 6 -> 3 -> 1. Every kernel of size k uses
    padding (k - 1) // 2 with the default stride, so the length of the last
    axis is unchanged from input to output.
    """

    def __init__(self):
        super().__init__()
        # Attribute names double as state_dict keys, so they are kept exactly.
        self.cnn_1 = nn.Conv1d(2, 12, kernel_size=25, padding=12)
        self.cnn_2 = nn.Conv1d(12, 6, kernel_size=15, padding=7)
        self.cnn_3 = nn.Conv1d(6, 3, kernel_size=5, padding=2)
        self.cnn_4 = nn.Conv1d(3, 1, kernel_size=3, padding=1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Apply conv -> sigmoid four times and return the single-channel result."""
        for conv in (self.cnn_1, self.cnn_2, self.cnn_3, self.cnn_4):
            x = self.sigmoid(conv(x))
        return x
# Load the saved sigmoid CNN. map_location=device remaps tensors that were
# saved on another device (e.g. CUDA) onto the device selected in the setup
# cell, so the load also works on CPU-only hosts.
# NOTE(security): torch.load unpickles the file and can run arbitrary code;
# only load checkpoints that come from a trusted source.
# NOTE(review): the file presumably holds a fully pickled nn.Module rather than
# a state_dict; recent PyTorch releases that default to weights_only=True then
# need weights_only=False here — confirm against the installed version.
cnnsigmoid = torch.load(pthpath + 'cnnsigmoid.pth', map_location=device)

In [7]:
class DNN_ReLU(nn.Module):
    """Four fully connected layers, each followed by a ReLU.

    Layer widths are 2L -> int(2L/3) -> int(4L/9) -> int(2L/3) -> L, where L
    is the notebook-level global read when the module is constructed.
    """

    def __init__(self):
        super().__init__()
        wide = int(2*L/3)     # width after fc1 and after fc3
        narrow = int(4*L/9)   # bottleneck width after fc2
        # Attribute names double as state_dict keys, so they are kept exactly.
        self.fc1 = nn.Linear(2*L, wide)
        self.fc2 = nn.Linear(wide, narrow)
        self.fc3 = nn.Linear(narrow, wide)
        self.fc4 = nn.Linear(wide, L)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply linear -> ReLU four times and return the length-L result."""
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4):
            x = self.relu(layer(x))
        return x
# Load the saved ReLU DNN. map_location=device remaps tensors that were saved
# on another device (e.g. CUDA) onto the device selected in the setup cell, so
# the load also works on CPU-only hosts.
# NOTE(security): torch.load unpickles the file and can run arbitrary code;
# only load checkpoints that come from a trusted source.
# NOTE(review): the file presumably holds a fully pickled nn.Module rather than
# a state_dict; recent PyTorch releases that default to weights_only=True then
# need weights_only=False here — confirm against the installed version.
dnnrelu = torch.load(pthpath + 'dnnrelu.pth', map_location=device)

In [8]:
class DNN_Tanh(nn.Module):
    """Four fully connected layers, each followed by a tanh.

    Layer widths are 2L -> int(2L/3) -> int(4L/9) -> int(2L/3) -> L, where L
    is the notebook-level global read when the module is constructed.
    """

    def __init__(self):
        super().__init__()
        wide = int(2*L/3)     # width after fc1 and after fc3
        narrow = int(4*L/9)   # bottleneck width after fc2
        # Attribute names double as state_dict keys, so they are kept exactly.
        self.fc1 = nn.Linear(2*L, wide)
        self.fc2 = nn.Linear(wide, narrow)
        self.fc3 = nn.Linear(narrow, wide)
        self.fc4 = nn.Linear(wide, L)
        self.tanh = nn.Tanh()

    def forward(self, x):
        """Apply linear -> tanh four times and return the length-L result."""
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4):
            x = self.tanh(layer(x))
        return x
# Load the saved tanh DNN. map_location=device remaps tensors that were saved
# on another device (e.g. CUDA) onto the device selected in the setup cell, so
# the load also works on CPU-only hosts.
# NOTE(security): torch.load unpickles the file and can run arbitrary code;
# only load checkpoints that come from a trusted source.
# NOTE(review): the file presumably holds a fully pickled nn.Module rather than
# a state_dict; recent PyTorch releases that default to weights_only=True then
# need weights_only=False here — confirm against the installed version.
dnntanh = torch.load(pthpath + 'dnntanh.pth', map_location=device)

In [9]:
class DNN_Sigmoid(nn.Module):
    """Four fully connected layers, each followed by a sigmoid.

    Layer widths are 2L -> int(2L/3) -> int(4L/9) -> int(2L/3) -> L, where L
    is the notebook-level global read when the module is constructed.
    """

    def __init__(self):
        super().__init__()
        wide = int(2*L/3)     # width after fc1 and after fc3
        narrow = int(4*L/9)   # bottleneck width after fc2
        # Attribute names double as state_dict keys, so they are kept exactly.
        self.fc1 = nn.Linear(2*L, wide)
        self.fc2 = nn.Linear(wide, narrow)
        self.fc3 = nn.Linear(narrow, wide)
        self.fc4 = nn.Linear(wide, L)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Apply linear -> sigmoid four times and return the length-L result."""
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4):
            x = self.sigmoid(layer(x))
        return x
# Load the saved sigmoid DNN. map_location=device remaps tensors that were
# saved on another device (e.g. CUDA) onto the device selected in the setup
# cell, so the load also works on CPU-only hosts.
# NOTE(security): torch.load unpickles the file and can run arbitrary code;
# only load checkpoints that come from a trusted source.
# NOTE(review): the file presumably holds a fully pickled nn.Module rather than
# a state_dict; recent PyTorch releases that default to weights_only=True then
# need weights_only=False here — confirm against the installed version.
dnnsigmoid = torch.load(pthpath + 'dnnsigmoid.pth', map_location=device)

In [10]:
def Test(model, inputx, k, flag):
    """Run `model` sample by sample on one slice of `inputx` and stack the outputs.

    Parameters
    ----------
    model : torch.nn.Module
        Network to evaluate. It is moved to the notebook-level `device` and
        switched to eval mode.
    inputx : numpy.ndarray
        Input data. For flag == 0 it is indexed as inputx[i, :, :, k]
        (4-D array); for flag == 1 as inputx[i, :, k] (3-D array).
    k : int
        Index along the last axis of `inputx` (the SNR axis, per the comments
        in the data-loading cell).
    flag : int
        0 for the CNN input layout, 1 for the DNN input layout.

    Returns
    -------
    numpy.ndarray
        One model output per sample, stacked along a new leading axis.

    Raises
    ------
    ValueError
        If `flag` is neither 0 nor 1. Previously this surfaced as an
        UnboundLocalError on `x`.
    """
    if flag not in (0, 1):
        raise ValueError(f"flag must be 0 (CNN) or 1 (DNN), got {flag!r}")

    model = model.to(device)
    model.eval()
    outputs = []
    # Inference only: a single no_grad context covers the whole loop.
    with torch.no_grad():
        # Iterate over the samples actually present in `inputx` rather than
        # the notebook global r2, so inputs of any length are handled.
        for i in range(inputx.shape[0]):
            sample = inputx[i, :, :, k] if flag == 0 else inputx[i, :, k]
            # float() casts to float32 to match the model parameters.
            x = torch.from_numpy(sample).float().to(device)
            outputs.append(model(x).cpu().numpy())
    return np.array(outputs)

In [11]:
def DOAPredict(predict, flag, height = 0.1, nodetect = -200):
    """Pick the K strongest spectrum peaks of every sample and map them to the DOA axis.

    Parameters
    ----------
    predict : numpy.ndarray
        Spectra, one per sample along axis 0. For flag == 0 the spectrum of
        sample i is predict[i, 0, :]; for flag == 1 it is predict[i, :].
    flag : int
        0 for the CNN output layout, 1 for the DNN output layout.
    height : float, optional
        Minimum peak height handed to scipy.signal.find_peaks.
    nodetect : float, optional
        Placeholder grid index for sources that were not detected.

    Returns
    -------
    numpy.ndarray
        Float array of shape (K, n_samples), with K the notebook-level global.
        Each column holds the selected peak indices minus 60, sorted in
        descending order. Missing detections appear as `nodetect - 60`,
        because the offset is applied to the whole array (unchanged behavior).

    Raises
    ------
    ValueError
        If `flag` is neither 0 nor 1.

    Notes
    -----
    The former implementation located each of the K largest peak heights with
    `np.where(li == value)[0].item()`. That raised whenever a height occurred
    more than once in the spectrum (two peaks of equal height, or a non-peak
    sample with the same value). Peaks are now ranked directly by their
    heights, so ties are handled; on a tie the lower index is preferred.
    """
    if flag not in (0, 1):
        raise ValueError(f"flag must be 0 (CNN) or 1 (DNN), got {flag!r}")

    predict = np.asarray(predict)
    n_samples = predict.shape[0]
    peak = np.zeros((K, n_samples))
    for i in range(n_samples):
        li = predict[i, 0, :] if flag == 0 else predict[i, :]
        peaks, _ = scipy.signal.find_peaks(li, height=height)

        if len(peaks) > K:
            # Keep the K highest peaks; the stable sort resolves ties by index.
            strongest = np.argsort(-li[peaks], kind='stable')[:K]
            peaks = peaks[strongest]

        # Slots without a detected peak keep the `nodetect` placeholder.
        peaks_st = np.full(K, float(nodetect))
        peaks_st[:len(peaks)] = peaks

        peak[:, i] = np.sort(peaks_st)[::-1]
    # Shift grid indices onto the DOA axis (index i <-> i - 60, as in `DOA`).
    return peak-60

In [14]:
# predict_cnntanh = Test(cnntanh, S_est, 0)
# doa_cnntanh = DOAPredict(predict_cnntanh, 0)
# predict_cnnrelu = Test(cnnrelu, S_est, 0)
# doa_cnnrelu = DOAPredict(predict_cnnrelu, 0)
# predict_cnnsigmoid = Test(cnnsigmoid, S_est, 0)
# doa_cnnsigmoid = DOAPredict(predict_cnnsigmoid, 0)

# predict_dnntanh = Test(dnntanh, S_abs, 1)
# doa_dnntanh = DOAPredict(predict_dnntanh, 1)
# predict_dnnrelu = Test(dnnrelu, S_abs, 1)
# doa_dnnrelu = DOAPredict(predict_dnnrelu,1)
# predict_dnnsigmoid = Test(dnnsigmoid, S_abs, 1)
# doa_dnnsigmoid = DOAPredict(predict_dnnsigmoid, 1)

# doa_sbl = DOAPredict(gamma, 1)
# doa_sblr = DOAPredict(gamma_R, 1)

TypeError: Test() missing 1 required positional argument: 'flag'

In [25]:
# Zero-filled column vector with r2 rows; it is not referenced again in the visible cells.
test_cnn_low=np.zeros((r2,1))
#change the column
def switchcolumn(inarray):
    """Return a new (K, r2) array with rows 0 and 1 of `inarray` exchanged.

    K and r2 are the notebook-level globals. Only rows 0 and 1 are copied;
    any further rows of the result stay zero.
    """
    swapped = np.zeros((K, r2))
    for dst, src in ((0, 1), (1, 0)):
        swapped[dst, :] = inarray[src, :]
    return swapped

def predict2doa(model, input, flag):
    """Evaluate `model` at every SNR index and collect the outputs in one array.

    Parameters
    ----------
    model : torch.nn.Module
        Network handed on to Test().
    input : numpy.ndarray
        Data handed on to Test(); its last axis is indexed by the SNR index.
    flag : int
        0 for the CNN layout, 1 for the DNN layout (see Test()).

    Returns
    -------
    numpy.ndarray
        Array of shape (r2, I, S) with the raw network outputs. Despite the
        function name, no peak picking happens here; angle estimates are
        obtained by passing slices of the result to DOAPredict().
    """
    predict = np.zeros((r2, I, S))
    for i in range(S):
        predict1 = Test(model, input, i, flag)
        # The CNN path yields (r2, 1, I), which cannot be assigned to the
        # (r2, I) slice and used to raise ValueError. Flattening each sample
        # makes both layouts fit; DNN outputs are already (r2, I).
        predict[:, :, i] = predict1.reshape(predict1.shape[0], -1)
    return predict

def caculate_error(predict):
    """Compute one DOA root-mean-square error per SNR index.

    Parameters
    ----------
    predict : numpy.ndarray
        Network outputs as returned by predict2doa(), indexed as
        predict[:, :, k] for SNR index k.

    Returns
    -------
    list of float
        RMSE for k = 0 .. S-1 between the DOAs estimated by DOAPredict() and
        the reference angles DOA_train[:, :, k].

    Notes
    -----
    Fixes relative to the previous version:
    * the `predict` argument is used; it was overwritten by a hard-coded
      Test(cnntanh, S_est, k, 0) call;
    * the error is sqrt(mean(err ** 2)); the old code took the square root of
      the signed difference, which gives NaN for negative errors;
    * all K sources contribute; only the last one survived the inner loop;
    * the list is returned; the function used to return None.

    NOTE(review): DOA_train is assumed to be laid out as (K, r2, S), matching
    how it was indexed before — confirm. Reference angles are sorted in
    descending order per sample to match DOAPredict()'s ordering. Undetected
    sources keep DOAPredict()'s placeholder value and therefore inflate the
    RMSE — confirm that this is the intended penalty.
    """
    rmse_list = []
    for k in range(S):
        # (K, n_samples) estimates, sorted descending within each sample.
        doa_est = DOAPredict(predict[:, :, k], 1)
        doa_ref = np.sort(DOA_train[:, :, k], axis=0)[::-1]
        err = doa_est - doa_ref
        rmse_list.append(float(np.sqrt(np.mean(err ** 2))))
    return rmse_list

# DNN counterpart of the call below (currently disabled).
# doa_dnntanh = predict2doa(dnntanh, S_abs, 1)
# Run the tanh CNN on S_est for every SNR index (flag 0 = CNN layout).
# Despite the `doa_` prefix, predict2doa returns the stacked network outputs
# (it allocates an (r2, I, S) array and fills it from Test()), not angles.
doa_dcntanh = predict2doa(cnntanh, S_est, 0)

# Error evaluation for the DNN result (disabled together with the DNN call above).
# err_dnntanh = caculate_error(doa_dnntanh)
