In [1]:
import os
import sys
import time
TORCHAUDIO_USE_BACKEND_DISPATCHER=0
sys.path.append('.')

import scipy.io
import numpy as np
np.set_printoptions(precision=3)
np.set_printoptions(threshold=4)

import local_functions as f
import NN_optimizer as nn_opt

import pyfar as pf
import sofar as sf

# Intro:
### In this notebook we load a .mat file with HRTF SH coefficients (and some other things) and optimize them to reduce the ILD error

| Variable           | dimensions                                                      | description |
| :----------------  | :------                                                         | :--- |
| p_f_high_lebedev   |  [$\Omega$ x positive frequencies x left/right]                 | Measured HRTF which represents the reference binaural signals from all directions |
| ILD_ref            |  [$f_c$ x $\Omega_{az}$ ]                                       | Reference ILD curve, based on the messured HRTF |
| f_band             |  [$f_{low}$ , $f_{high}$]                                       | The lower and upper frequency band for the ILD calculations |
| Hnm_low            |  [$(N_{low}+1)^2$ x positive frequencies x  left/right ]        | The SH least squeres coefficients |
| Hnm_mls            |  [$(N_{low}+1)^2$ x positive frequencies x  left/right ]        | The SH magnitude least squeres coefficients |
| Hnm_high           |  [$(N_{high}+1)^2$ x positive frequencies x  left/right ]       | The high order SH least squeres coefficients |
| fs                 |  scalar                                                         | sample rate |
| N_low              |  scalar                                                         | low order |
| N_high             |  scalar                                                         | high order for refference |
| f_vec              |  [$f$ x 1]                                                      | positive frequencies bins vector |
| ang_vec            |  [$\Omega_{az}$ x 1]                                            | Angles in degrees for ILD calculations |
| Y_high_lebedev     |  [$\Omega$ x $(N_{high}+1)^2$]                                  | SH matrix for high order over lebedev directions |
| Y_high_az          |  [$\Omega_{az}$ x $(N_{high}+1)^2$]                             | SH matrix for high order over azimuth directions |
| Y_low_lebedev      |  [$\Omega$ x $(N_{low}+1)^2$]                                   | SH matrix for low order over lebedev directions |
| Y_low_az           |  [$\Omega_{az}$ x $(N_{low}+1)^2$]                              | SH matrix for low order over azimuth directions |
| omega              |  [$\Omega$]                                                     | first row is elevation second row is azimuth for sphirical grid |
| omega_az           |  [$\Omega_{az}$]                                                | first row is elevation second row is azimuth for lateral grid |


In [2]:
# The following block usees the path_matlab_data to load the data and calculate the iMagLS HRTF coeeftients
base_dir = os.path.join('..')
path_matlab_data = os.path.join(
    base_dir, "matlab_saved_data", "11_04_24", "KU100_HRIR_L2702_N1.mat")
results_save_path = os.path.join(
    base_dir, "PyTorch", "results", "10_04_24/")
lambda_vec=[2,1,1,1.5]
epochs = 100
Hnm_imls, Hnm_mls, Hnm_high, Y_high_az, Y_low_az,Y_high_lebedev,Y_low_lebedev, fs, omega, omega_az = nn_opt.start(path_matlab_data,
                                                                                                 results_save_path,epochs,lambda_vec,
                                                                                                 shutup=True,is_save=False)



Layer (type (var_name))                  Kernel Shape  Input Shape   Output Shape  Param #       Mult-Adds
NN_v2_simp (NN_v2_simp)                  --            [4, 257, 2]   [4, 257, 2]   --            --
├─Linear (linear1)                       --            [257, 8]      [257, 8]      2,120         544,840
│    └─weight                            [8, 8]                                    ├─64
│    └─bias                              [257, 8]                                  └─2,056
├─Linear (linear2)                       --            [8, 257]      [8, 257]      68,105        544,840
│    └─weight                            [257, 257]                                ├─66,049
│    └─bias                              [8, 257]                                  └─2,056
Total params: 70,225
Trainable params: 70,225
Non-trainable params: 0
Total mult-adds (M): 1.09
Input size (MB): 0.03
Forward/backward pass size (MB): 0.13
Params size (MB): 1.12
Estimated Total Size (MB): 1.29


Loading...:   0%|          | 0/100 [00:00<?, ?it/s]

## Clockwise moving source - save .wav files to disk

In [3]:
# Function to print a loading bar
def print_loading_bar(progress,ang):
    bar_length = 30
    filled_length = int(progress * bar_length)
    bar = '=' * filled_length + '-' * (bar_length - filled_length)
    print(f'\r[{bar}] {ang:03d} [deg]', end='', flush=True)


def generate_rotating_source(angles,Hnm,Ynm,doa_az,dry_sig_path,save_path,save_name,fs):
    bp = [8e3, 20e3]
    dry_sig    = pf.io.read_audio(dry_sig_path)
    HRIR       = f.spatial_interpolation(Hnm,Ynm,True).cpu().numpy()
    HRIR       = np.transpose(HRIR,[0,2,1])
    HRIR_pf    = pf.Signal(HRIR, fs)
    HRIR_pf = pf.dsp.filter.butterworth(HRIR_pf, 8, bp, btype='bandpass')

    # using 50% overlap between two adjacent points using triangular window ###

    # The number of samples in each window
    t_0 =  2*(dry_sig.n_samples / (np.abs(angles[1] - angles[0])))

    # closest power of 2
    #window_len_samples = int(np.power(2, np.ceil(np.log2(t_0))))

    window_len_samples = int(np.ceil(t_0 / 2.) * 2)

    # create a triangular window
    window = np.bartlett(window_len_samples)

    # the length of the signals in term of window length
    signal_frames_len = int(dry_sig.n_samples / window_len_samples)

    # for 50% overlap
    num_overlap_frames = 2 * signal_frames_len-1

    half_window_samples = int(window_len_samples / 2)

    print("HRIR info:",HRIR_pf)
    print("dry_sig info:",dry_sig)


    # initialize output signal
    output_signal = pf.Signal(np.zeros(dry_sig.n_samples+HRIR_pf.n_samples), dry_sig.sampling_rate)

    ang_vec = np.floor(np.linspace(angles[0], angles[1], num=num_overlap_frames))
    ang_vec = ang_vec.astype('int32')
    ang_vec = ang_vec % 360

    angles = angles % 360


    for frame_idx in range(num_overlap_frames):
        first_sample = frame_idx*half_window_samples
        last_sample  = frame_idx*half_window_samples+window_len_samples
        curr_ang = ang_vec[frame_idx]
        idx_closest = np.abs(doa_az - curr_ang).argmin()


        curr_window = np.pad(window, (first_sample, dry_sig.n_samples - last_sample), 'constant', constant_values=(0, 0))
        curr_window_pf = pf.Signal(curr_window, dry_sig.sampling_rate)
        dry_sig_window = pf.multiply((dry_sig, curr_window_pf), domain='time')

        if frame_idx <1:
            print("start angle: ",ang_vec[frame_idx],"[deg]\n")
            output_signal = pf.dsp.convolve(HRIR_pf[idx_closest],dry_sig_window)
        else:
            output_signal = output_signal + pf.dsp.convolve(HRIR_pf[idx_closest],dry_sig_window)
        # Simulate progress
        print_loading_bar((frame_idx / num_overlap_frames),ang_vec[frame_idx])
        time.sleep(0.05)  # Simulate some work being done


    print("\n\nend angle: ",ang_vec[frame_idx],"[deg]")
    sig_length_time = dry_sig.n_samples / fs
    movmentspeed = (num_overlap_frames) / sig_length_time
    movmentspeed_in_sample = (num_overlap_frames) /  dry_sig.n_samples



    print("signal time is ", sig_length_time," seconds")
    print("angle speed is ", movmentspeed," deg per sec")
    print("total frame number: ", num_overlap_frames," (including 50% overlap)")
    print("num of sampels in each fram: is ", window_len_samples," [samples]")



    print("\n Saving to disk")
    output_signal    = pf.dsp.normalize(output_signal,channel_handling='max')
    tmp = save_path +save_name+".wav"
    pf.io.write_audio(output_signal,tmp)
    print("\n Done...\n\n")


save_data_path = os.path.join(base_dir, "PyTorch", "results", "17_04_24-dynamic_source-casta-bp")
dry_sig_path = os.path.join(base_dir, "dry_signals", "casta.wav")
angles = np.array([0,360]);

if not os.path.isdir(save_data_path):
        os.makedirs(save_data_path)

doa_az = np.rad2deg(omega_az)[:,1]
doa_az = ((doa_az - 180) % 360 +180)%360 # change from [-180,180) to [0,360) with +30 left and -30=330 right 0 is facing forword

save_name = "HOA_reference"
generate_rotating_source(angles,Hnm_high,Y_high_az,doa_az,dry_sig_path,save_data_path,save_name,fs)

save_name = "MagLS"
generate_rotating_source(angles,Hnm_mls,Y_low_az,doa_az,dry_sig_path,save_data_path,save_name,fs)

save_name = "iMagLS"
generate_rotating_source(angles,Hnm_imls,Y_low_az,doa_az,dry_sig_path,save_data_path,save_name,fs)


HRIR info: time domain energy Signal:
(361, 2) channels with 512 samples @ 48000 Hz sampling rate and none FFT normalization

dry_sig info: time domain energy Signal:
(1,) channels with 360447 samples @ 48000 Hz sampling rate and none FFT normalization

start angle:  0 [deg]


end angle:  0 [deg]
signal time is  7.5093125  seconds
angle speed is  47.54096996229681  deg per sec
total frame number:  357  (including 50% overlap)
num of sampels in each fram: is  2004  [samples]

 Saving to disk

 Done...


HRIR info: time domain energy Signal:
(361, 2) channels with 512 samples @ 48000 Hz sampling rate and none FFT normalization

dry_sig info: time domain energy Signal:
(1,) channels with 360447 samples @ 48000 Hz sampling rate and none FFT normalization

start angle:  0 [deg]


end angle:  0 [deg]
signal time is  7.5093125  seconds
angle speed is  47.54096996229681  deg per sec
total frame number:  357  (including 50% overlap)
num of sampels in each fram: is  2004  [samples]

 Saving to d

## Static source - save .wav files to disk

In [None]:
sig_path = os.path.join(base_dir, "dry_signals", "white-noise-burst.wav")
save_path = os.path.join(base_dir, "audio_examples", "10_04_24-mag-loss-noise")
ang_deg = -60
bp = [8e3, 20e3]


print("\n\n-------------------------------")
print("Time Frequcny plots for iMagLS")
print("-------------------------------")
if not os.path.isdir(os.path.join(save_path, "iMagLS")):
        os.makedirs(os.path.join(save_path, "iMagLS"))
nn_opt.plot_pyfar(Hnm_imls,Y_low_az,fs,sig_path,os.path.join(save_path, "iMagLS"),ang_deg,bp)

print("\n\n-------------------------------")
print("Time Frequcny plots for MagLS")
print("-------------------------------")
if not os.path.isdir(os.path.join(save_path, "MagLS")):
        os.makedirs(os.path.join(save_path, "MagLS"))
nn_opt.plot_pyfar(Hnm_mls,Y_low_az,fs,sig_path,os.path.join(save_path, "MagLS"),ang_deg,bp)

print("\n\n-------------------------------")
print("Time Frequcny plots for HOA")
print("-------------------------------")
if not os.path.isdir(os.path.join(save_path, "HOA")):
        os.makedirs(os.path.join(save_path, "HOA"))
nn_opt.plot_pyfar(Hnm_high,Y_high_az,fs,sig_path,os.path.join(save_path, "HOA"),ang_deg,bp)

# Save Hnm's as Sofa files -  used for sagittal-plane localization model in Matlab

In [None]:
def save_as_sofa(Hnm,Ynm,SourcePosition,fs,save_path,HRIR_name):
    # Save the SH coeffitients as a sofa file (in [time x space x left/right])
    HRIR_IR = f.spatial_interpolation(Hnm,Ynm,True).cpu().numpy()
    HRIR_IR = np.transpose(HRIR_IR,[0,2,1])
    sofa = sf.Sofa("SimpleFreeFieldHRIR")
    sofa.Data_SamplingRate = fs
    sofa.SourcePosition = SourcePosition
    sofa.Data_IR = HRIR_IR
    sofa.delete("SourceUp")
    sf.write_sofa(save_path+HRIR_name, sofa)
    #data_ir, source_coordinates, receiver_coordinates = pf.io.read_sofa(save_path+HRIR_name)
    #index, *_ = source_coordinates.find_nearest_k(90, 0, 3.25, k=1, domain='sph', convention='top_elev', unit='deg', show=True)
    #_, mask = source_coordinates.find_slice('elevation', unit='deg', value=0, show=True)







SourcePosition  = np.rad2deg(omega)
source_distance = 3.25
source_distance_column = np.full((SourcePosition.shape[0], 1), source_distance)
SourcePosition = np.concatenate((SourcePosition, source_distance_column), axis=1)
#SourcePosition[:,0] = (SourcePosition[:,0] + 180) % 360 - 180 # change from [0,360) to [-180,180) with +30 left and -30 right 0 is facing forword
SourcePosition[:,0] = -1*(SourcePosition[:,0] - 90)  # change from [0,180) to [-90,90) with +30 left and -30 right 0 is facing forword
SourcePosition[:, [0, 1]] = SourcePosition[:, [1, 0]]

save_path = os.path.join(base_dir, "PyTorch", "results", "16_04_24-sofa_save")
if not os.path.isdir(save_path):
        os.makedirs(save_path)

HRIR_name = "ARI_Ref_hrtf_M_dtf 256.sofa"
save_as_sofa(Hnm_high,Y_high_lebedev,SourcePosition,int(fs),save_path,HRIR_name)


HRIR_name = "ARI_MagLS_hrtf_M_dtf 256.sofa"
save_as_sofa(Hnm_mls,Y_low_lebedev,SourcePosition,int(fs),save_path,HRIR_name)

HRIR_name = "ARI_iMagLS_hrtf_M_dtf 256.sofa"
save_as_sofa(Hnm_imls,Y_low_lebedev,SourcePosition,int(fs),save_path,HRIR_name)






## Save .mat files for sagittal-plane localization model in Matlab

In [None]:
# save lateral HRIR's for sagittal-plane localization model from Baumgartner in Matlab
def save_baumgartner_matrix(Hnm,Ynm,data_path,save_path,save_name,fs):
    HRIR = f.spatial_interpolation(Hnm,Ynm,True).cpu().numpy()

    HRIR = np.transpose(HRIR,[0,2,1])
    HRIR_pf    = pf.Signal(HRIR, fs)
    HRIR_pf_bp = pf.dsp.filter.butterworth(HRIR_pf, 8, bp, btype='bandpass')
    HRIR_pf_bp = HRIR_pf_bp.time

    HRIR       = np.transpose(HRIR,[2,0,1])
    HRIR_pf_bp = np.transpose(HRIR_pf_bp,[2,0,1])

    omega, omega_az, fs, f_band, N_low, N_high, f_vec, ang_vec, nfft, Hnm_high, Hnm_low, Hnm_mls, Y_high_lebedev, Y_high_az, Y_low_lebedev, Y_low_az, p_f_high_lebedev, ILD_ref, p_high_az_t, AK_Nmax, AK_f_c, AK_n, AK_C, p_ref_az_t = nn_opt.import_matlab_data(data_path,"cpu",True)

    polsamp = np.rad2deg(omega_az[:,1]);
    polsamp = np.expand_dims(polsamp, axis=1)

    mdic = {"HRIR": HRIR_pf_bp, "polsamp": polsamp,"fs": fs}
    scipy.io.savemat(save_path+save_name, mdic)


save_data_path = os.path.join(base_dir, "audio_examples", "11_04_24-baumgartner")
if not os.path.isdir(save_data_path):
        os.makedirs(save_data_path)

save_name = "HOA_reference.mat"
save_baumgartner_matrix(Hnm_high,Y_high_az,path_matlab_data,save_data_path,save_name,fs)

save_name = "N1_imls.mat"
save_baumgartner_matrix(Hnm_imls,Y_low_az,path_matlab_data,save_data_path,save_name,fs)

save_name = "N1_mls.mat"
save_baumgartner_matrix(Hnm_mls,Y_low_az,path_matlab_data,save_data_path,save_name,fs)

