In [1]:
import numpy as np # type: ignore
import librosa # type: ignore
import matplotlib.pyplot as plt # type: ignore
import os
import IPython.display as ipd # type: ignore
import tensorflow as tf # type: ignore
from typing import Iterable
import tqdm # type: ignore
import cv2 # type: ignore
import tensorflow_io as tfio # type: ignore
import random

DATA_FOLDER = "../Data/train/audio"
NOISE_FACTOR = 0.2
BATCH_SIZE = 64
EPOCHS = 2
SR = 16000

tf.compat.v1.enable_eager_execution()

EPS = 1e-8

In [2]:
def get_spectrogram(wav):
    D = librosa.stft(wav, n_fft=480, hop_length=160,
                     win_length=480, window='hamming')
    spect, phase = librosa.magphase(D)
    return spect

def log_spectrum(wav):
    log_spect = np.log(get_spectrogram(wav))
    print('spectrogram shape:', log_spect.shape)
    plt.imshow(log_spect, aspect='auto', origin='lower',)
    plt.title('spectrogram of origin audio')
    plt.show()
    
def shift_time(wave):    
    start_ = int(np.random.uniform(-4800,4800))
    if start_ >= 0:
        wav_time_shift = np.r_[wave[start_:], np.random.uniform(-0.001,0.001, start_)]
    else:
        wav_time_shift = np.r_[np.random.uniform(-0.001,0.001, -start_), wave[:start_]]
    return wav_time_shift

In [3]:
file_path = DATA_FOLDER + "/bird/0a7c2a8d_nohash_0.wav"
wav, sr = librosa.load(file_path, sr=None)
print(wav.shape, wav.max(), wav.min())

(16000,) 0.30215454 -0.27563477


In [4]:
words = os.listdir(DATA_FOLDER)
print(len(words), words)

30 ['bed', 'bird', 'cat', 'dog', 'down', 'eight', 'five', 'four', 'go', 'happy', 'house', 'left', 'marvin', 'nine', 'no', 'off', 'on', 'one', 'right', 'seven', 'sheila', 'six', 'stop', 'three', 'tree', 'two', 'up', 'wow', 'yes', 'zero']


In [5]:
def add_noise(wav):
    noise_dir = os.listdir("../Data/train" + "/_background_noise_")
    noise_file = np.random.choice(noise_dir)
    noise_wav, noise_sr = librosa.load("../Data/train" + "/_background_noise_/" + noise_file, sr=None)
    mixed_wav = wav + shift_time(noise_wav)[:len(wav)] * NOISE_FACTOR
    return mixed_wav

def augment_speed(wav):
    # wav = wav.numpy()   
    speed_rate = np.random.uniform(0.8,1.2)
    wav_speed_tune = cv2.resize(wav, (1, int(len(wav) * speed_rate))).squeeze()
    if len(wav_speed_tune) < 16000:
        pad_len = 16000 - len(wav_speed_tune)
        wav_speed_tune = np.r_[np.random.uniform(-0.001,0.001,int(pad_len/2)),
                            wav_speed_tune,
                            np.random.uniform(-0.001,0.001,int(np.ceil(pad_len/2)))]
    else: 
        cut_len = len(wav_speed_tune) - 16000
        wav_speed_tune = wav_speed_tune[int(cut_len/2):int(cut_len/2)+16000]
    return wav_speed_tune

In [6]:
ipd.Audio(add_noise(augment_speed(wav)), rate=sr)

In [7]:
train_ds, val_ds = tf.keras.utils.audio_dataset_from_directory(
    directory=DATA_FOLDER,
    batch_size=BATCH_SIZE,
    label_mode='int',
    validation_split=0.2,
    seed=0,
    output_sequence_length=SR,
    subset='both')

label_names = np.array(train_ds.class_names)
print("Label Names:", label_names)

Found 64721 files belonging to 30 classes.
Using 51777 files for training.
Using 12944 files for validation.
Label Names: ['bed' 'bird' 'cat' 'dog' 'down' 'eight' 'five' 'four' 'go' 'happy'
 'house' 'left' 'marvin' 'nine' 'no' 'off' 'on' 'one' 'right' 'seven'
 'sheila' 'six' 'stop' 'three' 'tree' 'two' 'up' 'wow' 'yes' 'zero']


In [8]:
test_ds = val_ds.shard(num_shards=2, index=0)
val_ds = val_ds.shard(num_shards=2, index=1)

In [9]:
def make_model():
    inputs = tf.keras.layers.Input(shape=(SR, 1))
    x = tf.keras.layers.Conv1D(filters=16, kernel_size=8, strides=4, padding='same', activation='relu')(inputs)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.MaxPooling1D(pool_size=4, strides=4)(x)
    x = tf.keras.layers.Conv1D(filters=32, kernel_size=4, strides=2, padding='same', activation='relu')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.MaxPooling1D(pool_size=2, strides=2)(x)
    x = tf.keras.layers.Conv1D(filters=64, kernel_size=2, strides=1, padding='same', activation='relu')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.MaxPooling1D(pool_size=2, strides=2)(x)
    x = tf.keras.layers.Conv1D(filters=64, kernel_size=2, strides=1, padding='same', activation='relu')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.MaxPooling1D(pool_size=2, strides=2)(x)
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(4096*2, activation='relu')(x)
    x = tf.keras.layers.Dense(2048, activation='relu')(x)
    output = tf.keras.layers.Dense(len(label_names), activation='softmax')(x)
    
    model = tf.keras.models.Model(inputs=inputs, outputs=output)
    
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

    model.compile(
        optimizer=optimizer,
        loss=loss_fn,
        metrics=['accuracy']
    )
    
    return model, optimizer, loss_fn

In [10]:
class ModelTrainer:
    def __init__(self, model: tf.keras.models.Model, optimizer: tf.keras.optimizers.Optimizer, loss_fn: tf.keras.losses.Loss, metrics: Iterable[tf.keras.metrics.Metric] , device: tf.device, augment_data: bool = False):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.device = device
        self.train_metrics = metrics
        self.val_metrics = metrics.copy()
        self.augment_data = augment_data
    
    @tf.function
    def _train_step(self, X, y):
        
        # Augment here
        if (self.augment_data):
            X_new = []
            y_new = []
            for i in range(len(X)):
                # Original Audio
                X_new.append(X[i])
                y_new.append(y[i])
                # Speed augmentation
                if random.random() < 0.2:
                    X_new.append(tf.expand_dims(augment_speed(wav), axis=-1))
                    y_new.append(y[i])
                # Noise addition (2 different noise per audio)
                if random.random() < 0.2:
                    X_new.append(tf.expand_dims(add_noise(tf.reshape(X[i], [-1])), axis=-1))
                    y_new.append(y[i])
                if random.random() < 0.2:
                    X_new.append(tf.expand_dims(add_noise(tf.reshape(X[i], [-1])), axis=-1))
                    y_new.append(y[i])
            
            X = tf.convert_to_tensor(X_new, dtype=np.float64)
            y = tf.convert_to_tensor(y_new, dtype=np.float64)
        
        with tf.GradientTape() as tape:
            logits = self.model(X, training=True)
            loss_value = self.loss_fn(y, logits)
        grads = tape.gradient(loss_value, self.model.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_weights))
        for metric in self.train_metrics:
            metric.update_state(y, tf.math.argmax(logits, axis=1))
        return loss_value
    
    def _train_one_epoch(self, train_ds):
        total_steps = 0
        total_loss = 0
        pbar = tqdm.tqdm(total=len(train_ds), unit="Batches", leave=True, position=0, ascii=False)
        for step, (x_batch_train, y_batch_train) in enumerate(train_ds):
            loss_value = self._train_step(x_batch_train, y_batch_train)
            total_steps += 1
            total_loss += loss_value
            pbar.update(1)
        pbar.close()
        return total_loss/total_steps
        
    @tf.function
    def _val_step(self, X, y):
        logits = self.model(X, training=False)
        for metric in self.val_metrics:
            metric.update_state(y, tf.math.argmax(logits, axis=1))
        return self.loss_fn(y, logits)
        
    def _val_one_epoch(self, val_ds):
        total_steps = 0
        total_loss = 0
        pbar = tqdm.tqdm(total=len(val_ds), unit="Batches", leave=True, position=0, ascii=False)
        for x_batch_val, y_batch_val in val_ds:
            loss_value = self._val_step(x_batch_val, y_batch_val)
            total_steps += 1
            total_loss += loss_value
            pbar.update(1)
        pbar.close()
        return total_loss/total_steps
    
    def train(self, train_ds, val_ds = None, epochs = 1):
        train_losses = []
        val_losses = []
        train_metrics = []
        val_metrics = []
        with self.device:
            for epoch in range(epochs):
                print(f"========> EPOCH [START] : {epoch+1} <========")
                print("TRAINING ->")
                train_loss = self._train_one_epoch(train_ds)
                train_losses.append(train_loss)
                print(f"Loss: {train_loss: 0.5f}", end=', ')
                met_data = {}
                for metric in self.train_metrics:
                    print(f"{metric.name}: {metric.result(): 0.5f}", end=', ')
                    met_data[metric.name] = metric.result()
                train_metrics.append(met_data)

                    
                print('')
                if val_ds is not None:
                    print("VALIDATION ->")
                    val_loss = self._val_one_epoch(val_ds)
                    val_losses.append(val_loss)
                    print(f"Loss: {val_loss: 0.5f}", end=', ')
                    met_data = {}
                    for metric in self.train_metrics:
                        print(f"{metric.name}: {metric.result(): 0.5f}", end=', ')
                        met_data[metric.name] = metric.result()
                    val_metrics.append(met_data)
                    print('')
                print(f"========> EPOCH [END] : {epoch+1} <========")
                print('\n\n')
        return train_losses, val_losses, train_metrics, val_metrics
            
    def get_model(self):
        return self.model
        

In [11]:
augmeted = make_model()
mt_augment = ModelTrainer(model=augmeted[0], optimizer=augmeted[1], loss_fn=augmeted[2], metrics=[tf.keras.metrics.Accuracy()], device=tf.device('/gpu:0'), augment_data=True)
augment_history = mt_augment.train(train_ds, val_ds, 100)

TRAINING ->


 50%|████▉     | 404/810 [00:45<00:18, 21.80Batches/s]

In [None]:
plt.plot(augment_history[0], color='red')
plt.plot(augment_history[1], color='blue')
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()

train_accuracy = []
val_accuracy = []

for sample in augment_history[2]:
    train_accuracy.append(tf.keras.backend.get_value(sample['accuracy']))

for sample in augment_history[3]:
    val_accuracy.append(tf.keras.backend.get_value(sample['accuracy']))
    
plt.plot(train_accuracy, color='red')
plt.plot(val_accuracy, color='blue')
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.show()
    

In [None]:
normal = make_model()
mt_augment = ModelTrainer(model=normal[0], optimizer=normal[1], loss_fn=normal[2], metrics=[tf.keras.metrics.Accuracy()], device=tf.device('/gpu:0'), augment_data=False)
normal_history = mt_augment.train(train_ds, val_ds, 100)

In [None]:
plt.plot(normal_history[0], color='red')
plt.plot(normal_history[1], color='blue')
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()

train_accuracy = []
val_accuracy = []

for sample in normal_history[2]:
    train_accuracy.append(tf.keras.backend.get_value(sample['accuracy']))

for sample in normal_history[3]:
    val_accuracy.append(tf.keras.backend.get_value(sample['accuracy']))
    
plt.plot(train_accuracy, color='red')
plt.plot(val_accuracy, color='blue')
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.show()