In [None]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.linalg as LA
import time
import scipy.signal as ss
import cmath
import pandas as pd
import math
from sklearn.metrics import mean_squared_error

In [None]:
def array_steering_vector(array,DOA):
    N = array.shape
    v = np.exp(-1j*2*np.pi*array*np.sin(DOA))
    return v/np.sqrt(N)

In [None]:
def music(CovMat,L,N,array,Angles):
    # CovMat is the signal covariance matrix, L is the number of sources, N is the number of antennas
    # array holds the positions of antenna elements
    # Angles are the grid of directions in the azimuth angular domain
    _,V = LA.eig(CovMat)
    Qn  = V[:,L:N]
    numAngles = Angles.size
    pspectrum = np.zeros(numAngles)
    for i in range(numAngles):
        av = array_steering_vector(array,Angles[i])
        pspectrum[i] = 1/LA.norm((Qn.conj().transpose()@av))
    psindB    = np.log10(10*pspectrum/pspectrum.min())
    DoAsMUSIC,_= ss.find_peaks(psindB,height=1.35, distance=1.5)
    return DoAsMUSIC,psindB


In [None]:
lamda = 150 # wavelength
L = 2  # number of sources
w=np.array((np.pi/6,
            np.pi/4))
w=np.transpose(w)
N = 10  # number of ULA elements 
snr = 30 # signal to noise ratio
array = np.linspace(0,(N-1)/2,N)
Samples=10000

In [None]:
DOAs=[]
for i in range(Samples):
    DOA = np.pi*(np.random.rand(L)-1/2)# random source directions
    DOAs.append(DOA)
    
noise = np.random.randn(L) + np.random.randn(L)*1j # random source powers
noise = np.sqrt(1/2)*noise


In [None]:
Angles = np.linspace(-np.pi/2,np.pi/2,360)
numAngles = Angles.size
numAngles

In [None]:
snapshots = 400
X = np.zeros((N,snapshots)) + 1j*np.zeros((N,snapshots))
X

In [None]:
Ry=[]

for j in range(Samples):
    for iter in range(snapshots):
        data = np.zeros(N)
        for i in range(L):
            asv = 2*np.exp(1j*np.pi*np.random.rand(1))
            x=DOAs[j]
            data = data + asv*noise[i]*array_steering_vector(array,x[i])
        X[:,iter] = data + np.sqrt(0.5/snr)*(np.random.randn(N)+np.random.randn(N)*1j)
    CovMat = X@X.conj().transpose()
    Ry.append(CovMat)

In [None]:
Xu=np.zeros((Samples,N,N,3))
for i in range(Samples):
    Xu[i,:,:,0]=np.real(Ry[i])
    Xu[i,:,:,1]=np.imag(Ry[i])
    Xu[i,:,:,2]=np.angle(Ry[i])
Xu.shape

In [None]:
DoAsMUS=[]
ps=[]

for i in range(Samples):
    DoAsMUSIC , psindB = music(Ry[i],L,N,array,Angles)
    DoAsMUS.append(DoAsMUSIC)
    ps.append(psindB)
len(ps)


In [None]:
psindB=np.zeros((Samples,360))
for i in range(Samples):
    psindB[i]=ps[i]

In [None]:
def groupdelay(CovMat,L,N,Angles,array):
    numAngles=Angles.shape[0]
    music_phase = []
    diff_music_phase=[]
    for i in range(1,N-2):
        for ctr in range(numAngles):
            _,V = LA.eig(CovMat)
            Qn  = V[:,L:N]
            en=Qn[:,i-1]
            beam_search = array_steering_vector(array,Angles[ctr])
            Music_phase=np.angle(en.conj().transpose()@beam_search)
            music_phase.append(Music_phase)
            
    for ctr in range(1,numAngles+1):
        diff_Music_phase=music_phase[ctr]-music_phase[ctr-1]
        diff_music_phase.append(diff_Music_phase)
    
    unwrappeddiffmusicphase=np.unwrap(diff_music_phase)
    
    return unwrappeddiffmusicphase

In [None]:
Mult=[]
for i in range(Samples):
    musicphase=groupdelay(Ry[i],L,N,Angles,array)
    mult=ps[i]*abs(musicphase).T
    mult=np.transpose(mult)
    Mult.append(mult)

In [None]:
MusicGD=np.zeros((Samples,360))
for i in range(Samples):
    MusicGD[i]=Mult[i]

In [None]:
i=43
plt.plot(Angles,Mult[i])
plt.plot(Angles[DoAsMUS[i]],Mult[i][DoAsMUS[i]],'o')
plt.title('MUSIC')
plt.show()

In [None]:
import tensorflow as tf

In [None]:
inputs = tf.keras.layers.Input((N,N,1))
x = tf.keras.layers.Conv2D(256,(5,5), activation='relu',name="cnn_1", padding='same')(inputs)
x = tf.keras.layers.LayerNormalization(axis=1 , center=True , scale=True)(x)
x = tf.keras.layers.ReLU(max_value=None, negative_slope=0, threshold=0)(x)
x = tf.keras.layers.Conv2D(256,(5,5), activation='relu',name="cnn_2", padding='same')(x)
x = tf.keras.layers.LayerNormalization(axis=1 , center=True , scale=True)(x)
x = tf.keras.layers.ReLU(max_value=None, negative_slope=0, threshold=0)(x)
x = tf.keras.layers.Conv2D(256,(3,3), activation='relu',name="cnn_4", padding='same')(x)
x = tf.keras.layers.LayerNormalization(axis=1 , center=True , scale=True)(x)
x = tf.keras.layers.ReLU(max_value=None, negative_slope=0, threshold=0)(x)
x = tf.keras.layers.Conv2D(256,(3,3), activation='relu',name="cnn_5", padding='same')(x)
x = tf.keras.layers.LayerNormalization(axis=1 , center=True , scale=True)(x)
x = tf.keras.layers.ReLU(max_value=None, negative_slope=0, threshold=0)(x)
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dropout(0.5)(x)
x = tf.keras.layers.Softmax(axis=-1)(x)
out = tf.keras.layers.Dense(360, activation = 'linear', name = 'output')(x)

In [None]:
model = tf.keras.Model(inputs = [inputs], outputs = [out])
optimizer=tf.keras.optimizers.Adam(learning_rate=0.01,beta_1=0.9,beta_2=0.999,epsilon=1e-08)
model.compile(optimizer=optimizer, loss='mse',metrics=['mean_squared_error'])
model.summary()

In [None]:
from tensorflow.keras.callbacks import EarlyStopping
callbacks = [EarlyStopping(monitor='val_loss', patience=5,verbose=1, mode='min')]

In [None]:
result = model.fit(Xu,psindB, epochs =200, shuffle = True,callbacks=callbacks,
                  verbose = 1,validation_split=0.2)

In [None]:
figsize=4,4
figure,ax=plt.subplots(figsize=figsize)
plt.plot(result.history['mean_squared_error'],label='Training Loss')
plt.plot(result.history['val_mean_squared_error'],label='Validation Loss')
plt.legend(loc='upper right')
plt.xlabel('Epoch')
plt.ylabel('MSE')
plt.title('Training and Validation Loss')
plt.show()

In [None]:
DOA = np.array([50,60])/180


for iter in range(snapshots):
    data = np.zeros(N)
    for i in range(L):
        asv = np.exp(1j*2*np.pi*np.random.rand(1))
        data = data + asv*array_steering_vector(array,DOA[i])
    X[:,iter] = data + np.sqrt(0.5/snr)*(np.random.randn(N)+np.random.randn(N)*1j)
CovMattest = X@X.conj().transpose()
CovMattest.shape

import time
start_time=time.time()

Xutest=np.zeros((N,N,3))
Xutest[:,:,0]=np.real(CovMattest)
Xutest[:,:,1]=np.imag(CovMattest)
Xutest[:,:,2]=np.angle(CovMattest)
Xutest=np.expand_dims(Xutest,axis=0)
Xutest.shape
predictions = model.predict(Xutest)
end_time=time.time()
print(end_time-start_time)

pred=predictions.reshape(360,)
pred.shape
DoAsMUSIC= ss.find_peaks(pred,height=1.0, distance=1.0)

plt.plot(Angles,pred)
plt.plot(Angles,pred)
plt.title('Group Delay MUSIC')
plt.show()

start_time=time.time()
musicphasetest=groupdelay(CovMattest,L,N,Angles,array)
DoAsMUSIC , psindB = music(CovMattest,L,N,array,Angles)
mult=psindB*abs(musicphasetest).T
mult=np.transpose(mult)
end_time=time.time()
print(end_time-start_time)

plt.plot(Angles,mult)
plt.title('Group Delay MUSIC')
plt.show()

In [None]:
lamda = 150 # wavelength
L = 2  # number of sources
N = 10  # number of ULA elements 
snr = 25 # signal to noise ratio
array = np.linspace(0,(N-1)/2,N)
noise = np.random.randn(L) + np.random.randn(L)*1j # random source powers
noise = np.sqrt(1/2)*noise

DOA = np.array([0,60])

SNAPSHOTS=np.array([10,20,50,100,200,500])
RMSE1=np.zeros((1,6))
for k in range(6):
  X = np.zeros((N,SNAPSHOTS[k])) + 1j*np.zeros((N,SNAPSHOTS[k]))
  X 
  for iter in range(SNAPSHOTS[k]):
      data = np.zeros(N)
      for i in range(L):
          asv = np.exp(1j*2*np.pi*np.random.rand(1))
          data = data + asv*noise[i]*array_steering_vector(array,DOA[i])
      X[:,iter] = data + np.sqrt(0.5/snr)*(np.random.randn(N)+np.random.randn(N)*1j)
  CovMattest = X@X.conj().transpose()
  CovMattest.shape

  Xutest=np.zeros((N,N,3))
  Xutest[:,:,0]=np.real(CovMattest)
  Xutest[:,:,1]=np.imag(CovMattest)
  Xutest[:,:,2]=np.angle(CovMattest)
  Xutest=np.expand_dims(Xutest,axis=0)
  Xutest.shape

  predictions = model.predict(Xutest)
  pred=predictions.reshape(360,)

  musicphasetest=groupdelay(CovMattest,L,N,Angles,array)
  DoAsMUSIC , psindB = music(CovMattest,L,N,array,Angles)
  mult=psindB*abs(musicphasetest).T
  mult=np.transpose(mult)

  mse=mean_squared_error(mult,pred)
  rmse=math.sqrt(mse)
  RMSE1[0][k]=rmse

plt.plot(SNAPSHOTS,RMSE1[0],marker='o')
plt.xlabel('Snapshots')
plt.ylabel('RMSE')
plt.show()

In [None]:
SNR=np.array([-30,-25,-20,-10,-5,5,10,20,25,30])
lamda =150 # wavelength
L = 2  # number of sources
N = 10  # number of ULA elements 
array = np.linspace(0,(N-1)/2,N)
noise = np.random.randn(L) + np.random.randn(L)*1j # random source powers
noise = np.sqrt(1/2)*noise

DOA = np.array([50,60])/180 # random source directions
RMSE2=np.zeros([1,10])
snapshots=400
X = np.zeros((N,snapshots)) + 1j*np.zeros((N,snapshots))

for a in range(10):
  for iter in range(snapshots):
        data = np.zeros(N)
        for i in range(L):
            asv = np.exp(1j*2*np.pi*np.random.rand(1))
            data = data + asv*noise[i]*array_steering_vector(array,DOA[i])
        X[:,iter] = data + cmath.sqrt(0.5/SNR[a])*(np.random.randn(N)+np.random.randn(N)*1j)
  CovMattest = X@X.conj().transpose()   
  CovMattest.shape

  Xutest=np.zeros((N,N,3))
  Xutest[:,:,0]=np.real(CovMattest)
  Xutest[:,:,1]=np.imag(CovMattest)
  Xutest[:,:,2]=np.angle(CovMattest)
  Xutest=np.expand_dims(Xutest,axis=0)
  Xutest.shape

  predictions = model.predict(Xutest)
  pred=predictions.reshape(360,)

  musicphasetest=groupdelay(CovMattest,L,N,Angles,array)
  DoAsMUSIC , psindB = music(CovMattest,L,N,array,Angles)
  mult=psindB*abs(musicphasetest).T
  mult=np.transpose(mult)

  mse=mean_squared_error(mult,pred)
  rmse=math.sqrt(mse)
  RMSE2[0][a]=rmse

plt.plot(SNR,RMSE2[0],marker='o')

plt.xlabel('SNR')
plt.ylabel('RMSE')
plt.show()

In [None]:
lamda = 150 # wavelength
L = 1  # number of sources
N = 10  # number of ULA elements 
snr = 30 # signal to noise ratio
array = np.linspace(0,(N-1)/2,N)
noise = np.random.randn(L) + np.random.randn(L)*1j # random source powers
noise = np.sqrt(1/2)*noise

doa = np.arange(90)

snapshots=400
X = np.zeros((N,snapshots)) + 1j*np.zeros((N,snapshots))
  
RMSE3=np.zeros((90))
for k in range(90):
   
  for iter in range(snapshots):
      data = np.zeros(N)
      for i in range(L):
          asv = np.exp(1j*2*np.pi*np.random.rand(1))
          data = data + asv*noise[i]*array_steering_vector(array,doa[k])
      X[:,iter] = data + np.sqrt(0.5/snr)*(np.random.randn(N)+np.random.randn(N)*1j)
  CovMattest = X@X.conj().transpose()
  CovMattest.shape

  Xutest=np.zeros((N,N,3))
  Xutest[:,:,0]=np.real(CovMattest)
  Xutest[:,:,1]=np.imag(CovMattest)
  Xutest[:,:,2]=np.angle(CovMattest)
  Xutest=np.expand_dims(Xutest,axis=0)
  Xutest.shape

  predictions = model.predict(Xutest)
  pred=predictions.reshape(360,)

  musicphasetest=groupdelay(CovMattest,L,N,Angles,array)
  DoAsMUSIC , psindB = music(CovMattest,L,N,array,Angles)
  mult=psindB*abs(musicphasetest).T
  mult=np.transpose(mult)

  mse=mean_squared_error(mult,pred)
  rmse=math.sqrt(mse)
  RMSE3[k]=rmse

plt.plot(doa,RMSE3)

plt.xlabel('Angles')
plt.ylabel('RMSE')
plt.show()

In [None]:
data_1={
   'RMSE1' : RMSE1,
    'Snapshots' : SNAPSHOTS,
    'RMSE2' : RMSE2,
    'SNR' : SNR,
    'RMSE3' : RMSE3,
    'DOA' : doa
}

In [None]:
df = pd.DataFrame.from_dict(data_1, orient='index')
df

In [None]:
df.to_excel('Data.xlsx', index=True,header=True, encoding='utf-8')