In [1]:
# todo: clean up imports

import tensorflow as tf
from tensorflow.data import Dataset
from tensorflow.python.ops import gen_audio_ops
import python_speech_features

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import librosa as rosa

from IPython.display import display, Audio, HTML
from tqdm.notebook import tqdm

import random
import os
import glob

print(f'Tensorflow v{tf.__version__}')

Tensorflow v2.12.0


In [2]:
DATA_DIR = os.path.join('.', 'data')
words_dirs = [d for d in os.listdir(DATA_DIR) if os.path.isdir(os.path.join(DATA_DIR, d)) and not d.startswith('_')]
print(words_dirs)

TARGET_WORD = 'komputer'
TARGET_LABEL = 1.0
NOT_TARGET_LABEL = 0.0

OTHER_WORDS = list(filter(lambda w: w != TARGET_WORD, words_dirs))

SAMPLE_RATE = 16000
WINDOW_SIZE = 320
WINDOW_STEP = 160
WINDOW_LENGTH_IN_MS = WINDOW_SIZE / SAMPLE_RATE
WINDOW_STEP_IN_MS = WINDOW_STEP / SAMPLE_RATE

VOICE_THRESHOLD = 0.2
BG_VOLUME = 0.1
POOLING_SIZE = [1, 6]

NO_AUDIO_SHIFTS = 2
NO_BG_MIXED = 1
NO_FREQ_MASK_SPECS = 1
NO_TIME_MASK_SPECS = 1
MASK_MAX_WIDTH = 3

DEBUG = False
PLOT_SIZE = (10, 4)


In [3]:
def plot_audio(audio, ax=None, desc=None):
    if not DEBUG:
        return

    if desc is not None:
        display(HTML(f'<h1><center>{desc}</center></h1>'))
    
    axe = ax
    if ax is None:
        _, axe = plt.subplots(figsize=PLOT_SIZE)

    axe.plot(audio)
    axe.set_title('Audio wave')
    axe.set_xlabel('Time')
    axe.set_ylabel('Amplitude')
    
    if ax is None:
        plt.show()


def plot_spec(spec, ax=None, desc=None):
    if not DEBUG:
        return

    if desc is not None:
        display(HTML(f'<h1><center>{desc}</center></h1>'))

    axe = ax
    if ax is None:
        _, axe = plt.subplots(figsize=PLOT_SIZE)

    spec = np.squeeze(spec)
    im = axe.imshow(spec, aspect='auto', origin='lower', cmap='jet')
    axe.figure.colorbar(im, ax=axe, label='Magnitude')
    axe.set_title('Spectrogram')
    axe.set_xlabel('Time')
    axe.set_ylabel('Frequency')
    
    if ax is None:
        plt.show()


def debug_audio(audio, desc='Audio', feature_type='spec'):
    if not DEBUG:
        return

    display(HTML(f'<h1><center>{desc}</center></h1>'))
    fig, axes = plt.subplots(1, 2, figsize=PLOT_SIZE)
    
    if feature_type == 'mfcc':
        mfcc = get_mfcc(audio)
        plot_mfcc(mfcc, ax=axes[1])
    elif feature_type == 'spec':
        spec = get_spectrogram(audio)
        print(spec.shape)
        plot_spec(spec, ax=axes[1])

    plot_audio(audio, ax=axes[0])
    display(Audio(audio, rate=SAMPLE_RATE))

    plt.tight_layout()
    plt.subplots_adjust(wspace=0.3)
    plt.show()


In [4]:
def begin_voice(audio):
    """
        Move voice to the beginning of the recording.
    """
    start, end = find_voice(audio)
    start = max(0, start - 100)
    end = min(SAMPLE_RATE, end + 100)
    return cure_audio(audio[start:end])


def find_voice(audio):
    """
        Find voice start and end points in the audio.
    """

    audio_abs = np.abs(audio)
    start = np.argmax(audio_abs > VOICE_THRESHOLD)
    end = len(audio_abs) - np.argmax(audio_abs[::-1] > VOICE_THRESHOLD) - 1
    return start, end


def time_stretch_audio(audios, r):
    """
        Speed up or slow down provided audios by a given rate.
    """

    res = []
    for a in audios:
        copy = rosa.effects.time_stretch(a, rate=r)
        copy = cure_audio(copy)
        res.append(copy)
        debug_audio(copy, desc=f'Time stretched audio by rate {r}')
    return res


def pitch_shift_audio(audios, s):
    """
        Shift pitch by the given number of half-tones in provided audios.
    """

    res = []
    for a in audios:
        copy = rosa.effects.pitch_shift(a, sr=SAMPLE_RATE, n_steps=s)
        copy = cure_audio(copy)
        res.append(copy)
        debug_audio(copy, desc=f'Pitch shifted audio by {s} half-tones')
    return res


def mix_audio(audios):
    """
        Mix provided audios with background noise samples.
    """

    res = []
    for a in audios:
        for i in range(NO_AUDIO_SHIFTS):
            noise = bg_noises[random.randint(0, len(bg_noises) - 1)]
            noise_start = random.randint(0, len(noise) - SAMPLE_RATE)
            noise = noise[noise_start:noise_start + SAMPLE_RATE]
            copy = a + noise
            copy = cure_audio(copy)
            res.append(copy)
            debug_audio(copy, desc='Audio mixed with noise')
    return res


def shift_audio(audios):
    """
        Shifts voice in provided audios.
    """

    res = []
    for a in audios:
        start, end = find_voice(a)
        silence_len = len(a) + start - end
        silence_interval = silence_len // NO_AUDIO_SHIFTS
        for i in range(NO_AUDIO_SHIFTS):
            shift = (i + 1) * silence_interval
            shift = shift - start if shift > start else -shift
            copy = np.roll(a, shift)
            copy = cure_audio(copy)
            res.append(copy)
            debug_audio(copy, desc=f'Audio shifted by {shift} samples')
    return res


def freq_mask(spec, value=None, width=MASK_MAX_WIDTH):
    """
        Add the mean-value mask to the frequency dimension of the spectrogram.
    """

    if value is None:
        value = spec.mean()
        
    copy = spec.copy()
    mel_channels = copy.shape[1]
    w = random.randint(1, width)
    f_start = random.randint(0, mel_channels - w)
    f_end = random.randint(f_start + 1, f_start + w)
    copy[:, f_start:f_end, :] = value
    plot_spec(copy, desc='Frequency masked spectrogram')
    return copy


def time_mask(spec, value=None, width=MASK_MAX_WIDTH):
    """
        Add the zero mask to the frequency dimension of the spectrogram.
    """

    if value is None:
        value = spec.mean()

    copy = spec.copy()
    time_channels = copy.shape[0]
    w = random.randint(1, width)
    t_start = random.randint(0, time_channels - w)
    t_end = random.randint(t_start + 1, t_start + w)
    copy[t_start:t_end, :, :] = value
    plot_spec(copy, desc='Time masked spectrogram')
    return copy


def normalize_audio(audio):
    """
        Normalize audio.
    """

    audio = audio - np.mean(audio)
    audio = audio / np.max(np.abs(audio))
    return audio


def cut_audio_length(audio, length=SAMPLE_RATE):
    """
        Cut audio to SAMPLE_RATE length.
    """

    audio_len = len(audio)
    if audio_len < length:
        audio = np.append(audio, np.zeros(length - audio_len))
    audio = audio[:length]
    return audio


def cure_audio(audio):
    """
        Make sure that audio's length is fixed to SAMPLE_RATE and normalize.
    """

    audio = cut_audio_length(audio)
    audio = normalize_audio(audio)
    return np.array(audio)


def get_spectrogram(audio):
    """
        Generate a spectrogram for a given audio. Then apply pooling to reduce dimensions.
    """
    
    audio = cure_audio(audio)
    audio = tf.expand_dims(audio, -1)
    audio = tf.cast(audio[:], tf.float32)

    spec = gen_audio_ops.audio_spectrogram(audio,
                                           window_size=WINDOW_SIZE,
                                           stride=WINDOW_STEP,
                                           magnitude_squared=True).numpy()

    spec = tf.expand_dims(spec, -1)
    spec = tf.nn.pool(input=spec,
                      window_shape=POOLING_SIZE,
                      strides=POOLING_SIZE,
                      pooling_type='AVG',
                      padding='SAME')
    spec = tf.squeeze(spec, axis=0)
    spec = np.log(spec + np.finfo(float).eps)
    return spec


def process_file(file_path, repeats=1):
    """
        Process a file sample by adding more filters to all the samples generated in previous steps.
    """

    audio, _ = rosa.load(file_path, sr=SAMPLE_RATE, mono=True)
    audio = cure_audio(audio)
    debug_audio(audio, desc=f'Pure audio [{os.path.basename(file_path)}]')

    all_samples = [audio]
    stretched = []
    pitched = []
    
    for r in [0.8, 1.2]:
        stretched += time_stretch_audio(all_samples, r)
    
    for s in [-2, 2]:
        pitched += pitch_shift_audio(all_samples, s)
        
    all_samples += stretched
    all_samples += pitched
    
    for i in range(repeats):
        shifted = shift_audio(all_samples)
        all_samples += shifted

        bg_mixxx = mix_audio(all_samples)
        all_samples += bg_mixxx

    specs = []
    for s in all_samples:
        spec = get_spectrogram(cure_audio(s))
        specs.append(spec)

        for i in range(repeats):
            specs.append(freq_mask(spec))

        for i in range(repeats):
            specs.append(time_mask(spec))
            
    if DEBUG:
        print(f'Generated {len(specs)} spectrograms!')

    return specs

In [5]:
spectrograms = []

In [7]:
file_paths = glob.glob('data/_bgnoise/*.wav')
progress_bar = tqdm(total=len(file_paths), desc='_bgnoise')
bg_noises = []
no_specs = 0

for file_name in file_paths:
    audio, _ = rosa.load(file_name, sr=SAMPLE_RATE, mono=True)
    audio = normalize_audio(audio)
    for i in range(0, len(audio) - SAMPLE_RATE, SAMPLE_RATE):
        no_specs += 1
        spectrograms.append((get_spectrogram(audio[i:i+SAMPLE_RATE]), NOT_TARGET_LABEL))
    audio = BG_VOLUME * audio
    debug_audio(audio, desc=f'Background noise [{file_name}]')
    bg_noises.append(audio)
    progress_bar.update()
        
print(f'Created {no_specs} spectrograms for background noise')

_bgnoise:   0%|          | 0/11 [00:00<?, ?it/s]

Metal device set to: Apple M1
Created 12388 spectrograms for background noise


In [8]:
file_paths = glob.glob('data/_talking/*.wav')
progress_bar = tqdm(total=len(file_paths), desc='_talking')
no_specs = 0

for file_name in file_paths:
    audio, _ = rosa.load(file_name, sr=SAMPLE_RATE, mono=True)
    audio = normalize_audio(audio)
    for i in range(0, len(audio) - SAMPLE_RATE, SAMPLE_RATE):
        no_specs += 1
        spectrograms.append((get_spectrogram(audio[i:i+SAMPLE_RATE]), NOT_TARGET_LABEL))
    debug_audio(audio, desc=f'Talking noise [{file_name}]')
    progress_bar.update()
        
print(f'Created {no_specs} spectrograms for talking noise')

_talking:   0%|          | 0/87 [00:00<?, ?it/s]

Created 70900 spectrograms for talking noise


In [9]:
file_paths = glob.glob(f'data/{TARGET_WORD}/*.wav')
progress_bar = tqdm(total=len(file_paths), desc=TARGET_WORD)
no_specs = 0

for file_name in file_paths:
    for s in process_file(file_name, 1):
        no_specs += 1
        spectrograms.append((s, TARGET_LABEL))
    progress_bar.update()
        
print(f'Created {no_specs} spectrograms for {TARGET_WORD}')

komputer:   0%|          | 0/329 [00:00<?, ?it/s]

Created 44415 spectrograms for komputer


In [11]:
for word in OTHER_WORDS:
    file_paths = glob.glob(f'data/{word}/*.wav')
    progress_bar = tqdm(total=len(file_paths), desc=word)
    no_specs = 0

    for file_name in file_paths:
        audio, _ = rosa.load(file_name, sr=SAMPLE_RATE, mono=True)
        audio = cure_audio(audio)
        debug_audio(audio, desc=f'Pure audio [{os.path.basename(file_name)}]')
        spectrograms.append((get_spectrogram(audio), NOT_TARGET_LABEL))
        debug_audio(audio, desc=f'Word {word} [{file_name}]')
        progress_bar.update()

    print(f'Created {no_specs} spectrograms for {word}')

right:   0%|          | 0/3778 [00:00<?, ?it/s]

Created 0 spectrograms for right


eight:   0%|          | 0/3787 [00:00<?, ?it/s]

Created 0 spectrograms for eight


cat:   0%|          | 0/2031 [00:00<?, ?it/s]

Created 0 spectrograms for cat


tree:   0%|          | 0/1759 [00:00<?, ?it/s]

Created 0 spectrograms for tree


backward:   0%|          | 0/1664 [00:00<?, ?it/s]

Created 0 spectrograms for backward


learn:   0%|          | 0/1575 [00:00<?, ?it/s]

Created 0 spectrograms for learn


bed:   0%|          | 0/2014 [00:00<?, ?it/s]

Created 0 spectrograms for bed


happy:   0%|          | 0/2054 [00:00<?, ?it/s]

Created 0 spectrograms for happy


go:   0%|          | 0/3880 [00:00<?, ?it/s]

Created 0 spectrograms for go


dog:   0%|          | 0/2128 [00:00<?, ?it/s]

Created 0 spectrograms for dog


no:   0%|          | 0/3941 [00:00<?, ?it/s]

Created 0 spectrograms for no


wow:   0%|          | 0/2123 [00:00<?, ?it/s]

Created 0 spectrograms for wow


follow:   0%|          | 0/1579 [00:00<?, ?it/s]

Created 0 spectrograms for follow


nine:   0%|          | 0/3934 [00:00<?, ?it/s]

Created 0 spectrograms for nine


left:   0%|          | 0/3801 [00:00<?, ?it/s]

Created 0 spectrograms for left


stop:   0%|          | 0/3872 [00:00<?, ?it/s]

Created 0 spectrograms for stop


three:   0%|          | 0/3727 [00:00<?, ?it/s]

Created 0 spectrograms for three


sheila:   0%|          | 0/2022 [00:00<?, ?it/s]

Created 0 spectrograms for sheila


one:   0%|          | 0/3890 [00:00<?, ?it/s]

Created 0 spectrograms for one


bird:   0%|          | 0/2064 [00:00<?, ?it/s]

Created 0 spectrograms for bird


zero:   0%|          | 0/4052 [00:00<?, ?it/s]

Created 0 spectrograms for zero


seven:   0%|          | 0/3998 [00:00<?, ?it/s]

Created 0 spectrograms for seven


up:   0%|          | 0/3723 [00:00<?, ?it/s]

Created 0 spectrograms for up


visual:   0%|          | 0/1592 [00:00<?, ?it/s]

Created 0 spectrograms for visual


marvin:   0%|          | 0/2100 [00:00<?, ?it/s]

Created 0 spectrograms for marvin


two:   0%|          | 0/3880 [00:00<?, ?it/s]

Created 0 spectrograms for two


house:   0%|          | 0/2113 [00:00<?, ?it/s]

Created 0 spectrograms for house


down:   0%|          | 0/3917 [00:00<?, ?it/s]

Created 0 spectrograms for down


six:   0%|          | 0/3860 [00:00<?, ?it/s]

Created 0 spectrograms for six


yes:   0%|          | 0/4044 [00:00<?, ?it/s]

Created 0 spectrograms for yes


on:   0%|          | 0/3845 [00:00<?, ?it/s]

Created 0 spectrograms for on


five:   0%|          | 0/4052 [00:00<?, ?it/s]

Created 0 spectrograms for five


forward:   0%|          | 0/1557 [00:00<?, ?it/s]

Created 0 spectrograms for forward


off:   0%|          | 0/3745 [00:00<?, ?it/s]

Created 0 spectrograms for off


four:   0%|          | 0/3728 [00:00<?, ?it/s]

Created 0 spectrograms for four


In [16]:
random.shuffle(spectrograms)
spectrograms = np.array(spectrograms)

In [25]:
no_specs = len(spectrograms)
no_train_samples = int(no_specs * 0.6)
no_val_samples = int(no_specs * 0.2)

ds = np.array_split(spectrograms, [no_train_samples, no_train_samples + no_val_samples])

In [26]:
# todo: add id to the data to identify the sample after training to egzamine failures
# todo: save data separately: 'x':specs, 'y'=labels, 'id'=ids

np.savez_compressed('train.npz', ds[0])
np.savez_compressed('val.npz', ds[1])
np.savez_compressed('test.npz', ds[2])