In [1]:
import os
import math
from random import shuffle, sample, choice
import numpy as np
from scipy.io import wavfile
from scipy.signal import convolve
from pathlib import Path
from tqdm.notebook import tqdm
from librosa import load, resample, get_samplerate

In [2]:
# --- Input locations ---------------------------------------------------------------
# Portable, repo-relative alternatives (currently disabled):
# speech_dir = Path('./WAVs/speech')
# noise_dir = Path('./WAVs/noise')
# NOTE(review): the three paths below are absolute and specific to one machine; they
# have to be edited before the notebook can run anywhere else.
# Directory listed for the speech recordings.
speech_dir = Path('/home/guillaume/Nextcloud/Thesis_Guillaume/Datasets/Dry Speech/Parsed TIMIT/parsed_output')
# Directory listed for the background-noise recordings.
noise_dir = Path('/home/guillaume/Nextcloud/Thesis_Guillaume/Datasets/Noise/ETSI_Background_Noise_split')
# Directory of room-impulse-response files; each file is read with np.load.
RIR_dir = Path('/home/guillaume/Nextcloud/Thesis_Guillaume/Datasets/Recorded RIRs/Barcelona')

# (low, high) bounds, both inclusive, of the integer SNR in dB drawn for every
# speech/noise mixture.
SNRs = (0, 30)

# Fractions of the speech files assigned to each split. The noise files are split into
# a "seen" part of size training_frac + validation_frac and an "unseen" remainder.
testing_frac = 0.1
validation_frac = 0.1
training_frac = 1 - testing_frac - validation_frac

# Budget on the number of speech/noise pairs; every split receives its own fraction
# of this budget and subsamples the noise files per speech file when it is exceeded.
max_size = 15000

# Root directory under which the per-split output folders are created.
output_dir = Path('./WAVs/dataset')

In [3]:
def power(x):
    """Return the sum of the squared elements of ``x``.

    This is a plain sum (no division by the number of elements), so the result
    grows with the length of ``x``.
    """
    squared = np.square(x)
    return np.sum(squared)

def SNR(s, n):
    """Return ``10 * log10(power(s) / power(n))``, i.e. the s-to-n ratio in dB.

    ``s`` and ``n`` are passed unchanged to ``power``; no length matching is done here.
    """
    ratio = power(s) / power(n)
    return 10 * math.log10(ratio)

In [4]:
def resample_convolve(speech_wav, RIR, SNRs, speech_sr=16000, RIR_sr=48000):
    """Mix ``speech_wav`` with a randomly scaled, RIR-convolved copy of itself.

    The speech is resampled to the RIR sample rate, convolved with ``RIR``, cut back to
    the length of the resampled speech (the convolution tail is dropped), resampled to
    the speech rate again and added to the unprocessed speech. The convolved signal is
    scaled so that ``SNR(speech_wav, scaled_convolved)`` equals an integer drawn
    uniformly from ``SNRs`` (both bounds inclusive).

    Args:
        speech_wav: 1-D waveform sampled at ``speech_sr``.
        RIR: 1-D impulse response sampled at ``RIR_sr``.
        SNRs: ``(low, high)`` inclusive integer range, in dB, for the ratio between the
            unprocessed and the convolved signal.
        speech_sr: sample rate of ``speech_wav`` in Hz (default 16000).
        RIR_sr: sample rate of ``RIR`` in Hz (default 48000).

    Returns:
        Waveform with the same length as ``speech_wav``.
    """
    # Sample rates are passed by keyword: that form is accepted by old and new librosa
    # releases, whereas recent releases reject positional sample rates.
    speech_wav_upsampled = resample(speech_wav, orig_sr=speech_sr, target_sr=RIR_sr, res_type='kaiser_fast')

    convolved = convolve(speech_wav_upsampled, RIR)
    # Full convolution is len(RIR) - 1 samples longer than its input; keep only the
    # part that overlaps the speech.
    convolved = convolved[:len(speech_wav_upsampled)]
    convolved_downsampled = resample(convolved, orig_sr=RIR_sr, target_sr=speech_sr, res_type='kaiser_fast')

    # Resampling rounds the output length, so force an exact match with the dry signal
    # before the two are added (a no-op for the default 16 kHz / 48 kHz pair).
    missing = len(speech_wav) - len(convolved_downsampled)
    if missing > 0:
        convolved_downsampled = np.pad(convolved_downsampled, (0, missing))
    else:
        convolved_downsampled = convolved_downsampled[:len(speech_wav)]

    current_SNR = SNR(speech_wav, convolved_downsampled)
    desired_SNR = np.random.randint(SNRs[0], SNRs[1] + 1)  # randint's upper bound is exclusive
    # Amplitude gain that moves the ratio from current_SNR to desired_SNR.
    alpha = 1/math.sqrt(math.pow(10, (desired_SNR - current_SNR)/10))
    return speech_wav + alpha * convolved_downsampled

In [5]:
def _load_mono_16k(path):
    """Load an audio file as a mono waveform at 16 kHz, resampling only when required."""
    if get_samplerate(path) == 16000:
        wav, _ = load(path, sr=None, mono=True)
    else:
        wav, _ = load(path, sr=16000, mono=True, res_type='kaiser_fast')
    return wav


def _fit_to_length(wav, target_len):
    """Zero-pad or crop ``wav`` at a random offset so it has exactly ``target_len`` samples."""
    length_difference = target_len - len(wav)
    if length_difference > 0:
        # randint's upper bound is exclusive; +1 lets the signal also sit flush with the end.
        start = np.random.randint(0, length_difference + 1)
        return np.pad(wav, (start, length_difference - start))
    if length_difference < 0:
        start = np.random.randint(0, -length_difference + 1)
        return wav[start:start + target_len]
    return wav


def _to_int16(wav):
    """Scale a float waveform by 2**15 and convert it to int16 without wrap-around."""
    # A sample of exactly +1.0 scales to 32768, which does not fit in int16; clip first.
    return np.clip(wav * 2**15, -2**15, 2**15 - 1).astype(np.int16)


def generate_wavs(speech_dir, noise_dir, RIR_dir, speech_filenames, noise_filenames, SNRs, output_path, max_size):
    """Write the ``clean`` and ``noisy`` WAV files of one dataset split.

    For every speech file:
      * it is loaded as 16 kHz mono and randomly padded/cropped to 128000 samples (8 s);
      * a random row of a random (transposed) array from ``RIR_dir`` is used by
        ``resample_convolve`` with the ratio range (5, 25), and the result is written to
        ``output_path/clean/<speech>.wav``;
      * for every selected noise file, the noise is padded/cropped to the speech length,
        scaled so that the speech-to-noise ratio equals a random integer from ``SNRs``
        (inclusive), added to the speech and written to
        ``output_path/noisy/<speech>+<noise>+SNR<k>dB.wav``.

    All files are written as 16-bit PCM at 16 kHz.

    Args:
        speech_dir, noise_dir, RIR_dir: ``Path`` objects of the input directories.
        speech_filenames: file names (relative to ``speech_dir``) to process.
        noise_filenames: candidate noise file names (relative to ``noise_dir``).
        SNRs: ``(low, high)`` inclusive integer SNR range in dB.
        output_path: ``Path`` of the split's output directory (created if missing).
        max_size: when ``len(speech_filenames) * len(noise_filenames)`` exceeds this value,
            each speech file is paired with a random sample of
            ``max_size // len(speech_filenames)`` noise files instead of all of them.
    """
    clean_output_path = output_path.joinpath('clean')
    noisy_output_path = output_path.joinpath('noisy')
    clean_output_path.mkdir(parents=True, exist_ok=True)
    noisy_output_path.mkdir(parents=True, exist_ok=True)

    RIR_filenames = os.listdir(RIR_dir)

    for filename_speech in tqdm(speech_filenames):
        speech_wav = _fit_to_length(_load_mono_16k(speech_dir.joinpath(filename_speech)), 128000)
        assert len(speech_wav) == 128000, 'Speech waveform has ' + str(len(speech_wav)) + ' samples instead of 128000 samples!'
        speech_stem = filename_speech.rsplit(".", 1)[0]

        RIR = choice(np.load(RIR_dir.joinpath(choice(RIR_filenames))).transpose())
        speech_convolved_wav = resample_convolve(speech_wav, RIR, (5, 25))

        peak = np.max(np.abs(speech_convolved_wav))
        if peak > 1:  # make sure we are not clipping
            print('Convolved speech was clipping!')
            speech_convolved_wav = speech_convolved_wav / peak

        clean_speech_filename = clean_output_path.joinpath(speech_stem + '.wav')
        wavfile.write(clean_speech_filename, 16000, _to_int16(speech_convolved_wav))

        # A fresh random subset of noises is drawn for every speech file.
        if len(speech_filenames) * len(noise_filenames) > max_size:
            noise_filenames_sampled = sample(list(noise_filenames), max_size // len(speech_filenames))
        else:
            noise_filenames_sampled = noise_filenames

        for filename_noise in noise_filenames_sampled:
            noise_wav = _load_mono_16k(noise_dir.joinpath(filename_noise))
            if len(noise_wav) < len(speech_wav):
                print("Noise was shorter than 8s!")
            # Align the noise with the speech BEFORE measuring the SNR: power() is a sum
            # over samples, so the ratio is only meaningful for the segment actually mixed.
            noise_wav = _fit_to_length(noise_wav, len(speech_wav))

            if not np.any(noise_wav):
                # An all-zero segment has no power, so no gain can reach a finite SNR.
                print('Skipping silent noise segment from ' + str(filename_noise))
                continue

            current_SNR = SNR(speech_wav, noise_wav)
            desired_SNR = np.random.randint(SNRs[0], SNRs[1] + 1)
            alpha = 1/math.sqrt(math.pow(10, (desired_SNR - current_SNR)/10))  # alpha so the SNR matches the wanted SNR

            # NOTE(review): the mixture uses the unprocessed speech_wav, whereas the file
            # written to 'clean' is the output of resample_convolve -- confirm this
            # asymmetry between the noisy input and the clean target is intended.
            noisy_speech_wav = speech_wav + alpha * noise_wav

            peak = np.max(np.abs(noisy_speech_wav))
            if peak > 1:  # make sure we are not clipping
                print('Noisy speech was clipping!')
                noisy_speech_wav = noisy_speech_wav / peak

            noisy_speech_filename = noisy_output_path.joinpath(
                speech_stem + '+' + filename_noise.rsplit(".", 1)[0] + '+SNR' + str(desired_SNR) + 'dB' + '.wav')
            wavfile.write(noisy_speech_filename, 16000, _to_int16(noisy_speech_wav))

In [6]:
def split_generate_wavs(speech_dir, noise_dir, RIR_dir, SNRs, testing_frac, validation_frac, training_frac, max_size, output_dir, n_speech_files=2000):
    """Split the speech and noise files and generate every subset of the dataset.

    A random selection of speech files is divided into training / validation / testing
    parts according to the three fractions. The shuffled noise files are divided into a
    "seen" part (``training_frac + validation_frac`` of them) and an "unseen" remainder.
    ``generate_wavs`` is then run four times, writing to these sub-directories of
    ``output_dir``:

      * ``training``              -- training speech, seen noise
      * ``validation``            -- validation speech, seen noise
      * ``testing_seen_noise``    -- testing speech, seen noise
      * ``testing_unseen_noise``  -- testing speech, unseen noise

    Args:
        speech_dir, noise_dir, RIR_dir: ``Path`` objects of the input directories.
        SNRs: ``(low, high)`` inclusive integer SNR range in dB, forwarded to ``generate_wavs``.
        testing_frac, validation_frac, training_frac: split fractions; must sum to 1.
        max_size: pair budget; each call to ``generate_wavs`` receives
            ``int(max_size * fraction)`` for its split.
        output_dir: ``Path`` of the dataset root directory.
        n_speech_files: number of speech files to draw (default 2000). When fewer files
            are available, all of them are used.

    Raises:
        AssertionError: if the three fractions do not sum to 1 (within float tolerance).
    """
    # Exact float equality would reject valid inputs such as 0.7 + 0.2 + 0.1.
    assert math.isclose(testing_frac + validation_frac + training_frac, 1.0), 'Split fractions do not sum to 1!'

    # os.listdir returns entries in arbitrary order; sorting first makes the random
    # draws below depend only on the random state, not on the file system.
    speech_filenames = sorted(os.listdir(speech_dir))
    noise_filenames = sorted(os.listdir(noise_dir))
    shuffle(noise_filenames)

    if len(speech_filenames) < n_speech_files:
        print('Only ' + str(len(speech_filenames)) + ' speech files available; using all of them.')
    # sample() already returns its selection in random order, so no extra shuffle is needed.
    speech_filenames = sample(speech_filenames, min(n_speech_files, len(speech_filenames)))

    # round() rather than int(): products such as (0.7 + 0.1) * n can land just below an
    # integer and would otherwise be truncated one element short.
    n_speech = len(speech_filenames)
    train_end = round(training_frac * n_speech)
    val_end = round((training_frac + validation_frac) * n_speech)
    speech_train, speech_val, speech_test = np.split(speech_filenames, [train_end, val_end])

    seen_end = round((training_frac + validation_frac) * len(noise_filenames))
    noise_seen, noise_unseen = np.split(noise_filenames, [seen_end])

    training_output_path = output_dir.joinpath('training')
    val_output_path = output_dir.joinpath('validation')
    test_seen_noise_output_path = output_dir.joinpath('testing_seen_noise')
    test_unseen_noise_output_path = output_dir.joinpath('testing_unseen_noise')

    # generate training set
    generate_wavs(speech_dir, noise_dir, RIR_dir, speech_train, noise_seen, SNRs, training_output_path, int(max_size * training_frac))

    # generate validation set
    generate_wavs(speech_dir, noise_dir, RIR_dir, speech_val, noise_seen, SNRs, val_output_path, int(max_size * validation_frac))

    # generate testing set on seen noise
    generate_wavs(speech_dir, noise_dir, RIR_dir, speech_test, noise_seen, SNRs, test_seen_noise_output_path, int(max_size * testing_frac))

    # generate testing set on unseen noise
    generate_wavs(speech_dir, noise_dir, RIR_dir, speech_test, noise_unseen, SNRs, test_unseen_noise_output_path, int(max_size * testing_frac))

In [7]:
# Generate every subset of the dataset from the notebook-level configuration values.
split_generate_wavs(
    speech_dir=speech_dir,
    noise_dir=noise_dir,
    RIR_dir=RIR_dir,
    SNRs=SNRs,
    testing_frac=testing_frac,
    validation_frac=validation_frac,
    training_frac=training_frac,
    max_size=max_size,
    output_dir=output_dir,
)

HBox(children=(FloatProgress(value=0.0, max=1600.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=200.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=200.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=200.0), HTML(value='')))


