# Meta-data generation script for Interspeech2024 speech enhancement dataset

Define the RIRs for the dataset: 60,000 utterances, split 70% / 15% / 15%.

In [None]:
import masp as srs
import numpy as np
import soundfile as sf
from IPython.display import Audio
import scipy
import copy
import pandas as pd
import os
from os.path import join as pjoin
from multiprocessing import Pool
import matplotlib.pyplot as plt
import mat73
import tqdm
import librosa as lsa
import scipy.signal as sig

In [None]:
def load_speechdirectivity(path, plot):
    """Load the per-band speech directivity table from a MATLAB file.

    The ``azel_dir`` variable of the ``.mat`` file at ``path`` is read and
    its first 21 entries along the leading axis are stored in a dictionary
    under the band labels '100Hz' ... '10000Hz'.  Two axis vectors are added
    under ``'az_axis'`` (-180 to 175, 72 points) and ``'el_axis'`` (-90 to
    85, 36 points); presumably degrees.

    When ``plot`` is truthy, every band is shown as an image on a 7x3 grid
    (colour range -20 to 0) with one colour bar, and the figure is saved as
    'speech_directivity.pdf' in the current working directory.

    Returns:
        The dictionary described above.
    """
    table = scipy.io.loadmat(path)['azel_dir']
    bands = np.array(['100Hz', '125Hz', '160Hz', '200Hz', '250Hz', '315Hz', '400Hz', '500Hz', '630Hz', '800Hz', '1000Hz', '1250Hz', '1600Hz', '2000Hz', '2500Hz', '3150Hz', '4000Hz', '5000Hz', '6300Hz', '8000Hz', '10000Hz'])
    d = {label: table[idx] for idx, label in enumerate(bands)}
    d['az_axis'] = np.linspace(-180, 175, 72)
    d['el_axis'] = np.linspace(-90, 85, 36)
    if not plot:
        return d

    # Tick positions/labels are the same for every subplot: every 8th grid point.
    el_ticks = range(len(d['el_axis']))[::8]
    el_labels = [int(v) for v in d['el_axis'][::8]]
    az_ticks = range(len(d['az_axis']))[::8]
    az_labels = [int(v) for v in d['az_axis'][::8]]

    plt.figure(figsize=(12,20))
    for idx, label in enumerate(bands):
        plt.subplot(7,3,idx+1)
        plt.imshow(d[label], cmap='jet')
        plt.yticks(el_ticks, el_labels, fontsize=7)
        plt.ylabel('elevation')
        plt.xticks(az_ticks, az_labels, rotation=90, fontsize=7)
        plt.xlabel('azimuth')
        plt.title(label)
        plt.clim(-20,0)
    # Dedicated axes for the colour bar: [left, bottom, width, height]
    colorbar_axes = plt.gcf().add_axes([0.92, 0.15, 0.02, 0.72])
    plt.colorbar(cax=colorbar_axes)
    plt.savefig('speech_directivity.pdf')
    return d

In [None]:
# Load the directivity table from 'directivity_parsing_matlab/azel_dir.mat' without producing the overview figure.
d = load_speechdirectivity(path=pjoin('directivity_parsing_matlab', 'azel_dir.mat'), plot=False)

In [None]:
def get_6band_rt60_vector():
    """Draw one random RT60 value for each of six bands.

    Every band has its own gamma distribution (shape/scale pair below) and
    one sample is drawn per band, in band order.

    Returns:
        A 1-D array with six values (presumably seconds).
    """
    # Re-seed from fresh entropy on every call so that parallel worker
    # processes do not end up producing the same RT60 values.
    np.random.seed()
    shapes = np.array([1.7196874268124676,
                       1.6152228672267106,
                       1.9318203836226113,
                       2.55718115999814,
                       4.176814897493042,
                       2.4892656080814346])
    scales = np.array([0.38685390302225775,
                       0.24453641709737417,
                       0.14321372785643122,
                       0.10453218827453133,
                       0.08678871224845529,
                       0.18290733668646034])
    draws = [np.random.gamma(shape, scale, 1) for shape, scale in zip(shapes, scales)]
    # Each draw has shape (1,); squeeze turns the (6, 1) stack into (6,).
    return np.array(draws).squeeze()

In [None]:
# Draw 60000 independent six-band RT60 vectors and stack them row-wise.
rt60s = np.array([get_6band_rt60_vector() for _ in range(60000)])

In [None]:
# Sanity check: one row per drawn vector, one column per band.
rt60s.shape

In [None]:
# Centre frequencies of the six bands: start at 125 and double from band to band.
band_centerfreqs = 125 * 2.0 ** np.arange(6)

In [None]:
# Number of per-utterance mean RT60 values.
# NOTE(review): in the visible source `rt60` is first assigned in the following cell,
# so this cell needs that one to have been run beforehand.
len(rt60)

In [None]:
# Mean across bands of every entry of rt60s.
rt60 = np.array([band_values.mean() for band_values in rt60s])

In [None]:
# Overlay the normalised RT60 histograms of the six bands and of the
# per-utterance band average (500 bins each, semi-transparent).
for band_idx in range(6):
    plt.hist(rt60s[:, band_idx], 500, density=True, alpha=0.8)
plt.hist(rt60, 500, density=True, alpha=0.8)

# Legend entries follow the plotting order: the six bands, then the mean.
plt.legend([str(int(x))+'Hz' for x in band_centerfreqs] + ['mean'])
#plt.title('RT60 histogram')
plt.xlabel('RT60[s]')
# density=True normalises every histogram to unit area, so the vertical axis
# shows a probability density rather than a bin count.
plt.ylabel('probability density')
plt.xlim([0, 2])
plt.grid(True)

In [None]:
def crop_echogram(anechoic_echogram):
    """Truncate every echogram to its first two entries, in place.

    ``anechoic_echogram`` is indexed as [source, receiver, band]; for each
    element the ``time``, ``coords``, ``value`` and ``order`` attributes are
    cut down to their first two rows.  The original comments describe the
    result as the "anechoic" version of the echogram that should keep the
    receiver directivity.

    The input container is modified and also returned, so pass a copy if the
    full echogram is still needed.
    """
    dims = anechoic_echogram.shape
    for position in np.ndindex(dims[0], dims[1], dims[2]):
        echo = anechoic_echogram[position]
        echo.time = echo.time[:2]
        echo.coords = echo.coords[:2, :]
        echo.value = echo.value[:2, :]
        echo.order = echo.order[:2, :]
    return anechoic_echogram
def head_2_ku_ears(head_pos,head_orient):
    """Compute the two ear positions from the head position and orientation.

    ``head_orient[0]`` is taken as the azimuth in degrees; any further entry
    (e.g. elevation) is ignored.  Both ears sit 0.0875 away from
    ``head_pos`` on opposite sides, in the horizontal plane, at the same
    height as the head.

    Returns:
        ``[L_ear, R_ear]``, each a three-element list ``[x, y, z]``.
    """
    ear_offset = 0.0875
    azimuth = (head_orient[0]) * np.pi / 180
    shift_x = ear_offset * np.sin(azimuth)
    shift_y = ear_offset * np.cos(azimuth)
    left = [head_pos[0] + shift_x, head_pos[1] - shift_y, head_pos[2]]
    right = [head_pos[0] - shift_x, head_pos[1] + shift_y, head_pos[2]]
    return [left, right]
    
def plot_scene(room_dims,head_pos,head_orient,l_mic_pos,l_src_pos,perspective="xy"):
    """Plot a 2-D view of the designed scene: head, ears and sources.

    Args:
        room_dims: dimensions of the room [x, y, z]; they set the axis limits.
        head_pos: head position [x, y, z].
        head_orient: head orientation [az, el]; only the azimuth is used, and
            only in the "xy" view.
        l_mic_pos: ear positions; entry 0 is drawn in blue (left ear) and
            entry 1 in red (right ear).
        l_src_pos: list of source positions [[x, y, z], ..., [x, y, z]];
            every source is annotated with its list index.
        perspective: which two dimensions to show: "xy", "yz" or "xz".

    Raises:
        ValueError: if ``perspective`` is not one of the supported views.
    """
    # (coordinate on the horizontal axis, coordinate on the vertical axis)
    axes_by_view = {"xy": (1, 0), "yz": (2, 1), "xz": (2, 0)}
    if perspective not in axes_by_view:
        # Without this guard an unknown view left dim1/dim2 unassigned and
        # failed further down with an UnboundLocalError.
        raise ValueError(
            "perspective must be one of 'xy', 'yz' or 'xz', got {!r}".format(perspective))
    dim1, dim2 = axes_by_view[perspective]

    fig = plt.figure()
    ax = fig.add_subplot()
    plt.xlim((0,room_dims[dim1]))
    plt.ylim((0,room_dims[dim2]))
    # cross-hair through the head position
    plt.axvline(head_pos[dim1], color='y') # vertical line
    plt.axhline(head_pos[dim2], color='y') # horizontal line
    plt.grid(True)
    # plot the head centre
    plt.plot(head_pos[dim1],head_pos[dim2], "o", ms=10, mew=2, color="black")
    # plot ears
    plt.plot(l_mic_pos[0][dim1],l_mic_pos[0][dim2], "o", ms=3, mew=2, color="blue")# left ear in blue
    plt.plot(l_mic_pos[1][dim1],l_mic_pos[1][dim2], "o", ms=3, mew=2, color="red")# right ear in red

    # plot and number the sources
    for i,src_pos in enumerate(l_src_pos):
        plt.plot(src_pos[dim1],src_pos[dim2], "o", ms=10, mew=2, color="red")
        plt.annotate(str(i), (src_pos[dim1],src_pos[dim2]))
    # plot head orientation if looking from above: a marker rotated by minus the azimuth
    if perspective=="xy":
        plt.plot(head_pos[dim1],head_pos[dim2], marker=(1, 1, -head_orient[0]), ms=20, mew=2,color="black")

    ax.set_aspect('equal', adjustable='box')


def process(src, headC, headOrient, room, rt60, maxlim, ambi_order, fs_rir, decoder, speech):
    """Simulate a room with MASP and render binaural reverberant speech.

    Steps, as performed below:
      1. Build the band centre frequencies (1000 for a single band, otherwise
         125 doubled from band to band).
      2. Place three receivers: the two ears from head_2_ku_ears plus the
         head centre.
      3. Fit wall absorption to the requested RT60, compute spherical-harmonic
         echograms and render them to RIRs.
      4. Convolve the RIRs of receivers 0 and 1 with decoder[:, :, 0] and
         decoder[:, :, 1] along axis 0 and sum over axis 1 to obtain one
         impulse response per ear.
      5. Convolve `speech` with both impulse responses.
      6. Estimate the RT60 from the RIR of receiver 2 (the head centre).

    Args:
        src: source positions; src.shape[0] is read as the number of sources,
            but only source index 0 is rendered to binaural.
        headC: head-centre position, appended as the third receiver.
        headOrient: head orientation, passed to head_2_ku_ears and to
            srs.compute_echograms_sh.
        room: room description handed to the MASP calls (presumably the
            [x, y, z] dimensions — confirm against callers).
        rt60: array of target RT60 values, one per band.
        maxlim: upper bound for the per-band simulation time limit.
        ambi_order: order passed to srs.compute_echograms_sh.
        fs_rir: sampling rate used for RIR rendering and RT60 estimation.
        decoder: 3-D decoder filters; presumably laid out as
            (taps, SH channels, ears) — TODO confirm.
        speech: signal to be reverberated.

    Returns:
        (reverberant_src, mic, est_rt60): the two-row array of reverberated
        speech (one row per ear, each the length of `speech` because the
        convolution uses mode 'same'), the receiver positions, and a
        one-element array with the estimated RT60.
    """
    # Placeholder that is either replaced (single band) or filled (several bands)
    band_centerfreqs = np.empty(len(rt60))
    if len(rt60) == 1:
        band_centerfreqs = np.array([1000])
    else:
        band_centerfreqs[0] = 125
        for nb in range(1, len(rt60)):
            band_centerfreqs[nb] = 2 * band_centerfreqs[nb-1]
    mic = np.array(head_2_ku_ears(headC,headOrient)) # we get BiMagLS mic points 
    mic = np.vstack((mic, headC)) # we add the head center microphone for MagLS decoders
    # NOTE(review): nRec and nSrc are not used anywhere else in this function.
    nRec = mic.shape[0]
    nSrc = src.shape[0]
    abs_walls,rt60_true = srs.find_abs_coeffs_from_rt(room, rt60)
    # Small correction for sound absorption coefficients: if any band's
    # achieved RT60 exceeds its target by more than 5 % of the achieved value,
    # fit again with the achieved RT60 plus the absolute error as the target.
    if sum(rt60_true-rt60>0.05*rt60_true)>0 :
        abs_walls,rt60_true = srs.find_abs_coeffs_from_rt(room, rt60_true + abs(rt60-rt60_true))
    # Generally, we simulate up to RT60 (but never beyond maxlim):
    limits = np.minimum(rt60, maxlim)
    # Compute IRs with MASP at the fs_rir sampling rate:
    abs_echograms = srs.compute_echograms_sh(room, src, mic, abs_walls, limits, ambi_order, headOrient)
    # Disabled anechoic path (kept for reference):
    #ane_echograms = crop_echogram(copy.deepcopy(abs_echograms))
    mic_rirs = srs.render_rirs_sh(abs_echograms, band_centerfreqs, fs_rir)/np.sqrt(4*np.pi)
    #ane_rirs = srs.render_rirs_sh(ane_echograms, band_centerfreqs, fs_rir)/np.sqrt(4*np.pi)
    # Pad anechoic rirs so we don't lose alignment when convolving
    #zeros_to_pad = len(mic_rirs) - len(ane_rirs)
    #zeros_to_pad = np.zeros((zeros_to_pad, mic_rirs.shape[1], mic_rirs.shape[2], mic_rirs.shape[3]))
    #ane_rirs = np.concatenate((ane_rirs, zeros_to_pad))
    # One impulse response per ear: full convolution along axis 0, then sum over axis 1.
    bin_ir = np.array([sig.fftconvolve(np.squeeze(mic_rirs[:,:,0, 0]), decoder[:,:,0], 'full', 0).sum(1),
                    sig.fftconvolve(np.squeeze(mic_rirs[:,:,1, 0]), decoder[:,:,1], 'full', 0).sum(1)])
    #bin_aneIR = np.array([sig.fftconvolve(np.squeeze(ane_rirs[:,:,0, 0]), decoder[:,:,0], 'full', 0).sum(1),
    #                sig.fftconvolve(np.squeeze(ane_rirs[:,:,1, 0]), decoder[:,:,1], 'full', 0).sum(1)])
    reverberant_src = np.array([sig.fftconvolve(speech, bin_ir[0, :], 'same'), sig.fftconvolve(speech, bin_ir[1, :], 'same')])
    #anechoic_src = np.array([sig.fftconvolve(speech, bin_aneIR[0, :], 'same'), sig.fftconvolve(speech, bin_aneIR[1, :], 'same')])
    # Receiver index 2 is the head-centre microphone stacked onto `mic` above.
    monoir = mic_rirs[:,:,2]
    # NOTE(review): `pra` is not imported in the visible import cell; presumably
    # pyroomacoustics — confirm it is imported before this function is called.
    est_sb_rt60 = pra.experimental.rt60.measure_rt60(monoir[:,0,0], fs=fs_rir, decay_db=20, plot=False)
    return reverberant_src, mic, np.array([est_sb_rt60])

In [None]:
# Fixed test scene used to try out process() before generating the dataset metadata.
decoder_path = pjoin('decoders_ord10', 'Ku100_ALFE_Window_sinEQ_bimag.mat') # 10th-order BiMagLS decoder for the KU100, without HA, at 48 kHz
decoder = mat73.loadmat(decoder_path)['hnm']
# Circularly shift the decoder by 500 samples along axis 0 (presumably to delay the filters — TODO confirm).
decoder = np.roll(decoder,500,axis=0)
maxlim = 2 # maximum reflection time in seconds. Stop simulating if it goes beyond that time.
ambi_order = 10 # ambisonics order

# Head centre position and orientation (azimuth, elevation)
headC_x = 2.0  
headC_y = 2.0
headC_z = 1.0
headOrient_azi = 0.0
headOrient_ele = 0.0
headC = np.array([headC_x, headC_y, headC_z])
headOrient = np.array([headOrient_azi,headOrient_ele])
src = np.array([[3,	3, 1]]) # speech source position, one [x, y, z] row per source

room = np.array([6, 4, 2.5]) #dimensions
rt60=np.array([0.01]) # a single broadband RT60 target
fs_rir = 48000
fs_target = fs_rir

# Load the speech file at the RIR sampling rate.
speech, fs_speech = lsa.load('ane_speech.wav', sr=fs_rir)

In [None]:
# Draw a single six-band RT60 vector for the test scene.
# NOTE(review): this rebinds `rt60s`, replacing the array of 60000 vectors built earlier in the notebook.
rt60s = get_6band_rt60_vector()

In [None]:
# Show the sampled per-band RT60 values.
rt60s

In [None]:
headOrient = np.array([0.,0.])
# Multi-band run: one RT60 target per band.  process() returns the tuple
# (reverberant binaural speech, receiver positions, estimated RT60).  The
# tuple stays available under its original name and is also unpacked, because
# later cells read `est_sb_rt60` and `mb_rev` by name.
mic_rir = process(src, headC, headOrient, room, rt60s, maxlim, ambi_order, fs_rir, decoder, speech)
mb_rev, mic0, est_sb_rt60 = mic_rir

In [None]:
# Show the RT60 estimate; expects `est_sb_rt60` to hold the third value returned by process().
est_sb_rt60

In [None]:
# Average of the sampled per-band RT60 targets, for comparison with the estimate.
np.mean(rt60s)

In [None]:
# Listen to the result; expects `mb_rev` to hold the reverberant signal of the multi-band run.
Audio(mb_rev, rate=fs_rir)

In [None]:
# Single-band run of the same scene: the only RT60 target is the mean of the per-band values.
sb_rev, mic1, est_sb_rt60_new = process(src, headC, headOrient, room, np.array([np.mean(rt60s)]), maxlim, ambi_order, fs_rir, decoder, speech)
#plot_scene(room,headC, headOrient,mic0,src,perspective="xy")

In [None]:
# Listen to the single-band reverberant signal.
Audio(sb_rev, rate=fs_rir)

In [None]:
# Randomly sample one acoustic scene per row of `df`: head orientation,
# target angle and distance, room dimensions, RT60, head and target position.
# NOTE(review): `df` and `hlp` are not defined in the visible source; they are
# presumably created/imported in another cell.
head_orient_azi = np.random.uniform(low = -45, high = 45, size = len(df))
head_orient_ele = np.random.uniform(low = -10, high = 10, size = len(df))
angle = np.random.uniform(low = -45, high = 45, size = len(df))
dist = np.random.uniform(low = 0.5, high = 3, size = len(df))
#snr = np.random.uniform(low = 0, high = 6, size = len(df))
room_x = np.random.uniform(low = 3., high = 30., size = len(df))
room_y = room_x * np.random.uniform(low=0.5, high=1, size=len(room_x)) #avoid tunnels
room_z = np.random.uniform(low = 2.5, high = 5., size = len(df))
#
t60s =  np.random.uniform(low = .1, high = 1., size = len(df))
t60s = np.sort(t60s)
#
# Pair larger rooms with longer RT60s and larger source distances: order the
# rooms by ascending volume so that they line up with the sorted t60s/dist.
# (Sorting the `volumes` array alone leaves the room dimensions in their
# random order, so the pairing would be lost.)
volumes = room_x * room_y * room_z
by_volume = np.argsort(volumes)
room_x = room_x[by_volume]
room_y = room_y[by_volume]
room_z = room_z[by_volume]
volumes = volumes[by_volume]  # ascending, as before
dist = np.sort(dist)
# Shuffle the scenes with one common permutation so the pairing survives
# while the row order carries no information.
perm = np.random.permutation(len(volumes))
room_x = room_x[perm]
room_y = room_y[perm]
room_z = room_z[perm]
dist = dist[perm]
t60s = t60s[perm]
# Head centre: within 35-65 % of the room extent in x and y, at a height of 1-2.
head_pos = []
for k in range(len(room_x)):
    head_pos.append(np.array([np.random.uniform(low = 0.35*room_x[k], high = 0.65*room_x[k]),
                        np.random.uniform(low = 0.35*room_y[k], high = 0.65*room_y[k]),
                        np.random.uniform(low = 1., high = 2.)]))
head_pos = np.array(head_pos)
room = np.array((room_x, room_y, room_z)).T
# Target position: placed by the helper from the head position, the distance
# and the sum of the sampled angle and the head azimuth.
target_pos = []
for k in tqdm.tqdm(range(len(room_x))):
    target_pos.append(hlp.place_on_circle_in_room(head_pos[k], dist[k], 
                                                               angle[k]+head_orient_azi[k], room[k]))
target_pos = np.squeeze(np.array(target_pos))

In [None]:
# Checks:
# All targets are in the room: on every axis the coordinate lies strictly
# between 0 and the room dimension (the upper bound alone would also accept
# targets placed behind the walls at the origin).
np.all((target_pos > 0) & (target_pos < room))

In [None]:
# Upper-bound check only: every head coordinate is smaller than the room dimension on that axis.
np.all(head_pos < room) # all heads are in the room

In [None]:
# now let's check the ears:
# one [left, right] pair of [x, y, z] positions per head
ears_pos = np.array([
    np.array(hlp.head_2_ku_ears(center, np.array([azi, ele])))
    for center, azi, ele in zip(head_pos, head_orient_azi, head_orient_ele)
])

In [None]:
# Upper-bound check for ear index 0.
np.all(ears_pos[:, 0, :] < room) # all left ears are in the room

In [None]:
# Upper-bound check for ear index 1.
np.all(ears_pos[:, 1, :] < room) # all right ears are in the room

In [None]:
# Lower-bound check: every ear coordinate is positive.
np.all(ears_pos > 0)

In [None]:
# Final MINIMUM distance between head and target: it must exceed twice the
# 0.0875 ear offset, so that no target ends up inside the head (intracranial).
min(np.sqrt(np.sum((target_pos - head_pos)**2, axis=1))) > 0.0875 * 2

In [None]:
# Minimum distance of the ears to a wall, x axis.
# Only the wall at x = room_x is measured here, not the one at x = 0.
min ( min(room[:, 0] - ears_pos[:, 0, 0]), min(room[:, 0] - ears_pos[:, 1, 0]))

In [None]:
# Same for the y axis (distance of both ears to the wall at y = room_y).
min ( min(room[:, 1] - ears_pos[:, 0, 1]), min(room[:, 1] - ears_pos[:, 1, 1]))

In [None]:
# Same for the z axis (distance of both ears to the upper bound z = room_z).
min ( min(room[:, 2] - ears_pos[:, 0, 2]), min(room[:, 2] - ears_pos[:, 1, 2]))

In [None]:
# Minimum distance of the targets to a wall, over all three axes.
# Only the walls at the room's upper bounds are measured, not those at 0.
min(min(room[:, 0] - target_pos[:, 0]), min(room[:, 1] - target_pos[:, 1]), min(room[:, 2] - target_pos[:, 2]))

In [None]:
# Rename the existing 'split' column to 'mls_split'.
df = df.rename(columns={'split': 'mls_split'})

In [None]:
# Inspect the table before the scene columns are added.
df

In [None]:
# Add the sampled scene parameters as columns at fixed positions 14-26.
# Assumes `df` already has at least 14 columns — TODO confirm.
df.insert(14, "room_x", room[:, 0])
df.insert(15, "room_y", room[:, 1])
df.insert(16, "room_z", room[:, 2])
df.insert(17, "rt60", t60s)
df.insert(18, "headC_x", head_pos[:,0])
df.insert(19, "headC_y", head_pos[:,1])
df.insert(20, "headC_z", head_pos[:,2])
df.insert(21, "src_x", target_pos[:,0])
df.insert(22, "src_y", target_pos[:,1])
df.insert(23, "src_z", target_pos[:,2])
df.insert(24, "headOrient_azi", head_orient_azi)
df.insert(25, "headOrient_ele", head_orient_ele)
# NOTE(review): `snr` is not assigned in the visible source (its sampling line
# in the scene-sampling cell is commented out) — confirm where it comes from.
df.insert(26, "snr", snr)

In [None]:
# Running row index 0..len(df)-1 stored as a regular column.
df['idx'] = range(len(df))

In [None]:
# Disabled cell: the plotting loop below is wrapped in a string literal, so
# nothing is executed; the trailing semicolon suppresses the notebook output.
'''
# generate a figure for each situation:
for k in tqdm.tqdm(range(head_pos.shape[0])):
    hlp.plot_scene(room[k], head_pos[k], np.array([head_orient_azi[k], head_orient_ele[k]])
                   , ears_pos[k],[target_pos[k]], perspective="xy")
    plt.title(str(head_orient_azi[k])+ '_' + str(angle[k]))
    plt.savefig(pjoin('situation_plots_rot', os.path.splitext(os.path.basename(df.iloc[k].audio_path))[0]+'.pdf'))
    plt.close('all')
''';

In [None]:
# Write the final metadata table to 'meta_microson_v1.csv' without the index column.
df.to_csv('meta_microson_v1.csv', index=False, compression='infer')