In [12]:
import librosa
import os
import numpy as np
import scipy.signal
import random
import math
from scipy.io import loadmat
from IPython.display import Audio
import pescador
import os
import random,librosa

In [13]:
#pos_x and pos_y are integers from 1 to 9; snr_ratio should be from 0 to 1
def generate_examples(speech_path,noise_dir,srir_dir,pos_x,pos_y,d_yaw,d_pitch,d_roll,snr_ratio):
    #load in mono speech
    src_audio, sr = librosa.load(speech_path, sr=44100, mono=True)
    src_audio /= src_audio.max()
    
    #convolve speech with srir
    grid_x=pos_x
    grid_y=pos_y
    
    ch_out_list = []
    sh_names = ["W", "X", "Y", "Z"]
    for sh_str in sh_names:
        ch_ir_path = os.path.join(srir_dir, sh_str,
                              "{}x{:02d}y{:02d}.wav".format(sh_str, grid_x, grid_y))
        ch_ir, sr = librosa.load(srir_dir, sr=44100)
        
        ch_ir_len = ch_ir.shape[0]
        src_len = src_audio.shape[0]
    
        if ch_ir_len > src_len:
            pad_len = ch_ir_len - src_len
            src_audio = np.pad(src_audio, (0, pad_len), mode='constant')
        elif ch_ir_len < src_len:
            pad_len = src_len - ch_ir_len
            ch_ir = np.pad(ch_ir, (0, pad_len), mode='constant')
        
        ch_out = scipy.signal.fftconvolve(src_audio, ch_ir, mode='full')[:src_len]
        ch_out_list.append(ch_out)

    src_bformat = np.array(ch_out_list)
    
    #load in bformat noise
    noise_data = None
    while noise_data is None or noise_data.shape[1] < src_bformat.shape[1]:
        noise_data, sr = librosa.load(noise_dir, sr=44100, mono=False)
    
    #align bformat noise and bformat speech
    clip_len = src_bformat.shape[1]
    start_idx = np.random.randint(0, noise_data.shape[1] - clip_len)
    noise_data = noise_data[:,start_idx:start_idx + clip_len]
    
    #designate snr and scale
    snr = 10 * np.log10(np.mean(src_bformat[0,:] ** 2) / np.mean(noise_data[0,:] ** 2))
    snr_target = snr_ratio * 40.0 - 20.0
    alpha = 10.0**((snr_target - snr) / 20.0)#scaling factor
    src_bformat *= alpha
    
    #combine the noise+speech
    mix_bformat = src_bformat + noise_data
    
    return mix_bformat
    
    

In [14]:
#pos_x and pos_y are integers from 1 to 9; snr_ratio should be from 0 to 1
def generate_examples(speech_path,noise_dir,srir_dir,pos_x,pos_y,d_yaw,d_pitch,d_roll,snr_ratio):
    #load in mono speech
    src_audio, sr = librosa.load(speech_path, sr=44100, mono=True)
    src_audio /= src_audio.max()
    
    #convolve speech with srir
    grid_x=pos_x
    grid_y=pos_y
    
    ch_out_list = []
    sh_names = ["W", "X", "Y", "Z"]
    for sh_str in sh_names:
        ch_ir_path = os.path.join(srir_dir, sh_str,
                              "{}x{:02d}y{:02d}.wav".format(sh_str, grid_x, grid_y))
        ch_ir, sr = librosa.load(srir_dir, sr=44100)
        
        ch_ir_len = ch_ir.shape[0]
        src_len = src_audio.shape[0]
    
        if ch_ir_len > src_len:
            pad_len = ch_ir_len - src_len
            src_audio = np.pad(src_audio, (0, pad_len), mode='constant')
        elif ch_ir_len < src_len:
            pad_len = src_len - ch_ir_len
            ch_ir = np.pad(ch_ir, (0, pad_len), mode='constant')
        
        ch_out = scipy.signal.fftconvolve(src_audio, ch_ir, mode='full')[:src_len]
        ch_out_list.append(ch_out)

    src_bformat = np.array(ch_out_list)
    
    #load in bformat noise
    noise_data = None
    while noise_data is None or noise_data.shape[1] < src_bformat.shape[1]:
        noise_data, sr = librosa.load(noise_dir, sr=44100, mono=False)
    
    #align bformat noise and bformat speech
    clip_len = src_bformat.shape[1]
    start_idx = np.random.randint(0, noise_data.shape[1] - clip_len)
    noise_data = noise_data[:,start_idx:start_idx + clip_len]
    
    #designate snr and scale
    snr = 10 * np.log10(np.mean(src_bformat[0,:] ** 2) / np.mean(noise_data[0,:] ** 2))
    snr_target = snr_ratio * 40.0 - 20.0
    alpha = 10.0**((snr_target - snr) / 20.0)#scaling factor
    src_bformat *= alpha
    
    #combine the noise+speech
    mix_bformat = src_bformat + noise_data
    
    return src_bformat, noise_data, mix_bformat

In [15]:
def lstm_speech_mask_sampler(speech_path, noise_dir, srir_dir, sc_to_pos_dict):
    sc_list = sorted(sorted(list(sc_to_pos_dict.keys()), key=lambda x: x[1]), key=lambda x: x[0])
    azi_list, elv_list = zip(*sc_list)
    steer_mat = steer_vector(azi_list, elv_list)

    while True:
        steer_idx = np.random.randint(len(sc_list))
        azi, elv = azi_list[steer_idx], elv_list[steer_idx]
        pos_x, pos_y, d_yaw, d_pitch, d_roll = random.choice(sc_to_pos_dict[(azi, elv)])

        snr_ratio = np.random.random()

        src, noise, mix = generate_examples(speech_path,noise_dir,srir_dir,pos_x,pos_y,d_yaw,d_pitch,d_roll,snr_ratio)
        inp = featurematrix(steer_idx, mix, steer_mat)

        mask, _ = compute_masks(src[0], noise[0])
        
        yield {
            'input': inp,
            'mask': mask
        }

def lstm_data_generator(speech_dir, noise_dir, srir_dir, sc_to_pos_dict, batch_size, active_streamers,
                        rate, random_state=12345678):
    for root, dir_list, file_list in os.walk(speech_dir):
        for fname in file_list:
            if not fname.endswith('.wav'):
                continue

            speech_path = os.path.join(root, fname)

            streamer = pescador.Streamer(lstm_speech_mask_sampler, 
                                         speech_path, noise_dir, srir_dir, sc_to_pos_dict)
            seeds.append(streamer)

    # Randomly shuffle the seeds
    random.shuffle(seeds)

    mux = pescador.StochasticMux(seeds, active_streamers, rate=rate, random_state=random_state)

    if batch_size == 1:
        return mux
    else:
        return pescador.maps.buffer_stream(mux, batch_size)

In [7]:
speech_dir='./vctk-p225/p225_003.wav'
noise_dir = './ambiencelondonstreet.wav'
srir_dir='./isophonics/greathall'
pos_x=0
pos_y=0
snr_ratio=0.1
mix_bformat=generate_examples(speech_dir,noise_dir,srir_dir,pos_x,pos_y,snr_ratio)

FileNotFoundError: [Errno 2] No such file or directory: '/home/jtc440/dev/3daudio/ambisonic_source_separation/isophonics/greathall/Z/Zx00y03.wav'

In [None]:
def rotate_90(audio):
    return scipy.signal.hilbert(audio).imag
sr=44100
# Mix to stereo according to https://en.wikipedia.org/wiki/Ambisonic_UHJ_format#UHJ_encoding_and_decoding_equations
# S = 0.9396926*W + 0.1855740*X
# D = j(-0.3420201*W + 0.5098604*X) + 0.6554516*Y
# Left = (S + D)/2.0
# Right = (S - D)/2.0
S = 0.9396926 * mix_bformat[0] + 0.1855740 * mix_bformat[1]
D = rotate_90(-0.3420201 * mix_bformat[0] + 0.5098604 * mix_bformat[1]) + 0.6554516 * mix_bformat[2]
L = (S + D)/2.0
R = (S - D)/2.0
mix_mono = S
mix_stereo = np.stack([L,R])

import IPython.display as ipd

ipd.Audio(mix_stereo,rate=sr)

# Feature extraction Methods

1. steering matrix
2. beamformer (pseudo-inverse of steering matrix)
3. concatenate s_hat, n_hat, x_w to be the feature matrix

### Get azimuth and elevation pairs

In [8]:
# Note that we swap x and y here to be consistent with standard 
# spherical coordinate convention
speaker_coord = np.array([-2, 6, 0])

In [9]:
def cartesian_to_spherical(dist_coord, rads=True):
    # Convert cartesian coordiantes to spherical coordinates
    x, y, z = dist_coord
    r = np.linalg.norm(dist_coord)
    azi = math.atan2(y, x)
    # NOTE: This differs from typical elevation/inclination, which is usuall measured with 0 pointing up
    elv = math.asin(z/float(r))
    
    if elv > np.pi/2:                    
        elv = (np.pi - elv) % (np.pi/2)
        azi = (np.pi + azi) % (2 * np.pi) - np.pi
    elif elv < - np.pi/2:
        elv = -((np.pi + elv) % (np.pi/2))
        azi = (np.pi + azi) % (2 * np.pi) - np.pi

    
    if not rads:
        azi = azi * 180.0 / np.pi
        elv = elv * 180.0 / np.pi
        
    return np.array([r, azi, elv])

# https://github.com/polarch/Spherical-Harmonic-Transform/blob/master/euler2rotationMatrix.m
def euler_to_rotation_matrix(alpha, beta, gamma, order='xyz'):
    """
    %   alpha:  first angle of rotation
    %   beta:   second angle of rotation
    %   gamma:  third angle of rotation
    %
    %   order:  definition of the axes of rotation, e.g. for the y-convention
    %           this should be 'zyz', for the x-convnention 'zxz', and for
    %           the yaw-pitch-roll convention 'zyx'
    """
    def Rx(theta):
        return np.array([
            [1.0, 0.0, 0.0],
            [0.0, np.cos(theta), np.sin(theta)],
            [0.0, -np.sin(theta), np.cos(theta)]])
    
    def Ry(theta):
        return np.array([
            [np.cos(theta), 0.0, -np.sin(theta)],
            [0.0, 1.0, 0.0],
            [np.sin(theta), 0.0, np.cos(theta)]
        ])
    # [cos(theta) sin(theta) 0; -sin(theta) cos(theta) 0; 0 0 1]
    def Rz(theta):
        return np.array([
            [np.cos(theta), np.sin(theta), 0.0],
            [-np.sin(theta), np.cos(theta), 0.0],
            [0.0, 0.0, 1.0]])
    
    R = np.eye(3)
    for idx, dim in reversed(list(enumerate(order))):
        if dim == 'x':
            R_func = Rx
        elif dim == 'y':
            R_func = Ry
        elif dim == 'z':
            R_func = Rz
            
        if idx == 0:
            theta = alpha
        elif idx == 1:
            theta = beta
        elif idx == 2:
            theta = gamma
            
        R = np.dot(R, R_func(theta))
        
    return R

# https://github.com/polarch/Higher-Order-Ambisonics/blob/master/rotateBformat.m
def rotate_bformat(bfsig, yaw, pitch, roll, order='xyz'):
    R_mat = euler_to_rotation_matrix(-yaw*np.pi/180.0,
                                     -pitch*np.pi/180.0,
                                     roll*np.pi/180,
                                     order=order);

    # augment with zero order
    Rbf = np.zeros((4,4));
    Rbf[0,:] = 1.0;
    Rbf[1:,1:] = R_mat;

    # apply to B-format signals
    return np.dot(Rbf, bfsig)

def rotate_coord(coord, yaw, pitch, roll, order='xyz'):
    R_mat = euler_to_rotation_matrix(-yaw*np.pi/180.0,
                                     -pitch*np.pi/180.0,
                                     roll*np.pi/180,
                                     order=order);

    return np.dot(R_mat, coord)

In [10]:
azi_list = []
elv_list = []
pos_list = []

for grid_x in np.arange(13,step=1):
    for grid_y in np.arange(13, step=1):
        # Note that we swap x and y here to be consistent with standard 
        # spherical coordinate convention
        x = grid_y
        y = grid_x
        mic_coord = np.array([x, y, 0])
        for d_yaw in np.arange(-180, 180, 60):
            for d_pitch in np.arange(-90, 90 + 10, 45):
                for d_roll in np.arange(-90, 90 + 10, 45):


                    rel_src_coord = mic_coord - speaker_coord
                    rot_src_coord = rotate_coord(rel_src_coord, d_yaw, d_pitch, d_roll, order='xyz')
                    rot_src_coord_spherical = cartesian_to_spherical(rot_src_coord, rads=False)
                    rot_azi, rot_elv = rot_src_coord_spherical[1:]

                    azi_list.append(rot_azi)
                    elv_list.append(rot_elv)
                    pos_list.append((x, y, d_yaw, d_pitch, d_roll))

In [11]:
sc_to_pos_dict = {}
for azi, elv, pos in zip(azi_list, elv_list, pos_list):
    if (azi, elv) not in sc_to_pos_dict:
        sc_to_pos_dict[(azi, elv)] = []
    sc_to_pos_dict[(azi, elv)].append(pos)
    
sc_list = sorted(sorted(list(sc_to_pos_dict.keys()), key=lambda x: x[1]), key=lambda x: x[0])
azi_list, elv_list = zip(*sc_list)
del pos_list

### Steering matrix

In [1]:
#generate matrix of steering vectors that include azimuth & elevation of certain range
def steer_vector(azis,eles):
    #azi_res is resolution of azimuth angle from -180 to 180
    #ele_res is resolution of elevation angle from -90 to 90
    #theta=np.arange(-180,180,azi_res)
    #phi=np.arange(-90,90,ele_res)
    
    #azis and eles are pairs of chosen directions
    m=azis.shape[0]
    n=eles.shape[0]
    D=np.zeros((4,m*n))#the steering matrix is of size (4,len(pairs)))
  
    for idx, azi in enumerate(azis):
        for idx2, ele in enumerate(eles):            
            d=np.array([1,np.sqrt(3)*np.cos(azi)*np.cos(ele),np.sqrt(3)*np.sin(azi)*np.cos(ele),np.sqrt(3)*np.sin(ele)])
            D[:,idx*n+idx2]=d
    return D

In [4]:
#simple anechoic beamformer is the pseudo inverse of steering matrix.
def beamformer(pair,steer_mat):
    #pair is the index of desired pair of azimuth/elevation, D=(m,n), inv(D)=(n,m)
    u=np.zeros(steer_mat.shape[1])#(1,n)
    u[pair]=1
    beamformer=np.linalg.pinv(steer_mat)*u[:,None]#output=(n,m) should compute it only once
    return beamformer

function that given azi,ele info, output index in the steering matrix, subject to change

In [5]:
#index of the position pair (azi, ele) in the steering matrix
#def pairidx(azi,ele):
#    return int((azi+180)/30*11+(ele+50)/10)

#pair_idx=pairidx(0,0)
#bf=beamformer(pair_idx,D)

In [20]:
def featurematrix(tgt_idx,clip,D):
    #azi and ele are two numbers corresponding to position of targeted speech
    #clip is the synthesized 4-channel audio clip
    #D is the steering matrix, which is constant for all calculations
    clip_w=clip[0,:]
    bf=beamformer(tgt_idx,D)
    bf_tgt=bf[tgt_idx,:]#1 by 4 vector
    #compute stft of 4 channels of audioclip
    x_sp_w=signal.stft(clip[0,:], fs=16e3, window=('bohman',1024), nperseg=1024, noverlap=None)#also xw
    x_sp_x=signal.stft(clip[1,:], fs=16e3, window=('bohman',1024), nperseg=1024, noverlap=None)
    x_sp_y=signal.stft(clip[2,:], fs=16e3, window=('bohman',1024), nperseg=1024, noverlap=None)
    x_sp_z=signal.stft(clip[3,:], fs=16e3, window=('bohman',1024), nperseg=1024, noverlap=None)
    x_sp=np.stack((x_sp_w,x_sp_x,x_sp_y,x_sp_z), axis=2)#dimension should be (#time frame, #freq bin,4)
    #s_hat should be (#time frames, #frenquency bins)
    s_hat=np.abs(np.sum(bf_tgt[None,None,:]*x_sp,axis=2))# this part remains a question, broadcasting not equivalent to matrix multiplication
    #     #n_hat should be (#time frames, #frequency bins, #however many other directions we count as distraction)
    #     #problematic!!
    #     dis_idx=random.randint(0,D.shape[0]-1)#for now just pick a random direction
    #     bf_dis=bf[dis_idx,:]
    #     n_hat=np.sum(bf_dis[None,None,:]*x_sp,axis=2)
    #the final concatenated feature (#time frames, 3*#frequency bins)
    feature=np.stack((x_sp_w,s_hat),axis=1)
    return feature
    

testing part

In [12]:
import numpy as np
a=np.ones((2,3,4))*0.5
b=np.ones((2,3,4))*(-0.5)
print(np.sum((a*b),axis=2).shape)

(2, 3)


In [17]:
import random
random.randint(0,1)

0

# Ground Truth Masks Generation

1. compute mask from speech and noise signals in w channel
2. obtain multichannel weiner filter(MWF) using GEVD approach to replace common mask 

In [18]:
#input src and noise should be only in the W channel
def compute_masks(src, noise):
    #assuming each audio clip is sampled at 16kHz,compute the STFT
    #with a sinusoidal window of 1024 samples and 50% overlap.
    #window=signal.get_window('bohman',1024)
    sw=signal.stft(src, fs=16e3, window=('bohman',1024), nperseg=1024, noverlap=None)#need to check dimensions of these
    nw=signal.stft(noise, fs=16e3, window=('bohman',1024), nperseg=1024, noverlap=None)
    Ms=sw*sw/(sw*sw+nw*nw)#mask should be (#time frame, #frequency bin) is it point-wise multiplication?
    Mn=1-Ms
    return Ms,Mn

In [19]:
#compute speech s_hat from mask, then covariance matrix PHI_ss/PHI_nn from s_hat, then PHI_ss-r1/PHI_nn-r1, then wGEVD

def get_GEVD(Mask_s,Mask_n,mix,speech,noise):
    #Mask_s,Mask_n, mix, speech, noise are all of size (#time, #freq)
    s_bar=Mask_s*mix#pointwise multiplication
    T,F=s_bar.shape
    phi_ss=1/T*np.sum(s_bar*s_bar,axis=0)#should be of (#freq,)
    #phi_ss=1/T*np.matrix(np.sum(s_hat*s_hat.conjugate().transpose(),axis=0))#the dimension is not #freq then
    sn_bar==Mask_n*mix
    Tn,Fn=sn_bar.shape
    phi_nn=1/Tn*np.sum(sn_bar*sn_bar,axis=0)
    #phi_nn=1/Tn*np.matrix(np.sum(sn_hat*sn_hat.conjugate().transpose(),axis=0))
    #rank-1 approximation
    u,s,v=np.linalg.svd(phi_ss, full_matrices=False)
    phi_ss_r1=s[0] * np.outer(u[0], v[0])#u need
    un,sn,vn=np.linalg.svd(phi_nn, full_matrices=False)
    phi_nn_r1=sn[0]* np.outer(un[0],vn[0])
    #get wGEVD
    u1=np.zeros((phi_nn_r1.shape[0]))
    u1[0]=1
    wGEVD=np.matmul(inv(phi_ss_r1+phi_nn_r1),phi_ss_r1)*u1#what is u1 in this case



# Pass MWF back to mixture and reconstruct desired signals