In [None]:
#
import numpy as np
class MultiChannel:
    def __init__(self,ro,sv,type,iar):
        self.ho=self.__array__(ro,type)
        DI,D,L,hsel=self.__setup__(iar)
        self.hsel=hsel
        self.sv=sv
        self.D=D
        self.DI=DI
        self.L=L
    #
    def __str__(self): return "Multi-Channel direction finding using fft"
    #
    def __array__(self,ro,type):
        ''' contruct volimetric array of hydrophones
            ro   : radius of bounding sphere
            type : configuration ("tetahedron","octahedron","cube")'''
        dz=np.sqrt(0.5)
        if type=='tetrahedron':
            ho=np.array([[1,0,-dz],[-1,0,-dz],[0,-1,dz],[0,1,dz]])
        elif type=='octahedron':
            ang=np.array([0,120,240,60,180,300])*np.pi/180
            ho=np.transpose([np.cos(ang),np.sin(ang),0*ang])
            ho[:3,2]=-dz
            ho[3:,2]=+dz
        elif type=='cube':
            ho=np.array([[1,0,-dz],[0,1,-dz],[-1,0,-dz],[0,-1,-dz],
                        [1,0, dz],[0,1, dz],[-1,0, dz],[0,-1, dz]])
        ho *=ro
        return ho
    def __setup__(self,iar=-1):
        ''' generate vector of hydrophone pairs
            ho  : hydrophone vectors
            iar : selected sub array (-1: all pairs)'''
        nh=self.ho.shape[0]
        ho=self.ho
        hsel=np.zeros((nh*(nh-1)//2,2),dtype=int)
        #
        kk=0
        for ii in range(nh): 
            for jj in range(ii+1,nh): 
                hsel[kk,:]=[jj,ii]
                kk +=1
        #
        D=ho[hsel[:,0],:]-ho[hsel[:,1],:]
        L=np.sqrt(np.sum(D**2,1))
        Lc=(L*1e+6).astype(int)
        Lunique=np.unique(Lc)
        if (iar>=0) & (iar<len(Lunique)):
            nl=np.where(Lunique[iar]==Lc)[0]
            D=D[nl,:]
            L=L[nl]
            hsel=hsel[nl,:]
        #
        DI=np.linalg.pinv(D)
        return DI,D,L,hsel

    def __quadInt__(self,uu,imx,nd1):
        ''' three point quardatic interpolation 
        to find location and value of interpolated maxima'''
        nc=len(imx)
        uo=uu[imx,range(nc)]
        um=uu[imx-1,range(nc)]
        up=uu[imx+1,range(nc)]
        #
        b=(up+um-2*uo)/2;
        #
        xo=(um-up)/(4*b);
        yo=uo-b*xo**2;
        #
        ux=np.zeros((nc,2))
        ux[:,0]=xo+imx-nd1
        ux[:,1]=yo
        return ux
    #
    def __proc_fft__(self,xx,nfft):
        ''' fft-based crosscorrelation returning spectrum, cross-spectrum and correlation lag'''
        hsel=self.hsel
        uu=np.fft.rfft(xx*self.win,axis=0,n=nfft)
        vv=uu[:,hsel[:,0]]*np.conjugate(uu[:,hsel[:,1]])
        vv = vv/np.abs(vv)
        yy=np.fft.fftshift(np.fft.irfft(vv,axis=0),axes=0)

        jmx= yy[1:-1,:].argmax(axis=0)+1
        ux=self.__quadInt__(yy,jmx,nfft//2)
        return uu, -vv.imag, ux

    def sensorDistances(self): return self.L   
    def hydrophoneLocations(self): return self.ho
    def hydrophoneSelection(self): return self.hsel     
    #
    def __delay__(self,S,fs):
        '''
        estimate delays using direction vector
        S direction vector
        '''
        DC=np.sum(self.ho*S,1)   # ho are locations relative to array center
        DS=DC*fs/self.sv
        return DS
    #
    def signalDirection(self,az,el):
        '''
        estimate direction using arrival angles
        az : azimuth angle
        el : elevation angle
        '''
        return np.array([np.cos(az)*np.cos(el),np.sin(az)*np.cos(el), np.sin(el)])
    #
    def signalDirection_fromGeometry(self,az,dx,dy):
        '''
        estimate direction using distance, depth geometry
        az : azimuth angle
        dx : horizontal distance 
        dy : vertical separation 
        '''
        el=np.arctan2(dy,dx)
        C=self.signalDirection(az,el)
        return C
    #
    def simulate_fromAngles(self,xx,az,el,fs,noise):
        C=self.signalDirection(az,el)
        return self.simulate(xx,C,fs,noise)
    #
    def simulate_fromGeometry(self,xx,az,dx,dy,fs,noise):
        C=self.signalDirection_fromGeometry(az,dx,dy)
        return self.simulate(xx,C,fs,noise)
    #
    def simulate(self,xx,C,fs,noise):
        '''
        delay signal according to array geometry 
        (use fractional delay with sinc function)
        xx      : time series
        S       : direction vector
        fs      : sampling frequency
        noise   : noise variance
        '''
        DS=self.__delay__(C,fs)
        #
        N=int(5*np.max(np.abs(DS)))
        kk=np.arange(-N,N).reshape(-1,1)
        if 0:
            win=np.sinc(kk+DS)*kk
            ss=fft_filt(win,xx)
        else:
            ss=np.zeros((len(xx),len(DS)))
            for ii in range(len(DS)):
                win=np.sinc(kk+DS[ii])
                ss[:,ii]=fft_filt(win,xx)[:,0]

        nn=np.random.normal(scale=noise, size=ss.shape)
        ss +=nn
        return ss

    def process(self,ss,fs,nw,step,nfft):
        ''' process time series '''
        nd,nch=ss.shape
        offsets=np.arange(0,nd-nw,step,dtype=int)
        no = offsets.shape[0]
        nh = self.hsel.shape[0]
        nfr=1+nfft//2
        Q  = np.zeros((no,nfr,nch),dtype='complex')
        C  = np.zeros((no,nfr,nh))
        ux = np.zeros((no,nh,2))
        self.win=np.hanning(nw).reshape(-1,1)
        #
        for ii,jj in zip(offsets,range(no)):
            xx=ss[ii:ii+nw,:]
            Q[jj,:,:],C[jj,:,:],ux[jj,:,:]=self.__proc_fft__(xx,nfft)
        #
        # accumulate 3-d components
        V=-ux[:,:,0]@self.DI.T
        I=-C@self.DI.T
        #
        t=(offsets+nw//2)/fs
        f=np.arange(nfr)*fs/nfft
        return Q,I,V,ux,f,t
    
    def directions(self,ss,fs,nw,step,nfft):
        ''' direction finding together with power spectrum and sound intensity '''
        Q,I,V,ux,f,t=self.process(ss,fs,nw,step,nfft)
        P=np.abs(Q)
        #
        In=np.sqrt(np.sum(I**2,2))
        In[:,0]=In[:,1]
        In[:,-1]=In[:,-2]
        J=I/In.reshape(In.shape[0],In.shape[1],1)
        In[:,0]=0
        In[:,-1]=0

        A=np.mean(ux,axis=1)
        #
        AZ=180/np.pi*np.arctan2(J[:,:,1],J[:,:,0])
        EL=180/np.pi*np.arctan2(J[:,:,2],np.sqrt(J[:,:,0]**2+J[:,:,1]**2))
        az=180/np.pi*np.arctan2(V[:,1],V[:,0])
        el=180/np.pi*np.arctan2(V[:,2],np.sqrt(V[:,0]**2+V[:,1]**2))
        return P,In,A,AZ,EL,az,el,f,t,ux
    
    def beamform(self,ss,aze,ele,fs,nw,nfft,step=None):
        ''' given angles of arrival, beamform (align) timeseries of array data'''
        nd,nch=ss.shape
        Se =np.array([cosd(aze)*cosd(ele),sind(aze)*cosd(ele), sind(ele)])
        DS =np.sum(self.ho*Se,1)*fs/self.sv
        #
        nt=int(np.ceil(3*np.abs(DS).max()))
        kk=np.arange(-nt,nt).reshape(-1,1)
        uu=np.sinc(kk-DS)
        vv=np.fft.rfft(uu,n=nfft,axis=0)
        #
        if step==None:
            # single continuous time series
            step=nfft-2*nt
            offsets = range(0, nd, step)
            zz=np.zeros(nd+nfft)            # one-dimensional output time series
            for nn in offsets:
                xx=ss[nn:nn+nw,:]
                yy=np.fft.rfft(xx,n=nfft,axis=0)
                xv=np.fft.irfft(yy*vv,axis=0)
                zz[nn:nn+nfft] += np.mean(xv,1)
            #
            return zz[nt:nt+nd]
        else:
            # one or more time snippets
            if step==0:
                offsets = range(1)
            else:
                offsets = range(0, nd, step)
            no = len(offsets)
            zz=np.zeros((nw,no))            # two-dimensional output time series (snippets)
            for nn,jj in zip(offsets,range(no)):
                xx=ss[nn:nn+nw,:]
                yy=np.fft.rfft(xx,n=nfft,axis=0)
                xv=np.fft.irfft(yy*vv,axis=0)
                zzo=xv[:nw,:]
                zz[:,jj] = np.mean(zzo,1)
            return zz,zzo
