# Blind Source Separation (BSS)

In [2]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import wavfile
from scipy.signal import fftconvolve
import IPython
import pyroomacoustics as pra
from sparseauxiva import *

Blind Source Separation techniques such as Independent Vector Analysis (IVA) using an Auxiliary function are implemented in ´pyroomacoustics´. IVA based algorithms work when the number of microphones is the same as the number of sources, i.e., the determinant case. Through this example, we will deal with the case of 2 sources and 2 microphones.

First, open and concatanate wav files from the CMU dataset.

In [3]:
# concatanate audio samples to make them look long enough
wav_files = [
        ['../../../examples/input_samples/cmu_arctic_us_axb_a0004.wav',
            '../../../examples/input_samples/cmu_arctic_us_axb_a0005.wav',
            '../../../examples/input_samples/cmu_arctic_us_axb_a0006.wav',],
        ['../../../examples/input_samples/cmu_arctic_us_aew_a0001.wav',
            '../../../examples/input_samples/cmu_arctic_us_aew_a0002.wav',
            '../../../examples/input_samples/cmu_arctic_us_aew_a0003.wav',]
        ]
fs = 16000;

signals = [ np.concatenate([wavfile.read(f)[1].astype(np.float32)
        for f in source_files])
for source_files in wav_files ]

In [4]:
print("Original signal 1 (Woman):")
IPython.display.Audio(signals[0], rate=fs)

Original signal 1 (Woman):


In [5]:
print("Original signal 2 (Man):")
IPython.display.Audio(signals[1], rate=fs)

Original signal 2 (Man):


Define an anechoic room envrionment, as well as the microphone array and source locations.

In [6]:
# Room 4m by 6m
room_dim = [8, 9]

# source locations and delays
locations = [[2.5,3], [2.5, 6]]
delays = [1., 0.]

# create an anechoic room with sources and mics
room = pra.ShoeBox(room_dim, fs=16000, max_order=15, absorption=0.35, sigma2_awgn=1e-8)

# add mic and good source to room
# Add silent signals to all sources
for sig, d, loc in zip(signals, delays, locations):
    room.add_source(loc, signal=np.zeros_like(sig), delay=d)

# add microphone array
room.add_microphone_array(pra.MicrophoneArray(np.c_[[6.5, 4.49], [6.5, 4.51]], room.fs))

Compute the RIRs as in the Room Impulse Response generation section.

In [7]:
# compute RIRs
room.compute_rir()

Mix the microphone recordings to simulate the observed signals by the microphone array in the frequency domain. To that end, we apply the STFT transform as explained in STFT.

In [8]:
from mir_eval.separation import bss_eval_images

# Record each source separately
separate_recordings = []
for source, signal in zip(room.sources, signals):

    source.signal[:] = signal

    room.simulate()
    separate_recordings.append(room.mic_array.signals)

    source.signal[:] = 0.
separate_recordings = np.array(separate_recordings)

# Mix down the recorded signals
mics_signals = np.sum(separate_recordings, axis=0)

# STFT frame length
L = 2048
# Observation vector in the STFT domain
X = np.array([pra.stft(ch, L, L, transform=np.fft.rfft, zp_front=L//2, zp_back=L//2) for ch in mics_signals])
X = np.moveaxis(X, 0, 2)

# Reference signal to calculate performance of BSS
ref = np.moveaxis(separate_recordings, 1, 2)
SDR, SIR = [], []

# Callback function to monitor the convergence of the algorithm
def convergence_callback(Y):
    global SDR, SIR
    ref = np.moveaxis(separate_recordings, 1, 2)
    y = np.array([pra.istft(Y[:,:,ch], L, L,
            transform=np.fft.irfft, zp_front=L//2, zp_back=L//2) for ch in range(Y.shape[2])])
    sdr, isr, sir, sar, perm = bss_eval_images(ref[:,:y.shape[1]-L//2,0], y[:,L//2:ref.shape[1]+L//2])
    SDR.append(sdr)
    SIR.append(sir)

In [9]:
print("Roomed signal 1:")
IPython.display.Audio(separate_recordings[0], rate=fs)

Roomed signal 1:


In [10]:
print("Roomed signal 2:")
IPython.display.Audio(separate_recordings[1], rate=fs)

Roomed signal 2:


In [11]:
print("Mixed signal:")
IPython.display.Audio(mics_signals[0], rate=fs)

Mixed signal:


Run AuxIVA to estimate the source images from the observation signals in the frequency domain.

In [12]:
# Run AuxIVA
Y = pra.bss.auxiva(X, n_iter=30, proj_back=True, callback=convergence_callback)

# run iSTFT
y = np.array([pra.istft(Y[:,:,ch], L, L, transform=np.fft.irfft, zp_front=L//2, zp_back=L//2) for ch in range(Y.shape[2])])

# Compare SIR and SDR with our reference signal
sdr, isr, sir, sar, perm = bss_eval_images(ref[:,:y.shape[1]-L//2,0], y[:,L//2:ref.shape[1]+L//2])

In [13]:
# Run ILRMA
Y = pra.bss.ilrma(X, n_iter=30, n_components=30, proj_back=True)

# run iSTFT
y = np.array([pra.istft(Y[:,:,ch], L, L, transform=np.fft.irfft, zp_front=L // 2, zp_back=L // 2) for ch in range(Y.shape[2])])


In [14]:
# run Trinicon
y,w = pra.bss.trinicon(mics_signals, filter_length=L, return_filters=True)
y,w = pra.bss.trinicon(mics_signals, w0=w, return_filters=True)
y,w = pra.bss.trinicon(mics_signals, w0=w, return_filters=True)
y,w = pra.bss.trinicon(mics_signals, w0=w, return_filters=True)
y,w = pra.bss.trinicon(mics_signals, w0=w, return_filters=True)
y,w = pra.bss.trinicon(mics_signals, w0=w, return_filters=True)
y = pra.bss.trinicon(mics_signals, w0=w)

In [15]:
print("Separated source 0:")
IPython.display.Audio(y[0], rate=fs)

Separated source 0:


In [16]:
print("Separated source 1:")
IPython.display.Audio(y[1], rate=fs)

Separated source 1:


Find the most influencial frequencies in the mix 

In [62]:
ratio = 0.35
average = np.abs(np.mean(np.mean(X, axis=2),axis=0))
k = np.int_(average.shape[0]*ratio)
S = np.argpartition(average, -k)[-k:]
S = np.sort(S)

Run SparseAuxIva

In [65]:
Y = sparseauxiva(X,S,mu=0,n_iter=20)

NameError: name 'sparseauxiva' is not defined