In [1]:
from glob import glob
import random

In [2]:
# Recursively collect every .wav file under the TESS dataset directory.
files = glob('../emotion-classification-from-audio-files/TESS_Toronto_emotional_speech_set_data/**/*.wav',
             recursive = True)

# Display how many recordings were found.
len(files)

2800

In [3]:
# Raw label = the text after the last underscore in each path, with '.wav' removed.
labels = [f.split('_')[-1].replace('.wav','') for f in files]
# Display the distinct raw labels.
set(labels)

{'angry', 'disgust', 'fear', 'happy', 'neutral', 'ps', 'sad'}

In [4]:
# Raw filename label -> label name to use instead; get_label() leaves labels that are not listed here unchanged.
replace_labels = {'ps': 'surprise'}

In [5]:
def get_label(file):
    """
    Derive the label of an audio file from its path.

    The raw label is whatever follows the last underscore in `file`, with
    every '.wav' occurrence removed. If `replace_labels` has an entry for the
    raw label, the mapped value is returned; otherwise the raw label itself.
    """
    raw = file.rsplit('_', 1)[-1].replace('.wav','')
    if raw in replace_labels:
        return replace_labels[raw]
    return raw

In [6]:
# Rebuild the label list through get_label() so the replace_labels mapping is applied.
labels = [get_label(f) for f in files]

In [7]:
import soundfile as sf
from scipy import interpolate
import numpy as np
import librosa

def resample(data, old_samplerate, new_samplerate):
    """
    Resample `data` along its first axis by linear interpolation.

    Parameters
    ----------
    data : array whose first axis is time; any further axes are interpolated
        independently (the array is transposed before and after interpolation).
    old_samplerate : sample rate of `data`.
    new_samplerate : target sample rate.

    Returns
    -------
    Array with int(len(data) * new_samplerate / old_samplerate) entries along
    the first axis.
    """
    n_old = data.shape[0]
    n_new = int(n_old * new_samplerate / old_samplerate)
    duration = n_old / old_samplerate

    # Both time grids span [0, duration] inclusive, so every new time point
    # falls inside the interpolation range.
    old_times = np.linspace(0, duration, n_old)
    new_times = np.linspace(0, duration, n_new)

    return interpolate.interp1d(old_times, data.T)(new_times).T

def read_wav(file, sample_rate = 16000):
    """
    Read an audio file with soundfile and return it at `sample_rate`.

    When the file's own sample rate differs from `sample_rate`, the samples
    are passed through `resample` first.

    Returns
    -------
    (samples, sample_rate) tuple.
    """
    audio, native_rate = sf.read(file)
    if native_rate == sample_rate:
        return audio, sample_rate
    return resample(audio, native_rate, sample_rate), sample_rate



In [8]:
# Make NumPy raise FloatingPointError for every floating-point error category instead of warning;
# the dict displayed below is the previous configuration, which seterr returns.
np.seterr(all='raise')

{'divide': 'warn', 'over': 'warn', 'under': 'ignore', 'invalid': 'warn'}

In [9]:
from scipy.special import expit

def sox_reverb(
    y, reverberance = 1, hf_damping = 1, room_scale = 1, stereo_depth = 1
):
    """
    Run `y` through a pysndfx `AudioEffectsChain` consisting of one reverb.

    `reverberance`, `hf_damping`, `room_scale` and `stereo_depth` are passed
    straight to `.reverb(...)`; pre_delay (20), wet_gain (0) and
    wet_only (False) are fixed.

    Returns the processed signal produced by the chain.
    """
    from pysndfx import AudioEffectsChain

    reverb_options = dict(
        reverberance = reverberance,
        hf_damping = hf_damping,
        room_scale = room_scale,
        stereo_depth = stereo_depth,
        pre_delay = 20,
        wet_gain = 0,
        wet_only = False,
    )
    chain = AudioEffectsChain().reverb(**reverb_options)
    return chain(y)


def sox_augment_low(
    y,
    min_bass_gain = 5,
    reverberance = 1,
    hf_damping = 1,
    room_scale = 1,
    stereo_depth = 1,
    negate = 1,
):
    """
    Apply a low-shelf filter followed by a reverb to `y` via pysndfx.

    The low-shelf gain is `min_bass_gain`, sign-flipped when `negate` is
    truthy; its frequency (300) and slope (0.1) are fixed. The reverb takes
    `reverberance`, `hf_damping`, `room_scale` and `stereo_depth` from the
    arguments, with pre_delay (20), wet_gain (0) and wet_only (False) fixed.

    Returns the processed signal produced by the chain.
    """
    from pysndfx import AudioEffectsChain

    shelf_gain = -min_bass_gain if negate else min_bass_gain

    chain = AudioEffectsChain()
    chain = chain.lowshelf(gain = shelf_gain, frequency = 300, slope = 0.1)
    chain = chain.reverb(
        reverberance = reverberance,
        hf_damping = hf_damping,
        room_scale = room_scale,
        stereo_depth = stereo_depth,
        pre_delay = 20,
        wet_gain = 0,
        wet_only = False,
    )
    return chain(y)


def sox_augment_high(
    y,
    min_bass_gain = 5,
    reverberance = 1,
    hf_damping = 1,
    room_scale = 1,
    stereo_depth = 1,
    negate = 1,
):
    """
    Apply a high-shelf filter followed by a reverb to `y` via pysndfx.

    `min_bass_gain` is sign-flipped when `negate` is truthy; the high-shelf
    gain is then the negative of that value scaled by
    (1 - expit(max(y))). Shelf frequency (300) and slope (0.1) are fixed.
    The reverb takes `reverberance`, `hf_damping`, `room_scale` and
    `stereo_depth` from the arguments, with pre_delay (20), wet_gain (0) and
    wet_only (False) fixed.

    Returns the processed signal produced by the chain.
    """
    from pysndfx import AudioEffectsChain

    signed_gain = -min_bass_gain if negate else min_bass_gain
    # Scale the shelf gain by the sigmoid of the signal's peak value.
    shelf_gain = -signed_gain * (1 - expit(np.max(y)))

    chain = AudioEffectsChain()
    chain = chain.highshelf(gain = shelf_gain, frequency = 300, slope = 0.1)
    chain = chain.reverb(
        reverberance = reverberance,
        hf_damping = hf_damping,
        room_scale = room_scale,
        stereo_depth = stereo_depth,
        pre_delay = 20,
        wet_gain = 0,
        wet_only = False,
    )
    return chain(y)


def sox_augment_combine(
    y,
    min_bass_gain_low = 5,
    min_bass_gain_high = 5,
    reverberance = 1,
    hf_damping = 1,
    room_scale = 1,
    stereo_depth = 1,
):
    """
    Apply a low-shelf, then a high-shelf, then a reverb to `y` via pysndfx.

    The low shelf uses gain `min_bass_gain_low`; the high shelf uses gain
    `-min_bass_gain_high`. Both shelves use frequency 300 and slope 0.1.
    The reverb takes `reverberance`, `hf_damping`, `room_scale` and
    `stereo_depth` from the arguments, with pre_delay (20), wet_gain (0) and
    wet_only (False) fixed.

    Returns the processed signal produced by the chain.
    """
    from pysndfx import AudioEffectsChain

    shelf_options = dict(frequency = 300, slope = 0.1)

    chain = AudioEffectsChain()
    chain = chain.lowshelf(gain = min_bass_gain_low, **shelf_options)
    chain = chain.highshelf(gain = -min_bass_gain_high, **shelf_options)
    chain = chain.reverb(
        reverberance = reverberance,
        hf_damping = hf_damping,
        room_scale = room_scale,
        stereo_depth = stereo_depth,
        pre_delay = 20,
        wet_gain = 0,
        wet_only = False,
    )
    return chain(y)


def random_pitch(sample, low = 0.5, high = 1.0):
    """
    Re-read `sample` at a random step size and return an equally long array.

    A factor is drawn uniformly from [low, high]; the signal is linearly
    interpolated at positions 0, 1/factor, 2/factor, ... and the result is
    written to the start of a zeroed copy of `sample`. Whatever does not fit
    is dropped, and any unfilled tail stays zero.

    The input array is not modified.
    """
    out = sample.copy()
    step = 1.0 / np.random.uniform(low = low, high = high)
    n_samples = len(out)
    warped = np.interp(
        np.arange(0, n_samples, step), np.arange(0, n_samples), out
    )
    keep = min(out.shape[0], warped.shape[0])
    # Blank the copy, then overwrite its head with the warped signal.
    out *= 0
    out[:keep] = warped[:keep]
    return out


def random_amplitude(sample, low = 1.5, high = 3):
    """
    Return a copy of `sample` multiplied by a gain drawn uniformly from
    [low, high]. The input array is not modified.
    """
    gain = np.random.uniform(low = low, high = high)
    scaled = sample.copy() * gain
    return scaled


def random_stretch(sample, low = 0.5, high = 1.3):
    """
    Time-stretch `sample` by a rate drawn uniformly from [low, high].

    Parameters
    ----------
    sample : audio array; it is converted to float before stretching and is
        not modified.
    low, high : bounds of the uniform distribution the stretch rate is drawn
        from.

    Returns
    -------
    The array returned by `librosa.effects.time_stretch`.
    """
    rate = np.random.uniform(low = low, high = high)
    # `rate` is passed by keyword: newer librosa releases make it
    # keyword-only, and older releases accept the keyword form as well.
    return librosa.effects.time_stretch(sample.astype('float'), rate = rate)

def add_uniform_noise(sample, power = 0.01):
    """
    Add normally distributed noise with a randomly scaled amplitude.

    The amplitude is power * U(0, 1) * max(sample); the noise itself is drawn
    from a standard normal distribution with one value per entry along the
    first axis of `sample`.

    Returns a float64 array; the input array is not modified.
    """
    clean = sample.copy()
    amplitude = power * np.random.uniform() * np.amax(clean)
    gaussian = np.random.normal(size = clean.shape[0])
    return clean.astype('float64') + amplitude * gaussian


def add_noise(sample, noise, random_sample = True, factor = 0.1):
    """
    Mix `noise`, scaled by `factor`, into a copy of `sample`.

    If `noise` is shorter than `sample` it is tiled until it is long enough.
    Otherwise, when `random_sample` is true, the noise is taken from a random
    start offset chosen so that at least len(sample) values remain.

    Returns the mixed signal; the input arrays are not modified.
    """
    base = sample.copy()
    n_base, n_noise = len(base), len(noise)
    if n_base > n_noise:
        repeats = int(np.ceil(n_base / n_noise))
        noise = np.tile(noise, repeats)
    elif random_sample:
        start = np.random.randint(0, n_noise - n_base + 1)
        noise = noise[start:]
    return base + noise[:n_base] * factor

def sampling(combined, frame_duration_ms = 700, sample_rate = 16000):
    """
    Yield consecutive, non-overlapping frames of `combined`.

    Parameters
    ----------
    combined : sliceable sequence of samples.
    frame_duration_ms : frame length in milliseconds.
    sample_rate : samples per second, used to convert the duration into a
        number of samples per frame.

    Yields
    ------
    Slices of `combined`, each int(sample_rate * frame_duration_ms / 1000)
    samples long; the last slice is shorter when the length is not an exact
    multiple of the frame size.

    Raises
    ------
    ValueError
        If the frame size works out to fewer than one sample. Without this
        check the offset would never advance and the generator would yield
        forever.
    """
    n = int(sample_rate * (frame_duration_ms / 1000.0))
    if n <= 0:
        raise ValueError(
            'frame_duration_ms and sample_rate must give at least one sample per frame'
        )
    for offset in range(0, len(combined), n):
        yield combined[offset : offset + n]

In [10]:
def calc(signal):
    """
    Apply one randomly chosen sox-based augmentation to `signal`, then
    optionally add noise.

    Steps:
    1. Pick one of five effect variants (high-shelf with negate on, high-shelf
       with negate off, low-shelf, combined shelves, plain reverb) with
       randomly drawn parameters.
    2. With probability 1/2, add noise through `add_uniform_noise`.
    3. When random.random() > 0.75, mix in a random file from `not_music`
       through `add_noise`.

    Returns the augmented signal.
    """
    choice = random.randint(0, 4)

    # Keyword dicts are built in the same order the parameters were drawn
    # before, so the sequence of random draws is unchanged.
    if choice == 0:
        params = dict(
            min_bass_gain = random.randint(25, 50),
            reverberance = random.randint(0, 80),
            hf_damping = 10,
            room_scale = random.randint(0, 50),
            negate = 1,
        )
        x = sox_augment_high(signal, **params)
    elif choice == 1:
        params = dict(
            min_bass_gain = random.randint(25, 70),
            reverberance = random.randint(0, 80),
            hf_damping = 10,
            room_scale = random.randint(0, 50),
            negate = 0,
        )
        x = sox_augment_high(signal, **params)
    elif choice == 2:
        params = dict(
            min_bass_gain = random.randint(5, 30),
            reverberance = random.randint(0, 80),
            hf_damping = 10,
            room_scale = random.randint(0, 50),
            negate = random.randint(0, 1),
        )
        x = sox_augment_low(signal, **params)
    elif choice == 3:
        params = dict(
            min_bass_gain_high = random.randint(25, 70),
            min_bass_gain_low = random.randint(5, 30),
            reverberance = random.randint(0, 80),
            hf_damping = 10,
            room_scale = random.randint(0, 90),
        )
        x = sox_augment_combine(signal, **params)
    elif choice == 4:
        params = dict(
            reverberance = random.randint(10, 80),
            hf_damping = 10,
            room_scale = random.randint(10, 90),
        )
        x = sox_reverb(signal, **params)

    if random.randint(0, 1):
        x = add_uniform_noise(x, power = random.uniform(0.005, 0.015))

    if random.random() > 0.75:
        background = read_wav(random.choice(not_music))[0]
        x = add_noise(x, background, factor = random.uniform(0.005, 0.01))

    return x

In [11]:
# Gather the extra .wav files matched by the three glob patterns below (used for the 'not an emotion' class
# and as background noise in calc()).
# NOTE(review): the third pattern ('musan/noise/...') has no '../' prefix, unlike the two before it — confirm the path is intended.
not_music = glob('../not-music/clean-wav/*.wav') + glob('../musan/music/**/*.wav', recursive = True) \
+ glob('musan/noise/**/*.wav', recursive = True)
# Append the extra files plus one 'not an emotion' label per file, so `files` and `labels` stay index-aligned.
# NOTE(review): `files` and `labels` are rebound to extended lists, so re-running this cell appends the same entries again.
files = files + not_music
labels = labels + ['not an emotion'] * len(not_music)

In [18]:
import IPython.display as ipd

In [12]:
# Load the second-to-last file and keep at most its first 10 seconds (sr samples per second).
y, sr = read_wav(files[-2])
y = y[:sr * 10]
# NOTE(review): get_label() parses the filename; this path contains no '_', so it returns the path
# without '.wav' (see the output below). The class label for this file is the matching entry of `labels`.
y, sr, get_label(files[-2])

(array([0.        , 0.        , 0.        , ..., 0.05361938, 0.01065063,
        0.01055908]),
 16000,
 '../musan/music/jamendo/music-jamendo-0171')

In [13]:
# Run the random augmentation once on the clip and display the resulting array.
calc(y)

array([-0.00169413, -0.00170154, -0.00164258, ...,  0.01697721,
        0.00490743,  0.00455676])

In [19]:
# Listen to a randomly time-stretched version of the clip.
ipd.Audio(random_stretch(y), rate = sr)

In [20]:
# Listen to the clip after random_pitch().
ipd.Audio(random_pitch(y), rate = sr)

In [21]:
# Listen to the clip after one pass through the calc() augmentation.
ipd.Audio(calc(y), rate = sr)

In [22]:
from sklearn.utils import shuffle
# Shuffle files and labels together so each file keeps its label.
# NOTE(review): no random_state is passed, so the order differs from run to run.
files, labels = shuffle(files, labels)

In [23]:
# Listen to the unmodified clip for comparison.
ipd.Audio(y, rate = sr)

In [24]:
# Number of files collected for the 'not an emotion' class.
len(not_music)

1096

In [25]:
# Distinct labels after the replace_labels mapping and the 'not an emotion' additions.
set(labels)

{'angry',
 'disgust',
 'fear',
 'happy',
 'neutral',
 'not an emotion',
 'sad',
 'surprise'}

In [26]:
# Class list: loop() stores actual_labels.index(label) as the target,
# so a label's position in this list is its integer class id.
actual_labels = [
    'angry',
    'disgust',
    'fear',
    'happy',
    'sad',
    'surprise',
    'neutral',
    'not an emotion',
]

In [27]:
# Copy mp.py from the parent directory into the working directory so the later `import mp` can find it.
!cp ../mp.py .

In [28]:
import os
import tensorflow as tf

# Delete whatever is left in emotion/data from an earlier run (the command's exit status is not checked).
os.system('rm emotion/data/*')
# Directory the TFRecord files written by loop() go into.
DATA_DIR = os.path.expanduser('emotion/data')
# Make sure the output directory exists.
tf.gfile.MakeDirs(DATA_DIR)

In [29]:
from tqdm import tqdm
from malaya_speech.train import prepare_data
from collections import defaultdict

def _write_example(writer, samples, label, counts):
    """Write `samples` as one example if it has more than 50 values, and count it under `label`."""
    if len(samples) > 50:
        example = prepare_data.to_example({'inputs': samples.tolist(), 
                                           'targets': [actual_labels.index(label)]})
        writer.write(example.SerializeToString())
        counts[label] += 1


def loop(files, dupe_factor = 15):
    """
    Write one TFRecord part file of (augmented) audio segments.

    Parameters
    ----------
    files : tuple (records, part_number), where each record is indexed as
        record[0] = wav path and record[1] = label.
    dupe_factor : for every label other than 'not an emotion', the number of
        segmentation passes per file and also the number of calc()
        augmentations per segment. 'not an emotion' files get one pass and no
        augmentation.

    Returns
    -------
    A one-element list holding a defaultdict that maps each label to the
    number of examples written.

    Notes
    -----
    Errors while processing a file or a segment are printed and that item is
    skipped (best effort). The writer is closed in a `finally` block so the
    part file is closed even if the job is interrupted.
    """
    files, no = files
    fname = f'{DATA_DIR}/part-{no}.tfrecords'
    writer = tf.python_io.TFRecordWriter(fname)
    counts = defaultdict(int)
    try:
        for file in tqdm(files):
            try:
                wav = read_wav(file[0])[0]
                label = file[1]
                is_emotion = label != 'not an emotion'
                # These depend only on the label, so compute them once per file.
                passes = dupe_factor if is_emotion else 1
                minimum = 1000 if is_emotion else 200
                for _ in range(passes):
                    # A new random frame length (in ms) is drawn for every pass.
                    for s in sampling(wav, random.randint(minimum, 2000)):
                        try:
                            if is_emotion:
                                for _aug in range(dupe_factor):
                                    _write_example(writer, calc(s), label, counts)
                            # The un-augmented segment is always written as well.
                            _write_example(writer, s, label, counts)
                        except Exception as e:
                            print(e)
            except Exception as e:
                print(e)
    finally:
        writer.close()
    return [counts]

In [30]:
# Pair every file path with its label; loop() reads these as record[0] / record[1].
combined_all = list(zip(files, labels))

In [31]:
import mp
# NOTE(review): mp is a local helper that is not shown here; presumably it splits combined_all into chunks,
# calls loop((chunk, index)) in `cores` worker processes and concatenates the returned lists — confirm.
returned = mp.multiprocessing(combined_all, loop, cores = 10)

100%|██████████| 389/389 [2:06:42<00:00, 19.54s/it]  
100%|██████████| 389/389 [2:06:51<00:00, 19.57s/it]
100%|██████████| 389/389 [2:06:56<00:00, 19.58s/it]
100%|██████████| 389/389 [2:06:59<00:00, 19.59s/it]
100%|██████████| 389/389 [2:07:27<00:00, 19.66s/it]
100%|██████████| 389/389 [2:07:31<00:00, 19.67s/it]
100%|██████████| 6/6 [01:22<00:00, 13.73s/it]2s/it]
100%|██████████| 389/389 [2:08:12<00:00, 19.77s/it]
100%|██████████| 389/389 [2:08:15<00:00, 19.78s/it]
100%|██████████| 389/389 [2:08:27<00:00, 19.81s/it]
100%|██████████| 389/389 [2:11:01<00:00, 20.21s/it]


In [32]:
# Sum the per-part label counts returned by the workers into one total per label.
combined_d = defaultdict(int)
for d in returned:
    for k, v in d.items():
        combined_d[k] += v
# Display the totals.
combined_d

defaultdict(int,
            {'sad': 211504,
             'not an emotion': 484106,
             'neutral': 193120,
             'surprise': 189312,
             'happy': 187600,
             'angry': 173344,
             'fear': 158992,
             'disgust': 213744})