In [None]:
import numpy as np
import pandas as pd
from scipy.io import wavfile, loadmat
import matplotlib.pyplot as plt


DATASET_PATH = r"DREGON/free-flight_whitenoise-high_room1" 

# Base filename prefix (without extension)
BASE_FILENAME = r"DREGON_free-flight_whitenoise-high_room1"

NOISE_TEST_FILE = f"{DATASET_PATH}/{BASE_FILENAME}.wav"
AUDIO_TS_MAT_FILE = f"{DATASET_PATH}/{BASE_FILENAME}_audiots.mat"
# IMU_MAT_FILE = f"{DATASET_PATH}/{BASE_FILENAME}_imu.mat"
SOURCE_POS_MAT_FILE = f"{DATASET_PATH}/{BASE_FILENAME}_sourcepos.mat"
# MOTORS_MAT_FILE = f"{DATASET_PATH}/{BASE_FILENAME}_motors.mat" # Optional to load
NOISE_TRAIN_FILE = r"DREGON\noise_training_room1.wav"


In [None]:
import scipy.io as sio

audio_ts_data = sio.loadmat(AUDIO_TS_MAT_FILE)['audio_timestamps']
#imu_data = sio.loadmat(IMU_MAT_FILE)['imu'][0][0] #dtype=[('timestamps', 'O'), ('angular_velocity', 'O'), ('acceleration', 'O')]
source_pos_data = sio.loadmat(SOURCE_POS_MAT_FILE)['source_position'][0][0] #dtype=[('timestamps', 'O'), ('azimuth', 'O'), ('elevation', 'O'), ('distance', 'O')]
# motors_data = sio.loadmat(MOTORS_MAT_FILE)['motor'][0][0] #[('command', 'O'), ('measured', 'O'), ('timestamps', 'O')]

# motors_data_r = motors_data.copy()
# motors_data_r[0] = motors_data[2]
# motors_data_r[1] = motors_data[0]
# motors_data_r[2] = motors_data[1]

In [3]:
import scipy.io.wavfile as wav

sample_rate, audio_data = wav.read(NOISE_TEST_FILE)
audio_data = audio_data.astype(np.float32)  
print(f"Sample Rate: {sample_rate} Hz")
print(f"Audio Data Shape: {audio_data.shape}")
print(f"Data Type: {audio_data.dtype}")

Sample Rate: 44100 Hz
Audio Data Shape: (2668292, 8)
Data Type: float32


In [4]:
fs = 16000
fftSize_sec = 0.064
freqRange = []
micPos = np.asarray([  [0.0420  ,  0.0615   , -0.0410  ],# mic 1
           [-0.0420,    0.0615,    0.0410],  # mic 2
           [-0.0615,    0.0420,   -0.0410],  # mic 3
           [-0.0615,   -0.0420,    0.0410],  # mic 4
           [-0.0420,   -0.0615,   -0.0410],  # mic 5
            [0.0420,   -0.0615,    0.0410],  # mic 6
            [0.0615,   -0.0420,   -0.0410],  # mic 7
			[0.0615,    0.0420,    0.0410] ])# mic 8    
subArray = np.asarray([0, 1, 2, 3, 4, 5, 6, 7]) # all mics

In [5]:
from scipy.signal import resample_poly
import os

print("Loading data...")

noisy_signal = audio_data.copy() # Make a copy if you plan to modify it
fs_true_noisy = sample_rate
noisy_signal = noisy_signal.T
max_vals = np.abs(noisy_signal).max(axis=1, keepdims=True)
max_vals[max_vals == 0] = 1
noisy_signal = noisy_signal / max_vals  

resample_fs, fs = 16000, 44100
resampled = np.array([
    resample_poly(ch, resample_fs, fs) for ch in noisy_signal
])

resampled_noisy_signal = resampled.T

print(f"Resampled noisy signal shape: {resampled_noisy_signal.shape}")

Loading data...
Resampled noisy signal shape: (968088, 8)


In [6]:
#resample timestamps
n_samples = resampled_noisy_signal.shape[0]

audio_ts = np.arange(n_samples)/resample_fs

print(f"Audio timestamps shape: {audio_ts.shape}")

Audio timestamps shape: (968088,)


In [8]:
noise_sample_rate, x_noise = wav.read(NOISE_TRAIN_FILE)
x_noise = x_noise.astype(np.float32)  
x_noise /= np.abs(x_noise).max()

x_noise = x_noise.T

fs_in = noise_sample_rate      # 44100 Hz
fs_out = resample_fs           # 16000 Hz
up, down = fs_out, fs_in

x_noise_resample = np.array([
    resample_poly(ch, up, down) for ch in x_noise
])  # shape: (8, new_time)

# Step 5: Subarray selection (if needed)

x_noise_resample = x_noise_resample[subArray, :]
x_noise_resample = x_noise_resample.T  # Transpose to shape (time, selected_channels)
print(f"Resampled noise shape: {x_noise_resample.shape}")  # (selected_channels, time)

Resampled noise shape: (116080, 8)


In [9]:
s_az = source_pos_data['azimuth']
s_el = source_pos_data['elevation']
gt_timestamps = source_pos_data['timestamps'].flatten()
s_dist = source_pos_data['distance']
print(f"Source position data shape: azimuth {s_az.shape}, elevation {s_el.shape}, distance {s_dist.shape}, timestamps {audio_ts.shape}")

Source position data shape: azimuth (6246, 1), elevation (6246, 1), distance (6246, 1), timestamps (968088,)


In [10]:
import torch
import torch.nn.functional as F
from scipy.signal import stft

def compute_multichannel_spectrogram(audio, fs, fft_size=1024, hop_size=512):
    """
    Convert multichannel waveform (shape: time x channels) into complex spectrogram
    Returns: np.array of shape (channels, freq_bins, time_frames, 2) [real, imag]
    """
    n_channels = audio.shape[1]
    spec_list = []

    for ch in range(n_channels):
        f, t, Zxx = stft(audio[:, ch], fs=fs, nperseg=fft_size, noverlap=fft_size - hop_size)
        spec = np.stack((np.real(Zxx), np.imag(Zxx)), axis=-1)  # shape: (freq_bins, time_frames, 2)
        spec_list.append(spec)

    spec_all = np.stack(spec_list, axis=0)  # shape: (channels, freq_bins, time_frames, 2)
    return spec_all


In [None]:
import torch.nn as nn

class AudioDOANet(nn.Module):
    def __init__(self, n_channels=8, input_shape=(256, 64), out_dim=3):
        super().__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(n_channels * 2, 32, kernel_size=3, padding=1),  # *2 due to [real, imag]
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1))
        )
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, out_dim)  # azimuth, elevation, distance
        )

    def forward(self, x):
        return self.fc(self.cnn(x))
