In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from utils import get_eigenmike_em32_coords_from_table, numpy_to_audio_segment
import librosa
import numpy as np
import whisper
import pyroomacoustics as pra
import noisereduce as nr
from scipy.signal import stft, istft

In [3]:
# get the coordinates of the microphones (x,y,z) of em32 using the data provided in microphone datasheet
mic_coords = get_eigenmike_em32_coords_from_table()

print("Shape of mic_coords:", mic_coords.shape)

Shape of mic_coords: (3, 32)


In [5]:
start_audio, start_sr = librosa.load("audio.wav", sr=None, mono=False)
print(f"Loaded audio shape: {start_audio.shape}, Sample rate: {start_sr}")

Loaded audio shape: (32, 1323511), Sample rate: 44100


In [6]:
n_mics = start_audio.shape[0]

print("Number of microphones: ", n_mics)

Number of microphones:  32


In [7]:
# we would like to resample the audio to 16000 Hz
# as this is the standard sampling rate for audio for many models and algorithms
resample_fs = 16000

resampled_audio = []

for i in range(n_mics):
    resampled_audio.append(librosa.resample(y=start_audio[i, :], orig_sr=start_sr, target_sr=resample_fs))


resampled_audio = np.array(resampled_audio)
print("Resampled audio shape: ", resampled_audio.shape)

Resampled audio shape:  (32, 480186)


In [8]:
# through trial and error, we found that 4096 is a good number for the STFT both for alignment and for the MSICA
nfft = 4096
hop = nfft//2

In [9]:
# as we will be using pyroomacoustics library for signal alignment, we need to create a microphone array object
mic_arr = pra.MicrophoneArray(mic_coords, fs=resample_fs)

In [10]:
# for microphone alignment, we use the STFT of the audio signal, so we move it to the frequency domain
_, _, stft_audio = stft(
    resampled_audio,
    window='hann',
    nperseg=nfft,
    noverlap=hop,
)
print(f"STFT shape: {stft_audio.shape}")

STFT shape: (32, 2049, 236)


In [None]:
# we use the MUSIC algorithm to estimate the direction of the sources of audio
# we estiamte that there are 9 sources in the audio, as in the assignment description 9 people where showcased in scenario
# we set up dimension to 3, as we are in 3D space (real world scenario)
doa = pra.doa.MUSIC(mic_arr.R, resample_fs, nfft, num_src=9, dim=3)

doa.locate_sources(stft_audio)

In [None]:
# method locate_sources returns azimuth and colatitude of the sources
# we can use this information to calculate the direction of the sources in the 3D space
# the direction is given by the vector (x,y,z)
azimuths = doa.azimuth_recon  # Angles in radians
colatitudes = doa.colatitude_recon
directions = np.array([
    np.sin(colatitudes) * np.cos(azimuths),
    np.sin(colatitudes) * np.sin(azimuths),
    np.cos(colatitudes)
]).T  

# the shape of the directions array is (n_detected_sources, 3)
print("Directions shape: ", directions.shape)

# MUSIC had found 7 sources, so it probably means that some signals could be super close 

Directions shape:  (7, 3)


In [13]:
# we know want to use the directions to calculate the time delays for each microphone in regards to the first source
# we will then use these time delays to calculate the phase shifts for each frequency bin
# that will allow us to align the signals in the time domain


# we assume that the first source is the strongest one
# in production setting we would probably want to find the strongest source in some smarter way
d = directions[0]  

# speed of sound in m/s
c = pra.constants.get('c') 
ref_mic = 0
# array to store the time delays for each microphone in regards to the first mic
delta = np.zeros(n_mics)
for i in range(n_mics):
    # we calculate the time delay by first calculating the vector between the reference microphone and the current microphone
    # then we project this vector onto the direction vector of the source (this projection is the distance between the source and the microphone)
    # and divide by the speed of sound to get the time delay
    delta[i] = np.dot(mic_coords[:, ref_mic] - mic_coords[:, i], d) / c

k = np.arange(nfft // 2 + 1)  # Frequency bin indices
f = k * resample_fs / nfft  # Frequencies
# we calculate the phase shifts for each frequency bin this way we can apply them to all frames at once and transform sound wave so that it is aligned (starts at the same time)
phase_shifts = np.exp(-1j * 2 * np.pi * np.outer(f, delta))  # Shape: (n_freq, n_mics)
# aligning shapes for broadcasting
phase_shifts = phase_shifts.T[:, :, None]
stft_aligned = stft_audio * phase_shifts  # Apply to all frames

# once we aligned the signals in the frequency domain, we can move back to the time domain
_, aligned_audio = istft(stft_aligned, window='hann', nperseg=nfft, noverlap=hop)

# the shape should be the same as the original audio
print("Aligned audio shape: ", aligned_audio.shape)


Aligned audio shape:  (32, 481280)


In [14]:
from utils import fdica, ica, prewhiten, dewhitening
# performing our algorithm MSICA with whitening
fdica_audio = fdica(aligned_audio[:directions.shape[0], :], n_fft=nfft)
whitened_audio = prewhiten(fdica_audio)
ica_audio = ica(whitened_audio)
seperated_channels = dewhitening(ica_audio, np.cov(fdica_audio))

In [None]:
# after the msica we get 7 channels, each one containing a seperated source of signal
print("Shape of seperated_channels: ", seperated_channels.shape)

(7, 481280)

In [24]:
# we now will use whisper model to get probabilites of sound coming from certain language
# the base model is only 160 MBs, so it could be possibly even embedded in processing unit
whisper_model = whisper.load_model('base')

In [25]:
# we ensure that audio is converted to float from double (whisper model requires float32)
# we pad the channels to match the length required by the model
# finally we get the probabilities of sound coming from certain language
# the result of this is array of tuples of language_code and probability that this channel contains audio in this language
# we are interested in english speaking channels so we get the probabilities of english, sort to get the most likely ones and get the indices of the top 2
# as from assigment we now that there are 2 english speakers
en_probs = []
for i in range(len(seperated_channels)):
    audio = seperated_channels[i]
    audio = audio.astype(np.float32)
    spectogram = whisper.pad_or_trim(audio)
    spectogram = whisper.log_mel_spectrogram(spectogram)
    model_output = whisper_model.detect_language(spectogram)
    en_probs.append((i, model_output[1]['en']))

en_probs = sorted(en_probs, key=lambda x: x[1], reverse=True)

print("Probabilities of english in each channel: ", en_probs)

Probabilities of english in each channel:  [(1, 0.929402232170105), (5, 0.4777883291244507), (2, 0.05402468144893646), (0, 0.04671625792980194), (4, 0.021710628643631935), (3, 0.009984349831938744), (6, 0.008043442852795124)]


In [26]:
# we grab the top 2 channels with highest probability of english
top_2_idx = [en_probs[0][0], en_probs[1][0]]
top_2_channels = seperated_channels[top_2_idx]

In [27]:
# the audio after MSICA can contain quite some degree of noise, so we use noisereduce library to reduce it
# this library uses commonly used noise reduction techniques like spectral subtraction, Wiener filtering, etc.
noiseless_channels = [nr.reduce_noise(y=chan, sr=resample_fs) for chan in top_2_channels]

In [47]:
# once we have 2 channels containing english speaker we can merge them together to get the final result
# as this is conversation, we can just overlay the two channels
# we use pydub library to merge the two channels
# we have created a helper function to convert numpy array to pydub audio segment
# also we boost the volume of the final result to make it more audible 
# one segment was already louder than the other so the gain is not the same for both
segment1 = numpy_to_audio_segment(noiseless_channels[0], resample_fs, 25)
segment2 = numpy_to_audio_segment(noiseless_channels[1], resample_fs, 12)

merged_segment = segment1.overlay(segment2)

merged_segment.export("final_pipeline_result.wav", format="wav")

<_io.BufferedRandom name='final_pipeline_result.wav'>