In [1]:
from audiomentations import AddBackgroundNoise, PolarityInversion
from imutils import paths
import soundfile as sf
import numpy as np
import librosa
import random
import shutil
import json
import os 

In [2]:
# location of the raw Kazakh Speech Corpus Dataset (KSCD)
dataset_path = "data/"
# destination folder for the processed dataset
new_dataset_path = "Keyword-MLP/data_kk/"
# folder holding the background noise recordings
bn_dataset_path = "ESC-50-master/audio"

# number of subjects placed in the test split (the split cell slices with it)
n = 40

# the speech commands; each one gets its own sub-folder in the output dataset
commands = [
    "backward", "forward", "right", "left", "down", "up", "go",
    "stop", "on", "off", "yes", "no", "learn", "follow",
    "zero", "one", "two", "three", "four", "five", "six",
    "seven", "eight", "nine", "bed", "bird", "cat", "dog",
    "happy", "house", "read", "write", "tree", "visual", "wow",
]

In [3]:
# create one output directory per command; exist_ok=True makes the cell
# safe to re-run when the folders are already present
for command in commands:
    command_path = os.path.join(new_dataset_path, command)
    os.makedirs(command_path, exist_ok=True)

In [4]:
# collect every .wav file in KSCD and report how many were found.
# The list is sorted because directory listings come back in arbitrary
# order, and the subject split further down depends on iteration order;
# sorting keeps that split identical across runs and machines.
wavPaths = sorted(paths.list_files(dataset_path, validExts="wav"))
print(len(wavPaths))

3623


In [5]:
# count the recordings contributed by each subject
# key: subject ID (the file name up to its first '.')
# value: num of samples
dic = {}

for wavPath in wavPaths:
    # os.path.basename honours the platform's path separator, unlike a
    # hard-coded split on '/'
    wav_name = os.path.basename(wavPath)
    sub_id = wav_name.split('.')[0]
    dic[sub_id] = dic.get(sub_id, 0) + 1

In [6]:
# split the subjects into training, validation, and test sets
train_subjects = []
complete_subjects = []

# subjects whose sample count equals the number of commands are held out
# for evaluation; every other subject goes to training
num_commands = len(commands)
for key, val in dic.items():
    if val == num_commands:
        complete_subjects.append(key)
    else:
        train_subjects.append(key)

# the first n held-out subjects form the test set, the rest the validation set
test_subjects = complete_subjects[:n]
val_subjects = complete_subjects[n:]

print("# training subjects:", len(train_subjects))
print("# validation subjects:", len(val_subjects))
print("# testing subjects:", len(test_subjects))

# training subjects: 69
# validation subjects: 10
# testing subjects: 40


In [7]:
sample_rate = 16000 # sampling rate in Hz
duration = 1 # audio length in seconds (not referenced by the visible cells)
speeds = [0.8, 1, 1.2] # time stretch rates
pitches = [-2, 0, 2] # pitch shifts in semitones
gain_min_factor, gain_max_factor = 0.8, 1.2 # min-max gain factors
noise_percentage_factor = 0.05 # scale applied to the white gaussian noise
num_background_noise = 1 # number of background-noise variants per sample

# seed the global generators so the random gain (random.uniform) and the
# white noise (np.random.normal) are reproducible on a fresh kernel run
# NOTE(review): AddBackgroundNoise presumably draws from these generators
# as well - confirm against the installed audiomentations version
seed = 42
random.seed(seed)
np.random.seed(seed)

In [8]:
# https://github.com/musikalkemist/audioDataAugmentationTutorial/blob/main/3/dataaugmentation.py
def add_white_noise(signal, noise_percentage_factor):
    """Return `signal` with zero-mean Gaussian noise added.

    The noise is drawn with the same standard deviation as `signal` and is
    multiplied by `noise_percentage_factor` before being added.

    Args:
        signal: NumPy array of audio samples (any shape).
        noise_percentage_factor: multiplier applied to the noise.

    Returns:
        A new array with the same shape as `signal`.
    """
    # draw with signal.shape rather than signal.size so multi-dimensional
    # (e.g. multi-channel) input is handled element-wise; for 1-D input the
    # random draws are unchanged
    noise = np.random.normal(0, signal.std(), signal.shape)
    return signal + noise * noise_percentage_factor


def time_stretch(signal, time_stretch_rate):
    """Time-stretch `signal` via librosa.effects.time_stretch.

    Args:
        signal: audio samples, passed to librosa as `y`.
        time_stretch_rate: stretch factor, passed to librosa as `rate`.

    Returns:
        Whatever librosa.effects.time_stretch returns for these arguments.
    """
    stretched = librosa.effects.time_stretch(y=signal, rate=time_stretch_rate)
    return stretched


def pitch_scale(signal, sample_rate, num_semitones):
    """Pitch-shift `signal` via librosa.effects.pitch_shift.

    Args:
        signal: audio samples, passed to librosa as `y`.
        sample_rate: sampling rate of `signal`, passed as `sr`.
        num_semitones: size of the shift, passed as `n_steps`.

    Returns:
        Whatever librosa.effects.pitch_shift returns for these arguments.
    """
    shifted = librosa.effects.pitch_shift(
        y=signal,
        sr=sample_rate,
        n_steps=num_semitones,
    )
    return shifted


def random_gain(signal, min_factor, max_factor):
    """Scale `signal` by a gain drawn uniformly between the two factors.

    Args:
        signal: value or array to scale.
        min_factor: lower bound handed to random.uniform.
        max_factor: upper bound handed to random.uniform.

    Returns:
        `signal` multiplied by the drawn gain.
    """
    return signal * random.uniform(min_factor, max_factor)


def invert_polarity(signal):
    """Return `signal` multiplied by -1, i.e. with its sign flipped."""
    flipped = -1 * signal
    return flipped


# Background-noise transform built from audiomentations' AddBackgroundNoise.
# It reads its noise recordings from bn_dataset_path and is called later as
# add_background_noise(signal, sample_rate=...).
# Going by the parameter names, the SNR bounds are 5-30 dB, the noise is
# passed through PolarityInversion, and p=1.0 presumably means the transform
# is always applied - confirm against the audiomentations documentation.
# NOTE(review): newer audiomentations releases appear to have renamed
# min_snr_in_db/max_snr_in_db to min_snr_db/max_snr_db - verify against the
# installed version before upgrading.
add_background_noise = AddBackgroundNoise(sounds_path=bn_dataset_path,
                                          min_snr_in_db=5.0,
                                          max_snr_in_db=30.0,
                                          noise_transform=PolarityInversion(),
                                          p=1.0)

In [9]:
# turn the three subject collections into sets so membership tests are fast
train_subjects, test_subjects, val_subjects = (
    set(group) for group in (train_subjects, test_subjects, val_subjects)
)

In [10]:
# create training, validation, and test lists
# Each list collects the relative paths ('./data_kk/<command>/<file>') of the
# files written for that split.
train_list = []
val_list = []
test_list = []

# loop over the speech commands
for i, wavPath in enumerate(wavPaths, 1):
    print("[INFO] Processing file: {}/{}".format(i, len(wavPaths)))

    # the command is the parent folder name, the subject ID is the file name
    # up to its first '.'; os.path is used so the platform separator is honoured
    wav_file = os.path.basename(wavPath)
    command = os.path.basename(os.path.dirname(wavPath))
    subject = wav_file.split('.')[0]

    # load the raw audio signal at the configured sampling rate
    raw_signal, _ = librosa.load(wavPath, sr=sample_rate)

    # folder the outputs are written to, and the prefix used in the split lists
    save_path = os.path.join(new_dataset_path, command)
    list_prefix = './data_kk/' + command + '/'

    # pick the list this subject's files belong to, once per input file
    if subject in train_subjects:
        split_list = train_list
    elif subject in val_subjects:
        split_list = val_list
    else:
        # every remaining subject is stored as-is, without augmentation
        sf.write(os.path.join(save_path, wav_file), raw_signal, sample_rate)
        test_list.append(list_prefix + wav_file)
        continue

    # loop over the time stretch rates
    for speed in speeds:
        ts_signal = time_stretch(raw_signal, speed)

        # loop over the pitch scales
        for pitch in pitches:
            ps_signal = pitch_scale(ts_signal, sample_rate, pitch)

            # variant 0: stretched + shifted signal with a random gain
            gs_signal = random_gain(ps_signal, gain_min_factor, gain_max_factor)
            wav_file_name = "{}-{}-{}-0.wav".format(subject, speed, pitch)
            sf.write(os.path.join(save_path, wav_file_name), gs_signal, sample_rate)
            split_list.append(list_prefix + wav_file_name)

            # variant 1: the gain-scaled signal plus white gaussian noise
            gn_signal = add_white_noise(gs_signal, noise_percentage_factor)
            gn_wav_file_name = "{}-{}-{}-1.wav".format(subject, speed, pitch)
            sf.write(os.path.join(save_path, gn_wav_file_name), gn_signal, sample_rate)
            split_list.append(list_prefix + gn_wav_file_name)

            # variants 2..: the gain-scaled signal plus random background noise;
            # the index has its own name so it does not shadow the file counter i
            for bn_idx in range(num_background_noise):
                bn_signal = add_background_noise(gs_signal, sample_rate=sample_rate)
                bn_wav_file_name = "{}-{}-{}-{}.wav".format(subject, speed, pitch, bn_idx + 2)
                sf.write(os.path.join(save_path, bn_wav_file_name), bn_signal, sample_rate)
                split_list.append(list_prefix + bn_wav_file_name)

[INFO] Processing file: 1/3623
[INFO] Processing file: 2/3623
[INFO] Processing file: 3/3623
[INFO] Processing file: 4/3623
[INFO] Processing file: 5/3623
[INFO] Processing file: 6/3623
[INFO] Processing file: 7/3623
[INFO] Processing file: 8/3623
[INFO] Processing file: 9/3623
[INFO] Processing file: 10/3623
[INFO] Processing file: 11/3623
[INFO] Processing file: 12/3623
[INFO] Processing file: 13/3623
[INFO] Processing file: 14/3623
[INFO] Processing file: 15/3623
[INFO] Processing file: 16/3623
[INFO] Processing file: 17/3623
[INFO] Processing file: 18/3623
[INFO] Processing file: 19/3623
[INFO] Processing file: 20/3623
[INFO] Processing file: 21/3623
[INFO] Processing file: 22/3623
[INFO] Processing file: 23/3623
[INFO] Processing file: 24/3623
[INFO] Processing file: 25/3623
[INFO] Processing file: 26/3623
[INFO] Processing file: 27/3623
[INFO] Processing file: 28/3623
[INFO] Processing file: 29/3623
[INFO] Processing file: 30/3623
[INFO] Processing file: 31/3623
[INFO] Processing

In [11]:
# class labels are the sub-folders of the processed dataset, in sorted order,
# skipping any folder whose name starts with "_"
label_list = []
for entry in sorted(os.listdir(new_dataset_path)):
    if os.path.isdir(os.path.join(new_dataset_path, entry)) and entry[0] != "_":
        label_list.append(entry)

# index -> label lookup
label_map = dict(enumerate(label_list))

# write each split as newline-separated paths next to the data
for list_name, entries in (
    ("training_list.txt", train_list),
    ("validation_list.txt", val_list),
    ("testing_list.txt", test_list),
):
    with open(os.path.join(new_dataset_path, list_name), "w+") as f:
        f.write("\n".join(entries))

# store the label map alongside the split lists
with open(os.path.join(new_dataset_path, "label_map.json"), "w+") as f:
    json.dump(label_map, f)

In [12]:
# report how many files ended up in each split
for split_name, split_items in (("Train", train_list), ("Val", val_list), ("Test", test_list)):
    print("{} samples:".format(split_name), len(split_items))

Train samples: 50571
Val samples: 9450
Test samples: 1400


In [16]:
# count the processed .wav files per command.
# A dedicated name is used for the per-command listing so the KSCD file list
# held in wavPaths is not overwritten; earlier cells can then still be re-run.
for i, command in enumerate(commands, 1):
    command_wav_paths = list(paths.list_files(os.path.join(new_dataset_path, command), validExts="wav"))
    print("{}. Command: {}, samples: {}".format(i, command, len(command_wav_paths)))

1. Command: backward, samples: 2011
2. Command: forward, samples: 1984
3. Command: right, samples: 1822
4. Command: left, samples: 1768
5. Command: down, samples: 1714
6. Command: up, samples: 1768
7. Command: go, samples: 1687
8. Command: stop, samples: 1849
9. Command: on, samples: 1687
10. Command: off, samples: 1795
11. Command: yes, samples: 1930
12. Command: no, samples: 1849
13. Command: learn, samples: 1876
14. Command: follow, samples: 1768
15. Command: zero, samples: 1795
16. Command: one, samples: 1849
17. Command: two, samples: 1633
18. Command: three, samples: 1849
19. Command: four, samples: 1579
20. Command: five, samples: 1768
21. Command: six, samples: 1687
22. Command: seven, samples: 1741
23. Command: eight, samples: 1741
24. Command: nine, samples: 1660
25. Command: bed, samples: 1579
26. Command: bird, samples: 1552
27. Command: cat, samples: 1579
28. Command: dog, samples: 1714
29. Command: happy, samples: 1687
30. Command: house, samples: 1849
31. Command: read, 

In [19]:
# copy the test recordings into their own folder tree
test_dataset_path = 'Keyword-MLP/test_data_kk'

# one sub-folder per command; exist_ok=True keeps the cell re-runnable
for command in commands:
    command_path = os.path.join(test_dataset_path, command)
    os.makedirs(command_path, exist_ok=True)

for test_path in test_list:
    # the last two '/'-separated components are the command and the file name
    command, wav_file = test_path.split('/')[-2:]
    source_path = os.path.join(new_dataset_path, command, wav_file)
    destination_path = os.path.join(test_dataset_path, command, wav_file)
    shutil.copy(source_path, destination_path)