## Q1. Source localization

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq, ifft
import scipy.io.wavfile as scw
import scipy
import IPython.display as ipd
import pandas as pd

# Define the array parameters
M = 4           # the number of elements
d = 0.04        # inter-element spacing (m)

# f = 3430                  # frequency (Hz)
c = 343                     # speed of sound (m/s)
# k = 2 * np.pi * f / c      # wave number

### Q1_1: a mixture signal of one reverberant speaker signal and a sensor noise

In [2]:
Q1_1 = np.zeros((5, 160000, 4))
fs, Q1_1[0, :, :] = scw.read('Project_audio/Q1_1_1.wav') 
fs, Q1_1[1, :, :] = scw.read('Project_audio/Q1_1_2.wav') 
fs, Q1_1[2, :, :] = scw.read('Project_audio/Q1_1_3.wav') 
fs, Q1_1[3, :, :] = scw.read('Project_audio/Q1_1_4.wav') 
fs, Q1_1[4, :, :] = scw.read('Project_audio/Q1_1_5.wav') 

In [3]:
# VAD: 3~8s
start = 3 * fs
end = 8 * fs
Q1_1 = Q1_1[:, start:end, :]
for i in range(5):
    Q1_1[i] = Q1_1[i] / Q1_1[i].max(axis=0)

In [3]:
def DAS_localize(signal):   
    """Localization using delay and sum 
    Args:
        signal: received signal
    Return:
        loc: angle of sound source
    """
    sig_stft = []
    for i in range(M):
        f, t, Zxx = scipy.signal.stft(signal[:, i], fs=fs, nperseg=256, noverlap=128)
        # sig_filt = scipy.signal.wiener(Zxx)
        sig_stft.append(Zxx)
    sig_stft = np.array(sig_stft)

    theta = np.linspace(-np.pi/2, np.pi/2, 180*4+1)
    y = np.zeros_like(sig_stft[0], dtype=complex)
    P = np.zeros(len(theta))
    loc = []
    for i, theta_i in enumerate(theta):
        for j, f_j in enumerate(f):
            k = 2 * np.pi * f_j / c
            w_DAS = (1 / M) * np.exp(1j * np.arange(M) * k * d * np.sin(theta_i))
            x = sig_stft[:, j, :]
            y[j, :] = np.dot(np.conj(w_DAS).T, x)
        P[i] = np.sum(np.abs(y))
    # plt.plot(P)
    loc = theta[np.argmax(P)] / np.pi * 180
    # print(f"direction is {loc}")
    return loc

Without denoise

In [201]:
loc_1 = []
for i in range(5):
    loc_1.append(round(DAS_localize(Q1_1[i]), 2))
print(loc_1)

[67.75, 1.0, -0.5, 3.25, -0.25]


In [4]:
def wiener_filter(signal):
    # 定義STFT的參數
    window = scipy.signal.windows.hann(256)  # hann window
    nperseg = 256   # frame length
    noverlap = 128  # frame shift

    # STFT
    _, _, stft = scipy.signal.stft(signal, window=window, nperseg=nperseg, noverlap=noverlap)

    # 估計噪音的功率譜密度
    noise = np.median(np.abs(stft), axis=1)
    noise_psd = noise ** 2

    # 估計訊號的功率譜密度
    signal_psd = np.abs(stft) ** 2

    # 計算Wiener filter
    wiener_filter = signal_psd / (signal_psd + noise_psd[:, np.newaxis])

    # filter the noisy signal
    filtered_stft = stft * wiener_filter

    # inverse STFT
    _, filtered_signal = scipy.signal.istft(filtered_stft, window=window, nperseg=nperseg, noverlap=noverlap)

    return filtered_signal

In [43]:
Q1_1_denoise = np.zeros_like(Q1_1)

for i in range(Q1_1.shape[0]):
    for ch in range(M):
        Q1_1_denoise[i, :, ch] = wiener_filter(Q1_1[i, :, ch])

In [64]:
ipd.display(ipd.Audio(Q1_1[1, :, 0], rate=fs))
ipd.display(ipd.Audio(Q1_1_denoise[1, :, 0], rate=fs))

Denoise using Wiener filter

In [45]:
loc_1_denoise = []
for i in range(5):
    loc_1_denoise.append(round(DAS_localize(Q1_1_denoise[i]), 2))
print(f'After denoise: {loc_1_denoise}')

After denoise: [67.25, 0.75, -0.5, 3.0, -0.25]


### Q1_2: a mixture signal of one reverberant speech source signal and an isotropic noise

In [9]:
Q1_2 = np.zeros((5, 160000, 4))
fs, Q1_2[0, :, :] = scw.read('Project_audio/Q1_2_1.wav') 
fs, Q1_2[1, :, :] = scw.read('Project_audio/Q1_2_2.wav') 
fs, Q1_2[2, :, :] = scw.read('Project_audio/Q1_2_3.wav') 
fs, Q1_2[3, :, :] = scw.read('Project_audio/Q1_2_4.wav') 
fs, Q1_2[4, :, :] = scw.read('Project_audio/Q1_2_5.wav') 

In [10]:
# VAD: 3~8s
start = 3 * fs
end = 8 * fs
Q1_2 = Q1_2[:, start:end, :]
for i in range(5):
    Q1_2[i] = Q1_2[i] / Q1_2[i].max(axis=0)

DAS

In [11]:
loc_2 = []
for i in range(5):
    loc_2.append(round(DAS_localize(Q1_2[i]), 2))
print(loc_2)

[-29.75, -13.25, 1.0, 4.0, 0.0]


Denose using Wiener filter

In [12]:
Q1_2_denoise = np.zeros_like(Q1_2)

for i in range(Q1_2.shape[0]):
    for ch in range(M):
        Q1_2_denoise[i, :, ch] = wiener_filter(Q1_2[i, :, ch])

In [13]:
loc_2_denoise = []
for i in range(5):
    loc_2_denoise.append(round(DAS_localize(Q1_2_denoise[i]), 2))
print(f'After denoise: {loc_2_denoise}')

After denoise: [-29.25, -8.5, 1.0, 3.5, 0.0]


MVDR

In [5]:
def MVDR_localize(signal): 
    """Localization using MVDR
    Args:
        signal: received signal
    Return:
        loc: angle of sound source
    """
    
    sig_stft = []
    for i in range(M):
        f, t, Zxx = scipy.signal.stft(signal[:, i], fs=fs, nperseg=256, noverlap=128)
        # sig_filt = scipy.signal.wiener(Zxx)
        sig_stft.append(Zxx)
    sig_stft = np.array(sig_stft)

    theta = np.linspace(-np.pi/2, np.pi/2, 180*4+1)
    Y = np.zeros_like(sig_stft[0], dtype=complex)
    P = np.zeros(len(theta))
    loc = []
    for i, theta_i in enumerate(theta):
        for j, f_j in enumerate(f):
            k = 2 * np.pi * f_j / c
            a = np.exp(1j * np.arange(M) * k * d * np.sin(theta_i))
            X = sig_stft[:, j, :]
            # Rxx = np.dot(X, np.conj(X).T) / X.shape[1]
            Rxx = np.outer(a, np.conj(a).T)
            I = np.eye(M)
            eps = np.finfo(np.float32).eps
            w_MVDR = np.dot(np.linalg.inv(Rxx+eps*I), a) / np.dot(np.conj(a).T, np.dot(np.linalg.inv(Rxx+eps*I), a))
            # w_MVDR = np.dot(np.linalg.inv(Rxx), a) / np.dot(np.conj(a).T, np.dot(np.linalg.inv(Rxx), a))

            Y[j, :] = np.dot(np.conj(w_MVDR).T, X)
        P[i] = np.sum(np.abs(Y))
    # plt.plot(P)
    loc = theta[np.argmax(P)] / np.pi * 180
    # print(f"direction is {loc}")
    return loc

In [63]:
loc_2_MVDR = []
for i in range(5):
    loc_2_MVDR.append(round(MVDR_localize(Q1_2[i]), 2))
print(f'MVDR: {loc_2_MVDR}')

MVDR: [-29.75, -13.25, 1.0, 4.0, 0.0]


In [51]:
loc_2_MVDR_denoise = []
for i in range(5):
    loc_2_MVDR_denoise.append(round(MVDR_localize(Q1_2_denoise[i]), 2))
print(f'MVDR with denoise: {loc_2_MVDR_denoise}')

MVDR with denoise: [-29.25, -8.5, 1.0, 3.5, 0.0]


### Q1_3: a mixture signal of one reverberant speech source signal and a directional interferer

In [6]:
Q1_3 = np.zeros((5, 160000, 4))
fs, Q1_3[0, :, :] = scw.read('Project_audio/Q1_3_1.wav') 
fs, Q1_3[1, :, :] = scw.read('Project_audio/Q1_3_2.wav') 
fs, Q1_3[2, :, :] = scw.read('Project_audio/Q1_3_3.wav') 
fs, Q1_3[3, :, :] = scw.read('Project_audio/Q1_3_4.wav') 
fs, Q1_3[4, :, :] = scw.read('Project_audio/Q1_3_5.wav') 

In [7]:
# VAD: 3~8s
start = 3 * fs
end = 8 * fs
Q1_3 = Q1_3[:, start:end, :]
for i in range(5):
    Q1_3[i] = Q1_3[i] / Q1_3[i].max(axis=0)

DAS

In [8]:
loc_3 = []
for i in range(5):
    loc_3.append(round(DAS_localize(Q1_3[i]), 2))
print(f'DAS: {loc_3}')

DAS: [1.0, -0.5, 1.0, -0.5, 0.75]


In [14]:
Q1_3_denoise = np.zeros_like(Q1_3)

for i in range(Q1_3.shape[0]):
    for ch in range(M):
        Q1_3_denoise[i, :, ch] = wiener_filter(Q1_3[i, :, ch])

In [15]:
loc_3_denoise = []
for i in range(5):
    loc_3_denoise.append(round(DAS_localize(Q1_3_denoise[i]), 2))
print(f'DAS with denoise: {loc_3_denoise}')

DAS with denoise: [1.0, -0.5, 1.0, -0.5, 0.75]


MVDR

In [17]:
loc_3_MVDR = []
for i in range(5):
    loc_3_MVDR.append(round(MVDR_localize(Q1_3[i]), 2))
print(f'MVDR: {loc_3_MVDR}')

MVDR: [1.0, -0.5, 1.0, -0.5, 0.75]


In [18]:
loc_3_MVDR_denoise = []
for i in range(5):
    loc_3_MVDR_denoise.append(round(MVDR_localize(Q1_3_denoise[i]), 2))
print(f'MVDR with denoise: {loc_3_MVDR_denoise}')

MVDR with denoise: [1.0, -0.5, 1.0, -0.5, 0.75]


In [349]:
def LCMV_localize(signal):   
    sig_stft = []
    for i in range(M):
        f, t, Zxx = scipy.signal.stft(signal[:, i], fs=fs, nperseg=256, noverlap=128)
        # sig_filt = scipy.signal.wiener(Zxx)
        sig_stft.append(Zxx)
    sig_stft = np.array(sig_stft)

    theta_target = np.linspace(-np.pi/2, np.pi/2, 181)
    theta_interf = np.linspace(-np.pi/2, np.pi/2, 181)
    Y = np.zeros((len(theta_interf), len(f), len(t)), dtype=complex)
    P = np.zeros((len(theta_target), len(theta_interf)))
    C = np.zeros((M, 2), dtype=complex)
    g = np.array([1, 0])
    loc_target = []
    loc_interf = []
    for i, theta_i in enumerate(theta_target):
        for n, theta_n in enumerate(theta_interf):
            for j, f_j in enumerate(f):
                k = 2 * np.pi * f_j / c
                # a = np.exp(1j * np.arange(M) * k * d * np.sin(theta_i))
                C[:, 0]= np.exp(1j * np.arange(M) * k * d * np.sin(theta_i))
                C[:, 1]= np.exp(1j * np.arange(M) * k * d * np.sin(theta_n))
                X = sig_stft[:, j, :]
                Rxx = np.dot(X, np.conj(X).T) / X.shape[1]
                # Rxx = np.outer(a, np.conj(a).T)
                I = np.eye(M)
                eps = np.finfo(np.float32).eps
                # w_MVDR = np.dot(np.linalg.inv(Rxx+eps*I), a) / np.dot(np.conj(a).T, np.dot(np.linalg.inv(Rxx+eps*I), a))
                w_LCMV = np.dot(np.linalg.inv(Rxx+eps*I), C) @ np.linalg.inv(np.dot(np.conj(C).T, np.dot(np.linalg.inv(Rxx+eps*I), C)) + 1e-9*np.eye(2)) @ g

                Y[n, j, :] = np.dot(np.conj(w_LCMV).T, X)
            P[i, n] = np.sum(np.abs(Y))
    # plt.plot(P)
    Pmax = np.argmax(P)
    target_idx = Pmax // len(theta_interf)
    interf_idx = Pmax % len(theta_interf)
    loc_target.append(theta_target[target_idx] / np.pi * 180)
    loc_interf.append(theta_interf[interf_idx] / np.pi * 180)

    # print(f"direction is {loc}")
    return loc_target, loc_interf

In [204]:
submission = pd.read_csv("Q1_Results_sample.csv", header=[0, 1])
submission.iloc[:, 1] = loc_1
submission.iloc[:, 2] = loc_2
submission.iloc[:, 3] = loc_3
submission

Unnamed: 0_level_0,Localization (in degree),Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0
Unnamed: 0_level_1,Unnamed: 0_level_1,Q1_1,Q1_2,Q1_3
0,1,67.75,-29.75,1.0
1,2,1.0,-13.25,-0.5
2,3,-0.5,1.0,1.0
3,4,3.25,4.0,-0.5
4,5,-0.25,0.0,0.75


In [205]:
submission.to_csv('submission.csv', index=False)